본문 바로가기

콘서트 티켓 프로젝트

Websocket과 Redis Pub/Sub을 활용한 대기열->활성화열 전환 알림 시스템

1. 개요

- 콘서트 티켓 프로젝트에서는 부하 조절을 위해 콘서트 대기열 기능을 제공합니다.

  대기열에 있는 사용자는 주기적으로(예: 10초마다) 활성화열로 이동하며, 이때 토큰 활성화 알림을 받습니다.

  사용자는 토큰 활성화 알림을 통해 티켓 예약 가능 여부를 확인하고, 예약을 진행할 수 있습니다.


2. 기술 스택

- 이 시스템은 WebSocket과 Redis Pub/Sub을 활용하여 구현되었습니다.


3. 동작 방식

  1. 사용자 대기열 등록
    • 사용자가 콘서트 티켓을 예매하려 하면 대기열에 등록됩니다.
  2. 주기적 활성화열 이동
    • 일정 주기(예: 10초)마다 일부 사용자가 대기열에서 활성화열로 이동합니다.
    • 활성화열의 최대 인원은 5천명입니다.
  3. WebSocket을 통한 알림 전송
    • Redis Pub/Sub을 이용하여 활성화된 사용자 목록을 발송하고,
      WebSocket을 통해 구독한 후, 해당 사용자에게 토큰 활성화 알림을 보냅니다.
  4. 사용자의 티켓 예약 진행
    • 토큰 활성화 알림을 받은 사용자는 콘서트 예약을 진행할 수 있습니다.

이 글은 이러한 WebSocket과 Redis Pub/Sub을 활용한 대기열 -> 활성화열 전환 알림 시스템의 개발 과정을
다룹니다.

 

4. 구현 상세 

 

(1) 외부 Cron Job을 통한 대기열 ->활성화열 전환 요청

->  대기열->활성화열 전환이 여러 서버에서 중복으로 실행되는 것을 방지하기 위해,

     Nginx와 외부 Cron Job(ex) Jenkins)을 활용하여 대기열->활성화열 전환 API 요청이 수행됩니다.     

 

(1-1) nginx.conf

location /api/v1/waitingQueue/migration {
       proxy_pass http://34.64.102.105:8080;  # 특정 서버로 고정
       proxy_set_header Host $host;
       proxy_set_header X-Real-IP $remote_addr;
       proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
       proxy_set_header X-Forwarded-Proto $scheme;
}

 

 

(1-2) POST API

@PostMapping("/api/v1/waitingQueue/migration")
public ResponseEntity<Boolean> processWaitingQueueMigration() {
      try {
          waitingQueueMigrationApplicationService.migrateFromWaitingToActiveQueue();
          return ResponseEntity.ok(true); // 성공 시 true 반환
      } catch (Exception e) {
          log.error("대기열 처리 중 오류 발생: " + e.getMessage(), e);
          return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(false); // 실패 시 false 반환
      }
}

 

 

(2) 대기열 활성화 여부 확인

- 콘서트 티켓 프로젝트에서 대기열은 트래픽 임계치에 따라 활성화/비활성화가 결정됩니다

  따라서 우선적으로 대기열이 활성화되어 있는지 확인한 후에, 

  활성화되어 있다면 실제 요청을 보내게 됩니다. 

@Component
@Slf4j
@RequiredArgsConstructor
public class WaitingQueueMigrationApplicationService {

  private final WaitingQueueService waitingQueueService;

  public void migrateFromWaitingToActiveQueue() {
    String currentStatus = waitingQueueService.getWaitingQueueStatus();
    if(WaitingQueueStatus.ACTIVE.equals(currentStatus)) {
      waitingQueueService.migrateFromWaitingToActiveQueue();
    }
  }
}

 

 

(3) 대기열->활성화열 전환

private static final long MAX_ACTIVE_QUEUE_SIZE = 5000L;
private static final long MAX_TRANSFER_COUNT = 250L;

public void migrateFromWaitingToActiveQueue() {

      long activeQueueSize = activeQueueDAO.getActiveQueueSize();
      long transferCount = Math.min(MAX_ACTIVE_QUEUE_SIZE - activeQueueSize, MAX_TRANSFER_COUNT);

      if(transferCount == 0){
          return;
      }

      Collection<WaitingDTO> tokenList = waitingQueueDAO.getAllWaitingTokens(transferCount);

      if (tokenList.isEmpty()) {
          return;
      }

      activeQueueDAO.migrateTokensFromWaitingQueueToActiveQueue(tokenList);

      // 토큰 목록을 Redis Pub/Sub을 통해 발행한다
      // 발행한 토큰 목록은 WebSocket 서버에서 Redis Pub/Sub을 통해 구독한다
      // 토큰 Pub/Sub의 목적은, 토큰이 활성화되었을 때, 웹소켓 클라이언트에게 알림을 주기 위함이다.
      tokenPublisher.publishAllActiveTokens(tokenList);

      // 토큰 목록을 대기열에서 삭제처리한다
      waitingQueueDAO.deleteWaitingQueueTokens(tokenList);

      Collection<WaitingDTO> waitingTokenList = waitingQueueDAO.getAllWaitingTokensWithRank();
      tokenPublisher.publishAllWaitingTokens(waitingTokenList);
 }

 

public void migrateTokensFromWaitingQueueToActiveQueue(Collection<WaitingDTO> tokens) {
     RBatch batch = redisson.createBatch();

     for (WaitingDTO waitingDTO : tokens) {
         String uuid = waitingDTO.getUuid();
         String token = waitingDTO.getToken();
         // 활성화 큐에 추가
         batch.getMapCache(RedisKey.ACTIVE_QUEUE).putIfAbsentAsync(uuid, token, 300, TimeUnit.SECONDS);
         // 대기열에서 삭제
         batch.getMapCache(RedisKey.WAITING_QUEUE).removeAsync(token);
     }
     // 원자적으로 실행
     batch.execute();
 }

 

- 대기열 -> 활성화열 전환을 할 때 중요하게 고려한 점은 다음과 같습니다. 

 

(3-1) 활성화열 최대 크기 관리

- 활성화열의 최대 크기를 5000으로 관리하여, 트래픽 부하를 제어하였습니다.  

 

(3-2) 대기열 -> 활성화열 전환 시 트랜잭션 관리

- 서버 다운 등으로 인한 데이터 부정합을 방지하기 위해, Redisson 라이브러리의 RBatch를 활용하였습니다.

  즉, 대기열 → 활성화열 전환을 배치 방식으로 처리하여 하나의 트랜잭션으로 동작하도록 구현하였습니다.

 

(3-3) Redis Pub/Sub을 통한 발송

- Redis Pub/Sub으로 활성화된 토큰 목록을 발행하여, Websocket 서버에서 구독할 수 있도록 하였습니다. 

 

 


(4) 웹소켓 서버에서의 구독 및 웹소켓 클라이언트로 발송

  // 활성화된 토큰 정보를 Redis Pub/Sub 채널을 통해 수신하는 기능
  // 웹소켓 서버가 이중화 되었음을 고려해, Redis 분산락을 통해서 1번만 실행되도록 보장합니다
  // 사용자의 Websocket 클라이언트에 해당하는 활성화된 토큰을 매칭하여 발송합니다.
  private void startActiveTokenChannelListening() {
        // Redis 채널을 구독하기 위한 RTopic 객체
        RTopic topic = redissonClient.getTopic(RedisKey.ACTIVE_TOKEN_PUB_SUB_CHANNEL);

        // 메시지 리스너 등록
        topic.addListener(String.class, (channel, message) -> {
                log.info("Received active token message: {}", message);
                List<String> tokens = jsonConverter.convertFromJsonToList(message, String.class);

                for (String token : tokens) {
                    if (tokenSessionManager.isExistsToken(token)) {
                        tokenService.sendActivatedTokenToClient(token);
                    }
                }
        });
        log.info("Started listening on Redis Pub/Sub channel: {}", RedisKey.ACTIVE_TOKEN_PUB_SUB_CHANNEL);
 };
@Component
@RequiredArgsConstructor
@Slf4j
public class TokenSessionManager {

    private final Map<String, String> tokenSessionMap = new ConcurrentHashMap<>();

    public void saveTokenSession(String token, String sessionId) {
        tokenSessionMap.put(token, sessionId);
        log.info("token, sessionId is saved, {}, {}", token, sessionId);
    }

    public boolean isExistsToken(String token) {
        return tokenSessionMap.containsKey(token);
    }

    public String getSessionIdByToken(String token) {
        return tokenSessionMap.get(token);
    }

    public void removeTokenBySessionId(String sessionId) {
        tokenSessionMap.keySet().removeIf(key -> tokenSessionMap.get(key).equals(sessionId));
    }
}
public void sendActivatedTokenToClient(String token) {
        log.info("Processing active token: {}", token);
        String sessionId = tokenSessionManager.getSessionIdByToken(token);
        log.info("User session ID: {}", sessionId);

        if (null == sessionId) {
            log.warn("No session found for token: {}", token);
            return;
        }

        try {
            ActivatedTokenResponse response = ActivatedTokenResponse.activated(token);
            websocketClientMessageSender.sendActivatedTokenToClient(sessionId, response);

            log.info("Successfully sent active token to user with sessionId: {}", sessionId);
        } catch (Exception e) {
            log.error("Failed to send active token to user with sessionId: {}. Error: {}", sessionId, e.getMessage(), e);
        }
}
public void sendActivatedTokenToClient(String sessionId, ActivatedTokenResponse response) {
       messagingTemplate.convertAndSendToUser(sessionId, WebsocketInfo.TOKEN_DESTINATION, response);
}

 

- 웹소켓 서버에서의 구독 및 웹소켓 클라이언트로 발송 시 중요하게 고려한 점은 다음과 같습니다. 

 

(3-1) 토큰-세션ID 로컬 캐시 관리 

- 사용자가 첫 웹소켓 연결을 할 때, 토큰-세션 ID가 특정 서버의 로컬 캐시(ConcurrentHashMap)에 저장됩니다

- 그리고 이를 통해 분산 서버 환경에서 웹소켓 클라이언트로의 메시지 중복 발송을 방지하였습니다.  

 

(3-2) convertAndSendToUser과 sessionId를 통한 유저별 발송

- 로컬 캐시에 저장해둔 sessionId를 활용하여, 토큰 활성화 정보해당 session의 사용자에게 정확히 전달하였습니다.