Trivia Bot (Not Working)

pasted here for a discord forum for advice on coding.
 avatar
unknown
python
2 years ago
27 kB
8
Indexable
import asyncio
import aiomysql
import mysql.connector
import os
import disnake
from disnake.ext import commands
from typing import Optional
from disnake import Message
from datetime import datetime, timedelta
from datetime import datetime
from collections import defaultdict
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Access environment variables
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASS = os.getenv('DB_PASS')
DB_PORT = os.getenv('DB_PORT')
DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')

aiomysql_db_params = {
    'host': DB_HOST,
    'port': int(DB_PORT),
    'user': DB_USER,
    'password': DB_PASS,
    'db': DB_NAME,
    'autocommit': True
}


# Initialize bot and database
intents = disnake.Intents.all()
bot = commands.InteractionBot(intents=intents)
db = mysql.connector.connect(
    host=DB_HOST,
    database=DB_NAME,
    user=DB_USER,
    password=DB_PASS,
    port=int(DB_PORT)
)
cursor = db.cursor()

# Constants
QUIZ_SPEED_DEFAULT = 60
DEV_GUILD_ID = 753384735236948018  # Replace with your development guild's ID

# Global variables for game state
current_question = {}
answered_users = defaultdict(set)
permission_tiers = {}
quiz_speed = QUIZ_SPEED_DEFAULT
trivia_channel_id = None
current_trivia_message: Optional[Message] = None
answered_questions = set()
game_channel_id = None
game_channel = None
trivia_embed = None
game_enabled = True
trivia_start_time = None
# This variable should be defined in a scope accessible by both trivia_loop and on_message
# Initialize it with a structure that can hold the current trivia question and its state

current_trivia_state = {
    'active': False,
    'event': asyncio.Event(),
    'question_id': id,  # Placeholder or an actual value if available
    'answers': {
        'common_name': '',  # Empty string as placeholder for string values
        'specific_name': '',
        'scientific_name': ''
    },
    'answered_users': set(),
    'start_time': datetime.now()  # Current time as placeholder
}


def initialize_tables():
    try:
        # Define the character set and collation for the trivia table
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS trivia (
            id INT AUTO_INCREMENT PRIMARY KEY,
            common_name VARCHAR(255) NOT NULL,
            specific_name VARCHAR(255) NOT NULL,
            scientific_name VARCHAR(255) NOT NULL,
            image_url VARCHAR(255) NOT NULL
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
        """)

        # Create the bot_settings table with specific columns and default values
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS bot_settings (
                game_enabled BOOLEAN DEFAULT FALSE,
                game_channel_id BIGINT UNSIGNED,
                quiz_speed INT DEFAULT 60,
                quiz_answer_duration INT DEFAULT 30
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
        """)

        # Create the scores table with PRIMARY KEY constraint
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS scores (
                user_id BIGINT UNSIGNED NOT NULL,
                guild_id BIGINT UNSIGNED NOT NULL,
                score INT DEFAULT 0,
                PRIMARY KEY (user_id, guild_id)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
        """)

        db.commit()
        print("Tables initialized successfully.")
    except mysql.connector.Error as err:
        print(f"An error occurred during table initialization: {err}")


# Call the initialize_tables function right after defining it and the DB connection is made
initialize_tables()


async def get_setting_from_db(setting_name):
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute("SELECT value FROM bot_settings WHERE setting = %s", (setting_name,))
            result = await cur.fetchone()
            return result['value'] if result else None


async def on_message(message):
    # Skip messages sent by the bot itself
    if message.author == bot.user:
        return

    # Process commands first
    await bot.process_commands(message)

    # Check if the trivia game is currently active and the message is in the correct channel
    is_game_active = current_trivia_state['active']
    current_game_channel_id = current_trivia_state['game_channel_id']  # Assuming this is stored in the state

    if not is_game_active or message.channel.id != current_game_channel_id:
        return

    # Normalize input to lowercase and strip leading/trailing whitespace
    message_content = message.content.strip().lower()

    # Get the correct answers from the current trivia state
    correct_answers = current_trivia_state['answers']

    # Check if the message content is one of the correct answers
    if message_content in correct_answers.values():
        # Retrieve which part of the answer was correct
        answer_part = [key for key, value in correct_answers.items() if value == message_content][0]

        # Calculate the time taken to answer, if a start time is available
        answer_time = None
        if 'start_time' in current_trivia_state and current_trivia_state['start_time']:
            answer_time = (datetime.utcnow() - current_trivia_state['start_time']).total_seconds()

        # Check if the user has already answered
        if message.author.id in current_trivia_state.get('answered_users', set()):
            return  # Skip if the user has already answered

        # Call handle_answer with the appropriate parameters
        await handle_answer(message, message_content, answer_part, answer_time)

        # Add the user to the list of users who have answered
        current_trivia_state.setdefault('answered_users', set()).add(message.author.id)
    else:
        # If the answer is incorrect, you can choose to notify the user or take no action
        pass  # This can be implemented as needed based on game design


def update_leaderboard(user_id):
    cursor.execute("SELECT score FROM scores WHERE user_id = %s", (user_id,))
    row = cursor.fetchone()
    if row:
        new_score = row[0] + 1
        cursor.execute("UPDATE scores SET score = %s WHERE user_id = %s", (new_score, user_id))
    else:
        cursor.execute("INSERT INTO scores (user_id, score) VALUES (%s, %s)", (user_id, 1))
    db.commit()


async def handle_answer(message, answer, question_part, time_taken):
    # readable_time should already be in seconds as per your `on_message` function
    readable_time = time_taken  # time_taken is already in seconds, no need to call total_seconds()

    # Determine which field in the original message to update
    field_name = question_part.capitalize()  # This could be 'Common', 'Specific', or 'Scientific'

    # Retrieve the original message ID for the trivia question, if it's stored globally or passed along
    original_message_id = current_trivia_state.get('message_id')

    # Find the original trivia message if necessary
    if original_message_id:
        try:
            original_message = await message.channel.fetch_message(original_message_id)
        except Exception as e:
            print(f"Failed to fetch the original trivia message: {e}")
            return

        # Assume there is at least one embed already, and we will update it with new information
        embed = original_message.embeds[0] if original_message.embeds else disnake.Embed()
        updated = False

        for index, field in enumerate(embed.fields):
            if field.name.startswith(field_name):  # Check if this is the correct field to update
                # Update the field with the new information
                new_value = f"{answer} by {message.author.display_name} in {readable_time:.2f} seconds"
                embed.set_field_at(index, name=field.name, value=new_value, inline=False)
                updated = True
                break

        if not updated:
            # If the field wasn't found, it means this is the first correct answer for this part
            new_field_name = f"{field_name} Answer"
            new_value = f"{answer} by {message.author.display_name} in {readable_time:.2f} seconds"
            embed.add_field(name=new_field_name, value=new_value, inline=False)

        # Edit the original message with the updated embed
        await original_message.edit(embed=embed)

        # Handle updating the score of the user in your scores database
        # Placeholder for database update logic, you need to implement it based on your application

        # React to the message to show the answer has been accepted
        await message.add_reaction("🎉")
    else:
        # If you don't have the original message id, you can't update the message.
        # You need to decide how to handle this case.
        print("Original message ID not found. Cannot update the trivia message.")


# Slash Commands
@bot.slash_command(name="gameon", description="Turns the trivia game on")
async def gameon(inter: disnake.ApplicationCommandInteraction):
    global current_trivia_state
    current_trivia_state['active'] = True
    current_trivia_state['start_time'] = datetime.now()  # Optionally reset the start time
    await inter.response.send_message("Trivia game is now ON.", ephemeral=True)


@bot.slash_command(name="gameoff", description="Turns the trivia game off")
async def gameoff(inter: disnake.ApplicationCommandInteraction):
    global current_trivia_state
    current_trivia_state['active'] = False
    await inter.response.send_message("Trivia game is now OFF.", ephemeral=True)


@bot.slash_command(description="Sets the channel where sensitive information will be dumped.")
async def dumpchannel(inter: disnake.ApplicationCommandInteraction, channel_name: str):
    # Check if the user has the required permission (assuming tier 4 is for admins)
    if not await has_permission(inter, 4):
        await inter.send("You do not have permission to use this command.")
        return

    channel = disnake.utils.get(inter.guild.text_channels, name=channel_name)
    if not channel:  # Check if the channel was found
        await inter.send(f"Channel with name {channel_name} not found.")
        return

    try:
        cursor.execute(
            "REPLACE INTO settings (setting_name, setting_value) VALUES (%s, %s)",
            ('dump_channel', str(channel.id))
        )
        db.commit()
        await inter.send(f'Dump channel set to {channel.mention}')
    except mysql.connector.Error as err:
        print(f"Error: {err}")
        await inter.send(f"Database error occurred: {err}")
    except disnake.DiscordException as e:
        print(f"Discord error: {e}")


@bot.slash_command(description="Adds a new permission tier and optionally associates it with a Discord role.")
async def addtier(inter: disnake.ApplicationCommandInteraction, tier_number: int, role: disnake.Role = None):
    try:
        cursor.execute(
            "INSERT INTO permission_tiers (tier_number, role_id) VALUES (%s, %s)",
            (tier_number, role.id if role else None)
        )

        db.commit()
    except mysql.connector.Error as err:
        print(f"Error: {err}")
        await inter.send(f"Database error occurred: {err}")
        return  # Exit the function if a database error occurs

    try:
        await inter.send(f'Added tier {tier_number} for role {role.name if role else "None"}')
    except disnake.DiscordException as e:
        print(f"Discord error: {e}")


@bot.slash_command(description="Removes a permission tier.")
async def removetier(inter: disnake.ApplicationCommandInteraction, tier_number: int):
    cursor.execute("DELETE FROM permission_tiers WHERE tier_number = %s", (tier_number,))
    db.commit()
    await inter.send(f'Removed tier {tier_number}')


@bot.slash_command(description="Assigns a command to a permission tier.")
async def assigncommand(inter: disnake.ApplicationCommandInteraction, command: str, tier: int):
    cursor.execute("REPLACE INTO command_tiers (command_name, tier_number) VALUES (%s, %s)", (command, tier))
    db.commit()
    await inter.send(f'Assigned command {command} to tier {tier}')


@bot.slash_command(description="Associates a role with a permission tier.")
async def addrole(inter: disnake.ApplicationCommandInteraction, role: disnake.Role, tier: int):
    cursor.execute("UPDATE permission_tiers SET role_id = %s WHERE tier_number = %s", (role.id, tier))
    db.commit()
    await inter.send(f'Associated role {role.name} with tier {tier}')


@bot.slash_command(description="Displays the bot's current settings.")
async def status(inter: disnake.ApplicationCommandInteraction):
    cursor.execute("SELECT * FROM settings")
    rows = cursor.fetchall()
    settings_text = "\n".join([f"{row[0]}: {row[1]}" for row in rows])
    await inter.send(f"**Current Settings**\n{settings_text}")


@bot.slash_command(description="Set the game channel for trivia")
async def gamechannel(inter, channel: disnake.TextChannel):
    # Connect to the database
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cur:
            # Check if the settings row exists
            await cur.execute("SELECT COUNT(*) FROM bot_settings")
            (number_of_rows,) = await cur.fetchone()

            # If no row exists, insert a default row before updating
            if number_of_rows == 0:
                await cur.execute("""
                    INSERT INTO bot_settings (game_enabled, game_channel_id, quiz_speed, quiz_answer_duration)
                    VALUES (%s, %s, %s, %s)
                """, (False, channel.id, 60, 30))
            else:
                # Update the existing row with the new game channel ID
                await cur.execute("""
                    UPDATE bot_settings SET game_channel_id = %s
                """, (channel.id,))

            # Commit changes
            await conn.commit()

    # Send a confirmation message to the channel
    await inter.send(f"The game channel has been set to {channel.mention}")


@bot.slash_command(description="Set the speed of the quiz in seconds")
async def quizspeed(inter, seconds: int):
    # Connect to the database
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cur:
            # Check if the settings row exists
            await cur.execute("SELECT COUNT(*) FROM bot_settings")
            (number_of_rows,) = await cur.fetchone()

            # If no row exists, insert a default row with the new quiz speed
            if number_of_rows == 0:
                await cur.execute("""
                    INSERT INTO bot_settings (game_enabled, game_channel_id, quiz_speed, quiz_answer_duration)
                    VALUES (%s, %s, %s, %s)
                """, (False, None, seconds, 30))
            else:
                # Update the existing row with the new quiz speed
                await cur.execute("""
                    UPDATE bot_settings SET quiz_speed = %s
                """, (seconds,))

            # Commit changes
            await conn.commit()

    # Send a confirmation message to the channel
    await inter.send(f"The quiz speed has been set to {seconds} seconds.")


@bot.slash_command(description="Trigger the next trivia question immediately.")
async def quizme(inter):
    # Set the event to skip the wait and reset the state to immediately pick up a new question
    current_trivia_state['event'].set()

    # Ensure the trivia loop knows a new question should be fetched
    current_trivia_state['active'] = True

    # Inform the user that the next question is coming up
    await inter.send("Next trivia question coming up!")


@bot.slash_command(description="Displays a list of commands accessible by the user's permission tier.")
async def commands(inter: disnake.ApplicationCommandInteraction):
    # Use the get_user_tier function to fetch the user's permission tier
    user_tier = get_user_tier(inter.author)
    if user_tier is not None:
        buffered_cursor = db.cursor(buffered=True)
        buffered_cursor.execute("SELECT command_name FROM command_tiers WHERE tier_number <= %s", (user_tier,))
        rows = buffered_cursor.fetchall()
        buffered_cursor.close()  # Close the cursor
        commands_text = "\n".join([row[0] for row in rows])
        await inter.send(f"**Available Commands**\n{commands_text}")
    else:
        await inter.send("You do not have a permission tier assigned.")


@bot.slash_command(description="Changes the role associated with a permission tier.")
async def changetier(inter: disnake.ApplicationCommandInteraction, tier_number: int, role: disnake.Role):
    cursor.execute("UPDATE permission_tiers SET role_id = %s WHERE tier_number = %s", (role.id, tier_number))
    db.commit()
    await inter.send(f'Changed role for tier {tier_number} to {role.name}')


# Trivia Management Commands
@bot.slash_command(description="Adds a new trivia question.")
async def addtrivia(inter: disnake.ApplicationCommandInteraction, common_name: str, specific_name: str,
                    scientific_name: str, image_url: str):
    cursor.execute(
        "INSERT INTO trivia (common_name, specific_name, scientific_name, image_url) VALUES (%s, %s, %s, %s)",
        (common_name, specific_name, scientific_name, image_url))
    db.commit()
    await inter.send(f'Trivia question added: {common_name}, {specific_name}, {scientific_name}')


@bot.slash_command(description="Edits an existing trivia question.")
async def edittrivia(inter: disnake.ApplicationCommandInteraction, ref_number: int, common_name: str = '',
                     specific_name: str = '', scientific_name: str = '', image_url: str = ''):
    update_fields = []
    update_values = []
    if common_name:
        update_fields.append("common_name=%s")
        update_values.append(common_name)
    if specific_name:
        update_fields.append("specific_name=%s")
        update_values.append(specific_name)
    if scientific_name:
        update_fields.append("scientific_name=%s")
        update_values.append(scientific_name)
    if image_url:
        update_fields.append("image_url=%s")
        update_values.append(image_url)

    update_values.append(ref_number)
    cursor.execute(f"UPDATE trivia SET {', '.join(update_fields)} WHERE id=%s", update_values)
    db.commit()
    await inter.send(f'Trivia question {ref_number} updated.')


@bot.slash_command(description="Removes a trivia question.")
async def removetrivia(inter: disnake.ApplicationCommandInteraction, ref_number: int):
    cursor.execute("DELETE FROM trivia WHERE id=%s", (ref_number,))
    db.commit()
    await inter.send(f'Trivia question {ref_number} removed.')


@bot.slash_command(description="Sets the time available for answering trivia questions.")
async def quizanswer(inter: disnake.ApplicationCommandInteraction, seconds: int):
    global quiz_speed
    quiz_speed = seconds
    await inter.send(f'Time available for answering set to {seconds} seconds.')

    # Leaderboard Functionality
    @bot.slash_command(description="Displays the trivia leaderboard.")
    async def leaderboard(interaction: disnake.ApplicationCommandInteraction):  # Renamed 'inter' to 'interaction'
        cursor.execute("SELECT user_id, score FROM scores ORDER BY score DESC LIMIT 10")
        rows = cursor.fetchall()
        leaderboard_text = ""
        for i, (user_id, score) in enumerate(rows, start=1):
            user = bot.get_user(user_id)
            leaderboard_text += f"{i}. {user.name if user else 'Unknown User'} - {score} points\n"
        await interaction.send(f"**Leaderboard**\n{leaderboard_text}")  # Use 'interaction' here

    @bot.slash_command(description="Displays a help message describing each command.")
    async def helpme(interaction: disnake.ApplicationCommandInteraction):  # Renamed 'inter' to 'interaction' here too
        await interaction.send("""
    **Commands** 
    - `/dumpchannel [channel_name]`: Sets the channel for trivia questions. 
    - `/addtier [tier_number] [role]`: Adds a permission tier. 
    - `/addtrivia [common_name] [specific_name] [scientific_name] [image_url]`: Adds a trivia question. 
    - `/edittrivia [ref_number] [common_name] [specific_name] [scientific_name] [image_url]`: Edits a trivia question. 
    - `/removetrivia [ref_number]`: Removes a trivia question. 
    - `/leaderboard`: Shows the top 10 users by score.
    """)


# Trivia Game Logic
async def update_trivia_message(answer_type, user):
    global current_trivia_message
    if not current_trivia_message or not current_trivia_message.embeds:
        print("current_trivia_message is None or has no embeds.")
        return
    assert isinstance(current_trivia_message, Message)  # Assure the type checker

    # Renamed the variable to avoid shadowing
    current_embed = current_trivia_message.embeds[0]

    # Use current_embed instead of trivia_embed
    if current_embed is not None:  # Check for None before using
        description = current_embed.description
        new_description = description.replace(f"Option {answer_type}:", f"Option {answer_type} (@{user.name}):")
        current_embed.description = new_description
        if current_trivia_message:
            await current_trivia_message.edit(embed=current_embed)
        else:
            print("current_trivia_message is None.")
            return
    else:
        print("current_embed is None")
        return


async def trivia_loop():
    await bot.wait_until_ready()

    while not bot.is_closed():
        if not current_trivia_state['active']:
            await asyncio.sleep(1)
            continue

        try:
            async with aiomysql.connect(**aiomysql_db_params) as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    await cur.execute("SELECT game_channel_id, quiz_speed, quiz_answer_duration FROM bot_settings LIMIT 1")
                    settings = await cur.fetchone()

                    if not settings or not settings['game_channel_id']:
                        print("Game channel ID is not set in the database. Please use the /gamechannel command to set it.")
                        await asyncio.sleep(10)
                        continue

                    local_game_channel_id = settings['game_channel_id']
                    local_quiz_speed = settings.get('quiz_speed', 60)
                    local_quiz_answer_duration = settings.get('quiz_answer_duration', 30)

                    local_game_channel = bot.get_channel(int(local_game_channel_id))
                    if local_game_channel is None:
                        print(f"Could not find a channel with ID: {local_game_channel_id}")
                        await asyncio.sleep(10)
                        continue

                    await cur.execute("SELECT * FROM trivia ORDER BY RAND() LIMIT 1")
                    trivia_question = await cur.fetchone()

                    if trivia_question is None:
                        print("No trivia questions found in the database.")
                        await asyncio.sleep(local_quiz_speed)
                        continue

                    current_trivia_state.update({
                        'active': True,
                        'question_id': trivia_question['id'],
                        'answers': {
                            'common_name': trivia_question['common_name'],
                            'specific_name': trivia_question['specific_name'],
                            'scientific_name': trivia_question['scientific_name']
                        },
                        'start_time': datetime.utcnow(),
                        'answered_users': set()  # Use set, not list, for answered_users
                    })

                    embed = disnake.Embed(
                        title="Trivia Time!",
                        description="Identify the animal below! Each correct answer grants a point!\n"
                                    "Please respond with the option followed by your answer.",
                        color=disnake.Color.blue()
                    )
                    embed.set_image(url=trivia_question['image_url'])
                    embed.set_footer(text=f"Trivia ID: {trivia_question['id']}")
                    await local_game_channel.send(embed=embed)

                    await asyncio.sleep(local_quiz_answer_duration)

                    current_trivia_state['active'] = False
                    correct_answers = "\n".join(f"{key.replace('_', ' ').title()}: {value}"
                                                for key, value in current_trivia_state['answers'].items())
                    answer_embed = disnake.Embed(
                        title="Time's Up!",
                        description=f"The correct answers were:\n{correct_answers}",
                        color=disnake.Color.green()
                    )
                    await local_game_channel.send(embed=answer_embed)

                    # Reset the event before starting a new task
                    current_trivia_state['event'].clear()
                    tasks = [
                        bot.loop.create_task(current_trivia_state['event'].wait()),
                        bot.loop.create_task(asyncio.sleep(max(0, local_quiz_speed - local_quiz_answer_duration)))
                    ]
                    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

                    for task in pending:
                        task.cancel()

        except Exception as e:
            print(f"An error occurred in the trivia loop: {e}")
            await asyncio.sleep(10)

# Start the trivia loop as a background task
bot.loop.create_task(trivia_loop())


# Start the Bot
bot.run(DISCORD_TOKEN)
Editor is loading...