当前位置: 首页 > news >正文

【RabbitMQ消息队列】(二)交换机模式详解

交换机模式

在 RabbitMQ
中,交换机(Exchange)是消息路由的核心组件,它接收生产者发送的消息,并根据不同的规则将消息路由到一个或多个队列。RabbitMQ
提供了多种交换机模式,每种模式都有其独特的路由规则,下面详细介绍常见的几种交换机模式。

1. 直连交换机(Direct Exchange)

  • 路由规则:直连交换机根据消息的路由键(routing key)将消息路由到与之绑定键(binding key)匹配的队列。绑定键是在交换机和队列进行绑定时指定的。
  • 工作流程:生产者发送消息时,会指定一个路由键。
    直连交换机接收到消息后,会查找所有绑定到它的队列,并检查每个队列的绑定键是否与消息的路由键匹配。
    如果匹配,则将消息路由到该队列。
  • 应用场景:适用于根据消息的优先级、错误类型等进行消息路由的场景。例如,在一个日志处理系统中,可以根据日志的级别(如 infowarningerror)作为路由键,将不同级别的日志消息路由到不同的队列进行处理。

2. 扇形交换机(Fanout Exchange)

  • 路由规则:扇形交换机将接收到的消息广播到所有绑定到它的队列,不考虑消息的路由键。
  • 工作流程:生产者发送消息时,无需指定路由键(即使指定了也会被忽略)。
    扇形交换机接收到消息后,会将消息复制并发送到所有与之绑定的队列。
  • 应用场景:适用于需要将消息同时通知给多个消费者的场景,如系统的配置更新通知、实时新闻推送等。例如,当系统的配置发生变化时,生产者将配置更新消息发送到扇形交换机,所有绑定到该交换机的队列对应的消费者都会接收到该消息,从而进行相应的配置更新操作。

3. 主题交换机(Topic Exchange)

  • 路由规则:主题交换机根据消息的路由键和绑定键的模式匹配来进行消息路由。绑定键可以使用 *(匹配一个单词)和 #(匹配零个或多个单词)作为通配符。

  • 工作流程:生产者发送消息时,指定一个路由键,路由键通常由多个单词组成,用 . 分隔。
    主题交换机接收到消息后,会查找所有绑定到它的队列,并检查每个队列的绑定键是否与消息的路由键匹配。
    如果匹配,则将消息路由到该队列。

  • 应用场景:适用于根据消息的主题进行消息路由的场景,如电商系统中的商品分类、订单状态等。例如,在一个电商系统中,路由键可以是 product.category.clothesorder.status.paid,不同的消费者可以根据自己的需求订阅不同的主题,如 product.category.* 可以订阅所有商品分类的消息,order.status.# 可以订阅所有订单状态的消息。

4. 头交换机(Headers Exchange)

  • 路由规则:头交换机根据消息的头部属性(headers)和绑定的头部参数进行匹配来进行消息路由,而不是根据路由键。匹配规则可以是 all(所有头部参数都匹配)或 any(任意一个头部参数匹配)。
  • 工作流程:生产者发送消息时,除了消息内容外,还可以设置消息的头部属性。
    头交换机接收到消息后,会查找所有绑定到它的队列,并检查每个队列的绑定头部参数是否与消息的头部属性匹配。如果匹配,则将消息路由到该队列。
  • 应用场景:适用于需要根据消息的元数据进行消息路由的场景,如根据消息的版本号、语言等进行路由。例如,在一个多语言的系统中,可以根据消息的 language 头部属性将消息路由到不同的队列进行处理。
import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明不同类型的交换机
channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')
channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')
channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')# 声明队列
channel.queue_declare(queue='queue1')
channel.queue_declare(queue='queue2')# 绑定队列到交换机
channel.queue_bind(queue='queue1', exchange='direct_exchange', routing_key='key1')
channel.queue_bind(queue='queue2', exchange='direct_exchange', routing_key='key2')channel.queue_bind(queue='queue1', exchange='fanout_exchange')
channel.queue_bind(queue='queue2', exchange='fanout_exchange')channel.queue_bind(queue='queue1', exchange='topic_exchange', routing_key='*.important')
channel.queue_bind(queue='queue2', exchange='topic_exchange', routing_key='news.#')headers = {'x-match': 'all', 'key': 'value'}
channel.queue_bind(queue='queue1', exchange='headers_exchange', arguments=headers)# 发送消息到不同的交换机
channel.basic_publish(exchange='direct_exchange', routing_key='key1', body='Direct message')
channel.basic_publish(exchange='fanout_exchange', routing_key='', body='Fanout message')
channel.basic_publish(exchange='topic_exchange', routing_key='sports.important', body='Topic message')headers = {'key': 'value'}
channel.basic_publish(exchange='headers_exchange', routing_key='', body='Headers message',properties=pika.BasicProperties(headers=headers))# 关闭连接
# channel.close()
# connection.close()

发布订阅模式(扇形交换机)

在消息队列系统(如 RabbitMQ)里,交换机的发布 - 订阅模式主要借助扇形交换机(Fanout Exchange)来实现。下面详细介绍其原理、特点、工作流程、应用场景以及示例代码。

  • 原理
    发布 - 订阅模式的核心原理是生产者将消息发布到交换机,交换机再把消息广播给所有与之绑定的队列,每个绑定的队列都有对应的消费者来接收消息。在这种模式下,生产者和消费者之间是松耦合的,生产者无需知道有哪些消费者会接收消息,只需将消息发送到交换机即可。
  • 特点
    • 广播机制:交换机接收到消息后,会把消息复制并发送给所有绑定到它的队列,不管队列的绑定键是什么,也不考虑消息的路由键。
    • 松耦合:生产者和消费者之间的依赖关系被弱化,生产者只负责向交换机发送消息,消费者只需要订阅感兴趣的交换机和队列,它们之间的交互通过交换机和队列来完成,这增强了系统的可扩展性和灵活性。
    • 多消费者接收:一个消息可以被多个消费者同时接收和处理,适用于需要将消息通知给多个相关方的场景。

工作流程

  1. 交换机声明:在 RabbitMQ 中,首先要声明一个扇形交换机。
  2. 队列声明:声明一个或多个队列,这些队列将用于存储从交换机接收的消息。
  3. 队列绑定:将声明的队列绑定到扇形交换机上,绑定过程不涉及绑定键。
  4. 消息发布:生产者将消息发送到扇形交换机,此时无需指定路由键(即便指定了也会被忽略)。
  5. 消息分发:扇形交换机接收到消息后,把消息复制并发送给所有绑定到它的队列。
  6. 消息消费:每个绑定队列的消费者从各自的队列中获取消息并进行处理。

应用场景

  • 系统通知:当系统有重要通知需要同时告知多个模块或用户时,可采用发布 - 订阅模式。例如,系统配置更新、服务器维护通知等。
  • 实时数据推送:在实时数据系统中,如股票行情、气象数据等,生产者将最新数据发布到交换机,多个客户端(消费者)订阅该交换机以获取实时数据。
  • 日志记录:应用程序可以将日志消息发布到交换机,不同的日志处理模块(如文件日志、数据库日志)可以作为消费者从队列中获取日志消息进行处理。

生产者

import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明扇形交换机
channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')# 要发送的消息
message = "Hello, subscribers!"# 发布消息到扇形交换机,不指定路由键
channel.basic_publish(exchange='fanout_exchange', routing_key='', body=message)print(f" [x] Sent '{message}'")# 关闭连接
# channel.close()
# connection.close()

消费者

import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明扇形交换机
channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')# 声明一个临时队列,队列名称由 RabbitMQ 自动生成
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue# 将队列绑定到扇形交换机
channel.queue_bind(exchange='fanout_exchange', queue=queue_name)print(' [*] Waiting for messages. To exit press CTRL+C')def callback(ch, method, properties, body):print(f" [x] Received '{body.decode()}'")# 开始消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()

生产者发送,所有的消费者都会收到

关键字模式(直连交换机)

直连交换机根据消息的路由键(routing key)队列绑定键(binding key)进行精确匹配来决定将消息路由到哪些队列,不支持通配符匹配。
直连交换机适用于需要根据消息的某些特定属性(如消息的优先级、错误类型等)进行精确消息路由的场景。例如,在一个订单处理系统中,可以根据订单的状态(如 paidshippedcancelled)作为路由键,将不同状态的订单消息路由到不同的队列进行处理。

生产者

import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明直连交换机
channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')# 定义路由键和消息
routing_key = 'order.paid'
message = 'An order has been paid!'# 发布消息到直连交换机
channel.basic_publish(exchange='direct_exchange', routing_key=routing_key, body=message)print(f" [x] Sent '{routing_key}':'{message}'")# 关闭连接
# channel.close()
# connection.close()

消费者

import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明直连交换机
channel.exchange_declare(exchange='direct_exchange', exchange_type='direct')# 声明一个队列
channel.queue_declare(queue='paid_orders_queue')# 将队列绑定到直连交换机
channel.queue_bind(exchange='direct_exchange', queue='paid_orders_queue', routing_key='order.paid')
channel.queue_bind(exchange='direct_exchange', queue='paid_orders_queue', routing_key='order.shipped')
channel.queue_bind(exchange='direct_exchange', queue='paid_orders_queue', routing_key='order.cancelled')print(' [*] Waiting for messages. To exit press CTRL+C')def callback(ch, method, properties, body):print(f" [x] Received '{method.routing_key}':'{body.decode()}'")# 开始消费消息
channel.basic_consume(queue='paid_orders_queue', on_message_callback=callback, auto_ack=True)channel.start_consuming()
  • 路由键
    order.paid:表示订单已支付的消息。
    order.shipped:表示订单已发货的消息。
    order.cancelled:表示订单已取消的消息。

例如这里有3个消费者
在这里插入图片描述

# 那么生产者发送这个,消费者1和消费者2会收到
routing_key = 'order.paid'# 生产者发送这个,消费者3会收到
routing_key = 'order.shipped'# 生产者发送这个,消费者1会收到
routing_key = 'order.cancelled'

通配符模式(主题交换机)

主题交换机依据消息的路由键(routing key)绑定键(binding key)的模式匹配规则来决定将消息路由到哪些队列。绑定键可以使用特定的通配符,像 *(匹配一个单词)和 #(匹配零个或多个单词),以此实现灵活的消息路由。

生产者

import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明主题交换机
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')# 定义路由键和消息
routing_key = 'news.sports.football'
message = 'Football news: A great match today!'# 发布消息到主题交换机
channel.basic_publish(exchange='topic_exchange', routing_key=routing_key, body=message)print(f" [x] Sent '{routing_key}':'{message}'")# 关闭连接
# channel.close()
# connection.close()

消费者

import pika# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明主题交换机
channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')# 声明一个临时队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue# 定义绑定键
binding_key = 'news.sports.*'# 将队列绑定到主题交换机
channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key=binding_key)print(' [*] Waiting for messages. To exit press CTRL+C')def callback(ch, method, properties, body):print(f" [x] Received '{method.routing_key}':'{body.decode()}'")# 开始消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()

例如这里有3个消费者
在这里插入图片描述

# 那么生产者发送这个,3个消费者都会收到
routing_key = 'news.sports.football'# 生产者发送这个,消费者2会收到
routing_key = 'game.sports.football'# 生产者发送这个,消费者3会收到
routing_key = 'news.sports.ball.football'
http://www.xdnf.cn/news/202375.html

相关文章:

  • MTKAndroid12-13-开机应用自启功能实现
  • 【差分隐私】目标扰动机制(Objective Perturbation)
  • Android平台Unity引擎的Mono JIT机制分析
  • 前端如何使用Mock模拟数据实现前后端并行开发,提升项目整体效率
  • 计算机视觉进化论:YOLOv12、YOLOv11与Darknet系YOLOv7的微调实战对比
  • 单片机-89C51部分:7、中断
  • ZYNQ-自定义呼吸灯IP核以及PS-PL数据发送接收
  • 【Java学习笔记】传参机制
  • Vue 2 中 Vue 实例对象(vm)的所有核心方法,包含完整示例、使用说明及对比表格
  • 【Java】 使用 HTTP 响应状态码定义web系统返回码
  • 继承(c++版 非常详细版)
  • linux 环境下 c++ 程序打印 core dump 信息
  • 滑动窗口模板
  • 【基础知识】常见的计算公式(一)
  • java借助NIO、链表、跳表模拟实现redis
  • CDGP|如何建立高效的数据治理团队?
  • 【强化学习系列】贝尔曼方程
  • mysql模糊多次OR查询某一个字段,针对这个字段进行查询分组
  • Marin说PCB之----板材的替换注意事项
  • vite创建vue3项目并进行配置
  • DIFY教程第三弹:构建一个智能生成图片的Agent
  • 【分布式系统中的“瑞士军刀”_ Zookeeper】三、Zookeeper 在实际项目中的应用场景与案例分析
  • openGauss DB4AI与scikit-learn模块对比探究
  • 基于强化学习的用于非刚性图像配准的引导式超声采集|文献速递-深度学习医疗AI最新文献
  • HTML标记语言_@拉钩教育【笔记】
  • 座舱系统香氛模块概念
  • 【Linux】第十一章 管理网络
  • COMEM光纤温度传感器Optocon:可靠稳定的温度监测方案
  • 2025三掌柜赠书活动第十五期:高并发系统:设计原理与实践
  • 跨语言哈希一致性:C# 与 Java 的 MD5 之战?