02. Confluent Kafka 추가기능
Rest Proxy
카프카 클라이언트를 사용하지 않고 Restful API를 사용해 카프카의 일부 기능을 사용하는 서비스
토픽 생성/삭제
메시지 프로듀싱/컨슘
카프카 클러스터 상태 조회
Kafka Broker와 별도로 구성
Confluent Community 라이센스
장점
테스트나 디버깅 편리
HTTP 서비스에 통합이 간편
단점
관리 포인트 증가
상대적으로 저조한 성능
제한된 기능만 제공
Schema Registry
스키마 레지스트리는 Producer와 Consumer가 주고 받으려는 메시지의 스키마를 서로 알게 해주고
호환을 강제한다
비지니스 요구사항에 따라 메시지의 스키마가 변경 될 수 있다.
→ 하나의 컨슈머라도 바뀐 스키마를 다루지 못할 때, 컨슈머 서비스에서 장애가 발생할 수 있다. 이와 같은 상황을 방지하기 위해 상호 스키마 유효성을 확인해주는 작업이 필요하다.
Versioning & Compatibility Types
Schema Registry는 동일 스키마에 대한 호환성 체크를 하기 위해 버전을 유지
Backward(기본설정): 필드 삭제, optional 필드 추가 허용 - 컨슈머부터 업그레이드
새로운 스키마를 사용하여 이전 데이터를 읽는 것이 가능한 것.
필드 삭제 혹은 기본 값이 있는 필드 추가인 경우
Forward: 필드 추가, optional 필드 삭제 허용 - 프로듀서부터 업그레이드
이전 스키마를 사용하여 새로운 데이터를 읽는 것이 가능한 것.
필드 추가 혹은 기본값이 있는 필드 삭제
Full: Backward, Forward 모두 만족함 - 순서 무관
기본 값이 있는 필드 추가 혹은 삭제
None: Backward, Forward 모두 만족하지 않음 - 순서 무관
새로운 스키마 = 새로 등록할 스키마
가장 최근에 등록된 스키마 = 스키마 레지스트리에 등록된 스키마의 마지막 버
TRANSITIVE = 모든 등록된 버전으로 확인한다는 뜻이며, {OPTION}_TRANSITIVE가 없는 경우 마지막 버전으로 확인한다.
동작 방식
Kafka Broker와 별도로 구성
스키마 ID를 추출하여 스키마를 스키마 레지스트리로 부터 수신. 받은 스키마를 통해 역직렬화 한다.
프로듀서와 컨슈머는 로컬 캐쉬에 해당 스키마가 존재하지 않는 경우 스키마 레지스트리와 통신
스키마 레지스트리 자체의 장애에 주의해야 한다.
아브로(Avro)는 아파치의 하둡 프로젝트에서 개발된 원격 프로시저 호출(RPC) 및 데이터 직렬화 프레임워크이다. 자료형과 프로토콜 정의를 위해 JSON을 사용하며 콤팩트 바이너리 포맷으로 데이터를 직렬화한다.
실습
// host 설정
sudo vi /etc/hosts
127.0.0.1 schemaregistry1 restproxy1
docker compose -f docker-compose-confluent-schema-registry-and-rest-proxy.yml
docker-compose-confluent-schema-registry-and-rest-proxy.yml
version: '3'
services:
zookeeper-1:
hostname: zookeeper1
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 12181
ZOOKEEPER_DATA_DIR: /zookeeper/data
ports:
- 12181:12181
volumes:
- ./zookeeper/data/1:/zookeeper/data
kafka-1:
hostname: kafka1
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092
KAFKA_LOG_DIRS: /kafka
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- 19092:19092
volumes:
- ./kafka/logs/1:/kafka
schema-registry-1:
hostname: schemaregistry1
image: confluentinc/cp-schema-registry:6.2.0
depends_on:
- kafka-1
environment:
SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
-> 실제로 연결한 broker 목록
-> 스키마 레지스트리의 인스턴스 리더를 선택할 때 관여
-> 스키마 데이터 저장에 사용
SCHEMA_REGISTRY_LISTENERS: <http://0.0.0.0:18081>
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: backward
-> 기본값이 backward
ports:
- 18081:18081
rest-proxy-1:
hostname: restproxy1
image: confluentinc/cp-kafka-rest:6.2.0
depends_on:
- schema-registry-1
ports:
- 18082:18082
environment:
KAFKA_REST_HOST_NAME: restproxy1
KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
-> rest proxy가 접근할 broker
KAFKA_REST_LISTENERS: <http://0.0.0.0:18082>
-> rest proxy endpoint
KAFKA_REST_SCHEMA_REGISTRY_URL: <http://schemaregistry1:18081>
-> 스키마 유효성 검증시 사용
KAFKA_REST_PRODUCER_THREADS: 3
KAFKA_REST_CONSUMER_THREADS: 3
-> 스레드 개수
KAFKA_REST_CONSUMER_REQUEST_TIMEOUT_MS: 5000
##################################
# 1. Schema Registry
##################################
# 스키마 생성
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"string\\"}"}' <http://schemaregistry1:18081/subjects/fast/versions>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/subjects/campus/versions>
-> 생성 시 스키마 id가 출력된다.
# 등록한 subject(범위) 조회
curl -v -XGET <http://schemaregistry1:18081/subjects>
# 1. 설정한 호환성 타입 확인(글로벌 호환성 타입 변경)
curl -v -XGET <http://schemaregistry1:18081/config>
curl -v -XPUT --data '{"compatibility": "FULL"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config>
-> 글로벌 호환성 타입 FULL로 변경
curl -v -XGET <http://schemaregistry1:18081/config>
curl -v -XPUT --data '{"compatibility": "BACKWARD"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config>
-> 글로벌 호환성 타입 BACKWARD로 변경
curl -v -XGET <http://schemaregistry1:18081/config>
# subject별 호환성 타입 설정
curl -v -XPUT --data '{"compatibility": "NONE"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config/fast>
# 스키마 아이디로 등록된 스키마 조회(전역)
curl -v -XGET <http://schemaregistry1:18081/schemas/ids/1>
-> 스키마 아이디는 유니크하다.
# fast subject 내 버전으로 스키마 조회(지역)
curl -v -XGET <http://schemaregistry1:18081/subjects/fast/versions/1>
# fast subject 내 최신버전으로 스키마 조회
curl -v -XGET <http://schemaregistry1:18081/subjects/fast/versions/latest>
# 호환성 체크
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/fast/versions/latest>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"string\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/campus/versions/latest>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/campus/versions/latest>
# subject 삭제
curl -v -XDELETE <http://schemaregistry1:18081/subjects/fast>
curl -v -XDELETE <http://schemaregistry1:18081/subjects/campus>
-> 삭제시 해당 id가 출력 된다.
##################################
# 2. REST Proxy
##################################
# Avro examples
# Topic 생성
curl -v -XPOST -H'Content-Type: application/vnd.kafka.avro.v2+json' -H'Accept: application/vnd.kafka.v2+json' --data '{"value_schema": "{\\"type\\": \\"record\\", \\"name\\": \\"account\\", \\"fields\\": [{\\"name\\": \\"balance\\", \\"type\\": \\"int\\"}]}", "records": [{"value": {"balance": 10000}}]}' <http://restproxy1:18082/topics/bank>
-> content-type을 avro를 추가, accept에 추가
-> key, value schema 지정 가능(위 코드에선 value만 지정)
# Consumer 생성
curl -v -XPOST -H'Content-Type: application/vnd.kafka.v2+json' --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' <http://restproxy1:18082/consumers/my_consumer_group>
-> 호출 시 base url 출력이 된다.
# Topic 구독
curl -v -XPOST -H'Content-Type: application/vnd.kafka.v2+json' --data '{"topics":["bank"]}' <http://restproxy1:18082/consumers/my_consumer_group/instances/my_consumer_instance/subscription>
# 메세지 처음부터 소비
curl -v -XGET -H'Accept: application/vnd.kafka.avro.v2+json' <http://restproxy1:18082/consumers/my_consumer_group/instances/my_consumer_instance/records>
Avro 스키마 (avsc)
// User.avsc
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [ {"name": "name", "type": "string"},
{"name": "age", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Last updated