04.Consumer Rebalance

Consumer์˜ ๋™์ž‘ ๋ฐฉ์‹

Partition์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ Polling

  • Consumer๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๊ฐ€์ ธ์˜ค๊ธฐ ์œ„ํ•ด์„œ Partition์— ์—ฐ์†์ ์œผ๋กœ Poll ํ•จ

  • ๊ฐ€์ ธ์˜จ ์œ„์น˜๋ฅผ ๋‚˜ํƒ€๋‚ด๋Š” offset ์ •๋ณด๋ฅผ __consumer_offsets Topic์— ์ €์žฅํ•˜์—ฌ ๊ด€๋ฆฌ

Consumer Load Balancing

Consumer Group Id

  • ๋™์ผํ•œ group.id ๋กœ ๊ตฌ์„ฑ๋œ ๋ชจ๋“  Consumer๋“ค์€ ํ•˜๋‚˜์˜ Consumer Group์„ ํ˜•์„ฑ

  • Consumer Group์˜ Consumer๋“ค์€ ์ž‘์—…๋Ÿ‰์„ ์–ด๋Š ์ •๋„ ๊ท ๋“ฑํ•˜๊ฒŒ ๋ถ„ํ• ํ•จ

  • ๋™์ผํ•œ Topic์—์„œ consumeํ•˜๋Š” ์—ฌ๋Ÿฌ Consumer Group์ด ์žˆ์„ ์ˆ˜ ์žˆ์Œ

Partition Assignment

ํ•˜๋‚˜์˜ Consumer Group์—์„œ์˜ Partition ํ• ๋‹น

Partition์„ Consumer์—๊ฒŒ Assign(ํ• ๋‹น) ํ•  ๋•Œ,

  • ํ•˜๋‚˜์˜ Partition์€ ์ง€์ •๋œ Consumer Group๋‚ด์˜ ํ•˜๋‚˜์˜ Consumer ๋งŒ ์‚ฌ์šฉ

  • ๋™์ผํ•œ Key๋ฅผ ๊ฐ€์ง„ ๋ฉ”์‹œ์ง€๋Š” ๋™์ผํ•œ Consumer๊ฐ€ ์‚ฌ์šฉ (Partition ์ˆ˜๋ฅผ ๋ณ€๊ฒฝํ•˜์ง€ ์•Š๋Š” ํ•œ)

  • Consumer์˜ ์„ค์ • ํŒŒ๋ผ๋ฏธํ„ฐ ์ค‘์—์„œ partition.assignment.strategy ๋กœ ํ• ๋‹น ๋ฐฉ์‹ ์กฐ์ •

  • Consumer Group์€ Group Coordinator๋ผ๋Š” ํ”„๋กœ์„ธ์Šค์— ์˜ํ•ด ๊ด€๋ฆฌ๋จ

Consumer Group Coordination

  • Consumer Group์„ ๋“ฑ๋ก

Group Coordinator(ํ•˜๋‚˜์˜ Broker) ์™€ Group Leader(ํ•˜๋‚˜์˜ Consumer)๊ฐ€ ์ƒํ˜ธ์ž‘์šฉ

ํ”„๋กœ์„ธ์Šค

1. Consumer ๋“ฑ๋ก ๋ฐ Group Coordinator ์„ ํƒ

  • hash(group.id) % offsets.topic.num.partitions ์ˆ˜์‹์„ ์‚ฌ์šฉํ•˜์—ฌ group.id๊ฐ€ ์ €์žฅ๋  __consumer_offsets ์˜ Partition์„ ๊ฒฐ์ •

2. JoinGroup ์š”์ฒญ ์ˆœ์„œ์— ๋”ฐ๋ผ Consumer ๋‚˜์—ด

  • Group Coordinator๋Š” Group์˜ Consumers ์นดํƒˆ๋กœ๊ทธ๋ฅผ ์ƒ์„ฑํ•˜๊ธฐ ์ „์— Consumers์˜ JoinGroup ์š”์ฒญ์— ๋Œ€ํ•ด group.initial.rebalance.delay.ms(๊ธฐ๋ณธ๊ฐ’ 3์ดˆ)๋ฅผ ๋Œ€๊ธฐ

  • Consumer๋“ค์ด Consumeํ•  ์ตœ๋Œ€ Partition ์ˆ˜๊นŒ์ง€ JoinGroup ์š”์ฒญ์„ ์ˆ˜์‹ ํ•˜๋Š” ์ˆœ์„œ๋Œ€๋กœ Consumer๋ฅผ ๋‚˜์—ด

3. Group Leader ๊ฒฐ์ • ๋ฐ Partition ํ• ๋‹น

  • JoinGroup ์š”์ฒญ์„ ๋ณด๋‚ด๋Š” ์ตœ์ดˆ Consumer๋Š” Group Leader๋กœ ์ง€์ •๋˜๋ฉฐ, Group Coordinator๋กœ๋ถ€ํ„ฐ Consumer ๋ชฉ๋ก์„ ๋ฐ›์Œ Group Leader๋Š” ๊ตฌ์„ฑ๋œ partition.assignment.strategy๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๊ฐ Consumer์—๊ฒŒ Partition์„ ํ• ๋‹น ์œ„ ๊ทธ๋ฆผ์—์„œ๋Š” Partition๋ณด๋‹ค ๋” ๋งŽ์€ Consumer๊ฐ€ ์žˆ์œผ๋ฏ€๋กœ ๊ฐ Consumer๋Š” Consumeํ•  Partition์ด ์ตœ๋Œ€ 1๊ฐœ ์žˆ์Œ

4. โ€œConsumer โ†’ Partitionโ€ ๋งตํ•‘์ •๋ณด๋ฅผ Group Coordinator์—๊ฒŒ ์ „์†ก

  • Group Leader๋Š” โ€œConsumer โ†’ Partitionโ€ ๋งตํ•‘์ •๋ณด๋ฅผ Group Coordinator์—๊ฒŒ ๋‹ค์‹œ ๋ณด๋ƒ„

  • Group Coordinator๋Š” ๋งตํ•‘์ •๋ณด๋ฅผ ๋ฉ”๋ชจ๋ฆฌ์— ์บ์‹œํ•˜๊ณ  ZooKeeper์— ์œ ์ง€

5. ๊ฐ Consumer์—๊ฒŒ ํ• ๋‹น๋œ Partition ์ •๋ณด๋ฅผ ๋ณด๋ƒ„

  • Group Coordinator๋Š” ๊ฐ Consumer์—๊ฒŒ ํ• ๋‹น๋œ Partition ์ •๋ณด๋ฅผ ๋ณด๋ƒ„

  • ๊ฐ Consumer๋Š” ํ• ๋‹น๋œ Partition์—์„œ Consume์„ ์‹œ์ž‘

์™œ Group Coordinator(a Broker)๊ฐ€ ์ง์ ‘ Partition์„ ํ• ๋‹นํ•˜์ง€ ์•Š๋Š”๊ฐ€?

  • Kafka์˜ ํ•œ ๊ฐ€์ง€ ์›์น™์€ ๊ฐ€๋Šฅํ•œ ํ•œ ๋งŽ์€ ๊ณ„์‚ฐ์„ ํด๋ผ์ด์–ธํŠธ์— ์ˆ˜ํ–‰ํ•˜๋„๋ก ํ•˜์—ฌ, Broker์˜ ๋ถ€๋‹ด์„ ์ค„์ด๋Š” ๊ฒƒ

  • ๋งŽ์€ Consumer Group๊ณผ Consumer๋“ค์ด ์žˆ๊ณ  Broker ํ˜ผ์ž์„œ Rebalance๋ฅผ ์œ„ํ•œ ๊ณ„์‚ฐ์„ ํ•œ๋‹ค๊ณ  ์ƒ๊ฐํ•ด ๋ณด๋ฉด...

    • Broker์— ์—„์ฒญ๋‚œ ๋ถ€๋‹ด

    • ์ด๋Ÿฌํ•œ ๊ณ„์‚ฐ์„ Broker๊ฐ€ ์•„๋‹Œ ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ์˜คํ”„๋กœ๋“œ(Offload)ํ•˜๋Š” ๊ฒƒ์ด ๊ฐ€์žฅ ๋ฐ”๋žŒ์งํ•จ

Consumer Rebalancing Trigger

๋ถˆํ•„์š”ํ•œ Rebalancing์€ ํ”ผํ•ด์•ผ ํ•จ

Consumer Rebalancing์‹œ Consumer๋“ค์€ ๋ฉ”์‹œ์ง€๋ฅผ Consumeํ•˜์ง€ ๋ชปํ•จ ๋”ฐ๋ผ์„œ, ๋ถˆํ•„์š”ํ•œ Rebalancing์€ ๋ฐ˜๋“œ์‹œ ํ”ผํ•ด์•ผ ํ•จ

Rebalancing Trigger

  • Consumer๊ฐ€ Consumer Group์—์„œ ํƒˆํ‡ด

  • ์‹ ๊ทœ Consumer๊ฐ€ Consumer Group์— ํ•ฉ๋ฅ˜

  • Consumer๊ฐ€ Topic ๊ตฌ๋…์„ ๋ณ€๊ฒฝ

  • Consumer Group์€ Topic ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ์˜ ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ์ธ์ง€ (์˜ˆ: Partition ์ฆ๊ฐ€)

Rebalancing Process

  1. Group Coordinator๋Š” heartbeats์˜ ํ”Œ๋ž˜๊ทธ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ Consumer์—๊ฒŒ Rebalance ์‹ ํ˜ธ๋ฅผ ๋ณด๋ƒ„

  2. Consumer๊ฐ€ ์ผ์‹œ ์ค‘์ง€ํ•˜๊ณ  Offset์„ Commit

  3. Consumer๋Š” Consumer Group์˜ ์ƒˆ๋กœ์šด โ€Generation"์— ๋‹ค์‹œ ํ•ฉ๋ฅ˜

  4. Partition ์žฌํ• ๋‹น

  5. Consumer๋Š” ์ƒˆ Partition์—์„œ ๋‹ค์‹œ Consume์„ ์‹œ์ž‘

Consumer Heartbeats

Consumer ์žฅ์• ๋ฅผ ์ธ์ง€ํ•˜๊ธฐ ์œ„ํ•จ

  • Consumer๋Š” poll()๊ณผ ๋ณ„๋„๋กœ ๋ฐฑ๊ทธ๋ผ์šด๋“œ Thread์—์„œ Heartbeats๋ฅผ ๋ณด๋ƒ„

    • heartbeat.interval.ms (๊ธฐ๋ณธ๊ฐ’ : 3 ์ดˆ)

  • ์•„๋ž˜ ์‹œ๊ฐ„ ๋™์•ˆ Heartbeats๊ฐ€ ์ˆ˜์‹ ๋˜์ง€ ์•Š์œผ๋ฉด Consumer๋Š” Consumer Group์—์„œ ์‚ญ์ œ

    • session.timeout.ms (๊ธฐ๋ณธ๊ฐ’ : 10 ์ดˆ)

  • poll()์€ Heartbeats์™€ ์ƒ๊ด€์—†์ด ์ฃผ๊ธฐ์ ์œผ๋กœ ํ˜ธ์ถœ๋˜์–ด์•ผ ํ•จ

    • max.poll.interval.ms (๊ธฐ๋ณธ๊ฐ’ : 5 ๋ถ„)

๊ณผ๋„ํ•œ Rebalancing์„ ํ”ผํ•˜๋Š” ๋ฐฉ๋ฒ•

์„ฑ๋Šฅ ์ตœ์ ํ™”์— ํ•„์ˆ˜

  1. Consumer Group ๋ฉค๋ฒ„ ๊ณ ์ •

    • Group์˜ ๊ฐ Consumer์—๊ฒŒ ๊ณ ์œ ํ•œ group.instance.id ๋ฅผ ํ• ๋‹นํ•ฉ๋‹ˆ๋‹ค.

    • Consumer๋Š” LeaveGroupRequest๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š์•„์•ผ ํ•จ

    • Rejoin(์žฌ๊ฐ€์ž…)์€ ์•Œ๋ ค์ง„ group.instance.id ์— ๋Œ€ํ•œ Rebalance๋ฅผ triggerํ•˜์ง€ ์•Š์Œ

  2. session.timeout.ms ํŠœ๋‹

    • heartbeat.interval.ms๋ฅผ session.timeout.ms์˜ 1/3๋กœ ์„ค์ •

    • group.min.session.timeout.ms (Default: 6 seconds) ์™€ group.max.session.timeout.ms (Default: 5 minutes) ์˜ ์‚ฌ์ด๊ฐ’

    • ์žฅ์  : Consumer๊ฐ€ Rejoin(์žฌ๊ฐ€์ž…)ํ•  ์ˆ˜ ์žˆ๋Š” ๋” ๋งŽ์€ ์‹œ๊ฐ„์„ ์ œ๊ณต

    • ๋‹จ์  : Consumer ์žฅ์• ๋ฅผ ๊ฐ์ง€ํ•˜๋Š” ๋ฐ ์‹œ๊ฐ„์ด ๋” ์˜ค๋ž˜ ๊ฑธ๋ฆผ

  3. max.poll.interval.ms ํŠœ๋‹

    • Consumer์—๊ฒŒ poll()ํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” ์ถฉ๋ถ„ํ•œ ์‹œ๊ฐ„ ์ œ๊ณต

    • ๋„ˆ๋ฌด ํฌ๊ฒŒ ํ•˜๋ฉด ์•ˆ ๋จ

Last updated