animal trivia bot

thank you friend
mail@pastecode.io avatar
unknown
python
7 months ago
28 kB
4
Indexable
Never
import asyncio
import disnake
from datetime import datetime
from disnake.ext import commands
import aiomysql
from dotenv import load_dotenv
import logging
import os

# Load environment variables
load_dotenv()

# Access environment variables
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASS = os.getenv('DB_PASS')
DB_PORT = os.getenv('DB_PORT')
DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')

aiomysql_db_params = {
    'host': DB_HOST,
    'port': int(DB_PORT),
    'user': DB_USER,
    'password': DB_PASS,
    'db': DB_NAME,
    'autocommit': True
}

# Initialize bot and database
intents = disnake.Intents.all()
bot = commands.Bot(command_prefix="!", intents=intents)

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s]: %(message)s",
    handlers=[
        logging.StreamHandler()  # Log to console
    ]
)

# Constants
QUIZ_SPEED_DEFAULT = 60
DEV_GUILD_ID = 753384735236948018  # Replace with your development guild's ID

# Global variables for game state
quiz_speed = QUIZ_SPEED_DEFAULT
trivia_channel_id = None
game_channel_id = None
# This variable should be defined in a scope accessible by both trivia_loop and on_message
# Initialize it with a structure that can hold the current trivia question and its state

bot.current_trivia_state = {
    'game_is_on': False,
    'active': False,
    'event': asyncio.Event(),
    'question_id': None,  # ID of the question from the database
    'message_id': None,  # Discord message ID of the trivia question message
    'answers': {
        'animal': None,  # Answer for the "Animal" part of the question
        'specific_animal': None,  # Answer for the "Specific Animal" part of the question
        'scientific_name': None  # Answer for the "Scientific Name" part of the question
    },
    'answered_parts': set(),  # Initialize as an empty set
    'answered_users': set(),  # Initialize as an empty set
    'start_time': None,
    'game_channel_id': None
}


async def initialize_tables():
    try:
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cursor:
                await cursor.execute("""
                CREATE TABLE IF NOT EXISTS trivia (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    common_name VARCHAR(255) NOT NULL,
                    specific_name VARCHAR(255) NOT NULL,
                    scientific_name VARCHAR(255) NOT NULL,
                    image_url VARCHAR(255) NOT NULL
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """)

                await cursor.execute("""
                CREATE TABLE IF NOT EXISTS bot_settings (
                    game_channel_id BIGINT UNSIGNED,
                    quiz_speed INT DEFAULT 60,
                    quiz_answer_duration INT DEFAULT 30
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """)

                await cursor.execute("""
                CREATE TABLE IF NOT EXISTS scores (
                    user_id BIGINT UNSIGNED NOT NULL,
                    score INT DEFAULT 0,
                    PRIMARY KEY (user_id)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """)

                await conn.commit()
                print("Tables initialized successfully.")
    except aiomysql.Error as err:
        # Log the error message to the console
        logging.error(f"An error occurred during table initialization: {err}")


async def main():
    # Call the initialize_tables function right after defining it and the DB connection is made
    await initialize_tables()

if __name__ == "__main__":
    asyncio.run(main())


async def get_setting_from_db(setting_name, caller=None):
    try:
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                # Formulate the SQL query string based on the setting_name passed
                sql_query = f"SELECT {setting_name} FROM bot_settings LIMIT 1"
                await cur.execute(sql_query)
                result = await cur.fetchone()
                # If the result is found, return the setting value
                return result[setting_name] if result else None
    except Exception as e:
        # Log the error message with context to the caller function
        error_message = f"An error occurred in get_setting_from_db (Caller: {caller}): {e}"
        logging.error(error_message)
        # It is a good practice to also log the query that caused the error for easier debugging
        logging.error(f"Failed SQL Query: {sql_query}")
        return None  # Return None or handle the error as needed


@bot.event
async def on_message(message):
    logging.info(f"Message from {message.author}: {message.content}")

    # Skip messages sent by the bot itself
    if message.author == bot.user:
        return

    # Check if the message is in the correct channel and if the trivia game is active
    if (message.channel.id == bot.current_trivia_state.get('game_channel_id') and
            bot.current_trivia_state['active']):

        # Normalize input to lowercase and strip leading/trailing whitespace
        message_content = message.content.strip().lower()

        # Get the correct answers from the current trivia state
        correct_answers = bot.current_trivia_state['answers']

        # Check if the message content matches one of the correct answers
        for answer_part, correct_answer in correct_answers.items():
            if (message_content == correct_answer.lower() and
                    answer_part not in bot.current_trivia_state['answered_parts']):

                # Calculate the time taken to answer
                answer_time = (datetime.utcnow() - bot.current_trivia_state['start_time']).total_seconds()

                # Call handle_answer with the appropriate parameters
                await handle_answer(
                    message,
                    answer=message_content,  # Provide the answer as 'answer'
                    answer_part=answer_part,
                    time_taken=answer_time  # Provide the time_taken as 'time_taken'
                )

                # Add the answer part and user to the respective sets
                bot.current_trivia_state['answered_parts'].add(answer_part)
                bot.current_trivia_state['answered_users'].add(message.author.id)

                # Check if all parts have been answered
                if bot.current_trivia_state['answered_parts'] == set(correct_answers.keys()):
                    # Optionally, send a message indicating all parts have been answered
                    # and the next question will be coming up

                    # Reset the game state for the next question
                    bot.current_trivia_state['active'] = False
                    bot.current_trivia_state['answered_parts'].clear()
                    bot.current_trivia_state['start_time'] = None

                break  # No need to check other parts if the correct answer is found

    # Process other commands if any
    await bot.process_commands(message)


async def handle_answer(message, answer, answer_part, time_taken):
    logging.info(f"Handling correct answer: {answer} for question part: {answer_part}")

    # Retrieve the original message ID for the trivia question
    original_message_id = bot.current_trivia_state.get('message_id')

    if original_message_id:
        try:
            original_message = await message.channel.fetch_message(original_message_id)
            embed = original_message.embeds[0]

            # Prepare the new answer text with user's name and time taken
            author_name = message.author.display_name
            answer_text = f"{answer}, answered by {author_name} in {time_taken:.2f} seconds!"

            # Update the embed's description with the correct answer based on answer_part
            if answer_part == 'animal':
                embed.description = embed.description.replace("[Your Guess for Common Name]", answer_text)
            elif answer_part == 'specific_animal':
                embed.description = embed.description.replace("[Your Guess for Specific Name]", answer_text)
            elif answer_part == 'scientific_name':
                embed.description = embed.description.replace("[Your Guess for Scientific Name]", answer_text)

            # Edit the original message with the updated embed
            await original_message.edit(embed=embed)

            # Update the leaderboard
            await update_leaderboard(message.author.id)

            # React to the message to show the answer has been accepted
            await message.add_reaction("🎉")
        except Exception as e:
            logging.error(f"Failed to fetch or edit the original trivia message: {e}")
    else:
        logging.error("Original message ID not found. Cannot update the trivia message.")


async def update_leaderboard(user_id):
    try:
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cur:
                # Check if the user has an existing score
                await cur.execute("SELECT score FROM scores WHERE user_id = %s", (user_id,))
                row = await cur.fetchone()

                # Update the score if it exists, otherwise insert a new score
                if row:
                    new_score = row[0] + 1  # Increment the existing score
                    await cur.execute("UPDATE scores SET score = %s WHERE user_id = %s", (new_score, user_id))
                else:
                    await cur.execute("INSERT INTO scores (user_id, score) VALUES (%s, 1)", (user_id,))

                # Commit the transaction
                await conn.commit()
    except Exception as e:
        logging.error(f"Failed to update leaderboard: {e}")


# Slash Commands
@bot.slash_command(name="gameon", description="Turns the trivia game on")
async def gameon(inter: disnake.ApplicationCommandInteraction):
    bot.current_trivia_state['game_is_on'] = True
    bot.loop.create_task(trivia_loop())  # Start the trivia_loop coroutine
    await inter.response.send_message("Trivia game is now ON.", ephemeral=True)


@bot.slash_command(name="gameoff", description="Turns the trivia game off")
async def gameoff(inter: disnake.ApplicationCommandInteraction):
    # No need to use 'global' as we are now using an attribute of 'bot'.
    bot.current_trivia_state['game_is_on'] = False
    await inter.response.send_message("Trivia game is now OFF.", ephemeral=True)


@bot.slash_command(description="Assigns a command to a permission tier.")
async def assigncommand(inter: disnake.ApplicationCommandInteraction, command: str, tier: int):
    cursor.execute("REPLACE INTO command_tiers (command_name, tier_number) VALUES (%s, %s)", (command, tier))
    db.commit()
    await inter.send(f'Assigned command {command} to tier {tier}')


@bot.slash_command(description="Associates a role with a permission tier.")
async def addrole(inter: disnake.ApplicationCommandInteraction, role: disnake.Role, tier: int):
    cursor.execute("UPDATE permission_tiers SET role_id = %s WHERE tier_number = %s", (role.id, tier))
    db.commit()
    await inter.send(f'Associated role {role.name} with tier {tier}')


@bot.slash_command(description="Displays the bot's current settings.")
async def status(inter: disnake.ApplicationCommandInteraction):
    cursor.execute("SELECT * FROM settings")
    rows = cursor.fetchall()
    settings_text = "\n".join([f"{row[0]}: {row[1]}" for row in rows])
    await inter.send(f"**Current Settings**\n{settings_text}")


@bot.slash_command(description="Set the game channel for trivia")
async def gamechannel(inter, channel: disnake.TextChannel):
    # Connect to the database
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cur:
            # Update the existing row with the new game channel ID
            await cur.execute("""
                UPDATE bot_settings SET game_channel_id = %s
            """, (channel.id,))

            # Commit changes
            await conn.commit()

    # Send a confirmation message to the channel
    await inter.response.send_message(f"The game channel has been set to {channel.mention}")


@bot.slash_command(description="Set the speed of the quiz in seconds")
async def quizspeed(inter, seconds: int):
    # Connect to the database
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cur:
            # Update the existing row with the new quiz speed
            await cur.execute("""
                UPDATE bot_settings SET quiz_speed = %s
            """, (seconds,))
            affected_rows = cur.rowcount

            # If no row was updated, it means the settings row doesn't exist, so insert it
            if affected_rows == 0:
                await cur.execute("""
                    INSERT INTO bot_settings (quiz_speed) VALUES (%s)
                """, (seconds,))

            # Commit changes
            await conn.commit()

    # Send a confirmation message to the channel
    await inter.response.send_message(f"The quiz speed has been set to {seconds} seconds.")


@bot.slash_command(description="Trigger the next trivia question immediately.")
async def quizme(inter):
    # Check if the game is on
    if not bot.current_trivia_state['game_is_on']:
        await inter.response.send_message(
            "The trivia game is not currently active. Please start the game first with /gameon.",
            ephemeral=True)
    elif bot.current_trivia_state['active']:
        # If a game is already active, inform the user and do not proceed
        await inter.response.send_message(
            f"{inter.author.mention} a trivia game is already active, please wait until it ends to start another.",
            ephemeral=True)
    else:
        # Set the event to skip the wait and indicate that a new question should be fetched
        bot.current_trivia_state['event'].set()

        # Inform the user that the next question is coming up
        await inter.response.send_message("Next trivia question coming up!")


@bot.slash_command(description="Displays a list of commands accessible by the user's permission tier.")
async def commands(inter: disnake.ApplicationCommandInteraction):
    # Use the get_user_tier function to fetch the user's permission tier
    user_tier = get_user_tier(inter.author)
    if user_tier is not None:
        buffered_cursor = db.cursor(buffered=True)
        buffered_cursor.execute("SELECT command_name FROM command_tiers WHERE tier_number <= %s", (user_tier,))
        rows = buffered_cursor.fetchall()
        buffered_cursor.close()  # Close the cursor
        commands_text = "\n".join([row[0] for row in rows])
        await inter.send(f"**Available Commands**\n{commands_text}")
    else:
        await inter.send("You do not have a permission tier assigned.")


@bot.slash_command(description="Changes the role associated with a permission tier.")
async def changetier(inter: disnake.ApplicationCommandInteraction, tier_number: int, role: disnake.Role):
    cursor.execute("UPDATE permission_tiers SET role_id = %s WHERE tier_number = %s", (role.id, tier_number))
    db.commit()
    await inter.send(f'Changed role for tier {tier_number} to {role.name}')


# Trivia Management Commands
@bot.slash_command(description="Adds a new trivia question.")
async def addtrivia(inter: disnake.ApplicationCommandInteraction, common_name: str, specific_name: str,
                    scientific_name: str, image_url: str):
    try:
        # Connect to the database
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            # Create a cursor to execute the SQL statement
            async with conn.cursor() as cur:
                # Insert the new trivia question into the database
                await cur.execute(
                    "INSERT INTO trivia (common_name, specific_name, scientific_name, image_url) VALUES (%s, %s, %s, %s)",
                    (common_name, specific_name, scientific_name, image_url))
                # Commit the transaction
                await conn.commit()

        # Send a confirmation message back to the user
        await inter.send(f'Trivia question added: {common_name}, {specific_name}, {scientific_name}')
    except aiomysql.Error as e:
        # Log the error and send a message back to the user if there's a problem
        logging.error(f"An error occurred while adding a trivia question: {e}")
        await inter.send(f"An error occurred while adding the trivia question: {e}")


@bot.slash_command(description="Edits an existing trivia question.")
async def edittrivia(inter: disnake.ApplicationCommandInteraction, ref_number: int, common_name: str = '',
                     specific_name: str = '', scientific_name: str = '', image_url: str = ''):
    update_fields = []
    update_values = []
    if common_name:
        update_fields.append("common_name=%s")
        update_values.append(common_name)
    if specific_name:
        update_fields.append("specific_name=%s")
        update_values.append(specific_name)
    if scientific_name:
        update_fields.append("scientific_name=%s")
        update_values.append(scientific_name)
    if image_url:
        update_fields.append("image_url=%s")
        update_values.append(image_url)

    update_values.append(ref_number)
    cursor.execute(f"UPDATE trivia SET {', '.join(update_fields)} WHERE id=%s", update_values)
    db.commit()
    await inter.send(f'Trivia question {ref_number} updated.')


@bot.slash_command(description="Removes a trivia question.")
async def removetrivia(inter: disnake.ApplicationCommandInteraction, ref_number: int):
    cursor.execute("DELETE FROM trivia WHERE id=%s", (ref_number,))
    db.commit()
    await inter.send(f'Trivia question {ref_number} removed.')


@bot.slash_command(description="Sets the time available for answering trivia questions.")
async def quizanswer(inter: disnake.ApplicationCommandInteraction, seconds: int):
    # Connect to the database
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cur:
            # Update the quiz_answer_duration in the bot_settings table
            await cur.execute("""
                UPDATE bot_settings SET quiz_answer_duration = %s
            """, (seconds,))

            # Commit changes
            await conn.commit()

    # Send a confirmation message to the channel
    await inter.response.send_message(f"Time available for answering set to {seconds} seconds.")


# Define the leaderboard command
@bot.slash_command(description="Displays the trivia leaderboard.")
async def leaderboard(interaction: disnake.ApplicationCommandInteraction):
    # Connect to the MySQL database
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cursor:
            # Retrieve the top 10 scores from the database
            await cursor.execute("SELECT user_id, score FROM scores ORDER BY score DESC LIMIT 10")
            rows = await cursor.fetchall()

    # Create an embed for the leaderboard
    leaderboard_embed = disnake.Embed(
        title="Trivia Leaderboard",
        color=disnake.Color.gold()
    )

    # Initialize an empty string to construct the leaderboard text
    leaderboard_text = ""

    # Iterate through the rows and construct the leaderboard text
    for i, (user_id, score) in enumerate(rows, start=1):
        # Use bot.get_user to get the user's name based on their ID
        user = bot.get_user(user_id)
        username = user.name if user else 'Unknown User'
        leaderboard_text += f"**{i}.** {username} - {score} points\n"

    # Add the leaderboard text to the embed
    leaderboard_embed.description = leaderboard_text

    # Send the leaderboard embed as a response
    await interaction.response.send_message(embed=leaderboard_embed)


@bot.slash_command(description="Displays a help message describing each command.")
async def helpme(interaction: disnake.ApplicationCommandInteraction):  # Renamed 'inter' to 'interaction' here too
    await interaction.send("""
**Commands** 
- `/dumpchannel [channel_name]`: Sets the channel for trivia questions. 
- `/addtier [tier_number] [role]`: Adds a permission tier. 
- `/addtrivia [common_name] [specific_name] [scientific_name] [image_url]`: Adds a trivia question. 
- `/edittrivia [ref_number] [common_name] [specific_name] [scientific_name] [image_url]`: Edits a trivia question. 
- `/removetrivia [ref_number]`: Removes a trivia question. 
- `/leaderboard`: Shows the top 10 users by score.
""")


@bot.slash_command(description="Prints the current state of the trivia game")
async def print_game_state(inter: disnake.ApplicationCommandInteraction):
    # Construct a string representation of the current_trivia_state
    state_info = "```\n"  # Use triple backticks for code block formatting in Discord
    for key, value in bot.current_trivia_state.items():
        if isinstance(value, dict):
            state_info += f"{key}:\n"
            for sub_key, sub_value in value.items():
                state_info += f"  {sub_key}: {sub_value}\n"
        else:
            state_info += f"{key}: {value}\n"
    state_info += "```"

    # Send the state information as a message in the chat
    await inter.response.send_message(state_info,
                                      ephemeral=True)  # ephemeral=True makes it only visible to the user who invoked the command


# Trivia Game Logic
async def trivia_loop():
    await bot.wait_until_ready()

    while not bot.is_closed():
        if not bot.current_trivia_state['game_is_on']:
            logging.info("Trivia loop: Game is not on. Waiting...")
            await asyncio.sleep(10)
            continue

        if bot.current_trivia_state['active']:
            logging.info("Trivia loop: Trivia is already active. Waiting...")
            await asyncio.sleep(1)
            continue

        try:
            # Logging to track the flow of execution
            logging.info("Trivia loop: Checking game state...")

            game_channel_id = bot.current_trivia_state.get('game_channel_id')
            if not game_channel_id:
                game_channel_id = await get_setting_from_db('game_channel_id', caller='trivia_loop')
                if game_channel_id:
                    bot.current_trivia_state['game_channel_id'] = game_channel_id
                else:
                    logging.error("Trivia loop: No game channel ID found.")
                    await asyncio.sleep(10)
                    continue

            quiz_speed = int(await get_setting_from_db('quiz_speed', caller='trivia_loop') or 60)
            quiz_answer_duration = int(await get_setting_from_db('quiz_answer_duration', caller='trivia_loop') or 30)

            local_game_channel = bot.get_channel(int(game_channel_id))
            if local_game_channel is None:
                logging.warning(f"Trivia loop: Could not find a channel with ID: {game_channel_id}")
                await asyncio.sleep(10)
                continue

            async with aiomysql.connect(**aiomysql_db_params) as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    await cur.execute("SELECT * FROM trivia ORDER BY RAND() LIMIT 1")
                    trivia_question = await cur.fetchone()

                    if trivia_question is None:
                        logging.warning("Trivia loop: No trivia questions found in the database.")
                        await asyncio.sleep(quiz_speed)
                        continue

                    bot.current_trivia_state['active'] = True
                    bot.current_trivia_state['question_id'] = trivia_question['id']
                    bot.current_trivia_state['answers']['animal'] = trivia_question['common_name']
                    bot.current_trivia_state['answers']['specific_animal'] = trivia_question['specific_name']
                    bot.current_trivia_state['answers']['scientific_name'] = trivia_question['scientific_name']
                    bot.current_trivia_state['start_time'] = datetime.utcnow()

                    # Logging to track when the trivia question is sent
                    logging.info("Trivia loop: Sending trivia question...")

                    question_embed = disnake.Embed(
                        title="🌟 Trivia Time! 🌟",
                        description=(
                            "Can you identify the animal below?\n\n"
                            "🐾 **Animal:** [Your Guess for Common Name]\n\n"
                            "🦁 **Specific Animal:** [Your Guess for Specific Name]\n\n"
                            "🔬 **Scientific Name:** [Your Guess for Scientific Name]\n\n"
                            f"You have {quiz_answer_duration} seconds to answer all three parts!"
                        ),
                        color=disnake.Color.blue()
                    )
                    question_embed.set_image(url=trivia_question['image_url'])
                    question_embed.set_footer(text=f"Trivia ID: {trivia_question['id']}")

                    trivia_message = await local_game_channel.send(embed=question_embed)
                    bot.current_trivia_state['message_id'] = trivia_message.id

                    await asyncio.sleep(quiz_answer_duration)

                    # After waiting for answers, update specific variables to reset between rounds
                    bot.current_trivia_state['active'] = False
                    bot.current_trivia_state['answered_users'].clear()
                    bot.current_trivia_state['start_time'] = None

                    # Reset specific variables between rounds
                    bot.current_trivia_state['question_id'] = None
                    bot.current_trivia_state['answers']['animal'] = None
                    bot.current_trivia_state['answers']['specific_animal'] = None
                    bot.current_trivia_state['answers']['scientific_name'] = None
                    bot.current_trivia_state['answered_parts'].clear()

                    # Logging to track when the answer message is sent
                    logging.info("Trivia loop: Sending answer message...")

                    answer_embed = disnake.Embed(
                        title="Time's Up!",
                        description="Stay tuned for the correct answers and get ready for the next question!",
                        color=disnake.Color.green()
                    )
                    await local_game_channel.send(embed=answer_embed)

                    bot.current_trivia_state['event'].clear()

                    tasks = [
                        bot.loop.create_task(bot.current_trivia_state['event'].wait()),
                        bot.loop.create_task(asyncio.sleep(quiz_speed))
                    ]
                    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

                    for task in pending:
                        task.cancel()

        except Exception as e:
            # Logging to track errors
            logging.error(f"Trivia loop: An error occurred: {e}")
            await asyncio.sleep(10)


bot.run(DISCORD_TOKEN)