Untitled

 avatar
unknown
plain_text
a month ago
5.3 kB
3
Indexable
async def create_command(update: Update, context: CallbackContext) -> None:
    chat_id = str(update.message.chat_id)
    user_id = update.message.from_user.id

    # Check if the command is sent in a private chat
    if update.message.chat.type == 'private' and user_id not in admin_user_ids:
        await update.message.reply_text("πŸ’ͺDuring the Beta Phase, you can only create images in the Innovia Trust group.")
        return

    # Enforce rate limit: max 5 calls per hour
    if not enforce_rate_limit(chat_id, 'create', 48, timedelta(hours=24), context):
        await update.message.reply_text("⚠️ You have exceeded the limit of 48 image generation requests per day.")
        return

    prompt = " ".join(context.args)
    if not prompt:
        await update.message.reply_text("πŸ™ Please provide a prompt after the /create command.")
        return

    # Check if the chat ID is -1002056894611, if so, don't attach any ads
    if chat_id == '-1002056894611':
        message = await update.message.reply_text("πŸ›  Generating...")

    else:
        # Load the ad data and the rotation state from JSON or bot context
        ad_data = load_ad_data()
        rotation_state = context.bot_data.get('rotation_state', {})

        # Get the next available ad slot
        next_slot = get_next_ad_slot(chat_id, ad_data, rotation_state)

        # Show an ad while generating the image (only if an ad exists for the slot and has a valid file_id)
        ad_info = ad_data.get(f'ad_slot_{next_slot}', None)
        if ad_info:
            ad_file_id = ad_info.get('file_id')
            media_type = ad_info.get('media_type')
            ad_link = ad_info.get('link')
            button_name = ad_info.get('button_name', 'Visit the site')

            # Check if file_id exists and is valid
            if ad_file_id:
                try:
                    # Prepare the ad button if a link is available
                    if ad_link:
                        keyboard = [[InlineKeyboardButton(f"🌐 {button_name}", url=ad_link)]]
                        reply_markup = InlineKeyboardMarkup(keyboard)
                    else:
                        reply_markup = None  # No button if there's no link

                    # Send the ad with the appropriate media type
                    if media_type == "photo":
                        message = await update.message.reply_photo(photo=ad_file_id, caption="πŸ›  Generating your image...",
                                                                   reply_markup=reply_markup)
                    elif media_type == "video":
                        message = await update.message.reply_video(video=ad_file_id, caption="πŸ›  Generating your image...",
                                                                   reply_markup=reply_markup)
                    elif media_type == "animation":
                        message = await update.message.reply_animation(animation=ad_file_id, caption="πŸ›  Generating your image...",
                                                                       reply_markup=reply_markup)
                    else:
                        message = await update.message.reply_text("🚫 Unknown media type for the ad.", reply_markup=reply_markup)
                except telegram.error.BadRequest as e:
                    # Log error and notify user
                    logger.error(f"Error sending ad: {e}")
                    message = await update.message.reply_text("⚠️ Error sending the ad. Please try again.")
            else:
                # If no valid file_id, proceed without sending an ad
                message = await update.message.reply_text("πŸ›  Generating...")
        else:
            # If no ad exists, proceed without sending an ad
            message = await update.message.reply_text("πŸ›  Generating...")

    # Run the image generation and sending task in the background
    asyncio.create_task(handle_image_generation(prompt, message, update, context))




async def handle_image_generation(prompt: str, message, update: Update, context: CallbackContext) -> None:
    try:
        logger.info(f"Generating image with prompt: {prompt}")

        # Run image generation concurrently using asyncio.create_task
        loop = asyncio.get_running_loop()
        image_url = await loop.run_in_executor(None, create_image, prompt)

        logger.info(f"Image generated successfully: {image_url}")

        # Run image download concurrently
        image_bytes = await download_image_as_bytes(image_url)

        logger.info("Image downloaded successfully")

        # Replace the ad message with the generated image
        await message.delete()
        await send_photo_via_telegram(update, image_bytes)

        logger.info("Generated image sent successfully")

    except Exception as e:
        logger.error(f"Error generating image: {e}")
        await update.message.reply_text("πŸ₯²An error occurred while generating the image. Please try again later.")

    # Save the updated rotation state back to bot_data
    context.bot_data['rotation_state'] = context.bot_data.get('rotation_state', {})
Editor is loading...
Leave a Comment