Задание 5

Спринт 9/12 → Тема 5/8: → Урок 6/6 Задание 5
 avatar
unknown
python
a year ago
3.5 kB
20
Indexable
import time, json
from datetime import datetime
from logging import Logger
from lib.kafka_connect import KafkaConsumer, KafkaProducer
from lib.redis import RedisClient
from stg_loader.repository.stg_repository import StgRepository


class StgMessageProcessor:
    def __init__(self,
                 consumer: KafkaConsumer,
                 producer: KafkaProducer,
                 redis_client: RedisClient,
                 stg_repository: StgRepository,
                 batch_size: int,
                 logger: Logger) -> None:
        self._consumer = consumer
        self._producer = producer
        self._redis = redis_client
        self._stg_repository = stg_repository
        self._batch_size = batch_size
        self._logger = logger

    # функция, которая будет вызываться по расписанию.
    def run(self) -> None:
        # Пишем в лог, что джоб был запущен.
        self._logger.info(f"{datetime.utcnow()}: START")

        for i in range (self._batch_size):
            msg = self._consumer.consume()
            if not msg:
                break
            self._stg_repository.order_events_insert(msg['object_id'], msg['object_type'], msg['sent_dttm'], json.dumps(msg['payload']))
            
            payload = msg['payload']
            products = payload['order_items']
            
            # diggin redis
            user_name = (self._redis.get(payload['user']['id']))['name']
            rest_msg = self._redis.get(payload['restaurant']['id'])
            
            menu = rest_msg['menu']

            # start to collect out_msg
            out_msg = """
{
    "object_id": %s,
    "object_type": "%s",
    "payload": {
        "id": %s,
        "date": "%s",
        "cost": %s,
        "payment": %s,
        "status": "%s",
        "restaurant": {
            "id": "%s",
            "name": "%s"
        },
        "user": {
            "id": "%s",
            "name": "%s"
        },
        "products": [""" % (
                msg['object_id'],
                msg['object_type'],
                msg['object_id'],
                payload['date'],
                payload['cost'],
                payload['payment'],
                payload['final_status'],
                payload['restaurant']['id'],
                rest_msg['name'],
                payload['user']['id'],
                user_name
            )

            # add products info
            for prdct in products:
                out_msg +="""
            {
                "id": "%s",
                "price": %s,
                "quantity": %s,
                "name": "%s",""" % (
                    prdct['id'],
                    prdct['price'],
                    prdct['quantity'],
                    prdct['name']
                )
            for items in menu:
                category = items['category']
                if items['_id'] == prdct['id'] :
                    break
                out_msg +="""
                "category": "%s"
            },""" % (
                category
                )

            # finish out_msg
            out_msg = out_msg[:-1]
            out_msg += """
        ]
    }
} 
"""         
            self._producer.produce(json.loads(out_msg))

        # Пишем в лог, что джоб успешно завершен.
        self._logger.info(f"{datetime.utcnow()}: FINISH")
Editor is loading...
Leave a Comment