04.Consumer Rebalance
Last updated
Last updated
Partition์์ ๋ฉ์์ง๋ฅผ Polling
Consumer๋ ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ค๊ธฐ ์ํด์ Partition์ ์ฐ์์ ์ผ๋ก Poll ํจ
๊ฐ์ ธ์จ ์์น๋ฅผ ๋ํ๋ด๋ offset ์ ๋ณด๋ฅผ __consumer_offsets Topic์ ์ ์ฅํ์ฌ ๊ด๋ฆฌ
Consumer Group Id
๋์ผํ group.id ๋ก ๊ตฌ์ฑ๋ ๋ชจ๋ Consumer๋ค์ ํ๋์ Consumer Group์ ํ์ฑ
Consumer Group์ Consumer๋ค์ ์์ ๋์ ์ด๋ ์ ๋ ๊ท ๋ฑํ๊ฒ ๋ถํ ํจ
๋์ผํ Topic์์ consumeํ๋ ์ฌ๋ฌ Consumer Group์ด ์์ ์ ์์
ํ๋์ Consumer Group์์์ Partition ํ ๋น
Partition์ Consumer์๊ฒ Assign(ํ ๋น) ํ ๋,
ํ๋์ Partition์ ์ง์ ๋ Consumer Group๋ด์ ํ๋์ Consumer ๋ง ์ฌ์ฉ
๋์ผํ Key๋ฅผ ๊ฐ์ง ๋ฉ์์ง๋ ๋์ผํ Consumer๊ฐ ์ฌ์ฉ (Partition ์๋ฅผ ๋ณ๊ฒฝํ์ง ์๋ ํ)
Consumer์ ์ค์ ํ๋ผ๋ฏธํฐ ์ค์์ partition.assignment.strategy ๋ก ํ ๋น ๋ฐฉ์ ์กฐ์
Consumer Group์ Group Coordinator๋ผ๋ ํ๋ก์ธ์ค์ ์ํด ๊ด๋ฆฌ๋จ
Consumer Group์ ๋ฑ๋ก
Group Coordinator(ํ๋์ Broker) ์ Group Leader(ํ๋์ Consumer)๊ฐ ์ํธ์์ฉ
1. Consumer ๋ฑ๋ก ๋ฐ Group Coordinator ์ ํ
hash(group.id) % offsets.topic.num.partitions ์์์ ์ฌ์ฉํ์ฌ group.id๊ฐ ์ ์ฅ๋ __consumer_offsets ์ Partition์ ๊ฒฐ์
2. JoinGroup ์์ฒญ ์์์ ๋ฐ๋ผ Consumer ๋์ด
Consumer๋ค์ด Consumeํ ์ต๋ Partition ์๊น์ง JoinGroup ์์ฒญ์ ์์ ํ๋ ์์๋๋ก Consumer๋ฅผ ๋์ด
3. Group Leader ๊ฒฐ์ ๋ฐ Partition ํ ๋น
JoinGroup ์์ฒญ์ ๋ณด๋ด๋ ์ต์ด Consumer๋ Group Leader๋ก ์ง์ ๋๋ฉฐ, Group Coordinator๋ก๋ถํฐ Consumer ๋ชฉ๋ก์ ๋ฐ์ Group Leader๋ ๊ตฌ์ฑ๋ partition.assignment.strategy๋ฅผ ์ฌ์ฉํ์ฌ ๊ฐ Consumer์๊ฒ Partition์ ํ ๋น ์ ๊ทธ๋ฆผ์์๋ Partition๋ณด๋ค ๋ ๋ง์ Consumer๊ฐ ์์ผ๋ฏ๋ก ๊ฐ Consumer๋ Consumeํ Partition์ด ์ต๋ 1๊ฐ ์์
4. โConsumer โ Partitionโ ๋งตํ์ ๋ณด๋ฅผ Group Coordinator์๊ฒ ์ ์ก
Group Leader๋ โConsumer โ Partitionโ ๋งตํ์ ๋ณด๋ฅผ Group Coordinator์๊ฒ ๋ค์ ๋ณด๋
Group Coordinator๋ ๋งตํ์ ๋ณด๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ์บ์ํ๊ณ ZooKeeper์ ์ ์ง
5. ๊ฐ Consumer์๊ฒ ํ ๋น๋ Partition ์ ๋ณด๋ฅผ ๋ณด๋
Group Coordinator๋ ๊ฐ Consumer์๊ฒ ํ ๋น๋ Partition ์ ๋ณด๋ฅผ ๋ณด๋
๊ฐ Consumer๋ ํ ๋น๋ Partition์์ Consume์ ์์
๋ถํ์ํ Rebalancing์ ํผํด์ผ ํจ
Consumer Rebalancing์ Consumer๋ค์ ๋ฉ์์ง๋ฅผ Consumeํ์ง ๋ชปํจ ๋ฐ๋ผ์, ๋ถํ์ํ Rebalancing์ ๋ฐ๋์ ํผํด์ผ ํจ
Consumer๊ฐ Consumer Group์์ ํํด
์ ๊ท Consumer๊ฐ Consumer Group์ ํฉ๋ฅ
Consumer๊ฐ Topic ๊ตฌ๋ ์ ๋ณ๊ฒฝ
Consumer Group์ Topic ๋ฉํ๋ฐ์ดํฐ์ ๋ณ๊ฒฝ ์ฌํญ์ ์ธ์ง (์: Partition ์ฆ๊ฐ)
Group Coordinator๋ heartbeats์ ํ๋๊ทธ๋ฅผ ์ฌ์ฉํ์ฌ Consumer์๊ฒ Rebalance ์ ํธ๋ฅผ ๋ณด๋
Consumer๊ฐ ์ผ์ ์ค์งํ๊ณ Offset์ Commit
Consumer๋ Consumer Group์ ์๋ก์ด โGeneration"์ ๋ค์ ํฉ๋ฅ
Partition ์ฌํ ๋น
Consumer๋ ์ Partition์์ ๋ค์ Consume์ ์์
Consumer ์ฅ์ ๋ฅผ ์ธ์งํ๊ธฐ ์ํจ
Consumer๋ poll()๊ณผ ๋ณ๋๋ก ๋ฐฑ๊ทธ๋ผ์ด๋ Thread์์ Heartbeats๋ฅผ ๋ณด๋
heartbeat.interval.ms (๊ธฐ๋ณธ๊ฐ : 3 ์ด)
์๋ ์๊ฐ ๋์ Heartbeats๊ฐ ์์ ๋์ง ์์ผ๋ฉด Consumer๋ Consumer Group์์ ์ญ์
session.timeout.ms (๊ธฐ๋ณธ๊ฐ : 10 ์ด)
poll()์ Heartbeats์ ์๊ด์์ด ์ฃผ๊ธฐ์ ์ผ๋ก ํธ์ถ๋์ด์ผ ํจ
max.poll.interval.ms (๊ธฐ๋ณธ๊ฐ : 5 ๋ถ)
์ฑ๋ฅ ์ต์ ํ์ ํ์
Consumer Group ๋ฉค๋ฒ ๊ณ ์
Group์ ๊ฐ Consumer์๊ฒ ๊ณ ์ ํ group.instance.id ๋ฅผ ํ ๋นํฉ๋๋ค.
Consumer๋ LeaveGroupRequest๋ฅผ ์ฌ์ฉํ์ง ์์์ผ ํจ
Rejoin(์ฌ๊ฐ์ )์ ์๋ ค์ง group.instance.id ์ ๋ํ Rebalance๋ฅผ triggerํ์ง ์์
session.timeout.ms ํ๋
heartbeat.interval.ms๋ฅผ session.timeout.ms์ 1/3๋ก ์ค์
group.min.session.timeout.ms (Default: 6 seconds) ์ group.max.session.timeout.ms (Default: 5 minutes) ์ ์ฌ์ด๊ฐ
์ฅ์ : Consumer๊ฐ Rejoin(์ฌ๊ฐ์ )ํ ์ ์๋ ๋ ๋ง์ ์๊ฐ์ ์ ๊ณต
๋จ์ : Consumer ์ฅ์ ๋ฅผ ๊ฐ์งํ๋ ๋ฐ ์๊ฐ์ด ๋ ์ค๋ ๊ฑธ๋ฆผ
max.poll.interval.ms ํ๋
Consumer์๊ฒ poll()ํ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ ์ถฉ๋ถํ ์๊ฐ ์ ๊ณต
๋๋ฌด ํฌ๊ฒ ํ๋ฉด ์ ๋จ
Group Coordinator๋ Group์ Consumers ์นดํ๋ก๊ทธ๋ฅผ ์์ฑํ๊ธฐ ์ ์ Consumers์ JoinGroup ์์ฒญ์ ๋ํด (๊ธฐ๋ณธ๊ฐ 3์ด)๋ฅผ ๋๊ธฐ