任务批量注册

Celery入门指南(一)例子中,我们只是定义了一个任务,在实际的业务代码中,往往包含了多个的任务,这些任务会分布在不同的目录下。我们会产生两个需求:

  1. 需要有一种方法,让Celery能够找到这些任务。
  2. 需要有一种方法,可以统一地发起任务,而不是去 import 所有的任务函数,然后再通过 task_function.delay()的方法发起任务。

在继续下面的内容之前,先说下判断任务是否加载成功的方式。在Celery worker 进程启动的时候,会打印出加载到的任务名,如下:

[tasks]
 . add
 . some_package.other_task

我们创建如下的项目结构:

.
├── main.py
├── task_a
│   ├── __init__.py
│   └── add.py
└── task_b
    ├── __init__.py
    └── minus.py

task_a/add.py定义了第一个任务:

from main import app


@app.task
def add(a, b):
    return a+b

task_b/minus.py定义了第二个任务:

from main import app


@app.task
def minus(a, b):
    return a - b

main.py是程序入口,定义了Celery的实例。代码如下:

from celery import Celery

app = Celery("demo", broker="redis://localhost:6379/0")

通过imports参数

现在我们在 main.py中添加以下代码,来指定任务所在的 module

app.conf.imports=["task_a.add", "task_b.minus"]

再次启动 worker进程,就可以看到以下信息:

[tasks]
  . task_a.add.add
  . task_b.minus.minus

通过autodiscover_tasks方法

现在我们在 main.py中添加以下代码,来指定任务所在的 module

# app.conf.imports=["task_a.add", "task_b.minus"]

app.autodiscover_tasks(["task_a.add", "task_b.minus"], force=True)

同样启动 worker进程,可以看到相同的任务注册信息。

以上两种方式,并没有太大的区别,可以根据实际需要使用。

通过名称发起任务

Celery入门指南(一)中,我们调用任务是使用类似于 add.delay(1, 2)这种方式来发起任务。这种方式存在一个问题,需要在发起任务调用的地方,把定义的任务 import 进去。

假定我们的服务提供了二十种任务,用户希望在一个接口里面,通过指定任务名的方式发起指定任务,那我们的代码就要写成这个样子:

from task_module_a import task_a
from task_module_b import task_b
# ... import 20个任务的方法

def api(task_name, task_args):
    if task_name == "task_a":
        task_a.delay(*args)
    elif task_name == "task_b":
        task_b.delay(*args)
    # 进行20个任务名字的判断,根据名字调用对应的任务方法

这种写法雷同代码会越来越多,一方面不够简洁,另外一方面如果后面忘了在这里加入 elif 逻辑,会导致任务因找不到类型无法发起。

Celery提供了一种通过指定任务名来发起任务的方式,具体代码如下:

from main import app

def api(task_name, task_args):
    app.send_task(name=task_name, arags=task_args)


# 调用示例
api("task_a.add.add", (1, 2))