1. 개요
- 콘서트 티켓 프로젝트에서는 부하 조절을 위해 콘서트 대기열 기능을 제공합니다.
대기열에 있는 사용자는 주기적으로(예: 10초마다) 활성화열로 이동하며, 이때 토큰 활성화 알림을 받습니다.
사용자는 토큰 활성화 알림을 통해 티켓 예약 가능 여부를 확인하고, 예약을 진행할 수 있습니다.
2. 기술 스택
- 이 시스템은 WebSocket과 Redis Pub/Sub을 활용하여 구현되었습니다.
3. 동작 방식
- 사용자 대기열 등록
- 사용자가 콘서트 티켓을 예매하려 하면 대기열에 등록됩니다.
- 주기적 활성화열 이동
- 일정 주기(예: 10초)마다 일부 사용자가 대기열에서 활성화열로 이동합니다.
- 활성화열의 최대 인원은 5천명입니다.
- WebSocket을 통한 알림 전송
- Redis Pub/Sub을 이용하여 활성화된 사용자 목록을 발송하고,
WebSocket을 통해 구독한 후, 해당 사용자에게 토큰 활성화 알림을 보냅니다.
- Redis Pub/Sub을 이용하여 활성화된 사용자 목록을 발송하고,
- 사용자의 티켓 예약 진행
- 토큰 활성화 알림을 받은 사용자는 콘서트 예약을 진행할 수 있습니다.
- 토큰 활성화 알림을 받은 사용자는 콘서트 예약을 진행할 수 있습니다.
이 글은 이러한 WebSocket과 Redis Pub/Sub을 활용한 대기열 -> 활성화열 전환 알림 시스템의 개발 과정을
다룹니다.
4. 구현 상세
(1) 외부 Cron Job을 통한 대기열 ->활성화열 전환 요청
-> 대기열->활성화열 전환이 여러 서버에서 중복으로 실행되는 것을 방지하기 위해,
Nginx와 외부 Cron Job(ex) Jenkins)을 활용하여 대기열->활성화열 전환 API 요청이 수행됩니다.
(1-1) nginx.conf
location /api/v1/waitingQueue/migration {
proxy_pass http://34.64.102.105:8080; # 특정 서버로 고정
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
(1-2) POST API
@PostMapping("/api/v1/waitingQueue/migration")
public ResponseEntity<Boolean> processWaitingQueueMigration() {
try {
waitingQueueMigrationApplicationService.migrateFromWaitingToActiveQueue();
return ResponseEntity.ok(true); // 성공 시 true 반환
} catch (Exception e) {
log.error("대기열 처리 중 오류 발생: " + e.getMessage(), e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(false); // 실패 시 false 반환
}
}
(2) 대기열 활성화 여부 확인
- 콘서트 티켓 프로젝트에서 대기열은 트래픽 임계치에 따라 활성화/비활성화가 결정됩니다
따라서 우선적으로 대기열이 활성화되어 있는지 확인한 후에,
활성화되어 있다면 실제 요청을 보내게 됩니다.
@Component
@Slf4j
@RequiredArgsConstructor
public class WaitingQueueMigrationApplicationService {
private final WaitingQueueService waitingQueueService;
public void migrateFromWaitingToActiveQueue() {
String currentStatus = waitingQueueService.getWaitingQueueStatus();
if(WaitingQueueStatus.ACTIVE.equals(currentStatus)) {
waitingQueueService.migrateFromWaitingToActiveQueue();
}
}
}
(3) 대기열->활성화열 전환
private static final long MAX_ACTIVE_QUEUE_SIZE = 5000L;
private static final long MAX_TRANSFER_COUNT = 250L;
public void migrateFromWaitingToActiveQueue() {
long activeQueueSize = activeQueueDAO.getActiveQueueSize();
long transferCount = Math.min(MAX_ACTIVE_QUEUE_SIZE - activeQueueSize, MAX_TRANSFER_COUNT);
if(transferCount == 0){
return;
}
Collection<WaitingDTO> tokenList = waitingQueueDAO.getAllWaitingTokens(transferCount);
if (tokenList.isEmpty()) {
return;
}
activeQueueDAO.migrateTokensFromWaitingQueueToActiveQueue(tokenList);
// 토큰 목록을 Redis Pub/Sub을 통해 발행한다
// 발행한 토큰 목록은 WebSocket 서버에서 Redis Pub/Sub을 통해 구독한다
// 토큰 Pub/Sub의 목적은, 토큰이 활성화되었을 때, 웹소켓 클라이언트에게 알림을 주기 위함이다.
tokenPublisher.publishAllActiveTokens(tokenList);
// 토큰 목록을 대기열에서 삭제처리한다
waitingQueueDAO.deleteWaitingQueueTokens(tokenList);
Collection<WaitingDTO> waitingTokenList = waitingQueueDAO.getAllWaitingTokensWithRank();
tokenPublisher.publishAllWaitingTokens(waitingTokenList);
}
public void migrateTokensFromWaitingQueueToActiveQueue(Collection<WaitingDTO> tokens) {
RBatch batch = redisson.createBatch();
for (WaitingDTO waitingDTO : tokens) {
String uuid = waitingDTO.getUuid();
String token = waitingDTO.getToken();
// 활성화 큐에 추가
batch.getMapCache(RedisKey.ACTIVE_QUEUE).putIfAbsentAsync(uuid, token, 300, TimeUnit.SECONDS);
// 대기열에서 삭제
batch.getMapCache(RedisKey.WAITING_QUEUE).removeAsync(token);
}
// 원자적으로 실행
batch.execute();
}
- 대기열 -> 활성화열 전환을 할 때 중요하게 고려한 점은 다음과 같습니다.
(3-1) 활성화열 최대 크기 관리
- 활성화열의 최대 크기를 5000으로 관리하여, 트래픽 부하를 제어하였습니다.
(3-2) 대기열 -> 활성화열 전환 시 트랜잭션 관리
- 서버 다운 등으로 인한 데이터 부정합을 방지하기 위해, Redisson 라이브러리의 RBatch를 활용하였습니다.
즉, 대기열 → 활성화열 전환을 배치 방식으로 처리하여 하나의 트랜잭션으로 동작하도록 구현하였습니다.
(3-3) Redis Pub/Sub을 통한 발송
- Redis Pub/Sub으로 활성화된 토큰 목록을 발행하여, Websocket 서버에서 구독할 수 있도록 하였습니다.
(4) 웹소켓 서버에서의 구독 및 웹소켓 클라이언트로 발송
// 활성화된 토큰 정보를 Redis Pub/Sub 채널을 통해 수신하는 기능
// 웹소켓 서버가 이중화 되었음을 고려해, Redis 분산락을 통해서 1번만 실행되도록 보장합니다
// 사용자의 Websocket 클라이언트에 해당하는 활성화된 토큰을 매칭하여 발송합니다.
private void startActiveTokenChannelListening() {
// Redis 채널을 구독하기 위한 RTopic 객체
RTopic topic = redissonClient.getTopic(RedisKey.ACTIVE_TOKEN_PUB_SUB_CHANNEL);
// 메시지 리스너 등록
topic.addListener(String.class, (channel, message) -> {
log.info("Received active token message: {}", message);
List<String> tokens = jsonConverter.convertFromJsonToList(message, String.class);
for (String token : tokens) {
if (tokenSessionManager.isExistsToken(token)) {
tokenService.sendActivatedTokenToClient(token);
}
}
});
log.info("Started listening on Redis Pub/Sub channel: {}", RedisKey.ACTIVE_TOKEN_PUB_SUB_CHANNEL);
};
@Component
@RequiredArgsConstructor
@Slf4j
public class TokenSessionManager {
private final Map<String, String> tokenSessionMap = new ConcurrentHashMap<>();
public void saveTokenSession(String token, String sessionId) {
tokenSessionMap.put(token, sessionId);
log.info("token, sessionId is saved, {}, {}", token, sessionId);
}
public boolean isExistsToken(String token) {
return tokenSessionMap.containsKey(token);
}
public String getSessionIdByToken(String token) {
return tokenSessionMap.get(token);
}
public void removeTokenBySessionId(String sessionId) {
tokenSessionMap.keySet().removeIf(key -> tokenSessionMap.get(key).equals(sessionId));
}
}
public void sendActivatedTokenToClient(String token) {
log.info("Processing active token: {}", token);
String sessionId = tokenSessionManager.getSessionIdByToken(token);
log.info("User session ID: {}", sessionId);
if (null == sessionId) {
log.warn("No session found for token: {}", token);
return;
}
try {
ActivatedTokenResponse response = ActivatedTokenResponse.activated(token);
websocketClientMessageSender.sendActivatedTokenToClient(sessionId, response);
log.info("Successfully sent active token to user with sessionId: {}", sessionId);
} catch (Exception e) {
log.error("Failed to send active token to user with sessionId: {}. Error: {}", sessionId, e.getMessage(), e);
}
}
public void sendActivatedTokenToClient(String sessionId, ActivatedTokenResponse response) {
messagingTemplate.convertAndSendToUser(sessionId, WebsocketInfo.TOKEN_DESTINATION, response);
}
- 웹소켓 서버에서의 구독 및 웹소켓 클라이언트로 발송 시 중요하게 고려한 점은 다음과 같습니다.
(3-1) 토큰-세션ID 로컬 캐시 관리
- 사용자가 첫 웹소켓 연결을 할 때, 토큰-세션 ID가 특정 서버의 로컬 캐시(ConcurrentHashMap)에 저장됩니다.
- 그리고 이를 통해 분산 서버 환경에서 웹소켓 클라이언트로의 메시지 중복 발송을 방지하였습니다.
(3-2) convertAndSendToUser과 sessionId를 통한 유저별 발송
- 로컬 캐시에 저장해둔 sessionId를 활용하여, 토큰 활성화 정보를 해당 session의 사용자에게 정확히 전달하였습니다.
'콘서트 티켓 프로젝트' 카테고리의 다른 글
활성/비활성 사용자 구분 후 비활성 사용자 웹소켓 연결 끊김 처리 (0) | 2025.03.09 |
---|---|
[#1] 서버 성능 개선하기 - 웹소켓 연결 테스트 (0) | 2025.03.03 |
Nginx 로드밸런싱 설정하기 (0) | 2025.03.03 |