04.Consumer Rebalance
Consumer์ ๋์ ๋ฐฉ์
Partition์์ ๋ฉ์์ง๋ฅผ Polling
Consumer๋ ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ค๊ธฐ ์ํด์ Partition์ ์ฐ์์ ์ผ๋ก Poll ํจ
๊ฐ์ ธ์จ ์์น๋ฅผ ๋ํ๋ด๋ offset ์ ๋ณด๋ฅผ __consumer_offsets Topic์ ์ ์ฅํ์ฌ ๊ด๋ฆฌ
Consumer Load Balancing
Consumer Group Id
๋์ผํ group.id ๋ก ๊ตฌ์ฑ๋ ๋ชจ๋ Consumer๋ค์ ํ๋์ Consumer Group์ ํ์ฑ
Consumer Group์ Consumer๋ค์ ์์ ๋์ ์ด๋ ์ ๋ ๊ท ๋ฑํ๊ฒ ๋ถํ ํจ
๋์ผํ Topic์์ consumeํ๋ ์ฌ๋ฌ Consumer Group์ด ์์ ์ ์์
Partition Assignment
ํ๋์ Consumer Group์์์ Partition ํ ๋น
Partition์ Consumer์๊ฒ Assign(ํ ๋น) ํ ๋,
ํ๋์ Partition์ ์ง์ ๋ Consumer Group๋ด์ ํ๋์ Consumer ๋ง ์ฌ์ฉ
๋์ผํ Key๋ฅผ ๊ฐ์ง ๋ฉ์์ง๋ ๋์ผํ Consumer๊ฐ ์ฌ์ฉ (Partition ์๋ฅผ ๋ณ๊ฒฝํ์ง ์๋ ํ)
Consumer์ ์ค์ ํ๋ผ๋ฏธํฐ ์ค์์ partition.assignment.strategy ๋ก ํ ๋น ๋ฐฉ์ ์กฐ์
Consumer Group์ Group Coordinator๋ผ๋ ํ๋ก์ธ์ค์ ์ํด ๊ด๋ฆฌ๋จ
Consumer Group Coordination
Consumer Group์ ๋ฑ๋ก
Group Coordinator(ํ๋์ Broker) ์ Group Leader(ํ๋์ Consumer)๊ฐ ์ํธ์์ฉ
ํ๋ก์ธ์ค
1. Consumer ๋ฑ๋ก ๋ฐ Group Coordinator ์ ํ
hash(group.id) % offsets.topic.num.partitions ์์์ ์ฌ์ฉํ์ฌ group.id๊ฐ ์ ์ฅ๋ __consumer_offsets ์ Partition์ ๊ฒฐ์
2. JoinGroup ์์ฒญ ์์์ ๋ฐ๋ผ Consumer ๋์ด
Group Coordinator๋ Group์ Consumers ์นดํ๋ก๊ทธ๋ฅผ ์์ฑํ๊ธฐ ์ ์ Consumers์ JoinGroup ์์ฒญ์ ๋ํด group.initial.rebalance.delay.ms(๊ธฐ๋ณธ๊ฐ 3์ด)๋ฅผ ๋๊ธฐ
Consumer๋ค์ด Consumeํ ์ต๋ Partition ์๊น์ง JoinGroup ์์ฒญ์ ์์ ํ๋ ์์๋๋ก Consumer๋ฅผ ๋์ด
3. Group Leader ๊ฒฐ์ ๋ฐ Partition ํ ๋น
JoinGroup ์์ฒญ์ ๋ณด๋ด๋ ์ต์ด Consumer๋ Group Leader๋ก ์ง์ ๋๋ฉฐ, Group Coordinator๋ก๋ถํฐ Consumer ๋ชฉ๋ก์ ๋ฐ์ Group Leader๋ ๊ตฌ์ฑ๋ partition.assignment.strategy๋ฅผ ์ฌ์ฉํ์ฌ ๊ฐ Consumer์๊ฒ Partition์ ํ ๋น ์ ๊ทธ๋ฆผ์์๋ Partition๋ณด๋ค ๋ ๋ง์ Consumer๊ฐ ์์ผ๋ฏ๋ก ๊ฐ Consumer๋ Consumeํ Partition์ด ์ต๋ 1๊ฐ ์์
4. โConsumer โ Partitionโ ๋งตํ์ ๋ณด๋ฅผ Group Coordinator์๊ฒ ์ ์ก
Group Leader๋ โConsumer โ Partitionโ ๋งตํ์ ๋ณด๋ฅผ Group Coordinator์๊ฒ ๋ค์ ๋ณด๋
Group Coordinator๋ ๋งตํ์ ๋ณด๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ์บ์ํ๊ณ ZooKeeper์ ์ ์ง
5. ๊ฐ Consumer์๊ฒ ํ ๋น๋ Partition ์ ๋ณด๋ฅผ ๋ณด๋
Group Coordinator๋ ๊ฐ Consumer์๊ฒ ํ ๋น๋ Partition ์ ๋ณด๋ฅผ ๋ณด๋
๊ฐ Consumer๋ ํ ๋น๋ Partition์์ Consume์ ์์
์ Group Coordinator(a Broker)๊ฐ ์ง์ Partition์ ํ ๋นํ์ง ์๋๊ฐ?
Kafka์ ํ ๊ฐ์ง ์์น์ ๊ฐ๋ฅํ ํ ๋ง์ ๊ณ์ฐ์ ํด๋ผ์ด์ธํธ์ ์ํํ๋๋ก ํ์ฌ, Broker์ ๋ถ๋ด์ ์ค์ด๋ ๊ฒ
๋ง์ Consumer Group๊ณผ Consumer๋ค์ด ์๊ณ Broker ํผ์์ Rebalance๋ฅผ ์ํ ๊ณ์ฐ์ ํ๋ค๊ณ ์๊ฐํด ๋ณด๋ฉด...
Broker์ ์์ฒญ๋ ๋ถ๋ด
์ด๋ฌํ ๊ณ์ฐ์ Broker๊ฐ ์๋ ํด๋ผ์ด์ธํธ์๊ฒ ์คํ๋ก๋(Offload)ํ๋ ๊ฒ์ด ๊ฐ์ฅ ๋ฐ๋์งํจ
Consumer Rebalancing Trigger
๋ถํ์ํ Rebalancing์ ํผํด์ผ ํจ
Consumer Rebalancing์ Consumer๋ค์ ๋ฉ์์ง๋ฅผ Consumeํ์ง ๋ชปํจ ๋ฐ๋ผ์, ๋ถํ์ํ Rebalancing์ ๋ฐ๋์ ํผํด์ผ ํจ
Rebalancing Trigger
Consumer๊ฐ Consumer Group์์ ํํด
์ ๊ท Consumer๊ฐ Consumer Group์ ํฉ๋ฅ
Consumer๊ฐ Topic ๊ตฌ๋ ์ ๋ณ๊ฒฝ
Consumer Group์ Topic ๋ฉํ๋ฐ์ดํฐ์ ๋ณ๊ฒฝ ์ฌํญ์ ์ธ์ง (์: Partition ์ฆ๊ฐ)
Rebalancing Process
Group Coordinator๋ heartbeats์ ํ๋๊ทธ๋ฅผ ์ฌ์ฉํ์ฌ Consumer์๊ฒ Rebalance ์ ํธ๋ฅผ ๋ณด๋
Consumer๊ฐ ์ผ์ ์ค์งํ๊ณ Offset์ Commit
Consumer๋ Consumer Group์ ์๋ก์ด โGeneration"์ ๋ค์ ํฉ๋ฅ
Partition ์ฌํ ๋น
Consumer๋ ์ Partition์์ ๋ค์ Consume์ ์์
Consumer Heartbeats
Consumer ์ฅ์ ๋ฅผ ์ธ์งํ๊ธฐ ์ํจ
Consumer๋ poll()๊ณผ ๋ณ๋๋ก ๋ฐฑ๊ทธ๋ผ์ด๋ Thread์์ Heartbeats๋ฅผ ๋ณด๋
heartbeat.interval.ms (๊ธฐ๋ณธ๊ฐ : 3 ์ด)
์๋ ์๊ฐ ๋์ Heartbeats๊ฐ ์์ ๋์ง ์์ผ๋ฉด Consumer๋ Consumer Group์์ ์ญ์
session.timeout.ms (๊ธฐ๋ณธ๊ฐ : 10 ์ด)
poll()์ Heartbeats์ ์๊ด์์ด ์ฃผ๊ธฐ์ ์ผ๋ก ํธ์ถ๋์ด์ผ ํจ
max.poll.interval.ms (๊ธฐ๋ณธ๊ฐ : 5 ๋ถ)
๊ณผ๋ํ Rebalancing์ ํผํ๋ ๋ฐฉ๋ฒ
์ฑ๋ฅ ์ต์ ํ์ ํ์
Consumer Group ๋ฉค๋ฒ ๊ณ ์
Group์ ๊ฐ Consumer์๊ฒ ๊ณ ์ ํ group.instance.id ๋ฅผ ํ ๋นํฉ๋๋ค.
Consumer๋ LeaveGroupRequest๋ฅผ ์ฌ์ฉํ์ง ์์์ผ ํจ
Rejoin(์ฌ๊ฐ์ )์ ์๋ ค์ง group.instance.id ์ ๋ํ Rebalance๋ฅผ triggerํ์ง ์์
session.timeout.ms ํ๋
heartbeat.interval.ms๋ฅผ session.timeout.ms์ 1/3๋ก ์ค์
group.min.session.timeout.ms (Default: 6 seconds) ์ group.max.session.timeout.ms (Default: 5 minutes) ์ ์ฌ์ด๊ฐ
์ฅ์ : Consumer๊ฐ Rejoin(์ฌ๊ฐ์ )ํ ์ ์๋ ๋ ๋ง์ ์๊ฐ์ ์ ๊ณต
๋จ์ : Consumer ์ฅ์ ๋ฅผ ๊ฐ์งํ๋ ๋ฐ ์๊ฐ์ด ๋ ์ค๋ ๊ฑธ๋ฆผ
max.poll.interval.ms ํ๋
Consumer์๊ฒ poll()ํ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ ์ถฉ๋ถํ ์๊ฐ ์ ๊ณต
๋๋ฌด ํฌ๊ฒ ํ๋ฉด ์ ๋จ
Last updated