실습 코드
실습 코드
examples.txt
# mysql 설치
brew install mysql
# pip3 및 dependency 설치
curl <https://bootstrap.pypa.io/get-pip.py> -o get-pip.py
python3 get-pip.py
pip3 --version
pip3 install -r requirements.txt
# node js 설치 및 버전 확인
brew install node
npm --version
node --version
npm install mysql express
# /etc/hosts에 kafka1 추가
# 이벤트 채널 토픽 생성
./create_topics.py
# 인벤토리 테이블 생성
CREATE TABLE inventory (
id int(10) NOT NULL AUTO_INCREMENT PRIMARY KEY,
name char(30) NOT NULL,
price int(10) NOT NULL,
quantity int(10) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE inventory_history (
id int(10) NOT NULL AUTO_INCREMENT PRIMARY KEY,
transaction_id char(36) NOT NULL, // 사용자가 정상적인 주문을 넣었을 때, 발급
inventory_id int(10) NOT NULL,
quantity int(10) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
# 아이템 입력
INSERT INTO inventory(name, price, quantity) VALUES('cloth', 50000, 100);
INSERT INTO inventory(name, price, quantity) VALUES('cap', 30000, 100);
INSERT INTO inventory(name, price, quantity) VALUES('sunglasses', 25000, 100);
INSERT INTO inventory(name, price, quantity) VALUES('necklace', 150000, 100);
INSERT INTO inventory(name, price, quantity) VALUES('earring', 15000, 100);
# 주문 서비스 up
./order_service.py
# 인벤토리 서비스 up
node inventory_service.js
# 인벤토리 서비스 컨슈머 up
./inventory_consumer.py
# 결제 서비스 컨슈머 up
./payment_consumer.py
# 주문 서비스 컨슈머 up
./order_consumer.py
// JSON
# 주문 요청
curl -v -XPOST <http://localhost:8080/v1/order> -H'Content-Type: application/json' \\
--data-binary @- << EOF
{
"order": {
"user_id": "user",
"name": "Hong Gil Dong",
"addr": "29, Hansil-ro, Dalseo-gu, Daegu, Republic of Korea",
"tel": "01012341234",
"email": "hong@gil.com"
},
"inventory": {
"id": 2,
"quantity": 5
}
}
EOF
# 주문현황 조회
curl -v -XGET <http://localhost:8080/v1/order?transaction_id=${TRANSACTION_ID}&user_id=user>
create_topics.py
#!/usr/bin/env python3
from confluent_kafka.admin import AdminClient, NewTopic
conf = {'bootstrap.servers': 'kafka1:19092'}
admin = AdminClient(conf=conf)
admin.create_topics([NewTopic('order', 1, 1), NewTopic('inventory', 1, 1), NewTopic('payment', 1, 1)])
print(admin.list_topics().topics)
docker-compose-edm.yml
version: '3'
services:
zookeeper-1:
hostname: zookeeper1
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 12181
ZOOKEEPER_DATA_DIR: /zookeeper/data
ports:
- 12181:12181
volumes:
- ./zookeeper/data/1:/zookeeper/data
kafka-1:
hostname: kafka1
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092
KAFKA_LOG_DIRS: /kafka
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- 19092:19092
volumes:
- ./kafka/logs/1:/kafka
// 주문 정보
localstack-1:
hostname: localstack1
image: localstack/localstack:latest
environment:
AWS_DEFAULT_REGION: us-east-2
EDGE_PORT: 4566
SERVICES: dynamodb
DATA_DIR: /tmp/localstack/dynamodb/data
ports:
- 4566:4566
volumes:
- ./localstack:/tmp/localstack
// 인벤토리 관련 정보
mysql-1:
hostname: mysql1
image: mysql/mysql-server:5.7
ports:
- 3306:3306
environment:
MYSQL_USER: root
MYSQL_ROOT_HOST: "%%"
MYSQL_DATABASE: inventory
MYSQL_ROOT_PASSWORD: inventorypw
command: mysqld
--server-id=1234
--max-binlog-size=4096
--binlog-format=ROW
--log-bin=bin-log
--sync-binlog=1
--binlog-rows-query-log-events=ON
volumes:
- ./mysql:/var/lib/mysql
order_service.py
#!/usr/bin/env python3
#-*- coding: utf-8 -*-
import uuid
import json
import logging
from datetime import datetime
from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, UTCDateTimeAttribute
// 웹 서버
from flask import Flask, request, jsonify
from confluent_kafka import Producer, Consumer
import sys
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
conf_producer = {'bootstrap.servers': 'kafka1:19092'}
conf_consumer = {'bootstrap.servers': 'kafka1:19092', 'group.id': 'cousumer_group', 'auto.offset.reset': 'earliest'}
producer = Producer(conf_producer)
consumer = Consumer(conf_consumer)
class OrderModel(Model):
class Meta:
table_name = 'order'
host = '<http://localhost:4566>'
region = 'us-east-2'
transaction_id = UnicodeAttribute(hash_key=True)
user_id = UnicodeAttribute(range_key=True)
name = UnicodeAttribute(null=True)
addr = UnicodeAttribute(null=True)
tel = UnicodeAttribute(null=True)
email = UnicodeAttribute(null=True)
status = UnicodeAttribute(null=True)
updated = UTCDateTimeAttribute()
if not OrderModel.exists():
OrderModel.create_table(read_capacity_units=1, write_capacity_units=1, wait=True)
app = Flask(__name__)
@app.route('/v1/order', methods=['GET'])
def show_transaction():
transaction_id = request.args.get('transaction_id')
user_id = request.args.get('user_id')
try:
order = OrderModel.get(transaction_id, user_id)
return jsonify(order.attribute_values)
except OrderModel.DoesNotExist:
return jsonify({'error': 'data not found'})
@app.route('/v1/order', methods=['POST'])
def create_order():
entire_req = json.loads(request.data)
req = entire_req['order']
// 트랜잭션 ID 발급
transaction_id = str(uuid.uuid1())
user_id = req['user_id']
name = req['name']
addr = req['addr']
tel = req['tel']
email = req['email']
// DB 저장
model = OrderModel(transaction_id, user_id)
model.updated = datetime.now()
model.status = 'ORDER_CREATED'
model.save()
entire_req['transaction_id'] = transaction_id
entire_req['status'] = model.status
// 프로듀싱
logging.debug("entire_req: %s" % entire_req)
producer.produce('inventory', value=json.dumps(entire_req))
producer.flush()
return jsonify({'status': model.status,
'user_id': user_id,
'transaction_id': transaction_id})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080, debug=True)
inventory_consumer.py
#!/usr/bin/env python3
#-*- coding: utf-8 -*-
import json
import logging
import MySQLdb
from confluent_kafka import Producer, Consumer
import sys
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
conf_producer = {'bootstrap.servers': 'kafka1:19092'}
conf_consumer = {'bootstrap.servers': 'kafka1:19092', 'group.id': 'inventory_consumer_group', 'auto.offset.reset': 'earliest'}
producer = Producer(conf_producer)
consumer = Consumer(conf_consumer)
inbound_event_channel = 'inventory'
outbound_success_event_channel = 'payment'
outbound_failure_event_channel = 'order'
// mysql
db = MySQLdb.connect('127.0.0.1','root','inventorypw', 'inventory')
def process_inventory_reserved(event, transaction_id, inventory_id, quantity):
logging.info('process_inventory_reserved')
cursor = db.cursor()
cursor.execute('UPDATE inventory SET quantity=quantity-%d WHERE id=%d' % (quantity, inventory_id))
cursor.execute('INSERT INTO inventory_history(transaction_id, inventory_id, quantity) VALUES(\\'%s\\', %d, %d)' % (transaction_id, inventory_id, -1 * quantity))
db.commit()
cursor.execute('SELECT price FROM inventory WHERE id=%d' % inventory_id)
record = cursor.fetchone()
cursor.close()
total_price = int(record[0]) * quantity
logging.debug('total_price: %d' % total_price)
event['payment'] = {'total_price': total_price}
event['status'] = 'INVENTORY_RESERVED'
producer.produce(outbound_success_event_channel, value=json.dumps(event))
def process_order_cancel(event, transaction_id, inventory_id, quantity):
logging.info('process_order_cancel')
cursor = db.cursor()
cursor.execute('UPDATE inventory SET quantity=quantity+%d WHERE id=%d' % (quantity, inventory_id))
cursor.execute('INSERT INTO inventory_history(transaction_id, inventory_id, quantity) VALUES(\\'%s\\', %d, %d)' % (transaction_id, inventory_id, quantity))
db.commit()
cursor.close()
event['status'] = 'ORDER_CANCEL'
producer.produce(outbound_failure_event_channel, value=json.dumps(event))
def main():
try:
// 구독 상태
consumer.subscribe([inbound_event_channel])
while True:
// poll
message = consumer.poll(timeout=3)
if message is None:
continue
event = message.value()
if event:
logging.debug(event)
evt = json.loads(event)
status = evt['status']
transaction_id = evt['transaction_id']
inventory_id = int(evt['inventory']['id'])
quantity = int(evt['inventory']['quantity'])
if status == 'ORDER_CREATED':
process_inventory_reserved(evt, transaction_id, inventory_id, quantity)
elif status == 'ORDER_CANCEL':
// roleback
process_order_cancel(evt, transaction_id, inventory_id, quantity)
else:
logging.error('unknown event')
else:
continue
finally:
consumer.close()
if __name__ == '__main__':
main()
inventory_service.js
const express = require("express");
const mysql = require('mysql');
const app = express();
var db = mysql.createConnection({
host: '127.0.0.1',
user: 'root',
password: 'inventorypw',
database: 'inventory'
});
db.connect(function(error) {
if (error) throw error;
console.log('Database Connected.');
});
app.get('/v1/inventory', (request, response) => {
db.query('SELECT * from inventory', (error, rows) => {
if(error) throw error;
db.end();
console.log('=== rows ===', rows);
response.send(rows);
});
});
app.get('/v1/inventory_history', (request, response) => {
db.query('SELECT * from inventory_history', (error, rows) => {
if(error) throw error;
db.end();
console.log('=== rows ===', rows);
response.send(rows);
});
});
app.listen(8090, () => {
console.log('8090 port listening...');
});
payment_consumer.py
#!/usr/bin/env python3
#-*- coding: utf-8 -*-
import sys
import json
import logging
from confluent_kafka import Producer, Consumer
import sys
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
def process_consumer(is_pg_api_call_succeeded):
conf_producer = {'bootstrap.servers': 'kafka1:19092'}
producer = Producer(conf_producer)
conf_consumer = {'bootstrap.servers': 'kafka1:19092', 'group.id': 'payment_consumer_group', 'auto.offset.reset': 'earliest'}
consumer = Consumer(conf_consumer)
inbound_event_channel = 'payment'
outbound_success_event_channel = 'order'
outbound_failure_event_channel = 'inventory'
try:
// 구독
consumer.subscribe([inbound_event_channel])
while True:
message = consumer.poll(timeout=3)
if message is None:
continue
event = message.value()
if event:
logging.debug(event)
evt = json.loads(event)
// 페이먼츠 성공여부
if is_pg_api_call_succeeded == 'True':
evt['status'] = 'PAYMENT_COMPLETE'
producer.produce(outbound_success_event_channel, value=json.dumps(evt))
else:
evt['status'] = 'ORDER_CANCEL'
producer.produce(outbound_failure_event_channel, value=json.dumps(evt))
else:
continue
finally:
consumer.close()
if __name__ == '__main__':
if len(sys.argv) != 2:
print("Please put a required argument: is_pg_api_call_succeeded(True/False)")
sys.exit()
is_pg_api_call_succeeded = sys.argv[1]
process_consumer(is_pg_api_call_succeeded)
Last updated