02. Confluent Kafka 추가기능
Rest Proxy
카프카 클라이언트를 사용하지 않고 Restful API를 사용해 카프카의 일부 기능을 사용하는 서비스
토픽 생성/삭제
메시지 프로듀싱/컨슘
카프카 클러스터 상태 조회
Kafka Broker와 별도로 구성
Confluent Community 라이센스
장점
테스트나 디버깅 편리
HTTP 서비스에 통합이 간편
단점
관리 포인트 증가
상대적으로 저조한 성능
제한된 기능만 제공
Schema Registry
스키마 레지스트리는 Producer와 Consumer가 주고 받으려는 메시지의 스키마를 서로 알게 해주고
호환을 강제한다
Versioning & Compatibility Types
Schema Registry는 동일 스키마에 대한 호환성 체크를 하기 위해 버전을 유지

Backward(기본설정): 필드 삭제, optional 필드 추가 허용 - 컨슈머부터 업그레이드
새로운 스키마를 사용하여 이전 데이터를 읽는 것이 가능한 것.
필드 삭제 혹은 기본 값이 있는 필드 추가인 경우
Forward: 필드 추가, optional 필드 삭제 허용 - 프로듀서부터 업그레이드
이전 스키마를 사용하여 새로운 데이터를 읽는 것이 가능한 것.
필드 추가 혹은 기본값이 있는 필드 삭제
Full: Backward, Forward 모두 만족함 - 순서 무관
기본 값이 있는 필드 추가 혹은 삭제
None: Backward, Forward 모두 만족하지 않음 - 순서 무관
동작 방식

Kafka Broker와 별도로 구성
스키마 ID를 추출하여 스키마를 스키마 레지스트리로 부터 수신. 받은 스키마를 통해 역직렬화 한다.
프로듀서와 컨슈머는 로컬 캐쉬에 해당 스키마가 존재하지 않는 경우 스키마 레지스트리와 통신
스키마 레지스트리 자체의 장애에 주의해야 한다.
실습
// host 설정
sudo vi /etc/hosts
127.0.0.1 schemaregistry1 restproxy1
docker compose -f docker-compose-confluent-schema-registry-and-rest-proxy.yml
docker-compose-confluent-schema-registry-and-rest-proxy.yml
version: '3'
services:
zookeeper-1:
hostname: zookeeper1
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 12181
ZOOKEEPER_DATA_DIR: /zookeeper/data
ports:
- 12181:12181
volumes:
- ./zookeeper/data/1:/zookeeper/data
kafka-1:
hostname: kafka1
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092
KAFKA_LOG_DIRS: /kafka
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- 19092:19092
volumes:
- ./kafka/logs/1:/kafka
schema-registry-1:
hostname: schemaregistry1
image: confluentinc/cp-schema-registry:6.2.0
depends_on:
- kafka-1
environment:
SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
-> 실제로 연결한 broker 목록
-> 스키마 레지스트리의 인스턴스 리더를 선택할 때 관여
-> 스키마 데이터 저장에 사용
SCHEMA_REGISTRY_LISTENERS: <http://0.0.0.0:18081>
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: backward
-> 기본값이 backward
ports:
- 18081:18081
rest-proxy-1:
hostname: restproxy1
image: confluentinc/cp-kafka-rest:6.2.0
depends_on:
- schema-registry-1
ports:
- 18082:18082
environment:
KAFKA_REST_HOST_NAME: restproxy1
KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
-> rest proxy가 접근할 broker
KAFKA_REST_LISTENERS: <http://0.0.0.0:18082>
-> rest proxy endpoint
KAFKA_REST_SCHEMA_REGISTRY_URL: <http://schemaregistry1:18081>
-> 스키마 유효성 검증시 사용
KAFKA_REST_PRODUCER_THREADS: 3
KAFKA_REST_CONSUMER_THREADS: 3
-> 스레드 개수
KAFKA_REST_CONSUMER_REQUEST_TIMEOUT_MS: 5000
##################################
# 1. Schema Registry
##################################
# 스키마 생성
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"string\\"}"}' <http://schemaregistry1:18081/subjects/fast/versions>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/subjects/campus/versions>
-> 생성 시 스키마 id가 출력된다.
# 등록한 subject(범위) 조회
curl -v -XGET <http://schemaregistry1:18081/subjects>
# 1. 설정한 호환성 타입 확인(글로벌 호환성 타입 변경)
curl -v -XGET <http://schemaregistry1:18081/config>
curl -v -XPUT --data '{"compatibility": "FULL"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config>
-> 글로벌 호환성 타입 FULL로 변경
curl -v -XGET <http://schemaregistry1:18081/config>
curl -v -XPUT --data '{"compatibility": "BACKWARD"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config>
-> 글로벌 호환성 타입 BACKWARD로 변경
curl -v -XGET <http://schemaregistry1:18081/config>
# subject별 호환성 타입 설정
curl -v -XPUT --data '{"compatibility": "NONE"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config/fast>
# 스키마 아이디로 등록된 스키마 조회(전역)
curl -v -XGET <http://schemaregistry1:18081/schemas/ids/1>
-> 스키마 아이디는 유니크하다.
# fast subject 내 버전으로 스키마 조회(지역)
curl -v -XGET <http://schemaregistry1:18081/subjects/fast/versions/1>
# fast subject 내 최신버전으로 스키마 조회
curl -v -XGET <http://schemaregistry1:18081/subjects/fast/versions/latest>
# 호환성 체크
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/fast/versions/latest>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"string\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/campus/versions/latest>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/campus/versions/latest>
# subject 삭제
curl -v -XDELETE <http://schemaregistry1:18081/subjects/fast>
curl -v -XDELETE <http://schemaregistry1:18081/subjects/campus>
-> 삭제시 해당 id가 출력 된다.
##################################
# 2. REST Proxy
##################################
# Avro examples
# Topic 생성
curl -v -XPOST -H'Content-Type: application/vnd.kafka.avro.v2+json' -H'Accept: application/vnd.kafka.v2+json' --data '{"value_schema": "{\\"type\\": \\"record\\", \\"name\\": \\"account\\", \\"fields\\": [{\\"name\\": \\"balance\\", \\"type\\": \\"int\\"}]}", "records": [{"value": {"balance": 10000}}]}' <http://restproxy1:18082/topics/bank>
-> content-type을 avro를 추가, accept에 추가
-> key, value schema 지정 가능(위 코드에선 value만 지정)
# Consumer 생성
curl -v -XPOST -H'Content-Type: application/vnd.kafka.v2+json' --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' <http://restproxy1:18082/consumers/my_consumer_group>
-> 호출 시 base url 출력이 된다.
# Topic 구독
curl -v -XPOST -H'Content-Type: application/vnd.kafka.v2+json' --data '{"topics":["bank"]}' <http://restproxy1:18082/consumers/my_consumer_group/instances/my_consumer_instance/subscription>
# 메세지 처음부터 소비
curl -v -XGET -H'Accept: application/vnd.kafka.avro.v2+json' <http://restproxy1:18082/consumers/my_consumer_group/instances/my_consumer_instance/records>
Last updated