Celery 介绍

Celery是一个基于Python实现的任务调度框架。可以支持分布式的任务调度。

在我们继续下面的文章前,我们先明确一些术语:

  • 任务类型:是指一种任务,比如,给指定的人发送邮件,这是一种任务类型,同个任务类型可以有多个任务。
  • 任务:是指具体的任务,比如,给张三发送邮件,这是一个任务,它是属于 “给指定的人发送邮件” 这个类型的任务。
  • 定时任务:固定周期执行的一种任务类型,比如,每月1号发送统计邮件。
  • 即时任务:一旦产生就开始执行的一种任务类型,比如,给指定的人发送告警邮件。

Celery服务工作时,可以分成 workerbeat两个角色。我们可以起一个 beat进程。再起多个 worker进程,只要我们在 beat中添加任务,它就会分发给一个 worker节点去执行。

大家肯定会有疑问,如果 worker数量很少,任务执行起来又很慢,堆积的任务怎么办呢?不要怕,Celery可以使用Redis之类的存储组件来缓存这些任务,这些存储组件,在Celery中,称为 broker

安装

Celery可以直接使用pip进行安装:

pip install -U celery

在下面的例子中,我们使用了Redis,因此你需要安装Celery的Redis依赖。

pip install -U celery[redis]

任务注册与发起

注册任务

现在,我们来写一个最简单的任务。这个任务就是接收两个参数,返回它们的和。

# sum.py

from celery import Celery

app = Celery("my_celery", broker="redis://localhost:6379/0")

@app.task
def sum(a, b):
    return a + b

在这个 sum.py文件中,我们做了两件事情:

  1. 通过 Celery("my_celery", broker="redis://localhost:6379/0") 创建了一个 Celery实例,赋值给了 app。其中,broker参数指定了用来存储任务的存储组件为本地的Redis。
  2. 通过 @app.task这个注解,我们把 sum这个方法注册到了 app中,使它成了一个任务类型。

接下来,我们需要启动一个 worker进程,这个进程会去处理 sum任务。启动命令是:

celery -A sum worker --loglevel=INFO

成功启动后,我们会看到类似于下面的内容:

[2024-04-17 23:58:34,856: INFO/MainProcess] mingle: searching for neighbors
[2024-04-17 23:58:35,910: INFO/MainProcess] mingle: all alone
[2024-04-17 23:58:35,918: INFO/MainProcess] celery@YueLegion ready.

发起任务

接着我们新建一个 client.py文件,它负责发送任务。

from sum import sum

if __name__ == "__main__":
    sum.delay(3, 3)

通过 python client.py调用后,可以在worker执行窗口看到执行结果:

[2024-04-17 23:59:21,374: INFO/MainProcess] Task sum.sum[603a8b08-6b66-43ae-947b-70b5375facb1] received
[2024-04-17 23:59:21,375: INFO/ForkPoolWorker-14] Task sum.sum[603a8b08-6b66-43ae-947b-70b5375facb1] succeeded in 0.00023428399981639814s: 6

注册定时任务

上面的例子中,我们定义的是即时任务。每次发送一个任务,就会立刻被执行。在实际场景中,我们还有一些定时任务,比如每天凌晨1点做一个数据备份,这种我们总不能再另外写一个 crontab任务去发送命令吧。

对于这类定时任务,Celery也有添加任务的方法。

# crontab_tasks.py

from celery import Celery
from celery.schedules import crontab

app = Celery("crontab", broker="redis://localhost:6379/0")


@app.task(name="add")
def add(a, b):
    return a+b


app.conf.beat_schedule = {
    'crontab_add': {
        'task': 'add',
        'schedule': crontab(minute="*/1"),
        'args': (3, 3)
    }
}

接着我们启动一个 worker进程:

celery -A crontab_task worker -l INFO

再启动一个 beat进程:

celery -A crontab_task beat -l INFO

就可以看到 worker进程的窗口里面,每隔一分钟打印如下内容:

[2024-04-18 21:03:00,053: INFO/MainProcess] Task add[ac1b9d35-0a05-4001-95ee-a2dd9db7168d] received
[2024-04-18 21:03:00,054: INFO/ForkPoolWorker-14] Task add[ac1b9d35-0a05-4001-95ee-a2dd9db7168d] succeeded in 8.45889990159776e-05s: 6
[2024-04-18 21:04:00,050: INFO/MainProcess] Task add[ac72315d-c7ee-472c-bfde-619b92dd4873] received
[2024-04-18 21:04:00,051: INFO/ForkPoolWorker-14] Task add[ac72315d-c7ee-472c-bfde-619b92dd4873] succeeded in 5.806800072605256e-05s: 6