@EnableKafka
@Configuration
public class KafkaConfig {
	@Value("${spring.kafka.bootstrap-servers}")
	private String bootstrapServers;

	@Bean
	public ConsumerFactory<String, NotificationRequestDto> notificationConsumerFactory() {
		Map<String, Object> props = new HashMap<>();
		// Kafka 브로커 주소 설정
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		// Consumer 그룹 ID 설정
		// 같은 그룹의 consumer들은 토픽의 파티션을 분배하여 메시지를 소비
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "notification-consumer");
		// 메시지 키의 역직렬화 설정 (String 타입)
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		// 메시지 값의 역직렬화 설정 (JSON -> KafkaFeedRequestDto)
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
		// JSON을 자바 객체로 변환할 때 신뢰할 패키지 설정 ("*"는 모든 패키지 허용)
		props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
		//props.put(JsonDeserializer.TYPE_MAPPINGS, "lookids.commentread.comment.adaptor.in.kafka.vo.CommentEventVo");

		// Consumer Factory 생성
		// StringDeserializer: 키를 String으로 역직렬화
		// ErrorHandlingDeserializer: 역직렬화 실패 시 에러 처리
		// JsonDeserializer: JSON을 KafkaFeedRequestDto로 변환
		return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
			new ErrorHandlingDeserializer<>(new JsonDeserializer<>(NotificationRequestDto.class, false)));
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, NotificationRequestDto> notificationEventListenerContainerFactory() {
		// @KafkaListener 어노테이션이 사용할 Factory 설정
		ConcurrentKafkaListenerContainerFactory<String, NotificationRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(notificationConsumerFactory());
		return factory;
	}
}

@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationKafkaListener {
	private final NotificationRepository notificationRepository;
	private final KafkaTemplate<String, NotificationRequestDto> kafkaTemplate;

	@KafkaListener(topics = "feed-create-join-subscribe"
		, groupId = "notification-consumer"
		, containerFactory = "notificationEventListenerContainerFactory")
	public void consumeNotificationEvent(NotificationRequestDto notificationRequestDto) {
		log.info("consumeNotificationEvent: {}", notificationRequestDto.getType().toUpperCase());
		log.info("consumeNotificationEvent: {}", notificationRequestDto.getContent());
		// 알람 저장
		Notification notification = Notification.builder()
			.senderUuid(notificationRequestDto.getSenderUuid())
			.receiverUuidList(notificationRequestDto.getReceiverUuidList())
			.content(notificationRequestDto.getContent())
			.type(NotificationType.valueOf(notificationRequestDto.getType().toUpperCase()))
			.createdAt(LocalDateTime.now())
			.build();

		notificationRepository.save(notification);

	}
}
@Bean
	public ConcurrentKafkaListenerContainerFactory<String, NotificationRequestDto> notificationEventListenerContainerFactory() {
		// @KafkaListener 어노테이션이 사용할 Factory 설정
		ConcurrentKafkaListenerContainerFactory<String, NotificationRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(notificationConsumerFactory());
		return factory;
	}
@KafkaListener(topics = "feed-create-join-subscribe"
		, groupId = "notification-consumer"
		, containerFactory = "notificationEventListenerContainerFactory")
public void consumeNotificationEvent(NotificationRequestDto notificationRequestDto) {
		log.info("consumeNotificationEvent: {}", notificationRequestDto.getType().toUpperCase());
		log.info("consumeNotificationEvent: {}", notificationRequestDto.getContent());
		// 알람 저장
		Notification notification = Notification.builder()
			.senderUuid(notificationRequestDto.getSenderUuid())
			.receiverUuidList(notificationRequestDto.getReceiverUuidList())
			.content(notificationRequestDto.getContent())
			.type(NotificationType.valueOf(notificationRequestDto.getType().toUpperCase()))
			.createdAt(LocalDateTime.now())
			.build();

		notificationRepository.save(notification);

	}

← 기존에는 consumeNotificationEvent 의 파라미터가 NotificationRequestDto가 아닌, NotificationKafkaRequestDto 이었기에 containerFactory 측과의 타입이 맞지 않아서 발생했던 오류

스크린샷 2024-11-27 오후 10.28.58.png

스크린샷 2024-11-27 오후 10.29.18.png