curl -X GET http://localhost:8083/connectors/local-file-source/status
Flow
커넥트에 커넥터 생성 명령
커넥트 내부에 커넥터와 테스크를 생성
커넥터는 테스크들을 관리
테스크는 커넥터에 종속되는 개념으로 실질적인 데이터 처리를 한다. → 데이터 처리의 status를 확인하기 위해 테스크 상태를 확인해야 한다.
커넥트 실행 방법
단일 모드 커넥트(standalone mode kafka connect)
1개 프로세스만 실행
단일 장애점(SPOF)에 취약하다.
로컬 파일에 오프셋 정보를 저장
커넥트 설정파일과 커넥터 설정파일을 함께 정의하여 실행해야 한다.
분산 모드 커넥트(distributed mode kafka connect)
2대 이상의 서버에서 클러스터 형태로 운영
무중단 스케일 아웃이 가능하여 데이터 처리량의 변화에도 유연하다.
그룹으로 묶여서 운영
카프카 내부 토픽에 오프셋 정보를 저장
복제 개수를 3보다 큰값을 권장
REST API를 통해 커넥터 플로그인 종류, 테스크 상태, 커넥터 상태 등을 조회할 수 있다.
8083 포트, HTTP 메서드 기반 API 제공
REST API
Method
Path
description
GET
/
실행 중인 커넥트 정보 확인
GET
/connectors
실행 중인 커넥터 이름 확인
POST
/connectors
새로운 커넥터 생성 요청
GET
/connectors{커넥터 이름}
실행 중인 커넥터 정보 확인
GET
/connectors{커넥터 이름}/config
실행 중인 커넥터의 설정값 확인
PUT
/connectors{커넥터 이름}/config
실행 중인 커넥터의 설정값 변경 요청
GET
/connectors{커넥터 이름}/status
실행 중인 커넥터 상태 확인
POST
/connectors{커넥터 이름}/restart
실행 중인 커넥터 재시작 요청
PUT
/connectors{커넥터 이름}/pause
커넥터 일시 중지 요청
PUT
/connectors{커넥터 이름}/resume
일시 중지된 커넥터 실행 요청
DELETE
/connectors{커넥터 이름}/
실행 중인 커넥터 종료
GET
/connectors{커넥터 이름}/tasks
실행 중인 커넥터의 테스크 정보 확인
GET
/connectors{커넥터 이름}/tasks/{테스크 아이디}/status
실행 중인 커넥터의 테스크 상태 확인
POST
/connectors{커넥터 이름}/tasks/{테스크 아이디}/restart
실행 중인 커넥터의 테스크 재시작 요청
GET
/connectors{커넥터 이름}/topics
커넥터별 연동된 토픽 정보 확인
GET
connector-plugins/
커넥트에 존재하는 커넥터 플러그인 확인
PUT
connector-plugins/{커넥터 플러그인 이름}/config/validate
커넥터 생성 시 설정값 유효 여부 확인
소스 커넥터
소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할
오픈 소스 커넥터를 사용해도 되지만 요구사항과 맞지 않는 경우 직접 개발해야 하는데,
카프카 커넥트 라이브러리에서 제공하는 SourceConnector와 SourceTask를 사용하여 구현하며 된다.
직접 구현한 소스 커넥터를 빌드하여 jar파일로 만들고 커넥트를 실행 시 플러그인으로 추가하여 사용한다.
사용 라이브러리: connect-api 라이브러리
SourceConnector
커넥터 설정파일 초기화하고 어떤 클래스를 사용할 것인지 정의하는 클래스
6개의 메서드를 오버라이드하여 각 메서드의 내부로직을 구현
public classTestSourceConnectorextendsSourceConnector {// 커넥트에서 호출할 때 사용 직관적인 클래스명으로 사용 @Overridepublic String version() {// 커넥터의 버전 리턴 } @Overridepublicvoidstart(Map<String, String> props) {// 설정값을 초기화하는 메서드// 올바른 값이 아닌 경우 ConnectException()을 호출 } @Overridepublicclass<?,extendsTask> taskClass() {// 이 커넥터가 사용할 테스크 클래스를 지정 } @Overridepublic List<Map<String, String>>taskConfigs(int maxTasks) {// 테스크 개수가 2개 이상인 경우 테스크마다 각기 다른 옵션을 설정할 때 사용 } @Overridepublic ConfigDef config() {// 커넥터가 사용할 설정값에 대한 정보를 받는다. } @Overridepublicvoidstop() {// 커넥터가 종료할 때 필요한 로직 작성 }}
SourceTask
데이터를 다루는 클래스
토픽에서 사용하는 오프셋이 아닌 자체적으로 사용하는 오프셋을 사용 → 중복해서 토픽으로 보내는 것을 방지
public classTestSourceTaskextendsSourceTask { @Overridepublic String version() {// 테스크 버전 지정 } @Overridepublicvoidstart(Map<String, String> props) {// 데이터 처리에 필요한 모든 리소스를 여기서 초기화 } @Overridepublic List<SourceRecord>poll() {//데이터를 읽어오는 로직 작성 } @Overridepublicvoidstop() {// 테스크가 종료될 때 필요한 로직 작성 }}
Sample Code
Config
packagecom.example;import org.apache.kafka.common.config.AbstractConfig;import org.apache.kafka.common.config.ConfigDef;import org.apache.kafka.common.config.ConfigDef.Importance;import org.apache.kafka.common.config.configDef.Type;import java.util.Map;public class SingleFileSourceConnectorConfig extends AbstractConfig {// 읽을 파일 지정 public static final String DIR_FILE_NAME = "file"; public static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt"; public static final String DIR_FILE_NAME_DOC = "읽을 파일 경로와 이름";// 어느 토픽으로 보낼것인지 지정 public static final String TOPIC_NAME = "topic"; private static final String TOPIC_DEFAULT_VALUE = "test"; private static final String TOPIC_DOC = "보낼 토픽 이름";// importance enum은 HIGH, MEDIUM, LOW 3가지 종류가 있다. public static ConfigDef CONFIG = new ConfigDef().define(DIR_FILE_NAME, Type.STRING, DIR_FILE_NAME_DEFAULT_VALUE, Importance.HIGH, DIR_FILE_NAME_DOC) .define(TOPIC_NAME, TYPE.STRING, TOPIC_DEFAULT_VALUE, Importance.HIGH, TOPIC_DOC); public SingleFileSourceConnectorConfig(Map<String, String> props) { super(CONFIG, props); }}
Source Connector
packagecom.example;import org.apache.kafka.common.config.configDef;import org.apache.kafka.common.config.ConfigException;import org.apache.kafka.connect.connector.Task;import org.apache.kafka.connect.errors.ConnectException;import org.apache.kafka.connect.source.SourceConnector;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.Hashmap;import java.util.List;import java.util.Map;// 커넥트에서 사용할 커넥터 이름이 된다.public class SingleFileSourceConnector extends SourceConnector { private Map<String, String> configProperties; @Override public String version() { return "1.0"; }// 생성할 때 받은 설정값들을 초기화, 필수 설정값이 없는 경우 exception 발생 @Override public void start(Map<String, String> props) { this.configProperties = props; try { new SingleFileSourceConnectorConfig(props); } catch (ConfigException e) { throw new ConnectException(e.getMessage(), e); } }// 테스크 클래스 이름을 정한다. @Override public Class<?, extends Task> taskClass() { return SingleFileSourceTask.class; }// 테스크가 2개 이상인 경우 테스크마다 다른 설정값으 줄 때 사용한다.// 이곳에서는 2개 이상이더라도 동일한 설정값을 받도록 설정. @Override public List<Map<String, String>> taskConfigs(int maxTasks) { List<Map<String, String>> taskConfigs = new ArrayList<>(); Map<String,String> taskProps = new HashMap<>(); taskProps.putAll(configProperties); for (int i = 0; i< maxTasks; i++) { taskConfigs.add(taskProps); } return taskConfigs; }// 커넥터에서 사용할 설정값을 지정// 이곳에서는 SingleFileSourceConnectorConfig의 정의된 CONFIG 인스턴스를 리턴. @Override public ConfigDef config() { return SingleFileSourceConnectorConfig.CONFIG; }// 커넥터가 종료될 때 필요한 로직 추가 @Override public void stop() { }}
SourceTask
packagecom.example;import org.apahce.kafka.connect.data.Schema;import org.apahce.kafka.connect.errors.ConnectException;import org.apahce.kafka.connect.source.SourceRecord;import org.apahce.kafka.connect.source.SourceTask;import org.apahce.kafka.connect.storage.OffsetStorageReader;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.*;import java.nio.charset.StandardCharsets;import java.nio.file.Files;import java.nio.file.NoSuchFileException;import java.nio.file.Paths;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.Map;import java.util.stream.Collectors;public class SingleFileSourceTask extends SourceTask { private Logger logger = LoggerFactory.getLogger(SingleFileSourceTask.class);// 2개의 키를 기준으로 오프셋 스토리지에 읽은 위치를 저장한다. public final String FILENAME_FIELD = "filename"; public final String POSITION_FIELD = "position";//오프셋 스토리지에 데이터를 저장하고 읽을 때는 Map 자료구조에 담은 데이터를 사용.// fileName이 키, 커넥터가 읽는 파일 이름이 값으로 저장 private Map<String, String> fileNamePartition; private Map<String, String> offset; private String topic; private String file;// 읽은 파일의 위치를 커넥터 멤버 변수로 지정하여 사용 private long position = -1; @Override public String version() { return "1.0"; } @Override public void start(Map<String, String> props) { try {// Init Variables// 커넥터 실행시 받은 설정값을 선언하여 사용 SingleFileSourceConnectorConfig config = new SingleFileSourceConnectorConfig(props); topic = config.getString(SingleFileSourceConnectorConfig.TOPIC_NAME); file = config.getString(SingleFileSourceConnectorConfig.DIR_FILE_NAME); fileNamePartition = Collections.singletonMap(FILENAME_FIELD, file);// 현재 읽고자 하는 파일 정보 offset = context.offsetStorageReader().offset(fileNamePartition);// Get file offset from offsetStorageReader if (offset != null) {// 만약 null인 경우 읽고자하는 데이터가 없다는 뜻이다.// null이 아닌 경우 한 번이라도 커넥터를 통해 해당 파일을 처리했다는 뜻이다. Object lastReadFileOffset = offset.get(POSITION_FIELD); if (lastReadFileOffset != null) {// 마지막 오프셋을 포지션에 할당한다. position = (Long) lastReadFileOffset } } else {// null인 경우 파일을 처리한 적ㅇ이 없다는 뜻이므로 0을 할당한다. position = 0; } } catch (Exception e) { throw new ConnectException(e.getMessage(), e); } }// 테스크가 시작한 이후 지속적으로 데이터를 가져오기 위해 반복 호출되는 메서드이다. @Override public List<SourceRecord> poll() { List<SourceRecord> results = new ArrayList<>(); try { Thread.sleep(1000); List<String> lines = getLines(position); if(lines.size() > 0) { lines.forEach(line -> {// 오프셋 위치를 저장 Map<String, Long> sourceOffset = Collections.singletonMap(POSITION_FILED, ++position); SourceRecord sourceRecord = new SourceRecord(fileNamePartition, sourceOffset, topic, Schema.STRING.SCHEMA, line);// 읽어들인 데이터를 담는다. results.add(sourceRecord); }); }// 토픽으로 데이터를 보내는 부분 return results; } catch (Exception e) { logger.error(e.getMessage(), e); throw new ConnectException(e.getMessage(), e); } }// 마지막 읽었던 지점 이후로 파일의 마지막 지점까지 읽어서 리턴 private List<String> getLines(long readLine) throws Exception { BufferedReader reader = Files.newBufferedReader(Paths.get(file)); return reader.lines().skip(readLine).collect(Collectors.toList()); } @Override public void stop() { }}
싱크 커넥터
싱크 커넥터는 토픽의 데이터를 타킷 애플리케이션 또는 파일로 저장하는 역할.
카프카 커넥트 라이브러리에서 제공하는 SinkConnector와 SinkTask 클래스를 사용하여 직접 싱크 커넥터를 구현할 수 있다.
사용 라이브러리: connect-api 라이브러리
SinkConnector
사용자로부터 입력받은 설정값을 초기화
어떤 테스크 클래스를 사용할 것인지 정의
public classTestSinkConnectorextendsSinkConnector {// 커넥트에서 호출할 때, 사용 직관적인 클래스명으로 사용 @Overridepublic String version() {// 커넥터 버전 리턴 } @Overridepublicvoidstart(Map<String, String> props) {// 설정값을 초기화하는 메서드// 올바른 값이 아닌 경우 ConnectException()을 호출 } @Overridepublic Class<? extends Task>taskClass() {// 커넥터가 사용할 테스크 클래스 지정 } @Override public List<Map<String, String>>taskConfigs(int maxTasks) {// 테스크가 2개 이상인 경우 각기 다른 옵션을 설정할 때 사용 } @Override public ConfigDef config() {// 커넥터가 사용할 설정값 정보 } @Override public voidstop() {// 커넥터가 종료될 때, 필요한 로직 작성 }}
SinkTask
컨슈머 역할
데이터를 저장하는 코드를 가지게 된다.
public classTestSinkTaskextendsSinkTask { @Overridepublic String version() {// 테스크 버전 지정, 일반적으로 커넥터의 버전과 동일하게 맞춘다. } @Overridepublicvoidstart(Map<String, String> props) {// 테스크가 시작할 때 필요한 로직 작성// 데이터 처리에 필요한 리소스 초기화(ex. DB connection,...) } @Overridepublicvoidput(Collection<SinkRecord> records) {// 데이터를 토픽에서 주기적으로 가져오는 메서드// SinkRecord는 토픽의 한 개 레코드이며, 토픽, 파티션, 타임스탬프 등의 정보를 가진다. } @Overridepublicvoidflush(Map<TopicPartition, OffsetAndMetadata> offsets) {// put() 메서드를 통해 가져온 데이터를 일정 주기로 싱크 애플리케이션 또는 싱크 파일에 저장할때 사용하는 로직// ex. DB의 경우 commit 수행(실제 저장) } @Overridepublicvoidstop() {// 테스크가 종료될 때 필요한 로직 작성. }}
Sample Code
Config
packagecom.example;import org.apache.kafka.common.config.AbstractCofig;import org.apache.kafka.common.config.ConfigDef;import org.apache.kafka.common.config.ConfigDef.Importance;import org.apache.kafka.common.config.ConfigDef.Type;import java.util.Map;public class SingleFileSinkConnectorConfig extends AbstractConfig {// 토픽의 데이터를 저장할 파일 설정 public static final String DIR_FILE_NAME = "file"; private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt"; private static final String DIR_FILE_NAME_DOC = "저장할 디렉토리와 파일 이름"; public static ConfigDef CONFIG = new ConfigDef().define(DIR_FILE_NAME, TYPE.STRING, DIR_FILE_NAME_DEFAULT_VALUE, Importance.HIGH, DIR_FILE_NAME_DOC); public SingleFileSinkConnectorConfig(Map<String, String> props) { super(CONFIG, props); }}
Sink Connector
package.com.example;import org.apache.kafka.common.config.configDef;import org.apache.kafka.common.config.ConfigException;import org.apache.kafka.connect.connector.Task;import org.apache.kafka.connect.errors.ConnectException;import org.apache.kafka.connect.source.SourceConnector;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.Hashmap;import java.util.List;import java.util.Map;public class SingleFileSinkConnector extends SinkConnector { private Map<String, String> configProperties; @Override public String version() { return "1.0"; }// 커넥터 생성할 때 받은 설정값 초기화, 필수 설정값이 없는 경우 exception 발생 @Override public void start(Map<String, String> props) { this.configProperties = props; try { new SingleFileSinkConnectorConfig(props); } catch (ConfigException e) { throw new ConnectException(e.getMessage(), e); } }// 사용할 테스크 클래스 이름 지정 @Override public Class<? extends Task> taskClass() { return SingleFileSinkTask.class; }// 테스크가 2개 이상인 경우 각각 설정값을 줄 때 사용// 이곳에서는 동일한 설정값을 받도록 구현 @Override public List<Map<String, String>> taskConfigs(int maxTasks) { List<Map<String, String>> taskConfigs = new ArrayList<>(); Map<String, String> taskProps = new HashMap<>(); taskProps.putAll(configProperties); for (int i = 0; i < maxTasks; i++) { taskConfigs.add(taskProps); } return taskConfigs; }// 커넥터에서 사용할 설정값 지정// 이곳에서는 SingleFileSinkConnectorConfig의 맴버 변수 사용 @Override public ConfigDef config() { return SingleFileSinkConnectorConfig.CONFIG; } @Override public void stop() { }}
Sink Task
packagecom.example;import org.apahce.kafka.clients.consumer.OffsetAndMetadata;import org.apahce.kafka.connect.TopicPartition;import org.apahce.kafka.connect.errors.ConnectException;import org.apahce.kafka.connect.sink.SinkRecord;import org.apahce.kafka.connect.sink.SinkTask;import java.io.File;import java.io.FileWriter;import java.util.Collections;import java.util.Map;import java.io.IOException;public class SinlgeFileSinkTask extends SinkTask { private SingleFileSinkConnectorConfig config; private File file; private FileWriter fileWriter; @Override public String version() { return "1.0"; }// 리소스 초기화 @Override public void start(Map<String, String> props) { try { config = new SingleFileSinkConnectorConfig(props); file = new File(config.getString(config.DIR_FILE_NAME); fileWriter = new FileWriter(file, true); } catch (Exception e) { throw new ConnectException(e.getMessage(), e); } }// 토픽의 데이터를 가져오는 메서드 @Override public void put(Collections<SinkRecord> records) { try { for (SinkRecord record : records) { fileWriter.write(record.value().toString() + "\\n"); } } catch (IOException e) { throw new ConnectException(e.getMessage(), e); } }// 실질적으로 파일 시스템에 데이터 저장 @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets> { try { fileWriter.flush(); } catch (IOException e) { throw new ConnectException(e.getMessage(), e); } }// 열었던 파일 닫기 @Override public void stop() { try { fileWriter.close(); } catch (IOException e) { throw new ConnectException(e.getMessage(),e); } }}