09.ExactlyOnceSemantics(EOS)2
Transaction
Transaction์ ๊ตฌํํ๊ธฐ ์ํด, ๋ช ๊ฐ์ง ์๋ก์ด ๊ฐ๋
๋ค์ด ๋์
Transaction Coordinator Consumer Group Coordinator์ ๋น์ทํ๊ฒ, ๊ฐ Producer์๊ฒ๋ Transaction Coordinator๊ฐ ํ ๋น๋๋ฉฐ, PID ํ ๋น ๋ฐ Transaction ๊ด๋ฆฌ์ ๋ชจ๋ ๋ก์ง์ ์ํ
Transaction Log ์๋ก์ด Internal Kafka Topic์ผ๋ก์จ, Consumer Offset Topic๊ณผ ์ ์ฌํ๊ฒ, ๋ชจ๋ Transaction์ ์๊ตฌ์ ์ด๊ณ ๋ณต์ ๋ Record๋ฅผ ์ ์ฅํ๋ Transaction Coordinator์ ์ํ ์ ์ฅ์
TransactionalId Producer๋ฅผ ๊ณ ์ ํ๊ฒ ์๋ณํ๊ธฐ ์ํด ์ฌ์ฉ๋๋ฉฐ, ๋์ผํ TransactionalId๋ฅผ ๊ฐ์ง Producer์ ๋ค๋ฅธ ์ธ์คํด์ค๋ค์ ์ด์ ์ธ์คํด์ค์ ์ํด ๋ง๋ค์ด์ง ๋ชจ๋ Transaction์ ์ฌ๊ฐ(๋๋ ์ค๋จ)ํ ์ ์์
Broker Configs
transactional.id.timeout.ms
Transaction Coordinator๊ฐ Producer TransactionalId๋ก๋ถํฐ Transaction ์ํ ์ ๋ฐ์ดํธ๋ฅผ ์์ ํ์ง ์๊ณ ์ฌ์ ์ ๋ง๋ฃ๋๊ธฐ ์ ์ ๋๊ธฐํ๋ ์ต๋ ์๊ฐ(ms)
604800000 (7 days)
max.transaction.timeout.ms
Transaction์ ํ์ฉ๋๋ ์ต๋ timeout ์๊ฐ
Client๊ฐ ์์ฒญํ Transaction ์๊ฐ์ด ์ด ์๊ฐ์ ์ด๊ณผํ๋ฉด Broker๋ InitPidRequest์์ InvalidTransactionTimeout ์ค๋ฅ๋ฅผ ๋ฐํ
Producer๊ฐ Transaction์ ํฌํจ๋ Topic์์ ์ฝ๋ Consumer๋ฅผ ์ง์ฐ์ํฌ ์ ์๋ ๋๋ฌด ํฐ ์๊ฐ ์ด๊ณผ๋ฅผ ๋ฐฉ์ง
900000 (15 min)
transaction.state.log.replication.factor
Transaction State Topic์ Replication Factor
3
transaction.state.log.num.partitions
Transaction State Topic์ Partition ๊ฐ์
50
transaction.state.log.min.isr
Transaction State Topic์ min ISR ๊ฐ์
2
transaction.state.log.segment.bytes
Transaction State Topic์ Segment ํฌ๊ธฐ
104857600 bytes
Producer Configs
enable.idempotence
๋นํ์ฑํ๋ ๊ฒฝ์ฐ Transaction ๊ธฐ๋ฅ์ ์ฌ์ฉํ ์ ์์
ํ์ฑํ(true)ํ๊ณ acks=all, retries > 1, max.inflight.requests.per.connection=1 ์ ๊ฐ์ด ์ฌ์ฉํด์ผ ํจ
false
transaction.timeout.ms
Transaction Coordinator๊ฐ ์งํ ์ค์ธ Transaction์ ์ฌ์ ์ ์ค๋จํ๊ธฐ ์ ์ Producer์ Transaction ์ํ ์ ๋ฐ์ดํธ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ์ต๋ ์๊ฐ(ms)
์ด ๊ตฌ์ฑ ๊ฐ์ InitPidRequest์ ํจ๊ป Transaction Coordinator์๊ฒ ์ ์ก
์ด ๊ฐ์ด Broker์ max.transaction.timeout.ms ์ค์ ๋ณด๋ค ํฌ๋ฉด 'InvalidTransactionTimeout' ์ค๋ฅ์ ํจ๊ป ์์ฒญ์ด ์คํจ
60000 (60 sec)
transactional.id
Transaction ์ ๋ฌ์ ์ฌ์ฉํ TransactionalId
์ด๋ฅผ ํตํด ํด๋ผ์ด์ธํธ๋ ์๋ก์ด Transaction์ ์์ํ๊ธฐ ์ ์ ๋์ผํ TransactionalId๋ฅผ ์ฌ์ฉํ๋ Transaction์ด ์๋ฃ๋์์์ ๋ณด์ฅํ ์ ์์ผ๋ฏ๋ก ์ฌ๋ฌ Producer session์ ๊ฑธ์ณ ์๋ ์์ ์ฑ ์๋ฏธ ์ฒด๊ณ๋ฅผ ์ฌ์ฉํ ์ ์์
TransactionalId๊ฐ ๋น์ด์์ผ๋ฉด(default), Producer๋ Idempotent Delivery ๋ง์ผ๋ก ์ ํ
TransactionalId๊ฐ ๊ตฌ์ฑ๋ ๊ฒฝ์ฐ, ๋ฐ๋์ enable.idempotence๋ฅผ ํ์ฑํํด์ผ ํจ
์์
Consumer Configs
isolation.level
read_uncommitted: Offset ์์๋ก Commit๋ ๋ฉ์์ง์ Commit๋์ง ์์ ๋ฉ์์ง๋ฅผ ๋ชจ๋ ์ฌ์ฉ
read_committed: Non-Transaction ๋ฉ์์ง ๋๋ Commit๋ Transaction ๋ฉ์์ง๋ง Offset ์์๋ก ์ฌ์ฉ
read_uncommitted
enable.auto.commit
false : Consumer Offset์ ๋ํ Auto Commit ์ Off
true
Consumer๊ฐ ์ค๋ณตํด์ ๋ฐ์ดํฐ ์ฒ๋ฆฌํ๋ ๊ฒ์ ๋ํด ๋ณด์ฅํ์ง ์์ผ๋ฏ๋ก, Consumer์ ์ค๋ณต์ฒ๋ฆฌ๋ ๋ฐ๋ก ๋ก์ง์ ์์ฑํด์ผ ํจ(Idempotent Consumer)
์๋ฅผ ๋ค์ด, ๋ฉ์์ง๋ฅผ ์ฑ๊ณต์ ์ผ๋ก ์ฌ์ฉํ ํ Kafka Consumer๋ฅผ ์ด์ Offset์ผ๋ก ๋๊ฐ์ผ๋ฉด ํด๋น Offset์์ ์ต์ Offset๊น์ง ๋ชจ๋ ๋ฉ์์ง๋ฅผ ๋ค์ ์์ ํ๊ฒ ๋จ
Transaction Data Flow
KIP-98 : Exactly Once Delivery and Transactional Messaginghttps://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
initTransactions
poll ๋ก Source Topic์์ record๋ฅผ ๊ฐ์ ธ์ด
Transaction์ ์์
record๋ก ๋น์ฆ๋์ค๋ก์ง ์ํ ํ, ๊ฒฐ๊ณผ record๋ฅผ Target Topic์ผ๋ก send
sendOffsetsToTransaction์ ํธ์ถํ์ฌ consume(poll)ํ Source Topic์ consumer offset์ commit
commitTransaction ๋๋ abortTransaction ์ผ๋ก Transaction Commit ๋๋ Rollback์ํ
Transaction ์ฒ๋ฆฌ ํ๋ก์ธ์ค
Transactions Coordinator ์ฐพ๊ธฐ Producer๊ฐ initTransactions()๋ฅผ ํธ์ถํ์ฌ Broker์๊ฒ FindCoordinatorRequest๋ฅผ ๋ณด๋ด์ Transaction Coordinator์ ์์น๋ฅผ ์ฐพ์ Transaction Coordinator๋ PID๋ฅผ ํ ๋น
Producer ID ์ป๊ธฐ
Producer๊ฐ Transaction Coordinator์๊ฒ InitPidRequest๋ฅผ ๋ณด๋ด์(TransactionalId๋ฅผ ์ ๋ฌ) Producer์ PID๋ฅผ ๊ฐ์ ธ์ด
PID์ Epoch๋ฅผ ๋์ฌ Producer์ ์ด์ Zombie ์ธ์คํด์ค๊ฐ ์ฐจ๋จ๋๊ณ Transaction์ ์งํํ ์ ์๋๋ก ํจ
ํด๋น PID์ ๋ํ ๋งคํ์ด 2a๋จ๊ณ์์ Transaction Log์ ๊ธฐ๋ก
Transaction ์์ Producer๊ฐ beginTransactions()๋ฅผ ํธ์ถํ์ฌ ์ Transaction์ ์์์ ์๋ฆผ Producer๋ Transaction์ด ์์๋์์์ ๋ํ๋ด๋ ๋ก์ปฌ ์ํ๋ฅผ ๊ธฐ๋ก ์ฒซ ๋ฒ์งธ Record๊ฐ ์ ์ก๋ ๋๊น์ง Transaction Coordinator์ ๊ด์ ์์๋ Transaction์ด ์์๋์ง ์์
AddPartitionsToTxnRequest Producer๋ Transaction์ ์ผ๋ถ๋ก ์ TopicPartition์ด ์ฒ์ ๊ธฐ๋ก๋ ๋ ์ด ์์ฒญ์ Transaction Coordinator์๊ฒ ๋ณด๋ ์ด TopicPartition์ Transaction์ ์ถ๊ฐํ๋ฉด Transaction Coordinator๊ฐ 4.1a ๋จ๊ณ์์ ๊ธฐ๋ก Transaction์ ์ถ๊ฐ๋ ์ฒซ ๋ฒ์งธ Partition์ธ ๊ฒฝ์ฐ Transaction Coordinator๋ Transaction Timer๋ ์์
ProduceRequest Producer๋ ํ๋ ์ด์์ ProduceRequests(Producer์ send()์์ ์์๋จ)๋ฅผ ํตํด User Topic Partitions์ ๋ฉ์์ง๋ฅผ Write ์ด๋ฌํ ์์ฒญ์๋ 4.2a ์ ํ์๋ ๋๋ก PID, Epoch ๋ฐ Sequence Number๊ฐ ํฌํจ
AddOffsetCommitsToTxnRequest
Producer์๋ Consume๋๊ฑฐ๋ Produce๋๋ ๋ฉ์์ง๋ฅผ Batch ์ฒ๋ฆฌํ ์ ์๋ sendOffsetsToTransaction() ๊ฐ ์์
sendOffsetsToTransaction ๋ฉ์๋๋ groupId๊ฐ ์๋ AddOffsetCommitsToTxnRequests๋ฅผ Transaction Coordinator์๊ฒ ๋ณด๋
์ฌ๊ธฐ์ Transaction Coordinator ๋ ๋ด๋ถ __consumer_offsets Topic์์ ์ด Consumer Group์ ๋ํ TopicPartition์ ์ถ๋ก ํจ
Transaction Coordinator๋ 4.3a ๋จ๊ณ์์ Transaction Log์ ์ด Topic Partition์ ์ถ๊ฐ๋ฅผ ๊ธฐ๋ก
TxnOffsetCommitRequest Producer๋ __consumer_offsets Topic์์ Offset์ ์ ์งํ๊ธฐ ์ํด TxnOffsetCommitRequest๋ฅผ Consumer Coordinator์๊ฒ ๋ณด๋ Consumer Coordinator๋ ์ ์ก๋๋ PID ๋ฐ Producer Epoch๋ฅผ ์ฌ์ฉํ์ฌ Producer๊ฐ ์ด ์์ฒญ์ ํ ์ ์๋์ง(Zombie๊ฐ ์๋) ํ์ธ Transaction์ด Commit ๋ ๋๊น์ง ํด๋น Offset์ ์ธ๋ถ์์ ๋ณผ ์ ์์
EndTxnRequest Producer๋ Transaction์ ์๋ฃํ๊ธฐ ์ํด commitTransaction() ๋๋ abortTransaction()์ ํธ์ถ Producer๋ Commit๋๊ฑฐ๋ Abort๋๋์ง๋ฅผ ๋ํ๋ด๋ ๋ฐ์ดํฐ์ ํจ๊ป Transaction Coordinator์๊ฒ EndTxnRequest๋ฅผ ๋ณด๋ Transaction Log์ PREPARE_COMMIT ๋๋ PREPARE_ABORT ๋ฉ์์ง๋ฅผ write
WriteTxnMarkerRequest Transaction Coordinator๊ฐ Transaction์ ํฌํจ๋ ๊ฐ TopicPartition์ Leader์๊ฒ ์ด ์์ฒญ์ ๋ณด๋ ์ด ์์ฒญ์ ๋ฐ์ ๊ฐ Broker๋ COMMIT(PID) ๋๋ ABORT(PID) ์ ์ด ๋ฉ์์ง๋ฅผ ๋ก๊ทธ์ ๊ธฐ๋ก __consumer_offsets Topic์๋ Commit (๋๋ Abort) ๊ฐ ๋ก๊ทธ์ ๊ธฐ๋ก Consumer Coordinator๋ Commit์ ๊ฒฝ์ฐ ์ด๋ฌํ ์คํ์ ์ ๊ตฌ์ฒดํํ๊ฑฐ๋ Abort์ ๊ฒฝ์ฐ ๋ฌด์ํด์ผ ํ๋ค๋ ์๋ฆผ์ ๋ฐ์
Writing the final Commit or Abort Message Transaction Coordinator๋ Transaction์ด ์๋ฃ๋์์์ ๋ํ๋ด๋ ์ต์ข COMMITTED ๋๋ ABORTED๋ฅผ Transaction Log์ ๊ธฐ๋ก ์ด ์์ ์์ Transaction Log์ ์๋ Transaction๊ณผ ๊ด๋ จ๋ ๋๋ถ๋ถ์ ๋ฉ์์ง๋ฅผ ์ ๊ฑฐํ ์ ์์ Timestamp์ ํจ๊ป ์๋ฃ๋ Transaction์ PID๋ง ์ ์งํ๋ฉด ๋๋ฏ๋ก ๊ฒฐ๊ตญ Producer์ ๋ํ TransactionalId->PID ๋งคํ์ ์ ๊ฑฐํ ์ ์์
Confluent Doc
Last updated