3.5 카프카 스트림즈
카프카 스트림즈
토픽에 적재된데이터를 실시간으로 변화하여 다른 토픽에 적재하는 라이브러리
JVM 위에서 하나의 프로세스로 실행된다.
→ 분산 시스템, 스케줄링 프로그램이 불필요하다.
내부적으로 스레드를 1개 이상 생성할 수 있다.
스레드를 증가시킴으로 처리량을 늘릴 수 있다.
스레드는 1개 이상의 태스크를 가진다.
테스크: 데이터 처리 최소 단위 ex) 3개의 파티션으로 이루어진 토픽 처리의 경우 3개의 태스크가 생긴다.
카프카 스트림 데이터 처리 애플리케이션
아파치 스파크(Apache Spark)
아파치 플링크(Apache Flink)
아파치 스톰(Apache Storm)
플루언트디(Flunetd)
카프카 스트림즈를 사용하는 이유
카프카에서 공식적으로 지원하는 라이브러리이다.
장애의 발생에도 정확히 한번(exactly once) 할 수 있는 장애 허용 시스템이 있어 데이터 처리 안정성이 매우 뛰어나다.
API를 사용하여 기능 확장에 용이하다.
주의
운영환경에서는 장애 발생에도 안정적으로 운영하기 위해 2개 이상의 서버로 구성
구조
카프카 스트림즈는 트리 형태와 유사한 토폴로지이다.
토폴로지를 이루는 노드를 하나의 프로세서(processor)라 부른다.
소스 프로세서
데이터를 가져오는 역할
스트림 프로세서
데이터를 처리하는 역할
싱크 프로세서
특정 카프카 토픽으로 저장하는 역할
노드와 노드를 이은 선은 스트림(Stream)이라 부른다. → 레코드와 동일
데이터 처리 예시
스트림즈 DSL, 프로세서 API 2가지 방법으로 개발 가능
스트림즈 DSL
메시지 값을 기반으로 토픽 분기 처리
지난 10분간 들어온 데이터의 개수 집계
토픽과 다른 토픽의 결합으로 새로운 데이터 생성
프로세서 API
메시지 값의 종류에 따라 토픽을 가변적으로 전송
일정한 시간 간격으로 데이터 처리
스트림즈DSL
레코드의 흐름을 추상화한 3가지(KStream, KTable, GlobalKTable)이 있다.
KSream
레코드의 흐름을 표현한 것으로 메시지 key,value로 구성
KStream으로 데이터 조회 시에 토픽에 존재하는 모든 레코드가 출력
KTable
메시지 key를 기준으로 묶어서 출력 → 데이터 적재할 때 동일한 메시지 키가 있을 경우 데이터가 업데이트
KTable로 데이터 조회 시에 메시지 key를 기준으로 가장 최신에 추가된 데이터 출력
GlobalKTable
메시지 키를 기준으로 묶어서 사용
모든 파티션 데이터가 각 테스크에 할당되어 사용된다. → KTable로 선언된 토픽은 1개 파티션이 1개 테스크에 할당되어 사용된다.
KTable, GlobalKTable 과 KStream의 데이터 조인(join) 비교
co-partitioning 조인을 하는 2개 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 동일하게 맞추는 작업 repatitioning 새로운 토픽에 새로운 메시지 키를 가지도록 재배열하는 과정
KTable, KStream Join
Kstream과 KTable을 조인하기 위해선 반드시 코파티셔닝(co-partitioning) 되어 있어야한다. 동일한 파티션 개수와 파티셔닝 전략의 경우 동일한 메시지 key를 가진 데이터가 동일한 테스크에 들어가는 것을 보장. 하지만 문제는 조인을 수행하려는 토픽들이 코파티셔닝 되어 있다는 것을 보장 할 수 없다. 코파티셔닝이 되지 않은 2개의 토픽을 조인하는 로직으로 실행하면 TopologyEception 발생.
코파티셔닝이 되어 있지 않다면 리파티셔닝 하는 과정을 거쳐야 한다.
GlobalKTable, KStream Join
모든 테스크에 동일하게 공유되어 사용되기 때문에 코파티션닝과 관계없이 조인 가능
주의: 모든 테스크에 데이터를 저장하고 사용하기 때문에 로컬 스토리지 사용량, 네트워크, 브로커에 부하가 생기게 된다.
많은 양의 데이터인 경우 리파티셔닝을 통해 KTable 사용을 권장
주요 옵션
필수 옵션
bootstrap.severs
전송할 대상카프카 클러스터에 속한 브로커 호스트 이름:포트 (1개 이상)
application.id
고유한 아이디로 다른 로직을 가진 스트림즈 애플리케이션들은 다른 값을 가져야한다.
선택옵션
default.key.serde
레코드 메시지 키를 직렬화, 역직렬화하는 클래스 지정
default: Serdes.ByteArray().getClass().getName()
default.value.serde
레코드 메시지 메시지 값을 직렬화, 역직렬화하는 클래스 지정
default: Serdes.ByteArray().getClass().getName()
num.stream.threads
스레드 개수 지정
default: 1
state.dir
상태기반 데이터 처리를 할 때 데이터를 저장할 디렉토리 지정
default: /tmp/kafka-streams
프로세서 API
투박한 코드를 가지지만 토폴로지를 기준으로 데이터를 처리한다는 관점에서는 스트림즈 DSL과 동일한 역할을 한다.
상세 로직의 구현이 필요한 경우 활용 할 수 있다.
주의: 스트림즈DSL에서 사요한 KStream, KTable, GlobalKTable의 개념이 없다.
Last updated