Getter
@NoArgsConstructor
public class KafkaFeedRequestDto {
private String senderUuid;
private String receiverUuid;
private List<String> mediaUrlList; // 첫 이미지만 알림 서비스로 전송
private String feedContent;
@Builder
public KafkaFeedRequestDto(
String senderUuid,
String receiverUuid,
List<String> mediaUrlList,
String feedContent
) {
this.senderUuid = senderUuid;
this.receiverUuid = receiverUuid;
this.mediaUrlList = mediaUrlList;
this.feedContent = feedContent;
}
}
@Getter
@NoArgsConstructor
public class KafkaAlarmRequestDto {
private String senderUuid;
private String receiverUuid;
private String feedContent;
private String mediaUrl;
private String type;
@Builder
public KafkaAlarmRequestDto(
String senderUuid,
String receiverUuid,
String feedContent,
String mediaUrl,
String type
) {
this.senderUuid = senderUuid;
this.receiverUuid = receiverUuid;
this.feedContent = feedContent;
this.mediaUrl = mediaUrl;
this.type = type;
}
}
위와 같은 kafka를 통해 받고 보낼 데이터에 대한 dto 정의
kafka 실행을 위해 docker-compose.yml 작성
version: '3.8'
services:
kafka-1:
image: apache/kafka
hostname: kafka-1
container_name: kafka-1
ports:
- 29092:9092
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka-1:19092,CONTROLLER://kafka-1:9093,PLAINTEXT_HOST://kafka-1:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-1:19092,PLAINTEXT_HOST://kafka-1:9092'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
kafka-2:
image: apache/kafka
hostname: kafka-2
container_name: kafka-2
ports:
- 39092:9092
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka-2:19092,CONTROLLER://kafka-2:9093,PLAINTEXT_HOST://kafka-2:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-2:19092,PLAINTEXT_HOST://kafka-2:9092'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
kafka-3:
image: apache/kafka
hostname: kafka-3
container_name: kafka-3
ports:
- 49092:9092
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka-3:19092,CONTROLLER://kafka-3:9093,PLAINTEXT_HOST://kafka-3:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-3:19092,PLAINTEXT_HOST://kafka-3:9092'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
kafka-connect:
image: confluentinc/cp-kafka-connect:7.2.1 # Confluent 플랫폼의 Kafka Connect 이미지
user: root # root 권한으로 실행
hostname: kafka-connect # 컨테이너의 호스트 이름
ports: - "8083:8083" # REST API 접근용 포트
# MongoDB 클러스터 연결을 위한 호스트 매핑
extra_hosts:
- "unioncluster-shard-00-00.wudxj.mongodb.net:3.37.91.61"
- "unioncluster-shard-00-01.wudxj.mongodb.net:3.37.220.42"
- "unioncluster-shard-00-02.wudxj.mongodb.net:3.35.150.151"
environment:
# Kafka 브로커 연결 정보, 원래 9092로 작성되어있으나 29092, 39092, 49092로 변경되어야 작동
CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
# Kafka Connect 자체 설정
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" # REST API 호스트 이름
CONNECT_GROUP_ID: "kafka-connect-cluster" # Connect 워커 그룹 ID
# Kafka Connect 내부 토픽 설정
CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs" # 커넥터 설정 저장 토픽
CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets" # 처리된 데이터 오프셋 저장
CONNECT_STATUS_STORAGE_TOPIC: "connect-status" # 커넥터 상태 정보 저장
# 복제 팩터 설정 (각 토픽의 복제본 수)
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
# 데이터 변환 설정
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" # 키를 JSON으로 변환
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" # 값을 JSON으로 변환
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" # 내부 키 변환
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"# 내부 값 변환
# 플러그인 경로 설정
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
# DNS 설정 (Google DNS 서버 사용)
dns:
- 8.8.8.8
- 8.8.4.4
# 볼륨 마운트 (로컬의 plugins 디렉토리를 컨테이너에 마운트)
volumes:
- ./plugins:/usr/share/confluent-hub-components
kafka-ui:
image: provectuslabs/kafka-ui # Kafka 관리용 UI 도구 이미지
container_name: kafka-ui # 컨테이너 이름
ports:
- "20000:8080" # UI 접근용 포트 (외부:내부)
restart: always # 컨테이너 자동 재시작 설정
depends_on:
- kafka-3 # kafka-3 서비스가 시작된 후에 실행
# 환경 변수 설정
environment:
- KAFKA_CLUSTERS_0_NAME=kafka-ui # UI에 표시될 클러스터 이름
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 # 연결할 브로커 주소들
→ Kafka 브로커 클러스터 (kafka-1, kafka-2, kafka-3) (브로커: Kafka의 서버 단위, 메시지를 저장하고, 전달하는 하나의 프로세스/서버)
(클러스터: 여러 브로커를 하나의 그룹으로 묶은 것)
ex: 은행으로 비유
→ 각 브로커의 environment
→ kafka-connect
// Spring 코드
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class)
// Spring 코드는 Kafka에서 애플리케이션으로 데이터를 가져올 때의 변환을 담당
extra_hosts
에 대해서는 실제 사용하는 MongoDB 클러스터의 정보로 변경
(MongoDB Atlas를 사용하는 경우, Atlas에서 제공하는 connection string에서 호스트명)
# 외부에서 접근할 때 사용할 호스트명
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" # REST API 호스트 이름
# Connect 워커들의 그룹 식별자
CONNECT_GROUP_ID: "kafka-connect-cluster" # Connect 워커 그룹 ID
ports:
- "20000:8080" # UI 접근용 포트 (외부:내부)
# Kafka UI: localhost:20000 접속
# Spring 애플리케이션: localhost:8081 접속
# 각각 독립적으로 실행되는 서비스이므로 포트가 겹치지만 않으면 됨
→ kafka-ui
kafka 클러스터를 모니터링 하고 관리하기 위한 웹 인터페이스
Feed service에서 값을 받기 위해 kafkaConfig 작성
@EnableKafka
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, KafkaFeedRequestDto> feedConsumerFactory() {
Map<String, Object> props = new HashMap<>();
// Kafka 브로커 주소 설정
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Consumer 그룹 ID 설정
// 같은 그룹의 consumer들은 토픽의 파티션을 분배하여 메시지를 소비
props.put(ConsumerConfig.GROUP_ID_CONFIG, "feed-join-subscribe");
// 메시지 키의 역직렬화 설정 (String 타입)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 메시지 값의 역직렬화 설정 (JSON -> KafkaFeedRequestDto)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// JSON을 자바 객체로 변환할 때 신뢰할 패키지 설정 ("*"는 모든 패키지 허용)
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
//props.put(JsonDeserializer.TYPE_MAPPINGS, "lookids.commentread.comment.adaptor.in.kafka.vo.CommentEventVo");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new ErrorHandlingDeserializer<>(new JsonDeserializer<>(KafkaFeedRequestDto.class, false)));
}
// Consumer Factory 생성
// StringDeserializer: 키를 String으로 역직렬화
// ErrorHandlingDeserializer: 역직렬화 실패 시 에러 처리
// JsonDeserializer: JSON을 KafkaFeedRequestDto로 변환
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaFeedRequestDto> feedEventListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, KafkaFeedRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(feedConsumerFactory());
return factory;
}
}