인식한 상황
잠시 시험 기간으로 프로젝트 진행을 멈추고 다시 돌아왔더니… 여러 API 서버를 고려하여 STOMP를 사용한 채팅 서비스를 구성하던 중, 세션이 유지되지 않는 문제가 발생했다고 전달받았다.
(살려주세요~~)
STOMP를 사용하고 기존 SimpleMessageBroker를 이용하여 채팅을 개발했을 때 스케일 아웃하는 부분에서 채팅이 원활하게 전송되지 않는 문제가 있었다. log를 확인해 보니 멀티 인스턴스로 API Server를 배포했는데 Nginx의 로드밸런싱 기능을 사용할 때 생긴 문제였다.
외부 브로커가 없을 때의 문제점
nginx를 통해 로드 밸런싱으로 유저 1은 1번 API Server와 연결을 하고, 유저 2는 2번 API Server와 연결을 했을 때 1번 유저가 /pub을 하게 되더라도 2번 유저는 받을 수 없었다. 아래 사진은 예시이다.
해결 과정
이 부분에 있어 다음과 같이 생각했다.
각 유저의 서버 연결 정보가 Redis에 저장되어 있고 API Server에서 SEND Command를 받았을 때, Content에서 destination을 보고 Redis에서 검색한 뒤 각 서버로 릴레이 요청 보내면 되지 않을까 생각했다. 하지만 이 방법을 적용하면 서버가 추가로 스케일 아웃될 때 그물망 연결이 될 거 같았다…
이후 검색을 통해 찾아본 결과 영상 하나에서 저와 비슷한 문제를 해결한 것을 확인했다. 저는 영상을 기반을 제 나름대로 정리한 방법과 외부 브로커를 사용할 때 방법을 비교하여 기술을 선택하기로 했다.
Worker Node를 만들 것인가? vs 외부 브로커를 사용할 것인가?
1번 방법을 사용할 때는 추후 API Server에서 받은 Message를 Message Queue로 보내고 Worker Node가 해당 Message를 처리하는 방식이다. 이 방식은 원하는 대로 커스터마이징이 가능하다는 장점이 있지만 따로 Message Queue와 Worker Node, Session을 관리해야 하므로 비용이 상당하다고 판단했다.
2번 방식은 따로 Woker Node를 구성하지 않고 외부 메시지 브로커를 사용하여 Message를 처리하는 방식이다. 해당 방식은 커스터마이징이 불가능한 대신 이미 만들어진 모듈을 사용해서 개발하고 RabbitMQ 1개만 배포하면 되기 때문에 비용이 적을 것이라고 판단했다.
결론적으로, 빠르게 개발하여 버그르 없애야 하는 관점과 실제 서버 비용을 기준으로 2번을 사용하게 되었다. 하지만 추후 사용자가 늘어나고 지원이 더 들어올 수 있으니 “발행” 부분에서 개발할 때 추가로 고려해야 할 점이 생겼다. 추후 서비스가 커졌을 때, 1번으로 바꿔야하는 상황에서 유지보수면에서 어떻게 챙길 것인가…
외부 브로커를 둘 때 위 문제가 어떻게 해결되는지는 여기를 확인하면 됩니다!!
디자인 패턴 도입
ChatService에서 바로 RabbitTemplate을 사용하는 것이 아닌 객체 어댑터 방식으로 다음 그림과 같이 구성했다.
보안은 어떻게 도입할까?
인증과 인가를 거치지 않고 UserID를 같이 받는 Dto를 사용하는 것이 보안이 떨어진다고 생각하여 spring security message를 사용하여 인증과 인가를 구현하려고 했지만, 기존의 MVC와 겹치기도 하고 filter 등이 제대로 작동하지 않는다는 Stack Overflow의 답변을 본 뒤 ChannalInterceptor를 사용하여 인증과 인가를 Spring Security의 Filter와 비슷하게 역할에 나눠서 개발했다.
Authentication Interceptor
@Component
@RequiredArgsConstructor
public class StompAuthenticationInterceptor implements ChannelInterceptor {
private final JwtUtil jwtUtil;
@Override
public Message<?> preSend(
Message<?> message,
MessageChannel channel
) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
if (isNotRequiredAuthentication(accessor)) {
return message;
}
Long userId = null;
try {
String token = HeaderUtil.refineHeader(accessor, Constants.AUTHORIZATION_HEADER, Constants.BEARER_PREFIX)
.orElseThrow(() -> new CommonException(ErrorCode.INVALID_HEADER_ERROR));
Claims claims = jwtUtil.validateToken(token);
userId = claims.get(Constants.USER_ID_CLAIM_NAME, Long.class);
} catch (JsonWebTokenException e) {
throw new StompSecurityException(e.getErrorCode());
} catch (CommonException e) {
throw new StompSecurityException(e.getErrorCode());
}
if (accessor.getCommand().equals(StompCommand.CONNECT)) {
accessor.getSessionAttributes().put(Constants.USER_ID_ATTRIBUTE_NAME, userId);
}
accessor.setNativeHeader(Constants.USER_ID_ATTRIBUTE_NAME, userId.toString());
return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
}
// ...
}
Authorization Interceptor
@Component
@RequiredArgsConstructor
public class StompAuthorizationInterceptor implements ChannelInterceptor {
private final ChatRoomService chatRoomService;
@Override
public Message<?> preSend(
Message<?> message,
MessageChannel channel
) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
if (isNotRequiredAuthorization(accessor)) {
return message;
}
Long userId = extractUserIdFromAccessor(accessor);
Long chatRoomId = extractChatRoomIdFromDestination(Objects.requireNonNull(accessor.getDestination()));
try {
chatRoomService.checkAccessToChatRoom(userId, chatRoomId);
} catch (CommonException e) {
throw new StompSecurityException(ErrorCode.ACCESS_DENIED);
}
if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
Objects.requireNonNull(accessor.getSessionAttributes()).put(Constants.CHAT_ROOM_ID_ATTRIBUTE_NAME, chatRoomId);
}
return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
}
// ...
}
UserId Argument Resolver
@Component
public class StompUserIdArgumentResolver implements HandlerMethodArgumentResolver {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return parameter.getParameterType().equals(Long.class)
&& parameter.hasParameterAnnotation(UserId.class);
}
@Override
public Object resolveArgument(
MethodParameter parameter,
Message<?> message
) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
String userIdStr = accessor.getFirstNativeHeader(Constants.USER_ID_ATTRIBUTE_NAME);
if (userIdStr == null) {
throw new CommonException(ErrorCode.INVALID_HEADER_ERROR);
}
return Long.parseLong(userIdStr);
}
}
Exception이 발생한다면?
Stomp 전송 뒤 Exception이 발생했을 때, 두 가지 방안을 생각하였다.
- 사용자의 개인 Channal에 에러 메세지를 발행한다.
- 해당 채팅방 Channal에 에러 메세지를 발행한다.
개인이 보낸 메세지의 에러를 다른이가 볼 수 있게 하니 Message에 누가 받는지에 대한 정보도 들어가야하고 보안이 걱정되었기에 개인이 보낸 메세지를 에러(유효성 검사 예외, 처리 중 예외)가 발생해 실패했다는 정보를 보내는 것은 옳지 않다는 판단을 하게 되었다.
또한, MessageExceptionHandler
를 이용하여 처리 중 발생한 에러에 대해서 AOP를 적용하였다.
@Slf4j
@RestControllerAdvice
@RequiredArgsConstructor
public class GlobalExceptionHandler {
private final RabbitTemplate rabbitTemplate;
/**
* Stomp 전용 인증/인가 이후 Controller, Service, Repository 에서 발생하는 CommonException 예외 처리
*
* @param clientMessage 클라이언트 메시지
* @param e 사용자 실수 예외
*/
@MessageExceptionHandler(CommonException.class)
public void handleCommonException(
Message<byte[]> clientMessage,
CommonException e
) {
log.error("GlobalExceptionHandler catch CommonException In Stomp Processing : {}", e.getMessage());
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(clientMessage);
Long userId = extractUserIdFromAccess(accessor);
ChatPublishDto publishDto = ChatPublishDto.builder()
.id("-1")
.type("ERROR")
.senderId(-1L)
.data(
Map.of(
"code", e.getErrorCode().getCode(),
"message", e.getErrorCode().getMessage()
)
)
.createdAt(DateTimeUtil.convertLocalDateTimeToStringWithTime(LocalDateTime.now()))
.build();
rabbitTemplate.convertAndSend(
Constants.CHAT_EXCHANGE_NAME,
String.format("users.%d.lobby", userId),
publishDto
);
}
// ...
}
결과
이후 Nginx의 Log를 보니 성공적으로 어떤 API 서버로 publish를 하든 외부 브로커를 사용했기에 채팅이 유지되었다. 또한, 디자인패턴을 적용함으로써 유지보수와 확장성을 높일 수 있었다.