Untitled
unknown
python
a year ago
3.8 kB
4
Indexable
import asyncio
import uuid
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from loguru import logger
from client_worker import run_client_process
class ClientManager:
def __init__(self, clients):
self.clients_info = clients
self.task_queues = {}
self.result_queues = {}
self.client_locks = {}
self.client_started = {}
self.task_states = Manager().dict()
self.executor = ProcessPoolExecutor()
self.manager = Manager()
for session_number in range(20):
for userbot_id, api_id, api_hash, bot_token in clients:
task_queue = asyncio.Queue()
result_queue = asyncio.Queue()
lock = asyncio.Lock()
self.task_queues[(userbot_id, session_number)] = task_queue
self.result_queues[(userbot_id, session_number)] = result_queue
self.client_locks[(userbot_id, session_number)] = lock
self.client_started[(userbot_id, session_number)] = False
def handle_process_completion(self, future):
result = future.result()
logger.info(f"Process completed with result: {result}")
async def send_task(self, userbot_id, session_number, task):
await self.start_client(userbot_id, session_number)
task_id = str(uuid.uuid4())
task.update({'task_id': task_id})
self.task_queues[(userbot_id, session_number)].put(task)
self.task_states[task_id] = 'queued'
return task_id
async def start_client(self, userbot_id, session_number):
lock = self.client_locks[(userbot_id, session_number)]
async with lock:
if not self.client_started[(userbot_id, session_number)]:
api_id, api_hash, bot_token = self.get_client_info(userbot_id, session_number)
future = self.executor.submit(run_client_process, userbot_id, session_number, api_id, api_hash,
bot_token, self.task_queues[(userbot_id, session_number)],
self.result_queues, self.task_states)
future.add_done_callback(self.handle_process_completion)
self.client_started[(userbot_id, session_number)] = True
def get_client_info(self, userbot_id, session_number):
for uid, api_id, api_hash, bot_token in self.clients_info:
if uid == userbot_id:
return api_id, api_hash, bot_token
raise ValueError(f"No client info found for userbot_id {userbot_id} and session_number {session_number}")
async def get_result(self, userbot_id, session_number, task_id):
while True:
results = []
# Обработка всех элементов в очереди
while not self.result_queues[(userbot_id, session_number)].empty():
result_id, result, task_type = await self.result_queues[(userbot_id, session_number)].get()
if result_id == task_id:
if task_type == 'generator':
results.append(result)
else:
return result
else:
# Возвращаем обратно в очередь, если это не тот task_id
await self.result_queues[(userbot_id, session_number)].put((result_id, result, task_type))
# Для уменьшения нагрузки на CPU
await asyncio.sleep(0.1)
def is_running(self, task_id):
return self.task_states.get(task_id) in ['queued', 'running']
def stop_clients(self):
logger.info("Shutting down executor and closing manager.")
self.executor.shutdown()
self.manager.shutdown()
Editor is loading...
Leave a Comment