05. Consume Messages 이론

Message Listener

이름Listenerdecription

Record

MessageListener

Auto Commit

AcknowledgingMessageListener

Manual Commit

ConsumerAwareMessageListener

Consumer 객체 활용

AcknowledgingConsumerAwareMessageListener

Manual Commit, 컨슈머 객체 활용

Batch

BatchMessageListener

Auto Commit

BatchAcknowledgingMessageListener

Manual Commit

BatchConsumerAwareMessageListener

Consumer 객체 활용

BatchAcknowledgingConsumerAwareMessageListener

Manual Commit, 컨슈머 객체 활용

  • Consumer는 Thread-Safe 하지 않으므로 Listener를 호출하는 쓰레드에서만 호출

Commit Method

AckMode

namedescription

RECORD

레코드를 처리한 후 리스너가 반환할때 커밋

BATCH

poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋

스프링 카프카 컨슈머의 AckMode 기본값

TIME

ackTime 만큼 지난 이후에 커밋

시간 간격을 선언하는 ackTime 옵션을 설정해야함

COUNT

ackCount로 설정된 개수만큼 레코드가 처리된 이후에 커밋

레코드 개수를 선언하는 ackCount 옵션을 설정해야함

COUNT_TIME

COUNT, TIME 중 맞는 조건이 나오면 커밋

MANUAL

Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll() 메서드 호출시 커밋

매번 acknowledge()를 호출하면 BATCH 옵션과 동일

AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 사용해야 함

MANUAL_IMMEDIATE

Acknowledgement.acknowledge() 메서드가 호출되면 커밋

AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 사용해야 함

Container

  1. KafkaMessageListenerContainer

    1. Single Thread

  2. ConcurrentMessageListenerContainer

    1. KafkaMessageListenerContainer 인스턴스를 1개 이상 사용하는 Multi-Thread

    2. start, stop 등 메서드를 foreach로 순차적으로 실행

  3. 장점

    1. start, stop, pause, resume이 용이

    2. 풍부한 AckMode를 지원

@KafkaListener

  1. @KafkaListener

    1. @EnableKafka을 사용하려면, @Configuration 중 Bean 이름이 kafkaListenerContainerFactory인 ConcurrentMessageListenerContainer 객체 필요

    2. Spring Boot 에서는 모든 것이 기본 세팅되어 있음

      • KafkaAutoConfiguration

      • ConcurrentKafkaListenerContainerFactoryConfigurer

    3. 다양한 설정을 property로 손쉽게 가능

      • clientIdPrefix, autoStartup, concurrency, topicPartitions, …

    4. 메타 데이터

      • OFFSET, RECEIVED_MESSAGE_KEY, RECEIVED_TOPIC, RECEIVED_PARTITION_ID, RECEIVED_TIMESTAMP, TIMESTAMP_TYPE

      • ConsumerRecordMetadata를 이용해서 수신할 수도 있음

Payload Validator

  1. Payload Validator

    1. 2.2 이후부터 손쉽게 추가 가능

    2. 이전에는 DefaultMessageHandlerMethodFactory에 등록하고 추가했어야 하나, 이제 KafkaListenerEndpointRegistrar에 등록해서 사용할 수 있음

    3. RetryTemplate과 RecoveryCallback<Void>를 Container Factory에 설정하여 사용

      1. RecoveryCallback이 설정되지 않으면, 모든 재시도가 실패시 Container Error 발생

  2. Retry Stateful

    1. BackOffPolicy를 이용해 재시도 하는 과정에서 Consumer Thread가 중지될 수 있음

      • 재시도를 하는 동안 poll()이 수행되지 않기 때문

      • session.timeout.ms: 설정된 시간안에 hearbeat을 받지 못하면 Consumer Group에서 제거하고 rebalance 발생

      • max.poll.interval.ms: 설정된 시간안에 poll()이 호출되지 않으면 Consumer가 죽었다고 판단 할당 파티션이 revoke되고 rebalance 발생

Last updated