Celery入门指南(三)
/ 点击 / 阅读耗时 6 分钟多队列背景
在实际应用场景中,我们一个服务会有多种任务,有些任务耗时长,有些任务耗时短。如果所有的任务都是按顺序执行,可能会导致部分需要提前执行的任务,一直在等待前面的任务执行。
我们可以通过分队列的方式,把耗时长的任务放到一个专门的队列中,让它们慢慢执行,其他的任务则在默认的队列中执行。
举个例子,一个发送邮件的任务,发送通知邮件和告警邮件的执行逻辑是一样的,我们希望告警邮件通过一个专有队列发送,不要让大量的通知邮件阻塞在告警邮件前面。
指定队列的方法
现在我们来看两种指定任务队列的方法,一种是在定义任务的时候指定,通过这种方式,可以让某种类型的任务都发送到指定的队列中;另外一种是在发起任务的时候指定。
在定义任务后指定
我们可以在定义完任务之后,通过指定 app.conf.task_routes
参数的方式指定任务使用的队列,如下代码所示:
# main.py
from celery import Celery
app = Celery("demo", broker="redis://localhost:6379/0")
app.conf.task_routes = {
"add": {"queue": "add_queue" }
}
@app.task(name="add")
def add(a, b):
reutrn a + b
main.add
这个任务会在 add_queue
这个队列中排队执行。这里需要说明,队列可以不用事先声明,不存在的情况下会自动创建的。
我们还可以通过模糊匹配的方式,指定一系列的任务在特定的队列里面执行,如:
app.conf.task_routes = {
"add": {"queue": "add_queue" },
"send_emails.*": {"queue": "send_emails"}
}
这样只要任务名能够被 send_emails.*
匹配的任务,都会发送到 send_emails
这个队列中。
在发起任务时指定
我们除了还可以在发起任务的时候,指定任务使用的队列。方法如下:
app.send("add", args=(1, 2), queue="add_queue")
我们也可以通过 task.apply_async()
方法指定任务使用的队列。如:
add.apply_async(args=(1, 2), queue="add")
这种方式指定的队列,优先级比 app.conf.task_routes
指定的队列高。
指定woker消费特定队列任务
我们可以在启动 worker
时,通过 -Q
参数指定 worker
消费的队列。如:
celery -A main worker -Q add
这样 worker
就只会消费 add
这个队列中的任务。我们也可以指定一个 worker
消费多个队列中的任务。方法用 ,
隔开队列名,如:
celery -A main worker -Q add,minus
定义任务优先级
Celery支持任务优先级,对于使用Redis作为 broker
的情况,Celery的实现给不同优先级创建不同的队列,我们可以通过如下的配置,实现优先级的定义:
app.conf.broker_transport_options = {
'priority_steps': list(range(10)),
'sep': ':',
'queue_order_strategy': 'priority',
}
上面的配置,定义了10个优先级,它们对应的队列名依次为
['celery', 'celery:1', 'celery:2', 'celery:3', 'celery:4', 'celery:5', 'celery:6', 'celery:7', 'celery:8', 'celery:9']
其中,优先级数字越小,代表优先级越高,celery
这个队列的优先级最高。需要特别说明,优先级数字的含义,与使用的 broker
有关,只有在Redis中,数字越小,优先级越大 。
指定任务优先级
我们可以在定义任务的时候,指定任务的优先级。
@app.task(priority=1)
def sum(a, b):
return a + b
同样的,我们也可以在发起任务的时候,可以通过 priority
参数指定任务的优先级。
app.send("add", args=(1, 2), priority=0)
需要注意的是,在发起任务时指定的优先级,会覆盖掉任务定义时指定的优先级。