# Telegram_luanon
#
# mail@pastecode.io avatar
# unknown
# plain_text
# 2 years ago
# 5.2 kB
# 11
# Indexable
# Never
import inspect
import os
import time

from telethon import TelegramClient, sync, events
from telethon.tl.functions.channels import JoinChannelRequest, LeaveChannelRequest

from .utils import Utils, TelegramTask
from .exceptions import NoValue

class Telegram:
    """Run a queue of Telegram actions through a single Telethon session.

    Tasks are queued with :meth:`add_task` and executed in order by
    :meth:`run`. Values read from chats (``GMSG`` / ``GIMG`` tasks) are kept
    in ``self.consts`` so that later ``SMSG`` tasks can send them back.
    """

    def __init__(self, phone=None, api_ịd=None, api_hash=None, delay=None):
        """Store the account settings and verify the session directory.

        Args:
            phone: Phone number; also used as the session file name under
                ``session/``.
            api_ịd: Telegram API id. NOTE: the parameter name is spelled with
                "ị" (U+1ECB) rather than a plain "i"; it is kept as-is so
                existing keyword callers keep working.
            api_hash: Telegram API hash.
            delay: Default pause in seconds after each task; 2 when omitted.
                An explicit 0 is honoured.

        Raises:
            NoValue: If ``phone`` is missing or empty.
        """
        self.Utils = Utils()
        self.tasks = []
        self.consts = {}
        self.tag = "Telegram"
        if not phone:
            self.raise_error("noValue", "phone")
        self.phone = phone
        # NOTE(review): fallback API credentials are hard-coded in source;
        # consider loading them from configuration instead.
        self.api_id = api_ịd or 1339492
        self.api_hash = api_hash or "262d46fcf6756c14123ee6910c6ee5f3"
        self.delay = 2 if delay is None else delay
        self.check_file()

    def raise_error(self, ex_type=None, ex_name=None):
        """Raise the exception selected by ``ex_type``.

        Only ``"noValue"`` is recognised; any other value returns ``None``.

        Raises:
            NoValue: When ``ex_type`` is ``"noValue"``; ``ex_name`` names the
                missing value in the message.
        """
        if ex_type == "noValue":
            raise NoValue("Value {0} bị thiếu, vui lòng truyền vào value {0}".format(ex_name))

    def check_file(self):
        """Create ``session/`` if needed and exit when it holds no files.

        Raises:
            SystemExit: If the ``session`` directory is empty.
        """
        os.makedirs("session", exist_ok=True)
        if not os.listdir("session"):
            print("Bỏ tệp +84966xxx.session vào thư mục session để chạy tool")
            # ``exit()`` is only injected by the ``site`` module; raising
            # SystemExit directly has the same effect and always works.
            raise SystemExit

    def sess_connect(self):
        """Open the session stored for ``self.phone``.

        Returns:
            The connected, authorized client, or ``False`` when the account
            is not authorized or connecting failed (the reason is logged).
        """
        client = None
        try:
            client = TelegramClient("session/%s" % self.phone, self.api_id, self.api_hash)
            client.connect()
            if client.is_user_authorized():
                return client
            self.Utils.wrong(self.tag, "Không có kể nối: %s" % self.phone)
        except Exception as e:
            self.Utils.wrong(self.tag, self._describe_error("Sess_connect", e))
        # Failure path: release the connection (and the session file) so a
        # later attempt does not collide with a half-open client.
        self._close(client)
        return False

    def add_task(self, entity=None, task_answer=None, task_type=None, task_delay=None, const=None, command=None):
        """Queue one task for :meth:`run`.

        Args:
            entity: Chat/peer the task targets; resolved with ``get_entity``
                at run time when truthy.
            task_answer: Payload whose meaning depends on ``task_type``:
                text for ``SMSG``; ``(limit, index)`` for ``GMSG``/``GIMG``;
                ``(row, col)`` or an int for ``C``; a name or a list/tuple of
                names for ``J``/``L``.
            task_type: One of ``"SMSG"``, ``"GMSG"``, ``"GIMG"``, ``"C"``,
                ``"J"``, ``"L"``.
            task_delay: Pause in seconds after the task; defaults to
                ``self.delay``. An explicit 0 is honoured.
            const: Key in ``self.consts`` to read from (``SMSG``) or write to
                (``GMSG``/``GIMG``).
            command: Optional one-argument callable applied to the value
                before it is sent or stored.
        """
        self.tasks.append({
            "entity": entity,
            "task_answer": task_answer,
            "task_type": task_type,
            "task_delay": self.delay if task_delay is None else task_delay,
            "const": const,
            "command": command,
        })

    def run(self):
        """Execute every queued task, retrying the whole queue up to 3 times.

        Each attempt opens a fresh session and starts again from the first
        task. The session is always disconnected at the end of an attempt,
        whether it succeeded or failed.

        Returns:
            ``True`` if one attempt ran all tasks without an exception,
            ``False`` otherwise.
        """
        for _ in range(3):
            sess = self.sess_connect()
            if sess is False:
                continue
            try:
                for task in self.tasks:
                    self._run_task(sess, task)
                    time.sleep(task["task_delay"])
            except Exception as e:
                self.Utils.wrong(self.tag, self._describe_error("Run", e))
                continue
            finally:
                # Runs on success and on failure, so a retry never opens a
                # second client while the previous one is still connected.
                self._close(sess)
            self.Utils.wrong(self.tag, "%s: Done" % self.phone)
            return True
        return False

    def _run_task(self, sess, task):
        """Perform a single queued task on the connected session."""
        entity = task["entity"]
        if entity:
            entity = sess.get_entity(entity)
        answer = task["task_answer"]
        kind = task["task_type"]
        const = task["const"]
        command = task["command"]
        # The command is applied only when it takes exactly one argument.
        transform = self._positional_argcount(command) == 1

        if kind == "SMSG":
            text = answer
            if const:
                # A named value wins; otherwise fall back to the last
                # unnamed value, then to the literal answer.
                if const in self.consts:
                    text = self.consts[const]
                elif "cache" in self.consts:
                    text = self.consts["cache"]
            if transform:
                text = command(text)
            sess.send_message(entity, text)
        elif kind in ("GMSG", "GIMG"):
            value = self._pick_message(sess, entity, answer)
            if kind == "GMSG":
                value = value.message
            elif value.media:
                value = value.download_media("captcha.jpg")
            if transform:
                value = command(value)
            self.consts[const or "cache"] = value
        elif kind == "C":
            if isinstance(answer, (list, tuple)):
                row, col = answer
                sess.get_messages(entity, limit=1)[0].click(row, col)
            elif isinstance(answer, int):
                sess.get_messages(entity, limit=1)[0].click(answer)
            else:
                self.Utils.wrong(self.tag, "Task_type: '%s' nhận được câu trả lời sai '%s' thay vì int, list, tuple" % (kind, type(answer)))
        elif kind in ("J", "L"):
            request = JoinChannelRequest if kind == "J" else LeaveChannelRequest
            if isinstance(answer, str):
                groups = (answer,)
            elif isinstance(answer, (list, tuple)):
                groups = answer
            else:
                groups = ()
                self.Utils.wrong(self.tag, "Task_type: '%s' nhận được câu trả lời sai '%s' thay vì str, list, tuple" % (kind, type(answer)))
            for group in groups:
                sess(request(group))
        else:
            self.Utils.wrong(self.tag, "Task_type: '%s' không phải giá trị đúng" % kind)

    @staticmethod
    def _pick_message(sess, entity, answer):
        """Fetch ``limit`` messages and return the one at ``index``.

        ``answer`` may be a ``(limit, index)`` pair; anything else selects
        the first of one fetched message.
        """
        if isinstance(answer, (list, tuple)):
            limit, index = answer
        else:
            limit, index = 1, 0
        return sess.get_messages(entity, limit=limit)[index]

    @staticmethod
    def _positional_argcount(command):
        """Count the positional parameters ``command`` accepts (0 if unset).

        ``inspect.signature`` is used instead of ``__code__.co_argcount`` so
        that partials and other callables without ``__code__`` work, and so
        that a bound method's ``self`` is not counted.
        """
        if not command:
            return 0
        try:
            params = inspect.signature(command).parameters.values()
        except (TypeError, ValueError):
            # No introspectable signature: fall back to the code object.
            code = getattr(command, "__code__", None)
            return code.co_argcount if code is not None else 0
        positional = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )
        return sum(1 for param in params if param.kind in positional)

    @staticmethod
    def _describe_error(where, error):
        """Format ``error`` as ``"<where>: <file> - <line> - <error>"``.

        The line reported is the deepest traceback entry that is still in
        the file where the exception was caught.
        """
        tb = error.__traceback__
        filename = tb.tb_frame.f_code.co_filename
        lineno = tb.tb_lineno
        step = tb.tb_next
        while step is not None:
            if step.tb_frame.f_code.co_filename == filename:
                lineno = step.tb_lineno
            step = step.tb_next
        return "%s: %s - %s - %s" % (where, filename, lineno, error)

    def _close(self, client):
        """Disconnect ``client`` if there is one; log instead of raising."""
        if client is None:
            return
        try:
            client.disconnect()
        except Exception as e:
            self.Utils.wrong(self.tag, self._describe_error("Disconnect", e))