Untitled

 avatar
unknown
python
a year ago
4.4 kB
4
Indexable
from async_functions import *
import asyncio
from loguru import logger


def run_client_process(userbot_id, session_number, api_id, api_hash,
                       bot_token, task_queue, result_queue, task_states, num_workers=50):

    async def client_worker(client):
        while True:
            await asyncio.sleep(0.1)
            try:
                task = task_queue.get_nowait()  # Получаем задачу без ожидания
            except Exception:
                continue  # Если очередь пуста, выходим из цикла

            task_id = task['task_id']
            action = task['action']

            # Обновляем состояние задачи как "running"
            task_states[task_id] = 'running'
            logger.debug(task)

            try:
                if action == 'get_full_chat':
                    chat_id = task.get('chat_id')
                    result = await get_full_chat(client, chat_id)
                    result_queue.put((task_id, result, "function"))

                elif action == 'get_chat_members':
                    chat_id = task.get('chat_id')
                    query = task.get('query')
                    _, result = await get_chat_members(client, chat_id, query)
                    result_queue.put((task_id, result, "function"))

                elif action == 'join_chat':
                    link = task.get('link')
                    result = await join_chat(client, link)
                    result_queue.put((task_id, result, "function"))

                elif action == 'leave_chat':
                    chat_id = task.get('chat_id')
                    result = await join_chat(client, chat_id)
                    result_queue.put((task_id, result, "function"))

                elif action == 'promote_chat_member':
                    chat_id = task.get('chat_id')
                    user_id = task.get('user_id')
                    result = await promote_chat_member(client, chat_id, user_id)
                    result_queue.put((task_id, result, "function"))

                elif action == 'get_full_chat':
                    chat_id = task.get('chat_id')
                    result = await get_full_chat(client, chat_id)
                    result_queue.put((task_id, result, "function"))

                elif action == 'get_full_user':
                    user_id = task.get('user_id')
                    result = await get_full_user(client, user_id)
                    result_queue.put((task_id, result, "function"))

                elif action == 'get_users':
                    user_ids = task.get('user_ids')
                    result = await get_users(client, user_ids)
                    result_queue.put((task_id, result, "function"))

                elif action == 'get_messages':
                    chat_id = task.get('chat_id')
                    message_ids = task.get('message_ids')
                    result = await get_messages(client, chat_id, message_ids)
                    result_queue.put((task_id, result, "function"))

                elif action == 'get_chat':
                    chat_id = task.get('chat_id')
                    result = await get_chat(client, chat_id)
                    result_queue.put((task_id, result, "function"))

                elif action == 'get_chat_history':
                    chat_id = task.get('chat_id')
                    async for x in get_chat_history(client, chat_id):
                        result_queue.put((task_id, x, "generator"))

                elif action == 'get_chat_events':
                    chat_id = task.get('chat_id')
                    async for x in get_chat_events(client, chat_id):
                        result_queue.put((task_id, x, "generator"))

            except Exception as e:
                result_queue.put((task_id, e, "function"))
                logger.error(f"{task_id} - {e}")
            else:
                logger.debug(f"{task_id} - completed")
            finally:
                # Обновляем состояние задачи как "completed"
                task_states[task_id] = 'completed'

    async def main():
        client = await get_client(userbot_id, session_number, api_id, api_hash, bot_token)
        tasks = [asyncio.create_task(client_worker(client)) for _ in range(num_workers)]
        await asyncio.gather(*tasks)

    asyncio.run(main())
Editor is loading...
Leave a Comment