01.Producer Acks, Batch, Page Cache, Flush
Last updated
Last updated
acks ์ค์ ์ ์์ฒญ์ด ์ฑ๊ณตํ ๋๋ฅผ ์ ์ํ๋๋ฐ ์ฌ์ฉ๋๋ Producer์ ์ค์ ํ๋ Parameter (Producer Parameter ์ค ํ๋)
acks = 0 : ack๊ฐ ํ์ํ์ง ์์. ๋ฉ์์ง ์์ค์ด ๋ค์ ์๋๋ผ๋ ๋น ๋ฅด๊ฒ ๋ฉ์์ง๋ฅผ ๋ณด๋ด์ผ ํ๋ ๊ฒฝ์ฐ์ ์ฌ์ฉ(๊ถ์ฅ X)
acks=1 : (default ๊ฐ) Leader๊ฐ ๋ฉ์์ง๋ฅผ ์์ ํ๋ฉด ack๋ฅผ ๋ณด๋. Leader๊ฐ Producer์๊ฒ ACK๋ฅผ ๋ณด๋ธ ํ, Follower๊ฐ ๋ณต์ ํ๊ธฐ ์ ์ Leader์ ์ฅ์ ๊ฐ ๋ฐ์ํ๋ฉด ๋ฉ์์ง๊ฐ ์์ค. โAt most once(์ต๋ ํ ๋ฒ)โ ์ ์ก์ ๋ณด์ฅ
acks=-1 : acks=all ๊ณผ ๋์ผ. ๋ฉ์์ง๊ฐ Leader๊ฐ ๋ชจ๋ Replica๊น์ง Commit ๋๋ฉด ack๋ฅผ ๋ณด๋. Leader๋ฅผ ์์ด๋ ๋ฐ์ดํฐ๊ฐ ์ด์๋จ์ ์ ์๋๋ก ๋ณด์ฅ. ๊ทธ๋ฌ๋ ๋๊ธฐ ์๊ฐ์ด ๋ ๊ธธ๊ณ ํน์ ์คํจ ์ฌ๋ก์์ ๋ฐ๋ณต๋๋ ๋ฐ์ดํฐ ๋ฐ์ ๊ฐ๋ฅ์ฑ ์์. โAt least once(์ต์ ํ ๋ฒ)โ ์ ์ก์ ๋ณด์ฅ
retries
๋ฉ์์ง๋ฅผ sendํ๊ธฐ ์ํด ์ฌ์๋ํ๋ ํ์
MAX_INT
retry.backoff.ms
์ฌ์๋ ์ฌ์ด์ ์ถ๊ฐ๋๋ ๋๊ธฐ ์๊ฐ
100
request.timeout.ms
Producer๊ฐ ์๋ต์ ๊ธฐ๋ค๋ฆฌ๋ ์ต๋ ์๊ฐ
30,000(30์ด)
delivery.timeout.ms
send() ํ ์ฑ๊ณต ๋๋ ์คํจ๋ฅผ ๋ณด๊ณ ํ๋ ์๊ฐ์ ์ํ
120,000(2๋ถ)
retries๋ฅผ ์กฐ์ ํ๋ ๋์ ์ delivery.timeout.ms ์กฐ์ ์ผ๋ก ์ฌ์๋ ๋์์ ์ ์ด
acks = 0 ์์ retry๋ ๋ฌด์๋ฏธ
๋ฉ์์ง๋ฅผ ๋ชจ์์ ํ๋ฒ์ ์ ์ก
Batch ์ฒ๋ฆฌ๋ RPC(Remote Procedure Call)์๋ฅผ ์ค์ฌ์ Broker๊ฐ ์ฒ๋ฆฌํ๋ ์์ ์ด ์ค์ด๋ค๊ธฐ ๋๋ฌธ์ ๋ ๋์ ์ฒ๋ฆฌ๋์ ์ ๊ณต
linger.ms (default : 0, ์ฆ์ ๋ณด๋)
๋ฉ์์ง๊ฐ ํจ๊ป Batch ์ฒ๋ฆฌ๋ ๋๊น์ง ๋๊ธฐ ์๊ฐ
batch.size (default : 16 KB)
๋ณด๋ด๊ธฐ ์ Batch์ ์ต๋ ํฌ๊ธฐ
Batch ์ฒ๋ฆฌ์ ์ผ๋ฐ์ ์ธ ์ค์ ์ linger.ms=100 ๋ฐ batch.size=1000000 batch.size๋ก ์ง์ ํ ํฌ๊ธฐ๋งํผ ๋ฐ์ดํฐ๊ฐ ์ ์ฌ๋์ง ์๋ ๊ฒฝ์ฐ ๋ฌธ์ ๊ฐ ๋ ์ ์๋ค. ์ฃผ๋ก linger.ms๋ฅผ ํ์ฉ.
send() ํ ์ฑ๊ณต ๋๋ ์คํจ๋ฅผ ๋ณด๊ณ ํ๋ ์๊ฐ์ ์ํ
์งํ ์ค(in-flight)์ธ ์ฌ๋ฌ ์์ฒญ(request)์ ์ฌ์๋ํ๋ฉด ์์๊ฐ ๋ณ๊ฒฝ๋ ์ ์์ ๋ฉ์์ง ์์๋ฅผ ๋ณด์ฅํ๋ ค๋ฉด Producer์์ enable.idempotence๋ฅผ true๋ก ์ค์
Multiple in-flight request๋ฅผ ์ ์ก max.in.flight.requests.per.connection=5(default)
Batch0๊ฐ ์คํจํ์ง๋ง, Batch1์ ์ฑ๊ณตํ๋ฉด, Batch1์ด Batch0๋ณด๋ค ๋จผ์ Commit Log์ ์ถ๊ฐ๋์ด ์์๊ฐ ๋ฌ๋ผ์ง
enable.idempotence๋ฅผ ์ฌ์ฉํ๋ฉด, ํ๋์ Batch๊ฐ ์คํจํ๋ฉด, ๊ฐ์ Partition์ผ๋ก ๋ค์ด์ค๋ ํ์ Batch๋ค๋ OutOfOrderSequenceException๊ณผ ํจ๊ป ์คํจ
๋ฉ์์ง๋ Partition์ ๊ธฐ๋ก๋จ
Partition์ Log Segment file๋ก ๊ตฌ์ฑ (๊ธฐ๋ณธ๊ฐ : 1GB๋ง๋ค ์๋ก์ด Segment ์์ฑ)
์ฑ๋ฅ์ ์ํด Log Segment๋ OS Page Cache์ ๊ธฐ๋ก๋จ
๋ก๊ทธ ํ์ผ์ ์ ์ฅ๋ ๋ฉ์์ง์ ๋ฐ์ดํฐ ํ์์ Broker๊ฐ Producer๋ก๋ถํฐ ์์ ํ ๊ฒ, ๊ทธ๋ฆฌ๊ณ Consumer์๊ฒ ๋ณด๋ด๋ ๊ฒ๊ณผ ์ ํํ ๋์ผํ๋ฏ๋ก, Zero-Copy๊ฐ ๊ฐ๋ฅ
Page Cache๋ ๋ค์๊ณผ ๊ฐ์ ๊ฒฝ์ฐ ๋์คํฌ๋ก Flush๋จ
Broker๊ฐ ์์ ํ ์ข ๋ฃ
OS background โFlusher Threadโ ์คํ
Zero-copy
์ ์ก์ ๋ฐ์ดํฐ๊ฐ, User Space์ ๋ณต์ฌ๋์ง ์๊ณ , CPU ๊ฐ์ ์์ด Page Cache์ Network Buffer ์ฌ์ด์์ ์ง์ ์ ์ก๋๋ ๊ฒ์ ์๋ฏธ. ์ด๊ฒ์ ํตํด Broker Heap ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ ์ฝํ๊ณ ๋ํ ์์ฒญ๋ ์ฒ๋ฆฌ๋์ ์ ๊ณต
OS๊ฐ ๋ฐ์ดํฐ๋ฅผ ๋์คํฌ๋ก Flushํ๊ธฐ ์ ์ Broker์ ์์คํ ์ ์ฅ์ ๊ฐ ๋ฐ์ํ๋ฉด ํด๋น ๋ฐ์ดํฐ๊ฐ ์์ค๋จ
Partition์ด Replication(๋ณต์ )๋์ด ์๋ค๋ฉด, Broker๊ฐ ๋ค์ ์จ๋ผ์ธ ์ํ๊ฐ ๋๋ฉด ํ์์ Leader Replica(๋ณต์ ๋ณธ)์์ ๋ฐ์ดํฐ๊ฐ ๋ณต๊ตฌ๋จ
Replication์ด ์๋ค๋ฉด, ๋ฐ์ดํฐ๋ ์๊ตฌ์ ์ผ๋ก ์์ค๋ ์ ์์
๋ง์ง๋ง Flush ์ดํ์ ๋ฉ์์ง ์(log.flush.interval.messages) ๋๋ ์๊ฐ(log.flush.interval.ms)์ผ๋ก Flush(fsync)๋ฅผ ํธ๋ฆฌ๊ฑฐํ๋๋ก ์ค์ ํ ์ ์์
Kafka๋ ์ด์ ์ฒด์ ์ background Flush ๊ธฐ๋ฅ(์: pdflush)์ ๋ ํจ์จ์ ์ผ๋ก ํ์ฉํ๋๊ฒ์ ์ ํธํ๊ธฐ ๋๋ฌธ์ ์ด๋ฌํ ์ค์ ์ ๊ธฐ๋ณธ์ ์ผ๋ก ๋ฌดํ(๊ธฐ๋ณธ์ ์ผ๋ก fsync ๋นํ์ฑํ)์ผ๋ก ์ค์
์ด๋ฌํ ์ค์ ์ ๊ธฐ๋ณธ๊ฐ์ผ๋ก ์ ์งํ๋ ๊ฒ์ ๊ถ์ฅ
*.log ํ์ผ์ ๋ณด๋ฉด ๋์คํฌ๋ก Flush๋ ๋ฐ์ดํฐ์ ์์ง Flush๋์ง ์์ Page Cache (OS Buffer)์ ์๋ ๋ฐ์ดํฐ๊ฐ ๋ชจ๋ ํ์๋จ
Flush๋ ํญ๋ชฉ๊ณผ Flush๋์ง ์์ ํญ๋ชฉ์ ํ์ํ๋ Linux ๋๊ตฌ(์: vmtouch)๋ ์์