Configuring the result backend

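By default, Celery does not store task results. To be able to query a task's result, we need to set the backend parameter, which specifies the component that stores task state and results. Here we use Redis as an example: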

# sum.py
from celery import Celery

app = Celery(
    "my_celery",                        # app (main module) name
    broker="redis://localhost:6379/0",  # message broker
    backend="redis://localhost:6379/1"  # result backend
)

@app.task
def sum(a, b):
    return a + b
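Both the broker and the backend are given as URLs. For Redis the general form is redis://:password@host:port/db, where the trailing number selects a Redis logical database; that is why this example keeps the broker on database 0 and the results on database 1. The sketch below only parses such a URL with the standard library to make the parts visible. describe_redis_url is a hypothetical helper for illustration, not part of Celery.

```python
from urllib.parse import urlparse


def describe_redis_url(url: str) -> dict:
    """Split a redis:// URL into host, port and logical DB number (hypothetical helper)."""
    parts = urlparse(url)
    # The path looks like "/<db>"; an empty path falls back to Redis database 0.
    db = int(parts.path.lstrip("/") or 0)
    return {"host": parts.hostname, "port": parts.port, "db": db}


print(describe_redis_url("redis://localhost:6379/0"))  # broker
print(describe_redis_url("redis://localhost:6379/1"))  # result backend
```

Keeping the broker and the results in separate databases is a convention, not a requirement; it simply makes the two kinds of keys easier to inspect and clear independently.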

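We can also set this option after the app has been created, via app.conf.update(). Note that the setting itself is named result_backend; backend is only the name of the Celery() constructor argument. For example:

other_celery_app = Celery("other_celery_app")

other_celery_app.conf.update(result_backend="redis://localhost:6379/2")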

Getting results

Every call to task.delay() or task.apply_async() returns a celery.result.AsyncResult object. Calling its get() method waits for the task to finish and returns the task's result.

>>> from sum import sum
>>> result = sum.delay(1, 2)
>>> result.get()
3

Some tasks can take a long time to run. In that case, we can pass the timeout parameter to get() to limit how long it waits. First, let's modify the sum task:

# sum.py
from celery import Celery
from time import sleep

app = Celery(
    "my_celery",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)


@app.task(priority=1)
def sum(a, b):
    sleep(10)  # deliberately block the task for 10 seconds
    return a + b

Now call get() with a timeout. Because the task sleeps for 10 seconds, get() gives up once the 1-second timeout expires and raises celery.exceptions.TimeoutError: The operation timed out.

>>> from sum import sum
>>> result = sum.delay(1, 2)
>>> result.get(timeout=1)
Traceback (most recent call last):
  File "/home/yuellin/.local/lib/python3.10/site-packages/celery/backends/asynchronous.py", line 287, in _wait_for_pending
    for _ in self.drain_events_until(
  File "/home/yuellin/.local/lib/python3.10/site-packages/celery/backends/asynchronous.py", line 52, in drain_events_until
    raise socket.timeout()
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/yuellin/.local/lib/python3.10/site-packages/celery/result.py", line 251, in get
    return self.backend.wait_for_pending(
  File "/home/yuellin/.local/lib/python3.10/site-packages/celery/backends/asynchronous.py", line 221, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/home/yuellin/.local/lib/python3.10/site-packages/celery/backends/asynchronous.py", line 293, in _wait_for_pending
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.

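Note that the exception raised by get() does not affect the task itself: it keeps running on the worker, and we can call get() again later to fetch the result.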
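This "stop waiting, but keep working" contract is the same one offered by Python's standard concurrent.futures.Future.result(timeout=...). The sketch below uses a thread pool as a stand-in for the Celery worker (an analogy only, not how Celery is implemented) and shortens the delay to 0.5 seconds: the first wait times out, the task keeps running, and a second call returns the result.

```python
import concurrent.futures
import time


def slow_sum(a, b):
    time.sleep(0.5)  # stand-in for the 10-second sleep in the Celery task
    return a + b


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_sum, 1, 2)
    try:
        future.result(timeout=0.1)  # stop waiting after 0.1 s
    except concurrent.futures.TimeoutError:
        print("timed out; task still running:", future.running())
    # The task was not cancelled, so waiting again picks up the result.
    print(future.result())
```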

Alternatively, we can check the task's status first and only continue once it has succeeded:

>>> result = sum.delay(1, 2)
>>> print(result.status)
PENDING
# wait about 10 seconds
>>> print(result.status)
SUCCESS
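Checking status by hand is fine in the shell; in a script the same idea usually becomes a polling loop with a deadline. The helper below is a hypothetical sketch: it accepts any zero-argument callable that returns a state string, so with Celery you could pass lambda: result.status. Here it is driven by a fake sequence of states so that it runs without a broker. The set of final states mirrors Celery's celery.states.READY_STATES (SUCCESS, FAILURE, REVOKED).

```python
import time
from typing import Callable

# Final task states; anything else (PENDING, STARTED, RETRY, ...) means "keep waiting".
READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_status(get_status: Callable[[], str], interval: float = 1.0,
                    timeout: float = 30.0) -> str:
    """Poll get_status() until it reports a final state or the timeout expires (hypothetical helper)."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in READY_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still {status} after {timeout} s")
        time.sleep(interval)


# Fake status source standing in for `lambda: result.status`.
states = iter(["PENDING", "PENDING", "SUCCESS"])
print(wait_for_status(lambda: next(states), interval=0.01))
```

Returning the final state rather than the result keeps the helper independent of Celery; after it returns SUCCESS, calling result.get() will not block.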

Querying results by ID

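In some scenarios we need to look up a task's status or result by its ID, for example from a different process than the one that started it. We can do this by constructing a celery.result.AsyncResult object ourselves. First, start a task: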

>>> from sum import sum
>>> result = sum.delay(1, 2)
>>> print(result.id)
ef9a4410-6192-43a7-a983-931f1e0ce7d9  # note down this ID

Next, we use the task ID to query the task's status and result:

>>> from sum import app
>>> from celery.result import AsyncResult
>>> 
>>> res = AsyncResult('ef9a4410-6192-43a7-a983-931f1e0ce7d9', app=app)
>>> res.status
'SUCCESS'
>>> res.get()
3
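Lookup by ID works because the backend stores each task's state and result under a key derived from the task ID (with the Redis backend, celery-task-meta-<id>), so any process that knows the ID and points at the same backend can read it. Task IDs are UUID4 strings by default, and Celery reports any ID it has no record of as PENDING. The toy model below imitates that with a plain dict; ToyResultBackend and its methods are hypothetical and only illustrate the idea, not Celery's storage format.

```python
import uuid


class ToyResultBackend:
    """Hypothetical in-memory stand-in for a result backend, keyed by task ID."""

    def __init__(self):
        self._store = {}

    def new_task(self) -> str:
        task_id = str(uuid.uuid4())  # Celery task IDs are UUID4 strings by default
        self._store[task_id] = {"status": "PENDING", "result": None}
        return task_id

    def mark_success(self, task_id: str, value) -> None:
        # What a worker would record after running the task.
        self._store[task_id] = {"status": "SUCCESS", "result": value}

    def lookup(self, task_id: str) -> dict:
        # Unknown IDs read as PENDING, mirroring Celery's behaviour.
        return self._store.get(task_id, {"status": "PENDING", "result": None})


backend = ToyResultBackend()
task_id = backend.new_task()
backend.mark_success(task_id, 1 + 2)
print(backend.lookup(task_id))
print(backend.lookup(str(uuid.uuid4())))  # an ID the backend has never seen
```

One consequence worth keeping in mind: because unknown IDs also read as PENDING, a mistyped ID or an app pointed at the wrong backend looks the same as a task that has not finished yet.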