Untitled

 avatar
unknown
python
9 months ago
3.8 kB
2
Indexable
import asyncio
import uuid
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from loguru import logger
from client_worker import run_client_process


class ClientManager:
    """Dispatch tasks to per-(userbot, session) client worker processes.

    For every ``(userbot_id, session_number)`` pair the manager keeps a task
    queue, a result queue, an ``asyncio.Lock`` and a "started" flag.  Workers
    are launched lazily through a ``ProcessPoolExecutor`` the first time a
    task is sent to a pair.  Task states live in a ``Manager().dict()`` that
    is handed to the workers.

    NOTE(review): the task/result queues are ``asyncio.Queue`` objects, which
    are only usable inside one event loop.  Arguments to
    ``ProcessPoolExecutor.submit`` are pickled, so the worker process cannot
    share these queues with the parent.  Confirm how ``run_client_process``
    actually exchanges data; a ``Manager().Queue()`` is the usual
    cross-process replacement.
    """

    def __init__(self, clients, session_count=20):
        """Create queues, locks and flags for every client/session pair.

        Args:
            clients: iterable of ``(userbot_id, api_id, api_hash, bot_token)``
                tuples.  It is iterated here and again by
                ``get_client_info``, so it must be re-iterable.
            session_count: number of sessions prepared per userbot
                (sessions are numbered ``0 .. session_count - 1``).
        """
        self.clients_info = clients
        self.task_queues = {}
        self.result_queues = {}
        self.client_locks = {}
        self.client_started = {}
        # A single manager backs all shared state, so stop_clients() shuts
        # down the same server process that serves task_states.
        self.manager = Manager()
        self.task_states = self.manager.dict()
        self.executor = ProcessPoolExecutor()

        for session_number in range(session_count):
            for userbot_id, _api_id, _api_hash, _bot_token in clients:
                key = (userbot_id, session_number)
                self.task_queues[key] = asyncio.Queue()
                self.result_queues[key] = asyncio.Queue()
                self.client_locks[key] = asyncio.Lock()
                self.client_started[key] = False

    def handle_process_completion(self, future):
        """Log the outcome of a finished worker future.

        Used as a ``Future.add_done_callback`` hook.  Exceptions raised inside
        such callbacks are swallowed by ``concurrent.futures``, so failures
        and cancellations are inspected and logged explicitly instead of
        letting ``future.result()`` re-raise.
        """
        if future.cancelled():
            logger.warning("Process was cancelled before completion.")
            return

        error = future.exception()
        if error is not None:
            logger.error(f"Process failed with exception: {error!r}")
            return

        logger.info(f"Process completed with result: {future.result()}")

    async def send_task(self, userbot_id, session_number, task) -> str:
        """Queue ``task`` for the given client and return its new task id.

        Starts the client worker first if it is not running yet.  ``task`` is
        mutated in place: a fresh UUID4 string is stored under ``'task_id'``.

        Raises:
            KeyError: if no queue exists for ``(userbot_id, session_number)``.
        """
        await self.start_client(userbot_id, session_number)

        task_id = str(uuid.uuid4())
        task['task_id'] = task_id
        # Publish the state before the task becomes visible to a consumer, so
        # a later state written by the worker is not overwritten by 'queued'.
        self.task_states[task_id] = 'queued'
        # asyncio.Queue.put is a coroutine; without ``await`` nothing is
        # actually enqueued.
        await self.task_queues[(userbot_id, session_number)].put(task)
        return task_id

    async def start_client(self, userbot_id, session_number) -> None:
        """Submit the worker for this client/session once.

        The per-key lock serialises concurrent callers so that the worker is
        submitted to the executor at most once per key.

        Raises:
            KeyError: if the pair was not prepared in ``__init__``.
            ValueError: if ``userbot_id`` has no entry in ``clients_info``.
        """
        key = (userbot_id, session_number)
        async with self.client_locks[key]:
            if self.client_started[key]:
                return

            api_id, api_hash, bot_token = self.get_client_info(userbot_id, session_number)
            # The worker receives its own task queue but the whole
            # result-queue mapping, exactly as before.
            future = self.executor.submit(
                run_client_process,
                userbot_id,
                session_number,
                api_id,
                api_hash,
                bot_token,
                self.task_queues[key],
                self.result_queues,
                self.task_states,
            )
            future.add_done_callback(self.handle_process_completion)
            self.client_started[key] = True

    def get_client_info(self, userbot_id, session_number):
        """Return ``(api_id, api_hash, bot_token)`` for ``userbot_id``.

        ``session_number`` is only used in the error message.

        Raises:
            ValueError: if no client entry matches ``userbot_id``.
        """
        for uid, api_id, api_hash, bot_token in self.clients_info:
            if uid == userbot_id:
                return api_id, api_hash, bot_token
        raise ValueError(f"No client info found for userbot_id {userbot_id} and session_number {session_number}")

    async def get_result(self, userbot_id, session_number, task_id):
        """Wait for the result of ``task_id`` on the client's result queue.

        Queue items are ``(result_id, result, task_type)`` tuples.  A matching
        item whose type is not ``'generator'`` is returned immediately.
        ``'generator'`` items are collected across polls and returned as a
        list once the task is no longer ``'queued'``/``'running'``.

        NOTE(review): the generator completion rule assumes the worker moves
        the task out of 'queued'/'running' in ``task_states`` only after it
        has pushed the last chunk - confirm against the worker.

        Results that belong to other tasks are put back on the queue.
        """
        queue = self.result_queues[(userbot_id, session_number)]
        collected = []

        while True:
            # Sample the state before draining: if the task is already
            # finished here, every chunk it produced is already queued.
            finished = not self.is_running(task_id)

            # Drain everything currently queued.  Foreign results are parked
            # in a local list and re-queued only after the drain; putting them
            # straight back would keep the queue non-empty and spin forever.
            foreign = []
            try:
                while True:
                    item = queue.get_nowait()
                    result_id, result, task_type = item
                    if result_id != task_id:
                        foreign.append(item)
                    elif task_type == 'generator':
                        collected.append(result)
                    else:
                        return result
            except asyncio.QueueEmpty:
                pass
            finally:
                # Runs on the early ``return`` as well, so other tasks'
                # results are never lost.
                for item in foreign:
                    queue.put_nowait(item)

            if finished and collected:
                return collected

            # Yield to the event loop and keep CPU usage low between polls.
            await asyncio.sleep(0.1)

    def is_running(self, task_id) -> bool:
        """Return True while the task's state is 'queued' or 'running'."""
        return self.task_states.get(task_id) in ['queued', 'running']

    def stop_clients(self) -> None:
        """Shut down the process pool and the shared-state manager."""
        logger.info("Shutting down executor and closing manager.")
        self.executor.shutdown()
        self.manager.shutdown()
Editor is loading...
Leave a Comment