pzdc
unknown
python
3 years ago
7.0 kB
13
Indexable
import asyncio
import json
from asyncio import AbstractEventLoop, sleep, gather, CancelledError, new_event_loop
from dataclasses import dataclass
from enum import Enum
from time import monotonic
from typing import Any, List, Optional, Tuple, SupportsInt
import aiohttp
import logging
from confluent_kafka.cimpl import Producer
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Producer")
class HttpMethods(Enum):
"""
Класс перечисления. Автоматически инициализируется.
"""
http_none = 'none'
http_get = 'get'
http_post = 'post'
@dataclass
class UrlItem:
"""
dataclass - класс для хранения данных (появился в python 3.6).
Храним поля, типы и значения.
"""
url: str
data: Any = None
method: Enum = HttpMethods.http_get
headers: Any = None
pars: Any = None
pars_json: Any = None
topic: str = None
sleep_time: SupportsInt = None
class Adapter:
"""
Констурктор класса инициализирует event_loop с типом AbstractEventLoop
"""
def __init__(self, event_loop: AbstractEventLoop) -> None:
super().__init__()
self.__event_loop = event_loop
self.producer = self.create_producer()
@staticmethod
def create_producer() -> Producer:
conf = {'bootstrap.servers': '127.0.0.1:9092, 127.0.0.1:9093, 127.0.0.1:9094'}
producer = Producer(conf)
producer.poll(0)
return producer
def delivery_report(self, errmsg, msg) -> None:
if errmsg is not None:
logger.info("Delivery failed for Message: {} : {}".format(msg.key(), errmsg))
logger.info(' Message: {} successfully produced to Topic: {} Partition: [{}] at offset {}'.format(
msg.key(), msg.topic(), msg.partition(), msg.offset()))
async def send_message_to_broker(self, data: bytes, topic: str) -> None:
"""
Отправляем данные в Тему.
При этом указываем ключ по которому будем разделять информацию между сервисами.
"""
try:
logger.info('lol1231231')
self.producer.produce(topic=topic,
key='Key_for_service_gateway_TRACK',
value=data,
on_delivery=self.delivery_report)
self.producer.flush()
except Exception as ex:
logger.info("Exception happened :", ex)
async def do_request(self, request_url: str, *,
data: Any = None,
method: Enum = HttpMethods.http_none,
**request_kwargs) -> Tuple[Optional[int], Any]:
"""
до * - передача аргументов без имени. После * обязан передать имена.
**request_kwargs - передаем остальные параметры.
None, None - возвращается кортеж (Tuple).
Optional[int] - возвращает либо int либо None.
"""
async with aiohttp.ClientSession(loop=self.__event_loop) as cl:
if method is HttpMethods.http_get:
async with cl.get(request_url, **request_kwargs) as response:
return response.status, await response.json()
elif method is HttpMethods.http_post:
async with cl.post(request_url, data=data, **request_kwargs) as response:
return response.status, await response.json()
else:
return None, None
async def get_response(self, url: UrlItem) -> None:
if isinstance(url, UrlItem):
status, resp = await self.do_request(url.url,
data=url.data,
method=url.method,
headers=url.headers,
params=url.pars,
json=url.pars_json)
if resp:
#print(f'Status ({status}), json: {resp}')
logger.info(f'Жду {url.sleep_time} секунд')
########################################################################################
#НЕ ПЕРЕХОДИТ В МЕТОД send_message_to_broker !!!!!!! :(((((((((((((((((((
await self.__event_loop.run_until_complete(self.send_message_to_broker(json.loads(resp), url.topic))
await sleep(url.sleep_time)
########################################################################################
else:
logger.info('Undefined response!')
async def start_tasts(self, urls: List[UrlItem]) -> None:
"""
isinstance - проверяет что объект url является производным от UrlItem.
get_running_loop() - получает цикл событий из контекста корутин.
gather - ждет результата от всех тасков (в том числе exceptions)
await sleep - прерывания для того, чтобы переключить управление на другую корутину
"""
while True:
try:
tasks = [self.__event_loop.create_task(self.get_response(url))
for url in urls]
results = await gather(*tasks, return_exceptions=True)
await sleep(0)
# pprint(results)
except CancelledError:
logger.info(f'Выход из основной функции "{self.start_tasks.__name__}" по CancelledError!')
if __name__ == '__main__':
url_get = 'https://postman-echo.com/get?foo1=bar1&foo2=bar2'
url_post = 'https://postman-echo.com/post'
header = {
'Accepts': 'application/json',
'X-CMC_PRO_API_KEY': '54e5r6jj',
}
body = {'key1': 'value1', 'key2': 'value2'}
params = {'key1': 'value1', 'key2': 'value2'}
'''
Создаем объекты запросов
'''
request_list = [
UrlItem(url_get, sleep_time=4, topic='topic-1'),
UrlItem(url_post, method=HttpMethods.http_post, headers=header, sleep_time=4, topic='topic-2')
]
'''
new_event_loop - создает новый цикл событий корутин.
run_until_complete - запускает корутины. Точка входа start_requests.
'''
t_start = monotonic()
loop = new_event_loop()
adapter = Adapter(loop)
loop.run_until_complete(adapter.start_tasts(request_list))
t_stop = monotonic() - t_start
logger.info(f'Time: {t_stop} secs')Editor is loading...