Untitled
unknown
plain_text
16 days ago
3.0 kB
15
Indexable
import os
import csv
import logging
from contextlib import suppress

from telegram import Update
from telegram.ext import Application, CommandHandler, CallbackContext

# Enable logging
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

# Folders
PHONE_FOLDER = "phonenumber"
DATABASE_FOLDER = "database"


def search_phone_number(phone_number):
    """Search the prefix CSV file for a phone number and return its row.

    The file consulted is ``PHONE_FOLDER/<first two characters>.csv``.

    Args:
        phone_number: The number to look up, as a string.

    Returns:
        The first CSV row (a list of strings) that contains ``phone_number``
        as an exact field value, or a human-readable message string when the
        input is rejected, the file is missing, or no row matches.
    """
    if len(phone_number) < 2:
        return "Invalid phone number. It must have at least two digits."

    index_name = f"{phone_number[:2]}.csv"
    # The number comes from a chat user. A prefix such as "/1" would make
    # os.path.join discard PHONE_FOLDER and open "/1.csv", so refuse any
    # prefix that turns the file name into a path.
    if os.path.basename(index_name) != index_name:
        return "Invalid phone number. It must have at least two digits."

    file_path = os.path.join(PHONE_FOLDER, index_name)
    # A missing prefix file is treated the same as "no match".
    with suppress(FileNotFoundError), open(file_path, newline='', encoding='utf-8') as csvfile:
        for row in csv.reader(csvfile):
            if phone_number in row:
                return row  # Return row for further processing
    return "Phone number not found."


def search_line_in_database(file_name, line_number):
    """Return one record of a CSV file stored in DATABASE_FOLDER.

    Args:
        file_name: Name of the file, joined onto ``DATABASE_FOLDER``.
        line_number: 1-based index of the record to fetch, counted in
            records as yielded by ``csv.reader``.

    Returns:
        ``"Line found: <row>"`` for the matching record, otherwise a message
        saying the line number is invalid or the file does not exist.
    """
    file_path = os.path.join(DATABASE_FOLDER, file_name)
    # A missing file falls through to the same message as an out-of-range line.
    with suppress(FileNotFoundError), open(file_path, newline='', encoding='utf-8') as csvfile:
        for index, row in enumerate(csv.reader(csvfile), start=1):
            if index == line_number:
                return f"Line found: {row}"
    return "Invalid line number or file does not exist."
async def check_group(update: Update) -> bool:
    """Check if the bot is running in a group chat.

    Replies with a hint and returns False when the chat is not a group or
    supergroup. Returns False without replying when the update carries no
    message at all.
    """
    # update.message is None for edited messages, which CommandHandler also
    # dispatches; effective_message covers both cases.
    message = update.effective_message
    if message is None:
        return False
    if message.chat.type not in ["group", "supergroup"]:
        await message.reply_text("This command only works in a group chat.")
        return False
    return True


async def search_command(update: Update, context: CallbackContext) -> None:
    """Handles the /u <phone_number> command.

    Looks the number up with search_phone_number. A matching row is expected
    to hold a database file name in its second field and a line number in its
    third; that line is then fetched with search_line_in_database and sent
    back to the chat.
    """
    if not await check_group(update):
        return

    # check_group has already verified that a message is present.
    message = update.effective_message

    args = context.args
    if not args:
        await message.reply_text("Usage: /u <phone_number>")
        return

    phone_number = args[0].strip()
    result = search_phone_number(phone_number)

    if not isinstance(result, list):
        # A string result is a ready-made status message.
        await message.reply_text(result)
        return

    if len(result) < 3:
        # A matched row without file name and line number cannot be followed
        # up, and a list must not be passed to reply_text as message text.
        await message.reply_text("Invalid data format in CSV.")
        return

    # The whole lookup stays inside the try: besides a non-numeric line
    # number, a badly encoded database file raises UnicodeDecodeError, which
    # is a ValueError subclass.
    try:
        file_name, line_number = result[1], int(result[2])
        await message.reply_text(f"Searching for line {line_number} in {file_name}...")
        db_result = search_line_in_database(file_name, line_number)
        await message.reply_text(db_result)
    except ValueError:
        await message.reply_text("Invalid data format in CSV.")


def main():
    """Start the bot."""
    # Prefer a token from the environment so the secret stays out of the
    # source; the original placeholder remains the fallback.
    token = os.environ.get("TELEGRAM_BOT_TOKEN", "apikey")
    app = Application.builder().token(token).build()

    app.add_handler(CommandHandler("u", search_command))

    app.run_polling()


if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment