【RabbitMQ消息队列】(二)交换机模式详解
交换机模式
在 RabbitMQ
中,交换机(Exchange)
是消息路由的核心组件,它接收生产者发送的消息,并根据不同的规则将消息路由到一个或多个队列。RabbitMQ
提供了多种交换机模式,每种模式都有其独特的路由规则,下面详细介绍常见的几种交换机模式。
1. 直连交换机(Direct Exchange)
- 路由规则:直连交换机根据消息的路由键(routing key)将消息路由到与之绑定键(binding key)匹配的队列。绑定键是在交换机和队列进行绑定时指定的。
- 工作流程:生产者发送消息时,会指定一个路由键。
直连交换机接收到消息后,会查找所有绑定到它的队列,并检查每个队列的绑定键是否与消息的路由键匹配。
如果匹配,则将消息路由到该队列。 - 应用场景:适用于根据消息的优先级、错误类型等进行消息路由的场景。例如,在一个日志处理系统中,可以根据日志的级别(如
info
、warning
、error
)作为路由键,将不同级别的日志消息路由到不同的队列进行处理。
2. 扇形交换机(Fanout Exchange)
- 路由规则:扇形交换机将接收到的消息广播到所有绑定到它的队列,不考虑消息的路由键。
- 工作流程:生产者发送消息时,无需指定路由键(即使指定了也会被忽略)。
扇形交换机接收到消息后,会将消息复制并发送到所有与之绑定的队列。 - 应用场景:适用于需要将消息同时通知给多个消费者的场景,如系统的配置更新通知、实时新闻推送等。例如,当系统的配置发生变化时,生产者将配置更新消息发送到扇形交换机,所有绑定到该交换机的队列对应的消费者都会接收到该消息,从而进行相应的配置更新操作。
3. 主题交换机(Topic Exchange)
-
路由规则:主题交换机根据消息的路由键和绑定键的模式匹配来进行消息路由。绑定键可以使用 *(匹配一个单词)和
#
(匹配零个或多个单词)作为通配符。 -
工作流程:生产者发送消息时,指定一个路由键,路由键通常由多个单词组成,用 . 分隔。
主题交换机接收到消息后,会查找所有绑定到它的队列,并检查每个队列的绑定键是否与消息的路由键匹配。
如果匹配,则将消息路由到该队列。 -
应用场景:适用于根据消息的主题进行消息路由的场景,如电商系统中的商品分类、订单状态等。例如,在一个电商系统中,路由键可以是
product.category.clothes
或order.status.paid
,不同的消费者可以根据自己的需求订阅不同的主题,如product.category.*
可以订阅所有商品分类的消息,order.status.#
可以订阅所有订单状态的消息。
4. 头交换机(Headers Exchange)
- 路由规则:头交换机根据消息的头部属性(headers)和绑定的头部参数进行匹配来进行消息路由,而不是根据路由键。匹配规则可以是
all
(所有头部参数都匹配)或any
(任意一个头部参数匹配)。 - 工作流程:生产者发送消息时,除了消息内容外,还可以设置消息的头部属性。
头交换机接收到消息后,会查找所有绑定到它的队列,并检查每个队列的绑定头部参数是否与消息的头部属性匹配。如果匹配,则将消息路由到该队列。 - 应用场景:适用于需要根据消息的元数据进行消息路由的场景,如根据消息的版本号、语言等进行路由。例如,在一个多语言的系统中,可以根据消息的
language
头部属性将消息路由到不同的队列进行处理。
import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明不同类型的交换机
channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')
channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')
channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')# 声明队列
channel.queue_declare(queue='queue1')
channel.queue_declare(queue='queue2')# 绑定队列到交换机
channel.queue_bind(queue='queue1', exchange='direct_exchange', routing_key='key1')
channel.queue_bind(queue='queue2', exchange='direct_exchange', routing_key='key2')channel.queue_bind(queue='queue1', exchange='fanout_exchange')
channel.queue_bind(queue='queue2', exchange='fanout_exchange')channel.queue_bind(queue='queue1', exchange='topic_exchange', routing_key='*.important')
channel.queue_bind(queue='queue2', exchange='topic_exchange', routing_key='news.#')headers = {'x-match': 'all', 'key': 'value'}
channel.queue_bind(queue='queue1', exchange='headers_exchange', arguments=headers)# 发送消息到不同的交换机
channel.basic_publish(exchange='direct_exchange', routing_key='key1', body='Direct message')
channel.basic_publish(exchange='fanout_exchange', routing_key='', body='Fanout message')
channel.basic_publish(exchange='topic_exchange', routing_key='sports.important', body='Topic message')headers = {'key': 'value'}
channel.basic_publish(exchange='headers_exchange', routing_key='', body='Headers message',properties=pika.BasicProperties(headers=headers))# 关闭连接
# channel.close()
# connection.close()
发布订阅模式(扇形交换机)
在消息队列系统(如 RabbitMQ)里,交换机的发布 - 订阅模式主要借助扇形交换机(
Fanout Exchange
)来实现。下面详细介绍其原理、特点、工作流程、应用场景以及示例代码。
- 原理
发布 - 订阅模式的核心原理是生产者将消息发布到交换机,交换机再把消息广播给所有与之绑定的队列,每个绑定的队列都有对应的消费者来接收消息。在这种模式下,生产者和消费者之间是松耦合的,生产者无需知道有哪些消费者会接收消息,只需将消息发送到交换机即可。 - 特点
- 广播机制:交换机接收到消息后,会把消息复制并发送给所有绑定到它的队列,不管队列的绑定键是什么,也不考虑消息的路由键。
- 松耦合:生产者和消费者之间的依赖关系被弱化,生产者只负责向交换机发送消息,消费者只需要订阅感兴趣的交换机和队列,它们之间的交互通过交换机和队列来完成,这增强了系统的可扩展性和灵活性。
- 多消费者接收:一个消息可以被多个消费者同时接收和处理,适用于需要将消息通知给多个相关方的场景。
工作流程
- 交换机声明:在 RabbitMQ 中,首先要声明一个扇形交换机。
- 队列声明:声明一个或多个队列,这些队列将用于存储从交换机接收的消息。
- 队列绑定:将声明的队列绑定到扇形交换机上,绑定过程不涉及绑定键。
- 消息发布:生产者将消息发送到扇形交换机,此时无需指定路由键(即便指定了也会被忽略)。
- 消息分发:扇形交换机接收到消息后,把消息复制并发送给所有绑定到它的队列。
- 消息消费:每个绑定队列的消费者从各自的队列中获取消息并进行处理。
应用场景
- 系统通知:当系统有重要通知需要同时告知多个模块或用户时,可采用发布 - 订阅模式。例如,系统配置更新、服务器维护通知等。
- 实时数据推送:在实时数据系统中,如股票行情、气象数据等,生产者将最新数据发布到交换机,多个客户端(消费者)订阅该交换机以获取实时数据。
- 日志记录:应用程序可以将日志消息发布到交换机,不同的日志处理模块(如文件日志、数据库日志)可以作为消费者从队列中获取日志消息进行处理。
生产者
import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明扇形交换机
channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')# 要发送的消息
message = "Hello, subscribers!"# 发布消息到扇形交换机,不指定路由键
channel.basic_publish(exchange='fanout_exchange', routing_key='', body=message)print(f" [x] Sent '{message}'")# 关闭连接
# channel.close()
# connection.close()
消费者
import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明扇形交换机
channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')# 声明一个临时队列,队列名称由 RabbitMQ 自动生成
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue# 将队列绑定到扇形交换机
channel.queue_bind(exchange='fanout_exchange', queue=queue_name)print(' [*] Waiting for messages. To exit press CTRL+C')def callback(ch, method, properties, body):print(f" [x] Received '{body.decode()}'")# 开始消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()
生产者发送,所有的消费者都会收到
关键字模式(直连交换机)
直连交换机根据消息的路由键(routing key)
与队列绑定键(binding key)
进行精确匹配来决定将消息路由到哪些队列,不支持通配符匹配。
直连交换机适用于需要根据消息的某些特定属性(如消息的优先级、错误类型等)进行精确消息路由的场景。例如,在一个订单处理系统中,可以根据订单的状态(如 paid
、shipped
、cancelled
)作为路由键,将不同状态的订单消息路由到不同的队列进行处理。
生产者
import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明直连交换机
channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')# 定义路由键和消息
routing_key = 'order.paid'
message = 'An order has been paid!'# 发布消息到直连交换机
channel.basic_publish(exchange='direct_exchange', routing_key=routing_key, body=message)print(f" [x] Sent '{routing_key}':'{message}'")# 关闭连接
# channel.close()
# connection.close()
消费者
import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明直连交换机
channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')# 声明一个队列
channel.queue_declare(queue='paid_orders_queue')# 将队列绑定到直连交换机
channel.queue_bind(exchange='direct_exchange', queue='paid_orders_queue', routing_key='order.paid')
channel.queue_bind(exchange='direct_exchange', queue='paid_orders_queue', routing_key='order.shipped')
channel.queue_bind(exchange='direct_exchange', queue='paid_orders_queue', routing_key='order.cancelled')print(' [*] Waiting for messages. To exit press CTRL+C')def callback(ch, method, properties, body):print(f" [x] Received '{method.routing_key}':'{body.decode()}'")# 开始消费消息
channel.basic_consume(queue='paid_orders_queue', on_message_callback=callback, auto_ack=True)channel.start_consuming()
- 路由键:
order.paid
:表示订单已支付的消息。
order.shipped
:表示订单已发货的消息。
order.cancelled
:表示订单已取消的消息。
例如这里有3个消费者
# 那么生产者发送这个,消费者1和消费者2会收到
routing_key = 'order.paid'# 生产者发送这个,消费者3会收到
routing_key = 'order.shipped'# 生产者发送这个,消费者1会收到
routing_key = 'order.cancelled'
通配符模式(主题交换机)
主题交换机依据消息的路由键(routing key)
和绑定键(binding key)
的模式匹配规则来决定将消息路由到哪些队列。绑定键可以使用特定的通配符,像 *
(匹配一个单词)和 #
(匹配零个或多个单词),以此实现灵活的消息路由。
生产者
import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明主题交换机
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')# 定义路由键和消息
routing_key = 'news.sports.football'
message = 'Football news: A great match today!'# 发布消息到主题交换机
channel.basic_publish(exchange='topic_exchange', routing_key=routing_key, body=message)print(f" [x] Sent '{routing_key}':'{message}'")# 关闭连接
# channel.close()
# connection.close()
消费者
import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明主题交换机
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')# 声明一个临时队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue# 定义绑定键
binding_key = 'news.sports.*'# 将队列绑定到主题交换机
channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key=binding_key)print(' [*] Waiting for messages. To exit press CTRL+C')def callback(ch, method, properties, body):print(f" [x] Received '{method.routing_key}':'{body.decode()}'")# 开始消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()
例如这里有3个消费者
# 那么生产者发送这个,3个消费者都会收到
routing_key = 'news.sports.football'# 生产者发送这个,消费者2会收到
routing_key = 'game.sports.football'# 生产者发送这个,消费者3会收到
routing_key = 'news.sports.ball.football'