3.6 카프카 커넥트
카프카 커넥트(Kafka Connect)
오픈소스에 포함된 툴 중 하나로 데이터 파이프 라인 생성시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 어플리케이션
특정한 작업 형태를 템플릿으로 만들어 놓은 커넥터를 실행 → 반복 작업의 감소
카프카 2.6에 포함된 커넥트를 실행할 경우 클러스터 간 토픽 미러링을 지원하는 미러메이커2 커넥터와 파일 싱크 커넥터, 파일 소스 커넥터를 기본 플로그인으로 제공
추가적인 커넥터를 사용하고 싶은 경우 jar 파일을 추가 하여 사용 가능
bin/connect-distributed.sh -daemon config/connect-distributed.properties커넥터(Connector)
소스 커넥터(source connector)
프로듀서 역할
싱크 커넥터(sync connector)
컨슈머 역할
커넥터는 각 커넥터가 가진 고유한 설정값을 입력받아서 데이터를 처리한다.
파일 외에도 일정한 프로토콜을 가진 소스 애플리케이션이나 싱크 애플리케이션이 있는 경우 커넥터를 통해 카프카로 데이터를 송수신 할 수 있다.
저장소(MySql, S3, MongoDB,...)가 대표적인 싱크,소스 애플리케이션으로 볼 수 있다.
카프카 2.5.0 제공 커넥터
FileStreamSinkConnector, FileStreamSourceConnector, MirrorcheckpointConnector, MirrorHeartbeatConnector, MirrorSourceConnector
파이프라인 생성시 옵션
컨버터
데이터 처리 전 스키마 변경을 도와준다.
커스텀 컨버터 사용 가능
기본 제공 : JsonConverter, StringConverter, ByteArrayConverter
트랜스폼
메시지 단위로 데이터를 변환하기 위한 용도
기본 제공: Cast,Drop, ExtractField
커넥터 실행
커넥터 확인
Flow
커넥트에 커넥터 생성 명령
커넥트 내부에 커넥터와 테스크를 생성
커넥터는 테스크들을 관리
테스크는 커넥터에 종속되는 개념으로 실질적인 데이터 처리를 한다. → 데이터 처리의 status를 확인하기 위해 테스크 상태를 확인해야 한다.
커넥트 실행 방법
단일 모드 커넥트(standalone mode kafka connect)
1개 프로세스만 실행
단일 장애점(SPOF)에 취약하다.
로컬 파일에 오프셋 정보를 저장
커넥트 설정파일과 커넥터 설정파일을 함께 정의하여 실행해야 한다.
분산 모드 커넥트(distributed mode kafka connect)
2대 이상의 서버에서 클러스터 형태로 운영
무중단 스케일 아웃이 가능하여 데이터 처리량의 변화에도 유연하다.
그룹으로 묶여서 운영
카프카 내부 토픽에 오프셋 정보를 저장
복제 개수를 3보다 큰값을 권장
REST API를 통해 커넥터 플로그인 종류, 테스크 상태, 커넥터 상태 등을 조회할 수 있다.
8083 포트, HTTP 메서드 기반 API 제공
REST API
GET
/
실행 중인 커넥트 정보 확인
GET
/connectors
실행 중인 커넥터 이름 확인
POST
/connectors
새로운 커넥터 생성 요청
GET
/connectors{커넥터 이름}
실행 중인 커넥터 정보 확인
GET
/connectors{커넥터 이름}/config
실행 중인 커넥터의 설정값 확인
PUT
/connectors{커넥터 이름}/config
실행 중인 커넥터의 설정값 변경 요청
GET
/connectors{커넥터 이름}/status
실행 중인 커넥터 상태 확인
POST
/connectors{커넥터 이름}/restart
실행 중인 커넥터 재시작 요청
PUT
/connectors{커넥터 이름}/pause
커넥터 일시 중지 요청
PUT
/connectors{커넥터 이름}/resume
일시 중지된 커넥터 실행 요청
DELETE
/connectors{커넥터 이름}/
실행 중인 커넥터 종료
GET
/connectors{커넥터 이름}/tasks
실행 중인 커넥터의 테스크 정보 확인
GET
/connectors{커넥터 이름}/tasks/{테스크 아이디}/status
실행 중인 커넥터의 테스크 상태 확인
POST
/connectors{커넥터 이름}/tasks/{테스크 아이디}/restart
실행 중인 커넥터의 테스크 재시작 요청
GET
/connectors{커넥터 이름}/topics
커넥터별 연동된 토픽 정보 확인
GET
connector-plugins/
커넥트에 존재하는 커넥터 플러그인 확인
PUT
connector-plugins/{커넥터 플러그인 이름}/config/validate
커넥터 생성 시 설정값 유효 여부 확인
소스 커넥터
소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할
오픈 소스 커넥터를 사용해도 되지만 요구사항과 맞지 않는 경우 직접 개발해야 하는데,
카프카 커넥트 라이브러리에서 제공하는 SourceConnector와 SourceTask를 사용하여 구현하며 된다.
직접 구현한 소스 커넥터를 빌드하여 jar파일로 만들고 커넥트를 실행 시 플러그인으로 추가하여 사용한다.
사용 라이브러리: connect-api 라이브러리
SourceConnector
커넥터 설정파일 초기화하고 어떤 클래스를 사용할 것인지 정의하는 클래스
6개의 메서드를 오버라이드하여 각 메서드의 내부로직을 구현
SourceTask
데이터를 다루는 클래스
토픽에서 사용하는 오프셋이 아닌 자체적으로 사용하는 오프셋을 사용 → 중복해서 토픽으로 보내는 것을 방지
Sample Code
Config
Source Connector
SourceTask
싱크 커넥터
싱크 커넥터는 토픽의 데이터를 타킷 애플리케이션 또는 파일로 저장하는 역할.
카프카 커넥트 라이브러리에서 제공하는 SinkConnector와 SinkTask 클래스를 사용하여 직접 싱크 커넥터를 구현할 수 있다.
사용 라이브러리: connect-api 라이브러리
SinkConnector
사용자로부터 입력받은 설정값을 초기화
어떤 테스크 클래스를 사용할 것인지 정의
SinkTask
컨슈머 역할
데이터를 저장하는 코드를 가지게 된다.
Sample Code
Config
Sink Connector
Sink Task
Last updated