# Animal trivia bot
# Thank you, friend (author: unknown)
# Paste-site metadata: python, 2 years ago, 28 kB, 11, Indexable
import asyncio
import disnake
from datetime import datetime
from disnake.ext import commands
import aiomysql
from dotenv import load_dotenv
import logging
import os
# Load environment variables (from a .env file, if one is present)
load_dotenv()

# Database and Discord credentials, all read from the environment
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASS = os.getenv('DB_PASS')
DB_PORT = os.getenv('DB_PORT')
DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')

# Keyword arguments shared by every aiomysql.connect() call in this file
aiomysql_db_params = {
    'host': DB_HOST,
    # int(None) raises TypeError while the module is still loading, so an
    # unset DB_PORT falls back to the standard MySQL port instead.
    'port': int(DB_PORT) if DB_PORT else 3306,
    'user': DB_USER,
    'password': DB_PASS,
    'db': DB_NAME,
    'autocommit': True,
}
# Discord client: all gateway intents enabled, "!" as the text-command prefix
intents = disnake.Intents.all()
bot = commands.Bot(intents=intents, command_prefix="!")

# Root logger: INFO and above, timestamped, emitted through one stream handler
logging.basicConfig(
    format="%(asctime)s [%(levelname)s]: %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler()],
)
# Constants
QUIZ_SPEED_DEFAULT = 60
DEV_GUILD_ID = 753384735236948018  # Replace with your development guild's ID

# Module-level game settings
quiz_speed = QUIZ_SPEED_DEFAULT
trivia_channel_id = None
game_channel_id = None

# Shared state of the running game. It lives on the bot object so that
# trivia_loop, on_message and the slash commands all read and mutate the same
# dictionary.
bot.current_trivia_state = dict(
    game_is_on=False,        # toggled by /gameon and /gameoff
    active=False,            # True while a question is open for answers
    event=asyncio.Event(),   # set by /quizme to cut the wait between questions short
    question_id=None,        # ID of the question from the database
    message_id=None,         # Discord message ID of the trivia question message
    # Expected answer for each of the three parts of the current question
    answers=dict.fromkeys(('animal', 'specific_animal', 'scientific_name')),
    answered_parts=set(),    # parts already answered correctly this round
    answered_users=set(),    # IDs of users who answered correctly this round
    start_time=None,         # when the current question was posted
    game_channel_id=None,    # channel the game runs in
)
async def initialize_tables():
    """Create the trivia, bot_settings and scores tables if they are missing.

    When ``bot_settings`` is empty, a single row holding the column defaults
    is inserted so that settings UPDATE statements always have a row to act
    on.

    Database errors are logged and swallowed; nothing is raised to the caller.
    """
    try:
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cursor:
                await cursor.execute("""
                    CREATE TABLE IF NOT EXISTS trivia (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        common_name VARCHAR(255) NOT NULL,
                        specific_name VARCHAR(255) NOT NULL,
                        scientific_name VARCHAR(255) NOT NULL,
                        image_url VARCHAR(255) NOT NULL
                    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """)
                await cursor.execute("""
                    CREATE TABLE IF NOT EXISTS bot_settings (
                        game_channel_id BIGINT UNSIGNED,
                        quiz_speed INT DEFAULT 60,
                        quiz_answer_duration INT DEFAULT 30
                    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """)
                await cursor.execute("""
                    CREATE TABLE IF NOT EXISTS scores (
                        user_id BIGINT UNSIGNED NOT NULL,
                        score INT DEFAULT 0,
                        PRIMARY KEY (user_id)
                    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """)

                # bot_settings is used as a single-row table; create that row
                # (all column defaults) the first time the bot starts.
                await cursor.execute("SELECT COUNT(*) FROM bot_settings")
                (settings_rows,) = await cursor.fetchone()
                if settings_rows == 0:
                    await cursor.execute("INSERT INTO bot_settings () VALUES ()")

                await conn.commit()
        # Use the logger configured for the rest of the file instead of print
        logging.info("Tables initialized successfully.")
    except aiomysql.Error as err:
        # Log the error message to the console
        logging.error(f"An error occurred during table initialization: {err}")
async def main():
    """Run the one-time startup work: make sure the database tables exist."""
    # Call the initialize_tables function right after defining it and the DB connection is made
    await initialize_tables()


# When the file is executed as a script, table setup runs to completion here,
# before the definitions that follow are evaluated.
if __name__ == "__main__":
    asyncio.run(main())
# Columns of the bot_settings table that may be read through
# get_setting_from_db. A column name cannot be bound as a query parameter, so
# it is checked against this set before being placed into the SQL text.
_BOT_SETTING_COLUMNS = frozenset({'game_channel_id', 'quiz_speed', 'quiz_answer_duration'})


async def get_setting_from_db(setting_name, caller=None):
    """Read one column of the bot_settings row.

    Args:
        setting_name: Name of the bot_settings column to read.
        caller: Optional label of the calling function, used only in log
            messages.

    Returns:
        The stored value, or None when the setting name is unknown, the table
        has no row, or any error occurs (errors are logged, not raised).
    """
    if setting_name not in _BOT_SETTING_COLUMNS:
        logging.error(
            f"get_setting_from_db (Caller: {caller}): unknown setting {setting_name!r}"
        )
        return None

    # Built before the try block so the except handler can always log it, even
    # when the failure happens while connecting.
    sql_query = f"SELECT {setting_name} FROM bot_settings LIMIT 1"
    try:
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql_query)
                result = await cur.fetchone()
                # If the result is found, return the setting value
                return result[setting_name] if result else None
    except Exception as e:
        # Log the error with the caller for context, plus the failing query
        logging.error(f"An error occurred in get_setting_from_db (Caller: {caller}): {e}")
        logging.error(f"Failed SQL Query: {sql_query}")
        return None
@bot.event
async def on_message(message):
    """Check every incoming message against the open trivia question.

    A message counts as a correct answer when it is posted in the game
    channel while a question is active and, after trimming and lower-casing,
    equals one of the not-yet-answered parts. The first matching part is
    credited through handle_answer. Prefix commands are processed afterwards.
    """
    logging.info(f"Message from {message.author}: {message.content}")

    # Skip messages sent by the bot itself
    if message.author == bot.user:
        return

    state = bot.current_trivia_state
    if message.channel.id == state.get('game_channel_id') and state['active']:
        # Normalize input to lowercase and strip leading/trailing whitespace
        message_content = message.content.strip().lower()
        correct_answers = state['answers']

        for answer_part, correct_answer in correct_answers.items():
            # Answers are None between rounds; skip those and parts already taken.
            if correct_answer is None or answer_part in state['answered_parts']:
                continue
            if message_content != correct_answer.lower():
                continue

            # start_time is a naive UTC datetime set by trivia_loop, so it is
            # compared against a naive UTC "now".
            answer_time = (datetime.utcnow() - state['start_time']).total_seconds()

            # Record the answer BEFORE awaiting anything. Handlers for other
            # messages run while handle_answer is suspended, and would
            # otherwise credit the same part a second time.
            state['answered_parts'].add(answer_part)
            state['answered_users'].add(message.author.id)

            # Once all parts are answered, close the question for further answers.
            if state['answered_parts'] == set(correct_answers.keys()):
                state['active'] = False
                state['answered_parts'].clear()
                state['start_time'] = None

            await handle_answer(
                message,
                answer=message_content,
                answer_part=answer_part,
                time_taken=answer_time,
            )
            break  # One message can only answer one part

    # Process other commands if any
    await bot.process_commands(message)
async def handle_answer(message, answer, answer_part, time_taken):
    """Credit a correct answer on the question embed and on the leaderboard.

    Args:
        message: The Discord message that contained the correct answer.
        answer: The normalized answer text.
        answer_part: Which part was answered ('animal', 'specific_animal' or
            'scientific_name').
        time_taken: Seconds between the question being posted and the answer.

    Failures while fetching or editing the question message, updating the
    score or adding the reaction are logged and not raised.
    """
    logging.info(f"Handling correct answer: {answer} for question part: {answer_part}")

    question_message_id = bot.current_trivia_state.get('message_id')
    if not question_message_id:
        logging.error("Original message ID not found. Cannot update the trivia message.")
        return

    # Placeholder text in the question embed for each answerable part
    placeholders = {
        'animal': "[Your Guess for Common Name]",
        'specific_animal': "[Your Guess for Specific Name]",
        'scientific_name': "[Your Guess for Scientific Name]",
    }

    try:
        question_message = await message.channel.fetch_message(question_message_id)
        question_embed = question_message.embeds[0]

        # Text that takes the placeholder's place: the answer, who gave it, how fast
        responder = message.author.display_name
        credit = f"{answer}, answered by {responder} in {time_taken:.2f} seconds!"

        placeholder = placeholders.get(answer_part)
        if placeholder is not None:
            question_embed.description = question_embed.description.replace(placeholder, credit)

        # Push the edited embed, bump the score, then acknowledge the answer
        await question_message.edit(embed=question_embed)
        await update_leaderboard(message.author.id)
        await message.add_reaction("🎉")
    except Exception as e:
        logging.error(f"Failed to fetch or edit the original trivia message: {e}")
async def update_leaderboard(user_id):
    """Add one point to a user's score, creating the score row if needed.

    Args:
        user_id: Discord user ID; the primary key of the scores table.

    Errors are logged and not raised.
    """
    try:
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cur:
                # A single atomic upsert. Reading the score and writing it
                # back in two statements loses increments when two answers
                # from the same user are processed at the same time.
                await cur.execute(
                    "INSERT INTO scores (user_id, score) VALUES (%s, 1) "
                    "ON DUPLICATE KEY UPDATE score = score + 1",
                    (user_id,),
                )
                # Commit the transaction
                await conn.commit()
    except Exception as e:
        logging.error(f"Failed to update leaderboard: {e}")
# Slash Commands
@bot.slash_command(name="gameon", description="Turns the trivia game on")
async def gameon(inter: disnake.ApplicationCommandInteraction):
    """Switch the trivia game on and make sure exactly one trivia loop runs."""
    bot.current_trivia_state['game_is_on'] = True

    # Start the loop only when none is running. Spawning a new task on every
    # /gameon would leave several loops posting questions side by side. The
    # reference is kept on the bot so the task can be checked on later calls.
    running_loop = getattr(bot, 'trivia_loop_task', None)
    if running_loop is None or running_loop.done():
        bot.trivia_loop_task = bot.loop.create_task(trivia_loop())

    await inter.response.send_message("Trivia game is now ON.", ephemeral=True)
@bot.slash_command(name="gameoff", description="Turns the trivia game off")
async def gameoff(inter: disnake.ApplicationCommandInteraction):
    """Clear the game_is_on flag, which trivia_loop checks before each question."""
    # The state is an attribute of 'bot', so no 'global' statement is needed.
    state = bot.current_trivia_state
    state['game_is_on'] = False
    await inter.response.send_message("Trivia game is now OFF.", ephemeral=True)
@bot.slash_command(description="Assigns a command to a permission tier.")
async def assigncommand(inter: disnake.ApplicationCommandInteraction, command: str, tier: int):
    """Store (or replace) the permission tier required for a command.

    Args:
        inter: The slash-command interaction to reply to.
        command: Name of the command being assigned.
        tier: Tier number to associate with the command.
    """
    # NOTE(review): the command_tiers table is not created by
    # initialize_tables in this file - confirm it exists in the database.
    try:
        # Uses the same async aiomysql connection as the other commands; the
        # previous module-level `cursor`/`db` objects are not defined anywhere.
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    "REPLACE INTO command_tiers (command_name, tier_number) VALUES (%s, %s)",
                    (command, tier),
                )
                await conn.commit()
    except aiomysql.Error as e:
        logging.error(f"An error occurred while assigning command {command} to tier {tier}: {e}")
        await inter.send(f"An error occurred while assigning the command: {e}")
        return

    await inter.send(f'Assigned command {command} to tier {tier}')
@bot.slash_command(description="Associates a role with a permission tier.")
async def addrole(inter: disnake.ApplicationCommandInteraction, role: disnake.Role, tier: int):
    """Point an existing permission tier at the given role.

    Args:
        inter: The slash-command interaction to reply to.
        role: Role to associate with the tier.
        tier: Tier number whose role is being set.
    """
    # NOTE(review): the permission_tiers table is not created by
    # initialize_tables in this file - confirm it exists in the database.
    try:
        # Uses the same async aiomysql connection as the other commands; the
        # previous module-level `cursor`/`db` objects are not defined anywhere.
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    "UPDATE permission_tiers SET role_id = %s WHERE tier_number = %s",
                    (role.id, tier),
                )
                await conn.commit()
    except aiomysql.Error as e:
        logging.error(f"An error occurred while associating role {role.id} with tier {tier}: {e}")
        await inter.send(f"An error occurred while associating the role: {e}")
        return

    await inter.send(f'Associated role {role.name} with tier {tier}')
@bot.slash_command(description="Displays the bot's current settings.")
async def status(inter: disnake.ApplicationCommandInteraction):
    """Reply with every column of the bot_settings row as ``name: value`` lines."""
    try:
        # Reads bot_settings, the table that /gamechannel, /quizspeed and
        # /quizanswer write to, through the async aiomysql connection. The
        # previous code queried a `settings` table through an undefined
        # module-level cursor.
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute("SELECT * FROM bot_settings LIMIT 1")
                settings_row = await cur.fetchone()
    except aiomysql.Error as e:
        logging.error(f"An error occurred while reading the bot settings: {e}")
        await inter.send(f"An error occurred while reading the settings: {e}")
        return

    if not settings_row:
        await inter.send("No settings have been saved yet.")
        return

    settings_text = "\n".join(f"{name}: {value}" for name, value in settings_row.items())
    await inter.send(f"**Current Settings**\n{settings_text}")
@bot.slash_command(description="Set the game channel for trivia")
async def gamechannel(inter, channel: disnake.TextChannel):
    """Persist the trivia game channel and apply it to the running game.

    Args:
        inter: The slash-command interaction to reply to.
        channel: Text channel in which trivia questions should be posted.
    """
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cur:
            # bot_settings is a single-row table. An UPDATE against an empty
            # table changes nothing, so create the row when it is missing.
            await cur.execute("SELECT COUNT(*) FROM bot_settings")
            (settings_rows,) = await cur.fetchone()
            if settings_rows:
                await cur.execute(
                    "UPDATE bot_settings SET game_channel_id = %s", (channel.id,)
                )
            else:
                await cur.execute(
                    "INSERT INTO bot_settings (game_channel_id) VALUES (%s)", (channel.id,)
                )
            # Commit changes
            await conn.commit()

    # trivia_loop only reads the channel from the database while this cached
    # value is empty, so refresh the cache for the change to take effect.
    bot.current_trivia_state['game_channel_id'] = channel.id

    # Send a confirmation message to the channel
    await inter.response.send_message(f"The game channel has been set to {channel.mention}")
@bot.slash_command(description="Set the speed of the quiz in seconds")
async def quizspeed(inter, seconds: int):
    """Persist the delay between trivia questions.

    Args:
        inter: The slash-command interaction to reply to.
        seconds: Number of seconds to wait between questions.
    """
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cur:
            # Check for the settings row explicitly. The affected-row count of
            # an UPDATE is also 0 when the stored value already equals the new
            # one, so using it to decide on an INSERT adds duplicate rows.
            await cur.execute("SELECT COUNT(*) FROM bot_settings")
            (settings_rows,) = await cur.fetchone()
            if settings_rows:
                await cur.execute("UPDATE bot_settings SET quiz_speed = %s", (seconds,))
            else:
                await cur.execute(
                    "INSERT INTO bot_settings (quiz_speed) VALUES (%s)", (seconds,)
                )
            # Commit changes
            await conn.commit()

    # Send a confirmation message to the channel
    await inter.response.send_message(f"The quiz speed has been set to {seconds} seconds.")
@bot.slash_command(description="Trigger the next trivia question immediately.")
async def quizme(inter):
    """Ask the trivia loop to stop waiting and post the next question.

    Refused with a private reply when the game is off or a question is
    currently open.
    """
    state = bot.current_trivia_state

    # The game has to be switched on first
    if not state['game_is_on']:
        await inter.response.send_message(
            "The trivia game is not currently active. Please start the game first with /gameon.",
            ephemeral=True)
        return

    # A question is already open for answers
    if state['active']:
        await inter.response.send_message(
            f"{inter.author.mention} a trivia game is already active, please wait until it ends to start another.",
            ephemeral=True)
        return

    # Setting the event ends the wait between questions in trivia_loop
    state['event'].set()
    await inter.response.send_message("Next trivia question coming up!")
@bot.slash_command(description="Displays a list of commands accessible by the user's permission tier.")
async def commands(inter: disnake.ApplicationCommandInteraction):
    """List the commands whose tier is at or below the caller's tier."""
    # NOTE(review): this definition rebinds the module-level name `commands`
    # (imported from disnake.ext at the top of the file). Nothing after this
    # point uses the module, but renaming the function and passing
    # name="commands" to the decorator would avoid the shadowing.
    # NOTE(review): get_user_tier is not defined in the visible part of this
    # file - confirm where it comes from and whether it must be awaited.
    user_tier = get_user_tier(inter.author)
    if user_tier is None:
        await inter.send("You do not have a permission tier assigned.")
        return

    try:
        # Async aiomysql connection, as used by the other commands. The
        # previous `db.cursor(buffered=True)` referred to an undefined `db`
        # object, and `buffered` is not an aiomysql cursor option.
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    "SELECT command_name FROM command_tiers WHERE tier_number <= %s",
                    (user_tier,),
                )
                rows = await cur.fetchall()
    except aiomysql.Error as e:
        logging.error(f"An error occurred while listing commands for tier {user_tier}: {e}")
        await inter.send(f"An error occurred while listing the commands: {e}")
        return

    commands_text = "\n".join(row[0] for row in rows)
    await inter.send(f"**Available Commands**\n{commands_text}")
@bot.slash_command(description="Changes the role associated with a permission tier.")
async def changetier(inter: disnake.ApplicationCommandInteraction, tier_number: int, role: disnake.Role):
    """Replace the role attached to a permission tier.

    Args:
        inter: The slash-command interaction to reply to.
        tier_number: Tier whose role is being changed.
        role: New role for that tier.
    """
    # NOTE(review): the permission_tiers table is not created by
    # initialize_tables in this file - confirm it exists in the database.
    try:
        # Uses the same async aiomysql connection as the other commands; the
        # previous module-level `cursor`/`db` objects are not defined anywhere.
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    "UPDATE permission_tiers SET role_id = %s WHERE tier_number = %s",
                    (role.id, tier_number),
                )
                await conn.commit()
    except aiomysql.Error as e:
        logging.error(f"An error occurred while changing the role for tier {tier_number}: {e}")
        await inter.send(f"An error occurred while changing the tier: {e}")
        return

    await inter.send(f'Changed role for tier {tier_number} to {role.name}')
# Trivia Management Commands
@bot.slash_command(description="Adds a new trivia question.")
async def addtrivia(inter: disnake.ApplicationCommandInteraction, common_name: str, specific_name: str,
                    scientific_name: str, image_url: str):
    """Insert a new question into the trivia table and confirm it to the user.

    Args:
        inter: The slash-command interaction to reply to.
        common_name: Answer for the "Animal" part.
        specific_name: Answer for the "Specific Animal" part.
        scientific_name: Answer for the "Scientific Name" part.
        image_url: URL of the picture shown with the question.

    A database error is logged and reported back to the user.
    """
    insert_sql = (
        "INSERT INTO trivia (common_name, specific_name, scientific_name, image_url) VALUES (%s, %s, %s, %s)"
    )
    new_row = (common_name, specific_name, scientific_name, image_url)

    try:
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cur:
                await cur.execute(insert_sql, new_row)
                await conn.commit()
        # Tell the user what was stored
        await inter.send(f'Trivia question added: {common_name}, {specific_name}, {scientific_name}')
    except aiomysql.Error as e:
        logging.error(f"An error occurred while adding a trivia question: {e}")
        await inter.send(f"An error occurred while adding the trivia question: {e}")
@bot.slash_command(description="Edits an existing trivia question.")
async def edittrivia(inter: disnake.ApplicationCommandInteraction, ref_number: int, common_name: str = '',
                     specific_name: str = '', scientific_name: str = '', image_url: str = ''):
    """Update the supplied fields of one trivia question.

    Args:
        inter: The slash-command interaction to reply to.
        ref_number: ID of the trivia row to edit.
        common_name: New common name; left unchanged when empty.
        specific_name: New specific name; left unchanged when empty.
        scientific_name: New scientific name; left unchanged when empty.
        image_url: New image URL; left unchanged when empty.
    """
    supplied = {
        'common_name': common_name,
        'specific_name': specific_name,
        'scientific_name': scientific_name,
        'image_url': image_url,
    }
    changes = {column: value for column, value in supplied.items() if value}

    # With nothing to change the statement would read "UPDATE trivia SET
    # WHERE id=...", which is invalid SQL.
    if not changes:
        await inter.send("No fields were provided, so nothing was updated.")
        return

    # Column names come from the fixed dict above; only values are bound.
    assignments = ", ".join(f"{column}=%s" for column in changes)
    params = [*changes.values(), ref_number]

    try:
        # Uses the same async aiomysql connection as the other commands; the
        # previous module-level `cursor`/`db` objects are not defined anywhere.
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cur:
                await cur.execute(f"UPDATE trivia SET {assignments} WHERE id=%s", params)
                await conn.commit()
    except aiomysql.Error as e:
        logging.error(f"An error occurred while editing trivia question {ref_number}: {e}")
        await inter.send(f"An error occurred while editing the trivia question: {e}")
        return

    await inter.send(f'Trivia question {ref_number} updated.')
@bot.slash_command(description="Removes a trivia question.")
async def removetrivia(inter: disnake.ApplicationCommandInteraction, ref_number: int):
    """Delete one trivia question by ID and report whether it existed.

    Args:
        inter: The slash-command interaction to reply to.
        ref_number: ID of the trivia row to delete.
    """
    try:
        # Uses the same async aiomysql connection as the other commands; the
        # previous module-level `cursor`/`db` objects are not defined anywhere.
        async with aiomysql.connect(**aiomysql_db_params) as conn:
            async with conn.cursor() as cur:
                await cur.execute("DELETE FROM trivia WHERE id=%s", (ref_number,))
                deleted_rows = cur.rowcount
                await conn.commit()
    except aiomysql.Error as e:
        logging.error(f"An error occurred while removing trivia question {ref_number}: {e}")
        await inter.send(f"An error occurred while removing the trivia question: {e}")
        return

    if deleted_rows == 0:
        # Nothing matched the ID, so do not claim a removal took place
        await inter.send(f'Trivia question {ref_number} was not found.')
        return

    await inter.send(f'Trivia question {ref_number} removed.')
@bot.slash_command(description="Sets the time available for answering trivia questions.")
async def quizanswer(inter: disnake.ApplicationCommandInteraction, seconds: int):
    """Persist how long each trivia question stays open for answers.

    Args:
        inter: The slash-command interaction to reply to.
        seconds: Number of seconds players get to answer a question.
    """
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cur:
            # bot_settings is a single-row table. An UPDATE against an empty
            # table changes nothing, so create the row when it is missing.
            await cur.execute("SELECT COUNT(*) FROM bot_settings")
            (settings_rows,) = await cur.fetchone()
            if settings_rows:
                await cur.execute(
                    "UPDATE bot_settings SET quiz_answer_duration = %s", (seconds,)
                )
            else:
                await cur.execute(
                    "INSERT INTO bot_settings (quiz_answer_duration) VALUES (%s)", (seconds,)
                )
            # Commit changes
            await conn.commit()

    # Send a confirmation message to the channel
    await inter.response.send_message(f"Time available for answering set to {seconds} seconds.")
# Define the leaderboard command
@bot.slash_command(description="Displays the trivia leaderboard.")
async def leaderboard(interaction: disnake.ApplicationCommandInteraction):
    """Reply with an embed listing the ten highest scores."""
    # Pull the top ten (user_id, score) pairs, best first
    async with aiomysql.connect(**aiomysql_db_params) as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT user_id, score FROM scores ORDER BY score DESC LIMIT 10")
            top_scores = await cursor.fetchall()

    leaderboard_embed = disnake.Embed(
        title="Trivia Leaderboard",
        color=disnake.Color.gold()
    )

    # One line per entry; users that bot.get_user cannot resolve get a placeholder name
    entries = []
    for rank, (user_id, score) in enumerate(top_scores, start=1):
        user = bot.get_user(user_id)
        username = user.name if user else 'Unknown User'
        entries.append(f"**{rank}.** {username} - {score} points\n")

    leaderboard_embed.description = "".join(entries)

    await interaction.response.send_message(embed=leaderboard_embed)
@bot.slash_command(description="Displays a help message describing each command.")
async def helpme(interaction: disnake.ApplicationCommandInteraction):
    """Send a summary of the slash commands this bot registers."""
    # The list mirrors the commands defined in this file. The previous text
    # advertised /dumpchannel and /addtier, which do not exist here, and
    # omitted most of the commands that do.
    help_lines = (
        "**Commands**",
        "- `/gameon`: Turns the trivia game on.",
        "- `/gameoff`: Turns the trivia game off.",
        "- `/gamechannel [channel]`: Sets the channel for trivia questions.",
        "- `/quizspeed [seconds]`: Sets the time between trivia questions.",
        "- `/quizanswer [seconds]`: Sets the time available for answering.",
        "- `/quizme`: Triggers the next trivia question immediately.",
        "- `/addtrivia [common_name] [specific_name] [scientific_name] [image_url]`: Adds a trivia question.",
        "- `/edittrivia [ref_number] [common_name] [specific_name] [scientific_name] [image_url]`: Edits a trivia question.",
        "- `/removetrivia [ref_number]`: Removes a trivia question.",
        "- `/leaderboard`: Shows the top 10 users by score.",
        "- `/status`: Displays the bot's current settings.",
        "- `/commands`: Lists the commands available to your permission tier.",
        "- `/assigncommand [command] [tier]`: Assigns a command to a permission tier.",
        "- `/addrole [role] [tier]`: Associates a role with a permission tier.",
        "- `/changetier [tier_number] [role]`: Changes the role of a permission tier.",
    )
    await interaction.send("\n".join(help_lines))
@bot.slash_command(description="Prints the current state of the trivia game")
async def print_game_state(inter: disnake.ApplicationCommandInteraction):
    """Privately show the caller the contents of bot.current_trivia_state."""
    # Collect the pieces of a Discord code block: one "key: value" line per
    # entry, with nested dictionaries expanded one level.
    pieces = ["```\n"]
    for key, value in bot.current_trivia_state.items():
        if not isinstance(value, dict):
            pieces.append(f"{key}: {value}\n")
            continue
        pieces.append(f"{key}:\n")
        pieces.extend(f" {sub_key}: {sub_value}\n" for sub_key, sub_value in value.items())
    pieces.append("```")

    # ephemeral=True makes the reply visible only to the user who invoked the command
    await inter.response.send_message("".join(pieces), ephemeral=True)
# Trivia Game Logic
async def trivia_loop():
    """Post trivia questions to the game channel until the bot is closed.

    Each iteration: wait while the game is off or a question is open, resolve
    the game channel and timing settings, pick a random question, post it,
    keep it open for the answer duration, announce that time is up, then wait
    for the quiz-speed delay (cut short when /quizme sets the state event).

    Any error inside an iteration is logged, the round is closed, and the
    loop retries after 10 seconds.
    """
    await bot.wait_until_ready()
    state = bot.current_trivia_state

    while not bot.is_closed():
        if not state['game_is_on']:
            logging.info("Trivia loop: Game is not on. Waiting...")
            await asyncio.sleep(10)
            continue

        if state['active']:
            logging.info("Trivia loop: Trivia is already active. Waiting...")
            await asyncio.sleep(1)
            continue

        try:
            logging.info("Trivia loop: Checking game state...")

            # The channel is read from the database once and then cached
            game_channel_id = state.get('game_channel_id')
            if not game_channel_id:
                game_channel_id = await get_setting_from_db('game_channel_id', caller='trivia_loop')
                if game_channel_id:
                    state['game_channel_id'] = game_channel_id
                else:
                    logging.error("Trivia loop: No game channel ID found.")
                    await asyncio.sleep(10)
                    continue

            # Timing settings are re-read every round; missing values fall back to 60 / 30
            quiz_speed = int(await get_setting_from_db('quiz_speed', caller='trivia_loop') or 60)
            quiz_answer_duration = int(
                await get_setting_from_db('quiz_answer_duration', caller='trivia_loop') or 30)

            local_game_channel = bot.get_channel(int(game_channel_id))
            if local_game_channel is None:
                logging.warning(f"Trivia loop: Could not find a channel with ID: {game_channel_id}")
                await asyncio.sleep(10)
                continue

            async with aiomysql.connect(**aiomysql_db_params) as conn:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    await cur.execute("SELECT * FROM trivia ORDER BY RAND() LIMIT 1")
                    trivia_question = await cur.fetchone()

            if trivia_question is None:
                logging.warning("Trivia loop: No trivia questions found in the database.")
                await asyncio.sleep(quiz_speed)
                continue

            # Clear the skip event as the round opens rather than just before
            # the wait at the end. /quizme is refused while a question is
            # active, so a request made after the question is fully answered
            # but before the answer window ends survives to shorten that wait.
            state['event'].clear()

            state['active'] = True
            state['question_id'] = trivia_question['id']
            state['answers']['animal'] = trivia_question['common_name']
            state['answers']['specific_animal'] = trivia_question['specific_name']
            state['answers']['scientific_name'] = trivia_question['scientific_name']
            # Naive UTC timestamp; on_message subtracts it from datetime.utcnow()
            state['start_time'] = datetime.utcnow()

            logging.info("Trivia loop: Sending trivia question...")
            question_embed = disnake.Embed(
                title="🌟 Trivia Time! 🌟",
                description=(
                    "Can you identify the animal below?\n\n"
                    "🐾 **Animal:** [Your Guess for Common Name]\n\n"
                    "🦁 **Specific Animal:** [Your Guess for Specific Name]\n\n"
                    "🔬 **Scientific Name:** [Your Guess for Scientific Name]\n\n"
                    f"You have {quiz_answer_duration} seconds to answer all three parts!"
                ),
                color=disnake.Color.blue()
            )
            question_embed.set_image(url=trivia_question['image_url'])
            question_embed.set_footer(text=f"Trivia ID: {trivia_question['id']}")
            trivia_message = await local_game_channel.send(embed=question_embed)
            # handle_answer needs this ID to edit answers into the embed
            state['message_id'] = trivia_message.id

            # Keep the question open for the answer window
            await asyncio.sleep(quiz_answer_duration)

            # Close the round and reset the per-question state
            state['active'] = False
            state['answered_users'].clear()
            state['start_time'] = None
            state['question_id'] = None
            state['answers']['animal'] = None
            state['answers']['specific_animal'] = None
            state['answers']['scientific_name'] = None
            state['answered_parts'].clear()

            logging.info("Trivia loop: Sending answer message...")
            answer_embed = disnake.Embed(
                title="Time's Up!",
                description="Stay tuned for the correct answers and get ready for the next question!",
                color=disnake.Color.green()
            )
            await local_game_channel.send(embed=answer_embed)

            # Wait for the quiz-speed delay or a /quizme request, whichever comes first
            tasks = [
                bot.loop.create_task(state['event'].wait()),
                bot.loop.create_task(asyncio.sleep(quiz_speed))
            ]
            _done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for task in pending:
                task.cancel()

        except Exception as e:
            logging.error(f"Trivia loop: An error occurred: {e}")
            # A failure after the question was marked active (for example the
            # send raising) would otherwise leave 'active' set forever and
            # park the loop in the "already active" branch.
            state['active'] = False
            await asyncio.sleep(10)
# Start the Discord client with the token read from the environment.
# NOTE(review): this call sits outside the `if __name__ == "__main__"` guard
# used earlier, so importing this module also starts the bot - confirm that is intended.
bot.run(DISCORD_TOKEN)
# (paste-site residue) Editor is loading...