RabbitMQ
RabbitMQ的消息流主要由两个部分组成:Exchange和Queue。
Exchange
Exchange其实可以类比为一个交换机?其根据Exchange的类型和一些规则,来将消息分发到特定的Queue队列中。
RabbitMQ支持以下几种Exchange类型:
Direct Exchange
将每个Message(消息)的routing_key与其下的Queue进行匹配,若一致则将Message下发到对应的Queue
Default Exchange
由RabbitMQ预定义的一个Exchange,当一个Message的Exchange为空字符串时,就会交由这个Default Exchange去处理。且该Message会被路由到和它routing_key同名的Queue去。
每个Queue都与Default Exchange进行绑定,绑定的routing_key为Queue的名称
Topic Exchange
- 将
routing_key以.分割成多个单词 *匹配一个单词#匹配一个或零个单词test.*匹配test.fun,不匹配testtest.#匹配test.fun和test
Fanout Exchange
将Message下发到其下的所有Queue里(相当于一次针对Queue的广播)
Headers Exchange
跟Topic Exchange类似,但是是按header参数进行匹配。
在Exchange和Queue的Binding中有一个特殊参数x-match,用来表示需要全部参数匹配还是部分参数匹配:
* x-match = all:header的所有参数都要和Binding的条件匹配才行,默认
* x-match = any:header只要有一个参数与Binding的条件匹配就行
若出现多个匹配的Queue,则将消息分发到每个匹配的Queue。
Dead Letter Exchange
用于捕捉没有匹配到Queue的所有Message。
Celery
Celery的配置有两个参数,分别是task_routes和task_queues。
当仅指定task_routes,会自动合适的生成Queue和Exchange,但是很多时候我们想要对Exchange和Queue进行定制化,这就到task_queues出场的时候了。
task_queues
task_queues的默认值为None,但若是我们需要对Queue进行定制化,则需要给他赋值:
app.conf.task_queues = [
Queue(...),
Queue(...)
]
task_routes
task_routes是一个用于描述task.name(任务名称)和Queue(队列)关系的字典。
当一个任务的task.name符合(完全匹配、正则匹配等)某个key时,该任务就会被分发到queue名称相对应的Queue
app.conf.task_routes = {
'test': {'queue': 'test_queue'},
'test.*' {'queue': 'test_prefix_queue', 'routing_key': 'test.a'},
re.compile(r'\d{10}'): {'queue': '10_digits_queue'}
}
注意一点,这里的key仅用于和Celery的task.name进行匹配,而不是最后生成Message的routing_key。
最后生成的routing_key是由在task_queues里的Queue()对象进行指定的。
Exchange
用于生成特定类型的Exchange。
from kombu import Exchange
direct_exchange = Exchange(name='direct_exchange', type='direct')
fanout_exchange = Exchange(name='fanout_exchange', type='fanout')
topic_exchange = Exchange(name='topic_exchange', type='topic')
headers_exchange = Exchange(name='headers_exchange', type='headers')
其还支持以下参数:
* durablt:在服务器重启后,是否保留该Exchange,默认为True
* auto_delete:在其下的所有Queue都完成之后,自动删除Exchange,默认为False
* delivery_mode:默认的消息分发方式
* 1:消息仅保留在内存里,重启丢失
* 2:消息保存在内存和硬盘里,重启不丢失,默认值
* arguments:其他一些参数(比如Headers Exchange的x-match参数)
Queue
用于生成特定的Queue。
from kombu import Queue
Queue(name='test', exchange=direct_exchange, routing_key='test')
Queue(name='topic.a', exchange=topic_exchange, routing_key='fanout.*')
Queue(name='fanout_queue', exchange=fanout_exchange)
Queue主要有以下参数:
* name:队列名称
* exchange:队列绑定的Exchange
* routing_key:队列绑定时采用的routing key,根据不同类型的Exchange而不同
* durable:服务器重启后是否保留队列,默认为True
* auto_delete:当所有的消费者完成之后,是否自动删除队列,默认为False
* expires:当队列不再被使用(没有任何消费者,没有被重新声明)一定时间后,自动删除队列
* message_ttl:队列内消息的ttl
* max_length:队列能够储存的消息总数量的上限
* mex_length_bytes:队列能储存的消息总大小的上限
* max_priority:队列的最大优先级
优先级相关
RabbitMQ原生支持优先级设定,但是默认不开启优先级。
我们需要在声明Queue队列的时候,指定x-max-priority参数,才能开启优先级功能。
优先级范围是0 ~ 255,越大优先级越高,若x-max-priority设置越大,则越消耗服务器计算资源。
from kombu import Queue
Queue(name='test', exchange=direct_exchange, routing_key='test', max_priority=10)
这里的优先级其实指的是,在一个队列里,优先级高的消息先被消费。
若在Celery里,一个Task在没有指定priority,则该消息的优先级为0。