Задание 5
Спринт 9/12 → Тема 5/8: → Урок 6/6 Задание 5unknown
python
2 years ago
3.5 kB
25
Indexable
import time, json
from datetime import datetime
from logging import Logger
from lib.kafka_connect import KafkaConsumer, KafkaProducer
from lib.redis import RedisClient
from stg_loader.repository.stg_repository import StgRepository
class StgMessageProcessor:
    """Store incoming order events in STG and publish enriched messages.

    Each call to ``run`` handles at most ``batch_size`` messages. For every
    message the raw event is written through the STG repository, the user and
    restaurant records are read from Redis, and the assembled output message
    is handed to the producer.
    """

    def __init__(self,
                 consumer: KafkaConsumer,
                 producer: KafkaProducer,
                 redis_client: RedisClient,
                 stg_repository: StgRepository,
                 batch_size: int,
                 logger: Logger) -> None:
        """Keep references to the collaborators used by ``run``.

        Args:
            consumer: Source of messages; ``consume()`` must return a mapping
                or a falsy value when nothing is available.
            producer: Sink for the output messages (``produce(dict)``).
            redis_client: Lookup of user and restaurant records by id.
            stg_repository: Persists raw events via ``order_events_insert``.
            batch_size: Maximum number of messages handled per ``run`` call.
            logger: Receives the start/finish records and lookup warnings.
        """
        self._consumer = consumer
        self._producer = producer
        self._redis = redis_client
        self._stg_repository = stg_repository
        self._batch_size = batch_size
        self._logger = logger

    # Function that is invoked on a schedule.
    def run(self) -> None:
        """Process up to ``batch_size`` messages, stopping early when none is left."""
        # Log that the job has been started.
        self._logger.info(f"{datetime.utcnow()}: START")

        for _ in range(self._batch_size):
            msg = self._consumer.consume()
            if not msg:
                break

            payload = msg['payload']
            self._stg_repository.order_events_insert(
                msg['object_id'],
                msg['object_type'],
                msg['sent_dttm'],
                json.dumps(payload),
            )

            # Enrichment data comes from Redis, keyed by user / restaurant id.
            user = self._redis.get(payload['user']['id'])
            restaurant = self._redis.get(payload['restaurant']['id'])

            self._producer.produce(self._build_out_msg(msg, user, restaurant))

        # Log that the job has finished successfully.
        self._logger.info(f"{datetime.utcnow()}: FINISH")

    @staticmethod
    def _category_by_product_id(menu) -> dict:
        """Index menu items by ``_id``; the first entry for an id wins."""
        categories = {}
        for item in menu:
            categories.setdefault(item['_id'], item['category'])
        return categories

    def _build_out_msg(self, msg: dict, user: dict, restaurant: dict) -> dict:
        """Assemble the output message for one order event.

        The message is built as a dict rather than as formatted JSON text, so
        quotes or backslashes in names and an empty ``order_items`` list cannot
        produce invalid JSON. Identifier, name, date and status fields are
        coerced to ``str``; numeric fields are passed through unchanged.

        A product whose id is absent from the restaurant menu gets
        ``"category": None`` and a warning is logged.
        """
        payload = msg['payload']
        restaurant_id = payload['restaurant']['id']
        categories = self._category_by_product_id(restaurant['menu'])

        products = []
        for product in payload['order_items']:
            product_id = product['id']
            if product_id in categories:
                category = str(categories[product_id])
            else:
                category = None
                self._logger.warning(
                    "Product %s not found in menu of restaurant %s",
                    product_id, restaurant_id,
                )
            products.append({
                "id": str(product_id),
                "price": product['price'],
                "quantity": product['quantity'],
                "name": str(product['name']),
                "category": category,
            })

        return {
            "object_id": msg['object_id'],
            "object_type": str(msg['object_type']),
            "payload": {
                "id": msg['object_id'],
                "date": str(payload['date']),
                "cost": payload['cost'],
                "payment": payload['payment'],
                "status": str(payload['final_status']),
                "restaurant": {
                    "id": str(restaurant_id),
                    "name": str(restaurant['name']),
                },
                "user": {
                    "id": str(payload['user']['id']),
                    "name": str(user['name']),
                },
                "products": products,
            },
        }
Editor is loading...
Leave a Comment