Untitled
unknown
python
9 months ago
3.8 kB
2
Indexable
import asyncio
import uuid
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

from loguru import logger

from client_worker import run_client_process

# Number of session slots pre-allocated per client unless the caller overrides it.
DEFAULT_SESSIONS_PER_CLIENT = 20


class ClientManager:
    """Dispatch tasks to per-session client workers and collect their results.

    For every ``(userbot_id, session_number)`` pair the manager keeps a task
    queue, a result queue, a lock and a "started" flag.  The worker for a pair
    is submitted lazily to a ``ProcessPoolExecutor`` the first time a task is
    sent to it.

    NOTE(review): the queues are ``asyncio.Queue`` objects, which are designed
    for use inside a single event loop and are not inter-process channels.
    They are nevertheless handed to ``run_client_process`` through the process
    pool.  How ``client_worker`` consumes them is not visible from this file;
    confirm that tasks/results really cross the process boundary (a
    ``multiprocessing`` manager queue may be what is needed).
    """

    def __init__(self, clients, sessions_per_client=DEFAULT_SESSIONS_PER_CLIENT):
        """Pre-allocate queues, locks and flags for every client session.

        Args:
            clients: Iterable of ``(userbot_id, api_id, api_hash, bot_token)``
                tuples.  It is iterated once per session, so it must be
                re-iterable (e.g. a list).
            sessions_per_client: How many session slots to create per client.
                Session numbers are ``0 .. sessions_per_client - 1``.
        """
        self.clients_info = clients
        self.task_queues = {}
        self.result_queues = {}
        self.client_locks = {}
        self.client_started = {}

        # A single manager process backs the shared task-state dict.  The
        # original code started a second, otherwise unused manager here.
        # Consequence: after stop_clients() the state dict is gone as well.
        self.manager = Manager()
        self.task_states = self.manager.dict()

        # NOTE(review): the pool uses the default worker count.  If
        # run_client_process is long-running (not visible here), only that
        # many sessions can be active at once - confirm this is intended.
        self.executor = ProcessPoolExecutor()

        for session_number in range(sessions_per_client):
            for userbot_id, _api_id, _api_hash, _bot_token in clients:
                key = (userbot_id, session_number)
                self.task_queues[key] = asyncio.Queue()
                self.result_queues[key] = asyncio.Queue()
                self.client_locks[key] = asyncio.Lock()
                self.client_started[key] = False

    def handle_process_completion(self, future):
        """Log the outcome of a finished worker future.

        Used as a done-callback, so it must not raise: a failed or cancelled
        worker is logged with its traceback instead of propagating.
        """
        try:
            result = future.result()
        except Exception:
            logger.exception("Client process terminated with an error")
            return
        logger.info(f"Process completed with result: {result}")

    async def send_task(self, userbot_id, session_number, task):
        """Queue ``task`` for the given client session and return its id.

        The worker for the session is started first if necessary.  ``task`` is
        mutated in place: a fresh ``'task_id'`` entry is added to it.

        Returns:
            The generated task id (a UUID4 string).

        Raises:
            KeyError: If the ``(userbot_id, session_number)`` pair is unknown.
        """
        await self.start_client(userbot_id, session_number)
        task_id = str(uuid.uuid4())
        task.update({'task_id': task_id})
        # Publish the state before handing the task over, so a worker that
        # picks it up immediately cannot have its own state overwritten.
        self.task_states[task_id] = 'queued'
        # asyncio.Queue.put is a coroutine; without ``await`` nothing is queued.
        await self.task_queues[(userbot_id, session_number)].put(task)
        return task_id

    async def start_client(self, userbot_id, session_number):
        """Submit the worker for a client session unless already submitted.

        The per-session lock makes concurrent callers submit at most one
        worker per ``(userbot_id, session_number)`` pair.

        Raises:
            KeyError: If the pair is unknown.
            ValueError: If no credentials exist for ``userbot_id``.
        """
        key = (userbot_id, session_number)
        async with self.client_locks[key]:
            if self.client_started[key]:
                return
            api_id, api_hash, bot_token = self.get_client_info(userbot_id, session_number)
            # NOTE(review): the worker gets this session's task queue but the
            # whole result-queue mapping; kept as in the original - verify
            # against the signature of run_client_process.
            future = self.executor.submit(
                run_client_process,
                userbot_id,
                session_number,
                api_id,
                api_hash,
                bot_token,
                self.task_queues[key],
                self.result_queues,
                self.task_states,
            )
            future.add_done_callback(self.handle_process_completion)
            self.client_started[key] = True

    def get_client_info(self, userbot_id, session_number):
        """Return ``(api_id, api_hash, bot_token)`` for ``userbot_id``.

        ``session_number`` is only used in the error message.

        Raises:
            ValueError: If ``userbot_id`` is not among the configured clients.
        """
        for uid, api_id, api_hash, bot_token in self.clients_info:
            if uid == userbot_id:
                return api_id, api_hash, bot_token
        raise ValueError(f"No client info found for userbot_id {userbot_id} and session_number {session_number}")

    async def get_result(self, userbot_id, session_number, task_id):
        """Wait for and return the result of ``task_id``.

        Items on the session's result queue are ``(result_id, result,
        task_type)`` tuples.  A matching item whose type is not
        ``'generator'`` is returned immediately.  Matching ``'generator'``
        items are accumulated and returned as a list once the task is no
        longer reported as running (presumably the worker updates
        ``task_states`` after emitting its last chunk - confirm against
        ``client_worker``).  Items for other tasks are put back on the queue.

        Raises:
            KeyError: If the ``(userbot_id, session_number)`` pair is unknown.
        """
        queue = self.result_queues[(userbot_id, session_number)]
        # Generator chunks must survive across polling rounds.
        results = []
        while True:
            # Sample the state before draining: if the task had already
            # finished, everything it produced is expected to be queued.
            finished = not self.is_running(task_id)
            foreign = []
            try:
                # Drain everything currently queued in one pass.
                while not queue.empty():
                    item = queue.get_nowait()
                    result_id, result, task_type = item
                    if result_id != task_id:
                        foreign.append(item)
                    elif task_type == 'generator':
                        results.append(result)
                    else:
                        return result
            finally:
                # Return other tasks' results only after the pass is over;
                # re-queueing them inside the drain loop would keep the queue
                # non-empty forever and block the event loop.
                for item in foreign:
                    queue.put_nowait(item)
            if finished and results:
                return results
            # Yield to the event loop and keep CPU usage low while polling.
            await asyncio.sleep(0.1)

    def is_running(self, task_id):
        """Return True while ``task_id`` is in the 'queued' or 'running' state."""
        return self.task_states.get(task_id) in ['queued', 'running']

    def stop_clients(self):
        """Shut down the process pool and the manager process.

        ``executor.shutdown()`` waits for submitted workers to finish.  The
        shared task-state dict lives in the manager and is unavailable
        afterwards.
        """
        logger.info("Shutting down executor and closing manager.")
        self.executor.shutdown()
        self.manager.shutdown()
Editor is loading...
Leave a Comment