02.SingleMessageTransform(SMT),Converter
Connect Worker
Connector๋ฅผ ๋ฐฐํฌ/๊ตฌ๋ ์ํค๋ ํ๋ก์ธ์ค
Connector ๋ฐฐํฌ ์ ์ ๋ชจ์ต

Connector ๋ฐฐํฌ
Connecotr Class ์ง์
Connector ๋ฐฐํฌ๋ฅผ ์ํ Configuration์ ์ผ๋ถ
"config": {
[...]
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://postgresql_ip:5432/",
"topics": โsample.orders",
}
Connect Worker์ Connector Instance ๋ฐ Task ์์ฑ

Connector Task๊ฐ Source ์์คํ ์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์์ Connect Record๋ก ๋ณํ
๋์ ์์คํ ์ ์ฐ๊ฒฐํ๊ธฐ ์ํ Connector๊ฐ ์๋ค๋ฉด, Connect Framework๋ฅผ ๊ธฐ๋ฐ์ผ๋ก Custom Connector ๊ฐ๋ฐ ๊ฐ๋ฅ
Converter ์ค์
Key์ Value์ ๊ฐ๊ฐ Converter ์ค์
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

์ค์ ํ Converter๊ฐ Connect Record๋ฅผ Byte Array๋ก ๋ณํํ ํ, Kafka๋ก ์ ๋ฌ
Converter๋ ํ์์ custom๊ฐ๋ฐ ๊ฐ๋ฅ
Single Message Transform(SMT)
Task์ Converter ์ฌ์ด์์ ๋ฐ์ดํฐ ๋ณํ์ด ํ์ํ ๊ฒฝ์ฐ ์ฌ์ฉ
Cast
ํ๋ ๋๋ ์ ์ฒด Key ๋๋ Value๋ฅผ ํน์ ์ ํ์ผ๋ก ์บ์คํธ(ํ์ ๋ณํ) (์: ์ ์ ํ๋๋ฅผ ๋ ์์ ๋๋น๋ก ๊ฐ์ ์ ์ฉ)
Drop
๋ ์ฝ๋์์ Key ๋๋ Value๋ฅผ ์ญ์ ํ๊ณ null๋ก ์ค์
InsertField
๋ ์ฝ๋ ๋ฉํ๋ฐ์ดํฐ ๋๋ ๊ตฌ์ฑ๋ Static Value์ ์์ฑ์ ์ฌ์ฉํ์ฌ ํ๋๋ฅผ ์ฝ์
MaskField
ํ๋ ์ ํ์ ๋ํด ์ ํจํ null ๊ฐ์ผ๋ก ์ง์ ๋ ํ๋๋ฅผ ๋ง์คํน
ReplaceField
ํ๋๋ฅผ ํํฐ๋งํ๊ฑฐ๋ ์ด๋ฆ์ ๋ณ๊ฒฝ
์ฝ 20๊ฐ ์ ๋์ Pre-defined SMT ์ ๊ณต
SMT ์ค์
"config": {
[...]
"transforms": "addTimeToTopic,labelBar",
"transforms.addTimeToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.addTimeToTopic.topic.format": "${topic}-${timestamp}",
"transforms.addTimeToTopic.timestamp.format": "YYYYMMDD",
"transforms.labelBar.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.labelBar.renames": "delivery_addr:shipping_addr",
}
์ฌ๋ฌ ๊ฐ์ SMT์ ์ฐ๊ฒฐ(Chaining)ํ์ฌ ์ฌ์ฉ ๊ฐ๋ฅ
๋จ๊ฑด ๋ฉ์์ง๋ณ ๋ฐ์ดํฐ ๋ณํ ๊ธฐ๋ฅ

Connect Record๋ฅผ ์ค์ ๋ SMT์ ์์์ ๋ฐ๋ผ ๋ณํ
SMT๋ ํ์์ custom ๊ฐ๋ฐ ๊ฐ๋ฅ
Transform์ ์์ฃผํ๊ฒ ๋๋ฉด CPU ์ฌ์ฉ๋ฅ ์ ๊ณ ๋ คํด์ผํ๋ค.
Sink Connector์ Data Flow

Sink Connector์ Data Flow๋ Source Connector์ ์ญ๋ฐฉํฅ
Last updated