Ricky Hao

Celery 与 RabbitMQ:关于Exchange和Queue的那些事

RabbitMQ

RabbitMQ的消息流主要由两个部分组成:ExchangeQueue

Exchange

Exchange其实可以类比为一个交换机?其根据Exchange的类型和一些规则,来将消息分发到特定的Queue队列中。
RabbitMQ支持以下几种Exchange类型

Direct Exchange

将每个Message(消息)的routing_key与其下的Queue进行匹配,若一致则将Message下发到对应的Queue

Default Exchange

RabbitMQ预定义的一个Exchange,当一个MessageExchange空字符串时,就会交由这个Default Exchange去处理。且该Message会被路由到和它routing_key同名的Queue去。
每个Queue都与Default Exchange进行绑定,绑定的routing_keyQueue的名称

Topic Exchange

  • routing_key.分割成多个单词
  • *匹配一个单词
  • #匹配一个或零个单词
  • test.*匹配test.fun,不匹配test
  • test.#匹配test.funtest

Fanout Exchange

Message下发到其下的所有Queue里(相当于一次针对Queue的广播)

Headers Exchange

Topic Exchange类似,但是是按header参数进行匹配。
ExchangeQueueBinding中有一个特殊参数x-match,用来表示需要全部参数匹配还是部分参数匹配
* x-match = allheader的所有参数都要和Binding的条件匹配才行,默认
* x-match = anyheader只要有一个参数与Binding的条件匹配就行

若出现多个匹配的Queue,则将消息分发到每个匹配的Queue

Dead Letter Exchange

用于捕捉没有匹配到Queue的所有Message

Celery

Celery的配置有两个参数,分别是task_routestask_queues
当仅指定task_routes,会自动合适的生成QueueExchange,但是很多时候我们想要对ExchangeQueue进行定制化,这就到task_queues出场的时候了。

task_queues

task_queues的默认值为None,但若是我们需要对Queue进行定制化,则需要给他赋值:

app.conf.task_queues = [
    Queue(...),
    Queue(...)
]

task_routes

task_routes是一个用于描述task.name(任务名称)和Queue(队列)关系的字典。
当一个任务的task.name符合(完全匹配、正则匹配等)某个key时,该任务就会被分发到queue名称相对应的Queue

app.conf.task_routes = {
    'test': {'queue': 'test_queue'},
    'test.*' {'queue': 'test_prefix_queue', 'routing_key': 'test.a'},
    re.compile(r'\d{10}'): {'queue': '10_digits_queue'}
}

注意一点,这里的key仅用于和Celerytask.name进行匹配,而不是最后生成Messagerouting_key
最后生成的routing_key是由在task_queues里的Queue()对象进行指定的。

Exchange

用于生成特定类型的Exchange

from kombu import Exchange
direct_exchange = Exchange(name='direct_exchange', type='direct')
fanout_exchange = Exchange(name='fanout_exchange', type='fanout')
topic_exchange = Exchange(name='topic_exchange', type='topic')
headers_exchange = Exchange(name='headers_exchange', type='headers')

其还支持以下参数:
* durablt:在服务器重启后,是否保留该Exchange,默认为True
* auto_delete:在其下的所有Queue都完成之后,自动删除Exchange,默认为False
* delivery_mode:默认的消息分发方式
* 1:消息仅保留在内存里,重启丢失
* 2:消息保存在内存和硬盘里,重启不丢失,默认值
* arguments:其他一些参数(比如Headers Exchangex-match参数)

Queue

用于生成特定的Queue

from kombu import Queue
Queue(name='test', exchange=direct_exchange, routing_key='test')
Queue(name='topic.a', exchange=topic_exchange, routing_key='fanout.*')
Queue(name='fanout_queue', exchange=fanout_exchange)

Queue主要有以下参数:
* name:队列名称
* exchange:队列绑定的Exchange
* routing_key:队列绑定时采用的routing key,根据不同类型的Exchange而不同
* durable:服务器重启后是否保留队列,默认为True
* auto_delete:当所有的消费者完成之后,是否自动删除队列,默认为False
* expires:当队列不再被使用(没有任何消费者,没有被重新声明)一定时间后,自动删除队列
* message_ttl:队列内消息的ttl
* max_length:队列能够储存的消息总数量的上限
* mex_length_bytes:队列能储存的消息总大小的上限
* max_priority:队列的最大优先级

优先级相关

RabbitMQ原生支持优先级设定,但是默认不开启优先级
我们需要在声明Queue队列的时候,指定x-max-priority参数,才能开启优先级功能。
优先级范围是0 ~ 255,越大优先级越高,若x-max-priority设置越大,则越消耗服务器计算资源。

from kombu import Queue
Queue(name='test', exchange=direct_exchange, routing_key='test', max_priority=10)

这里的优先级其实指的是,在一个队列里,优先级高的消息先被消费。
若在Celery里,一个Task在没有指定priority,则该消息的优先级为0

参考

点赞

发表评论

电子邮件地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据