kafka์ ๋ํด ๊ณต๋ถํด๋ณด์
๐ก์์ฆ ์ ๋ค๋ค ์นดํ์นด, ์นดํ์นด ํ๋ ๊ฑธ๊น?
- ๋๊ท๋ชจ ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ์ ์ํ ๋ถ์ฐ ๋ฉ์ธ์ง ์์คํ
- ๋์ ์ฒ๋ฆฌ๋ ๋ฐ ๊ฐ๋ฐ ํจ์จ์ ์ํ ๋ถ์ฐ ์์คํ ์์ ๊ณ ๊ฐ์ฉ์ฑ๊ณผ ์ ์ฐํจ์ ๊ฐ์ถ ์ฐ๊ณ์์คํ ์ด ํ์
์นดํ์นด Overview
1. Producer & Consumer
- Producer - ๋ฉ์ธ์ง๋ฅผ ์นดํ์นด ๋ธ๋ก์ปค์ ์ ์ฌ(๋ฐํ)ํ๋ ์๋น์ค
- Consumer - ์นดํ์นด ๋ธ๋ก์ปค์ ์ ์ฌ๋ ๋ฉ์์ง๋ฅผ ์ฝ์ด์ค๋(์๋น) ์๋น์ค
- ๋ฉ์ธ์ง๋ฅผ ์ฝ์ ๋๋ง๋ค ํํฐ์ ๋ณ๋ก offset์ ์ ์งํด ์ฒ๋ฆฌํ๋ ๋ฉ์ธ์ง์ ์์น๋ฅผ ์ถ์
- CURRENT-OFFSET
์ปจ์๋จธ๊ฐ ์ด๋๊น์ง ์ฒ๋ฆฌํ๋์ง๋ฅผ ๋ํ๋ด๋ offset์ด๋ฉฐ, ๋์ผํ ๋ฉ์ธ์ง๋ฅผ ์ฌ์ฒ๋ฆฌํ์ง ์๊ณ , ์ฒ๋ฆฌํ์ง ์์ ๋ฉ์ธ์ง๋ฅผ ๊ฑด๋๋ฐ์ง ์๊ธฐ์ํด ๋ง์ง๋ง๊น์ง ์ฒ๋ฆฌํ offset์ ์ ์ฅ(์ปค๋ฐ) ํด์ผํจ
- ๋ง์ฝ ์ค๋ฅ๊ฐ ๋ฐ์ํ๊ฑฐ๋ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ, ์ปจ์๋จธ ๊ทธ๋ฃน ์ฐจ์์์ --reset-offsets ์ต์ ์ ํตํด ํน์ ์์ ์ผ๋ก offset์ ๋๋๋ฆด ์ ์์
2. Broker
- ์นดํ์นด ์๋ฒ Unit
- Producer์ ๋ฉ์ธ์ง๋ฅผ ๋ฐ์ offset ์ง์ ํ ๋์คํฌ์ ์ ์ฅ
- Consumer์ ํํฐ์ Read์ ์๋ตํด ๋์คํฌ์ ๋ฉ์ธ์ง ์ ์ก
- Cluster ๋ด์์ ๊ฐ 1๊ฐ์ฉ ์กด์ฌํ๋ Role Broker
- Controller
๋ค๋ฅธ ๋ธ๋ก์ปค๋ฅผ ๋ชจ๋ํฐ๋งํ๊ณ ์ฅ์ ๊ฐ ๋ฐ์ํ Broker์ ํน์ ํ ํฝ์ Leader ํํฐ์ ์ด ์กด์ฌํ๋ค๋ฉด, ๋ค๋ฅธ ๋ธ๋ก์ปค์ ํํฐ์ ์ค Leader๋ฅผ ์ฌ๋ถ๋ฐฐํ๋ ์ญํ ์ ์ํ
- Coordinator
์ปจ์๋จธ ๊ทธ๋ฃน์ ๋ชจ๋ํฐ๋งํ๊ณ ํด๋น ๊ทธ๋ฃน ๋ด์ ํน์ ์ปจ์๋จธ๊ฐ ์ฅ์ ๊ฐ ๋ฐ์ํด ๋งค์นญ๋ ํํฐ์ ์ ๋ฉ์ธ์ง๋ฅผ Consume ํ ์ ์๋ ๊ฒฝ์ฐ, ํด๋น ํํฐ์ ์ ๋ค๋ฅธ ์ปจ์๋จธ์๊ฒ ๋งค์นญํด์ฃผ๋ ์ญํ ์ํ(Rebalance)
3.Message
- ์นดํ์นด์์ ์ทจ๊ธํ๋ ๋ฐ์ดํฐ์ ๋จ์๋ก <Key, Message> ํํ๋ก ๊ตฌ์ฑ
4. Topic & Partition
- Topic์ ๋ฉ์ธ์ง๋ฅผ ๋ถ๋ฅํ๋ ๊ธฐ์ค์ด๋ฉฐ N๊ฐ์ Partition์ผ๋ก ๊ตฌ์ฑ
- Partition์ ๋ฐํ๋ ์์๋๋ก ์ปจ์ํจ์ผ๋ก์จ ์์ฐจ์ฒ๋ฆฌ๋ฅผ ๋ณด์ฅ
- ๋์ฉ๋ ํธ๋ํฝ์ ํํฐ์ ์ ๊ฐ์๋งํผ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํ ์ ์์ด ๋น ๋ฅธ ์ฒ๋ฆฌ ๊ฐ๋ฅ
- ์ ์ฒด ๋ฉ์ธ์ง์ ๋ฐํ ์์ฐจ์ฒ๋ฆฌ๋ฅผ ๋ณด์ฅํ์ง ์์ง๋ง, ๊ฐ์ ํํฐ์ ์ ๋ฉ์ธ์ง๋ฅผ ๋ํด์๋ ์์ฐจ์ฒ๋ฆฌ๋ฅผ ๋ณด์ฅ
- ๋์์ฑ ์ ์ด์ ๊ฐ๋ ์ ์๊ฐํ์ ๋, ๋์์ ์ฒ๋ฆฌ๋๋ฉด ์๋๋ ์์์ Id ๋ฑ์ ๋ฉ์ธ์ง์ ํค๋ก ์ค์ ํ๋ฉด ์์ฐจ์ฒ๋ฆฌ๊ฐ ๋ณด์ฅ๋์ด์ผ ํ๋ ์ผ์ด์ค๋ ๋ณด์ฅ๋ ๋๋ฉด์ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ก ๋์ ์ฒ๋ฆฌ๋์ ๋ณด์ฅํ ํ์ด๋ธ๋ฆฌ๋
- Producer์์ ๋ฉ์ธ์ง๋ฅผ ๋ฐํํ ๋, ์ ์ฅ๋ Partition์ ๊ฒฐ์ ํ๊ธฐ ์ํด ๋ฉ์ธ์ง์ ํค ํด์๋ฅผ ํ์ฉํ๋ฉฐ, ํค๊ฐ ์กด์ฌํ์ง ์์ ๊ฒฝ์ฐ ๊ท ํ ์ ์ด๋ฅผ ์ํด
key: "842"
hash: "55478" // "842".hashCode()
partitionCnt: "3"
targetPartition: 2 // 55478 % 3
- Partitioner
๋ฉ์ธ์ง๋ฅผ ๋ฐํํ ๋, ํ ํฝ์ ์ด๋ค ํํฐ์ ์ ์ ์ฅ๋ ์ง ๊ฒฐ์ ํ๋ฉฐ Producer ์ธก์์ ๊ฒฐ์ . ํน์ ๋ฉ์ธ์ง์ ํค๊ฐ ์กด์ฌํ๋ค๋ฉด ํค์ ํด์ ๊ฐ์ ๋งค์นญ๋๋ ํํฐ์ ์ ๋ฐ์ดํฐ๋ฅผ ์ ์กํจ์ผ๋ก์จ ํค๊ฐ ๊ฐ์ ๋ฉ์์ง๋ฅผ ๋ค๊ฑด ๋ฐํํ๋๋ผ๋, ํญ์ ๊ฐ์ ํํฐ์ ์ ๋ฉ์ธ์ง๋ฅผ ์ ์ฌํด ์ฒ๋ฆฌ์์๋ฅผ ๋ณด์ฅํ ์ ์์.
- ํ Partition์ ํ๋์ ์ปจ์๋จธ์์๋ง ์ปจ์ํ ์ ์์
- ํ๋์ ํํฐ์ ์ ์ฌ๋ฌ๊ฐ์ ์ปจ์๋จธ๊ฐ ๋ฉ์์ง๋ฅผ ์ปจ์ํ๋ฉด ๋ฉ์์ง ์ฒ๋ฆฌ์ ์์๋ฅผ ๋ณด์ฅํ ์ ์๊ฒ ๋จ
5. Consumer Group
- ํ๋์ ํ ํฝ์ ๋ฐํ๋ ๋ฉ์ธ์ง๋ฅผ ์ฌ๋ฌ ์๋น์ค๊ฐ ์ปจ์ํ๊ธฐ ์ํด ๊ทธ๋ฃน์ ์ค์
- ํ๋์ ์ฃผ๋ฌธ์๋ฃ ๋ฉ์ธ์ง๋ฅผ ๊ฒฐ์ ์๋น์ค์์๋, ์ํ์๋น์ค์์๋ ์ปจ์
- ๋ณดํต ์๋น ์ฃผ์ฒด์ธ Application ๋จ์๋ก Concumer Group์ ์์ฑ, ๊ด๋ฆฌํจ
- ๊ฐ์ ํ ํฝ์ ๋ํ ์๋น์ฃผ์ฒด๋ฅผ ๋๋ฆฌ๊ณ ์ถ๋ค๋ฉด, ๋ณ๋์ ์ปจ์๋จธ ๊ทธ๋ฃน์ ๋ง๋ค์ด ํ ํฝ๊ตฌ๋ ..
- ํํฐ์ ์ ๊ฐ์๊ฐ ๊ทธ๋ฃน ๋ด ์ปจ์๋จธ ๊ฐ์๋ณด๋ค ๋ง๋ค๋ฉด ์์ฌ ํํฐ์ ์ ๊ฒฝ์ฐ ๋ฉ์ธ์ง๊ฐ ์๋น๋ ์ ์์์ ์๋ฏธํจ
- (์ฐธ๊ณ ) ํฌํฑ์ Patition ๊ฐ์์ Consumer ๊ฐ์์ ๋ฐ๋ฅธ ์๋น
6. Rebalancing
- Consmuer Group์ ๊ฐ์ฉ์ฑ๊ณผ ํ์ฅ์ฑ์ ํ๋ณดํด์ฃผ๋ ๊ฐ๋
- ํน์ ์ปจ์๋จธ๋ก๋ถํฐ ๋ค๋ฅธ ์ปจ์๋จธ๋ก ํํฐ์ ์ ์์ ๊ถ์ ์ด์ ์ํค๋ ํ์
- e.g. Consumer Group ๋ด์ Concumer๊ฐ ์ถ๊ฐ๋ ๊ฒฝ์ฐ, ํน์ ํํฐ์ ์ ์์ ๊ถ์ ์ด์ ์ํค๊ฑฐ๋ ์ค๋ฅ๊ฐ ์๊ธด Consumer๋ก๋ถํฐ ์์ ๊ถ์ ํ์ํด ๋ค๋ฅธ Consumer์ ๋ฐฐ์ ํจ
- (์ฃผ์) ๋ฆฌ๋ฐธ๋ฐ์ฑ ์ค์๋ ์ปจ์๋จธ๊ฐ ๋ฉ์ธ์ง๋ฅผ ์ฝ์ ์ ์์
Rebalancing Case
1. Concumer Group ๋ด์ ์๋ก์ด Consumer ์ถ๊ฐ
2. Consumer Group ๋ด์ ํน์ Consumer ์ฅ์ ๋ก ์๋น ์ค๋จ
3. Topic ๋ด์ ์๋ก์ด Partition ์ถ๊ฐ
7. Cluster
- ๊ณ ๊ฐ์ฉ์ฑ (HA)๋ฅผ ์ํด ์ฌ๋ฌ ์๋ฒ๋ฅผ ๋ฌถ์ด ํน์ ์๋ฒ์ ์ฅ์ ๋ฅผ ๊ทน๋ณตํ ์ ์๋๋ก ๊ตฌ์ฑ
- Broker๊ฐ ์ฆ๊ฐํ ์๋ก ๋ฉ์์ง ์์ , ์ ๋ฌ ์ฒ๋ฆฌ๋์ ๋ถ์ฐ์ํฌ ์ ์์ผ๋ฏ๋ก ํ์ฅ์ ํด๋ฆฌ
- (๋์์ค์ธ ๋ค๋ฅธ Broker์ ์ํฅ ์์ด ํ์ฅ์ด ๊ฐ๋ฅํ๋ฏ๋ก, ํธ๋ํฝ ์์ ์ฆ๊ฐ์ ๋ฐ๋ฅธ ๋ธ๋ก์ปค ์ฆ์ค์ด ์์ฝ๊ฒ ๊ฐ๋ฅ)
8. Replication
- Cluster์ ๊ฐ์ฉ์ฑ์ ๋ณด์ฅํ๋ ๊ฐ๋
- ๊ฐ Partition์ Replica๋ฅผ ๋ง๋ค์ด ๋ฐฑ์ ๋ฐ ์ฅ์ ๊ทน๋ณต
- Leader Replica - ๊ฐ ํํฐ์ ์ 1๊ฐ์ ๋ฆฌ๋ Replica๋ฅผ ๊ฐ์ง๋ค. ๋ชจ๋ Producer, Consumer ์์ฒญ์ ๋ฆฌ๋๋ฅผ ํตํด ์ฒ๋ฆฌ๋๊ฒ ํ์ฌ ์ผ๊ด์ฑ์ ๋ณด์ฅํ๋ค.
- Follwer Replica - ๊ฐ ํํฐ์ ์ ๋ฆฌ๋๋ฅผ ์ ์ธํ Replica์ด๋ฉฐ ๋จ์ํ ๋ฆฌ๋์ ๋ฉ์ธ์ง๋ฅผ ๋ณต์ ํด ๋ฐฑ์ ํ๋ค. ๋ง์ผ, ํํฐ์ ์ ๋ฆฌ๋๊ฐ ์ค๋จ๋๋ ๊ฒฝ์ฐ ํ๋ก์ ์ค ํ๋๋ฅผ ์๋ก์ด ๋ฆฌ๋๋ฅผ ์ ์ถํ๋ค.
๋น๋๊ธฐ ๋ฉ์ธ์ง ํต์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ๋ ์ฃผ์ํ ์
๋น์ฆ๋์ค ๋ก์ง ์คํ ์ดํ์ ์ด๋ฒคํธ๊ฐ ๋ฐํ๋์ง ์์ ๊ฒฝ์ฐ, ํด๋น ์ด๋ฒคํธ๋ฅผ ๋ฐ๋ผ๋ณด๋ Consumer ๋ํ ๋ณธ์ธ์ ๋น์ฆ๋์ค ๋ก์ง์ ์ํํ ์ ์์ผ๋ฏ๋ก ์ ์ฒด์ ์ธ ๋น์ฆ๋์ค ํ๋ฆ์ Hole์ด ์๊ธฐ๊ฑฐ๋ ๋ฐ์ดํฐ ์ ํฉ์ฑ์ ๋ฌธ์ ๊ฐ ์๊ธธ ์ ์๋ค.
์นดํ์นด์ ๋น์ทํ ์์คํ
๋ฉด์ ๋จ๊ณจ ์ง๋ฌธ [Messaging System ๊ฐ ์ฐจ์ด์ ]
Redis
- pub/sub ์ ํตํด MQ ๊ตฌํ ๊ฐ๋ฅ
- publisher ๊ฐ ์ฑ๋์ ๋ฉ์ธ์ง๋ฅผ ๊ฒ์ → ๋ชจ๋ ์ฑ๋ subscriber ๊ฐ ๋ฉ์ธ์ง๋ฅผ ๋ฐ์ ์ฒ๋ฆฌ
- channel : ์ด๋ฒคํธ ์ ์ฅ x, subscriber ์์ผ๋ฉด ์์ค, subscriber ๋ ์ฌ๋ฌ channel ๊ตฌ๋ ๊ฐ๋ฅ
RabbitMQ
- AMQP ํ๋กํ ์ฝ์ ๋ฉ์ธ์ง ํ
- producer ๋ฉ์ธ์ง ์ ์ก→ consumer ๊ฐ Queue ์ ๋ฉ์ธ์ง ์์
- → exchange ๊ฐ ํค์ ๋ง๊ฒ Queue ์ ๋ถ๋ฐฐ
- MQ ์๋ฒ์ Queue ๋ด์ฉ์ด ์๋ช ์ฃผ๊ธฐ๊ฐ ๊ฐ์ → ์ข ๋ฃ ์ ๋ชจ๋ ์ญ์
Kafka
- pub-sub ๊ธฐ๋ฐ์ ๋ฉ์ธ์ง ๋ฐํ/๊ตฌ๋ ์์คํ
- ํํฐ์ ์ ์ฌ๋ฌ๊ฐ ๋์ด์ ํ ํฝ์ ๋ฉ์ธ์ง ์ฐ๊ธฐ ๋ถํ๋ฅผ ํด์ํ ์ ์์. ( ๋ณ๋ ฌ ์ฒ๋ฆฌ )
- ๋ํ ์ด ๋๋ฌธ์ ๊ฐ ๋ฉ์ธ์ง์ ์ฒ๋ฆฌ ์์๋ฅผ ๋ณด์ฅํ์ง ๋ชปํจ. ( ๊ณ ์ key ๋ฅผ ๋ฃ์ด์ ๋์ผ ํํฐ์ ์ ์์๋๋ก ๋ค์ด๊ฐ๊ฒ๋ ํ ์ ์์ )
- ํํฐ์ ์ ๋๋ฆฐ ํ์ ์ค์ผ ์ ์์.
- ์ด์ ๋ฉ์ธ์ง๋ฅผ ๋ค์ ์ฝ์ด ์ฌ ์ ์์. ์ด ๊ฒฝ์ฐ, ์ค๋ณต ๋ฉ์ธ์ง ์ฒ๋ฆฌ ๋ฑ์ ๋ํด์๋ ๊ณ ๋ฏผํด์ผํจ.
3. ์นดํ์นด ๋น๋๊ธฐ ๋ฉ์ธ์ง ํต์ ์ ํตํ ์ฑ ์ ๋ถ๋ฆฌ
์ง๋์ฃผ์ ํด์น์ ๋ค๊ณ ์๊ฐํ๋ ๋ก์ง์ ๋ค์ํ๋ฒ ๋ถ๋ฌ์๋ณผ๊น์?
๊ฐ์ ๋ก์ง ์ต์ข ์
class OrderPaymentService {
@Transactional
public void pay() {
decreaseUserPoint(); // ์ ์ ํฌ์ธํธ ์ฐจ๊ฐ
updateOrderStatus(); // ์ฃผ๋ฌธ ์ํ ๋ณ๊ฒฝ
savePayment(); // ๊ฒฐ์ ์ ๋ณด ์ ์ฅ
eventPublisher.publish(new orderPaidEvent());
}
}
class OrderPaidEventListener {
@Async
@TransactionalEventListener(phase = AFTER_COMMIT)
public void sendOrderInfo() {
try {
dataPlatformMockApiClient.sendOrder();
} catch(Exception e) {
log.error("์ฃผ๋ฌธ์ ๋ณด์ ๋ฌ์ ์คํจํ์ด์~์");
}
}
}
- ์๋น์ค ์ฝ๋๋ ๊น๋ํด์ง๊ณ , ์ธ๋ถ ๋ก์ง์ด ์ด๋ค๊ฒ๋ค์ด ์๋์ง ๊ด์ฌ์ฌ๊ฐ ๋ถ๋ฆฌ๋จ
- ํธ๋์ญ์ ์ปค๋ฐ ์ดํ์ ๋น๋๊ธฐ์ ์ผ๋ก ์ํ๋๊ธฐ ๋๋ฌธ์ ์ธ๋ถ api์ ์ํฅ์ ์ ํํ๊ฒ ์ ๊ฑฐํจ
- ์ ํด์น์ ๋!!??
- ๊ทธ๋ฐ๋ฐ… ๋ฐ์ดํฐ ์์ง ํ๋ซํผ์ด ์ผ์์ ์ธ ์ฅ์ ๊ฐ ๋ฐ์ํด์ api๊ฐ ์คํจํ๋ค๋ฉด?
- ๋ฐ์ดํฐ ์์ง ํ๋ซํผ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฌ ์ ๋ฌํด์ค์ผํ๋ ์ฑ ์์ ์ฐ๋ฆฌํํ ์๋ ์ต์ธํ ๋๋..
- ์ฌ์ ์ก ๋ก์ง์ ๋ ์ด๋๋ค ๋ง๋ค์ง…
๋ฐ์ดํฐ ์์ง ํ๋ซํผ์ ๋ฉ์ธ์ง์ ์์ ๋ถ๊ฐ ์ํฉ์ ์ฑ ์์ ์ฐ๋ฆฌ๊ฐ ๊ฐ์ง ์์ผ๋ ค๋ฉด?? ์นดํ์นด๋ก ํด๊ฒฐ์ด ๊ฐ๋ฅํ๋ค๊ณ ??
- ๋ฐ์ดํฐ ์์ง ํ๋ซํผ์ด ์ผ์์ ์ธ ์ฅ์ ์ํฉ์ผ ๋์๋ ์นดํ์นด์ ์ฃผ๋ฌธ์ ๋ณด๋ฅผ ์ ์์ ์ผ๋ก ์ ์ฌํด๋๋ค๋ฉด, ์ฅ์ ์ํฉ ํด์ ์ดํ ์ ์ฌ๋์๋ ์ฃผ๋ฌธ์ ๋ณด๋ฅผ “์์์” ์ปจ์ํด๊ฐ๋ฉด ๋์ง ์์๊น?]
๊ฐ์ ๋ก์ง ์ต์ข ์์ ์ต์ข ์
class OrderPaymentService {
@Transactional
public void pay() {
decreaseUserPoint(); // ์ ์ ํฌ์ธํธ ์ฐจ๊ฐ
updateOrderStatus(); // ์ฃผ๋ฌธ ์ํ ๋ณ๊ฒฝ
savePayment(); // ๊ฒฐ์ ์ ๋ณด ์ ์ฅ
eventPublisher.publish(new orderPaidEvent());
}
}
class OrderPaidEventListener {
@TransactionalEventListener(phase = AFTER_COMMIT)
public void sendOrderInfo() {
kafkaPublisher.publishOrderInfo();
}
}
- ์ฃผ๋ฌธ์ ๋ณด๋ฅผ ์นดํ์นด๋ก ์ ๋ฌํ๋ค๋ฉด ๋ฐ์ดํฐ ์์ง ํ๋ซํผ์์ ๋ฉ์ธ์ง๋ฅผ ์ ๋ฐ์๋ ๋ชป ๋ฐ๋ ๋ด ์ฑ ์์ ๋!!
- ๋น๋๊ธฐ ์ฒ๋ฆฌํ์ง ์๋์?
- ์น์๋น์ค์ API๋ ์๋ฒ์ ์์ฒด ๋ก์ง๊ณผ ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํ๋ ๋น์ฉ์ด ํจ๊ป ํฌํจ๋์ด latency๊ฐ ์์ ์ ์ด์ง ์์ ์ ์์ผ๋, ์ผ๋ฐ์ ์ผ๋ก ์นดํ์นด์ ๋ฉ์ธ์ง๋ฅผ ๋ฐํํ ๋๋ ๋ณ๋ ๋ก์ง ์์ด ๋ฐํ๋ ๋ฉ์ธ์ง๋ฅผ ์ ์ฅ๋ง ํ๊ธฐ์ ๋ฐํ์ ๋ํ ๋น์ฉ์ด ๋ ์ ๋ค๊ณ ํ๋จ
- ๋น๋๊ธฐ๋ก ์ฒ๋ฆฌํ๊ธฐ ์ํด ๋ณ๋์ ์ค๋ ๋์์ ์ปจํ ์คํธ ์ค์์นญํ๋ ๋น์ฉ์ด ์คํ๋ ค ๋ ํด ์๋ ์์
- ์นดํ์นด์ ๋ฉ์์ง ๋ฐํ์ด ์คํจํ๋ค๋ฉด?
- … ์ง์ง ์ธ์ ๊น์ง ์คํจ ๊ณ ๋ฏผ ํด์ผํ๋…
- ์ผ๋ฐ์ ์ธ ์น ์๋น์ค๋ณด๋ค๋ ๊ณ ๊ฐ์ฉ์ฑ์ ํน์ฅ์ ์ ๊ฐ๋ ์นดํ์นด๊ฐ ๋ ์์ ์ ์ด์ง ์์๊น?
- ๊ทผ๋ฐ… ๊ทธ๋๋… ํธ๋์ญ์ ์ด ์ปค๋ฐ๋ ์ดํ์ ์นดํ์นด ๋ฉ์์ง ๋ฐํ์ ํ๋๋ฐ ์ด๊ฒ ์คํจํ๋ฉด…?
Transactional Messaging
- Eventual Consistency - ๊ฒฐ๊ณผ์ ์ผ๊ด์ฑ
- ์ธ์ ๊ฐ ๋ชจ๋ ์๋น์ค ๊ฐ์ ๋ฐ์ดํฐ ์ ํฉ์ฑ์ ๋ง๊ฒ ๋๋ค
- ๋ถ์ฐ ์์คํ / ๋น๋๊ธฐ ๋ฉ์ธ์ง ํ๊ฒฝ์์ ์ถ๊ตฌํ๋ ๋ฐฉํฅ
- ๋๋์ ํธ๋ํฝ์ ์์ฉํ ์ ์๋๋ก ์์ ์ ๋จ์ (Transaction) ๋ฅผ ์๊ฒ, ๋น๋๊ธฐ๋ก ๊ตฌ์ฑ
- ๋น์ฆ๋์ค ๋ก์ง ์ํ + ํ์ ์ด๋ฒคํธ ๋ฐํ์ ์์์ ์ผ๋ก ํจ๊ป ์ํํ๋ ๋ฐฉ์
- Transactional Messaging ์ ๋ํ์ ์ธ ํจํด
- (1) Transactional Outbox Pattern
- ๋๋ฉ์ธ ๋ก์ง์ด ์ฑ๊ณต์ ์ผ๋ก ์ํ๋์๋ค๋ฉด, ์ด์ ํด๋นํ๋ ์ด๋ฒคํธ ๋ฉ์ธ์ง๋ฅผ Outbox Table ์ด๋ผ๋ ๋ณ๋์ ํ ์ด๋ธ์ ์ ์ฅํ์ฌ ํจ๊ป Commit
- ๋์ผํ ํธ๋์ญ์ ๋ด์์ ์ด๋ฒคํธ ๋ฐํ์ ์ํ Outbox ๋ฐ์ดํฐ ์ ์ฌ๊น์ง ์งํํด ์ด๋ฒคํธ ๋ฐํ์ ๋ํด ๋ณด์ฅ
- ์ด๋ฒคํธ ๋ฐํ ์ํ ๋ํ Outbox ๋ฐ์ดํฐ์ ์กด์ฌํ๋ฏ๋ก, ๋ฐฐ์น ํ๋ก์ธ์ค ๋ฑ์ ์ด์ฉํด ๋ฏธ๋ฐํ๋ ๋ฐ์ดํฐ์ ๋ํ Fallback ์ฒ๋ฆฌ๊ฐ ์ฉ์ด
(2) Change Data Capture ( CDC )
- ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ณ๊ฒฝ ์ฌํญ์ ๋ชจ๋ํฐ๋งํ๊ณ ( e.g. Debezium ) ๋ค๋ฅธ ์์คํ ์ผ๋ก ํด๋น ๋ณ๊ฒฝ์ฌํญ์ ๋ํ ๋ด์ฉ์ ์ ํํ๋ ์ํํธ์จ์ด ํ๋ก์ธ์ค
- DB ์์ ๋ฐ์ดํฐ๊ฐ ๋ณ๊ฒฝ๋๋ ๊ฒ์ ๊ฐ์งํ๊ณ , ํด๋น ๋ณ๊ฒฝ ๊ฑด์ ๊ดํ ์ด๋ฒคํธ๋ฅผ ๋ฐํ
Transactional Outbox Pattern
E-commerce ์ ์ฃผ๋ฌธ๊ณผ ์ฌ๊ณ ์ฒ๋ฆฌ์ ๋ํด ๊ณ ๋ คํด๋ณด์.
์๋์ ์ผ๋ก Lock ์ ์ด์ฉํ ์ ํฉ์ฑ ์ ์ด ๋ฑ ๋๋์ ํธ๋ํฝ์ด ๋ชฐ๋ ค๋ค์์ ๋์ ์ ์ด๊ฐ ํ๋ ์ฌ๊ณ ์ฒ๋ฆฌ ๋ก์ง์ ์ฃผ๋ฌธ ๋ก์ง๊ณผ ๋ถ๋ฆฌํด๋ผ ๋ฐฉ๋ฒ์ ๊ณ ๋ คํด๋ณด๊ณ , ๋ณธ์ธ์ ๊ด์ฌ์ฌ๋ฅผ ์จ์ ํ ์ฒ๋ฆฌํ ์ ์๋๋ก ์์คํ ์ ์ค๊ณํด๋ณด๋ฉด ์๋์ ๊ฐ์ด ์ค๊ณํด ๋ณผ ์ ์๋ค.
[์ฃผ๋ฌธ ์๋น์ค]
// ์ฃผ๋ฌธ ์ฒ๋ฆฌ
Tx 1 {
์ฃผ๋ฌธ ์์ฑ ( Order.Status = INIT(์ฒ๋ฆฌ์ค) )
์์๋ฐ์ค ์์ฑ ( Outbox.Status = INIT )
์ด๋ฒคํธ ๋ฐํ ( Application Event Publish )
}
// ์ฃผ๋ฌธ ์ด๋ฒคํธ ๋ฐํ
Application Event Listener {
์์๋ฐ์ค ๋ฐ์ดํฐ Kafka ์ ์ ์ก
}
[Consumer 1 : ์์๋ฐ์ค ์๋น์ค]
// ์์๋ฐ์ค ์ฒ๋ฆฌ
// ์ฑ๊ณต์ ์ผ๋ก Kafka ์ ์ด๋ฒคํธ๊ฐ ๋ฐํ๋์์์ ๋ณด์ฅ
Tx 1-1 {
์์๋ฐ์ค Update ( Outbox.Status = PUBLISHED )
}
[์ฌ๊ณ ์๋น์ค]
// ์ฌ๊ณ ์ฒ๋ฆฌ
// Kafka ์ ์ด๋ฒคํธ๋ฅผ ์ฑ๊ณต์ ์ผ๋ก ์์ทจํด ๋น์ฆ๋์ค ๋ก์ง์ ์ํํ์์ ๋ณด์ฅ
Tx 2 {
์ฒ๋ฆฌ์ ๋ณด ์์ฑ ( OrderId ์ ๋ํ Processed ์ ๋ณด ์์ฑ Processed.Status = RECEIVED )
์ํ ์ฌ๊ณ Update
์๋ฃ ์ด๋ฒคํธ ๋ฐํ
}
Tx 2-1 {
์ฒ๋ฆฌ์ ๋ณด Update ( Processed.Status = SUCCESS )
}
์ด๋ฒคํธ ๋ฐํ ๋ฐ ์ฒ๋ฆฌ๋ฅผ ๋ณด์ฅํ๊ธฐ ์ํ ์ค๊ณ
- Publisher
- ๋น์ฆ๋์ค ๋ก์ง ์ํ
- Outbox ์ ์ฅ
- Consumer 1
- Outbox ๋ณ๊ฒฝ
- Consumer 2
- ๋น์ฆ๋์ค ๋ก์ง ์ํ
- Processed ์ ์ฅ
๊ฐ์ ๋ก์ง ์ต์ข ์์ ์ต์ข ์์ ์ต์ข ์
class OrderPaymentService {
@Transactional
public void pay() {
decreaseUserPoint(); // ์ ์ ํฌ์ธํธ ์ฐจ๊ฐ
updateOrderStatus(); // ์ฃผ๋ฌธ ์ํ ๋ณ๊ฒฝ
savePayment(); // ๊ฒฐ์ ์ ๋ณด ์ ์ฅ
orderCreatedEventRepository.save(orderCreatedEvent) // outbox ์ ์ฅ
eventPublisher.publish(new orderPaidEvent());
}
}
class OrderPaidEventListener {
@TransactionalEventListener(phase = AFTER_COMMIT)
public void sendOrderInfo() {
kafkaPublisher.publishOrderInfo();
}
}
class KafkaConsumer(orderCreatedMessage) {
orderCreatedEventRepository.findById(message.id).published() // outbox ์์
}
// ๋ฐฐ์นํ๋ก์ธ์ค ๋ฑ์ผ๋ก ์ฒ๋ฆฌํ๋ฉด ์ข๊ฒ ์ฃ .
@Scheduler(fixedRate = 5000)
orderCreatedEventRepository.findAllByStatus(INIT).forEach {
if(it.createdAt < now() - 5minutes) {
slackApiClient.sendMessageToDeveloper();
kafkaPublisher.publishOrderInfo();
}
}