Untitled
unknown
plain_text
5 years ago
1.2 kB
4
Indexable
import threading
import asyncio
from typing import List, Coroutine
class AsyncLoopThread(threading.Thread):
"""
Class for running async routines in a separate thread, in order not to block the main thread
"""
def __init__(
self,
coroutines: List[Coroutine],
lock: threading.Lock,
loop: asyncio.AbstractEventLoop = None,
) -> None:
"""
:param coroutines: list - list of coroutines to run
:param lock: threading.Lock - lock for thread synchronization and queueing
:param loop: asyncio.EventLoop - current event loop
"""
print("Init thread")
super().__init__()
self.loop = asyncio.new_event_loop() if not loop else loop
asyncio.set_event_loop(self.loop)
self.coroutines = coroutines
self.lock = lock
def run(self) -> None:
""""""
# CODE REVIEW:
# lack of docstring
# I wonder if line 35 should not be changed with line 36
# but it's probably fine ¯\_(ツ)_/¯
for routine in self.coroutines:
with self.lock:
print("Acquired lock")
self.loop.run_until_complete(routine)
Editor is loading...