Untitled
unknown
python
2 years ago
1.1 kB
6
Indexable
import asyncio
class RateLimitedAPIClient:
def __init__(self, *, rate_limit: int, per_second: int):
self.rate_limit = rate_limit
self.per_second = per_second
self.tasks_queue = asyncio.Queue()
self.semaphore = asyncio.Semaphore(rate_limit)
self.loop = None
async def add_request_task(self, coro):
print('add_request_task')
await self.tasks_queue.put(coro)
async def _worker(self):
while True:
to_execute = []
for _ in range(self.rate_limit):
if self.tasks_queue.empty():
break
request_task = await self.tasks_queue.get()
to_execute.append(request_task)
if to_execute:
await asyncio.gather(*[self.exec_with_limit(task) for task in to_execute])
await asyncio.sleep(self.per_second)
async def exec_with_limit(self, coro):
async with self.semaphore:
await coro
def run(self):
self.loop = asyncio.get_event_loop()
self.loop.create_task(self._worker())
Editor is loading...
Leave a Comment