Задание 5 (out_dict)

Спринт 9/12 → Тема 5/8: → Урок 6/6 Задание 5
 avatar
unknown
python
a year ago
3.1 kB
11
Indexable
import time, json
from datetime import datetime
from logging import Logger
from lib.kafka_connect import KafkaConsumer, KafkaProducer
from lib.redis import RedisClient
from stg_loader.repository.stg_repository import StgRepository


class StgMessageProcessor:
    """Move order messages from the consumer to staging and on to the producer.

    For every consumed message the processor stores the raw event through the
    staging repository, looks up the user and the restaurant in Redis by the
    ids found in the payload, and publishes an enriched message in which every
    ordered product carries the category taken from the restaurant menu.
    """

    def __init__(self,
                 consumer: KafkaConsumer,
                 producer: KafkaProducer,
                 redis_client: RedisClient,
                 stg_repository: StgRepository,
                 batch_size: int,
                 logger: Logger) -> None:
        """Store the collaborators used by ``run``.

        Args:
            consumer: Source of messages; ``consume()`` is called on it.
            producer: Sink for enriched messages; ``produce(dict)`` is called.
            redis_client: Lookup used as ``get(id)`` for users and restaurants.
            stg_repository: Persists raw events via ``order_events_insert``.
            batch_size: Maximum number of messages handled per ``run`` call.
            logger: Receives the START / FINISH records.
        """
        self._consumer = consumer
        self._producer = producer
        self._redis = redis_client
        self._stg_repository = stg_repository
        self._batch_size = batch_size
        self._logger = logger

    # Entry point; per the original author's note it is invoked on a schedule.
    def run(self) -> None:
        """Process up to ``batch_size`` messages, stopping early when none is left.

        Raises:
            TypeError: If the consumer returns a non-empty value that is not a
                dict.
            KeyError: If a message, its payload, or a Redis record lacks one
                of the fields read while building the outgoing message.
        """
        # Log that the job has started.
        self._logger.info(f"{datetime.utcnow()}: START")

        for _ in range(self._batch_size):
            msg = self._consumer.consume()
            # The emptiness check must come before the type check: an empty
            # result (presumably None when nothing is available - confirm
            # against lib.kafka_connect) ends the batch instead of failing.
            if not msg:
                break
            # Explicit check instead of `assert`, which is removed under -O.
            if not isinstance(msg, dict):
                raise TypeError(
                    f"expected dict message, got {type(msg).__name__}")

            # Persist the raw event first, then enrich and forward it.
            self._stg_repository.order_events_insert(
                msg['object_id'],
                msg['object_type'],
                msg['sent_dttm'],
                json.dumps(msg['payload']))

            self._producer.produce(self._build_out_message(msg))

        # Log that the job has finished successfully.
        self._logger.info(f"{datetime.utcnow()}: FINISH")

    def _build_out_message(self, msg: dict) -> dict:
        """Return the enriched outgoing message for one consumed ``msg``."""
        payload = msg['payload']
        # Redis is keyed by the user / restaurant ids found in the payload.
        user = self._redis.get(payload['user']['id'])
        restaurant = self._redis.get(payload['restaurant']['id'])

        return {
            "object_id": msg['object_id'],
            "object_type": msg['object_type'],
            "payload": {
                "id": msg['object_id'],
                "date": payload['date'],
                "cost": payload['cost'],
                "payment": payload['payment'],
                "status": payload['final_status'],
                "restaurant": {
                    "id": payload['restaurant']['id'],
                    "name": restaurant['name'],
                },
                "user": {
                    "id": payload['user']['id'],
                    "name": user['name'],
                },
                "products": self._enrich_products(
                    payload['order_items'], restaurant['menu']),
            },
        }

    @staticmethod
    def _enrich_products(products: list, menu: list) -> list:
        """Return a fresh dict per product with its menu category attached.

        The category is taken from the menu entry whose ``_id`` equals the
        product ``id``; the first such entry wins. A product that is absent
        from the menu gets ``"category": None`` so every item has the same
        set of keys.
        """
        # Index the menu once instead of rescanning it for every product.
        category_by_id = {}
        for item in menu:
            category_by_id.setdefault(item['_id'], item.get('category'))

        return [
            {
                "id": product['id'],
                "price": product['price'],
                "quantity": product['quantity'],
                "name": product['name'],
                "category": category_by_id.get(product['id']),
            }
            for product in products
        ]
Editor is loading...
Leave a Comment