Backend

Kafka DLQ 동작 원리와 실패한 event 처리하기

minjiwoo 2026. 5. 31. 18:39
728x90

Kafka 실패 메시지는 어떻게 다시 처리할까

kafka 를 이용하여 컨슘한 메시지를 처리하고, 완료 후에 이벤트를 발행하는 서비스를 개발하였다. 개발 환경에서 테스트를 할 때는 DLQ 로 빠지는 이벤트가 없었지만, 실제 운영 환경에서는 msa 서버간의 통신 중에 timeout 이 종종 일어난다거나 Database 증설 작업 등으로 순단이 발생하였다. 

현재 내가 개발한 서비스는 event 처리에 실패하면 DLQ topic으로 메시지를 보내고, 원본 topic의 offset은 진행된다. 다만 DLQ topic을 별도로 consume하는 로직은 아직 없고, 운영자가 API를 직접 호출해서 필요한 비즈니스 로직과 event 발행을 수동으로 수행하고 있다. 이 구조는 당장에 처리해야할 메시지를 정상적인 것 처럼 처리하고 event 를 발행할 수 있어서 외부 시스템에 영향은 없지만, 몇가지 질문이 생겼다. 

  • DLQ에 남아 있는 이벤트는 실제로 복구된 것인가?
  • 수동 API가 정상 consumer와 같은 로직을 실행한다고 보장할 수 있는가?
  • 같은 이벤트를 다시 처리하면 중복 side effect가 생기지 않는가?
  • offset reset이나 replay로도 해결할 수 있는 문제인가?

현재 구조의 가장 큰 문제는 실패 이벤트의 상태를 추적하기 어렵다는 점이다. 운영자가 API로 수동 복구를 수행하더라도,
DLQ topic에는 원본 실패 이벤트가 그대로 남아 있다. 따라서 시간이 지나면 이 이벤트가 아직 복구 대상인지,
이미 수동으로 처리된 이벤트인지, 다시 처리하면 안 되는 이벤트인지 구분하기 어려워진다. 두 번째 문제는 정상 처리 경로와 복구 처리 경로가 달라질 수 있다는 점이다. 정상 이벤트는 Kafka consumer를 통해 EventHandler를 거치지만, 수동 복구는 운영자 API를 통해 별도 service method를 호출한다. 처음에는 같은 결과를 만들더라도 시간이 지나면 validation, idempotency check,
후속 event 발행, error handling이 달라질 수 있다. 

DLQ (Dead Letter Queue)란 ?

DLQ 란 Dead Letter Queue 의 약자이다. Kafka 에서는 보통 topic 에 이벤트를 발행하기 때문에 Dead Letter Topic 이라고도 많이 부른다.정상적으로 처리할 수 없는 메세지를 원래의 consumer 흐름에서 분리해서 따로 보관하는 곳이다. 예를 들어서 아래와 같이, 특정 event 를 consume 하다가 해당 이벤트 처리가 계속 실패하면, 그 이벤트를 별도의 DLQ topic 으로 보낸다.

generated by gpt

DLQ 가 필요한 이유

Kafka consumer 는 partition 단위로 순서대로 메시지를 읽는다. 그래서 특정 메시지 하나가 계속 실패하면 그 뒤의 메시지들이 처리되지 못하고 막힐 수 있다. 실패한 메시지를 계속해서 재시도하면 consumer 가 멈춘것처럼 될것이다. 이런 실패 메시지를 poison pill 이라고 부른다.  아래의 그림에서 offset 11 에 해당하는 메시지가 poison pill 의 예시이다. 따라서 DLQ는 이런 실패한 메시지를 격리한 후, consumer 가 다음 메시지를 계속 해서 처리할 수 있도록 한다. 

DLQ 가 있을때와 없을 때의 비교

즉, DLQ 의 핵심 목적은 실패 메시지의 격리이다. 장애를 해결했다는 뜻이 아니라, 장애를 격리했다는 의미이다. DLQ 에 메시지가 쌓였는데 재처리 되지 않는다면, 그냥 실패 이벤트가 쌓이기만 한다는 의미이다. 

DLQ 는 보통 retry 이후의 마지막 단계이다. retry 가능한 메시지의 경우, 다시 시도하면 성공할 수도 있는 실패 메시지이다. DB timeout, 외부 API 의 일시적인 장애, 네트워크 오류 등의 이슈가 있을 수 있다. 그렇지만 반대로 retry 해도 의미없는 실패가 있을 수 있다. JSON schema 의 오류, 필수 필드 누락 등의 원인이 여기에 해당 될 것이다. 이 경우 Retry 에 실패하면 해당 메시지는 DLQ 로 빠지게 될 것이다. 

 

앞으로 개선해 볼 DLQ 처리 구조

DLQ 처리 구조의 개선 방향을 수동 API로 비슷한 비즈니스 효과를 만드는 것이 아니라,DLQ에 저장된 원본 이벤트를 다시 정상 consumer handler로 흘려보내는 것으로 설계해보았다. 

1. onsumer handler가 eventId 기준으로 멱등하게 처리할 수 있도록 event 에 unique 한 EventId 를 둔다. 

data class OrderPaidEvent(
    val eventId: String,
    val orderId: String,
    val amount: Long,
    val failMode: FailMode = FailMode.NONE,
)

enum class FailMode {
    NONE,
    RETRYABLE,
    NON_RETRYABLE,
}

2. processed_event 테이블 DDL 

CREATE TABLE processed_event (
    event_id VARCHAR(100) NOT NULL,
    consumer_name VARCHAR(100) NOT NULL,
    processed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (event_id, consumer_name)
);

3. event 처리에 대해 저장하는 로직을 구현한다. 

@Repository
class ProcessedEventRepository(
    private val jdbcClient: JdbcClient,
) {
    fun insertIfAbsent(eventId: String, consumerName: String): Boolean {
        return try {
            jdbcClient.sql(
                """
                INSERT INTO processed_event (event_id, consumer_name)
                VALUES (:eventId, :consumerName)
                """.trimIndent(),
            )
                .param("eventId", eventId)
                .param("consumerName", consumerName)
                .update()

            true
        } catch (exception: DuplicateKeyException) {
            false
        }
    }
}

 

4. handler 를 멱등하게 구현한다. 

@Component
class OrderPaidEventHandler(
    private val processedEventRepository: ProcessedEventRepository,
    private val orderPaymentRepository: OrderPaymentRepository,
) {
    @Transactional
    fun handle(event: OrderPaidEvent) {
        val firstProcessing = processedEventRepository.insertIfAbsent(
            eventId = event.eventId,
            consumerName = "order-payment-consumer",
        )

        if (!firstProcessing) {
            return
        }

        when (event.failMode) {
            FailMode.NONE -> {
                orderPaymentRepository.markPaidIfAbsent(
                    orderId = event.orderId,
                    eventId = event.eventId,
                    amount = event.amount,
                )
            }

            FailMode.RETRYABLE -> throw IllegalStateException("temporary failure")
            FailMode.NON_RETRYABLE -> throw IllegalArgumentException("invalid event")
        }
    }
}

5. presentation layer 구현 

@RestController
@RequestMapping("/api/v1/failed-events")
class AdminFailedEventController(
    private val failedEventRedriveService: FailedEventRedriveService,
) {
    @PostMapping("/{id}/redrive")
    fun redrive(@PathVariable id: Long): ResponseEntity<Void> {
        failedEventRedriveService.redrive(id)
        return ResponseEntity.accepted().build()
    }
}

 

728x90