Untitled
unknown
python
2 years ago
21 kB
1
Indexable
Never
from urllib.request import urlopen from datetime import datetime from random import choice from json import dumps, load, loads from contextlib import suppress from pathlib import Path from uuid import uuid4 from threading import Thread from amino import Client, SubClient, ACM from .commands import * from .extensions import * path_utilities = "utilities" path_amino = f'{path_utilities}/amino_list' path_wel = 'welcome_list' def print_exception(exc): print(repr(exc)) class Bot(SubClient, ACM): def __init__(self, client: Client, community, prefix: str = ".", bio: str = None, activity: bool = False) -> None: self.client = client self.marche = True self.prefix = prefix self.bio_contents = bio self.activity = activity self.session = self.client.session if isinstance(community, int): self.community_id = community self.community = self.client.get_community_info(comId=self.community_id) self.community_amino_id = self.community.aminoId else: self.community_amino_id = community self.informations = self.client.get_from_code(f"http://aminoapps.com/c/{community}") self.community_id = self.informations.json["extensions"]["community"]["ndcId"] self.community = self.client.get_community_info(comId=self.community_id) self.community_name = self.community.name super().__init__(comId=self.community_id, profile=self.client.profile) try: self.community_leader_agent_id = self.community.json["agent"]["uid"] except Exception: self.community_leader_agent_id = "-" try: self.community_staff_list = self.community.json["communityHeadList"] except Exception: self.community_staff_list = "" if self.community_staff_list: self.community_leaders = [elem["uid"] for elem in self.community_staff_list if elem["role"] in (100, 102)] self.community_curators = [elem["uid"] for elem in self.community_staff_list if elem["role"] == 101] self.community_staff = [elem["uid"] for elem in self.community_staff_list] if not Path(f'{path_wel}/{self.community_amino_id}.txt').exists(): self.create_wel_msg() if not Path(f'{path_amino}/{self.community_amino_id}.json').exists(): self.create_community_file() old_dict = self.get_file_dict() new_dict = self.create_dict() def do(k, v): old_dict[k] = v def undo(k): del old_dict[k] [do(k, v) for k, v in new_dict.items() if k not in old_dict] [undo(k) for k in new_dict.keys() if k not in old_dict] self.update_file(old_dict) # self.subclient = SubClient(comId=self.community_id, profile=client.profile) self.banned_words = self.get_file_info("banned_words") self.locked_command = self.get_file_info("locked_command") self.message_bvn = self.get_wel() self.welcome_chat = self.get_file_info("welcome_chat") self.prefix = self.get_file_info("prefix") self.favorite_users = self.get_file_info("favorite_users") self.favorite_chats = self.get_file_info("favorite_chats") self.update_file() # self.activity_status("on") new_users = self.get_all_users(start=0, size=30, type="recent") self.new_users = [elem["uid"] for elem in new_users.json["userProfileList"]] def create_wel_msg(self): wf=open(f'{path_wel}/{self.community_amino_id}.txt', 'w') def create_community_file(self): with open(f'{path_amino}/{self.community_amino_id}.json', 'w', encoding='utf8') as file: dict = self.create_dict() file.write(dumps(dict, sort_keys=False, indent=4)) def create_dict(self): return {"welcome": "", "prefix": self.prefix, "welcome_chat": "", "locked_command": [], "favorite_users": [], "favorite_chats": [], "banned_words": []} def get_dict(self): return {"welcome": self.message_bvn, "prefix": self.prefix, "welcome_chat": self.welcome_chat, "locked_command": self.locked_command, "favorite_users": self.favorite_users, "favorite_chats": self.favorite_chats, "banned_words": self.banned_words} def sewel(self,msg): wf=open(f'{path_wel}/{self.community_amino_id}.txt', 'w', encoding='utf8') wf.write(msg) wf.close() def get_wel(self): wf=open(f'{path_wel}/{self.community_amino_id}.txt', 'r') welcm=wf.read() return welcm def update_file(self, dict=None): if not dict: dict = self.get_dict() with open(f"{path_amino}/{self.community_amino_id}.json", "w", encoding="utf8") as file: file.write(dumps(dict, sort_keys=False, indent=4)) def get_file_info(self, info: str = None): with open(f"{path_amino}/{self.community_amino_id}.json", "r", encoding="utf8") as file: return load(file)[info] def get_file_dict(self, info: str = None): with open(f"{path_amino}/{self.community_amino_id}.json", "r", encoding="utf8") as file: return load(file) def get_banned_words(self): return self.banned_words def set_prefix(self, prefix: str): self.prefix = prefix self.update_file() def set_welcome_message(self, message: str): self.message_bvn = message self.sewel(message) def set_welcome_chat(self, chatId: str): self.welcome_chat = chatId self.update_file() def add_favorite_users(self, value: str): self.favorite_users.append(value) self.update_file() def add_favorite_chats(self, value: str): self.favorite_chats.append(value) self.update_file() def add_banned_words(self, liste: list): self.banned_words.extend(liste) self.update_file() def add_locked_command(self, liste: list): self.locked_command.extend(liste) self.update_file() def remove_favorite_users(self, value: str): liste = [value] [self.favorite_users.remove(elem) for elem in liste if elem in self.favorite_users] self.update_file() def remove_favorite_chats(self, value: str): liste = [value] [self.favorite_chats.remove(elem) for elem in liste if elem in self.favorite_chats] self.update_file() def remove_banned_words(self, liste: list): [self.banned_words.remove(elem) for elem in liste if elem in self.banned_words] self.update_file() def remove_locked_command(self, liste: list): [self.locked_command.remove(elem) for elem in liste if elem in self.locked_command] self.update_file() def unset_welcome_chat(self): self.welcome_chat = "" self.update_file() def is_in_staff(self, uid): return uid in self.community_staff def is_leader(self, uid): return uid in self.community_leaders def is_curator(self, uid): return uid in self.community_curators def is_agent(self, uid): return uid == self.community_leader_agent_id def accept_role(self, rid: str = None): with suppress(Exception): self.accept_organizer(rid) return True with suppress(Exception): self.promotion(noticeId=rid) return True return False def get_staff(self, community): if isinstance(community, int): with suppress(Exception): community = self.client.get_community_info(com_id=community) else: try: informations = self.client.get_from_code(f"http://aminoapps.com/c/{community}") except Exception: return False community_id = informations.json["extensions"]["community"]["ndcId"] community = self.client.get_community_info(comId=community_id) try: community_staff_list = community.json["communityHeadList"] community_staff = [elem["uid"] for elem in community_staff_list] except Exception: community_staff_list = "" else: return community_staff def get_user_id(self, name_or_id): members = self.get_all_users(size=1).json['userProfileCount'] start = 0 lower_name = None while start <= members: users = self.get_all_users(start=start, size=100).json['userProfileList'] for user in users: name = user['nickname'] uid = user['uid'] if name_or_id == name or name_or_id == uid: return (name, uid) if not lower_name and name_or_id.lower() in name.lower(): lower_name = (name, uid) start += 100 return lower_name if lower_name else None def ask_all_members(self, message, lvl: int = 20, type_bool: int = 1): def ask(uid): try: self.start_chat(userId=[uid], message=message) except Exception: self.start_chat(userId=[uid], message=message) size = self.get_all_users(start=0, size=1, type="recent").json['userProfileCount'] st = 0 while size > 0: value = size if value > 100: value = 100 users = self.get_all_users(start=st, size=value) if type_bool == 1: [ask(user["uid"]) for user in users.json['userProfileList'] if user['level'] == lvl] elif type_bool == 2: [ask(user["uid"]) for user in users.json['userProfileList'] if user['level'] <= lvl] elif type_bool == 3: [ask(user["uid"]) for user in users.json['userProfileList'] if user['level'] >= lvl] size -= 100 st += 100 def ask_amino_staff(self, message): self.start_chat(userId=self.community_staff, message=message) def get_chat_id(self, chat: str = None): with suppress(Exception): return self.get_from_code(f"http://aminoapps.com/c/{chat}").objectId with suppress(Exception): chati = self.get_from_code(f"{chat}").objectId return chati val = self.get_public_chat_threads() for title, chat_id in zip(val.title, val.chatId): if chat == title: return chat_id for title, chat_id in zip(val.title, val.chatId): if chat.lower() in title.lower() or chat == chat_id: return chat_id return False def upload_bubble(self,file,comId): data=file response = self.session.post(f"https://service.narvii.com/api/v1/x{comId}/s/chat/chat-bubble/templates/107147e9-05c5-405f-8553-af65d2823457/generate", data=data, headers=self.headers) bid=loads(response.text)['chatBubble']['bubbleId'] print(bid) response = self.session.post(f"https://service.narvii.com/api/v1/x{comId}/s/chat/chat-bubble/{bid}", data=data, headers=self.headers) if response.status_code !=200: return loads(response.text) else: return bid def copy_bubble(self, chatId: str, replyId: str, comId: str = None): if not comId: comId = self.community_id header = { 'Accept-Language': 'en-US', 'Content-Type': 'application/octet-stream', 'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 7.1; LG-UK495 Build/MRA58K; com.narvii.amino.master/3.3.33180)', 'Host': 'service.narvii.com', 'Accept-Encoding': 'gzip', 'Connection': 'Keep-Alive', } a = self.get_message_info(chatId=chatId, messageId=replyId).json["chatBubble"]["resourceUrl"] with urlopen(a) as zipresp: yo = zipresp.read() response = self.session.post(f"https://service.narvii.com/api/v1/x{comId}/s/chat/chat-bubble/templates/107147e9-05c5-405f-8553-af65d2823457/generate", data=yo, headers=header) bid = loads(response.text)['chatBubble']['bubbleId'] response = self.session.post(f"https://service.narvii.com/api/v1/{comId}/s/chat/chat-bubble/{bid}", data=yo, headers=header) def stop_instance(self): self.marche = False def start_instance(self): self.marche = True Thread(target=self.passive).start() def leave_amino(self): self.marche = False for elem in self.get_public_chat_threads().chatId: with suppress(Exception): self.leave_chat(elem) self.client.leave_community(comId=self.community_id) def check_new_member(self): if not (self.message_bvn or self.welcome_chat): return new_list = self.get_all_users(start=0, size=25, type="recent") new_member = [(elem["nickname"], elem["uid"]) for elem in new_list.json["userProfileList"]] for elem in new_member: name, uid = elem[0], elem[1] if name =="mambll_1" or "blackeyed": self.ban(userId=uid,reason="lawde ka bot") val = self.get_wall_comments(userId=uid, sorting='newest').commentId if not val and self.message_bvn: with suppress(Exception): self.comment(message=self.message_bvn, userId=uid) if not val and self.welcome_chat: with suppress(Exception): self.invite_to_chat(chatId=self.welcome_chat, userId=uid) new_users = self.get_all_users(start=0, size=30, type="recent") self.new_users = [elem["uid"] for elem in new_users.json["userProfileList"]] def welcome_new_member(self): new_list = self.get_all_users(start=0, size=25, type="recent") new_member = [(elem["nickname"], elem["uid"]) for elem in new_list.json["userProfileList"]] for elem in new_member: name, uid = elem[0], elem[1] val = self.get_wall_comments(userId=uid, sorting='newest').commentId if not val or uid not in self.new_users and self.message_bvn: with suppress(Exception): self.comment(message=self.message_bvn, userId=uid) if uid not in self.new_users and self.welcome_chat: with suppress(Exception): self.invite_to_chat(chatId=self.welcome_chat, userId=uid) new_users = self.get_all_users(start=0, size=30, type="recent") self.new_users = [elem["uid"] for elem in new_users.json["userProfileList"]] def feature_chats(self): for elem in self.favorite_chats: with suppress(Exception): self.favorite(time=2, chatId=elem) def feature_users(self): featured = [elem["uid"] for elem in self.get_featured_users().json["userProfileList"]] for elem in self.favorite_users: if elem not in featured: with suppress(Exception): self.favorite(time=1, userId=elem) def get_member_level(self, uid): return self.get_user_info(userId=uid).level def get_member_titles(self, uid): with suppress(Exception): return self.get_user_info(userId=uid).customTitles return False def get_wallet_amount(self): return self.client.get_wallet_info().totalCoins def generate_transaction_id(self): return str(uuid4()) def pay(self, coins: int = 0, blogId: str = None, chatId: str = None, objectId: str = None, transactionId: str = None): if not transactionId: transactionId = self.generate_transaction_id() self.send_coins(coins=coins, blogId=blogId, chatId=chatId, objectId=objectId, transactionId=transactionId) def favorite(self, time: int = 1, userId: str = None, chatId: str = None, blogId: str = None, wikiId: str = None): self.feature(time=time, userId=userId, chatId=chatId, blogId=blogId, wikiId=wikiId) def unfavorite(self, userId: str = None, chatId: str = None, blogId: str = None, wikiId: str = None): self.unfeature(userId=userId, chatId=chatId, blogId=blogId, wikiId=wikiId) def join_chatroom(self, chat: str = None, chatId: str = None): if not chat: with suppress(Exception): self.join_chat(chatId) return "" with suppress(Exception): chati = self.get_from_code(f"{chat}").objectId self.join_chat(chati) return chat chats = self.get_public_chat_threads() for title, chat_id in zip(chats.title, chats.chatId): if chat == title: self.join_chat(chat_id) return title chats = self.get_public_chat_threads() for title, chat_id in zip(chats.title, chats.chatId): if chat.lower() in title.lower() or chat == chat_id: self.join_chat(chat_id) return title return False def start_screen_room(self, chatId: str, joinType: int=1): self.client.join_video_chat(comId=self.community_id, chatId=chatId, joinType=joinType) def start_voice_room(self, chatId: str, joinType: int=1): self.client.join_voice_chat(comId=self.community_id, chatId=chatId, joinType=joinType) def join_screen_room(self, chatId: str, joinType: int=1): self.client.join_video_chat_as_viewer(comId=self.community_id, chatId=chatId, joinType=joinType) def get_chats(self): return self.get_public_chat_threads() def join_all_chat(self): for elem in self.get_public_chat_threads().chatId: with suppress(Exception): self.join_chat(elem) def leave_all_chats(self): for elem in self.get_public_chat_threads().chatId: with suppress(Exception): self.leave_chat(elem) def follow_user(self, uid): self.follow(userId=[uid]) def unfollow_user(self, uid): self.unfollow(userId=uid) def add_title(self, uid: str, title: str, color: str = None): member = self.get_member_titles(uid) try: titles = [i['title'] for i in member] + [title] colors = [i['color'] for i in member] + [color] except TypeError: titles = [title] colors = [color] self.edit_titles(uid, titles, colors) return True def remove_title(self, uid: str, title: str): member = self.get_member_titles(uid) tlist = [] clist = [] for t in member: if t["title"] != title: tlist.append(t["title"]) clist.append(t["color"]) self.edit_titles(uid, tlist, clist) return True def passive(self): def upt_activity(): timeNow = int(datetime.timestamp(datetime.now())) timeEnd = timeNow + 300 try: self.send_active_obj(startTime=timeNow, endTime=timeEnd) except Exception: pass def change_bio_and_welcome_members(): if self.welcome_chat or self.message_bvn: Thread(target=self.welcome_new_member).start() try: if isinstance(self.bio_contents, list): self.edit_profile(content=choice(self.bio_contents)) elif isinstance(self.bio_contents, str): self.edit_profile(content=self.bio_contents) except Exception as e: print_exception(e) def feature_chats(): try: Thread(target=self.feature_chats).start() except Exception as e: print_exception(e) def feature_users(): try: Thread(target=self.feature_users).start() except Exception as e: print_exception(e) feature_chats() feature_users() j = 0 k = 0 while self.marche: change_bio_and_welcome_members() if j >= 24: feature_chats() j = 0 if k >= 288: feature_users() k = 0 if self.activity: try: self.activity_status('on') except Exception: pass upt_activity() slp(300) j += 1 k += 1