pzdc
unknown
python
2 years ago
7.0 kB
6
Indexable
Never
import asyncio import json from asyncio import AbstractEventLoop, sleep, gather, CancelledError, new_event_loop from dataclasses import dataclass from enum import Enum from time import monotonic from typing import Any, List, Optional, Tuple, SupportsInt import aiohttp import logging from confluent_kafka.cimpl import Producer logging.basicConfig(level=logging.INFO) logger = logging.getLogger("Producer") class HttpMethods(Enum): """ Класс перечисления. Автоматически инициализируется. """ http_none = 'none' http_get = 'get' http_post = 'post' @dataclass class UrlItem: """ dataclass - класс для хранения данных (появился в python 3.6). Храним поля, типы и значения. """ url: str data: Any = None method: Enum = HttpMethods.http_get headers: Any = None pars: Any = None pars_json: Any = None topic: str = None sleep_time: SupportsInt = None class Adapter: """ Констурктор класса инициализирует event_loop с типом AbstractEventLoop """ def __init__(self, event_loop: AbstractEventLoop) -> None: super().__init__() self.__event_loop = event_loop self.producer = self.create_producer() @staticmethod def create_producer() -> Producer: conf = {'bootstrap.servers': '127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094'} producer = Producer(conf) producer.poll(0) return producer def delivery_report(self, errmsg, msg) -> None: if errmsg is not None: logger.info("Delivery failed for Message: {} : {}".format(msg.key(), errmsg)) logger.info(' Message: {} successfully produced to Topic: {} Partition: [{}] at offset {}'.format( msg.key(), msg.topic(), msg.partition(), msg.offset())) async def send_message_to_broker(self, data: bytes, topic: str) -> None: """ Отправляем данные в Тему. При этом указываем ключ по которому будем разделять информацию между сервисами. """ try: logger.info('lol1231231') self.producer.produce(topic=topic, key='Key_for_service_gateway_TRACK', value=data, on_delivery=self.delivery_report) self.producer.flush() except Exception as ex: logger.info("Exception happened :", ex) async def do_request(self, request_url: str, *, data: Any = None, method: Enum = HttpMethods.http_none, **request_kwargs) -> Tuple[Optional[int], Any]: """ до * - передача аргументов без имени. После * обязан передать имена. **request_kwargs - передаем остальные параметры. None, None - возвращается кортеж (Tuple). Optional[int] - возвращает либо int либо None. """ async with aiohttp.ClientSession(loop=self.__event_loop) as cl: if method is HttpMethods.http_get: async with cl.get(request_url, **request_kwargs) as response: return response.status, await response.json() elif method is HttpMethods.http_post: async with cl.post(request_url, data=data, **request_kwargs) as response: return response.status, await response.json() else: return None, None async def get_response(self, url: UrlItem) -> None: if isinstance(url, UrlItem): status, resp = await self.do_request(url.url, data=url.data, method=url.method, headers=url.headers, params=url.pars, json=url.pars_json) if resp: #print(f'Status ({status}), json: {resp}') logger.info(f'Жду {url.sleep_time} секунд') ######################################################################################## #НЕ ПЕРЕХОДИТ В МЕТОД send_message_to_broker !!!!!!! :((((((((((((((((((( await self.__event_loop.run_until_complete(self.send_message_to_broker(json.loads(resp), url.topic)) await sleep(url.sleep_time) ######################################################################################## else: logger.info('Undefined response!') async def start_tasts(self, urls: List[UrlItem]) -> None: """ isinstance - проверяет что объект url является производным от UrlItem. get_running_loop() - получает цикл событий из контекста корутин. gather - ждет результата от всех тасков (в том числе exceptions) await sleep - прерывания для того, чтобы переключить управление на другую корутину """ while True: try: tasks = [self.__event_loop.create_task(self.get_response(url)) for url in urls] results = await gather(*tasks, return_exceptions=True) await sleep(0) # pprint(results) except CancelledError: logger.info(f'Выход из основной функции "{self.start_tasks.__name__}" по CancelledError!') if __name__ == '__main__': url_get = 'https://postman-echo.com/get?foo1=bar1&foo2=bar2' url_post = 'https://postman-echo.com/post' header = { 'Accepts': 'application/json', 'X-CMC_PRO_API_KEY': '54e5r6jj', } body = {'key1': 'value1', 'key2': 'value2'} params = {'key1': 'value1', 'key2': 'value2'} ''' Создаем объекты запросов ''' request_list = [ UrlItem(url_get, sleep_time=4, topic='topic-1'), UrlItem(url_post, method=HttpMethods.http_post, headers=header, sleep_time=4, topic='topic-2') ] ''' new_event_loop - создает новый цикл событий корутин. run_until_complete - запускает корутины. Точка входа start_requests. ''' t_start = monotonic() loop = new_event_loop() adapter = Adapter(loop) loop.run_until_complete(adapter.start_tasts(request_list)) t_stop = monotonic() - t_start logger.info(f'Time: {t_stop} secs')