Trivia Bot (Not Working)
pasted here for a discord forum for advice on coding.unknown
python
2 years ago
27 kB
8
Indexable
import asyncio import aiomysql import mysql.connector import os import disnake from disnake.ext import commands from typing import Optional from disnake import Message from datetime import datetime, timedelta from datetime import datetime from collections import defaultdict from dotenv import load_dotenv # Load environment variables load_dotenv() # Access environment variables DB_HOST = os.getenv('DB_HOST') DB_NAME = os.getenv('DB_NAME') DB_USER = os.getenv('DB_USER') DB_PASS = os.getenv('DB_PASS') DB_PORT = os.getenv('DB_PORT') DISCORD_TOKEN = os.getenv('DISCORD_TOKEN') aiomysql_db_params = { 'host': DB_HOST, 'port': int(DB_PORT), 'user': DB_USER, 'password': DB_PASS, 'db': DB_NAME, 'autocommit': True } # Initialize bot and database intents = disnake.Intents.all() bot = commands.InteractionBot(intents=intents) db = mysql.connector.connect( host=DB_HOST, database=DB_NAME, user=DB_USER, password=DB_PASS, port=int(DB_PORT) ) cursor = db.cursor() # Constants QUIZ_SPEED_DEFAULT = 60 DEV_GUILD_ID = 753384735236948018 # Replace with your development guild's ID # Global variables for game state current_question = {} answered_users = defaultdict(set) permission_tiers = {} quiz_speed = QUIZ_SPEED_DEFAULT trivia_channel_id = None current_trivia_message: Optional[Message] = None answered_questions = set() game_channel_id = None game_channel = None trivia_embed = None game_enabled = True trivia_start_time = None # This variable should be defined in a scope accessible by both trivia_loop and on_message # Initialize it with a structure that can hold the current trivia question and its state current_trivia_state = { 'active': False, 'event': asyncio.Event(), 'question_id': id, # Placeholder or an actual value if available 'answers': { 'common_name': '', # Empty string as placeholder for string values 'specific_name': '', 'scientific_name': '' }, 'answered_users': set(), 'start_time': datetime.now() # Current time as placeholder } def initialize_tables(): try: # Define the character set and collation for the trivia table cursor.execute(""" CREATE TABLE IF NOT EXISTS trivia ( id INT AUTO_INCREMENT PRIMARY KEY, common_name VARCHAR(255) NOT NULL, specific_name VARCHAR(255) NOT NULL, scientific_name VARCHAR(255) NOT NULL, image_url VARCHAR(255) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """) # Create the bot_settings table with specific columns and default values cursor.execute(""" CREATE TABLE IF NOT EXISTS bot_settings ( game_enabled BOOLEAN DEFAULT FALSE, game_channel_id BIGINT UNSIGNED, quiz_speed INT DEFAULT 60, quiz_answer_duration INT DEFAULT 30 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """) # Create the scores table with PRIMARY KEY constraint cursor.execute(""" CREATE TABLE IF NOT EXISTS scores ( user_id BIGINT UNSIGNED NOT NULL, guild_id BIGINT UNSIGNED NOT NULL, score INT DEFAULT 0, PRIMARY KEY (user_id, guild_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """) db.commit() print("Tables initialized successfully.") except mysql.connector.Error as err: print(f"An error occurred during table initialization: {err}") # Call the initialize_tables function right after defining it and the DB connection is made initialize_tables() async def get_setting_from_db(setting_name): async with aiomysql.connect(**aiomysql_db_params) as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute("SELECT value FROM bot_settings WHERE setting = %s", (setting_name,)) result = await cur.fetchone() return result['value'] if result else None async def on_message(message): # Skip messages sent by the bot itself if message.author == bot.user: return # Process commands first await bot.process_commands(message) # Check if the trivia game is currently active and the message is in the correct channel is_game_active = current_trivia_state['active'] current_game_channel_id = current_trivia_state['game_channel_id'] # Assuming this is stored in the state if not is_game_active or message.channel.id != current_game_channel_id: return # Normalize input to lowercase and strip leading/trailing whitespace message_content = message.content.strip().lower() # Get the correct answers from the current trivia state correct_answers = current_trivia_state['answers'] # Check if the message content is one of the correct answers if message_content in correct_answers.values(): # Retrieve which part of the answer was correct answer_part = [key for key, value in correct_answers.items() if value == message_content][0] # Calculate the time taken to answer, if a start time is available answer_time = None if 'start_time' in current_trivia_state and current_trivia_state['start_time']: answer_time = (datetime.utcnow() - current_trivia_state['start_time']).total_seconds() # Check if the user has already answered if message.author.id in current_trivia_state.get('answered_users', set()): return # Skip if the user has already answered # Call handle_answer with the appropriate parameters await handle_answer(message, message_content, answer_part, answer_time) # Add the user to the list of users who have answered current_trivia_state.setdefault('answered_users', set()).add(message.author.id) else: # If the answer is incorrect, you can choose to notify the user or take no action pass # This can be implemented as needed based on game design def update_leaderboard(user_id): cursor.execute("SELECT score FROM scores WHERE user_id = %s", (user_id,)) row = cursor.fetchone() if row: new_score = row[0] + 1 cursor.execute("UPDATE scores SET score = %s WHERE user_id = %s", (new_score, user_id)) else: cursor.execute("INSERT INTO scores (user_id, score) VALUES (%s, %s)", (user_id, 1)) db.commit() async def handle_answer(message, answer, question_part, time_taken): # readable_time should already be in seconds as per your `on_message` function readable_time = time_taken # time_taken is already in seconds, no need to call total_seconds() # Determine which field in the original message to update field_name = question_part.capitalize() # This could be 'Common', 'Specific', or 'Scientific' # Retrieve the original message ID for the trivia question, if it's stored globally or passed along original_message_id = current_trivia_state.get('message_id') # Find the original trivia message if necessary if original_message_id: try: original_message = await message.channel.fetch_message(original_message_id) except Exception as e: print(f"Failed to fetch the original trivia message: {e}") return # Assume there is at least one embed already, and we will update it with new information embed = original_message.embeds[0] if original_message.embeds else disnake.Embed() updated = False for index, field in enumerate(embed.fields): if field.name.startswith(field_name): # Check if this is the correct field to update # Update the field with the new information new_value = f"{answer} by {message.author.display_name} in {readable_time:.2f} seconds" embed.set_field_at(index, name=field.name, value=new_value, inline=False) updated = True break if not updated: # If the field wasn't found, it means this is the first correct answer for this part new_field_name = f"{field_name} Answer" new_value = f"{answer} by {message.author.display_name} in {readable_time:.2f} seconds" embed.add_field(name=new_field_name, value=new_value, inline=False) # Edit the original message with the updated embed await original_message.edit(embed=embed) # Handle updating the score of the user in your scores database # Placeholder for database update logic, you need to implement it based on your application # React to the message to show the answer has been accepted await message.add_reaction("🎉") else: # If you don't have the original message id, you can't update the message. # You need to decide how to handle this case. print("Original message ID not found. Cannot update the trivia message.") # Slash Commands @bot.slash_command(name="gameon", description="Turns the trivia game on") async def gameon(inter: disnake.ApplicationCommandInteraction): global current_trivia_state current_trivia_state['active'] = True current_trivia_state['start_time'] = datetime.now() # Optionally reset the start time await inter.response.send_message("Trivia game is now ON.", ephemeral=True) @bot.slash_command(name="gameoff", description="Turns the trivia game off") async def gameoff(inter: disnake.ApplicationCommandInteraction): global current_trivia_state current_trivia_state['active'] = False await inter.response.send_message("Trivia game is now OFF.", ephemeral=True) @bot.slash_command(description="Sets the channel where sensitive information will be dumped.") async def dumpchannel(inter: disnake.ApplicationCommandInteraction, channel_name: str): # Check if the user has the required permission (assuming tier 4 is for admins) if not await has_permission(inter, 4): await inter.send("You do not have permission to use this command.") return channel = disnake.utils.get(inter.guild.text_channels, name=channel_name) if not channel: # Check if the channel was found await inter.send(f"Channel with name {channel_name} not found.") return try: cursor.execute( "REPLACE INTO settings (setting_name, setting_value) VALUES (%s, %s)", ('dump_channel', str(channel.id)) ) db.commit() await inter.send(f'Dump channel set to {channel.mention}') except mysql.connector.Error as err: print(f"Error: {err}") await inter.send(f"Database error occurred: {err}") except disnake.DiscordException as e: print(f"Discord error: {e}") @bot.slash_command(description="Adds a new permission tier and optionally associates it with a Discord role.") async def addtier(inter: disnake.ApplicationCommandInteraction, tier_number: int, role: disnake.Role = None): try: cursor.execute( "INSERT INTO permission_tiers (tier_number, role_id) VALUES (%s, %s)", (tier_number, role.id if role else None) ) db.commit() except mysql.connector.Error as err: print(f"Error: {err}") await inter.send(f"Database error occurred: {err}") return # Exit the function if a database error occurs try: await inter.send(f'Added tier {tier_number} for role {role.name if role else "None"}') except disnake.DiscordException as e: print(f"Discord error: {e}") @bot.slash_command(description="Removes a permission tier.") async def removetier(inter: disnake.ApplicationCommandInteraction, tier_number: int): cursor.execute("DELETE FROM permission_tiers WHERE tier_number = %s", (tier_number,)) db.commit() await inter.send(f'Removed tier {tier_number}') @bot.slash_command(description="Assigns a command to a permission tier.") async def assigncommand(inter: disnake.ApplicationCommandInteraction, command: str, tier: int): cursor.execute("REPLACE INTO command_tiers (command_name, tier_number) VALUES (%s, %s)", (command, tier)) db.commit() await inter.send(f'Assigned command {command} to tier {tier}') @bot.slash_command(description="Associates a role with a permission tier.") async def addrole(inter: disnake.ApplicationCommandInteraction, role: disnake.Role, tier: int): cursor.execute("UPDATE permission_tiers SET role_id = %s WHERE tier_number = %s", (role.id, tier)) db.commit() await inter.send(f'Associated role {role.name} with tier {tier}') @bot.slash_command(description="Displays the bot's current settings.") async def status(inter: disnake.ApplicationCommandInteraction): cursor.execute("SELECT * FROM settings") rows = cursor.fetchall() settings_text = "\n".join([f"{row[0]}: {row[1]}" for row in rows]) await inter.send(f"**Current Settings**\n{settings_text}") @bot.slash_command(description="Set the game channel for trivia") async def gamechannel(inter, channel: disnake.TextChannel): # Connect to the database async with aiomysql.connect(**aiomysql_db_params) as conn: async with conn.cursor() as cur: # Check if the settings row exists await cur.execute("SELECT COUNT(*) FROM bot_settings") (number_of_rows,) = await cur.fetchone() # If no row exists, insert a default row before updating if number_of_rows == 0: await cur.execute(""" INSERT INTO bot_settings (game_enabled, game_channel_id, quiz_speed, quiz_answer_duration) VALUES (%s, %s, %s, %s) """, (False, channel.id, 60, 30)) else: # Update the existing row with the new game channel ID await cur.execute(""" UPDATE bot_settings SET game_channel_id = %s """, (channel.id,)) # Commit changes await conn.commit() # Send a confirmation message to the channel await inter.send(f"The game channel has been set to {channel.mention}") @bot.slash_command(description="Set the speed of the quiz in seconds") async def quizspeed(inter, seconds: int): # Connect to the database async with aiomysql.connect(**aiomysql_db_params) as conn: async with conn.cursor() as cur: # Check if the settings row exists await cur.execute("SELECT COUNT(*) FROM bot_settings") (number_of_rows,) = await cur.fetchone() # If no row exists, insert a default row with the new quiz speed if number_of_rows == 0: await cur.execute(""" INSERT INTO bot_settings (game_enabled, game_channel_id, quiz_speed, quiz_answer_duration) VALUES (%s, %s, %s, %s) """, (False, None, seconds, 30)) else: # Update the existing row with the new quiz speed await cur.execute(""" UPDATE bot_settings SET quiz_speed = %s """, (seconds,)) # Commit changes await conn.commit() # Send a confirmation message to the channel await inter.send(f"The quiz speed has been set to {seconds} seconds.") @bot.slash_command(description="Trigger the next trivia question immediately.") async def quizme(inter): # Set the event to skip the wait and reset the state to immediately pick up a new question current_trivia_state['event'].set() # Ensure the trivia loop knows a new question should be fetched current_trivia_state['active'] = True # Inform the user that the next question is coming up await inter.send("Next trivia question coming up!") @bot.slash_command(description="Displays a list of commands accessible by the user's permission tier.") async def commands(inter: disnake.ApplicationCommandInteraction): # Use the get_user_tier function to fetch the user's permission tier user_tier = get_user_tier(inter.author) if user_tier is not None: buffered_cursor = db.cursor(buffered=True) buffered_cursor.execute("SELECT command_name FROM command_tiers WHERE tier_number <= %s", (user_tier,)) rows = buffered_cursor.fetchall() buffered_cursor.close() # Close the cursor commands_text = "\n".join([row[0] for row in rows]) await inter.send(f"**Available Commands**\n{commands_text}") else: await inter.send("You do not have a permission tier assigned.") @bot.slash_command(description="Changes the role associated with a permission tier.") async def changetier(inter: disnake.ApplicationCommandInteraction, tier_number: int, role: disnake.Role): cursor.execute("UPDATE permission_tiers SET role_id = %s WHERE tier_number = %s", (role.id, tier_number)) db.commit() await inter.send(f'Changed role for tier {tier_number} to {role.name}') # Trivia Management Commands @bot.slash_command(description="Adds a new trivia question.") async def addtrivia(inter: disnake.ApplicationCommandInteraction, common_name: str, specific_name: str, scientific_name: str, image_url: str): cursor.execute( "INSERT INTO trivia (common_name, specific_name, scientific_name, image_url) VALUES (%s, %s, %s, %s)", (common_name, specific_name, scientific_name, image_url)) db.commit() await inter.send(f'Trivia question added: {common_name}, {specific_name}, {scientific_name}') @bot.slash_command(description="Edits an existing trivia question.") async def edittrivia(inter: disnake.ApplicationCommandInteraction, ref_number: int, common_name: str = '', specific_name: str = '', scientific_name: str = '', image_url: str = ''): update_fields = [] update_values = [] if common_name: update_fields.append("common_name=%s") update_values.append(common_name) if specific_name: update_fields.append("specific_name=%s") update_values.append(specific_name) if scientific_name: update_fields.append("scientific_name=%s") update_values.append(scientific_name) if image_url: update_fields.append("image_url=%s") update_values.append(image_url) update_values.append(ref_number) cursor.execute(f"UPDATE trivia SET {', '.join(update_fields)} WHERE id=%s", update_values) db.commit() await inter.send(f'Trivia question {ref_number} updated.') @bot.slash_command(description="Removes a trivia question.") async def removetrivia(inter: disnake.ApplicationCommandInteraction, ref_number: int): cursor.execute("DELETE FROM trivia WHERE id=%s", (ref_number,)) db.commit() await inter.send(f'Trivia question {ref_number} removed.') @bot.slash_command(description="Sets the time available for answering trivia questions.") async def quizanswer(inter: disnake.ApplicationCommandInteraction, seconds: int): global quiz_speed quiz_speed = seconds await inter.send(f'Time available for answering set to {seconds} seconds.') # Leaderboard Functionality @bot.slash_command(description="Displays the trivia leaderboard.") async def leaderboard(interaction: disnake.ApplicationCommandInteraction): # Renamed 'inter' to 'interaction' cursor.execute("SELECT user_id, score FROM scores ORDER BY score DESC LIMIT 10") rows = cursor.fetchall() leaderboard_text = "" for i, (user_id, score) in enumerate(rows, start=1): user = bot.get_user(user_id) leaderboard_text += f"{i}. {user.name if user else 'Unknown User'} - {score} points\n" await interaction.send(f"**Leaderboard**\n{leaderboard_text}") # Use 'interaction' here @bot.slash_command(description="Displays a help message describing each command.") async def helpme(interaction: disnake.ApplicationCommandInteraction): # Renamed 'inter' to 'interaction' here too await interaction.send(""" **Commands** - `/dumpchannel [channel_name]`: Sets the channel for trivia questions. - `/addtier [tier_number] [role]`: Adds a permission tier. - `/addtrivia [common_name] [specific_name] [scientific_name] [image_url]`: Adds a trivia question. - `/edittrivia [ref_number] [common_name] [specific_name] [scientific_name] [image_url]`: Edits a trivia question. - `/removetrivia [ref_number]`: Removes a trivia question. - `/leaderboard`: Shows the top 10 users by score. """) # Trivia Game Logic async def update_trivia_message(answer_type, user): global current_trivia_message if not current_trivia_message or not current_trivia_message.embeds: print("current_trivia_message is None or has no embeds.") return assert isinstance(current_trivia_message, Message) # Assure the type checker # Renamed the variable to avoid shadowing current_embed = current_trivia_message.embeds[0] # Use current_embed instead of trivia_embed if current_embed is not None: # Check for None before using description = current_embed.description new_description = description.replace(f"Option {answer_type}:", f"Option {answer_type} (@{user.name}):") current_embed.description = new_description if current_trivia_message: await current_trivia_message.edit(embed=current_embed) else: print("current_trivia_message is None.") return else: print("current_embed is None") return async def trivia_loop(): await bot.wait_until_ready() while not bot.is_closed(): if not current_trivia_state['active']: await asyncio.sleep(1) continue try: async with aiomysql.connect(**aiomysql_db_params) as conn: async with conn.cursor(aiomysql.DictCursor) as cur: await cur.execute("SELECT game_channel_id, quiz_speed, quiz_answer_duration FROM bot_settings LIMIT 1") settings = await cur.fetchone() if not settings or not settings['game_channel_id']: print("Game channel ID is not set in the database. Please use the /gamechannel command to set it.") await asyncio.sleep(10) continue local_game_channel_id = settings['game_channel_id'] local_quiz_speed = settings.get('quiz_speed', 60) local_quiz_answer_duration = settings.get('quiz_answer_duration', 30) local_game_channel = bot.get_channel(int(local_game_channel_id)) if local_game_channel is None: print(f"Could not find a channel with ID: {local_game_channel_id}") await asyncio.sleep(10) continue await cur.execute("SELECT * FROM trivia ORDER BY RAND() LIMIT 1") trivia_question = await cur.fetchone() if trivia_question is None: print("No trivia questions found in the database.") await asyncio.sleep(local_quiz_speed) continue current_trivia_state.update({ 'active': True, 'question_id': trivia_question['id'], 'answers': { 'common_name': trivia_question['common_name'], 'specific_name': trivia_question['specific_name'], 'scientific_name': trivia_question['scientific_name'] }, 'start_time': datetime.utcnow(), 'answered_users': set() # Use set, not list, for answered_users }) embed = disnake.Embed( title="Trivia Time!", description="Identify the animal below! Each correct answer grants a point!\n" "Please respond with the option followed by your answer.", color=disnake.Color.blue() ) embed.set_image(url=trivia_question['image_url']) embed.set_footer(text=f"Trivia ID: {trivia_question['id']}") await local_game_channel.send(embed=embed) await asyncio.sleep(local_quiz_answer_duration) current_trivia_state['active'] = False correct_answers = "\n".join(f"{key.replace('_', ' ').title()}: {value}" for key, value in current_trivia_state['answers'].items()) answer_embed = disnake.Embed( title="Time's Up!", description=f"The correct answers were:\n{correct_answers}", color=disnake.Color.green() ) await local_game_channel.send(embed=answer_embed) # Reset the event before starting a new task current_trivia_state['event'].clear() tasks = [ bot.loop.create_task(current_trivia_state['event'].wait()), bot.loop.create_task(asyncio.sleep(max(0, local_quiz_speed - local_quiz_answer_duration))) ] done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) for task in pending: task.cancel() except Exception as e: print(f"An error occurred in the trivia loop: {e}") await asyncio.sleep(10) # Start the trivia loop as a background task bot.loop.create_task(trivia_loop()) # Start the Bot bot.run(DISCORD_TOKEN)
Editor is loading...