알림 서비스에 대해서 적용됨
알림 서비스는 타 서비스에게 kafka 메세지를 받아 처리되기에 consumer 만 처리될 예정
@EnableKafka
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, NotificationRequestDto> notificationConsumerFactory() {
Map<String, Object> props = new HashMap<>();
// Kafka 브로커 주소 설정
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Consumer 그룹 ID 설정
// 같은 그룹의 consumer들은 토픽의 파티션을 분배하여 메시지를 소비
props.put(ConsumerConfig.GROUP_ID_CONFIG, "notification-consumer");
// 메시지 키의 역직렬화 설정 (String 타입)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 메시지 값의 역직렬화 설정 (JSON -> KafkaFeedRequestDto)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// JSON을 자바 객체로 변환할 때 신뢰할 패키지 설정 ("*"는 모든 패키지 허용)
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
//props.put(JsonDeserializer.TYPE_MAPPINGS, "lookids.commentread.comment.adaptor.in.kafka.vo.CommentEventVo");
// Consumer Factory 생성
// StringDeserializer: 키를 String으로 역직렬화
// ErrorHandlingDeserializer: 역직렬화 실패 시 에러 처리
// JsonDeserializer: JSON을 KafkaFeedRequestDto로 변환
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new ErrorHandlingDeserializer<>(new JsonDeserializer<>(NotificationRequestDto.class, false)));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, NotificationRequestDto> notificationEventListenerContainerFactory() {
// @KafkaListener 어노테이션이 사용할 Factory 설정
ConcurrentKafkaListenerContainerFactory<String, NotificationRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(notificationConsumerFactory());
return factory;
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationKafkaListener {
private final NotificationRepository notificationRepository;
private final KafkaTemplate<String, NotificationRequestDto> kafkaTemplate;
@KafkaListener(topics = "feed-create-join-subscribe"
, groupId = "notification-consumer"
, containerFactory = "notificationEventListenerContainerFactory")
public void consumeNotificationEvent(NotificationRequestDto notificationRequestDto) {
log.info("consumeNotificationEvent: {}", notificationRequestDto.getType().toUpperCase());
log.info("consumeNotificationEvent: {}", notificationRequestDto.getContent());
// 알람 저장
Notification notification = Notification.builder()
.senderUuid(notificationRequestDto.getSenderUuid())
.receiverUuidList(notificationRequestDto.getReceiverUuidList())
.content(notificationRequestDto.getContent())
.type(NotificationType.valueOf(notificationRequestDto.getType().toUpperCase()))
.createdAt(LocalDateTime.now())
.build();
notificationRepository.save(notification);
}
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, NotificationRequestDto> notificationEventListenerContainerFactory() {
// @KafkaListener 어노테이션이 사용할 Factory 설정
ConcurrentKafkaListenerContainerFactory<String, NotificationRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(notificationConsumerFactory());
return factory;
}
@KafkaListener(topics = "feed-create-join-subscribe"
, groupId = "notification-consumer"
, containerFactory = "notificationEventListenerContainerFactory")
public void consumeNotificationEvent(NotificationRequestDto notificationRequestDto) {
log.info("consumeNotificationEvent: {}", notificationRequestDto.getType().toUpperCase());
log.info("consumeNotificationEvent: {}", notificationRequestDto.getContent());
// 알람 저장
Notification notification = Notification.builder()
.senderUuid(notificationRequestDto.getSenderUuid())
.receiverUuidList(notificationRequestDto.getReceiverUuidList())
.content(notificationRequestDto.getContent())
.type(NotificationType.valueOf(notificationRequestDto.getType().toUpperCase()))
.createdAt(LocalDateTime.now())
.build();
notificationRepository.save(notification);
}
← 기존에는 consumeNotificationEvent 의 파라미터가 NotificationRequestDto가 아닌, NotificationKafkaRequestDto 이었기에 containerFactory 측과의 타입이 맞지 않아서 발생했던 오류