Untitled
unknown
python
3 years ago
4.7 kB
30
Indexable
# -*- coding: utf-8 -*-
import time
import json
import websocket
import contextlib
from threading import Thread
from sys import _getframe as getframe
class SocketHandler:
def __init__(self, client, socket_trace = False, debug = False):
self.socket_url = ""
self.client = client
self.debug = True
self.active = False
self.headers = None
self.socket = None
self.socket_thread = None
self.reconnectTime = 180
self.socket_thread = None
if self.socket_enabled:
self.reconnect_thread = Thread(target=self.reconnect_handler)
self.reconnect_thread.start()
websocket.enableTrace(socket_trace)
def reconnect_handler(self):
# Made by enchart#3410 thx
# Fixed by The_Phoenix#3967
while True:
time.sleep(self.reconnectTime)
if self.active:
if self.debug is True:
print(f"[socket][reconnect_handler] Reconnecting Socket")
self.close()
self.run_amino_socket()
def handle_message(self, ws, data):
d=json.dumps(data)
print(json.loads(d)["type"])
self.client.handle_socket_message(data)
return
def send(self, data):
if self.debug is True:
print(f"[socket][send] Sending Data : {data}")
if not self.socket_thread:
self.run_amino_socket()
time.sleep(5)
self.socket.send(data)
def run_amino_socket(self):
try:
if self.debug is True:
print(f"[socket][start] Starting Socket")
if self.client.sid is None:
return
self.socket = websocket.WebSocketApp(
f"wss://api.interpals.net/v1/ws?token={self.client.sid}",
on_message = self.handle_message
)
self.active = True
self.socket_thread = Thread(target=self.socket.run_forever)
self.socket_thread.start()
if self.reconnect_thread is None:
self.reconnect_thread = Thread(target=self.reconnect_handler)
self.reconnect_thread.start()
if self.debug is True:
print(f"[socket][start] Socket Started")
except Exception as e:
print(e)
def close(self):
if self.debug is True:
print(f"[socket][close] Closing Socket")
self.active = False
try:
self.socket.close()
except Exception as closeError:
if self.debug is True:
print(f"[socket][close] Error while closing Socket : {closeError}")
return
class Callbacks:
def __init__(self, client):
self.client = client
self.handlers = {}
self.methods = {
304: self._resolve_chat_action_start,
306: self._resolve_chat_action_end,
1000: self._resolve_chat_message
}
self.chat_methods = {
"THREAD_NEW_MESSAGE": self.on_text_message
}
def _resolve_chat_message(self, data):
key = f"{data['o']['chatMessage']['type']}:{data['o']['chatMessage'].get('mediaType', 0)}"
return self.chat_methods.get(key, self.default)(data)
def _resolve_chat_action_start(self, data):
key = data['o'].get('actions', 0)
return self.chat_actions_start.get(key, self.default)(data)
def _resolve_chat_action_end(self, data):
key = data['o'].get('actions', 0)
return self.chat_actions_end.get(key, self.default)(data)
def resolve(self, data):
try:
#print(data)
data = json.loads(data)
#print(data)
if data["type"]=="THREAD_NEW_MESSAGE":
print(data)
return self.chat_methods.get("THREAD_NEW_MESSAGE")(data)
except Exception as f: print(f)
def call(self, type, data):
if type in self.handlers:
for handler in self.handlers[type]:
handler(data)
def event(self, type):
#print(type)
def registerHandler(handler):
if type in self.handlers:
self.handlers[type].append(handler)
else:
self.handlers[type] = [handler]
return handler
return registerHandler
def on_text_message(self, data): self.call(getframe(0).f_code.co_name,data)Editor is loading...