Celery入门指南(一)
/ 点击 / 阅读耗时 7 分钟Celery 介绍
Celery是一个基于Python实现的任务调度框架。可以支持分布式的任务调度。
在我们继续下面的文章前,我们先明确一些术语:
- 任务类型:是指一种任务,比如,给指定的人发送邮件,这是一种任务类型,同个任务类型可以有多个任务。
- 任务:是指具体的任务,比如,给张三发送邮件,这是一个任务,它是属于 “给指定的人发送邮件” 这个类型的任务。
- 定时任务:固定周期执行的一种任务类型,比如,每月1号发送统计邮件。
- 即时任务:一旦产生就开始执行的一种任务类型,比如,给指定的人发送告警邮件。
Celery服务工作时,可以分成 worker
和 beat
两个角色。我们可以起一个 beat
进程。再起多个 worker
进程,只要我们在 beat
中添加任务,它就会分发给一个 worker
节点去执行。
大家肯定会有疑问,如果 worker
数量很少,任务执行起来又很慢,堆积的任务怎么办呢?不要怕,Celery可以使用Redis之类的存储组件来缓存这些任务,这些存储组件,在Celery中,称为 broker
。
安装
Celery可以直接使用pip进行安装:
pip install -U celery
在下面的例子中,我们使用了Redis,因此你需要安装Celery的Redis依赖。
pip install -U celery[redis]
任务注册与发起
注册任务
现在,我们来写一个最简单的任务。这个任务就是接收两个参数,返回它们的和。
# sum.py
from celery import Celery
app = Celery("my_celery", broker="redis://localhost:6379/0")
@app.task
def sum(a, b):
return a + b
在这个 sum.py
文件中,我们做了两件事情:
- 通过
Celery("my_celery", broker="redis://localhost:6379/0")
创建了一个Celery
实例,赋值给了app
。其中,broker
参数指定了用来存储任务的存储组件为本地的Redis。 - 通过
@app.task
这个注解,我们把sum
这个方法注册到了app
中,使它成了一个任务类型。
接下来,我们需要启动一个 worker
进程,这个进程会去处理 sum
任务。启动命令是:
celery -A sum worker --loglevel=INFO
成功启动后,我们会看到类似于下面的内容:
[2024-04-17 23:58:34,856: INFO/MainProcess] mingle: searching for neighbors
[2024-04-17 23:58:35,910: INFO/MainProcess] mingle: all alone
[2024-04-17 23:58:35,918: INFO/MainProcess] celery@YueLegion ready.
发起任务
接着我们新建一个 client.py
文件,它负责发送任务。
from sum import sum
if __name__ == "__main__":
sum.delay(3, 3)
通过 python client.py
调用后,可以在worker执行窗口看到执行结果:
[2024-04-17 23:59:21,374: INFO/MainProcess] Task sum.sum[603a8b08-6b66-43ae-947b-70b5375facb1] received
[2024-04-17 23:59:21,375: INFO/ForkPoolWorker-14] Task sum.sum[603a8b08-6b66-43ae-947b-70b5375facb1] succeeded in 0.00023428399981639814s: 6
注册定时任务
上面的例子中,我们定义的是即时任务。每次发送一个任务,就会立刻被执行。在实际场景中,我们还有一些定时任务,比如每天凌晨1点做一个数据备份,这种我们总不能再另外写一个 crontab
任务去发送命令吧。
对于这类定时任务,Celery也有添加任务的方法。
# crontab_tasks.py
from celery import Celery
from celery.schedules import crontab
app = Celery("crontab", broker="redis://localhost:6379/0")
@app.task(name="add")
def add(a, b):
return a+b
app.conf.beat_schedule = {
'crontab_add': {
'task': 'add',
'schedule': crontab(minute="*/1"),
'args': (3, 3)
}
}
接着我们启动一个 worker
进程:
celery -A crontab_task worker -l INFO
再启动一个 beat
进程:
celery -A crontab_task beat -l INFO
就可以看到 worker
进程的窗口里面,每隔一分钟打印如下内容:
[2024-04-18 21:03:00,053: INFO/MainProcess] Task add[ac1b9d35-0a05-4001-95ee-a2dd9db7168d] received
[2024-04-18 21:03:00,054: INFO/ForkPoolWorker-14] Task add[ac1b9d35-0a05-4001-95ee-a2dd9db7168d] succeeded in 8.45889990159776e-05s: 6
[2024-04-18 21:04:00,050: INFO/MainProcess] Task add[ac72315d-c7ee-472c-bfde-619b92dd4873] received
[2024-04-18 21:04:00,051: INFO/ForkPoolWorker-14] Task add[ac72315d-c7ee-472c-bfde-619b92dd4873] succeeded in 5.806800072605256e-05s: 6