Untitled

 avatar
unknown
plain_text
16 days ago
3.0 kB
15
Indexable
import os
import csv
import logging
from contextlib import suppress
from telegram import Update
from telegram.ext import Application, CommandHandler, CallbackContext

# Configure root logging once at import time: timestamped, level-tagged
# records at INFO and above. `logger` is this module's named logger.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

# Data folders, resolved relative to the process's working directory.
# PHONE_FOLDER holds the index CSVs, one per two-character number prefix
# ("<prefix>.csv"); DATABASE_FOLDER holds the files that index rows point to.
PHONE_FOLDER = "phonenumber"
DATABASE_FOLDER = "database"

def search_phone_number(phone_number):
    """Look up a phone number in its prefix-sharded index CSV.

    The index file is selected by the first two characters of
    ``phone_number``: ``<PHONE_FOLDER>/<prefix>.csv``.

    Args:
        phone_number: The number to search for, as a string.

    Returns:
        The first CSV row (a list of strings) containing a field exactly
        equal to ``phone_number``. Otherwise a human-readable message
        string: the "Invalid phone number..." text when the input is shorter
        than two characters or does not start with two ASCII digits, or
        "Phone number not found." when the index file is missing or has no
        matching row.
    """
    invalid_message = "Invalid phone number. It must have at least two digits."
    if len(phone_number) < 2:
        return invalid_message

    prefix = phone_number[:2]
    # The prefix becomes part of a file name and ultimately comes from chat
    # input. Restricting it to ASCII digits keeps the lookup inside
    # PHONE_FOLDER: os.path.join() would otherwise discard the folder for a
    # prefix such as "/x" (absolute path) or a drive spec on Windows.
    if not (prefix.isascii() and prefix.isdigit()):
        return invalid_message

    file_path = os.path.join(PHONE_FOLDER, f"{prefix}.csv")

    # A missing index file just means no number with this prefix is known,
    # so FileNotFoundError falls through to the "not found" message.
    with suppress(FileNotFoundError), open(file_path, newline='', encoding='utf-8') as csvfile:
        for row in csv.reader(csvfile):
            # NOTE(review): this matches the number in ANY column of the
            # row, not only the phone column - confirm that is intended.
            if phone_number in row:
                return row  # Caller extracts file name / line number.
    return "Phone number not found."

def search_line_in_database(file_name, line_number):
    """Fetch one CSV record, by 1-based position, from a database file.

    Args:
        file_name: Name of the file inside ``DATABASE_FOLDER``.
        line_number: 1-based index of the CSV record to return.

    Returns:
        ``"Line found: <row>"`` where ``<row>`` is the parsed record (a list
        of strings) when the record exists; otherwise the message
        "Invalid line number or file does not exist.".
    """
    target_path = os.path.join(DATABASE_FOLDER, file_name)

    # A missing file is treated the same as an out-of-range line number.
    with suppress(FileNotFoundError), open(target_path, newline='', encoding='utf-8') as handle:
        numbered_records = enumerate(csv.reader(handle), start=1)
        # Stop reading as soon as the requested position is reached; parsed
        # records are always lists, so None safely signals "no such record".
        wanted = next(
            (fields for position, fields in numbered_records if position == line_number),
            None,
        )
        if wanted is not None:
            return f"Line found: {wanted}"

    return "Invalid line number or file does not exist."

async def check_group(update: Update) -> bool:
    """Report whether the update is a message sent in a group chat.

    When the message comes from any other chat type, a short notice is sent
    back to that chat.

    Args:
        update: The incoming Telegram update.

    Returns:
        True if the update carries a message whose chat type is "group" or
        "supergroup"; False otherwise, including when the update carries no
        new message at all.
    """
    message = update.message
    # CommandHandler also dispatches edited commands by default, and for
    # those `update.message` is None. Reject them quietly instead of
    # raising AttributeError on `.chat`.
    if message is None:
        return False

    if message.chat.type in ("group", "supergroup"):
        return True

    await message.reply_text("This command only works in a group chat.")
    return False

async def search_command(update: Update, context: CallbackContext) -> None:
    """Handle the ``/u <phone_number>`` command.

    Looks the number up in the phone index; when a row is found, reads the
    file name (column 1) and line number (column 2) from it and replies with
    the matching line of that database file. All outcomes are reported back
    to the chat as text replies.

    Args:
        update: The incoming Telegram update.
        context: Handler context; ``context.args`` holds the command's
            whitespace-separated arguments.
    """
    message = update.message
    # CommandHandler also dispatches edited commands by default, for which
    # `update.message` is None; ignore those rather than crash on them.
    if message is None:
        return

    if not await check_group(update):
        return

    args = context.args
    if not args:
        await message.reply_text("Usage: /u <phone_number>")
        return

    phone_number = args[0].strip()
    result = search_phone_number(phone_number)

    # A string result is a ready-made status message (invalid / not found).
    if not isinstance(result, list):
        await message.reply_text(result)
        return

    # A list result is the matched index row. Only the parsing is guarded:
    # a row with fewer than three columns (IndexError) or a non-numeric line
    # number (ValueError) is reported as malformed instead of being passed
    # to reply_text() as a raw list.
    try:
        file_name, line_number = result[1], int(result[2])
    except (IndexError, ValueError):
        await message.reply_text("Invalid data format in CSV.")
        return

    await message.reply_text(f"Searching for line {line_number} in {file_name}...")
    db_result = search_line_in_database(file_name, line_number)
    await message.reply_text(db_result)

def main():
    """Build the Telegram application, register ``/u`` and start polling.

    The bot token is read from the ``TELEGRAM_BOT_TOKEN`` environment
    variable so the real credential need not live in source control; when
    the variable is unset or empty, the original in-file placeholder is
    used. ``run_polling()`` blocks until the application is stopped.
    """
    token = os.environ.get("TELEGRAM_BOT_TOKEN") or "apikey"

    app = Application.builder().token(token).build()

    app.add_handler(CommandHandler("u", search_command))

    app.run_polling()

# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment