Untitled

 avatar
unknown
python
a year ago
1.1 kB
4
Indexable
import asyncio

class RateLimitedAPIClient:
    """Run queued awaitables in batches, pausing between batches.

    Awaitables are submitted with :meth:`add_request_task` and consumed by a
    background worker started with :meth:`run`.  On every cycle the worker
    takes at most ``rate_limit`` items from the queue, awaits them
    concurrently, waits for the whole batch to finish and then sleeps for
    ``per_second`` seconds before starting the next cycle.

    Attributes:
        rate_limit: Maximum number of awaitables executed per cycle.
        per_second: Pause, in seconds, between two cycles.
        tasks_queue: Queue of awaitables waiting to be executed.
        semaphore: Caps the number of awaitables running at once at
            ``rate_limit``.
        loop: Event loop the worker was scheduled on, or ``None`` before
            :meth:`run` has been called.
    """

    def __init__(self, *, rate_limit: int, per_second: int) -> None:
        """Store the limits and create the queue and the semaphore.

        Args:
            rate_limit: Maximum number of awaitables executed per cycle.
            per_second: Pause, in seconds, between two cycles.
        """
        self.rate_limit = rate_limit
        self.per_second = per_second

        self.tasks_queue = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(rate_limit)

        self.loop = None
        # Strong reference to the worker task: the event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected while
        # it is still running.
        self._worker_task = None

    async def add_request_task(self, coro) -> None:
        """Queue ``coro`` (an awaitable) for execution by the worker."""
        print('add_request_task')
        await self.tasks_queue.put(coro)

    async def _worker(self) -> None:
        """Execute queued awaitables forever, one batch per cycle.

        A failing awaitable does not stop the worker: its exception is handed
        to the event loop's exception handler and processing continues.
        """
        while True:
            batch = []
            for _ in range(self.rate_limit):
                # Non-blocking get: never wait on the queue while already
                # holding a partially filled batch.
                try:
                    batch.append(self.tasks_queue.get_nowait())
                except asyncio.QueueEmpty:
                    break

            if batch:
                # return_exceptions=True keeps one failed request from
                # propagating out of gather() and ending this loop for good.
                outcomes = await asyncio.gather(
                    *(self.exec_with_limit(coro) for coro in batch),
                    return_exceptions=True,
                )
                for outcome in outcomes:
                    # Mark the item processed so tasks_queue.join() can finish.
                    self.tasks_queue.task_done()
                    if isinstance(outcome, BaseException):
                        asyncio.get_running_loop().call_exception_handler({
                            'message': 'RateLimitedAPIClient request task failed',
                            'exception': outcome,
                        })

            await asyncio.sleep(self.per_second)

    async def exec_with_limit(self, coro) -> None:
        """Await ``coro`` while holding one slot of the semaphore."""
        async with self.semaphore:
            await coro

    def run(self) -> None:
        """Schedule the background worker on the event loop.

        Works both from inside a running loop and before the loop has been
        started.  Calling it again while the worker is still alive does
        nothing, so the configured rate cannot be multiplied by accident.
        """
        if self._worker_task is not None and not self._worker_task.done():
            return

        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running yet: keep the original behaviour of
            # scheduling on the current event loop for a later start.
            self.loop = asyncio.get_event_loop()
        self._worker_task = self.loop.create_task(self._worker())
Editor is loading...
Leave a Comment