2.2 카프카 커맨드 라인 툴

Kafka-topics.sh

circle-check

토픽 생성

// 기본적인 토픽 생성
$ bin/kafka-topics.sh \
--create \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka 

// 옵션을 활용한 토픽 생성
$ bin/kafka-topics.sh \
--create \
--bootstrap-server my-kafka:9092 \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=172800000 \
--topic hello.kafka.2

option

  • partitions : 파티션의 개수 (최소 = 1개, default = server.properties의 num.partitions)

  • replication-factor : 레플리카 개수 (최소 = 1개 / 2인 경우 (원본 1개 +복제품 1개))

  • config rentention.ms : 토픽 데이터 유지 기간

토픽 리스트 조회

토픽 상세 조회

토픽 옵션 수정

Kafka-console-producer.sh

  • kafka-console-producer.sh로 전송하는 레코드 값은 UTF-8을 기반으로 Byte로 변환되고 ByteArraySerializer로만 직렬화 된다. (텍스트 목적으로 문자열만 전송 가능)

  • parse.key = true -> 메시지 키 추가 가능

  • key.separator 원하는 separator 선언 가능.

  • 구분자 없이 입력할 경우 kafkaException 발생과 종료.

Kafka-console-consumer.sh

  • --from-beginning : 토픽에 저장된 가장 처음 데이터부터 출력.

  • print.key : 메시지 키 확인.

  • key.seperator : 미설정의 경우 tab delimieter(\t)로 설정되어 출력.

  • --group 옵션을 통해 신규 컨슈머 그룹(consumer group)을 생성.

    컨슈머 그룹을 통해 가져간 토픽의 메시지는 커밋.(레코드의 오프셋 번호를 브로커에 저장)

    커밋 정보는 __consumer_offsets 이름의 내부 토픽에 저장.

  • 파티션의 순서를 보장하고 싶다면 파티션 1개로 구성된 토픽을 생성하는 방법을 추천.

Kafka-consumer-groups.sh

  • group / topic / partition : 조회한 group이 마지막으로 커밋한 토픽과 파티션.

    -> hello-group 이름의 컨슈머 그룹이 hello.kafka 토픽의 0번 파티션의 레코드가 마지막으로 커밋.

  • current-offset : 컨슈머 그룹이 가져간 토픽의 파티션에 가장 최신 오프셋.

  • log-end-offset : 해당 컨슈머 그룹의 컨슈머가 어느 오프셋까지 커밋했는지 나타냄.

    -> current-offset은 log-end-offset과 같거나 작은값이다.

  • 렉(lag)은 컨슈머 그룹이 토픽의 파티션에 있는 데이터를 가져가는데 발생하는 지연.

  • consumer-id : 컨슈머의 토픽(파티션) 할당을 카프카 내부적으로 구분하기 위한 id.

    -> client-id + uuid(universally unique identifier) 값을 붙여서 자동 할당되어 유니크한 값으로 설정.

  • host : 컨슈머가 동작하는 host명.

  • client-id : 컨슈머에 할당된 id. (사용자 지정 가능 -> 지정하지 않을 시 자동 생성)

컨슈머 그룹의 상세정보를 통해

  1. 컨슈머 그룹의 중복 여부.

  2. 운영하는 컨슈머의 렉을 확인하여 컨슈머 상태 최적화.

    -> consumer의 lag이 증가하는 경우, producer가 topic에 data를 적재하는 속도에 비해 consumer의 처리량이 느리다는 증거.

Kafka-verifiable-producer,consumer.sh

카프카 클러스터 설치가 완료된 이후 토픽에 데이터를 전송하여 간단한 네트워크 통신 테스트에 사용.

Kafka-delete-records.sh

적재된 토픽의 데이터 삭제.

  • 삭제하고자 하는 데이터에 대한 정보를 파일로 저장해서 사용.

    -> delete-topic.json 파일로 생성. (토픽, 파티션, 오프셋 정보가 들어가야 한다.)

주의 : 토픽의 특정 레코드 하나만 삭제되는 것이 아닌, 파티션 내의 가장 오래된 오프셋부터 지정한 오프셋까지 삭제됨. ( 토픽의 파티션에 지정된 특정 데이터만 삭제할 수 없다.)

Last updated