Untitled

 avatar
unknown
plain_text
4 years ago
1.2 kB
3
Indexable
import asyncio
import threading
from typing import Coroutine, List, Optional


class AsyncLoopThread(threading.Thread):
    """
    Thread that runs a list of coroutines, one after another, on an asyncio event loop.

    Running the coroutines in a separate thread keeps the calling thread unblocked.
    Each coroutine is run to completion while holding ``lock``, so threads that
    share the same lock execute their coroutines one at a time.
    """

    def __init__(
        self,
        coroutines: List[Coroutine],
        lock: threading.Lock,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """
        :param coroutines: list - list of coroutines to run, in order
        :param lock: threading.Lock - lock held while each coroutine runs
        :param loop: asyncio.AbstractEventLoop - loop to run the coroutines on;
            when omitted, a new loop is created and closed again once ``run`` finishes
        """
        print("Init thread")
        super().__init__()

        # Remember whether the loop is ours: only a loop created here is closed by
        # ``run``; a loop supplied by the caller stays under the caller's control.
        self._owns_loop = loop is None
        self.loop = asyncio.new_event_loop() if loop is None else loop

        # NOTE: the loop is deliberately NOT installed with asyncio.set_event_loop()
        # here. __init__ executes on the thread that constructs this object, and the
        # current event loop is per-thread state, so installing it here would replace
        # the caller's loop instead of configuring the worker thread. See ``run``.

        self.coroutines = coroutines
        self.lock = lock

    def run(self) -> None:
        """
        Run every coroutine to completion on ``self.loop``, in list order.

        Executed on the worker thread by ``start()``. The loop is first installed as
        this thread's current event loop. For each coroutine the lock is acquired,
        the coroutine is run until it completes, and the lock is released. If a
        coroutine raises, the remaining ones are not run. A loop created by this
        object is closed when the method exits, whether or not an error occurred.
        """
        # Make the loop current for the thread that actually drives it.
        asyncio.set_event_loop(self.loop)
        try:
            for routine in self.coroutines:
                with self.lock:
                    print("Acquired lock")
                    self.loop.run_until_complete(routine)
        finally:
            # Release the selector/self-pipe of a loop we created ourselves.
            if self._owns_loop:
                self.loop.close()
Editor is loading...