pzdc

mail@pastecode.io avatar
unknown
python
2 years ago
7.0 kB
6
Indexable
Never
import asyncio
import json
from asyncio import AbstractEventLoop, sleep, gather, CancelledError, new_event_loop
from dataclasses import dataclass
from enum import Enum
from time import monotonic
from typing import Any, List, Optional, Tuple, SupportsInt
import aiohttp
import logging

from confluent_kafka.cimpl import Producer


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Producer")


class HttpMethods(Enum):
    """
    Класс перечисления. Автоматически инициализируется.
    """
    http_none = 'none'
    http_get = 'get'
    http_post = 'post'


@dataclass
class UrlItem:
    """
    dataclass - класс для хранения данных (появился в python 3.6).
    Храним поля, типы и значения.
    """
    url: str
    data: Any = None
    method: Enum = HttpMethods.http_get
    headers: Any = None
    pars: Any = None
    pars_json: Any = None
    topic: str = None
    sleep_time: SupportsInt = None


class Adapter:
    """
    Констурктор класса инициализирует event_loop с типом AbstractEventLoop
    """

    def __init__(self, event_loop: AbstractEventLoop) -> None:
        super().__init__()
        self.__event_loop = event_loop
        self.producer = self.create_producer()

    @staticmethod
    def create_producer() -> Producer:
        conf = {'bootstrap.servers': '127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094'}
        producer = Producer(conf)
        producer.poll(0)
        return producer

    def delivery_report(self, errmsg, msg) -> None:
        if errmsg is not None:
            logger.info("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
        logger.info('  Message: {} successfully produced to Topic: {} Partition: [{}] at offset {}'.format(
            msg.key(), msg.topic(), msg.partition(), msg.offset()))

    async def send_message_to_broker(self, data: bytes, topic: str) -> None:
        """
            Отправляем данные в Тему.
            При этом указываем ключ по которому будем разделять информацию между сервисами.
        """

        try:
            logger.info('lol1231231')
            self.producer.produce(topic=topic,
                                  key='Key_for_service_gateway_TRACK',
                                  value=data,
                                  on_delivery=self.delivery_report)
            self.producer.flush()

        except Exception as ex:
            logger.info("Exception happened :", ex)


    async def do_request(self, request_url: str, *,
                         data: Any = None,
                         method: Enum = HttpMethods.http_none,
                         **request_kwargs) -> Tuple[Optional[int], Any]:
        """
        до * - передача аргументов без имени. После * обязан передать имена.
        **request_kwargs - передаем остальные параметры.
        None, None - возвращается кортеж (Tuple).
        Optional[int] - возвращает либо int либо None.
        """
        async with aiohttp.ClientSession(loop=self.__event_loop) as cl:
            if method is HttpMethods.http_get:
                async with cl.get(request_url, **request_kwargs) as response:
                    return response.status, await response.json()
            elif method is HttpMethods.http_post:
                async with cl.post(request_url, data=data, **request_kwargs) as response:
                    return response.status, await response.json()
            else:
                return None, None

    async def get_response(self, url: UrlItem) -> None:
        if isinstance(url, UrlItem):
            status, resp = await self.do_request(url.url,
                                                 data=url.data,
                                                 method=url.method,
                                                 headers=url.headers,
                                                 params=url.pars,
                                                 json=url.pars_json)
            if resp:
                #print(f'Status ({status}), json: {resp}')
                logger.info(f'Жду {url.sleep_time} секунд')


                ########################################################################################
                #НЕ ПЕРЕХОДИТ В МЕТОД send_message_to_broker !!!!!!! :(((((((((((((((((((

                await self.__event_loop.run_until_complete(self.send_message_to_broker(json.loads(resp), url.topic))
                await sleep(url.sleep_time)
                ########################################################################################


            else:
                logger.info('Undefined response!')

    async def start_tasts(self, urls: List[UrlItem]) -> None:
        """
        isinstance - проверяет что объект url является производным от UrlItem.
        get_running_loop() - получает цикл событий из контекста корутин.
        gather - ждет результата от всех тасков (в том числе exceptions)
        await sleep - прерывания для того, чтобы переключить управление на другую корутину
        """
        while True:
            try:
                tasks = [self.__event_loop.create_task(self.get_response(url))
                         for url in urls]

                results = await gather(*tasks, return_exceptions=True)
                await sleep(0)
                # pprint(results)
            except CancelledError:
                logger.info(f'Выход из основной функции "{self.start_tasks.__name__}" по CancelledError!')



if __name__ == '__main__':
    url_get = 'https://postman-echo.com/get?foo1=bar1&foo2=bar2'
    url_post = 'https://postman-echo.com/post'

    header = {
        'Accepts': 'application/json',
        'X-CMC_PRO_API_KEY': '54e5r6jj',
    }
    body = {'key1': 'value1', 'key2': 'value2'}
    params = {'key1': 'value1', 'key2': 'value2'}

    '''
    Создаем объекты запросов
    '''
    request_list = [
        UrlItem(url_get, sleep_time=4, topic='topic-1'),
        UrlItem(url_post, method=HttpMethods.http_post, headers=header, sleep_time=4, topic='topic-2')
    ]

    '''
    new_event_loop - создает новый цикл событий корутин.
    run_until_complete - запускает корутины. Точка входа start_requests.
    '''
    t_start = monotonic()

    loop = new_event_loop()
    adapter = Adapter(loop)


    loop.run_until_complete(adapter.start_tasts(request_list))

    t_stop = monotonic() - t_start
    logger.info(f'Time: {t_stop} secs')