Celery入门指南(二)
/ 点击 / 阅读耗时 5 分钟任务批量注册
在Celery入门指南(一)例子中,我们只是定义了一个任务,在实际的业务代码中,往往包含了多个的任务,这些任务会分布在不同的目录下。我们会产生两个需求:
- 需要有一种方法,让Celery能够找到这些任务。
- 需要有一种方法,可以统一地发起任务,而不是去
import
所有的任务函数,然后再通过task_function.delay()
的方法发起任务。
在继续下面的内容之前,先说下判断任务是否加载成功的方式。在Celery worker 进程启动的时候,会打印出加载到的任务名,如下:
[tasks]
. add
. some_package.other_task
我们创建如下的项目结构:
.
├── main.py
├── task_a
│ ├── __init__.py
│ └── add.py
└── task_b
├── __init__.py
└── minus.py
task_a/add.py
定义了第一个任务:
from main import app
@app.task
def add(a, b):
return a+b
task_b/minus.py
定义了第二个任务:
from main import app
@app.task
def minus(a, b):
return a - b
main.py
是程序入口,定义了Celery的实例。代码如下:
from celery import Celery
app = Celery("demo", broker="redis://localhost:6379/0")
通过imports参数
现在我们在 main.py
中添加以下代码,来指定任务所在的 module
。
app.conf.imports=["task_a.add", "task_b.minus"]
再次启动 worker
进程,就可以看到以下信息:
[tasks]
. task_a.add.add
. task_b.minus.minus
通过autodiscover_tasks方法
现在我们在 main.py
中添加以下代码,来指定任务所在的 module
。
# app.conf.imports=["task_a.add", "task_b.minus"]
app.autodiscover_tasks(["task_a.add", "task_b.minus"], force=True)
同样启动 worker
进程,可以看到相同的任务注册信息。
以上两种方式,并没有太大的区别,可以根据实际需要使用。
通过名称发起任务
在Celery入门指南(一)中,我们调用任务是使用类似于 add.delay(1, 2)
这种方式来发起任务。这种方式存在一个问题,需要在发起任务调用的地方,把定义的任务 import
进去。
假定我们的服务提供了二十种任务,用户希望在一个接口里面,通过指定任务名的方式发起指定任务,那我们的代码就要写成这个样子:
from task_module_a import task_a
from task_module_b import task_b
# ... import 20个任务的方法
def api(task_name, task_args):
if task_name == "task_a":
task_a.delay(*args)
elif task_name == "task_b":
task_b.delay(*args)
# 进行20个任务名字的判断,根据名字调用对应的任务方法
这种写法雷同代码会越来越多,一方面不够简洁,另外一方面如果后面忘了在这里加入 elif
逻辑,会导致任务因找不到类型无法发起。
Celery提供了一种通过指定任务名来发起任务的方式,具体代码如下:
from main import app
def api(task_name, task_args):
app.send_task(name=task_name, arags=task_args)
# 调用示例
api("task_a.add.add", (1, 2))