Untitled

 avatar
unknown
plain_text
a month ago
653 B
1
Indexable
from celery import Celery

app = Celery('your_flask_app')  # Replace with your app's name
app.config_from_object('your_flask_config')  # Replace with your Celery config

def retry_failed_tasks():
    i = app.control.inspect()
    failed_tasks = i.failed()  # Fetch failed tasks
    if failed_tasks:
        for worker, tasks in failed_tasks.items():
            for task in tasks:
                task_id = task.get('id')
                print(f"Retrying task: {task_id}")
                app.control.revoke(task_id, terminate=False)  # Clear revoked state
                app.control.retry(task_id)

if __name__ == "__main__":
    retry_failed_tasks()
Leave a Comment