animal trivia bot
thank you friendunknown
python
2 years ago
28 kB
6
Indexable
import asyncio
import logging
import os
from datetime import datetime

import aiomysql
import disnake
from disnake.ext import commands
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Access environment variables
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASS = os.getenv('DB_PASS')
DB_PORT = os.getenv('DB_PORT')
DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')

# Keyword arguments passed to every aiomysql.connect() call in this file.
aiomysql_db_params = {
    'host': DB_HOST,
    # DB_PORT is optional: int(None) would raise TypeError at import time,
    # so fall back to the standard MySQL port when it is not configured.
    'port': int(DB_PORT) if DB_PORT else 3306,
    'user': DB_USER,
    'password': DB_PASS,
    'db': DB_NAME,
    'autocommit': True,
}

# Initialize bot and database
intents = disnake.Intents.all()
bot = commands.Bot(command_prefix="!", intents=intents)

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s]: %(message)s",
    handlers=[
        logging.StreamHandler(),  # Log to console
    ],
)

# Constants
QUIZ_SPEED_DEFAULT = 60
QUIZ_ANSWER_DURATION_DEFAULT = 30
DEV_GUILD_ID = 753384735236948018  # Replace with your development guild's ID

# Columns of the single-row bot_settings table. Column names cannot be bound
# as SQL parameters, so get_setting_from_db() validates against this set
# before interpolating a name into a query.
BOT_SETTING_COLUMNS = frozenset({'game_channel_id', 'quiz_speed', 'quiz_answer_duration'})

# Global variables for game state
quiz_speed = QUIZ_SPEED_DEFAULT
trivia_channel_id = None
game_channel_id = None

# Shared state read and written by both trivia_loop and on_message.
# It holds the current trivia question and its progress.
bot.current_trivia_state = {
    'game_is_on': False,
    'active': False,
    'event': asyncio.Event(),
    'question_id': None,  # ID of the question from the database
    'message_id': None,  # Discord message ID of the trivia question message
    'answers': {
        'animal': None,  # Answer for the "Animal" part of the question
        'specific_animal': None,  # Answer for the "Specific Animal" part of the question
        'scientific_name': None,  # Answer for the "Scientific Name" part of the question
    },
    'answered_parts': set(),  # Parts of the current question already answered
    'answered_users': set(),  # IDs of users who answered a part this round
    'start_time': None,
    'game_channel_id': None,
}


async def initialize_tables():
    """Create the trivia, bot_settings and scores tables if they are missing.

    Also inserts the default bot_settings row when the table is empty, so the
    UPDATE-only setters elsewhere in this file have a row to modify.
    Database errors are logged and swallowed; nothing is returned.
    """
    try:
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cursor:
                await cursor.execute("""
                    CREATE TABLE IF NOT EXISTS trivia (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        common_name VARCHAR(255) NOT NULL,
                        specific_name VARCHAR(255) NOT NULL,
                        scientific_name VARCHAR(255) NOT NULL,
                        image_url VARCHAR(255) NOT NULL
                    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """)
                await cursor.execute("""
                    CREATE TABLE IF NOT EXISTS bot_settings (
                        game_channel_id BIGINT UNSIGNED,
                        quiz_speed INT DEFAULT 60,
                        quiz_answer_duration INT DEFAULT 30
                    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """)
                await cursor.execute("""
                    CREATE TABLE IF NOT EXISTS scores (
                        user_id BIGINT UNSIGNED NOT NULL,
                        score INT DEFAULT 0,
                        PRIMARY KEY (user_id)
                    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """)

                # bot_settings is used as a single-row table. Seed that row so
                # a plain "UPDATE bot_settings SET ..." is never a silent no-op
                # on a fresh database.
                await cursor.execute("SELECT COUNT(*) FROM bot_settings")
                (settings_rows,) = await cursor.fetchone()
                if settings_rows == 0:
                    await cursor.execute(
                        "INSERT INTO bot_settings (quiz_speed, quiz_answer_duration) VALUES (%s, %s)",
                        (QUIZ_SPEED_DEFAULT, QUIZ_ANSWER_DURATION_DEFAULT),
                    )
            await conn.commit()
        logging.info("Tables initialized successfully.")
    except aiomysql.Error as err:
        # Log the error message to the console
        logging.error(f"An error occurred during table initialization: {err}")


async def main():
    """Prepare the database schema before the bot starts."""
    await initialize_tables()


if __name__ == "__main__":
    # Runs to completion on its own short-lived event loop, before the bot is
    # started at the bottom of the file.
    asyncio.run(main())


async def get_setting_from_db(setting_name, caller=None):
    """Return one column of the first bot_settings row.

    Args:
        setting_name: Column to read; must be one of BOT_SETTING_COLUMNS.
        caller: Optional name of the calling function, used only in log output.

    Returns:
        The stored value, or None when the column name is not recognised, the
        table has no row, or the query fails (failures are logged).
    """
    # The column name is interpolated into the statement, so it must be
    # validated first; this closes the SQL-injection hole.
    if setting_name not in BOT_SETTING_COLUMNS:
        logging.error(
            f"An error occurred in get_setting_from_db (Caller: {caller}): "
            f"unknown setting {setting_name!r}"
        )
        return None

    # Built before the try block so the error handler can always reference it,
    # even when the connection itself is what failed.
    sql_query = f"SELECT {setting_name} FROM bot_settings LIMIT 1"
    try:
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql_query)
                result = await cur.fetchone()
                # If a row is found, return the setting value
                return result[setting_name] if result else None
    except Exception as e:
        # Best-effort lookup: log with the caller's context and the failing query.
        logging.error(f"An error occurred in get_setting_from_db (Caller: {caller}): {e}")
        logging.error(f"Failed SQL Query: {sql_query}")
        return None
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

# Columns of the single-row bot_settings table that the setters may write.
_WRITABLE_SETTINGS = ('game_channel_id', 'quiz_speed', 'quiz_answer_duration')

# Embed placeholder that each answer part replaces once it has been guessed.
_ANSWER_PLACEHOLDERS = {
    'animal': "[Your Guess for Common Name]",
    'specific_animal': "[Your Guess for Specific Name]",
    'scientific_name': "[Your Guess for Scientific Name]",
}


async def _db_execute(sql, params=None):
    """Run one write statement on a fresh connection, commit, and return the affected-row count."""
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql, params)
            affected = cur.rowcount
        await conn.commit()
    return affected


async def _db_fetch_all(sql, params=None, dict_rows=False):
    """Run one query on a fresh connection and return all rows (dicts when dict_rows is True)."""
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        cursor_kind = aiomysql.DictCursor if dict_rows else aiomysql.Cursor
        async with conn.cursor(cursor_kind) as cur:
            await cur.execute(sql, params)
            return await cur.fetchall()


async def _set_bot_setting(column, value):
    """Store `value` in `column` of the single bot_settings row, creating the row if needed.

    Existence is checked with COUNT(*) rather than the UPDATE's rowcount:
    MySQL reports 0 affected rows when the new value equals the old one, which
    would otherwise cause a duplicate row to be inserted.

    Raises:
        ValueError: if `column` is not a known bot_settings column.
    """
    if column not in _WRITABLE_SETTINGS:
        raise ValueError(f"Unknown bot setting: {column!r}")
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT COUNT(*) FROM bot_settings")
            (row_count,) = await cur.fetchone()
            if row_count:
                await cur.execute(f"UPDATE bot_settings SET {column} = %s", (value,))
            else:
                await cur.execute(f"INSERT INTO bot_settings ({column}) VALUES (%s)", (value,))
        await conn.commit()


async def _report_db_error(inter, action, err):
    """Log a database failure and tell the invoking user about it."""
    logging.error(f"An error occurred while {action}: {err}")
    await inter.send(f"An error occurred while {action}: {err}", ephemeral=True)


async def get_user_tier(member):
    """Return the highest permission tier linked to any of the member's roles, or None.

    NOTE(review): assumes a larger tier_number grants access to more commands,
    as the `tier_number <= %s` filter in the /commands handler implies. Confirm
    against the intended permission model.
    """
    role_ids = [role.id for role in getattr(member, 'roles', [])]
    if not role_ids:
        return None
    placeholders = ", ".join(["%s"] * len(role_ids))
    rows = await _db_fetch_all(
        f"SELECT MAX(tier_number) FROM permission_tiers WHERE role_id IN ({placeholders})",
        role_ids,
    )
    # MAX() over zero matching rows yields a single NULL, i.e. None.
    return rows[0][0] if rows else None


def _reset_round_state():
    """Clear the per-question fields of the shared trivia state."""
    state = bot.current_trivia_state
    state['active'] = False
    state['start_time'] = None
    state['question_id'] = None
    for part in state['answers']:
        state['answers'][part] = None
    state['answered_parts'].clear()
    state['answered_users'].clear()


# ---------------------------------------------------------------------------
# Answer handling
# ---------------------------------------------------------------------------

@bot.event
async def on_message(message):
    """Check messages in the game channel against the open question, then run prefix commands."""
    logging.info(f"Message from {message.author}: {message.content}")

    # Skip messages sent by the bot itself
    if message.author == bot.user:
        return

    state = bot.current_trivia_state
    # Only consider guesses in the game channel while a question is open
    if state['active'] and message.channel.id == state.get('game_channel_id'):
        guess = message.content.strip().lower()
        correct_answers = state['answers']

        for answer_part, correct_answer in correct_answers.items():
            if correct_answer is None or answer_part in state['answered_parts']:
                continue
            if guess != correct_answer.strip().lower():
                continue

            answer_time = (datetime.utcnow() - state['start_time']).total_seconds()

            # Claim the part BEFORE awaiting anything: handle_answer yields to
            # the event loop, and a second identical guess arriving in that
            # window must not be credited as well.
            state['answered_parts'].add(answer_part)
            state['answered_users'].add(message.author.id)
            all_parts_answered = state['answered_parts'] == set(correct_answers)

            await handle_answer(
                message,
                answer=guess,
                answer_part=answer_part,
                time_taken=answer_time,
            )

            if all_parts_answered:
                # Close the question early; trivia_loop performs the full
                # reset when its answer window elapses.
                state['active'] = False
                state['answered_parts'].clear()
                state['start_time'] = None
            break  # A guess can match at most one part

    # Process other commands if any
    await bot.process_commands(message)


async def handle_answer(message, answer, answer_part, time_taken):
    """Record a correct guess: update the question embed, score the user, react to the guess.

    Args:
        message: The Discord message containing the correct guess.
        answer: The normalised guess text shown in the embed.
        answer_part: Which part was answered ('animal', 'specific_animal' or
            'scientific_name').
        time_taken: Seconds between the question being posted and the guess.

    The score is recorded even when the embed cannot be updated, so a Discord
    API hiccup does not cost the player a point.
    """
    logging.info(f"Handling correct answer: {answer} for question part: {answer_part}")

    original_message_id = bot.current_trivia_state.get('message_id')
    if original_message_id:
        try:
            original_message = await message.channel.fetch_message(original_message_id)
            embed = original_message.embeds[0]

            author_name = message.author.display_name
            answer_text = f"{answer}, answered by {author_name} in {time_taken:.2f} seconds!"

            # Swap this part's placeholder for the winning answer
            placeholder = _ANSWER_PLACEHOLDERS.get(answer_part)
            if placeholder is not None:
                embed.description = (embed.description or "").replace(placeholder, answer_text)

            await original_message.edit(embed=embed)
        except (disnake.HTTPException, IndexError) as e:
            logging.error(f"Failed to fetch or edit the original trivia message: {e}")
    else:
        logging.error("Original message ID not found. Cannot update the trivia message.")

    # Update the leaderboard
    await update_leaderboard(message.author.id)

    # React to the message to show the answer has been accepted
    try:
        await message.add_reaction("🎉")
    except disnake.HTTPException as e:
        logging.error(f"Failed to react to the winning message: {e}")


async def update_leaderboard(user_id):
    """Add one point to the user's score, creating the score row on first use.

    A single upsert replaces the previous SELECT-then-UPDATE pair, which could
    lose a point when two answers from the same user were processed
    concurrently. Failures are logged and swallowed (best effort).
    """
    try:
        await _db_execute(
            "INSERT INTO scores (user_id, score) VALUES (%s, 1) "
            "ON DUPLICATE KEY UPDATE score = score + 1",
            (user_id,),
        )
    except Exception as e:
        logging.error(f"Failed to update leaderboard: {e}")


# ---------------------------------------------------------------------------
# Slash Commands
# ---------------------------------------------------------------------------

@bot.slash_command(name="gameon", description="Turns the trivia game on")
async def gameon(inter: disnake.ApplicationCommandInteraction):
    """Enable the game and make sure exactly one trivia loop task is running."""
    bot.current_trivia_state['game_is_on'] = True
    # Repeated /gameon calls must not stack extra loops that would each post
    # questions; only start a task when none is alive.
    running_task = getattr(bot, 'trivia_task', None)
    if running_task is None or running_task.done():
        bot.trivia_task = bot.loop.create_task(trivia_loop())
    await inter.response.send_message("Trivia game is now ON.", ephemeral=True)


@bot.slash_command(name="gameoff", description="Turns the trivia game off")
async def gameoff(inter: disnake.ApplicationCommandInteraction):
    """Disable the game; the loop task keeps idling until the game is turned on again."""
    bot.current_trivia_state['game_is_on'] = False
    await inter.response.send_message("Trivia game is now OFF.", ephemeral=True)


# NOTE(review): the command_tiers and permission_tiers tables used by the
# permission commands below are not created by initialize_tables() in this
# file. Confirm they are provisioned elsewhere; until then these commands
# report a database error instead of raising NameError as before.

@bot.slash_command(description="Assigns a command to a permission tier.")
async def assigncommand(inter: disnake.ApplicationCommandInteraction, command: str, tier: int):
    """Map a command name to a tier number, replacing any previous mapping."""
    try:
        await _db_execute(
            "REPLACE INTO command_tiers (command_name, tier_number) VALUES (%s, %s)",
            (command, tier),
        )
    except aiomysql.Error as e:
        await _report_db_error(inter, "assigning the command", e)
        return
    await inter.send(f'Assigned command {command} to tier {tier}')


@bot.slash_command(description="Associates a role with a permission tier.")
async def addrole(inter: disnake.ApplicationCommandInteraction, role: disnake.Role, tier: int):
    """Set the role linked to an existing permission tier."""
    try:
        await _db_execute(
            "UPDATE permission_tiers SET role_id = %s WHERE tier_number = %s",
            (role.id, tier),
        )
    except aiomysql.Error as e:
        await _report_db_error(inter, "associating the role", e)
        return
    await inter.send(f'Associated role {role.name} with tier {tier}')


@bot.slash_command(description="Displays the bot's current settings.")
async def status(inter: disnake.ApplicationCommandInteraction):
    """Show every column of the bot_settings row as `name: value` lines."""
    # The previous query targeted a `settings` table that this file never
    # creates; the settings actually live in bot_settings.
    try:
        rows = await _db_fetch_all("SELECT * FROM bot_settings LIMIT 1", dict_rows=True)
    except aiomysql.Error as e:
        await _report_db_error(inter, "reading the settings", e)
        return
    if not rows:
        await inter.send("No settings have been saved yet.")
        return
    settings_text = "\n".join(f"{name}: {value}" for name, value in rows[0].items())
    await inter.send(f"**Current Settings**\n{settings_text}")


@bot.slash_command(description="Set the game channel for trivia")
async def gamechannel(inter, channel: disnake.TextChannel):
    """Persist the trivia channel and apply it to the running game immediately."""
    try:
        await _set_bot_setting('game_channel_id', channel.id)
    except aiomysql.Error as e:
        await _report_db_error(inter, "setting the game channel", e)
        return
    # trivia_loop and on_message read the cached id; without this refresh a
    # channel change would only take effect after a restart.
    bot.current_trivia_state['game_channel_id'] = channel.id
    await inter.response.send_message(f"The game channel has been set to {channel.mention}")


@bot.slash_command(description="Set the speed of the quiz in seconds")
async def quizspeed(inter, seconds: int):
    """Persist the pause, in seconds, between two trivia questions."""
    try:
        await _set_bot_setting('quiz_speed', seconds)
    except aiomysql.Error as e:
        await _report_db_error(inter, "setting the quiz speed", e)
        return
    await inter.response.send_message(f"The quiz speed has been set to {seconds} seconds.")


@bot.slash_command(description="Trigger the next trivia question immediately.")
async def quizme(inter):
    """Skip the wait between questions when the game is on and no question is open."""
    if not bot.current_trivia_state['game_is_on']:
        await inter.response.send_message(
            "The trivia game is not currently active. Please start the game first with /gameon.",
            ephemeral=True)
    elif bot.current_trivia_state['active']:
        # A question is already open; do not start another
        await inter.response.send_message(
            f"{inter.author.mention} a trivia game is already active, please wait until it ends to start another.",
            ephemeral=True)
    else:
        # Wake trivia_loop from its between-question wait
        bot.current_trivia_state['event'].set()
        await inter.response.send_message("Next trivia question coming up!")


# The handler is deliberately NOT named `commands`: that would rebind the
# module-level name imported from disnake.ext. The slash command is still
# registered as /commands through the explicit `name` argument.
@bot.slash_command(
    name="commands",
    description="Displays a list of commands accessible by the user's permission tier.")
async def list_commands(inter: disnake.ApplicationCommandInteraction):
    """List the commands whose tier is at or below the invoking user's tier."""
    try:
        user_tier = await get_user_tier(inter.author)
        if user_tier is None:
            await inter.send("You do not have a permission tier assigned.")
            return
        rows = await _db_fetch_all(
            "SELECT command_name FROM command_tiers WHERE tier_number <= %s",
            (user_tier,),
        )
    except aiomysql.Error as e:
        await _report_db_error(inter, "listing the commands", e)
        return
    commands_text = "\n".join(row[0] for row in rows)
    await inter.send(f"**Available Commands**\n{commands_text}")


@bot.slash_command(description="Changes the role associated with a permission tier.")
async def changetier(inter: disnake.ApplicationCommandInteraction, tier_number: int, role: disnake.Role):
    """Replace the role linked to an existing permission tier."""
    try:
        await _db_execute(
            "UPDATE permission_tiers SET role_id = %s WHERE tier_number = %s",
            (role.id, tier_number),
        )
    except aiomysql.Error as e:
        await _report_db_error(inter, "changing the tier", e)
        return
    await inter.send(f'Changed role for tier {tier_number} to {role.name}')


# Trivia Management Commands

@bot.slash_command(description="Adds a new trivia question.")
async def addtrivia(inter: disnake.ApplicationCommandInteraction, common_name: str, specific_name: str,
                    scientific_name: str, image_url: str):
    """Insert a new question (three accepted names plus an image URL)."""
    try:
        await _db_execute(
            "INSERT INTO trivia (common_name, specific_name, scientific_name, image_url) VALUES (%s, %s, %s, %s)",
            (common_name, specific_name, scientific_name, image_url),
        )
    except aiomysql.Error as e:
        await _report_db_error(inter, "adding the trivia question", e)
        return
    await inter.send(f'Trivia question added: {common_name}, {specific_name}, {scientific_name}')


@bot.slash_command(description="Edits an existing trivia question.")
async def edittrivia(inter: disnake.ApplicationCommandInteraction, ref_number: int, common_name: str = '',
                     specific_name: str = '', scientific_name: str = '', image_url: str = ''):
    """Update only the fields that were supplied for question `ref_number`."""
    supplied = {
        'common_name': common_name,
        'specific_name': specific_name,
        'scientific_name': scientific_name,
        'image_url': image_url,
    }
    changes = {column: value for column, value in supplied.items() if value}
    if not changes:
        # With nothing to set, the statement would read "UPDATE trivia SET  WHERE ..."
        await inter.send("Nothing to update: provide at least one field to change.", ephemeral=True)
        return

    # Column names come from the fixed dict above; only values are user input.
    assignments = ", ".join(f"{column}=%s" for column in changes)
    try:
        await _db_execute(
            f"UPDATE trivia SET {assignments} WHERE id=%s",
            [*changes.values(), ref_number],
        )
    except aiomysql.Error as e:
        await _report_db_error(inter, "editing the trivia question", e)
        return
    await inter.send(f'Trivia question {ref_number} updated.')


@bot.slash_command(description="Removes a trivia question.")
async def removetrivia(inter: disnake.ApplicationCommandInteraction, ref_number: int):
    """Delete question `ref_number`, reporting when no such question exists."""
    try:
        removed = await _db_execute("DELETE FROM trivia WHERE id=%s", (ref_number,))
    except aiomysql.Error as e:
        await _report_db_error(inter, "removing the trivia question", e)
        return
    if removed:
        await inter.send(f'Trivia question {ref_number} removed.')
    else:
        await inter.send(f'Trivia question {ref_number} was not found.', ephemeral=True)


@bot.slash_command(description="Sets the time available for answering trivia questions.")
async def quizanswer(inter: disnake.ApplicationCommandInteraction, seconds: int):
    """Persist how long, in seconds, each question stays open."""
    try:
        await _set_bot_setting('quiz_answer_duration', seconds)
    except aiomysql.Error as e:
        await _report_db_error(inter, "setting the answer duration", e)
        return
    await inter.response.send_message(f"Time available for answering set to {seconds} seconds.")


@bot.slash_command(description="Displays the trivia leaderboard.")
async def leaderboard(interaction: disnake.ApplicationCommandInteraction):
    """Show the ten highest scores as an embed."""
    try:
        rows = await _db_fetch_all("SELECT user_id, score FROM scores ORDER BY score DESC LIMIT 10")
    except aiomysql.Error as e:
        await _report_db_error(interaction, "loading the leaderboard", e)
        return

    entries = []
    for rank, (user_id, score) in enumerate(rows, start=1):
        # bot.get_user only consults the local cache, so it may return None
        user = bot.get_user(user_id)
        username = user.name if user else 'Unknown User'
        entries.append(f"**{rank}.** {username} - {score} points\n")

    leaderboard_embed = disnake.Embed(
        title="Trivia Leaderboard",
        # An embed with an empty description is confusing; say so explicitly
        description="".join(entries) or "No scores yet.",
        color=disnake.Color.gold(),
    )
    await interaction.response.send_message(embed=leaderboard_embed)


@bot.slash_command(description="Displays a help message describing each command.")
async def helpme(interaction: disnake.ApplicationCommandInteraction):
    """Send the list of slash commands this bot registers."""
    # The text lists the commands defined in this file. It previously
    # advertised /dumpchannel and /addtier, which do not exist.
    await interaction.send("""
**Commands**
- `/gameon` / `/gameoff`: Turns the trivia game on or off.
- `/gamechannel [channel]`: Sets the channel for trivia questions.
- `/quizspeed [seconds]`: Sets the pause between questions.
- `/quizanswer [seconds]`: Sets the time available for answering.
- `/quizme`: Triggers the next question immediately.
- `/addtrivia [common_name] [specific_name] [scientific_name] [image_url]`: Adds a trivia question.
- `/edittrivia [ref_number] [common_name] [specific_name] [scientific_name] [image_url]`: Edits a trivia question.
- `/removetrivia [ref_number]`: Removes a trivia question.
- `/leaderboard`: Shows the top 10 users by score.
- `/status`: Shows the bot's current settings.
- `/commands`: Lists the commands available to your permission tier.
- `/assigncommand [command] [tier]`: Assigns a command to a permission tier.
- `/addrole [role] [tier]` / `/changetier [tier_number] [role]`: Links a role to a permission tier.
""")


@bot.slash_command(description="Prints the current state of the trivia game")
async def print_game_state(inter: disnake.ApplicationCommandInteraction):
    """Dump bot.current_trivia_state in a code block, visible only to the caller."""
    lines = []
    for key, value in bot.current_trivia_state.items():
        if isinstance(value, dict):
            lines.append(f"{key}:\n")
            lines.extend(f" {sub_key}: {sub_value}\n" for sub_key, sub_value in value.items())
        else:
            lines.append(f"{key}: {value}\n")
    # Triple backticks render as a code block in Discord
    state_info = "```\n" + "".join(lines) + "```"
    await inter.response.send_message(state_info, ephemeral=True)


# ---------------------------------------------------------------------------
# Trivia Game Logic
# ---------------------------------------------------------------------------

async def trivia_loop():
    """Post trivia questions in the game channel for as long as the bot runs.

    Each round: load the settings, pick a random question, post it, keep it
    open for `quiz_answer_duration` seconds, announce time's up, then wait
    `quiz_speed` seconds (or until /quizme sets the shared event) before the
    next round. While the game is off the loop idles, polling every 10 seconds.
    """
    await bot.wait_until_ready()
    state = bot.current_trivia_state

    while not bot.is_closed():
        if not state['game_is_on']:
            logging.info("Trivia loop: Game is not on. Waiting...")
            await asyncio.sleep(10)
            continue

        if state['active']:
            logging.info("Trivia loop: Trivia is already active. Waiting...")
            await asyncio.sleep(1)
            continue

        try:
            logging.info("Trivia loop: Checking game state...")

            game_channel_id = state.get('game_channel_id')
            if not game_channel_id:
                game_channel_id = await get_setting_from_db('game_channel_id', caller='trivia_loop')
                if game_channel_id:
                    state['game_channel_id'] = game_channel_id
                else:
                    logging.error("Trivia loop: No game channel ID found.")
                    await asyncio.sleep(10)
                    continue

            quiz_speed = int(await get_setting_from_db('quiz_speed', caller='trivia_loop') or 60)
            quiz_answer_duration = int(
                await get_setting_from_db('quiz_answer_duration', caller='trivia_loop') or 30)

            local_game_channel = bot.get_channel(int(game_channel_id))
            if local_game_channel is None:
                logging.warning(f"Trivia loop: Could not find a channel with ID: {game_channel_id}")
                await asyncio.sleep(10)
                continue

            questions = await _db_fetch_all(
                "SELECT * FROM trivia ORDER BY RAND() LIMIT 1", dict_rows=True)
            if not questions:
                logging.warning("Trivia loop: No trivia questions found in the database.")
                await asyncio.sleep(quiz_speed)
                continue
            trivia_question = questions[0]

            # Clear the skip flag at the START of a round, so a /quizme issued
            # after the question is fully solved (but before the answer window
            # ends) is still honoured by the wait at the end of the round.
            state['event'].clear()
            state['answered_parts'].clear()
            state['answered_users'].clear()
            state['question_id'] = trivia_question['id']
            state['answers']['animal'] = trivia_question['common_name']
            state['answers']['specific_animal'] = trivia_question['specific_name']
            state['answers']['scientific_name'] = trivia_question['scientific_name']

            logging.info("Trivia loop: Sending trivia question...")
            question_embed = disnake.Embed(
                title="🌟 Trivia Time! 🌟",
                description=(
                    "Can you identify the animal below?\n\n"
                    "🐾 **Animal:** [Your Guess for Common Name]\n\n"
                    "🦁 **Specific Animal:** [Your Guess for Specific Name]\n\n"
                    "🔬 **Scientific Name:** [Your Guess for Scientific Name]\n\n"
                    f"You have {quiz_answer_duration} seconds to answer all three parts!"
                ),
                color=disnake.Color.blue(),
            )
            question_embed.set_image(url=trivia_question['image_url'])
            question_embed.set_footer(text=f"Trivia ID: {trivia_question['id']}")

            trivia_message = await local_game_channel.send(embed=question_embed)

            # Open the question only once the message exists, so on_message
            # never edits the previous round's message and the timer starts
            # when players can actually see the question.
            state['message_id'] = trivia_message.id
            state['start_time'] = datetime.utcnow()
            state['active'] = True

            await asyncio.sleep(quiz_answer_duration)

            # Answer window over: reset the per-question state
            _reset_round_state()

            logging.info("Trivia loop: Sending answer message...")
            answer_embed = disnake.Embed(
                title="Time's Up!",
                description="Stay tuned for the correct answers and get ready for the next question!",
                color=disnake.Color.green(),
            )
            await local_game_channel.send(embed=answer_embed)

            # Pause until the next round, unless /quizme asks to skip the wait
            try:
                await asyncio.wait_for(state['event'].wait(), timeout=quiz_speed)
            except asyncio.TimeoutError:
                pass

        except Exception as e:
            logging.error(f"Trivia loop: An error occurred: {e}")
            # Without this reset a failure mid-round would leave `active`
            # stuck at True and the loop would never post another question.
            _reset_round_state()
            await asyncio.sleep(10)


bot.run(DISCORD_TOKEN)
Editor is loading...