2.2 카프카 커맨드 라인 툴

Kafka-topics.sh

Topic 생성 2가지

  1. Consumer, Producer 가 Broker에 생성되지 않은 Topic에 대해 데이터를 요청할 때.

  2. 'Command line tool' 명시적으로 Topic 생성.

Topic마다 처리 하는 데이터의 특성이 다르기 때문에 2번을 추천.

토픽 생성

// 기본적인 토픽 생성
$ bin/kafka-topics.sh \
--create \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka 

// 옵션을 활용한 토픽 생성
$ bin/kafka-topics.sh \
--create \
--bootstrap-server my-kafka:9092 \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=172800000 \
--topic hello.kafka.2

option

  • partitions : 파티션의 개수 (최소 = 1개, default = server.properties의 num.partitions)

  • replication-factor : 레플리카 개수 (최소 = 1개 / 2인 경우 (원본 1개 +복제품 1개))

  • config rentention.ms : 토픽 데이터 유지 기간

토픽 리스트 조회

$ bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --list

토픽 상세 조회

$ bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --describe --topic hello.kafka.2

토픽 옵션 수정

// 토픽관련 옵션은 topics 와 configs에 위치.

// 파티션 추가 (기존 3개)
$ bin/kafka-topics.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--alter \
--partitions 4

// 조회
$ bin/kafka-topics.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--describe


// 토픽 데이터 저장 기간 변경
$ bin/kafka-configs.sh --bootstrap-server my-kafka:9092 \
--entity-type topics \
--entity-name hello.kafka \
--alter --add-config retention.ms=86400000

// 조회
$ bin/kafka-configs.sh --bootstrap-server my-kafka:9092 \
--entity-type topics \
--entity-name hello.kafka \
--describe

Kafka-console-producer.sh

// 메시지 키 없이 메시지 값만 전송
$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka
>hello
>kafka
>0
>1
>2
>3
>4
>5

// 메시지 키를 가지는 레코드 전송
$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--property "parse.key=true" \
--property "key.separator=:"
>key1:no1
>key2:no2
>key3:no3

  • kafka-console-producer.sh로 전송하는 레코드 값은 UTF-8을 기반으로 Byte로 변환되고 ByteArraySerializer로만 직렬화 된다. (텍스트 목적으로 문자열만 전송 가능)

  • parse.key = true -> 메시지 키 추가 가능

  • key.separator 원하는 separator 선언 가능.

  • 구분자 없이 입력할 경우 kafkaException 발생과 종료.

Kafka-console-consumer.sh

$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--from-beginning

$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--property print.key=true \
--property key.separator="-" \
--group hello-group \
--from-beginning
  • --from-beginning : 토픽에 저장된 가장 처음 데이터부터 출력.

  • print.key : 메시지 키 확인.

  • key.seperator : 미설정의 경우 tab delimieter(\t)로 설정되어 출력.

  • --group 옵션을 통해 신규 컨슈머 그룹(consumer group)을 생성.

    컨슈머 그룹을 통해 가져간 토픽의 메시지는 커밋.(레코드의 오프셋 번호를 브로커에 저장)

    커밋 정보는 __consumer_offsets 이름의 내부 토픽에 저장.

  • 파티션의 순서를 보장하고 싶다면 파티션 1개로 구성된 토픽을 생성하는 방법을 추천.

Kafka-consumer-groups.sh

// 생성된 컨슈머 그룹 리스트
$ bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --list

// 컨슈머 그룹의 상세 내용
$ bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \
--group hello-group \
--describe

// 컨슈머 그룹 상세 내용에 대한 출력
Consumer group 'hello-group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
hello-group     hello.kafka     0          7               7               0               -               -               -
  • group / topic / partition : 조회한 group이 마지막으로 커밋한 토픽과 파티션.

    -> hello-group 이름의 컨슈머 그룹이 hello.kafka 토픽의 0번 파티션의 레코드가 마지막으로 커밋.

  • current-offset : 컨슈머 그룹이 가져간 토픽의 파티션에 가장 최신 오프셋.

  • log-end-offset : 해당 컨슈머 그룹의 컨슈머가 어느 오프셋까지 커밋했는지 나타냄.

    -> current-offset은 log-end-offset과 같거나 작은값이다.

  • 렉(lag)은 컨슈머 그룹이 토픽의 파티션에 있는 데이터를 가져가는데 발생하는 지연.

  • consumer-id : 컨슈머의 토픽(파티션) 할당을 카프카 내부적으로 구분하기 위한 id.

    -> client-id + uuid(universally unique identifier) 값을 붙여서 자동 할당되어 유니크한 값으로 설정.

  • host : 컨슈머가 동작하는 host명.

  • client-id : 컨슈머에 할당된 id. (사용자 지정 가능 -> 지정하지 않을 시 자동 생성)

컨슈머 그룹의 상세정보를 통해

  1. 컨슈머 그룹의 중복 여부.

  2. 운영하는 컨슈머의 렉을 확인하여 컨슈머 상태 최적화.

    -> consumer의 lag이 증가하는 경우, producer가 topic에 data를 적재하는 속도에 비해 consumer의 처리량이 느리다는 증거.

Kafka-verifiable-producer,consumer.sh

카프카 클러스터 설치가 완료된 이후 토픽에 데이터를 전송하여 간단한 네트워크 통신 테스트에 사용.

// 데이터 전송
$ bin/kafka-verifiable-producer.sh --bootstrap-server my-kafka:9092 \
--max-message 10 \
--topic verify-test

// 전송한 데이터 확인
$ bin/kafka-verifiable-consumer.sh --bootstrap-server my-kafka:9092 \
--topic verify-test \
--group-id test-group

// 출력된 데이터
{"timestamp":1637043498474,"name":"startup_complete"}
{"timestamp":1637043500155,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0}]}
{"timestamp":1637043501146,"name":"records_consumed","count":10,"partitions":[{"topic":"verify-test","partition":0,"count":10,"minOffset":0,"maxOffset":9}]}
{"timestamp":1637043501336,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":0,"offset":10}],"success":true}

Kafka-delete-records.sh

적재된 토픽의 데이터 삭제.

$ vi delete-topic.json
{"partitions": [{"topic": "test", "partition": 0, "offset": 50}], "version":1 }

$ bin/kafka-delete-records.sh --bootstrap-server my-kafka:9092 \
--offset-json-file delete-topic.json
  • 삭제하고자 하는 데이터에 대한 정보를 파일로 저장해서 사용.

    -> delete-topic.json 파일로 생성. (토픽, 파티션, 오프셋 정보가 들어가야 한다.)

주의 : 토픽의 특정 레코드 하나만 삭제되는 것이 아닌, 파티션 내의 가장 오래된 오프셋부터 지정한 오프셋까지 삭제됨. ( 토픽의 파티션에 지정된 특정 데이터만 삭제할 수 없다.)

Last updated