Getter
@NoArgsConstructor
public class KafkaFeedRequestDto {
	private String senderUuid;
	private String receiverUuid;
	private List<String> mediaUrlList; // 첫 이미지만 알림 서비스로 전송
	private String feedContent;

	@Builder
	public KafkaFeedRequestDto(
		String senderUuid,
		String receiverUuid,
		List<String> mediaUrlList,
		String feedContent
	) {
		this.senderUuid = senderUuid;
		this.receiverUuid = receiverUuid;
		this.mediaUrlList = mediaUrlList;
		this.feedContent = feedContent;
	}
}

@Getter
@NoArgsConstructor
public class KafkaAlarmRequestDto {
	private String senderUuid;
	private String receiverUuid;
	private String feedContent;
	private String mediaUrl;
	private String type;

	@Builder
	public KafkaAlarmRequestDto(
		String senderUuid,
		String receiverUuid,
		String feedContent,
		String mediaUrl,
		String type
	) {
		this.senderUuid = senderUuid;
		this.receiverUuid = receiverUuid;
		this.feedContent = feedContent;
		this.mediaUrl = mediaUrl;
		this.type = type;
	}
}
version: '3.8'
services:
  kafka-1:
    image: apache/kafka
    hostname: kafka-1
    container_name: kafka-1
    ports:
      - 29092:9092
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka-1:19092,CONTROLLER://kafka-1:9093,PLAINTEXT_HOST://kafka-1:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-1:19092,PLAINTEXT_HOST://kafka-1:9092'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka-2:
    image: apache/kafka
    hostname: kafka-2
    container_name: kafka-2
    ports:
      - 39092:9092
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka-2:19092,CONTROLLER://kafka-2:9093,PLAINTEXT_HOST://kafka-2:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-2:19092,PLAINTEXT_HOST://kafka-2:9092'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka-3:
    image: apache/kafka
    hostname: kafka-3
    container_name: kafka-3
    ports:
      - 49092:9092
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka-3:19092,CONTROLLER://kafka-3:9093,PLAINTEXT_HOST://kafka-3:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-3:19092,PLAINTEXT_HOST://kafka-3:9092'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.2.1  # Confluent 플랫폼의 Kafka Connect 이미지
		user: root                                   # root 권한으로 실행
		hostname: kafka-connect                      # 컨테이너의 호스트 이름
		ports: - "8083:8083"                        # REST API 접근용 포트

		# MongoDB 클러스터 연결을 위한 호스트 매핑
		extra_hosts:
		  - "unioncluster-shard-00-00.wudxj.mongodb.net:3.37.91.61"
		  - "unioncluster-shard-00-01.wudxj.mongodb.net:3.37.220.42"
		  - "unioncluster-shard-00-02.wudxj.mongodb.net:3.35.150.151"

		environment:
		 # Kafka 브로커 연결 정보, 원래 9092로 작성되어있으나 29092, 39092, 49092로 변경되어야 작동
	  CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092,kafka-3:9092"

		# Kafka Connect 자체 설정
		CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"  # REST API 호스트 이름
		CONNECT_GROUP_ID: "kafka-connect-cluster"           # Connect 워커 그룹 ID

		# Kafka Connect 내부 토픽 설정
		CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"     # 커넥터 설정 저장 토픽
		CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"     # 처리된 데이터 오프셋 저장
		CONNECT_STATUS_STORAGE_TOPIC: "connect-status"      # 커넥터 상태 정보 저장

		# 복제 팩터 설정 (각 토픽의 복제본 수)
		CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
		CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
		CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3

		# 데이터 변환 설정
		CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"           # 키를 JSON으로 변환
		CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"         # 값을 JSON으로 변환
		CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  # 내부 키 변환
		CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"# 내부 값 변환

		# 플러그인 경로 설정
		CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"

		# DNS 설정 (Google DNS 서버 사용)
		dns:
		  - 8.8.8.8
		  - 8.8.4.4

		# 볼륨 마운트 (로컬의 plugins 디렉토리를 컨테이너에 마운트)
		volumes:
		  - ./plugins:/usr/share/confluent-hub-components
		  
	kafka-ui:
    image: provectuslabs/kafka-ui        # Kafka 관리용 UI 도구 이미지
		container_name: kafka-ui             # 컨테이너 이름
		ports:
		  - "20000:8080"                     # UI 접근용 포트 (외부:내부)
		restart: always                      # 컨테이너 자동 재시작 설정
		
		depends_on:
		  - kafka-3                          # kafka-3 서비스가 시작된 후에 실행
		
		# 환경 변수 설정
		environment:
		  - KAFKA_CLUSTERS_0_NAME=kafka-ui   # UI에 표시될 클러스터 이름
		  - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092  # 연결할 브로커 주소들

→ Kafka 브로커 클러스터 (kafka-1, kafka-2, kafka-3) (브로커: Kafka의 서버 단위, 메시지를 저장하고, 전달하는 하나의 프로세스/서버)

(클러스터: 여러 브로커를 하나의 그룹으로 묶은 것)

ex: 은행으로 비유

→ 각 브로커의 environment

→ kafka-connect

// Spring 코드
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class)
// Spring 코드는 Kafka에서 애플리케이션으로 데이터를 가져올 때의 변환을 담당
# 외부에서 접근할 때 사용할 호스트명
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"  # REST API 호스트 이름
# Connect 워커들의 그룹 식별자
CONNECT_GROUP_ID: "kafka-connect-cluster"           # Connect 워커 그룹 ID
ports:
		  - "20000:8080"                     # UI 접근용 포트 (외부:내부)
# Kafka UI: localhost:20000 접속
# Spring 애플리케이션: localhost:8081 접속
# 각각 독립적으로 실행되는 서비스이므로 포트가 겹치지만 않으면 됨

→ kafka-ui


@EnableKafka
@Configuration
public class KafkaConfig {

	@Value("${spring.kafka.bootstrap-servers}")
	private String bootstrapServers;

	@Bean
	public ConsumerFactory<String, KafkaFeedRequestDto> feedConsumerFactory() {
		Map<String, Object> props = new HashMap<>();
		// Kafka 브로커 주소 설정
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		// Consumer 그룹 ID 설정
		// 같은 그룹의 consumer들은 토픽의 파티션을 분배하여 메시지를 소비
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "feed-join-subscribe");
		// 메시지 키의 역직렬화 설정 (String 타입)
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		// 메시지 값의 역직렬화 설정 (JSON -> KafkaFeedRequestDto)
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
		// JSON을 자바 객체로 변환할 때 신뢰할 패키지 설정 ("*"는 모든 패키지 허용)
		props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
		//props.put(JsonDeserializer.TYPE_MAPPINGS, "lookids.commentread.comment.adaptor.in.kafka.vo.CommentEventVo");

		return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
			new ErrorHandlingDeserializer<>(new JsonDeserializer<>(KafkaFeedRequestDto.class, false)));
	}
	
	// Consumer Factory 생성
	// StringDeserializer: 키를 String으로 역직렬화
	// ErrorHandlingDeserializer: 역직렬화 실패 시 에러 처리
	// JsonDeserializer: JSON을 KafkaFeedRequestDto로 변환
	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, KafkaFeedRequestDto> feedEventListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, KafkaFeedRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(feedConsumerFactory());
		return factory;
	}

}