3.4 카프카 클라이언트
카프카 클라이언트
카프카 클러스터에 명령을 내리거나 데이터를 송수신하는 라이브러리
프로듀서 API
데이터의 시작점으로 카프카에 필요한 데이터를 선언하고 리더 파티션에 전송.
프로듀서는 데이터를 직렬화 하여, 동영상,이미지 같은 바이너리 데이터도 프로듀서를 통해 전송 할 수 있다.
실습 Code (GitHub)
https://github.com/machine-geon/simple_kafka
프로듀서 중요 개념
토픽과 메시지값은 생성 필수 파라미터이다.
파티션 번호 지정, 타임스탬프 설정, 메시지 키 설정 또한 가능하다.
레코드의 타임스탬프는 카프카 브로커에 저장될 때 브로커 시간을 기준으로 설정 → 필요에 따라 변경 가능.
전송 단계
Producer가 send() 메서드를 호출할 때, Partitioner에서 토픽의 어느 파티션으로 전송할지 정한다.
Partitioner에 의해 구분된 record는 데이터를 전송하기 전에 accumulator에 버퍼로 쌓아놓고 발송한다. → batch로 묶어서 전송.
파티셔너
프로듀서 API는 2가지 파티션을 제공한다.
UniformStickyPartitioner (ver 2.5.0 default)
RoundRobinPartitioner (ver 2.4.0 이전에서 default)
메지지 키의 해시값과 파티션을 매칭하여 데이터를 전송하는 점은 동일하지만, 메시지 키가 없을 때 UniformStickyPartitioner가 더 효율적이다.
Partitioner 인터페이스를 상속받아 사용자 정의 클래스에서 파티션 지정 로직을 적용할 수 있다.
압축
압축 옵션을 통해 브로커로 전송 시 압축 방식을 지정할 수 있다.
gzip, snappy,lz4, zstd 지원
압축 옵션을 지정하지 않는 경우 압축이 되지 않는 상태로 전송된다.
리소스는 이득, 그러나 CPU, 메모리 리소스 손해
Consumer에서 압축을 풀면서 리소스가 사용되는 점을 주의.
프로듀서 주요 옵션
필수 옵션
bootstrap.servers
전송할 대상이되는 카프카 클러스터의 host:port
key.serializer
직렬화 설정
value.serializer
직렬화 설정
선택 옵션
acks
1: 리더 파티션의 저장을 전송 성공으로 판단(default)
0: 저장 여부와 상관 없이 성공으로 판단
-1(all): min.insync.replicas(isr) 개수에 해당하는 파티션에 저장을 전송 성공으로 판단
buffer.memory
버퍼 메모리양
default: 33554432(32MB)
retries
재전송 시도 횟수
default: 2147483647
batch.size
배치로 전송할 레코드 최대 용량
size가 너무 작은 경우 네트워크 부담, size가 너무 큰 경우 메모리에 부담
default: 16384 (16KB)
배치 전송하기 전까지 기다리는 최소 시간
default: 0
partitioner.class
레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정
default: org.apache.kafka.clients.producer.internals.DefaultPartitioner
enable.idempotence
멱등성 ( f(f(x)) = f(x) )
enable.idempotence를 사용하면, 하나의 Batch가 실패하면, 같은 Partition으로 들어오는 후속 Batch들도 OutOfOrderSequenceException과 함께 실패 → 메시지 순서 보장
default: false
레코드를 전송할 때 트랜잭션 단위로 묶을지 여부
프로듀서의 고유한 트랜잭션 아이디를 설정할 수 있다.
설정할 경우 트랜잭션 프로듀서로 동작
참고
컨슈머 API
브로커에 적재된 데이터를 사용하기 위한 API
목적에 따라 컨슈머 그룹 이름을 정하는 것을 추천한다. (ex. email-application-group)
컨슈머에게 토픽을 할당하기 위해 subscribe() 메서드를 사용한다. →Collection 타입의 String 값으로 토픽 이름들을 받는다.
중요 개념
운영 방법 2가지
1개 이상의 컨슈머로 이루어진 컨슈머 그룹 운영
컨슈머 그룹으로 묶인 컨슈머들은 토픽의 파티션들에게 할당되어 데이터를 수집한다.
단, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하다. →컨슈머 그룹의 컨슈머 개수는 토픽의 파티션 개수와 같거나 작아야한다.
토픽의 특정 파티션만 구독하는 컨슈머를 운영
컨슈머 그룹간의 격리
컨슈머 그룹 끼리의 영향을 받지 않는다. → 파이프라인을 운영함에 있어 저장소의 장애에 유연하게 대응할 수 있다.
컨슈머 그룹으로 나눌 수 있는 경우 나누는 것을 추천한다.
컨슈머 그룹의 장애 발생시
장애가 발생한 컨슈머에 할당된 파티션은 활동준인 컨슈머에 소유권이 넘어간다. → 이를 rebalancing(리밸런싱)이라 한다.
리밸런싱 발생 상황
컨슈머의 추가
컨슈머의 장애
단, 리밸런싱이 일어나게 되면, 재할당 과정에서 컨슈머들은 데이터를 수집할 수 없다.
컨슈머 커밋
커밋(commit)을 통해 데이터를 어디까지 가져갔는지 기록한다. → 내부 토픽(__consumer_offsets)에 기록, 기록이 되지 않는 경우 중복 발생의 원인이 될 수 있다.
비명시 커밋
enable.auto.commit=true 옵션은 poll() 메서드가 수행될 때, 자동으로 커밋을 발생시킨다.
데이터의 중복이나 유실을 허용하지 않는 서비스라면 자동 커밋의 사용은 좋지 않다.
명시 커밋
poll() 메서드 호출 이후 반환 받은 데이터의 처리가 완료되고, commitSync() 메서드를 호출
commitSync() 메서드는 poll() 메서드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행
이는 커밋 요청 후 응답을 기다리는데 컨슈머의 처리량에 영향을 끼친다. → commitAsync() 메서드를 통해 보완할 수 있지만, 커밋 요청이 실패했을 경우, 현재 처리중인 데이터의 순서 및 중복에 안전하지 않다.
컨슈머 내부 구조
poll() 메서드를 호출하는 시점에 데이터를 가져오는 것이 아닌, 컨슈머 애플리케이션 내부의 Fetcher 인스턴스가 미리 레코드들을 내부 큐에 가져온 뒤, poll() 메서드에 반환한다.
컨슈머 주요 옵션
필수 옵션
bootstrap.servers
전송할 대상이되는 카프카 클러스터의 host:port
key.serializer
직렬화 설정
value.serializer
직렬화 설정
주요 옵션
컨슈머 그룹 아이디 지정
subscribe() 메서드로 토픽을 구독하여 사용할 경우 필수
default: null
auto.offset.reset
저장된 컨슈머 오프셋이 없는 경우 어디서부터 읽을지 선택하는 옵션
latest: 가장 최근에 넣은 오프셋
earliest: 가장 오래전 오프셋
none: 컨슈머 그룹의 커밋 기록을 찾아본 뒤, 없으면 오류 반환, 있다면 기존 커밋 기록 이후
default: latest
enable.auto.commit
자동 커밋 여부
default: true
자동 커밋이 true일 경우 오프셋 커밋 간격을 정한다.
default: 5000(5초)
max.poll.records
poll() 메서드를 통해 반환되는 레코드 개수 지정
default: 500
컨슈머가 브로커와 연결이 끊키는 최대 시간
해당 시간내에 heartbeat를 전송하지 않는 경우 이슈 발생으로 가정되며, 리밸런싱이 일어난다.
보통 hearbeat의 3배로 설정
default: 10000(10초)
하트비트 전송 시간 간격
default: 3000(3초)
poll() 메서드를 호출하는 간격의 최대 시간 지정
해당 시간을 넘는 경우 비정상으로 판단 뒤 리밸런싱 시작
default: 300000(5분)
isolation.level
트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용
read_committed
커밋이 완료된 레코드만 읽는다.
read_uncommitted
커밋 여부와 상관없이 읽는다.
deafult
참고
어드민 API
카프카 클라이언트 내부 옵션들의 설정, 조회를 위한 API
활용 사례
토픽의 파티션 개수 조회
ACL이 적용된 클러스터의 접근 권한 수정
특정 토픽의 데이터양 변동 감지 → 파티션 추가
주요 메서드
describeCluster(DescribeClusteroptions options)
브로커 정보 조회
listTopics(ListTopicsOptions options)
토픽 리스트 조회
listConsumerGroups(ListConsumerGroupsOptions options)
컨슈머 그룹 조회
createTopics(Collection<NewTopic> new Topics, CreateTopicOptions options)
신규 토픽 생성
createPartitions(Map<String,NewPartitions> newPartitions, CreatePartitionsOptions options)
파티션 개수 변경
createAcls(Collection <AclBinding> acls, CreateAclsOptions options)
접근 제어 규칙 생성
Last updated