How to make a celery task fail from within the task?

Under some conditions, I want to make a celery task fail from within that task. I tried the following:

from celery.task import task
from celery import states

@task()
def run_simulation():
    if some_condition:
        run_simulation.update_state(state=states.FAILURE)
        return False

However, the task is still reported to have succeeded:

Task sim.tasks.run_simulation[9235e3a7-c6d2-4219-bbc7-acf65c816e65] succeeded in 1.17847704887s: False

It seems that the state can only be modified while the task is running; once it has completed, Celery changes the state to whatever it deems the outcome to be (refer to this question). Is there any way, without failing the task by raising an exception, to make Celery report that the task has failed?


To mark a task as failed without raising an exception, update the task state to FAILURE and then raise an Ignore exception; returning any value would record the task as successful. An example:

from celery import Celery, states
from celery.exceptions import Ignore

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task(bind=True)
def run_simulation(self):
    if some_condition:
        # manually update the task state
        self.update_state(
            state=states.FAILURE,
            meta='REASON FOR FAILURE'
        )

        # ignore the task so no other state is recorded
        raise Ignore()

That said, the best way is to raise an exception from your task. You can create a custom exception to track these failures:

class TaskFailure(Exception):
    pass

And raise this exception from your task:

if some_condition:
    raise TaskFailure('Failure reason')
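
For completeness, a minimal end-to-end sketch of this approach (reusing the app from above; some_condition is made a task argument here so the example is self-contained):

class TaskFailure(Exception):
    pass

@app.task
def run_simulation(some_condition=False):
    if some_condition:
        raise TaskFailure('Failure reason')
    return True

On the caller side, the raised exception is stored as the task result: result.state becomes 'FAILURE' and result.get() re-raises TaskFailure.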


I'd like to further expand on Pierre's answer as I've encountered some issues using the suggested solution.

To allow custom fields when updating a task's state to states.FAILURE, it is important to also mock some attributes that a FAILURE result would normally have (note exc_type and exc_message below). While the suggested solution does terminate the task, without these attributes any attempt to query the state (for example, to fetch the 'REASON FOR FAILURE' value) will fail.

Below is a snippet for reference, taken from https://www.distributedpython.com/2018/09/28/celery-task-states/:

import traceback

from celery import states
from celery.exceptions import Ignore

@app.task(bind=True)
def task(self):
    try:
        raise ValueError('Some error')
    except Exception as ex:
        self.update_state(
            state=states.FAILURE,
            meta={
                'exc_type': type(ex).__name__,
                'exc_message': traceback.format_exc().split('\n'),
                'custom': '...'
            })
        raise Ignore()
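
On the consumer side, the custom meta can then be read back without errors; a small sketch, assuming task_id belongs to a run of the task above:

from celery.result import AsyncResult

res = AsyncResult(task_id, app=app)  # task_id of the task above (assumed)
if res.state == 'FAILURE':
    # with propagate=False, get() returns the exception Celery rebuilds
    # from the exc_type/exc_message keys instead of raising it
    exc = res.get(propagate=False)
    print(res.state, exc)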


I got an interesting reply on this question from Ask Solem, where he proposes an 'after_return' handler to solve the issue. This might be an interesting option for the future.
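
Until then, here is a rough sketch of what such a handler could look like, using Celery's documented after_return hook on a custom base task class (the False-return convention is purely an illustrative assumption):

from celery import Task

class SimulationTask(Task):
    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Called by the worker after the task returns, whatever the outcome;
        # status is the final state, retval the return value (or exception),
        # and einfo is an ExceptionInfo on failure, None otherwise.
        if retval is False:  # assumed convention: False signals a soft failure
            pass  # custom failure bookkeeping could go here

@app.task(base=SimulationTask)
def run_simulation():
    return False  # would trigger the soft-failure branch above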

In the meantime I solved the issue by simply returning a string 'FAILURE' from the task when I want to make it fail and then checking for that as follows:

from celery.result import AsyncResult

result = AsyncResult(task_id)
if result.state == 'FAILURE' or (result.state == 'SUCCESS' and result.get() == 'FAILURE'):
    # failure processing for the task goes here
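
For reference, the task side of this workaround is just a sentinel return (a sketch; some_condition and do_simulation are placeholders):

from celery.task import task

@task()
def run_simulation():
    if some_condition:        # placeholder condition from the question
        return 'FAILURE'      # sentinel value the caller checks for
    return do_simulation()    # hypothetical function doing the real work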


In my case, I had issues with the accepted answers:

  1. Raising Ignore() or Reject() from within the task correctly left the task in the FAILURE state, but any Celery workflow (think chain, chord, or group) containing this failing task would hang:
workflow = chain(my_task.si([], *args, **kwargs), other_task.s(*args, **kwargs))
res = workflow()
results = res.get() # hangs as the workflow never enters the ready state
  2. I wanted the rest of the workflow to still run even if one of the tasks failed (rather than propagate exceptions, or rely on global error handlers that are difficult to work with)

So I ended up always marking the task as successful and doing my own error post-processing after the workflow ends (always successfully):

import traceback

@app.task(bind=True)  # bind=True so self.update_state is available
def my_task(self, prev, arg1, arg2, opts=None):  # avoid a mutable default argument
    results = []
    state = {
        'state': 'SUCCESS',
        'meta': {
            'custom_attr': 'test',
            # task metadata as needed
        }
    }

    try:
        # task code goes here
        task_state = 'SUCCESS'
        task_exc = None

    except BaseException as exc:
        task_state = 'FAILURE'
        task_exc = exc

    finally:
        # always report SUCCESS; the real outcome lives in the meta dict
        state['state'] = 'SUCCESS'
        if task_state == 'FAILURE':
            exc_str = ' '.join(traceback.format_exception(
                type(task_exc), task_exc, task_exc.__traceback__))
            state['meta']['error'] = exc_str

        # Update task state with final status
        self.update_state(**state)

        # returning from finally is deliberate: it swallows any pending
        # exception, so the task itself can never fail
        return results

This has the advantage of:

  • Keeping all the needed exception data (traceback)
  • Avoiding Celery's odd post-processing of task states:
    • when a task fails, Celery replaces the dict data with the exception instance, which prevents having a consistent way of accessing task metadata
    • updating the state to FAILURE without raising Ignore() or Reject() always results in the task state being SUCCESS...
  • Making complex workflows much more resilient to failures.
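
As a concrete illustration, building and running such a workflow could look like this (a sketch; my_task and other_task stand in for tasks written in the style above, with placeholder arguments):

from celery import chain

workflow = chain(my_task.si([], 1, 2), other_task.s())
res = workflow()  # the chain always completes and never hangs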

This allows me to always process workflow results in the following way:

info = res.get() # this is always a dict containing metadata
error = info.get('error')
results = info['results']
custom_attr = info['custom_attr']