Untitled
unknown
python
3 years ago
4.7 kB
17
Indexable
# -*- coding: utf-8 -*- import time import json import websocket import contextlib from threading import Thread from sys import _getframe as getframe class SocketHandler: def __init__(self, client, socket_trace = False, debug = False): self.socket_url = "" self.client = client self.debug = True self.active = False self.headers = None self.socket = None self.socket_thread = None self.reconnectTime = 180 self.socket_thread = None if self.socket_enabled: self.reconnect_thread = Thread(target=self.reconnect_handler) self.reconnect_thread.start() websocket.enableTrace(socket_trace) def reconnect_handler(self): # Made by enchart#3410 thx # Fixed by The_Phoenix#3967 while True: time.sleep(self.reconnectTime) if self.active: if self.debug is True: print(f"[socket][reconnect_handler] Reconnecting Socket") self.close() self.run_amino_socket() def handle_message(self, ws, data): d=json.dumps(data) print(json.loads(d)["type"]) self.client.handle_socket_message(data) return def send(self, data): if self.debug is True: print(f"[socket][send] Sending Data : {data}") if not self.socket_thread: self.run_amino_socket() time.sleep(5) self.socket.send(data) def run_amino_socket(self): try: if self.debug is True: print(f"[socket][start] Starting Socket") if self.client.sid is None: return self.socket = websocket.WebSocketApp( f"wss://api.interpals.net/v1/ws?token={self.client.sid}", on_message = self.handle_message ) self.active = True self.socket_thread = Thread(target=self.socket.run_forever) self.socket_thread.start() if self.reconnect_thread is None: self.reconnect_thread = Thread(target=self.reconnect_handler) self.reconnect_thread.start() if self.debug is True: print(f"[socket][start] Socket Started") except Exception as e: print(e) def close(self): if self.debug is True: print(f"[socket][close] Closing Socket") self.active = False try: self.socket.close() except Exception as closeError: if self.debug is True: print(f"[socket][close] Error while closing Socket : {closeError}") return class Callbacks: def __init__(self, client): self.client = client self.handlers = {} self.methods = { 304: self._resolve_chat_action_start, 306: self._resolve_chat_action_end, 1000: self._resolve_chat_message } self.chat_methods = { "THREAD_NEW_MESSAGE": self.on_text_message } def _resolve_chat_message(self, data): key = f"{data['o']['chatMessage']['type']}:{data['o']['chatMessage'].get('mediaType', 0)}" return self.chat_methods.get(key, self.default)(data) def _resolve_chat_action_start(self, data): key = data['o'].get('actions', 0) return self.chat_actions_start.get(key, self.default)(data) def _resolve_chat_action_end(self, data): key = data['o'].get('actions', 0) return self.chat_actions_end.get(key, self.default)(data) def resolve(self, data): try: #print(data) data = json.loads(data) #print(data) if data["type"]=="THREAD_NEW_MESSAGE": print(data) return self.chat_methods.get("THREAD_NEW_MESSAGE")(data) except Exception as f: print(f) def call(self, type, data): if type in self.handlers: for handler in self.handlers[type]: handler(data) def event(self, type): #print(type) def registerHandler(handler): if type in self.handlers: self.handlers[type].append(handler) else: self.handlers[type] = [handler] return handler return registerHandler def on_text_message(self, data): self.call(getframe(0).f_code.co_name,data)
Editor is loading...