05. Consume Messages 이론
Message Listener
이름 | Listener | decription |
---|---|---|
Record | MessageListener | Auto Commit |
AcknowledgingMessageListener | Manual Commit | |
ConsumerAwareMessageListener | Consumer 객체 활용 | |
AcknowledgingConsumerAwareMessageListener | Manual Commit, 컨슈머 객체 활용 | |
Batch | BatchMessageListener | Auto Commit |
BatchAcknowledgingMessageListener | Manual Commit | |
BatchConsumerAwareMessageListener | Consumer 객체 활용 | |
BatchAcknowledgingConsumerAwareMessageListener | Manual Commit, 컨슈머 객체 활용 |
Consumer는 Thread-Safe 하지 않으므로 Listener를 호출하는 쓰레드에서만 호출
Commit Method
AckMode
name | description |
---|---|
RECORD | 레코드를 처리한 후 리스너가 반환할때 커밋 |
BATCH | poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋 |
스프링 카프카 컨슈머의 AckMode 기본값 | |
TIME | ackTime 만큼 지난 이후에 커밋 |
시간 간격을 선언하는 ackTime 옵션을 설정해야함 | |
COUNT | ackCount로 설정된 개수만큼 레코드가 처리된 이후에 커밋 |
레코드 개수를 선언하는 ackCount 옵션을 설정해야함 | |
COUNT_TIME | COUNT, TIME 중 맞는 조건이 나오면 커밋 |
MANUAL | Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll() 메서드 호출시 커밋 |
매번 acknowledge()를 호출하면 BATCH 옵션과 동일 | |
AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 사용해야 함 | |
MANUAL_IMMEDIATE | Acknowledgement.acknowledge() 메서드가 호출되면 커밋 |
AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 사용해야 함 |
Container
KafkaMessageListenerContainer
Single Thread
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer 인스턴스를 1개 이상 사용하는 Multi-Thread
start, stop 등 메서드를 foreach로 순차적으로 실행
장점
start, stop, pause, resume이 용이
풍부한 AckMode를 지원
@KafkaListener
@KafkaListener
@EnableKafka을 사용하려면, @Configuration 중 Bean 이름이 kafkaListenerContainerFactory인 ConcurrentMessageListenerContainer 객체 필요
Spring Boot 에서는 모든 것이 기본 세팅되어 있음
KafkaAutoConfiguration
ConcurrentKafkaListenerContainerFactoryConfigurer
다양한 설정을 property로 손쉽게 가능
clientIdPrefix, autoStartup, concurrency, topicPartitions, …
메타 데이터
OFFSET, RECEIVED_MESSAGE_KEY, RECEIVED_TOPIC, RECEIVED_PARTITION_ID, RECEIVED_TIMESTAMP, TIMESTAMP_TYPE
ConsumerRecordMetadata를 이용해서 수신할 수도 있음
Payload Validator
Payload Validator
2.2 이후부터 손쉽게 추가 가능
이전에는 DefaultMessageHandlerMethodFactory에 등록하고 추가했어야 하나, 이제 KafkaListenerEndpointRegistrar에 등록해서 사용할 수 있음
RetryTemplate과 RecoveryCallback<Void>를 Container Factory에 설정하여 사용
RecoveryCallback이 설정되지 않으면, 모든 재시도가 실패시 Container Error 발생
Retry Stateful
BackOffPolicy를 이용해 재시도 하는 과정에서 Consumer Thread가 중지될 수 있음
재시도를 하는 동안 poll()이 수행되지 않기 때문
session.timeout.ms: 설정된 시간안에 hearbeat을 받지 못하면 Consumer Group에서 제거하고 rebalance 발생
max.poll.interval.ms: 설정된 시간안에 poll()이 호출되지 않으면 Consumer가 죽었다고 판단 할당 파티션이 revoke되고 rebalance 발생
Last updated