SSE 는 컨슈머 측에서 토픽에 대한 데이터를 구독할 때를 기준으로 send 처리
public void registerClient(String userInfo, SseEmitter emitter) {
clients.put(userInfo, emitter);
log.info("Registered client for user: {}", userInfo);
log.debug("Current clients: {}", clients.keySet());
emitter.onCompletion(() -> {
clients.remove(userInfo);
log.info("Client disconnected: {}", userInfo);
});
emitter.onTimeout(() -> clients.remove(userInfo));
emitter.onError((e) -> clients.remove(userInfo));
}
public void sendFeedNotification(String userInfo, Notification notification) {
SseEmitter emitter = clients.get(userInfo);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.name("feed-notification")
.data(notification));
log.info("sse success send event");
} catch (Exception e) {
clients.remove(userInfo);
log.error("Error while sending feed notification: {}", e.getMessage());
}
}
}
ConcurrentHashMap<>()
: 다중 스레드 환경에서 스레드 간 충돌 없이 안전하게 데이터를 읽고, 쓰고, 삭제할 수 있는 Map 구현체clients.remove(userInfo)
**로 제거// text/event-stream을 설정하여 서버가 클라이언트로 SSE 스트림을 전송함을 알림
@GetMapping(value = "/feed/{userInfo}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter feedNotification(@PathVariable String userInfo) {
log.info("Received userInfo: {}", userInfo);
SseEmitter emitter = new SseEmitter(0L); // 클라이언트와 서버 간의 지속적인 연결을 관리하는 객체, 기본적으로 연결은 클라이언트가 끊을 때까지 열려 있음
notificationKafkaListener.registerClient(userInfo, emitter); // 클라이언트 등록
return emitter;
}
new SseEmitter(0L);
를 통하여 서버와 클라이언트 무제한 연결 ← 알림을 받기 위해서는 계속 연결이 필요하기 때문FCM 처리를 위해 의존성 주입 후 config class 설정
@Configuration
public class FirebaseConfig {
@Bean
public FirebaseMessaging firebaseMessaging() throws IOException{
FileInputStream notificationAccount = new FileInputStream(
"src/main/resources/??????????????????????????????????.json");
// GoogleCredentials.fromStream()을 사용하여 Firebase의 인증을 처리, json 파일을 읽고, 이를 통해 Firebase 인증을 구성
GoogleCredentials credentials = GoogleCredentials.fromStream(notificationAccount);
FirebaseOptions options = FirebaseOptions.builder()
.setCredentials(credentials)
.build();
FirebaseApp app = FirebaseApp.initializeApp(options);
return FirebaseMessaging.getInstance(app);
}
}
src/main/resources
디렉토리에 위치FirebaseOptions.builder()
: Firebase를 초기화하기 위한 옵션을 설정
(평소 사용하던 Lombok 의 Builder X )public void sendPushFeedNotification(String token, Notification notification) {
// com.google.firebase.messaging.Notification 은 Builder 클래스에 대한 public 접근을 허용하지 않지만, Notification.builder()라는 정적 메서드를 제공
com.google.firebase.messaging.Notification fcmNotification =
com.google.firebase.messaging.Notification.builder() // builder() 메서드 사용
.setTitle(notification.getTitle()) // 제목 설정
.setBody(notification.getContent()) // 내용 설정
.build(); // 빌드하여 Notification 객체 생성
Message message = Message.builder()
.setToken(token)
.setNotification(fcmNotification)
.build();
try {
String response = firebaseMessaging.send(message);
log.info("FCM Notification sent: {}", response);
} catch (FirebaseMessagingException e) {
log.error("FCM Notification Error: {}", e.getMessage());
}
}
sendPushFeedNotification(receiverUuid, notification);
현재 이러한 인자값으로 처리되어 있는데, 유저 정보가 아닌 토큰으로 변경 필요 ← sse 연결과 동시에 토큰 받기 처리할 예정Notification
객체를 생성builder()
메서드는 Notification
객체를 빌드하는 데 사용 ( lombok의 builder X )setNotification(fcmNotification)
: 알림 데이터를 메세지에 추가send()
메서드를 호출하여 메시지를 전송 sse 실행 결과 테스트
sse & fcm 결과 테스트, fcm 경우는 토큰값 처리에 대한 변경 필요