Celery Getting Started Guide (Part 4)
Configuring Task Result Storage
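By default, Celery does not store task results. To be able to query a task's result, we need to set the backend parameter when creating the Celery app, which specifies the component that stores result data. Here we use Redis as an example: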
# sum.py
from celery import Celery

app = Celery(
    "my_celery",                        # name of the Celery app
    broker="redis://localhost:6379/0",  # message broker
    backend="redis://localhost:6379/1"  # result backend
)

@app.task
def sum(a, b):  # note: this name shadows Python's built-in sum()
    return a + b
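The trailing number in these Redis URLs selects the Redis logical database, so broker messages (db 0) and stored results (db 1) live in separate keyspaces of the same Redis server. The helper below, describe_redis_url, is a hypothetical illustration built on the standard library's urllib.parse, not part of Celery; it only shows how such a URL breaks down.

```python
from urllib.parse import urlparse

def describe_redis_url(url):
    """Split a redis:// URL into (host, port, db). Hypothetical helper for illustration."""
    parsed = urlparse(url)
    # The path part ("/0", "/1", ...) is the logical database number; it defaults to 0.
    db = int(parsed.path.lstrip("/") or 0)
    return parsed.hostname, parsed.port, db

broker_url = "redis://localhost:6379/0"
backend_url = "redis://localhost:6379/1"
print(describe_redis_url(broker_url))   # ('localhost', 6379, 0)
print(describe_redis_url(backend_url))  # ('localhost', 6379, 1)
```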
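We can also set the result backend through the app.conf.update() method. Note that as a configuration setting it is named result_backend; backend is only the keyword argument of the Celery() constructor. For example:
from celery import Celery

other_celery_app = Celery("other_celery_app")
other_celery_app.conf.update(result_backend="redis://localhost:6379/2")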
Getting Results
Every call to task.delay() or task.apply_async() returns an object of type celery.result.AsyncResult. Calling get() on this object waits for the task to finish and returns its result:
>>> from sum import sum
>>> result = sum.delay(1, 2)
>>> result.get()
3
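As a rough, stdlib-only analogy (an assumption for illustration: there is no broker, worker, or result backend here), concurrent.futures follows the same pattern: submit() hands back a handle immediately, much like delay(), and result() blocks until the value is available, much like AsyncResult.get().

```python
from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    return a + b

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(add, 1, 2)  # returns at once, like sum.delay(1, 2)
    value = future.result()          # blocks until the job finishes, like result.get()

print(value)  # 3
```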
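Some tasks can take a long time to run. When calling get(), we can pass the timeout parameter (in seconds) to cap how long we wait. First, let's modify the definition of the sum task: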
# sum.py
from time import sleep

from celery import Celery

app = Celery(
    "my_celery",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)

@app.task(priority=1)
def sum(a, b):
    sleep(10)  # deliberately block the task for 10 seconds
    return a + b
Now start the task again and call get with a timeout. Once the timeout expires, get raises a celery.exceptions.TimeoutError (The operation timed out.):
>>> from sum import sum
>>> result = sum.delay(1, 2)
>>> result.get(timeout=1)
Traceback (most recent call last):
  File "/home/yuellin/.local/lib/python3.10/site-packages/celery/backends/asynchronous.py", line 287, in _wait_for_pending
    for _ in self.drain_events_until(
  File "/home/yuellin/.local/lib/python3.10/site-packages/celery/backends/asynchronous.py", line 52, in drain_events_until
    raise socket.timeout()
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/yuellin/.local/lib/python3.10/site-packages/celery/result.py", line 251, in get
    return self.backend.wait_for_pending(
  File "/home/yuellin/.local/lib/python3.10/site-packages/celery/backends/asynchronous.py", line 221, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/home/yuellin/.local/lib/python3.10/site-packages/celery/backends/asynchronous.py", line 293, in _wait_for_pending
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
Note that the exception raised by get does not stop the task: it keeps running on the worker, and we can call get again after waiting a while.
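The timeout only limits how long the caller waits; it does not cancel the task. The sketch below shows the same behavior with concurrent.futures standing in for Celery (an illustrative substitution, with a 0.5 s sleep in place of the 10 s one): the first wait times out, and a second wait on the same handle still receives the value. With Celery you would catch celery.exceptions.TimeoutError rather than concurrent.futures.TimeoutError.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def slow_add(a, b):
    time.sleep(0.5)  # stand-in for the sleep(10) in the sum task
    return a + b

timed_out = False
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_add, 1, 2)
    try:
        future.result(timeout=0.1)  # stop waiting after 0.1 s
    except FutureTimeout:
        timed_out = True  # only the wait ended; the job was not cancelled
    value = future.result()  # wait again; the same job finishes normally

print(timed_out, value)  # True 3
```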
Alternatively, we can check the task's status first and continue only after it has changed to SUCCESS:
>>> result = sum.delay(1, 2)
>>> print(result.status)
PENDING
# wait about 10 seconds
>>> print(result.status)
SUCCESS
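Checking the status by hand can be wrapped in a small polling loop. The helper wait_until_ready below is hypothetical and not part of Celery; it accepts any zero-argument callable that returns a status string, so with Celery you might pass lambda: result.status. The set of final states mirrors Celery's celery.states.READY_STATES (SUCCESS, FAILURE, REVOKED), and AsyncResult.ready() answers the same question for a single check.

```python
import time

# Final states, mirroring celery.states.READY_STATES: the task will not change state again.
READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}

def wait_until_ready(get_status, timeout=15.0, interval=0.5):
    """Poll get_status() until it reports a final state or timeout seconds pass.

    Hypothetical helper, not part of Celery. Returns the last status seen.
    """
    deadline = time.monotonic() + timeout
    status = get_status()
    while status not in READY_STATES and time.monotonic() < deadline:
        time.sleep(interval)
        status = get_status()
    return status

# Demo with a fake status sequence instead of a real AsyncResult.
fake_statuses = iter(["PENDING", "PENDING", "SUCCESS"])
print(wait_until_ready(lambda: next(fake_statuses), interval=0.01))  # SUCCESS
```

A fixed interval keeps the sketch simple; a real caller might back off between polls to reduce load on the backend.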
Querying Results by Task ID
In some scenarios we need to look up a task's status or result by its task ID. For this we can construct a celery.result.AsyncResult object ourselves. First, start a task:
>>> from sum import sum
>>> result = sum.delay(1, 2)
>>> print(result.id)
ef9a4410-6192-43a7-a983-931f1e0ce7d9  # note down this ID
Next, use the task ID to query the task's status and result:
>>> from sum import app
>>> from celery.result import AsyncResult
>>>
>>> res = AsyncResult('ef9a4410-6192-43a7-a983-931f1e0ce7d9', app=app)
>>> res.status
'SUCCESS'
>>> res.get()
3
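This works because the result lives in the backend, not in the Python object that delay() returned, so a handle needs only the task ID plus the app that tells it which backend to read. The toy model below makes that explicit with a plain dict standing in for Redis; ToyResult and the dict layout are hypothetical and not Celery's implementation. One real detail it mirrors: Celery reports any task ID it does not know as PENDING.

```python
import uuid

# Hypothetical stand-in for the result backend: task ID -> (status, result).
backend = {}

class ToyResult:
    """Minimal AsyncResult look-alike: holds only an ID and a backend to query."""

    def __init__(self, task_id, backend):
        self.id = task_id
        self.backend = backend

    @property
    def status(self):
        # Unknown IDs read as PENDING, as in Celery
        return self.backend.get(self.id, ("PENDING", None))[0]

    def get(self):
        # A real AsyncResult.get() would wait here instead of raising
        status, value = self.backend.get(self.id, ("PENDING", None))
        if status != "SUCCESS":
            raise RuntimeError(f"task {self.id} is {status}")
        return value

task_id = str(uuid.uuid4())
backend[task_id] = ("SUCCESS", 3)  # what a worker would record after sum(1, 2)

res = ToyResult(task_id, backend)  # rebuilt from the ID string alone
print(res.status, res.get())       # SUCCESS 3
print(ToyResult("no-such-id", backend).status)  # PENDING
```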