Задание 5
Спринт 9/12 → Тема 5/8: → Урок 6/6 Задание 5unknown
python
a year ago
3.5 kB
20
Indexable
import json
import time
from datetime import datetime
from logging import Logger

from lib.kafka_connect import KafkaConsumer, KafkaProducer
from lib.redis import RedisClient
from stg_loader.repository.stg_repository import StgRepository


class StgMessageProcessor:
    """Persist incoming order events to STG and forward an enriched copy.

    For every consumed message the processor stores the raw event through
    the STG repository, looks up the user and the restaurant in Redis and
    produces an outgoing message that carries the user name, the restaurant
    name and a category for every ordered product.
    """

    def __init__(self,
                 consumer: KafkaConsumer,
                 producer: KafkaProducer,
                 redis_client: RedisClient,
                 stg_repository: StgRepository,
                 batch_size: int,
                 logger: Logger) -> None:
        """Store the collaborators used by :meth:`run`.

        Args:
            consumer: Source of incoming messages (``consume()``).
            producer: Sink for enriched messages (``produce(dict)``).
            redis_client: Lookup of user / restaurant records by id.
            stg_repository: Storage for the raw order events.
            batch_size: Maximum number of messages handled per ``run`` call.
            logger: Logger used for start / finish / warning records.
        """
        self._consumer = consumer
        self._producer = producer
        self._redis = redis_client
        self._stg_repository = stg_repository
        self._batch_size = batch_size
        self._logger = logger

    # Entry point that is meant to be invoked on a schedule.
    def run(self) -> None:
        """Process up to ``batch_size`` messages from the consumer.

        Stops early as soon as ``consume()`` returns a falsy value.
        Each message is first written to STG, then enriched from Redis and
        handed to the producer as a ``dict``.
        """
        # Log that the job has started.
        self._logger.info(f"{datetime.utcnow()}: START")

        for _ in range(self._batch_size):
            msg = self._consumer.consume()
            if not msg:
                break

            self._stg_repository.order_events_insert(
                msg['object_id'],
                msg['object_type'],
                msg['sent_dttm'],
                json.dumps(msg['payload']),
            )

            payload = msg['payload']

            # Enrichment data lives in Redis, keyed by user / restaurant id.
            user_name = self._redis.get(payload['user']['id'])['name']
            restaurant = self._redis.get(payload['restaurant']['id'])

            self._producer.produce(
                self._build_out_msg(msg, user_name, restaurant)
            )

        # Log that the job has finished successfully.
        self._logger.info(f"{datetime.utcnow()}: FINISH")

    def _build_out_msg(self, msg: dict, user_name, restaurant: dict) -> dict:
        """Build the outgoing message as a plain dict.

        The message is assembled as a data structure rather than as a JSON
        string template, so quotes or backslashes in names cannot corrupt
        it and an order without items yields an empty ``products`` list.

        Fields that the previous string template wrapped in quotes are
        coerced with ``str()`` so the outgoing schema stays the same;
        numeric fields are passed through unchanged.

        Args:
            msg: The consumed message (``object_id``, ``object_type``,
                ``payload``).
            user_name: User name read from Redis.
            restaurant: Restaurant record read from Redis (``name``,
                ``menu``).

        Returns:
            The enriched message ready to be passed to the producer.
        """
        payload = msg['payload']

        # Index the menu once; setdefault keeps the first entry per id,
        # which matches a top-to-bottom scan that stops at the first hit.
        category_by_id = {}
        for menu_item in restaurant['menu']:
            category_by_id.setdefault(
                menu_item['_id'], str(menu_item['category'])
            )

        products = []
        for product in payload['order_items']:
            category = category_by_id.get(product['id'])
            if category is None:
                # Do not guess a category from an unrelated menu entry.
                self._logger.warning(
                    "product %s not found in menu of restaurant %s",
                    product['id'], payload['restaurant']['id'],
                )
            products.append({
                "id": str(product['id']),
                "price": product['price'],
                "quantity": product['quantity'],
                "name": str(product['name']),
                "category": category,
            })

        return {
            "object_id": msg['object_id'],
            "object_type": str(msg['object_type']),
            "payload": {
                "id": msg['object_id'],
                "date": str(payload['date']),
                "cost": payload['cost'],
                "payment": payload['payment'],
                "status": str(payload['final_status']),
                "restaurant": {
                    "id": str(payload['restaurant']['id']),
                    "name": str(restaurant['name']),
                },
                "user": {
                    "id": str(payload['user']['id']),
                    "name": str(user_name),
                },
                "products": products,
            },
        }
Editor is loading...
Leave a Comment