02. Single Message Transform (SMT), Converter

Connect Worker

The process that deploys and runs Connectors

State before a Connector is deployed

Connector deployment

Specifying the Connector class

Part of the configuration used to deploy a Connector

"config": {
[...]
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://postgresql_ip:5432/",
"topics": "sample.orders"
}
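As a rough sketch of how such a configuration might be submitted, the snippet below builds a `POST /connectors` request for the Connect Worker's REST API. The worker address (`localhost:8083`, the usual default port) and the connector name `orders-jdbc-sink` are assumptions for illustration, and only the three settings shown above are included; the elided settings are left out.

```python
import json
import urllib.request

# Assumption: a Connect Worker's REST API is reachable at this address.
CONNECT_URL = "http://localhost:8083"


def build_create_request(name, config, base_url=CONNECT_URL):
    """Build (but do not send) a POST /connectors request."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Only the settings shown in the fragment above; a real deployment needs more.
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgresql_ip:5432/",
    "topics": "sample.orders",
}

# "orders-jdbc-sink" is a hypothetical connector name.
request = build_create_request("orders-jdbc-sink", config)

# To actually deploy against a running worker:
# urllib.request.urlopen(request)
```

Building the request separately from sending it keeps the payload easy to inspect before anything touches a live worker.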

A Connector instance and its Tasks are created on the Connect Worker

  • The Connector Task fetches data from the source system and converts it into Connect Records

  • If no Connector exists for the target system, a custom Connector can be developed on top of the Connect framework

Converter configuration

A Converter is configured separately for the key and for the value

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
  • The configured Converter converts each Connect Record into a byte array, which is then delivered to Kafka

  • Custom Converters can also be developed when needed
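To make the Converter's role concrete, here is a minimal sketch of the record-to-bytes step. It uses JSON rather than Avro so that it runs without a Schema Registry. The class `JsonSketchConverter` is hypothetical; its two methods only loosely mirror the `fromConnectData` / `toConnectData` pair of the Java `Converter` interface, and schemas are omitted.

```python
import json


class JsonSketchConverter:
    """Hypothetical stand-in for a Converter; uses JSON instead of Avro."""

    def from_connect_data(self, topic, value):
        # Connect Record value -> byte array written to Kafka
        if value is None:
            return None
        return json.dumps(value, sort_keys=True).encode("utf-8")

    def to_connect_data(self, topic, data):
        # Byte array read from Kafka -> Connect Record value
        if data is None:
            return None
        return json.loads(data.decode("utf-8"))


# Key and value each get their own converter instance,
# matching the separate key.converter / value.converter settings.
key_converter = JsonSketchConverter()
value_converter = JsonSketchConverter()

record_key = {"order_id": 42}
record_value = {"order_id": 42, "delivery_addr": "1 Main St"}

key_bytes = key_converter.from_connect_data("sample.orders", record_key)
value_bytes = value_converter.from_connect_data("sample.orders", record_value)
```

Reading `value_bytes` back through `to_connect_data` returns the original dictionary, the direction a sink connector uses.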

Single Message Transform(SMT)

Used when data needs to be transformed between the Task and the Converter

| Transform | Description |
| --- | --- |
| Cast | Casts a field, or the entire key or value, to a specific type (e.g., forcing an integer field to a smaller width) |
| Drop | Drops the key or the value from a record and sets it to null |
| InsertField | Inserts a field using attributes from the record metadata or a configured static value |
| MaskField | Masks the specified fields with a valid null value for the field type |
| ReplaceField | Filters or renames fields |

  • About 20 pre-defined SMTs are provided

SMT configuration

"config": {
[...]
"transforms": "addTimeToTopic,labelBar",
"transforms.addTimeToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.addTimeToTopic.topic.format": "${topic}-${timestamp}",
"transforms.addTimeToTopic.timestamp.format": "yyyyMMdd",
"transforms.labelBar.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.labelBar.renames": "delivery_addr:shipping_addr"
}
  • Multiple SMTs can be chained together

Per-message data transformation

  • Each Connect Record is transformed by the configured SMTs in the order in which they are listed

  • Custom SMTs can also be developed when needed

  • Transforms run on every record, so CPU usage should be taken into account when they are used heavily.
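The chaining behavior can be illustrated with a small simulation. This is not Kafka Connect code: records are plain dictionaries, and `timestamp_router` and `replace_field_value` are hypothetical functions that only imitate the two transforms configured above, assuming UTC and a year-month-day pattern written in Python's `strftime` syntax.

```python
from datetime import datetime, timezone


def timestamp_router(record, topic_format="${topic}-${timestamp}", fmt="%Y%m%d"):
    """Imitates TimestampRouter: rewrites the topic using the record timestamp."""
    ts = datetime.fromtimestamp(record["timestamp_ms"] / 1000, tz=timezone.utc)
    topic = (
        topic_format
        .replace("${topic}", record["topic"])
        .replace("${timestamp}", ts.strftime(fmt))
    )
    return {**record, "topic": topic}


def replace_field_value(record, renames):
    """Imitates ReplaceField$Value with a renames mapping."""
    value = {renames.get(k, k): v for k, v in record["value"].items()}
    return {**record, "value": value}


def apply_chain(record, transforms):
    """Apply each transform in order; stop if one drops the record (None)."""
    for transform in transforms:
        record = transform(record)
        if record is None:
            break
    return record


record = {
    "topic": "sample.orders",
    "timestamp_ms": 1700000000000,
    "value": {"order_id": 42, "delivery_addr": "1 Main St"},
}

# Same order as "transforms": "addTimeToTopic,labelBar"
chain = [
    timestamp_router,
    lambda r: replace_field_value(r, {"delivery_addr": "shipping_addr"}),
]

result = apply_chain(record, chain)
```

Because every record passes through every function in the chain, the per-record cost adds up with each transform, which is where the CPU concern comes from.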

Sink Connector Data Flow

  • The data flow of a Sink Connector is the reverse of a Source Connector's
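The reversed ordering can be sketched as two small pipelines. Everything here is a simplified assumption: JSON stands in for the configured Converter, `add_origin` is a hypothetical SMT, and the return value of `sink_flow` represents what would be handed to the sink task.

```python
import json


def serialize(value):
    # Converter, source direction: Connect Record -> bytes
    return json.dumps(value).encode("utf-8")


def deserialize(data):
    # Converter, sink direction: bytes -> Connect Record
    return json.loads(data.decode("utf-8"))


def add_origin(value):
    # Hypothetical SMT that inserts a static field
    return {**value, "origin": "orders_db"}


def source_flow(polled_value, transforms):
    """Source side: Task output -> SMTs -> Converter -> bytes for Kafka."""
    for transform in transforms:
        polled_value = transform(polled_value)
    return serialize(polled_value)


def sink_flow(kafka_bytes, transforms):
    """Sink side: bytes from Kafka -> Converter -> SMTs -> value for the Task."""
    value = deserialize(kafka_bytes)
    for transform in transforms:
        value = transform(value)
    return value


kafka_bytes = source_flow({"order_id": 42}, [add_origin])
delivered = sink_flow(kafka_bytes, [])
```

On the source side the Converter runs last, while on the sink side it runs first, so SMTs always operate on Connect Records rather than raw bytes.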
