항해 9주차에는 Kafka라는 기술이 무엇인지 살펴보았다!
분산 시스템에서 Kafka를 활용하여 메세지를 처리하는 방식에 대해 배웠다.
또한 이벤트 기반 아키텍처에서 Kafka를 활용하여 서비스를 유연하게 확장하는 방법과, 안정적인 이벤트 처리를 위한 전략도 함께 익혔다~!
목차
1. Kafka란?
2. Kafka 핵심 아키텍처 이해
3. 마이크로서비스와 카프카 기반 이벤트 확장
4. 안정적인 이벤트 처리를 위한 핵심 전략
5. 회고
1. Kafka란?
카프카(Kafka)는 분산형 스트리밍 플랫폼으로, 대량의 데이터를 안정적이고 실시간으로 처리할 수 있도록 설계되었다.
카프카는 주로 대량의 이벤트 스트림 데이터를 처리하고 여러 시스템 간에 데이터를 신속하게 전송하는 데 사용된다.
기업에서 대규모 데이터 처리 및 이벤트 기반 시스템을 구축하는데 널리 사용되며, 특히 대용량의 로그 데이터를 수집하고 분석하는 데 유용하다. 성능과 기능이 좋은 큐라고 할 수 있다.
발행 · 구독 모델
메시지 발행 구독 시스템에서 데이터를 발행자가 직접 구독자에게 보내지 않는다. 대신 발행자가 어떤 형태로든 메시지를 구분해서 발행 구독 시스템에 전송하면, 구독자가 특정 부류의 메시지를 구독할 수 있게 하는데, 이때 발행된 메시지를 저장하고 중계하는 역할을 브로커가 한다.
카프카는 발행 · 구독(pub/sub) 모델에서 브로커(broker)의 역할을 하고 있다. 발행자(Publisher)와 구독자(Consumer) 사이에서 이벤트라 불리는 메시지를 카프카는 전달한다.
2. Kafka 핵심 아키텍처 이해
1. Producer & Consumer
- Producer - 메세지를 카프카 브로커에 적재(발행)하는 서비스
- Consumer - 카프카 브로커에 적재된 메세지를 읽어오는(소비) 서비스
- 메세지를 읽을 때마다 파티션 별로 offset을 유지해 처리했던 메세지의 위치를 추적한다.
2. Broker
- 카프카 서버 프로세스로, 토픽/파티션 데이터를 저장하고 클라이언트 요청을 처리한다.
- 여러 브로커를 묶어 Cluster를 구성하며, Leader-Follower 구조로 Replication을 수행한다.
- Cluster
- 고가용성(HA)를 위해 여러 서버를 묶어 특정 서버의 장애를 극복할 수 있도록 구성한다.
- Broker가 증가할수록 메시지 수신, 전략달 처리량을 분산시킬 수 있으므로 확장에 유리하다.
(동작중인 다른 Broker에 영향없이 확장이 가능하므로, 트래픽 양의 증가에 따른 브로커 증설이 손쉽게 가능하다.)
- Replication
- Cluster의 가용성을 보장하는 개념이다.
- 각 Partition의 Replica를 만들어 백업 및 장애를 극복한다.
- Leader Replica
각 파티션은 1개의 리더 Replica를 가진다. 모든 Producer, Consumer 요청은 리더를 통해 처리되게 하여 일관성을 보장한다. - Follower Replica
각 파티션의 리더를 제외한 Replica이며 단순히 리더의 메시지를 복재해서 백업한다.
만일, 파티션의 리더가 중단되는 경우 팔로워 중 하나를 새로운 리더로 선출한다.
- Leader Replica
- Cluster
3. Topic & Partition
- Topic - 메세지를 분류하는 기준이며 N개의 Partition으로 구성된다.
- Partition - 발행된 순서대로 컨슘하므로써 순차 처리를 보장한다.
- 대용량 트래픽을 파티션의 개수만큼 병렬로 처리할 수 있어 빠른 처리가 가능하다.
- 전체 메세지의 발행 순차처리를 보장하지 않지만, 같은 파티션의 메세지에 대해서는 순차 처리를 보장한다.
- 동시성 제어의 개념을 생각했을때, 동시에 처리되면 안되는 자원의 Id 등을 메세지의 키로 설정하면
순차 처리가 보장되어야 하는 케이스는 보장이 되면서 병렬 처리로 높은 처리량을 보장한 하이브리드이다.- Partitional
메시지를 발행할 때, 토픽의 어떤 파티션에 저장될지 결정하며 Producer 측에서 결정한다.
특정 메시지에 키가 존재한다면 키의 해시 값에 매칭되는 파티션에 데이터를 전송함으로써 키가 같은 메시지를 다건 발행하더라고, 항상 같은 파티션에 메시지를 적재해 처리 순서를 보장할 수 있다.
- Partitional
3. 마이크로서비스와 카프카 기반 이벤트 확장
마이크로서비스 + 이벤트 기반 설계
- 마이크로서비스 아키텍처에서 각 서비스 간 결합도를 낮추기 위해 비동기 메시지 큐(카프카)를 활용한다.
- 서비스 간 통신을 이벤트(메시지)로 주고받음으로써, 서로 직접 호출하지 않고 느슨하게 연결된다.
class OrderPaymentService {
@Transactional
public ReservationResult createSeatReservation() {
updateSeatStatus(); // 좌석 상태 업데이트 (점유)
updateScheduleRemainingTicket(); // 잔여 티켓 수 업데이트(-1)
creatSeatReservation(); // 예약 신청
reservationEventPublisher.success(new ReservationSuccessEvent());
return new ReservationResult();
}
}
class ReservationEventListener {
@TransactionalEventListener(phase = AFTER_COMMIT)
public void sendReservationInfo() {
kafkaProducer.publishReservationInfo();
}
}
예약 정보에 대한 이벤트가 발생하면, 이를 이벤트 리스너에서 받아 카프카(Kafka) 브로커에 메시지로 발행하게 된다. 이 과정을 통해 예약 정보는 데이터 플랫폼으로 전달되며, 메시지를 수신하든 그렇지 않든 발행 측의 책임은 종료된다.
코드에선 메시지 발행을 트랜잭션 외부에서 처리하고 있는데,
이는 트랜잭션 내부에서 메시지를 발행했을 때 발생할 수 있는 두 가지 문제를 방지하기 위해서이다.
- 트랜잭션 커밋 실패 시 유령 이벤트 발생 가능성
메시지를 먼저 발행한 후 트랜잭션이 커밋되지 않으면, 데이터는 저장되지 않았는데 이벤트만 발행되어 비정상적인 상황이 발생할 수 있다. - 컨슈머 측에서 조회 실패 가능성
트랜잭션 커밋 전에 이벤트를 컨슈머가 수신하여 데이터를 조회하면, 아직 커밋되지 않은 데이터로 인해 조회 실패가 발생할 수 있다.
이러한 이유로 메시지 발행은 트랜잭션 종료 이후에 수행된다. 하지만 이 방식도 완벽하지는 않다.
메시지 발행이 실패하면 데이터는 저장되었음에도 불구하고, 이벤트가 누락되어 후속 비즈니스 프로세스가 정상적으로 작동하지 않게 된다. 이는 결국 데이터 정합성 문제로 이어질 수 있다.
이러한 문제를 해결하기 위해서는 Transactional Messaging 기법을 도입하여, 데이터베이스 트랜잭션과 메시지 발행을 원자적으로 처리할 수 있도록 해야 한다.
4. 안정적인 이벤트 처리를 위한 핵심 전략
✅ Transactional Messaging
데이터 저장과 메시지 발행을 하나의 트랜잭션처럼 처리하여, 둘 중 하나라도 실패하면 전체 작업을 롤백하거나 재시도하게 만드는 방식이다. Transactional Messaging의 대표적인 패턴으로는 다음과 같이 있다.
1. Outbox Pattern
도메인 로직이 성공적으로 수행되었으면, 이에 대한 이벤트 메시지를 별도로 Outbox Table에 저장하여 함께 Commit 한다.
이는 동일한 트랜잭션 내에서 이벤트 발행을 위한 Outbox 데이터 적재까지 진행해 이벤트 발행에 대해 보장한다.
이벤트 발행 상태 또한 Outbox 데이터에 존재하므로, 배치 프로세스 등을 이용해 미발행된 데이터에 대한 Fallback 처리가 용이하다.
2. Change Data Capture ( CDC )
데이터베이스의 변경 사항을 모니터링하고 (e.g Debezium) 다른 시스템으로 해당 변경사항에 대한 내용을 전파하는 소프트웨워 프로세스이다. DB에서 데이터가 변경되는 것을 감지하고, 해당 변경 건에 관한 이벤트를 발행한다.
3. Outbox 패턴 기반의 Transactional Messaging 처리 흐름
- Service (Publisher)
- 비즈니스 로직 수행
- 데이터 저장과 동시에 Outbox Table에 이벤트 메시지를 저장 (Tx1 트랜잭션 내)
- Kafka에는 아직 메시지가 발행되지 않음
- Kafka 메시지 발행
- 별도 Polling 프로세스 또는 CDC 등을 통해 Outbox에 저장된 메시지를 Kafka로 발행
- Consumer 1 (출발 도메인의 컨슈머)
- Kafka에서 발행된 메시지를 수신
- 메시지 발행을 확인한 후 Outbox 테이블의 상태를 갱신 (outbox_status = SENT 등)
- Consumer 2 (실제 메시지를 처리하는 도메인)
- Kafka 메시지를 수신하여 비즈니스 로직 수행 (예: 다른 시스템 호출, 알림 전송 등)
- Scheduler (재발행 담당 스케줄러)
- 일정 시간 동안 Outbox Table의 상태가 미변경 상태로 남아있으면
메시지 발행 실패로 간주하고 재시도 로직 수행
- 일정 시간 동안 Outbox Table의 상태가 미변경 상태로 남아있으면
예약 서비스
- 예약 정보 이벤트 발행
@Component
@RequiredArgsConstructor
public class ReservationFacade {
private final ConcertService concertService;
private final ReservationService reservationService;
private final ReservationEventPublisher reservationEventPublisher;
@DistributedLock(key = "#reservationParam.scheduleId")
public ReservationResult createSeatReservation(ReservationParam reservationParam) {
//1. 좌석(seat) 상태 업데이트 (점유)
Seat updatedSeat = concertService.updateSeatStatus(reservationParam.seatId(), SeatStatus.OCCUPIED);
//2. 스케줄(schedule) 잔여 티켓 수 업데이트(-1)
Schedule updatedSchedule = concertService.updateScheduleRemainingTicket(reservationParam.scheduleId(), -1);
//3. 예약(reservation) 신청
Reservation savedReservation = reservationService.creatSeatReservation(updatedSeat, reservationParam.userId());
//4. 예약 성공 이벤트 발행
reservationEventPublisher.success(new ReservationSuccessEvent(savedReservation.getId()));
return new ReservationResult(savedReservation.getId(), updatedSchedule.getId(),
savedReservation.getSeatId(), savedReservation.getUserId(), savedReservation.getReservationState(), savedReservation.getCreatedAt());
}
}
예약 완료 이벤트 리스너
- Outbox 이벤트 정보 저장
- 카프카 브로커에 메시지 발행
@RequiredArgsConstructor
public class ReservationEventListener {
private final KafkaProducer kafkaProducer;
private final OutboxService outboxService;
private final ObjectMapper objectMapper;
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void saveOutbox(ReservationSuccessEvent event) throws JsonProcessingException {
outboxService.saveOutbox(objectMapper.writeValueAsString(event));
}
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void reservationDataSendHandler(ReservationSuccessEvent event) throws JsonProcessingException {
kafkaProducer.send("reservation.completed", objectMapper.writeValueAsString(event));
}
}
예약 카프카 컨슈머
- Outbox 상태 업데이트
- 데이터플랫폼 호출
@RequiredArgsConstructor
public class KafkaConsumer {
private final ObjectMapper objectMapper;
private final DataPlatformClient dataPlatformClient;
private final OutboxService outboxService;
@KafkaListener(topics = "reservation.completed", groupId = "my-group")
public void listen(String payload) throws JsonProcessingException {
log.info("Received payload: " + payload);
//outbox 상태가 init 이면 publish로 변경
// 1. 메시지 파싱
ReservationSuccessEvent event = parseEvent(payload);
// 2. Outbox 상태 업데이트 (INIT -> PUBLISHED)
Optional<Outbox> outboxOpt = outboxService.findInitEvent(event.getReservationId());
outboxOpt.ifPresent(outbox -> {
outboxService.updateStatus(outbox.getId(), OutboxStatus.PUBLISHED);
log.info("Outbox 상태 업데이트 완료: outboxId={}", outbox.getId());
});
// 3. 데이터 플랫폼 호출
dataPlatformClient.send(event);
}
public ReservationSuccessEvent parseEvent(String payload) throws JsonProcessingException {
return objectMapper.readValue(payload, ReservationSuccessEvent.class);
}
}
Outbox 상태 스케줄러
- 메시지 재발행 스케줄러
- 오래된 이벤트 삭제 스케줄러
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxScheduler {
private final OutboxService outboxService;
private final KafkaProducer kafkaProducer;
private final ObjectMapper objectMapper;
// 재처리 스케줄러: 5분마다 실행
@Scheduled(fixedDelay = 300000) // 5분
public void retryOutboxEvents() {
List<Outbox> pendingEvents = outboxService.findInitEventsOlderThan5Min();
if (!pendingEvents.isEmpty()) {
pendingEvents.forEach(event -> {
try {
// Kafka 메시지 재발행
kafkaProducer.send("reservation.completed", event.getPayload());
// 재발행 성공 시 상태를 PUBLISHED로 업데이트
outboxService.updateStatus(event.getId(), OutboxStatus.PUBLISHED);
// 재처리 성공 로그
log.info("Outbox 이벤트 재처리 성공: 이벤트ID={}", event.getId());
} catch (Exception e) {
// 재처리 실패 시 로그
log.error("Outbox 이벤트 재처리 실패: 이벤트ID={}, error={}", event.getId(), e.getMessage());
}
});
}
}
// 정리 스케줄러: 5분마다 실행
@Scheduled(fixedDelay = 300000)
public void cleanupOldPublishedEvents() {
List<Outbox> oldEvents = outboxService.findPublishedEventsOlderThan30Days();
if (!oldEvents.isEmpty()) {
oldEvents.forEach(event -> {
try {
outboxService.deleteOutbox(event);
log.info("오래된 Outbox 이벤트 삭제 완료: 이벤트ID={}", event.getId());
} catch (Exception e) {
log.error("오래된 Outbox 이벤트 삭제 실패: 이벤트ID={}, error={}", event.getId(), e.getMessage());
}
});
}
}
}
5. 회고
이번 시간에 카프카에 대해 알아보았는데, 전에는 카프카에 대해 이름만 들어봤지 실제로 어떤 상황에 쓰이는 건지 몰랐었다.
MSA 구조로 시스템을 전환 후 도메인 간의 통신이나 트랜잭션 이후 데이터 플랫폼을 호출을 이벤트 기반으로 처리하는 상황에 메세지 전달의 보장을 하기 위해서 카프카가 쓰인다는 것을 알게 되었다.
또한 비즈니스 로직 처리와 동시에 카프카로 직접 메세지를 발행했지만, 트랜잭션 실패나 메세지 유실 등의 문제가 발생하였을때를 고려하여 메세지를 DB의 Outbox 테이블에 저장한 후 별도 프로세스를 통해 카프카로 발행하는 구조로 바꿔 보았다. 그리고 Outbox 상태를 체크해서 발행이 안 된 메시지는 스케줄러를 통해 재발행 되도록 개발하였다.
이 방식으로 데이터 정합성과 메시지 전파의 안정성을 모두 챙길 수 있었다.
현업에서는 대용량 트래픽을 경험할 일이 적어서, 카프카를 사용하는 것은 이번이 처음이었다. 직접 카프카를 설치하고, 메시지를 발행하고 소비하는 로직까지 작성해보면서 카프카 사용에 대한 감을 익힐 수 있었다. 😉
'항해 플러스 백엔드 > 대용량 트래픽 처리' 카테고리의 다른 글
[항해] 8주차, 쿼리 성능 개선 & 이벤트 구현 (0) | 2025.02.16 |
---|---|
[항해] 7주차, Redis를 활용한 캐싱 & 대기열 시스템 개선 (0) | 2025.02.16 |
[항해] 6주차, 동시성 제어 방식 분석 (0) | 2025.02.16 |