Untitled
unknown
plain_text
4 years ago
1.2 kB
3
Indexable
import threading import asyncio from typing import List, Coroutine class AsyncLoopThread(threading.Thread): """ Class for running async routines in a separate thread, in order not to block the main thread """ def __init__( self, coroutines: List[Coroutine], lock: threading.Lock, loop: asyncio.AbstractEventLoop = None, ) -> None: """ :param coroutines: list - list of coroutines to run :param lock: threading.Lock - lock for thread synchronization and queueing :param loop: asyncio.EventLoop - current event loop """ print("Init thread") super().__init__() self.loop = asyncio.new_event_loop() if not loop else loop asyncio.set_event_loop(self.loop) self.coroutines = coroutines self.lock = lock def run(self) -> None: """""" # CODE REVIEW: # lack of docstring # I wonder if line 35 should not be changed with line 36 # but it's probably fine ¯\_(ツ)_/¯ for routine in self.coroutines: with self.lock: print("Acquired lock") self.loop.run_until_complete(routine)
Editor is loading...