02. Confluent Kafka 추가기능

Rest Proxy

  • 카프카 클라이언트를 사용하지 않고 Restful API를 사용해 카프카의 일부 기능을 사용하는 서비스

    • 토픽 생성/삭제

    • 메시지 프로듀싱/컨슘

    • 카프카 클러스터 상태 조회

  • Kafka Broker와 별도로 구성

  • Confluent Community 라이센스

  • 장점

    • 테스트나 디버깅 편리

    • HTTP 서비스에 통합이 간편

  • 단점

    • 관리 포인트 증가

    • 상대적으로 저조한 성능

    • 제한된 기능만 제공

Schema Registry

스키마 레지스트리는 Producer와 Consumer가 주고 받으려는 메시지의 스키마를 서로 알게 해주고

호환을 강제한다

비지니스 요구사항에 따라 메시지의 스키마가 변경 될 수 있다.

→ 하나의 컨슈머라도 바뀐 스키마를 다루지 못할 때, 컨슈머 서비스에서 장애가 발생할 수 있다. 이와 같은 상황을 방지하기 위해 상호 스키마 유효성을 확인해주는 작업이 필요하다.

Versioning & Compatibility Types

Schema Registry는 동일 스키마에 대한 호환성 체크를 하기 위해 버전을 유지

  • Backward(기본설정): 필드 삭제, optional 필드 추가 허용 - 컨슈머부터 업그레이드

    • 새로운 스키마를 사용하여 이전 데이터를 읽는 것이 가능한 것.

    • 필드 삭제 혹은 기본 값이 있는 필드 추가인 경우

  • Forward: 필드 추가, optional 필드 삭제 허용 - 프로듀서부터 업그레이드

    • 이전 스키마를 사용하여 새로운 데이터를 읽는 것이 가능한 것.

    • 필드 추가 혹은 기본값이 있는 필드 삭제

  • Full: Backward, Forward 모두 만족함 - 순서 무관

    • 기본 값이 있는 필드 추가 혹은 삭제

  • None: Backward, Forward 모두 만족하지 않음 - 순서 무관

새로운 스키마 = 새로 등록할 스키마

  • 가장 최근에 등록된 스키마 = 스키마 레지스트리에 등록된 스키마의 마지막 버

  • TRANSITIVE = 모든 등록된 버전으로 확인한다는 뜻이며, {OPTION}_TRANSITIVE가 없는 경우 마지막 버전으로 확인한다.

동작 방식

  • Kafka Broker와 별도로 구성

  • 스키마 ID를 추출하여 스키마를 스키마 레지스트리로 부터 수신. 받은 스키마를 통해 역직렬화 한다.

  • 프로듀서와 컨슈머는 로컬 캐쉬에 해당 스키마가 존재하지 않는 경우 스키마 레지스트리와 통신

  • 스키마 레지스트리 자체의 장애에 주의해야 한다.

아브로(Avro)는 아파치의 하둡 프로젝트에서 개발된 원격 프로시저 호출(RPC) 및 데이터 직렬화 프레임워크이다. 자료형과 프로토콜 정의를 위해 JSON을 사용하며 콤팩트 바이너리 포맷으로 데이터를 직렬화한다.

실습

// host 설정
sudo vi /etc/hosts
127.0.0.1 schemaregistry1 restproxy1

docker compose -f docker-compose-confluent-schema-registry-and-rest-proxy.yml
docker-compose-confluent-schema-registry-and-rest-proxy.yml
version: '3'
services:
  zookeeper-1:
    hostname: zookeeper1
    image: confluentinc/cp-zookeeper:6.2.0
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_DATA_DIR: /zookeeper/data
    ports:
      - 12181:12181
    volumes:
      - ./zookeeper/data/1:/zookeeper/data

  kafka-1:
    hostname: kafka1
    image: confluentinc/cp-kafka:6.2.0
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092
      KAFKA_LOG_DIRS: /kafka
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 19092:19092
    volumes:
      - ./kafka/logs/1:/kafka

  schema-registry-1:
    hostname: schemaregistry1
    image: confluentinc/cp-schema-registry:6.2.0
    depends_on:
      - kafka-1
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
			-> 실제로 연결한 broker 목록
			-> 스키마 레지스트리의 인스턴스 리더를 선택할 때 관여
			-> 스키마 데이터 저장에 사용
      SCHEMA_REGISTRY_LISTENERS: <http://0.0.0.0:18081>
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: backward
			-> 기본값이 backward
    ports:
      - 18081:18081

  rest-proxy-1:
    hostname: restproxy1
    image: confluentinc/cp-kafka-rest:6.2.0
    depends_on:
      - schema-registry-1
    ports:
      - 18082:18082
    environment:
      KAFKA_REST_HOST_NAME: restproxy1
      KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
			-> rest proxy가 접근할 broker
      KAFKA_REST_LISTENERS: <http://0.0.0.0:18082>
			-> rest proxy endpoint
      KAFKA_REST_SCHEMA_REGISTRY_URL: <http://schemaregistry1:18081>
			-> 스키마 유효성 검증시 사용
      KAFKA_REST_PRODUCER_THREADS: 3
      KAFKA_REST_CONSUMER_THREADS: 3
			-> 스레드 개수
      KAFKA_REST_CONSUMER_REQUEST_TIMEOUT_MS: 5000
##################################
# 1. Schema Registry
##################################

# 스키마 생성
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"string\\"}"}' <http://schemaregistry1:18081/subjects/fast/versions>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/subjects/campus/versions>
-> 생성 시 스키마 id가 출력된다.

# 등록한 subject(범위) 조회
curl -v -XGET <http://schemaregistry1:18081/subjects>

# 1. 설정한 호환성 타입 확인(글로벌 호환성 타입 변경)
curl -v -XGET <http://schemaregistry1:18081/config>
curl -v -XPUT --data '{"compatibility": "FULL"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config>
-> 글로벌 호환성 타입 FULL로 변경
curl -v -XGET <http://schemaregistry1:18081/config>

curl -v -XPUT --data '{"compatibility": "BACKWARD"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config>
-> 글로벌 호환성 타입 BACKWARD로 변경
curl -v -XGET <http://schemaregistry1:18081/config>

# subject별 호환성 타입 설정
curl -v -XPUT --data '{"compatibility": "NONE"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' <http://schemaregistry1:18081/config/fast>

# 스키마 아이디로 등록된 스키마 조회(전역)
curl -v -XGET <http://schemaregistry1:18081/schemas/ids/1>
-> 스키마 아이디는 유니크하다.

# fast subject 내 버전으로 스키마 조회(지역)
curl -v -XGET <http://schemaregistry1:18081/subjects/fast/versions/1>

# fast subject 내 최신버전으로 스키마 조회
curl -v -XGET <http://schemaregistry1:18081/subjects/fast/versions/latest>

# 호환성 체크
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/fast/versions/latest>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"string\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/campus/versions/latest>
curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\\"type\\": \\"int\\"}"}' <http://schemaregistry1:18081/compatibility/subjects/campus/versions/latest>

# subject 삭제
curl -v -XDELETE <http://schemaregistry1:18081/subjects/fast>
curl -v -XDELETE <http://schemaregistry1:18081/subjects/campus>
-> 삭제시 해당 id가 출력 된다.

##################################
# 2. REST Proxy
##################################

# Avro examples
# Topic 생성
curl -v -XPOST -H'Content-Type: application/vnd.kafka.avro.v2+json' -H'Accept: application/vnd.kafka.v2+json' --data '{"value_schema": "{\\"type\\": \\"record\\", \\"name\\": \\"account\\", \\"fields\\": [{\\"name\\": \\"balance\\", \\"type\\": \\"int\\"}]}", "records": [{"value": {"balance": 10000}}]}' <http://restproxy1:18082/topics/bank>
-> content-type을 avro를 추가, accept에 추가
-> key, value schema 지정 가능(위 코드에선 value만 지정)

# Consumer 생성
curl -v -XPOST -H'Content-Type: application/vnd.kafka.v2+json' --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' <http://restproxy1:18082/consumers/my_consumer_group>
-> 호출 시 base url 출력이 된다.

# Topic 구독
curl -v -XPOST -H'Content-Type: application/vnd.kafka.v2+json' --data '{"topics":["bank"]}' <http://restproxy1:18082/consumers/my_consumer_group/instances/my_consumer_instance/subscription>

# 메세지 처음부터 소비
curl -v -XGET -H'Accept: application/vnd.kafka.avro.v2+json' <http://restproxy1:18082/consumers/my_consumer_group/instances/my_consumer_instance/records>

Avro 스키마 (avsc)

// User.avsc

{

"namespace": "example.avro",

"type": "record",

"name": "User",

"fields": [ {"name": "name", "type": "string"},

{"name": "age", "type": ["int", "null"]},

{"name": "favorite_color", "type": ["string", "null"]}

]

}

Last updated