3.4 카프카 클라이언트

카프카 클라이언트

카프카 클러스터에 명령을 내리거나 데이터를 송수신하는 라이브러리

프로듀서 API

  • 데이터의 시작점으로 카프카에 필요한 데이터를 선언하고 리더 파티션에 전송.

  • 프로듀서는 데이터를 직렬화 하여, 동영상,이미지 같은 바이너리 데이터도 프로듀서를 통해 전송 할 수 있다.

실습 Code (GitHub)

https://github.com/machine-geon/simple_kafka

프로듀서 중요 개념

  • 토픽과 메시지값은 생성 필수 파라미터이다.

  • 파티션 번호 지정, 타임스탬프 설정, 메시지 키 설정 또한 가능하다.

  • 레코드의 타임스탬프는 카프카 브로커에 저장될 때 브로커 시간을 기준으로 설정 → 필요에 따라 변경 가능.

전송 단계

  1. Producer가 send() 메서드를 호출할 때, Partitioner에서 토픽의 어느 파티션으로 전송할지 정한다.

  2. Partitioner에 의해 구분된 record는 데이터를 전송하기 전에 accumulator에 버퍼로 쌓아놓고 발송한다. → batch로 묶어서 전송.

파티셔너

  • 프로듀서 API는 2가지 파티션을 제공한다.

    • UniformStickyPartitioner (ver 2.5.0 default)

    • RoundRobinPartitioner (ver 2.4.0 이전에서 default)

  • 메지지 키의 해시값과 파티션을 매칭하여 데이터를 전송하는 점은 동일하지만, 메시지 키가 없을 때 UniformStickyPartitioner가 더 효율적이다.

  • Partitioner 인터페이스를 상속받아 사용자 정의 클래스에서 파티션 지정 로직을 적용할 수 있다.

압축

  • 압축 옵션을 통해 브로커로 전송 시 압축 방식을 지정할 수 있다.

    • gzip, snappy,lz4, zstd 지원

  • 압축 옵션을 지정하지 않는 경우 압축이 되지 않는 상태로 전송된다.

  • 리소스는 이득, 그러나 CPU, 메모리 리소스 손해

  • Consumer에서 압축을 풀면서 리소스가 사용되는 점을 주의.

프로듀서 주요 옵션

필수 옵션

  • bootstrap.servers

    • 전송할 대상이되는 카프카 클러스터의 host:port

  • key.serializer

    • 직렬화 설정

  • value.serializer

    • 직렬화 설정

선택 옵션

  • acks

    • 1: 리더 파티션의 저장을 전송 성공으로 판단(default)

    • 0: 저장 여부와 상관 없이 성공으로 판단

    • -1(all): min.insync.replicas(isr) 개수에 해당하는 파티션에 저장을 전송 성공으로 판단

  • buffer.memory

    • 버퍼 메모리양

    • default: 33554432(32MB)

  • retries

    • 재전송 시도 횟수

    • default: 2147483647

  • batch.size

    • 배치로 전송할 레코드 최대 용량

    • size가 너무 작은 경우 네트워크 부담, size가 너무 큰 경우 메모리에 부담

    • default: 16384 (16KB)

  • linger.ms

    • 배치 전송하기 전까지 기다리는 최소 시간

    • default: 0

  • partitioner.class

    • 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정

    • default: org.apache.kafka.clients.producer.internals.DefaultPartitioner

  • enable.idempotence

    • 멱등성 ( f(f(x)) = f(x) )

    • enable.idempotence를 사용하면, 하나의 Batch가 실패하면, 같은 Partition으로 들어오는 후속 Batch들도 OutOfOrderSequenceException과 함께 실패 → 메시지 순서 보장

    • default: false

  • transactional.id

    • 레코드를 전송할 때 트랜잭션 단위로 묶을지 여부

    • 프로듀서의 고유한 트랜잭션 아이디를 설정할 수 있다.

    • 설정할 경우 트랜잭션 프로듀서로 동작

참고

GitBook

컨슈머 API

  • 브로커에 적재된 데이터를 사용하기 위한 API

  • 목적에 따라 컨슈머 그룹 이름을 정하는 것을 추천한다. (ex. email-application-group)

  • 컨슈머에게 토픽을 할당하기 위해 subscribe() 메서드를 사용한다. →Collection 타입의 String 값으로 토픽 이름들을 받는다.

중요 개념

운영 방법 2가지

  • 1개 이상의 컨슈머로 이루어진 컨슈머 그룹 운영

    • 컨슈머 그룹으로 묶인 컨슈머들은 토픽의 파티션들에게 할당되어 데이터를 수집한다.

    • 단, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하다. →컨슈머 그룹의 컨슈머 개수는 토픽의 파티션 개수와 같거나 작아야한다.

  • 토픽의 특정 파티션만 구독하는 컨슈머를 운영

컨슈머 그룹간의 격리

  • 컨슈머 그룹 끼리의 영향을 받지 않는다. → 파이프라인을 운영함에 있어 저장소의 장애에 유연하게 대응할 수 있다.

  • 컨슈머 그룹으로 나눌 수 있는 경우 나누는 것을 추천한다.

컨슈머 그룹의 장애 발생시

  • 장애가 발생한 컨슈머에 할당된 파티션은 활동준인 컨슈머에 소유권이 넘어간다. → 이를 rebalancing(리밸런싱)이라 한다.

  • 리밸런싱 발생 상황

    • 컨슈머의 추가

    • 컨슈머의 장애

  • 단, 리밸런싱이 일어나게 되면, 재할당 과정에서 컨슈머들은 데이터를 수집할 수 없다.

컨슈머 커밋

  • 커밋(commit)을 통해 데이터를 어디까지 가져갔는지 기록한다. → 내부 토픽(__consumer_offsets)에 기록, 기록이 되지 않는 경우 중복 발생의 원인이 될 수 있다.

  • 비명시 커밋

    • enable.auto.commit=true 옵션은 poll() 메서드가 수행될 때, 자동으로 커밋을 발생시킨다.

    • 데이터의 중복이나 유실을 허용하지 않는 서비스라면 자동 커밋의 사용은 좋지 않다.

  • 명시 커밋

    • poll() 메서드 호출 이후 반환 받은 데이터의 처리가 완료되고, commitSync() 메서드를 호출

    • commitSync() 메서드는 poll() 메서드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행

    • 이는 커밋 요청 후 응답을 기다리는데 컨슈머의 처리량에 영향을 끼친다. → commitAsync() 메서드를 통해 보완할 수 있지만, 커밋 요청이 실패했을 경우, 현재 처리중인 데이터의 순서 및 중복에 안전하지 않다.

컨슈머 내부 구조

  • poll() 메서드를 호출하는 시점에 데이터를 가져오는 것이 아닌, 컨슈머 애플리케이션 내부의 Fetcher 인스턴스가 미리 레코드들을 내부 큐에 가져온 뒤, poll() 메서드에 반환한다.

컨슈머 주요 옵션

필수 옵션

  • bootstrap.servers

    • 전송할 대상이되는 카프카 클러스터의 host:port

  • key.serializer

    • 직렬화 설정

  • value.serializer

    • 직렬화 설정

주요 옵션

  • group.id

    • 컨슈머 그룹 아이디 지정

    • subscribe() 메서드로 토픽을 구독하여 사용할 경우 필수

    • default: null

  • auto.offset.reset

    • 저장된 컨슈머 오프셋이 없는 경우 어디서부터 읽을지 선택하는 옵션

    • latest: 가장 최근에 넣은 오프셋

    • earliest: 가장 오래전 오프셋

    • none: 컨슈머 그룹의 커밋 기록을 찾아본 뒤, 없으면 오류 반환, 있다면 기존 커밋 기록 이후

    • default: latest

  • enable.auto.commit

    • 자동 커밋 여부

    • default: true

  • auto.commit.interval.ms

    • 자동 커밋이 true일 경우 오프셋 커밋 간격을 정한다.

    • default: 5000(5초)

  • max.poll.records

    • poll() 메서드를 통해 반환되는 레코드 개수 지정

    • default: 500

  • session.timeout.ms

    • 컨슈머가 브로커와 연결이 끊키는 최대 시간

    • 해당 시간내에 heartbeat를 전송하지 않는 경우 이슈 발생으로 가정되며, 리밸런싱이 일어난다.

    • 보통 hearbeat의 3배로 설정

    • default: 10000(10초)

  • heartbeat.interval.ms

    • 하트비트 전송 시간 간격

    • default: 3000(3초)

  • max.poll.interval.ms

    • poll() 메서드를 호출하는 간격의 최대 시간 지정

    • 해당 시간을 넘는 경우 비정상으로 판단 뒤 리밸런싱 시작

    • default: 300000(5분)

  • isolation.level

    • 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용

    • read_committed

      • 커밋이 완료된 레코드만 읽는다.

    • read_uncommitted

      • 커밋 여부와 상관없이 읽는다.

      • deafult

참고

GitBook

어드민 API

  • 카프카 클라이언트 내부 옵션들의 설정, 조회를 위한 API

  • 활용 사례

    • 토픽의 파티션 개수 조회

    • ACL이 적용된 클러스터의 접근 권한 수정

    • 특정 토픽의 데이터양 변동 감지 → 파티션 추가

주요 메서드

  • describeCluster(DescribeClusteroptions options)

    • 브로커 정보 조회

  • listTopics(ListTopicsOptions options)

    • 토픽 리스트 조회

  • listConsumerGroups(ListConsumerGroupsOptions options)

    • 컨슈머 그룹 조회

  • createTopics(Collection<NewTopic> new Topics, CreateTopicOptions options)

    • 신규 토픽 생성

  • createPartitions(Map<String,NewPartitions> newPartitions, CreatePartitionsOptions options)

    • 파티션 개수 변경

  • createAcls(Collection <AclBinding> acls, CreateAclsOptions options)

    • 접근 제어 규칙 생성

Last updated