多队列背景

在实际应用场景中,我们一个服务会有多种任务,有些任务耗时长,有些任务耗时短。如果所有的任务都是按顺序执行,可能会导致部分需要提前执行的任务,一直在等待前面的任务执行。

我们可以通过分队列的方式,把耗时长的任务放到一个专门的队列中,让它们慢慢执行,其他的任务则在默认的队列中执行。

举个例子,一个发送邮件的任务,发送通知邮件和告警邮件的执行逻辑是一样的,我们希望告警邮件通过一个专有队列发送,不要让大量的通知邮件阻塞在告警邮件前面。

指定队列的方法

现在我们来看两种指定任务队列的方法,一种是在定义任务的时候指定,通过这种方式,可以让某种类型的任务都发送到指定的队列中;另外一种是在发起任务的时候指定。

在定义任务后指定

我们可以在定义完任务之后,通过指定 app.conf.task_routes 参数的方式指定任务使用的队列,如下代码所示:

# main.py
from celery import Celery

app = Celery("demo", broker="redis://localhost:6379/0")
app.conf.task_routes = {
    "add": {"queue": "add_queue" }
}

@app.task(name="add")
def add(a, b):
    reutrn a + b

main.add 这个任务会在 add_queue 这个队列中排队执行。这里需要说明,队列可以不用事先声明,不存在的情况下会自动创建的。

我们还可以通过模糊匹配的方式,指定一系列的任务在特定的队列里面执行,如:

app.conf.task_routes = {
    "add": {"queue": "add_queue" },
    "send_emails.*": {"queue": "send_emails"}
}

这样只要任务名能够被 send_emails.* 匹配的任务,都会发送到 send_emails这个队列中。

在发起任务时指定

我们除了还可以在发起任务的时候,指定任务使用的队列。方法如下:

app.send("add", args=(1, 2), queue="add_queue")

我们也可以通过 task.apply_async() 方法指定任务使用的队列。如:

add.apply_async(args=(1, 2), queue="add")

这种方式指定的队列,优先级比 app.conf.task_routes 指定的队列高。

指定woker消费特定队列任务

我们可以在启动 worker 时,通过 -Q 参数指定 worker 消费的队列。如:

celery -A main worker -Q add

这样 worker就只会消费 add 这个队列中的任务。我们也可以指定一个 worker 消费多个队列中的任务。方法用 ,隔开队列名,如:

celery -A main worker -Q add,minus

定义任务优先级

Celery支持任务优先级,对于使用Redis作为 broker的情况,Celery的实现给不同优先级创建不同的队列,我们可以通过如下的配置,实现优先级的定义:

app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),
    'sep': ':',
    'queue_order_strategy': 'priority',
}

上面的配置,定义了10个优先级,它们对应的队列名依次为

['celery', 'celery:1', 'celery:2', 'celery:3', 'celery:4', 'celery:5', 'celery:6', 'celery:7', 'celery:8', 'celery:9']

其中,优先级数字越小,代表优先级越高,celery 这个队列的优先级最高。需要特别说明,优先级数字的含义,与使用的 broker 有关,只有在Redis中,数字越小,优先级越大 。

指定任务优先级

我们可以在定义任务的时候,指定任务的优先级。

@app.task(priority=1)
def sum(a, b):
    return a + b

同样的,我们也可以在发起任务的时候,可以通过 priority 参数指定任务的优先级。

app.send("add", args=(1, 2), priority=0)

需要注意的是,在发起任务时指定的优先级,会覆盖掉任务定义时指定的优先级。