from BotAmino import *
import requests
import websocket
from functools import reduce
from base64 import b64decode, b64encode
from typing import Union
from hashlib import sha1
from hmac import new
from time import sleep as slp
from functools import wraps
import threading
PREFIX = bytes.fromhex("42")
SIG_KEY = bytes.fromhex("F8E7A61AC3F725941E3AC7CAE2D688BE97F30B93")
c = BotAmino("","")
def signature(data: Union[str, bytes]) -> str:
data = data if isinstance(data, bytes) else data.encode("utf-8")
return b64encode(PREFIX + new(SIG_KEY, data, sha1).digest()).decode("utf-8")
class Token:
def __init__(self, comid, chatid, client):
self.client = client
self.voice_socket_url = "wss://ws2.narvii.com"
self.heads = None
self.sdata = '{"o":{"ndcId":'+comid+',"threadId":"'+chatid+'","id":"3340228"},"t":200}'
self.voice_socket = None
self.token = None
self.voice_start()
def voice_start(self):
final = f"{self.client.device_id}|{int(time.time() * 1000)}"
self.headers = {"NDCDEVICEID": deviceId() if self.client.autoDevice else self.client.device_id,"NDCAUTH": f"sid={self.client.sid}","NDC-MSG-SIG": signature(final)}
self.voice_socket = websocket.WebSocketApp(f"{self.voice_socket_url}/?signbody={final.replace('|', '%7C')}",
on_open=self.on_voice_open,
on_message=self.handle_voice_message,
on_ping=self.on_voice_ping,
header=self.headers)
threading.Thread(target=self.voice_socket.run_forever, kwargs={"ping_interval": 60}).start()
def on_voice_open(self):
self.voice_socket.send(self.sdata)
def handle_voice_message(self, data):
print(data)
if loads(data)['t'] == 201:
self.token = loads(data)['o']
self.voice_socket.close()
def on_voice_ping(self, data):
suppress(self.voice_socket.sock.pong(data))
def play_music(comid: str, chatid: str):
comid = str(comid)
data = '{"o":{"ndcId":'+comid+',"threadId":"'+chatid+'","joinRole":1,"id":"2249844"},"t":112}'
#print(data)
c.send(data)
data = '{"o":{"ndcId":'+comid+',"threadId":"'+chatid+'","channelType":1,"id":"2250161"},"t":108}'
c.send(data)
obj = Token(comid, chatid, c)
slp(3)
token = obj.token
while not token:
token = obj.token
slp(2)
print(token)
@c.command("p")
def play(data):
threading.Thread(target=play_music, args=[data.comId, data.chatId]).start()
@c.command("alive")
def alive(data):
data.subClient.send_message(data.chatId,"Zinda hoon lawde")
c.launch()
print("Ready")