Untitled
unknown
plain_text
8 months ago
3.0 kB
20
Indexable
import os
import csv
import logging
from contextlib import suppress
from telegram import Update
from telegram.ext import Application, CommandHandler, CallbackContext
# Enable logging
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
# Folders
PHONE_FOLDER = "phonenumber"
DATABASE_FOLDER = "database"
def search_phone_number(phone_number):
"""Search for a phone number in the CSV file and return details."""
if len(phone_number) < 2:
return "Invalid phone number. It must have at least two digits."
file_path = os.path.join(PHONE_FOLDER, f"{phone_number[:2]}.csv")
with suppress(FileNotFoundError), open(file_path, newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
for row in reader:
if phone_number in row:
return row # Return row for further processing
return "Phone number not found."
def search_line_in_database(file_name, line_number):
"""Search for a specific line in the database file."""
file_path = os.path.join(DATABASE_FOLDER, file_name)
with suppress(FileNotFoundError), open(file_path, newline='', encoding='utf-8') as csvfile:
for i, row in enumerate(csv.reader(csvfile), start=1):
if i == line_number:
return f"Line found: {row}"
return "Invalid line number or file does not exist."
async def check_group(update: Update) -> bool:
"""Check if the bot is running in a group chat."""
if update.message.chat.type not in ["group", "supergroup"]:
await update.message.reply_text("This command only works in a group chat.")
return False
return True
async def search_command(update: Update, context: CallbackContext) -> None:
"""Handles the /u <phone_number> command."""
if not await check_group(update):
return
args = context.args
if not args:
await update.message.reply_text("Usage: /u <phone_number>")
return
phone_number = args[0].strip()
result = search_phone_number(phone_number)
if isinstance(result, list) and len(result) >= 3:
try:
file_name, line_number = result[1], int(result[2])
await update.message.reply_text(f"Searching for line {line_number} in {file_name}...")
db_result = search_line_in_database(file_name, line_number)
await update.message.reply_text(db_result)
except ValueError:
await update.message.reply_text("Invalid data format in CSV.")
else:
await update.message.reply_text(result)
def main():
"""Start the bot."""
TOKEN = "apikey"
app = Application.builder().token(TOKEN).build()
app.add_handler(CommandHandler("u", search_command))
app.run_polling()
if __name__ == "__main__":
main()
Editor is loading...
Leave a Comment