Задание 5 (out_dict)
Спринт 9/12 → Тема 5/8: → Урок 6/6 Задание 5unknown
python
a year ago
3.1 kB
11
Indexable
import time, json from datetime import datetime from logging import Logger from lib.kafka_connect import KafkaConsumer, KafkaProducer from lib.redis import RedisClient from stg_loader.repository.stg_repository import StgRepository class StgMessageProcessor: def __init__(self, consumer: KafkaConsumer, producer: KafkaProducer, redis_client: RedisClient, stg_repository: StgRepository, batch_size: int, logger: Logger) -> None: self._consumer = consumer self._producer = producer self._redis = redis_client self._stg_repository = stg_repository self._batch_size = batch_size self._logger = logger # функция, которая будет вызываться по расписанию. def run(self) -> None: # Пишем в лог, что джоб был запущен. self._logger.info(f"{datetime.utcnow()}: START") for i in range (self._batch_size): msg = self._consumer.consume() assert type(msg) == dict if not msg: break self._stg_repository.order_events_insert(msg['object_id'], msg['object_type'], msg['sent_dttm'], json.dumps(msg['payload'])) payload = msg['payload'] products = payload['order_items'] # diggin redis user_name = (self._redis.get(payload['user']['id']))['name'] rest_msg = self._redis.get(payload['restaurant']['id']) menu = rest_msg['menu'] # start to collect out_msg out_msg = { "object_id": msg['object_id'], "object_type": msg['object_type'], "payload": { "id": msg['object_id'], "date": payload['date'], "cost": payload['cost'], "payment": payload['payment'], "status": payload['final_status'], "restaurant": { "id": payload['restaurant']['id'], "name": rest_msg['name'] }, "user": { "id": payload['user']['id'], "name": user_name }, } } prod_dict={} all_prods_list=[] for prdct in products: prod_dict.update({ "id": prdct['id'], "price": prdct['price'], "quantity": prdct['quantity'], "name": prdct['name'] }) for items in menu: category = items['category'] if items['_id'] == prdct['id'] : break prod_dict["category"] = category all_prods_list.append(prod_dict.copy()) out_msg['payload']['products'] = all_prods_list self._producer.produce(out_msg) # Пишем в лог, что джоб успешно завершен. self._logger.info(f"{datetime.utcnow()}: FINISH")
Editor is loading...
Leave a Comment