Untitled
from celery import Celery app = Celery('your_flask_app') # Replace with your app's name app.config_from_object('your_flask_config') # Replace with your Celery config def retry_failed_tasks(): i = app.control.inspect() failed_tasks = i.failed() # Fetch failed tasks if failed_tasks: for worker, tasks in failed_tasks.items(): for task in tasks: task_id = task.get('id') print(f"Retrying task: {task_id}") app.control.revoke(task_id, terminate=False) # Clear revoked state app.control.retry(task_id) if __name__ == "__main__": retry_failed_tasks()
Leave a Comment