01.Producer Acks, Batch, Page Cache, Flush
Producer Acks
acks ์ค์ ์ ์์ฒญ์ด ์ฑ๊ณตํ ๋๋ฅผ ์ ์ํ๋๋ฐ ์ฌ์ฉ๋๋ Producer์ ์ค์ ํ๋ Parameter (Producer Parameter ์ค ํ๋)
Ack = 0
acks = 0 : ack๊ฐ ํ์ํ์ง ์์. ๋ฉ์์ง ์์ค์ด ๋ค์ ์๋๋ผ๋ ๋น ๋ฅด๊ฒ ๋ฉ์์ง๋ฅผ ๋ณด๋ด์ผ ํ๋ ๊ฒฝ์ฐ์ ์ฌ์ฉ(๊ถ์ฅ X)
Ack = 1
acks=1 : (default ๊ฐ) Leader๊ฐ ๋ฉ์์ง๋ฅผ ์์ ํ๋ฉด ack๋ฅผ ๋ณด๋. Leader๊ฐ Producer์๊ฒ ACK๋ฅผ ๋ณด๋ธ ํ, Follower๊ฐ ๋ณต์ ํ๊ธฐ ์ ์ Leader์ ์ฅ์ ๊ฐ ๋ฐ์ํ๋ฉด ๋ฉ์์ง๊ฐ ์์ค. โAt most once(์ต๋ ํ ๋ฒ)โ ์ ์ก์ ๋ณด์ฅ
Ack = -1
acks=-1 : acks=all ๊ณผ ๋์ผ. ๋ฉ์์ง๊ฐ Leader๊ฐ ๋ชจ๋ Replica๊น์ง Commit ๋๋ฉด ack๋ฅผ ๋ณด๋. Leader๋ฅผ ์์ด๋ ๋ฐ์ดํฐ๊ฐ ์ด์๋จ์ ์ ์๋๋ก ๋ณด์ฅ. ๊ทธ๋ฌ๋ ๋๊ธฐ ์๊ฐ์ด ๋ ๊ธธ๊ณ ํน์ ์คํจ ์ฌ๋ก์์ ๋ฐ๋ณต๋๋ ๋ฐ์ดํฐ ๋ฐ์ ๊ฐ๋ฅ์ฑ ์์. โAt least once(์ต์ ํ ๋ฒ)โ ์ ์ก์ ๋ณด์ฅ
Producer Retry
Parameter | description | Default |
---|---|---|
retries | ๋ฉ์์ง๋ฅผ sendํ๊ธฐ ์ํด ์ฌ์๋ํ๋ ํ์ | MAX_INT |
retry.backoff.ms | ์ฌ์๋ ์ฌ์ด์ ์ถ๊ฐ๋๋ ๋๊ธฐ ์๊ฐ | 100 |
request.timeout.ms | Producer๊ฐ ์๋ต์ ๊ธฐ๋ค๋ฆฌ๋ ์ต๋ ์๊ฐ | 30,000(30์ด) |
delivery.timeout.ms | send() ํ ์ฑ๊ณต ๋๋ ์คํจ๋ฅผ ๋ณด๊ณ ํ๋ ์๊ฐ์ ์ํ | 120,000(2๋ถ) |
retries๋ฅผ ์กฐ์ ํ๋ ๋์ ์ delivery.timeout.ms ์กฐ์ ์ผ๋ก ์ฌ์๋ ๋์์ ์ ์ด
acks = 0 ์์ retry๋ ๋ฌด์๋ฏธ
Producer Batch ์ฒ๋ฆฌ
๋ฉ์์ง๋ฅผ ๋ชจ์์ ํ๋ฒ์ ์ ์ก
Batch ์ฒ๋ฆฌ๋ RPC(Remote Procedure Call)์๋ฅผ ์ค์ฌ์ Broker๊ฐ ์ฒ๋ฆฌํ๋ ์์ ์ด ์ค์ด๋ค๊ธฐ ๋๋ฌธ์ ๋ ๋์ ์ฒ๋ฆฌ๋์ ์ ๊ณต
linger.ms ์ batch.size
linger.ms (default : 0, ์ฆ์ ๋ณด๋)
๋ฉ์์ง๊ฐ ํจ๊ป Batch ์ฒ๋ฆฌ๋ ๋๊น์ง ๋๊ธฐ ์๊ฐ
batch.size (default : 16 KB)
๋ณด๋ด๊ธฐ ์ Batch์ ์ต๋ ํฌ๊ธฐ
Batch ์ฒ๋ฆฌ์ ์ผ๋ฐ์ ์ธ ์ค์ ์ linger.ms=100 ๋ฐ batch.size=1000000 batch.size๋ก ์ง์ ํ ํฌ๊ธฐ๋งํผ ๋ฐ์ดํฐ๊ฐ ์ ์ฌ๋์ง ์๋ ๊ฒฝ์ฐ ๋ฌธ์ ๊ฐ ๋ ์ ์๋ค. ์ฃผ๋ก linger.ms๋ฅผ ํ์ฉ.
Producer Delivery Timeout
send() ํ ์ฑ๊ณต ๋๋ ์คํจ๋ฅผ ๋ณด๊ณ ํ๋ ์๊ฐ์ ์ํ
Message Send ์์ ๋ณด์ฅ
์งํ ์ค(in-flight)์ธ ์ฌ๋ฌ ์์ฒญ(request)์ ์ฌ์๋ํ๋ฉด ์์๊ฐ ๋ณ๊ฒฝ๋ ์ ์์ ๋ฉ์์ง ์์๋ฅผ ๋ณด์ฅํ๋ ค๋ฉด Producer์์ enable.idempotence๋ฅผ true๋ก ์ค์
Multiple in-flight request๋ฅผ ์ ์ก max.in.flight.requests.per.connection=5(default)
Batch0๊ฐ ์คํจํ์ง๋ง, Batch1์ ์ฑ๊ณตํ๋ฉด, Batch1์ด Batch0๋ณด๋ค ๋จผ์ Commit Log์ ์ถ๊ฐ๋์ด ์์๊ฐ ๋ฌ๋ผ์ง
enable.idempotence๋ฅผ ์ฌ์ฉํ๋ฉด, ํ๋์ Batch๊ฐ ์คํจํ๋ฉด, ๊ฐ์ Partition์ผ๋ก ๋ค์ด์ค๋ ํ์ Batch๋ค๋ OutOfOrderSequenceException๊ณผ ํจ๊ป ์คํจ
Page Cache ์ Flush
๋ฉ์์ง๋ Partition์ ๊ธฐ๋ก๋จ
Partition์ Log Segment file๋ก ๊ตฌ์ฑ (๊ธฐ๋ณธ๊ฐ : 1GB๋ง๋ค ์๋ก์ด Segment ์์ฑ)
์ฑ๋ฅ์ ์ํด Log Segment๋ OS Page Cache์ ๊ธฐ๋ก๋จ
๋ก๊ทธ ํ์ผ์ ์ ์ฅ๋ ๋ฉ์์ง์ ๋ฐ์ดํฐ ํ์์ Broker๊ฐ Producer๋ก๋ถํฐ ์์ ํ ๊ฒ, ๊ทธ๋ฆฌ๊ณ Consumer์๊ฒ ๋ณด๋ด๋ ๊ฒ๊ณผ ์ ํํ ๋์ผํ๋ฏ๋ก, Zero-Copy๊ฐ ๊ฐ๋ฅ
Page Cache๋ ๋ค์๊ณผ ๊ฐ์ ๊ฒฝ์ฐ ๋์คํฌ๋ก Flush๋จ
Broker๊ฐ ์์ ํ ์ข ๋ฃ
OS background โFlusher Threadโ ์คํ
Zero-copy
์ ์ก์ ๋ฐ์ดํฐ๊ฐ, User Space์ ๋ณต์ฌ๋์ง ์๊ณ , CPU ๊ฐ์ ์์ด Page Cache์ Network Buffer ์ฌ์ด์์ ์ง์ ์ ์ก๋๋ ๊ฒ์ ์๋ฏธ. ์ด๊ฒ์ ํตํด Broker Heap ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ ์ฝํ๊ณ ๋ํ ์์ฒญ๋ ์ฒ๋ฆฌ๋์ ์ ๊ณต
Flush ๋๊ธฐ ์ ์ Broker ์ฅ์ ๋ฐ์
OS๊ฐ ๋ฐ์ดํฐ๋ฅผ ๋์คํฌ๋ก Flushํ๊ธฐ ์ ์ Broker์ ์์คํ ์ ์ฅ์ ๊ฐ ๋ฐ์ํ๋ฉด ํด๋น ๋ฐ์ดํฐ๊ฐ ์์ค๋จ
Partition์ด Replication(๋ณต์ )๋์ด ์๋ค๋ฉด, Broker๊ฐ ๋ค์ ์จ๋ผ์ธ ์ํ๊ฐ ๋๋ฉด ํ์์ Leader Replica(๋ณต์ ๋ณธ)์์ ๋ฐ์ดํฐ๊ฐ ๋ณต๊ตฌ๋จ
Replication์ด ์๋ค๋ฉด, ๋ฐ์ดํฐ๋ ์๊ตฌ์ ์ผ๋ก ์์ค๋ ์ ์์
Kafka ์์ฒด Flush ์ ์ฑ
๋ง์ง๋ง Flush ์ดํ์ ๋ฉ์์ง ์(log.flush.interval.messages) ๋๋ ์๊ฐ(log.flush.interval.ms)์ผ๋ก Flush(fsync)๋ฅผ ํธ๋ฆฌ๊ฑฐํ๋๋ก ์ค์ ํ ์ ์์
Kafka๋ ์ด์ ์ฒด์ ์ background Flush ๊ธฐ๋ฅ(์: pdflush)์ ๋ ํจ์จ์ ์ผ๋ก ํ์ฉํ๋๊ฒ์ ์ ํธํ๊ธฐ ๋๋ฌธ์ ์ด๋ฌํ ์ค์ ์ ๊ธฐ๋ณธ์ ์ผ๋ก ๋ฌดํ(๊ธฐ๋ณธ์ ์ผ๋ก fsync ๋นํ์ฑํ)์ผ๋ก ์ค์
์ด๋ฌํ ์ค์ ์ ๊ธฐ๋ณธ๊ฐ์ผ๋ก ์ ์งํ๋ ๊ฒ์ ๊ถ์ฅ
*.log ํ์ผ์ ๋ณด๋ฉด ๋์คํฌ๋ก Flush๋ ๋ฐ์ดํฐ์ ์์ง Flush๋์ง ์์ Page Cache (OS Buffer)์ ์๋ ๋ฐ์ดํฐ๊ฐ ๋ชจ๋ ํ์๋จ
Flush๋ ํญ๋ชฉ๊ณผ Flush๋์ง ์์ ํญ๋ชฉ์ ํ์ํ๋ Linux ๋๊ตฌ(์: vmtouch)๋ ์์
Last updated