Задание 5 (out_dict)
Спринт 9/12 → Тема 5/8: → Урок 6/6 Задание 5unknown
python
2 years ago
3.1 kB
15
Indexable
import time, json
from datetime import datetime
from logging import Logger
from lib.kafka_connect import KafkaConsumer, KafkaProducer
from lib.redis import RedisClient
from stg_loader.repository.stg_repository import StgRepository
class StgMessageProcessor:
def __init__(self,
consumer: KafkaConsumer,
producer: KafkaProducer,
redis_client: RedisClient,
stg_repository: StgRepository,
batch_size: int,
logger: Logger) -> None:
self._consumer = consumer
self._producer = producer
self._redis = redis_client
self._stg_repository = stg_repository
self._batch_size = batch_size
self._logger = logger
# функция, которая будет вызываться по расписанию.
def run(self) -> None:
# Пишем в лог, что джоб был запущен.
self._logger.info(f"{datetime.utcnow()}: START")
for i in range (self._batch_size):
msg = self._consumer.consume()
assert type(msg) == dict
if not msg:
break
self._stg_repository.order_events_insert(msg['object_id'], msg['object_type'], msg['sent_dttm'], json.dumps(msg['payload']))
payload = msg['payload']
products = payload['order_items']
# diggin redis
user_name = (self._redis.get(payload['user']['id']))['name']
rest_msg = self._redis.get(payload['restaurant']['id'])
menu = rest_msg['menu']
# start to collect out_msg
out_msg = {
"object_id": msg['object_id'],
"object_type": msg['object_type'],
"payload": {
"id": msg['object_id'],
"date": payload['date'],
"cost": payload['cost'],
"payment": payload['payment'],
"status": payload['final_status'],
"restaurant": {
"id": payload['restaurant']['id'],
"name": rest_msg['name']
},
"user": {
"id": payload['user']['id'],
"name": user_name
},
}
}
prod_dict={}
all_prods_list=[]
for prdct in products:
prod_dict.update({
"id": prdct['id'],
"price": prdct['price'],
"quantity": prdct['quantity'],
"name": prdct['name']
})
for items in menu:
category = items['category']
if items['_id'] == prdct['id'] :
break
prod_dict["category"] = category
all_prods_list.append(prod_dict.copy())
out_msg['payload']['products'] = all_prods_list
self._producer.produce(out_msg)
# Пишем в лог, что джоб успешно завершен.
self._logger.info(f"{datetime.utcnow()}: FINISH")
Editor is loading...
Leave a Comment