Untitled
unknown
plain_text
2 years ago
9.5 kB
10
Indexable
import requests import telebot import numpy as np import copy import multiprocessing as mp from telebot import types import threading from multiprocessing import Process, freeze_support import asyncio from time import sleep list_of_error = [] list_of_lists = np.loadtxt("URL.txt", dtype=object) list_of_lists2 = list(copy.copy(list_of_lists)) list_of_lists3 = np.loadtxt("URL2.txt", dtype=object) list_of_lists4 = list(copy.copy(list_of_lists3)) list_of_lists5 = np.loadtxt("URL3.txt", dtype=object) list_of_lists6 = list(copy.copy(list_of_lists5)) # list_of_error = list(copy.copy(list_of_lists)) print(list_of_lists) print(list_of_lists4) print(list_of_lists6) bot = telebot.TeleBot('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx') @bot.message_handler(commands=['start']) def thread_main(message): # threading.Thread(target=func1, args=(message,)).start() threading.Thread(target=func2, args=(message,)).start() # threading.Thread(target=func3, args=(message,)).start() threading.Thread(target=func4, args=(message,)).start() # def start(message): # bot.send_message(message.chat.id, 'Приветствие'.format(message.from_user, bot.get_me()), parse_mode='html') # markup = types.InlineKeyboardMarkup() # def func1(): # def func1(message): # while True: # for line in list_of_lists2: # try: # # # resp=(urllib.request.urlopen(line).getcode()) #возращает статусы и имя ресурса # # print(resp) # # with eventlet.Timeout(3): # page = requests.get(line, timeout=3, headers={'User-Agent': 'some cool user-agent'}) # возращает статусы, без page # print(page.status_code) # if page.status_code == 200: # #sleep(0.1) # print('OK! Поток №1') # else: # # print(f'Ресурс {line} недоступен') # # message.text = print(f'Ресурс {line} недоступен') # # send_message(message.chat_id, chat_text, parse_mode=telegram.ParseMode.HTML) # bot.send_message(message.chat.id, f'Система {line} недоступна') # # for line in list_of_lists(): # list_of_error.append(line) # #sleep(0.1) # print(list_of_error) # list_of_lists2.remove(line) # # print(list_of_lists.index(line)) # #if not line: # #continue # print(line.rstrip()) # #sleep(0.1) # except: # continue def func2(message): while True: for line in list_of_lists4: try: page = requests.get(line, timeout=3) # возращает статусы, без page print(f'{page.status_code} {line}') if page.status_code != 200: bot.send_message(message.chat.id, f'Система {line} недоступна') # for line in list_of_lists(): list_of_error.append(line) list_of_lists4.remove(line) # list_of_error1 = list(set(list_of_error)) # sleep(0.1) print(f'{list_of_error1} лист ошибок!') # print(list_of_lists.index(line)) # if not line: # continue print(line.rstrip()) #print(f"Система {line} доступна!") else: #page.status_code != 200: bot.send_message(message.chat.id, f'Система {line} недоступна') # for line in list_of_lists(): list_of_error.append(line) list_of_lists4.remove(line) # list_of_error1 = list(set(list_of_error)) # sleep(0.1) print(f'{list_of_error1} лист ошибок!') # print(list_of_lists.index(line)) # if not line: # continue print(line.rstrip()) #else: continue except: continue # except: # # else: # # print(f'Ресурс {line} недоступен') # # message.text = print(f'Ресурс {line} недоступен') # # send_message(message.chat_id, chat_text, parse_mode=telegram.ParseMode.HTML) # bot.send_message(message.chat.id, f'Система {line} недоступна') # # for line in list_of_lists(): # list_of_error.append(line) # list_of_lists4.remove(line) # list_of_error1 = list(set(list_of_error)) # # sleep(0.1) # print(f'{list_of_error1} лист ошибок!') # # print(list_of_lists.index(line)) # # if not line: # # continue # print(line.rstrip()) # # sleep(0.1) # continue # # def func3(message): # while True: # for line in list_of_lists6: # try: # # # resp=(urllib.request.urlopen(line).getcode()) #возращает статусы и имя ресурса # # print(resp) # # with eventlet.Timeout(3): # page = requests.get(line, timeout=3, headers={'User-Agent': 'some cool user-agent'}) # возращает статусы, без page # print(page.status_code) # if page.status_code == 200: # #sleep(0.1) # print('OK! Поток №3') # else: # # print(f'Ресурс {line} недоступен') # # message.text = print(f'Ресурс {line} недоступен') # # send_message(message.chat_id, chat_text, parse_mode=telegram.ParseMode.HTML) # bot.send_message(message.chat.id, f'Система {line} недоступна') # # for line in list_of_lists(): # list_of_error.append(line) # #sleep(0.1) # print(list_of_error) # list_of_lists6.remove(line) # # print(list_of_lists.index(line)) # #if not line: # #continue # print(line.rstrip()) # #sleep(0.1) # except: # continue # # def func4(message): while True: for line in list_of_error: try: page = requests.get(line, timeout=3) # возращает статусы, без page print(page.status_code) if page.status_code == 200: bot.send_message(message.chat.id, f'Система {line} доступна') print('Проверяю недоступные системы!') list_of_error1.remove(line) except requests.exceptions.ConnectionError: print(f'Ошибка исключения ConnectionError {line}') continue # else: # bot.send_message(message.chat.id, f'Система {line} доступна') # list_of_lists4.append(line) # #sleep(0.1) # print(list_of_error) # list_of_error.remove(line) # #if not line: # # continue # print(line.rstrip()) # #sleep(0.1) except: page = requests.get(line, timeout=3) if page.status_code != 200: bot.send_message(message.chat.id, f'Система {line} недоступна') list_of_lists4.append(line) # sleep(0.1) print(list_of_error) # list_of_error.remove(line) # if not line: # continue print(line.rstrip()) # sleep(0.1) continue # # # if __name__=='__main__': # p1 = Process(target = start) # p1.start() # p2 = Process(target = func2) # p2.start() # ioloop = asyncio.get_event_loop() # tasks = [ # ioloop.create_task(func1()), # ioloop.create_task(func2()) # ] # ioloop.run_until_complete(asyncio.wait(tasks)) # ioloop.close() # if name == '__main__': # def main(func: callable): # func() # # tasks = [func1, func2] # procs = [mp.Process(target=main, args=(i,)) for i in tasks] # создаем столько процессов, сколько имеем функций # for proc in procs: # proc.start() # # for proc in procs: # proc.join() if __name__ == '__main__': try: bot.polling(none_stop=True) except Exception as e: print(e) # bot.polling(none_stop=True, interval=0)
Editor is loading...