비지니스 요구사항에 따라 메시지의 스키마가 변경 될 수 있다.
→ 하나의 컨슈머라도 바뀐 스키마를 다루지 못할 때, 컨슈머 서비스에서 장애가 발생할 수 있다. 이와 같은 상황을 방지하기 위해 상호 스키마 유효성을 확인해주는 작업이 필요하다.
Versioning & Compatibility Types
아브로(Avro)는 아파치의 하둡 프로젝트에서 개발된 원격 프로시저 호출(RPC) 및 데이터 직렬화 프레임워크이다. 자료형과 프로토콜 정의를 위해 JSON을 사용하며 콤팩트 바이너리 포맷으로 데이터를 직렬화한다.
// host 설정
sudo vi /etc/hosts
127.0.0.1 schemaregistry1 restproxy1
docker compose -f docker-compose-confluent-schema-registry-and-rest-proxy.yml
docker-compose-confluent-schema-registry-and-rest-proxy.yml
version: '3'
services:
zookeeper-1:
hostname: zookeeper1
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 12181
ZOOKEEPER_DATA_DIR: /zookeeper/data
ports:
- 12181:12181
volumes:
- ./zookeeper/data/1:/zookeeper/data
kafka-1:
hostname: kafka1
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092
KAFKA_LOG_DIRS: /kafka
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- 19092:19092
volumes:
- ./kafka/logs/1:/kafka
schema-registry-1:
hostname: schemaregistry1
image: confluentinc/cp-schema-registry:6.2.0
depends_on:
- kafka-1
environment:
SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
-> 실제로 연결한 broker 목록
-> 스키마 레지스트리의 인스턴스 리더를 선택할 때 관여
-> 스키마 데이터 저장에 사용
SCHEMA_REGISTRY_LISTENERS: <http://0.0.0.0:18081>
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: backward
-> 기본값이 backward
ports:
- 18081:18081
rest-proxy-1:
hostname: restproxy1
image: confluentinc/cp-kafka-rest:6.2.0
depends_on:
- schema-registry-1
ports:
- 18082:18082
environment:
KAFKA_REST_HOST_NAME: restproxy1
KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
-> rest proxy가 접근할 broker
KAFKA_REST_LISTENERS: <http://0.0.0.0:18082>
-> rest proxy endpoint
KAFKA_REST_SCHEMA_REGISTRY_URL: <http://schemaregistry1:18081>
-> 스키마 유효성 검증시 사용
KAFKA_REST_PRODUCER_THREADS: 3
KAFKA_REST_CONSUMER_THREADS: 3
-> 스레드 개수
KAFKA_REST_CONSUMER_REQUEST_TIMEOUT_MS: 5000
##################################
# 1. Schema Registry
##################################
# 스키마 생성
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"string\\"}"}' <http://schemaregistry1:18081/subjects/fast/versions>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/subjects/campus/versions>
-> 생성 시 스키마 id가 출력된다.
# 등록한 subject(범위) 조회
curl -v -XGET <http://schemaregistry1:18081/subjects>
# 1. 설정한 호환성 타입 확인(글로벌 호환성 타입 변경)
curl -v -XGET <http://schemaregistry1:18081/config>
curl -v -XPUT --data '{"compatibility": "FULL"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config>
-> 글로벌 호환성 타입 FULL로 변경
curl -v -XGET <http://schemaregistry1:18081/config>
curl -v -XPUT --data '{"compatibility": "BACKWARD"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config>
-> 글로벌 호환성 타입 BACKWARD로 변경
curl -v -XGET <http://schemaregistry1:18081/config>
# subject별 호환성 타입 설정
curl -v -XPUT --data '{"compatibility": "NONE"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config/fast>
# 스키마 아이디로 등록된 스키마 조회(전역)
curl -v -XGET <http://schemaregistry1:18081/schemas/ids/1>
-> 스키마 아이디는 유니크하다.
# fast subject 내 버전으로 스키마 조회(지역)
curl -v -XGET <http://schemaregistry1:18081/subjects/fast/versions/1>
# fast subject 내 최신버전으로 스키마 조회
curl -v -XGET <http://schemaregistry1:18081/subjects/fast/versions/latest>
# 호환성 체크
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/fast/versions/latest>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"string\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/campus/versions/latest>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/campus/versions/latest>
# subject 삭제
curl -v -XDELETE <http://schemaregistry1:18081/subjects/fast>
curl -v -XDELETE <http://schemaregistry1:18081/subjects/campus>
-> 삭제시 해당 id가 출력 된다.
##################################
# 2. REST Proxy
##################################
# Avro examples
# Topic 생성
curl -v -XPOST -H'Content-Type: application/vnd.kafka.avro.v2+json' -H'Accept: application/vnd.kafka.v2+json' --data '{"value_schema": "{\\"type\\": \\"record\\", \\"name\\": \\"account\\", \\"fields\\": [{\\"name\\": \\"balance\\", \\"type\\": \\"int\\"}]}", "records": [{"value": {"balance": 10000}}]}' <http://restproxy1:18082/topics/bank>
-> content-type을 avro를 추가, accept에 추가
-> key, value schema 지정 가능(위 코드에선 value만 지정)
# Consumer 생성
curl -v -XPOST -H'Content-Type: application/vnd.kafka.v2+json' --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' <http://restproxy1:18082/consumers/my_consumer_group>
-> 호출 시 base url 출력이 된다.
# Topic 구독
curl -v -XPOST -H'Content-Type: application/vnd.kafka.v2+json' --data '{"topics":["bank"]}' <http://restproxy1:18082/consumers/my_consumer_group/instances/my_consumer_instance/subscription>
# 메세지 처음부터 소비
curl -v -XGET -H'Accept: application/vnd.kafka.avro.v2+json' <http://restproxy1:18082/consumers/my_consumer_group/instances/my_consumer_instance/records>
Avro 스키마 (avsc)
// User.avsc
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}