09. Exactly Once Semantics (EOS) 2

Transaction

To implement transactions, several new concepts were introduced:

  • Transaction Coordinator: Similar to the Consumer Group Coordinator, each producer is assigned a Transaction Coordinator, which performs all of the logic for assigning PIDs and managing transactions.

  • Transaction Log: A new internal Kafka topic that, like the consumer offsets topic, serves as the Transaction Coordinator's state store, holding durable, replicated records of every transaction.

  • TransactionalId: Used to uniquely identify a producer. A new instance of a producer with the same TransactionalId can resume (or abort) any transactions started by the previous instance.
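The coordinator assignment works as follows. The Transaction Coordinator for a producer is the broker that leads the Transaction Log partition that the producer's TransactionalId hashes to. The Transaction Log topic is named __transaction_state, and its partition count comes from transaction.state.log.num.partitions (default 50, see Broker Configs). The sketch below is a simplified model of that mapping: a non-negative hash of the id, modulo the partition count. It is not Kafka source code, and the class name, method name, and sample id are hypothetical.

```java
// Simplified model of Transaction Coordinator lookup; not Kafka's source code.
final class CoordinatorLookup {

    // Default value of transaction.state.log.num.partitions.
    static final int TRANSACTION_LOG_PARTITIONS = 50;

    // Maps a TransactionalId to a partition of the transaction log topic
    // (__transaction_state) using a non-negative hash modulo the partition count.
    // The leader of that partition acts as the producer's Transaction Coordinator.
    static int partitionFor(String transactionalId, int numPartitions) {
        int hash = transactionalId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so that case maps to 0.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        String txId = "order-processor-1"; // hypothetical transactional.id
        int partition = partitionFor(txId, TRANSACTION_LOG_PARTITIONS);
        System.out.println(txId + " -> __transaction_state-" + partition);
    }
}
```

The result depends only on the TransactionalId. A restarted producer with the same id therefore reaches the same coordinator, and that is what lets the coordinator fence the old instance and resume or abort its transactions.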

Broker Configs

| Parameter | Description | Default |
|---|---|---|
| transactional.id.timeout.ms | Maximum time (ms) the Transaction Coordinator waits without receiving a transaction status update for a producer's TransactionalId before proactively expiring it | 604800000 (7 days) |
| max.transaction.timeout.ms | Maximum timeout allowed for a transaction. If the transaction timeout requested by a client exceeds this value, the broker returns an InvalidTransactionTimeout error for the InitPidRequest. This prevents a producer from setting a timeout so large that it could stall consumers reading from the topics included in the transaction. | 900000 (15 min) |
| transaction.state.log.replication.factor | Replication factor of the transaction state topic | 3 |
| transaction.state.log.num.partitions | Number of partitions of the transaction state topic | 50 |
| transaction.state.log.min.isr | min ISR (minimum number of in-sync replicas) of the transaction state topic | 2 |
| transaction.state.log.segment.bytes | Segment size of the transaction state topic | 104857600 bytes (100 MiB) |
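Both timeout settings above reduce to simple comparisons on the coordinator side. The following is a simplified model of those two checks, not broker code. The class and method names and the returned error strings are hypothetical. Only the config names and default values come from the table, plus the producer's default transaction.timeout.ms of 60000 ms.

```java
// Simplified model of the two broker-side timeout checks; names are hypothetical.
final class TransactionTimeoutChecks {

    static final long TRANSACTIONAL_ID_TIMEOUT_MS = 604_800_000L; // transactional.id.timeout.ms (7 days)
    static final long MAX_TRANSACTION_TIMEOUT_MS = 900_000L;      // max.transaction.timeout.ms (15 min)

    // InitPidRequest: a client transaction.timeout.ms above the broker maximum is rejected.
    static String validateTransactionTimeout(long requestedMs, long maxMs) {
        return requestedMs > maxMs ? "InvalidTransactionTimeout" : "NONE";
    }

    // A TransactionalId with no transaction status update for longer than the timeout
    // becomes eligible for expiration by the coordinator.
    static boolean isTransactionalIdExpired(long lastUpdateMs, long nowMs, long timeoutMs) {
        return nowMs - lastUpdateMs > timeoutMs;
    }

    public static void main(String[] args) {
        // The producer default transaction.timeout.ms (60 s) is within the broker limit.
        System.out.println(validateTransactionTimeout(60_000L, MAX_TRANSACTION_TIMEOUT_MS));   // NONE
        // A 30-minute transaction timeout would be rejected at InitPidRequest time.
        System.out.println(validateTransactionTimeout(1_800_000L, MAX_TRANSACTION_TIMEOUT_MS)); // InvalidTransactionTimeout
        // An id idle for 8 days has passed the 7-day default.
        long eightDaysMs = 8L * 24 * 60 * 60 * 1000;
        System.out.println(isTransactionalIdExpired(0L, eightDaysMs, TRANSACTIONAL_ID_TIMEOUT_MS)); // true
    }
}
```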

Producer Configs

| Parameter | Description | Default |
|---|---|---|
| enable.idempotence | Transactions cannot be used when this is disabled. It must be enabled (true) together with acks=all, retries > 0, and max.in.flight.requests.per.connection ≤ 5 (the original KIP-98 design required max.in.flight.requests.per.connection=1). | false (true since Kafka 3.0) |
| transaction.timeout.ms | Maximum time (ms) the Transaction Coordinator waits for a transaction status update from the producer before proactively aborting the ongoing transaction. This value is sent to the Transaction Coordinator with the InitPidRequest. If it is larger than the broker's max.transaction.timeout.ms, the request fails with an InvalidTransactionTimeout error. | 60000 (60 sec) |
| transactional.id | The TransactionalId used for transactional delivery. With it, the client can guarantee that transactions using the same TransactionalId have completed before starting a new one, which enables reliability semantics that span multiple producer sessions. If no TransactionalId is set (the default), the producer is limited to idempotent delivery. If a TransactionalId is configured, enable.idempotence must be enabled. | none |
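The producer rules in this table can be expressed as a small pre-flight check. This is a hypothetical helper, not part of the Kafka client. It only inspects keys that are explicitly present in the map, and it treats a non-empty transactional.id as requiring idempotence. It uses the limits enforced by current Kafka clients: acks=all, retries > 0, and max.in.flight.requests.per.connection of at most 5. KafkaProducer performs its own, more detailed validation when it is constructed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check; flags only explicitly set values that conflict with transactions.
final class TransactionalProducerConfigCheck {

    static List<String> problems(Map<String, String> config) {
        List<String> problems = new ArrayList<>();
        String txId = config.get("transactional.id");
        boolean transactional = txId != null && !txId.isEmpty();
        String idempotence = config.get("enable.idempotence");

        if (transactional && "false".equalsIgnoreCase(idempotence)) {
            problems.add("transactional.id requires enable.idempotence=true");
        }
        // Idempotence is required for a transactional producer or when explicitly enabled.
        boolean idempotenceRequired = transactional || "true".equalsIgnoreCase(idempotence);
        if (!idempotenceRequired) {
            return problems; // plain producer: nothing else to check here
        }
        String acks = config.get("acks");
        if (acks != null && !acks.equals("all") && !acks.equals("-1")) {
            problems.add("idempotence requires acks=all");
        }
        String retries = config.get("retries");
        if (retries != null && Integer.parseInt(retries) <= 0) {
            problems.add("idempotence requires retries > 0");
        }
        String inFlight = config.get("max.in.flight.requests.per.connection");
        if (inFlight != null && Integer.parseInt(inFlight) > 5) {
            problems.add("idempotence requires max.in.flight.requests.per.connection <= 5");
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("transactional.id", "order-processor-1"); // hypothetical id
        config.put("acks", "1");
        config.put("max.in.flight.requests.per.connection", "10");
        System.out.println(problems(config));
    }
}
```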

Consumer Configs

| Parameter | Description | Default |
|---|---|---|
| isolation.level | read_uncommitted: consumes both committed and uncommitted messages, in offset order. read_committed: consumes only non-transactional messages and messages from committed transactions, in offset order. | read_uncommitted |
| enable.auto.commit | Set to false to turn off auto-commit of consumer offsets, since the offsets are committed through the producer's transaction | true |
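A toy model of a single partition shows the difference between the two isolation levels. Everything here is hypothetical and simplified:

- each record carries a producerId (-1 for a non-transactional record);
- each producer has at most one transaction in the sample;
- transaction outcomes are passed in as a map instead of being derived from control markers.

The model also reflects that a read_committed consumer does not read past the first record of a still-open transaction (the Last Stable Offset, or LSO). That is why max.transaction.timeout.ms guards against transactions stalling consumers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy model of one partition and of what each isolation.level returns (hypothetical names).
final class IsolationLevelModel {

    enum TxnOutcome { COMMITTED, ABORTED, OPEN }

    static final class Rec {
        final long offset;
        final long producerId; // -1 means a non-transactional record
        final String value;

        Rec(long offset, long producerId, String value) {
            this.offset = offset;
            this.producerId = producerId;
            this.value = value;
        }
    }

    // read_uncommitted: every data record, in offset order.
    static List<String> readUncommitted(List<Rec> log) {
        List<String> out = new ArrayList<>();
        for (Rec r : log) {
            out.add(r.value);
        }
        return out;
    }

    // read_committed: non-transactional records and records of committed transactions, in
    // offset order, stopping at the first record of a still-open transaction (the LSO).
    // Assumes at most one transaction per producerId in the sample.
    static List<String> readCommitted(List<Rec> log, Map<Long, TxnOutcome> outcomes) {
        List<String> out = new ArrayList<>();
        for (Rec r : log) {
            if (r.producerId < 0) {
                out.add(r.value);
                continue;
            }
            TxnOutcome outcome = outcomes.getOrDefault(r.producerId, TxnOutcome.OPEN);
            if (outcome == TxnOutcome.OPEN) {
                break; // nothing at or beyond the LSO is returned yet
            }
            if (outcome == TxnOutcome.COMMITTED) {
                out.add(r.value);
            } // records of ABORTED transactions are filtered out
        }
        return out;
    }

    static List<Rec> sampleLog() {
        return List.of(
            new Rec(0, -1, "a"),  // non-transactional
            new Rec(1, 10, "b"),  // producer 10, committed
            new Rec(2, 11, "c"),  // producer 11, aborted
            new Rec(3, 10, "d"),  // producer 10, committed
            new Rec(4, 12, "e"),  // producer 12, still open
            new Rec(5, -1, "f")); // non-transactional, but after the open transaction
    }

    static Map<Long, TxnOutcome> sampleOutcomes() {
        return Map.of(10L, TxnOutcome.COMMITTED, 11L, TxnOutcome.ABORTED, 12L, TxnOutcome.OPEN);
    }

    public static void main(String[] args) {
        System.out.println("read_uncommitted: " + readUncommitted(sampleLog()));                  // [a, b, c, d, e, f]
        System.out.println("read_committed:   " + readCommitted(sampleLog(), sampleOutcomes())); // [a, b, d]
    }
}
```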

  • These settings do not guarantee that a consumer never processes the same data more than once. Duplicate processing on the consumer side must be handled with separate logic (an Idempotent Consumer).

  • For example, if a Kafka consumer is rewound to an earlier offset after it has successfully consumed messages, it receives every message from that offset up to the latest offset again.
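One common way to build the Idempotent Consumer mentioned above is to remember which messages have already been processed and skip repeats. The sketch assumes that each message carries a unique ID. That ID is a hypothetical field, because Kafka does not add one. The sketch also keeps the IDs in memory; a production version would store them durably and update them atomically with the side effect.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal idempotent-consumer sketch. Each message is assumed to carry a unique ID
// (a hypothetical field; Kafka does not add one). The ID set is kept in memory here;
// a real implementation would persist it atomically with the side effect.
final class IdempotentConsumer {

    private final Set<String> processedIds = new HashSet<>();
    private final List<String> sideEffects = new ArrayList<>();

    // Returns true if the message was processed, false if it was a duplicate and skipped.
    boolean handle(String messageId, String payload) {
        if (!processedIds.add(messageId)) {
            return false; // seen before, e.g. redelivered after the offset was rewound
        }
        sideEffects.add(payload.toUpperCase()); // hypothetical business logic
        return true;
    }

    List<String> sideEffects() {
        return sideEffects;
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        String[][] firstPass = {{"m1", "a"}, {"m2", "b"}, {"m3", "c"}};
        String[][] afterRewind = {{"m2", "b"}, {"m3", "c"}}; // same messages delivered again
        for (String[] m : firstPass) consumer.handle(m[0], m[1]);
        for (String[] m : afterRewind) consumer.handle(m[0], m[1]);
        System.out.println(consumer.sideEffects()); // [A, B, C]
    }
}
```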

Transaction Data Flow

KIP-98: Exactly Once Delivery and Transactional Messaging (https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging)

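```java
// Consume-transform-produce loop based on the KIP-98 example, using the released client API (2.5+).
// Topic names, group id, transactional.id, and the processing logic are hypothetical placeholders.
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaTransactionsExample {

  private static final String INPUT_TOPIC = "source-topic";
  private static final String OUTPUT_TOPIC = "target-topic";
  private static final Duration CONSUMER_POLL_TIMEOUT = Duration.ofMillis(500);

  public static void main(String[] args) {
    Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-example-group");
    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets are committed by the producer
    consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // The 'transactional.id' configuration _must_ be specified in order to use transactions.
    producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "eos-example-tx-1");
    producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
    consumer.subscribe(Collections.singletonList(INPUT_TOPIC));

    // Initialize transactions once per producer instance. This fences earlier instances with the
    // same transactional.id and completes (commits or aborts) any transaction they left open.
    producer.initTransactions();

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (records.isEmpty()) {
        continue;
      }
      // Start a transaction covering both the produced records and the consumed offsets.
      producer.beginTransaction();
      try {
        // Process the input records and send the results to the output topic.
        for (ProducerRecord<String, String> outputRecord : processRecords(records)) {
          producer.send(outputRecord);
        }
        // Commit the consumed offsets through the producer, not the consumer, so that they are
        // part of the same transaction as the produced records.
        producer.sendOffsetsToTransaction(nextOffsets(records), consumer.groupMetadata());
        // Outputs and offsets become visible together, or not at all.
        producer.commitTransaction();
      } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Irrecoverable: this producer instance cannot continue and must be closed.
        producer.close();
        consumer.close();
        return;
      } catch (KafkaException e) {
        // Recoverable: roll back, then rewind to the last committed offsets and reprocess.
        producer.abortTransaction();
        rewindToCommitted(consumer);
      }
    }
  }

  // Hypothetical business logic: upper-case each value and route it to the output topic.
  private static List<ProducerRecord<String, String>> processRecords(ConsumerRecords<String, String> records) {
    List<ProducerRecord<String, String>> out = new ArrayList<>();
    for (ConsumerRecord<String, String> r : records) {
      String value = r.value() == null ? null : r.value().toUpperCase();
      out.add(new ProducerRecord<>(OUTPUT_TOPIC, r.key(), value));
    }
    return out;
  }

  // The offset to commit is the next one to read: last consumed offset + 1, per partition.
  private static Map<TopicPartition, OffsetAndMetadata> nextOffsets(ConsumerRecords<String, String> records) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (TopicPartition tp : records.partitions()) {
      List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
      offsets.put(tp, new OffsetAndMetadata(partitionRecords.get(partitionRecords.size() - 1).offset() + 1));
    }
    return offsets;
  }

  // After an abort, seek back to the committed offsets so the aborted batch is consumed again.
  private static void rewindToCommitted(KafkaConsumer<String, String> consumer) {
    Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
    for (TopicPartition tp : consumer.assignment()) {
      OffsetAndMetadata offset = committed.get(tp);
      if (offset != null) {
        consumer.seek(tp, offset.offset());
      } else {
        consumer.seekToBeginning(Collections.singleton(tp));
      }
    }
  }
}
```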
  1. Call initTransactions() once per producer instance.

  2. Fetch records from the source topic with poll().

  3. Start a transaction with beginTransaction().

  4. Run the business logic on the records and send() the resulting records to the target topic.

  5. Call sendOffsetsToTransaction() to commit, within the transaction, the consumer offsets of the source topic records that were consumed (polled).

  6. Commit the transaction with commitTransaction() or roll it back with abortTransaction().
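The ordering rules behind steps 1 to 6 can be pictured as a small state machine. This toy class is not KafkaProducer. Its states, method bodies, and string encoding of records and offsets are hypothetical, and polling (step 2) is left out. It only illustrates that produced records and consumed offsets are committed or discarded together.

```java
import java.util.ArrayList;
import java.util.List;

// Toy state machine for the call order of steps 1-6; not the real KafkaProducer.
final class TxnCallOrder {

    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    private State state = State.UNINITIALIZED;
    private final List<String> pending = new ArrayList<>();   // records + offsets of the open transaction
    private final List<String> committed = new ArrayList<>(); // what becomes visible after commit

    private void require(State expected, String call) {
        if (state != expected) {
            throw new IllegalStateException(call + " is not allowed in state " + state);
        }
    }

    void initTransactions() {               // step 1
        require(State.UNINITIALIZED, "initTransactions");
        state = State.READY;
    }

    void beginTransaction() {               // step 3
        require(State.READY, "beginTransaction");
        state = State.IN_TRANSACTION;
    }

    void send(String record) {              // step 4
        require(State.IN_TRANSACTION, "send");
        pending.add("record:" + record);
    }

    void sendOffsetsToTransaction(String partition, long nextOffset) { // step 5
        require(State.IN_TRANSACTION, "sendOffsetsToTransaction");
        pending.add("offset:" + partition + "@" + nextOffset);
    }

    void commitTransaction() {              // step 6: outputs and offsets become visible together
        require(State.IN_TRANSACTION, "commitTransaction");
        committed.addAll(pending);
        pending.clear();
        state = State.READY;
    }

    void abortTransaction() {               // step 6: outputs and offsets are discarded together
        require(State.IN_TRANSACTION, "abortTransaction");
        pending.clear();
        state = State.READY;
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        TxnCallOrder producer = new TxnCallOrder();
        producer.initTransactions();
        producer.beginTransaction();
        producer.send("A");
        producer.sendOffsetsToTransaction("source-topic-0", 42L); // hypothetical partition name
        producer.commitTransaction();
        producer.beginTransaction();
        producer.send("B");
        producer.abortTransaction();
        System.out.println(producer.committed()); // [record:A, offset:source-topic-0@42]
    }
}
```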

Transaction Processing Steps

  1. Finding the Transaction Coordinator

    The producer calls initTransactions(), which sends a FindCoordinatorRequest to a broker to locate its Transaction Coordinator. The Transaction Coordinator is the component that assigns PIDs.

  2. Obtaining a Producer ID

    1. The producer sends an InitPidRequest (InitProducerIdRequest in current Kafka versions) carrying its TransactionalId to the Transaction Coordinator to obtain its PID.

    2. The coordinator bumps the epoch of the PID so that any previous zombie instance of the producer is fenced off and can no longer make progress on its transaction.

    3. The TransactionalId-to-PID mapping is written to the Transaction Log in step 2a.

  3. Starting a Transaction

    The producer calls beginTransaction() to signal the start of a new transaction. The producer records local state indicating that the transaction has begun. From the Transaction Coordinator's point of view, however, the transaction does not start until the first record is sent.

    1. AddPartitionsToTxnRequest

      The producer sends this request to the Transaction Coordinator the first time a new TopicPartition is written to as part of the transaction. The coordinator records the addition of this TopicPartition to the transaction in step 4.1a. If this is the first partition added to the transaction, the coordinator also starts the transaction timer.

    2. ProduceRequest

      The producer writes messages to the user topic partitions through one or more ProduceRequests (triggered by the producer's send()). As shown in step 4.2a, these requests include the PID, epoch, and sequence number.

    3. AddOffsetCommitsToTxnRequest

      The producer provides sendOffsetsToTransaction(), which lets consumed and produced messages be batched together in the same transaction.

      sendOffsetsToTransaction sends an AddOffsetCommitsToTxnRequest containing the groupId to the Transaction Coordinator.

      From the groupId, the Transaction Coordinator infers the TopicPartition of the internal __consumer_offsets topic that holds this consumer group's offsets.

      The Transaction Coordinator records the addition of this TopicPartition to the Transaction Log in step 4.3a.

    4. TxnOffsetCommitRequest

      The producer sends a TxnOffsetCommitRequest to the Consumer Coordinator to persist the offsets in the __consumer_offsets topic. The Consumer Coordinator uses the PID and producer epoch sent with the request to verify that the producer is allowed to make this request (that is, that it is not a zombie). The offsets are not visible externally until the transaction is committed.

  4. Committing or Aborting the Transaction

    1. EndTxnRequest

      The producer calls commitTransaction() or abortTransaction() to finish the transaction. This sends an EndTxnRequest to the Transaction Coordinator, indicating whether the transaction is to be committed or aborted. The coordinator then writes a PREPARE_COMMIT or PREPARE_ABORT message to the Transaction Log.

    2. WriteTxnMarkerRequest

      The Transaction Coordinator sends this request to the leader of every TopicPartition included in the transaction. Each broker that receives it writes a COMMIT(PID) or ABORT(PID) control message to its log. The commit (or abort) is also written to the log of the __consumer_offsets topic. The Consumer Coordinator is notified that it must materialize these offsets on a commit, or ignore them on an abort.

    3. Writing the Final Commit or Abort Message

      The Transaction Coordinator writes a final COMMITTED or ABORTED message to the Transaction Log, indicating that the transaction is complete. At this point, most of the messages related to the transaction in the Transaction Log can be removed. Only the PID of the completed transaction needs to be kept, along with a timestamp, so the TransactionalId->PID mapping for the producer can eventually be removed.
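The coordinator-side steps above can be condensed into a toy in-memory model. It is heavily simplified, and every name in it is hypothetical:

- there is no replication or network;
- the Transaction Log and the control markers are plain string lists;
- PIDs start at an arbitrary 1000;
- the __consumer_offsets partition is added with the same method as a data partition (AddOffsetCommitsToTxn in the real protocol).

The model illustrates three behaviors: epoch bumping and zombie fencing on InitPid, the transaction timer started by the first AddPartitionsToTxn, and the PREPARE_*, marker, COMMITTED/ABORTED sequence.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy, in-memory Transaction Coordinator (hypothetical names; not Kafka's implementation).
final class ToyTransactionCoordinator {

    static final class TxnMeta {
        final long producerId;
        int epoch = -1;
        final Set<String> partitions = new LinkedHashSet<>();
        long txnStartMs = -1; // -1 means no ongoing transaction
        TxnMeta(long producerId) { this.producerId = producerId; }
    }

    private final Map<String, TxnMeta> byTxnId = new HashMap<>();
    private long nextProducerId = 1000;                     // arbitrary first PID for the model
    final List<String> transactionLog = new ArrayList<>();  // stands in for the Transaction Log
    final List<String> markersWritten = new ArrayList<>();  // COMMIT/ABORT markers on partitions

    // InitPid: same PID for the TransactionalId, bumped epoch (fences zombies).
    // A transaction left open by the previous instance is aborted first.
    int initPid(String txnId) {
        TxnMeta m = byTxnId.computeIfAbsent(txnId, id -> new TxnMeta(nextProducerId++));
        if (m.txnStartMs >= 0) complete(txnId, m, false);
        m.epoch++;
        transactionLog.add(txnId + " -> pid=" + m.producerId + ", epoch=" + m.epoch);
        return m.epoch;
    }

    // Every transactional request carries (PID, epoch); a stale epoch means a zombie.
    private TxnMeta checkEpoch(String txnId, int epoch) {
        TxnMeta m = byTxnId.get(txnId);
        if (m == null || epoch != m.epoch) {
            throw new IllegalStateException("fenced or unknown producer: " + txnId + " epoch " + epoch);
        }
        return m;
    }

    // AddPartitionsToTxn: the first partition starts the transaction and its timer.
    void addPartition(String txnId, int epoch, String topicPartition, long nowMs) {
        TxnMeta m = checkEpoch(txnId, epoch);
        if (m.txnStartMs < 0) m.txnStartMs = nowMs;
        if (m.partitions.add(topicPartition)) transactionLog.add(txnId + " ONGOING " + m.partitions);
    }

    // EndTxn: commit or abort, completed by the coordinator in two phases.
    void endTxn(String txnId, int epoch, boolean commit) {
        complete(txnId, checkEpoch(txnId, epoch), commit);
    }

    // Transaction timer: abort a transaction open for longer than transaction.timeout.ms.
    void abortIfTimedOut(String txnId, long nowMs, long transactionTimeoutMs) {
        TxnMeta m = byTxnId.get(txnId);
        if (m != null && m.txnStartMs >= 0 && nowMs - m.txnStartMs > transactionTimeoutMs) {
            complete(txnId, m, false);
        }
    }

    private void complete(String txnId, TxnMeta m, boolean commit) {
        String outcome = commit ? "COMMIT" : "ABORT";
        transactionLog.add(txnId + " PREPARE_" + outcome);                  // phase 1
        for (String tp : m.partitions) {                                    // WriteTxnMarker per partition
            markersWritten.add(tp + " " + outcome + "(pid=" + m.producerId + ")");
        }
        transactionLog.add(txnId + (commit ? " COMMITTED" : " ABORTED"));   // final record
        m.partitions.clear();
        m.txnStartMs = -1;
    }

    public static void main(String[] args) {
        ToyTransactionCoordinator coordinator = new ToyTransactionCoordinator();
        int zombieEpoch = coordinator.initPid("tx-1");                      // old instance, epoch 0
        coordinator.addPartition("tx-1", zombieEpoch, "target-topic-0", 0L);
        int epoch = coordinator.initPid("tx-1");                            // restart: epoch 1, old txn aborted
        try {
            coordinator.addPartition("tx-1", zombieEpoch, "target-topic-1", 10L);
        } catch (IllegalStateException e) {
            System.out.println("zombie rejected: " + e.getMessage());
        }
        coordinator.addPartition("tx-1", epoch, "target-topic-0", 20L);
        coordinator.addPartition("tx-1", epoch, "__consumer_offsets-7", 20L); // hypothetical offsets partition
        coordinator.endTxn("tx-1", epoch, true);
        coordinator.transactionLog.forEach(System.out::println);
        coordinator.markersWritten.forEach(System.out::println);
    }
}
```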

Confluent Doc
