3.6 카프카 커넥트

카프카 커넥트(Kafka Connect)

  • 오픈소스에 포함된 툴 중 하나로 데이터 파이프 라인 생성시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 어플리케이션

  • 특정한 작업 형태를 템플릿으로 만들어 놓은 커넥터를 실행 → 반복 작업의 감소

  • 카프카 2.6에 포함된 커넥트를 실행할 경우 클러스터 간 토픽 미러링을 지원하는 미러메이커2 커넥터와 파일 싱크 커넥터, 파일 소스 커넥터를 기본 플로그인으로 제공

  • 추가적인 커넥터를 사용하고 싶은 경우 jar 파일을 추가 하여 사용 가능

bin/connect-distributed.sh -daemon config/connect-distributed.properties

커넥터(Connector)

  • 소스 커넥터(source connector)

    • 프로듀서 역할

  • 싱크 커넥터(sync connector)

    • 컨슈머 역할

  • 커넥터는 각 커넥터가 가진 고유한 설정값을 입력받아서 데이터를 처리한다.

  • 파일 외에도 일정한 프로토콜을 가진 소스 애플리케이션이나 싱크 애플리케이션이 있는 경우 커넥터를 통해 카프카로 데이터를 송수신 할 수 있다.

  • 저장소(MySql, S3, MongoDB,...)가 대표적인 싱크,소스 애플리케이션으로 볼 수 있다.

  • 카프카 2.5.0 제공 커넥터

    • FileStreamSinkConnector, FileStreamSourceConnector, MirrorcheckpointConnector, MirrorHeartbeatConnector, MirrorSourceConnector

  • 파이프라인 생성시 옵션

    • 컨버터

      • 데이터 처리 전 스키마 변경을 도와준다.

      • 커스텀 컨버터 사용 가능

      • 기본 제공 : JsonConverter, StringConverter, ByteArrayConverter

    • 트랜스폼

      • 메시지 단위로 데이터를 변환하기 위한 용도

      • 기본 제공: Cast,Drop, ExtractField

커넥터 실행

 curl -X POST -H "Content-Type: application/json" \
 --data '{
    "name": "local-file-source", 
    "config":
    {
       "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
       "file": "/tmp/test.txt",
       "tasks.max": "1",
       "topic": "connect-test"
   }
}' \
http://localhost:8083/connectors

커넥터 확인

curl -X GET http://localhost:8083/connectors/local-file-source/status

Flow

  1. 커넥트에 커넥터 생성 명령

  2. 커넥트 내부에 커넥터와 테스크를 생성

  3. 커넥터는 테스크들을 관리

  4. 테스크는 커넥터에 종속되는 개념으로 실질적인 데이터 처리를 한다. → 데이터 처리의 status를 확인하기 위해 테스크 상태를 확인해야 한다.

커넥트 실행 방법

  1. 단일 모드 커넥트(standalone mode kafka connect)

    • 1개 프로세스만 실행

    • 단일 장애점(SPOF)에 취약하다.

    • 로컬 파일에 오프셋 정보를 저장

    • 커넥트 설정파일과 커넥터 설정파일을 함께 정의하여 실행해야 한다.

  2. 분산 모드 커넥트(distributed mode kafka connect)

    • 2대 이상의 서버에서 클러스터 형태로 운영

    • 무중단 스케일 아웃이 가능하여 데이터 처리량의 변화에도 유연하다.

    • 그룹으로 묶여서 운영

    • 카프카 내부 토픽에 오프셋 정보를 저장

      • 복제 개수를 3보다 큰값을 권장

  • REST API를 통해 커넥터 플로그인 종류, 테스크 상태, 커넥터 상태 등을 조회할 수 있다.

    • 8083 포트, HTTP 메서드 기반 API 제공

REST API

MethodPathdescription

GET

/

실행 중인 커넥트 정보 확인

GET

/connectors

실행 중인 커넥터 이름 확인

POST

/connectors

새로운 커넥터 생성 요청

GET

/connectors{커넥터 이름}

실행 중인 커넥터 정보 확인

GET

/connectors{커넥터 이름}/config

실행 중인 커넥터의 설정값 확인

PUT

/connectors{커넥터 이름}/config

실행 중인 커넥터의 설정값 변경 요청

GET

/connectors{커넥터 이름}/status

실행 중인 커넥터 상태 확인

POST

/connectors{커넥터 이름}/restart

실행 중인 커넥터 재시작 요청

PUT

/connectors{커넥터 이름}/pause

커넥터 일시 중지 요청

PUT

/connectors{커넥터 이름}/resume

일시 중지된 커넥터 실행 요청

DELETE

/connectors{커넥터 이름}/

실행 중인 커넥터 종료

GET

/connectors{커넥터 이름}/tasks

실행 중인 커넥터의 테스크 정보 확인

GET

/connectors{커넥터 이름}/tasks/{테스크 아이디}/status

실행 중인 커넥터의 테스크 상태 확인

POST

/connectors{커넥터 이름}/tasks/{테스크 아이디}/restart

실행 중인 커넥터의 테스크 재시작 요청

GET

/connectors{커넥터 이름}/topics

커넥터별 연동된 토픽 정보 확인

GET

connector-plugins/

커넥트에 존재하는 커넥터 플러그인 확인

PUT

connector-plugins/{커넥터 플러그인 이름}/config/validate

커넥터 생성 시 설정값 유효 여부 확인

소스 커넥터

소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할

오픈 소스 커넥터를 사용해도 되지만 요구사항과 맞지 않는 경우 직접 개발해야 하는데,

카프카 커넥트 라이브러리에서 제공하는 SourceConnector와 SourceTask를 사용하여 구현하며 된다.

직접 구현한 소스 커넥터를 빌드하여 jar파일로 만들고 커넥트를 실행 시 플러그인으로 추가하여 사용한다.

사용 라이브러리: connect-api 라이브러리

  1. SourceConnector

    • 커넥터 설정파일 초기화하고 어떤 클래스를 사용할 것인지 정의하는 클래스

    • 6개의 메서드를 오버라이드하여 각 메서드의 내부로직을 구현

public class TestSourceConnector extends SourceConnector {
// 커넥트에서 호출할 때 사용 직관적인 클래스명으로 사용

	@Override
	public String version() {
		// 커넥터의 버전 리턴
	}

	@Override
	public void start(Map<String, String> props) {
		// 설정값을 초기화하는 메서드
		// 올바른 값이 아닌 경우 ConnectException()을 호출
	}

	@Override
	public class<?, extends Task> taskClass() {
		// 이 커넥터가 사용할 테스크 클래스를 지정
	}

	@Override
	public List<Map<String, String>> taskConfigs(int maxTasks) {
		// 테스크 개수가 2개 이상인 경우 테스크마다 각기 다른 옵션을 설정할 때 사용
	}

	@Override
	public ConfigDef config() {
		// 커넥터가 사용할 설정값에 대한 정보를 받는다.
	}

	@Override
	public void stop() {
		// 커넥터가 종료할 때 필요한 로직 작성
	}
}
  1. SourceTask

    • 데이터를 다루는 클래스

    • 토픽에서 사용하는 오프셋이 아닌 자체적으로 사용하는 오프셋을 사용 → 중복해서 토픽으로 보내는 것을 방지

public class TestSourceTask extends SourceTask {

	@Override
	public String version() {
		// 테스크 버전 지정
	}

	@Override
	public void start(Map<String, String> props) {
		// 데이터 처리에 필요한 모든 리소스를 여기서 초기화
	}

	@Override
	public List<SourceRecord> poll() {
		//데이터를 읽어오는 로직 작성
	}

	@Override
	public void stop() {
		// 테스크가 종료될 때 필요한 로직 작성
	}

}

Sample Code

Config

package com.example;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.configDef.Type;
import java.util.Map;

public class SingleFileSourceConnectorConfig extends AbstractConfig {
	
	// 읽을 파일 지정
	public static final String DIR_FILE_NAME = "file";
	public static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt";
	public static final String DIR_FILE_NAME_DOC = "읽을 파일 경로와 이름";

	// 어느 토픽으로 보낼것인지 지정
	public static final String TOPIC_NAME = "topic";
	private static final String TOPIC_DEFAULT_VALUE = "test";
	private static final String TOPIC_DOC = "보낼 토픽 이름";
	
	// importance enum은 HIGH, MEDIUM, LOW 3가지 종류가 있다.
	public static ConfigDef CONFIG = new ConfigDef().define(DIR_FILE_NAME,
																													Type.STRING,
																													DIR_FILE_NAME_DEFAULT_VALUE,
																													Importance.HIGH,
																													DIR_FILE_NAME_DOC)
																													.define(TOPIC_NAME,
																																	TYPE.STRING,
																																	TOPIC_DEFAULT_VALUE,
																																	Importance.HIGH,
																																	TOPIC_DOC);
	public SingleFileSourceConnectorConfig(Map<String, String> props) {
		super(CONFIG, props);	
	}
}

Source Connector

package com.example;

import org.apache.kafka.common.config.configDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Hashmap;
import java.util.List;
import java.util.Map;

// 커넥트에서 사용할 커넥터 이름이 된다.
public class SingleFileSourceConnector extends SourceConnector {

	private Map<String, String> configProperties;

	@Override
	public String version() {
		return "1.0";
	}

	// 생성할 때 받은 설정값들을 초기화, 필수 설정값이 없는 경우 exception 발생
	@Override
	public void start(Map<String, String> props) {
		this.configProperties = props;
		try {
			new SingleFileSourceConnectorConfig(props);
		} catch (ConfigException e) {
			throw new ConnectException(e.getMessage(), e);		
		}
	}

	// 테스크 클래스 이름을 정한다.
	@Override
	public Class<?, extends Task> taskClass() {
		return SingleFileSourceTask.class;
	}

	// 테스크가 2개 이상인 경우 테스크마다 다른 설정값으 줄 때 사용한다.
	// 이곳에서는 2개 이상이더라도 동일한 설정값을 받도록 설정.
	@Override
	public List<Map<String, String>> taskConfigs(int maxTasks) {
		List<Map<String, String>> taskConfigs = new ArrayList<>();
		Map<String,String> taskProps = new HashMap<>();
		taskProps.putAll(configProperties);
		for (int i = 0; i< maxTasks; i++) {
			taskConfigs.add(taskProps);
		}
		return taskConfigs;
	}

	// 커넥터에서 사용할 설정값을 지정
	// 이곳에서는 SingleFileSourceConnectorConfig의 정의된 CONFIG 인스턴스를 리턴.
	@Override
	public ConfigDef config() {
		return SingleFileSourceConnectorConfig.CONFIG;
	}
	
	// 커넥터가 종료될 때 필요한 로직 추가
	@Override
	public void stop() {
	}
}

SourceTask

package com.example;

import org.apahce.kafka.connect.data.Schema;
import org.apahce.kafka.connect.errors.ConnectException;
import org.apahce.kafka.connect.source.SourceRecord;
import org.apahce.kafka.connect.source.SourceTask;
import org.apahce.kafka.connect.storage.OffsetStorageReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SingleFileSourceTask extends SourceTask {
	private Logger logger = LoggerFactory.getLogger(SingleFileSourceTask.class);
	
	// 2개의 키를 기준으로 오프셋 스토리지에 읽은 위치를 저장한다.
	public final String FILENAME_FIELD = "filename";
	public final String POSITION_FIELD = "position";
	
	//오프셋 스토리지에 데이터를 저장하고 읽을 때는 Map 자료구조에 담은 데이터를 사용.
	// fileName이 키, 커넥터가 읽는 파일 이름이 값으로 저장
	private Map<String, String> fileNamePartition;
	private Map<String, String> offset;
	private String topic;
	private String file;
	// 읽은 파일의 위치를 커넥터 멤버 변수로 지정하여 사용
	private long position = -1;

	@Override
	public String version() {
		return "1.0";
	}

	@Override
	public void start(Map<String, String> props) {
		try {
			// Init Variables
			// 커넥터 실행시 받은 설정값을 선언하여 사용 
			SingleFileSourceConnectorConfig config = new SingleFileSourceConnectorConfig(props);
			topic = config.getString(SingleFileSourceConnectorConfig.TOPIC_NAME);
			file = config.getString(SingleFileSourceConnectorConfig.DIR_FILE_NAME);
			fileNamePartition = Collections.singletonMap(FILENAME_FIELD, file);
			// 현재 읽고자 하는 파일 정보
			offset = context.offsetStorageReader().offset(fileNamePartition);

			// Get file offset from offsetStorageReader
			if (offset != null) {
				// 만약 null인 경우 읽고자하는 데이터가 없다는 뜻이다.
				// null이 아닌 경우 한 번이라도 커넥터를 통해 해당 파일을 처리했다는 뜻이다.
				Object lastReadFileOffset = offset.get(POSITION_FIELD);
				if (lastReadFileOffset != null) {
					// 마지막 오프셋을 포지션에 할당한다.
					position = (Long) lastReadFileOffset
				} 
			} else {
				// null인 경우 파일을 처리한 적ㅇ이 없다는 뜻이므로 0을 할당한다.
				position = 0;
			}

		} catch (Exception e) {
			throw new ConnectException(e.getMessage(), e);
		}
	}

	// 테스크가 시작한 이후 지속적으로 데이터를 가져오기 위해 반복 호출되는 메서드이다.
	@Override
	public List<SourceRecord> poll() {
		List<SourceRecord> results = new ArrayList<>();
		try {
			Thread.sleep(1000);
			
			List<String> lines = getLines(position);
			
			if(lines.size() > 0) {
				lines.forEach(line -> {
					// 오프셋 위치를 저장
					Map<String, Long> sourceOffset = Collections.singletonMap(POSITION_FILED, ++position);
					SourceRecord sourceRecord = new SourceRecord(fileNamePartition, sourceOffset, topic, Schema.STRING.SCHEMA, line);
					// 읽어들인 데이터를 담는다.
					results.add(sourceRecord);
				});
			}
			// 토픽으로 데이터를 보내는 부분
			return results;
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
			throw new ConnectException(e.getMessage(), e);
		}
	}

	//  마지막 읽었던 지점 이후로 파일의 마지막 지점까지 읽어서 리턴
	private List<String> getLines(long readLine) throws Exception {
		BufferedReader reader = Files.newBufferedReader(Paths.get(file));
		return reader.lines().skip(readLine).collect(Collectors.toList());	
	}

	@Override
	public void stop() {
	}
}

싱크 커넥터

싱크 커넥터는 토픽의 데이터를 타킷 애플리케이션 또는 파일로 저장하는 역할.

카프카 커넥트 라이브러리에서 제공하는 SinkConnector와 SinkTask 클래스를 사용하여 직접 싱크 커넥터를 구현할 수 있다.

사용 라이브러리: connect-api 라이브러리

  1. SinkConnector

    • 사용자로부터 입력받은 설정값을 초기화

    • 어떤 테스크 클래스를 사용할 것인지 정의

public class TestSinkConnector extends SinkConnector {
// 커넥트에서 호출할 때, 사용 직관적인 클래스명으로 사용

	@Override
	public String version() {
		// 커넥터 버전 리턴
	}
	
	@Override
	public void start(Map<String, String> props) {
		// 설정값을 초기화하는 메서드
		// 올바른 값이 아닌 경우 ConnectException()을 호출
	}
	
	@Override
	public Class<? extends Task> taskClass() {
		// 커넥터가 사용할 테스크 클래스 지정
	}
	
	@Override
	public List<Map<String, String>> taskConfigs(int maxTasks) {
		// 테스크가 2개 이상인 경우 각기 다른 옵션을 설정할 때 사용
	}
	
	@Override
	public ConfigDef config() {
		// 커넥터가 사용할 설정값 정보
	}
	
	@Override
	public void stop() {
		// 커넥터가 종료될 때, 필요한 로직 작성
	}

}
  1. SinkTask

    • 컨슈머 역할

    • 데이터를 저장하는 코드를 가지게 된다.

public class TestSinkTask extends SinkTask {

	@Override
	public String version() {
		// 테스크 버전 지정, 일반적으로 커넥터의 버전과 동일하게 맞춘다.
	}
	
	@Override
	public void start(Map<String, String> props) {
		// 테스크가 시작할 때 필요한 로직 작성
		// 데이터 처리에 필요한 리소스 초기화(ex. DB connection,...)
	}
	
	@Override
	public void put(Collection<SinkRecord> records) {
		// 데이터를 토픽에서 주기적으로 가져오는 메서드
		// SinkRecord는 토픽의 한 개 레코드이며, 토픽, 파티션, 타임스탬프 등의 정보를 가진다.
	}
	
	@Override
	public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
		// put() 메서드를 통해 가져온 데이터를 일정 주기로 싱크 애플리케이션 또는 싱크 파일에 저장할때 사용하는 로직
		// ex. DB의 경우 commit 수행(실제 저장)
	}
	
	@Override
	public void stop() {
		// 테스크가 종료될 때 필요한 로직 작성.
	}

}

Sample Code

Config

package com.example;

import org.apache.kafka.common.config.AbstractCofig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import java.util.Map;

public class SingleFileSinkConnectorConfig extends AbstractConfig {
	
	// 토픽의 데이터를 저장할 파일 설정
	public static final String DIR_FILE_NAME = "file";
	private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/tmp/kafka.txt";
	private static final String DIR_FILE_NAME_DOC = "저장할 디렉토리와 파일 이름";

	public static ConfigDef CONFIG = new ConfigDef().define(DIR_FILE_NAME,
																													TYPE.STRING,
																													DIR_FILE_NAME_DEFAULT_VALUE,
																													Importance.HIGH,
																													DIR_FILE_NAME_DOC);

	public SingleFileSinkConnectorConfig(Map<String, String> props) {
		super(CONFIG, props);
	}
}

Sink Connector

package.com.example;

import org.apache.kafka.common.config.configDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Hashmap;
import java.util.List;
import java.util.Map;

public class SingleFileSinkConnector extends SinkConnector {
	private Map<String, String> configProperties;

	@Override
	public String version() {
		return "1.0";
	}
	
	// 커넥터 생성할 때 받은 설정값 초기화, 필수 설정값이 없는 경우 exception 발생
	@Override
	public void start(Map<String, String> props) {
		this.configProperties = props;
		try {
			new SingleFileSinkConnectorConfig(props);
		} catch (ConfigException e) {
			throw new ConnectException(e.getMessage(), e);
		}
	}

	// 사용할 테스크 클래스 이름 지정
	@Override
	public Class<? extends Task> taskClass() {
		return SingleFileSinkTask.class;
	}

	// 테스크가 2개 이상인 경우 각각 설정값을 줄 때 사용
	// 이곳에서는 동일한 설정값을 받도록 구현
	@Override
	public List<Map<String, String>> taskConfigs(int maxTasks) {
		List<Map<String, String>> taskConfigs = new ArrayList<>();
		Map<String, String> taskProps = new HashMap<>();
		taskProps.putAll(configProperties);
		for (int i = 0; i < maxTasks; i++) {
			taskConfigs.add(taskProps);
		}
		return taskConfigs;
	}

	// 커넥터에서 사용할 설정값 지정
	// 이곳에서는 SingleFileSinkConnectorConfig의 맴버 변수 사용
	@Override
	public ConfigDef config() {
		return SingleFileSinkConnectorConfig.CONFIG;
	}

	@Override
	public void stop() {
	}

}

Sink Task

package com.example;

import org.apahce.kafka.clients.consumer.OffsetAndMetadata;
import org.apahce.kafka.connect.TopicPartition;
import org.apahce.kafka.connect.errors.ConnectException;
import org.apahce.kafka.connect.sink.SinkRecord;
import org.apahce.kafka.connect.sink.SinkTask;

import java.io.File;
import java.io.FileWriter;
import java.util.Collections;
import java.util.Map;
import java.io.IOException;

public class SinlgeFileSinkTask extends SinkTask {
	private SingleFileSinkConnectorConfig config;
	private File file;
	private FileWriter fileWriter;

	@Override
	public String version() {
		return "1.0";
	}

	// 리소스 초기화
	@Override
	public void start(Map<String, String> props) {
		try {
			config = new SingleFileSinkConnectorConfig(props);
			file = new File(config.getString(config.DIR_FILE_NAME);
			fileWriter = new FileWriter(file, true);
		} catch (Exception e) {
			throw new ConnectException(e.getMessage(), e);
		}
	}

	// 토픽의 데이터를 가져오는 메서드
	@Override
	public void put(Collections<SinkRecord> records) {
		try {
			for (SinkRecord record : records) {
				fileWriter.write(record.value().toString() + "\\n");
			}
		} catch (IOException e) {
			throw new ConnectException(e.getMessage(), e);
		}
	}

	// 실질적으로 파일 시스템에 데이터 저장
	@Override
	public void flush(Map<TopicPartition, OffsetAndMetadata> offsets> {
		try {
			fileWriter.flush();		
		} catch (IOException e) {
			throw new ConnectException(e.getMessage(), e);
		}
	}

	// 열었던 파일 닫기
	@Override
	public void stop() {
		try {
			fileWriter.close();
		} catch (IOException e) {
			throw new ConnectException(e.getMessage(),e);
		}
	}
	
}

Last updated