Untitled
unknown
python
2 years ago
15 kB
11
Indexable
import time import json import websocket import contextlib from contextlib import suppress from threading import Thread from sys import _getframe as getframe from .lib.util.helpers import deviceId from .lib.util import objects, helpers class SocketHandler: def __init__(self, client, socket_trace = False, debug = True): self.socket_url = "wss://ws2.aminoapps.com" self.client = client self.debug = debug self.active = False self.headers = None self.socket = None self.reconnect = True self.socket_thread = None self.reconnectTime = 180 self.socket_thread = None if self.socket_enabled: self.reconnect_thread = Thread(target=self.reconnect_handler) self.reconnect_thread.start() websocket.enableTrace(socket_trace) def reconnect_handler(self): # Made by enchart#3410 thx # Fixed by The_Phoenix#3967 while True: time.sleep(self.reconnectTime) if self.active: if self.debug is True: print(f"[socket][reconnect_handler] Reconnecting Socket") self.close() self.run_amino_socket() def on_close(self, **kwargs): if self.debug is True: print("[socket][on_close] Socket Closed") #self.active = False if self.reconnect: if self.debug is True: print("[socket][on_close] reconnect is True, Opening Socket") def handle_message(self, ws, data): self.client.handle_socket_message(data) return def on_open(self, **kwargs): if self.debug is True: print("[socket][on_open] Socket Opened") def send(self, data): if self.debug is True: print(f"[socket][send] Sending Data : {data}") if not self.socket_thread: self.run_amino_socket() time.sleep(5) self.socket.send(data) def on_ping(self, data): if self.debug is True: print("[socket][on_ping] Socket Pinged") contextlib.suppress(self.socket.sock.pong(data)) def run_amino_socket(self): try: if self.debug is True: print(f"[socket][start] Starting Socket") if self.client.sid is None: return final = f"{self.client.device_id}|{int(time.time() * 1000)}" self.headers = { "NDCDEVICEID": deviceId() if self.client.autoDevice else self.client.device_id, "NDCAUTH": f"sid={self.client.sid}", "NDC-MSG-SIG": helpers.signature(final) } self.socket = websocket.WebSocketApp( f"{self.socket_url}/?signbody={final.replace('|', '%7C')}", on_message = self.handle_message, on_open = self.on_open, on_close = self.on_close, on_ping = self.on_ping, header = self.headers ) self.active = True self.socket_thread = Thread(target=self.socket.run_forever) self.socket_thread.start() if self.reconnect_thread is None: self.reconnect_thread = Thread(target=self.reconnect_handler) self.reconnect_thread.start() if self.debug is True: print(f"[socket][start] Socket Started") except Exception as e: print(e) def close(self): if self.debug is True: print(f"[socket][close] Closing Socket") self.active = False try: self.socket.close() except Exception as closeError: if self.debug is True: print(f"[socket][close] Error while closing Socket : {closeError}") return class Callbacks: def __init__(self, client): self.client = client self.handlers = {} self.methods = { 304: self._resolve_chat_action_start, 201: self._resolve_channel, 102: self._resolve_info, 306: self._resolve_chat_action_end, 1000: self._resolve_chat_message } self.chat_methods = { "0:0": self.on_text_message, "0:100": self.on_image_message, "0:103": self.on_youtube_message, "1:0": self.on_strike_message, "2:110": self.on_voice_message, "3:113": self.on_sticker_message, "52:0": self.on_voice_chat_not_answered, "53:0": self.on_voice_chat_not_cancelled, "54:0": self.on_voice_chat_not_declined, "55:0": self.on_video_chat_not_answered, "56:0": self.on_video_chat_not_cancelled, "57:0": self.on_video_chat_not_declined, "58:0": self.on_avatar_chat_not_answered, "59:0": self.on_avatar_chat_not_cancelled, "60:0": self.on_avatar_chat_not_declined, "100:0": self.on_delete_message, "101:0": self.on_group_member_join, "102:0": self.on_group_member_leave, "103:0": self.on_chat_invite, "104:0": self.on_chat_background_changed, "105:0": self.on_chat_title_changed, "106:0": self.on_chat_icon_changed, "107:0": self.on_voice_chat_start, "108:0": self.on_video_chat_start, "109:0": self.on_avatar_chat_start, "110:0": self.on_voice_chat_end, "111:0": self.on_video_chat_end, "112:0": self.on_avatar_chat_end, "113:0": self.on_chat_content_changed, "114:0": self.on_screen_room_start, "115:0": self.on_screen_room_end, "116:0": self.on_chat_host_transfered, "117:0": self.on_text_message_force_removed, "118:0": self.on_chat_removed_message, "119:0": self.on_text_message_removed_by_admin, "120:0": self.on_chat_tip, "121:0": self.on_chat_pin_announcement, "122:0": self.on_voice_chat_permission_open_to_everyone, "123:0": self.on_voice_chat_permission_invited_and_requested, "124:0": self.on_voice_chat_permission_invite_only, "125:0": self.on_chat_view_only_enabled, "126:0": self.on_chat_view_only_disabled, "127:0": self.on_chat_unpin_announcement, "128:0": self.on_chat_tipping_enabled, "129:0": self.on_chat_tipping_disabled, "65281:0": self.on_timestamp_message, "65282:0": self.on_welcome_message, "65283:0": self.on_invite_message } self.channel_methods = { "fetch-channel": self.on_fetch_channel, "user-info": self.on_user_info, } self.chat_actions_start = { "Typing": self.on_user_typing_start, } self.chat_actions_end = { "Typing": self.on_user_typing_end, } def _resolve_chat_message(self, data): key = f"{data['o']['chatMessage']['type']}:{data['o']['chatMessage'].get('mediaType', 0)}" return self.chat_methods.get(key, self.default)(data) def _resolve_chat_action_start(self, data): #print(data) key = data['o'].get('actions', 0) return self.chat_actions_start.get(key, self.default)(data) def _resolve_chat_action_end(self, data): #print(data) key = data['o'].get('actions', 0) return self.chat_actions_end.get(key, self.default)(data) def _resolve_channel(self, data): #print(data) if data['t'] == 201: return self.channel_methods.get("fetch-channel")(data) def _resolve_info(self, data): #print(data) if data['t'] == 102: return self.channel_methods.get("user-info")(data) def resolve(self, data): # def _resolve_channel(self, data): # #print(data) # if data['t'] == 201: # data = json.loads(data) return self.methods.get(data["t"], self.default)(data) def call(self, type, data): if type in self.handlers: for handler in self.handlers[type]: handler(data) def event(self, type): def registerHandler(handler): if type in self.handlers: self.handlers[type].append(handler) else: self.handlers[type] = [handler] return handler return registerHandler def on_text_message(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_image_message(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_youtube_message(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_strike_message(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_voice_message(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_sticker_message(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_voice_chat_not_answered(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_voice_chat_not_cancelled(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_voice_chat_not_declined(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_video_chat_not_answered(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_video_chat_not_cancelled(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_video_chat_not_declined(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_avatar_chat_not_answered(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_avatar_chat_not_cancelled(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_avatar_chat_not_declined(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_delete_message(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_group_member_join(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_group_member_leave(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_invite(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_background_changed(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_title_changed(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_icon_changed(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_voice_chat_start(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_video_chat_start(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_avatar_chat_start(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_voice_chat_end(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_video_chat_end(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_avatar_chat_end(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_content_changed(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_screen_room_start(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_screen_room_end(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_host_transfered(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_text_message_force_removed(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_removed_message(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_text_message_removed_by_admin(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_tip(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_pin_announcement(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_voice_chat_permission_open_to_everyone(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_voice_chat_permission_invited_and_requested(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_voice_chat_permission_invite_only(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_view_only_enabled(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_view_only_disabled(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_unpin_announcement(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_tipping_enabled(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_chat_tipping_disabled(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_timestamp_message(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_welcome_message(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_invite_message(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_user_typing_start(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_user_typing_end(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_fetch_channel(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def on_user_info(self, data): self.call(getframe(0).f_code.co_name, objects.Event(data["o"]).Event) def default(self, data): self.call(getframe(0).f_code.co_name, data)
Editor is loading...