3.5 카프카 스트림즈

카프카 스트림즈

  • 토픽에 적재된데이터를 실시간으로 변화하여 다른 토픽에 적재하는 라이브러리

  • JVM 위에서 하나의 프로세스로 실행된다.

    → 분산 시스템, 스케줄링 프로그램이 불필요하다.

  • 내부적으로 스레드를 1개 이상 생성할 수 있다.

    • 스레드를 증가시킴으로 처리량을 늘릴 수 있다.

  • 스레드는 1개 이상의 태스크를 가진다.

    • 테스크: 데이터 처리 최소 단위 ex) 3개의 파티션으로 이루어진 토픽 처리의 경우 3개의 태스크가 생긴다.

  • 카프카 스트림 데이터 처리 애플리케이션

    • 아파치 스파크(Apache Spark)

    • 아파치 플링크(Apache Flink)

    • 아파치 스톰(Apache Storm)

    • 플루언트디(Flunetd)

카프카 스트림즈를 사용하는 이유

  • 카프카에서 공식적으로 지원하는 라이브러리이다.

  • 장애의 발생에도 정확히 한번(exactly once) 할 수 있는 장애 허용 시스템이 있어 데이터 처리 안정성이 매우 뛰어나다.

  • API를 사용하여 기능 확장에 용이하다.

주의

  • 운영환경에서는 장애 발생에도 안정적으로 운영하기 위해 2개 이상의 서버로 구성

구조

  • 카프카 스트림즈는 트리 형태와 유사한 토폴로지이다.

  • 토폴로지를 이루는 노드를 하나의 프로세서(processor)라 부른다.

    • 소스 프로세서

      • 데이터를 가져오는 역할

    • 스트림 프로세서

      • 데이터를 처리하는 역할

    • 싱크 프로세서

      • 특정 카프카 토픽으로 저장하는 역할

  • 노드와 노드를 이은 선은 스트림(Stream)이라 부른다. → 레코드와 동일

데이터 처리 예시

스트림즈 DSL, 프로세서 API 2가지 방법으로 개발 가능

  • 스트림즈 DSL

    • 메시지 값을 기반으로 토픽 분기 처리

    • 지난 10분간 들어온 데이터의 개수 집계

    • 토픽과 다른 토픽의 결합으로 새로운 데이터 생성

  • 프로세서 API

    • 메시지 값의 종류에 따라 토픽을 가변적으로 전송

    • 일정한 시간 간격으로 데이터 처리

스트림즈DSL

레코드의 흐름을 추상화한 3가지(KStream, KTable, GlobalKTable)이 있다.

  • KSream

    • 레코드의 흐름을 표현한 것으로 메시지 key,value로 구성

    • KStream으로 데이터 조회 시에 토픽에 존재하는 모든 레코드가 출력

  • KTable

    • 메시지 key를 기준으로 묶어서 출력 → 데이터 적재할 때 동일한 메시지 키가 있을 경우 데이터가 업데이트

    • KTable로 데이터 조회 시에 메시지 key를 기준으로 가장 최신에 추가된 데이터 출력

  • GlobalKTable

    • 메시지 키를 기준으로 묶어서 사용

    • 모든 파티션 데이터가 각 테스크에 할당되어 사용된다. → KTable로 선언된 토픽은 1개 파티션이 1개 테스크에 할당되어 사용된다.

KTable, GlobalKTable 과 KStream의 데이터 조인(join) 비교

co-partitioning 조인을 하는 2개 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 동일하게 맞추는 작업 repatitioning 새로운 토픽에 새로운 메시지 키를 가지도록 재배열하는 과정

KTable, KStream Join

  • Kstream과 KTable을 조인하기 위해선 반드시 코파티셔닝(co-partitioning) 되어 있어야한다. 동일한 파티션 개수와 파티셔닝 전략의 경우 동일한 메시지 key를 가진 데이터가 동일한 테스크에 들어가는 것을 보장. 하지만 문제는 조인을 수행하려는 토픽들이 코파티셔닝 되어 있다는 것을 보장 할 수 없다. 코파티셔닝이 되지 않은 2개의 토픽을 조인하는 로직으로 실행하면 TopologyEception 발생.

  • 코파티셔닝이 되어 있지 않다면 리파티셔닝 하는 과정을 거쳐야 한다.

GlobalKTable, KStream Join

  • 모든 테스크에 동일하게 공유되어 사용되기 때문에 코파티션닝과 관계없이 조인 가능

  • 주의: 모든 테스크에 데이터를 저장하고 사용하기 때문에 로컬 스토리지 사용량, 네트워크, 브로커에 부하가 생기게 된다.

  • 많은 양의 데이터인 경우 리파티셔닝을 통해 KTable 사용을 권장

주요 옵션

  • 필수 옵션

    • bootstrap.severs

      • 전송할 대상카프카 클러스터에 속한 브로커 호스트 이름:포트 (1개 이상)

    • application.id

      • 고유한 아이디로 다른 로직을 가진 스트림즈 애플리케이션들은 다른 값을 가져야한다.

  • 선택옵션

    • default.key.serde

      • 레코드 메시지 키를 직렬화, 역직렬화하는 클래스 지정

      • default: Serdes.ByteArray().getClass().getName()

    • default.value.serde

      • 레코드 메시지 메시지 값을 직렬화, 역직렬화하는 클래스 지정

      • default: Serdes.ByteArray().getClass().getName()

    • num.stream.threads

      • 스레드 개수 지정

      • default: 1

    • state.dir

      • 상태기반 데이터 처리를 할 때 데이터를 저장할 디렉토리 지정

      • default: /tmp/kafka-streams

프로세서 API

  • 투박한 코드를 가지지만 토폴로지를 기준으로 데이터를 처리한다는 관점에서는 스트림즈 DSL과 동일한 역할을 한다.

  • 상세 로직의 구현이 필요한 경우 활용 할 수 있다.

  • 주의: 스트림즈DSL에서 사요한 KStream, KTable, GlobalKTable의 개념이 없다.

Last updated