01_RabbitMQ安装及工作模式

一、消息队列MQ

中间件

1.1 什么是消息队列

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

1.2 为何用消息队列

支付成功

service层功能:(操作)

1.修改订单状态;update 订单表

2.扣减库存;update 库存表

3.淘金币;update 淘金币表

4.发消息;A a,B b

5.发放优惠卷;insert 用户优惠卷表

业务service流程
支付成功:1,2,3,4,5|54321|24531
发货1
客服聊天4

从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?

以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。

以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。

1.3 RabbitMQ

RabbitMQ是一个消息代理。核心思想:接收,保存,转发消息。是目前非常热门的一款消息中间件。

1.4 特点

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  1. 可靠性(Reliability)

    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  2. 灵活的路由(Flexible Routing)

    在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

  3. 消息集群(Clustering)

    多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

  4. 高可用(Highly Available Queues)

    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

  5. 多种协议(Multi-protocol)

    RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

  6. 多语言客户端(Many Clients)

    RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

  7. 管理界面(Management UI)

    RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

  8. 跟踪机制(Tracing)

    如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

  9. 插件机制(Plugin System)

    RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

应用场景

  • 异步处理:把所有消息保存在中间件中,等到需要处理的时候再出处理消息;

  • 流量销峰:短时间内访问量突然增加,使用mq,进行流量销峰,直接拒绝多余的请求;

  • 日志处理:

  • 应用程序解耦:A服务向B服务发送请求,B服务需要修改业务逻辑,A发送的请求全部保存在消息队列中先不处理,等到B服务器修改完成重新部署完成之后,再读取队列中的信息,对A服务器发送的请求进行处理。

1.4 RabbitMQ 中的概念模型

考试:

生产者:学生(提交试卷到FTP)

队列:FTP(保存试卷)

消费者:老师(从FTP上获取试卷,批改)

消息:试卷

消息模型

所有 MQ 产品从模型抽象上来说都是一样的过程:

消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

  • producing的意思是发送。一个发送消息的程序叫做producer

  • queue,即队列,它由RabbitMQ管理。尽管消息会在你的应用和RabbitMQ之间流过,但他们只被保存在队列中。队列没有边界限制,你想存多少消息就能存多少——它本质上是一个无限制的缓冲区。一个队列可以接收多个producer的消息,也可以被多个consumer读取。

  • consuming的意思类似于接收。一个等待接收消息的程序叫做consumer

消息流

RabbitMQ 基本概念

上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念需要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:

RabbitMQ 内部结构

  1. Message

    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  2. Publisher

    消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  3. Exchange

    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  4. Binding

    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  5. Queue

    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  6. Connection

    网络连接,比如一个TCP连接。

  7. Channel

    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

  8. Consumer

    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  9. Virtual Host

    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

  10. Broker

    表示消息队列服务器实体。

RabbitMQ 常用交换器

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct(直联)、fanout(扇形)、topic(主题)、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

生产者A/B ---> 队列A/队列B --->消费者A/B/C

direct 交换器

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

fanout 交换器

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

topic 交换器

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“

”。#匹配0个或多个单词,

匹配不多不少一个单词。

二、RabbitMQ 安装

2.1 tar安装

2.1.1 安装文件准备

上传文件到对应文件夹下

  • rabbitmq-server-generic-unix-3.6.1.tar.xz

2.1.2 安装Erlang

由于RabbitMQ是基于Erlang(面向高并发的语言)语言开发,所以在安装RabbitMQ之前,需要先安装Erlang。

安装编辑工具

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

创建文件夹erlang

makdir erlang
makdir rabbitmq

上传安装包并解压

tar -zxvf otp_src_22.0.tar.gz

进入解压目录下,配置安装路径

cd otp_src_22.0
​
./configure --prefix=/usr/local/erlang

安装

make & make install

配置erlang环境变量

echo 'export PATH=$PATH:/opt//erlang/bin' >> /etc/profile

刷新环境变量

source /etc/profile

检验是否安装成功

#输入命令检验是否安装成功,如下输出表示安装成功
erl
​
# 输入halt().命令退出
halt().

2.1.3 安装RabbitMQ

由于是tar.xz格式的所以需要用到xz,没有的话就先安装

yum install -y xz

第一次解压,解压为tar

/bin/xz -d rabbitmq-server-3.7.15.tar.xz

第二次解压

tar -xvf  rabbitmq-server-3.7.15.tar

改名

mv rabbitmq-server-3.7.15.tar  rabbitmq

配置环境变量

echo 'export PATH=$PATH:/opt/rabbitmq/rabbitmq/sbin' >> /etc/profile

刷新环境变量

source /etc/profile

2.2 rpm安装

tar包:压缩文件,windows中的zip包

rpm包:安装文件,windows中的安装包,.exe文件,自动处理软件之间的依赖关系

2.2.1 安装文件准备

2.2.2 安装Erlang

由于RabbitMQ是基于Erlang(面向高并发的语言)语言开发,所以在安装RabbitMQ之前,需要先安装Erlang。

执行rpm -ivh xxx.rpm安装

yum -y install epel-release
​
yum -y install socat
​
rpm -ivh erlang-23.2.1-1.el7.x86_64.rpm
​
rpm -ivh rabbitmq-server-3.8.14-1.el7.noarch.rpm

https://pkgs.org/download/libcrypto.so.1.1(OPENSSL_1_1_0)(64bit)

2.3 Rabbitmq命令

启动

启动:

-detached在后台启动Rabbit

rabbitmq-server -detached

停止:

rabbitmqctl stop

状态:

rabbitmqctl status

开启web插件

rabbitmq启动之后默认有很多插件可以使用∶

 rabbitmq-plugins list

rabbitmq-plugins enable rabbitmq_management

访问:http://127.0.0.1:15672/

默认账号密码:guest guest(这个账号只允许本机访问)

rabbitmq默认只能使用localhost访问,windows无法直接访问

添加管理员账号

角色:

administrator:管理员

managment:普通管理员

# 添加用户
# rabbitmqctl add_user Username Password
rabbitmqctl add_user rabbitadmin 123456
​
# 分配用户标签
# rabbitmqctl set_user_tags User Tag #[administrator]:管理员标签
rabbitmqctl set_user_tags rabbitadmin administrator

控制台

创建交换机

默认

创建队列

绑定

交换机和队列做绑定

三、RabbitMQ工作模式

3.1 引入依赖关系

        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.11.0</version></dependency>

3.2 队列模式(消费者)

队列模式不需要生命交换机,使用交换机,交换机使用默认交换机(直连交换机,key=队列名字)

1. 简单队列

一个生产者对应一个消费者

获取连接

package test.mq;
​
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
public class ConnectionUtil {
​public static Connection getConnection() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.190.130");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("sn");factory.setPassword("sn");
​Connection connection = null;try {connection = factory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}return connection;}
}
​

生产者

queueDeclare(name,durable,exclusive,autoDelete);

  • name: 队列的名称;

  • durable: 是否持久化;

  • exclusive: 是否独享、排外的;

  • autoDelete: 是否自动删除;

/*** 〈简单队列——消息生产者〉*/
public class Producer {private final static String QUEUE_NAME = "my-que";
​public static void main(String[] args) throws Exception {sendMessage();}
​public static void sendMessage() throws Exception {//1、获取连接Connection connection = ConnectionUtil.getConnection();//2、声明信道Channel channel = connection.createChannel();//3、声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//4、定义消息内容String message = "hello rabbitmq ";//5、发布消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("发送消息:" + message );//6、关闭通道channel.close();//7、关闭连接connection.close();}
}
​

管理工具中查看消息

消费者

/*** 〈消息消费者〉*/
public class Customer {
​private final static String QUEUE_NAME = "my-que";
​public static void main(String[] args) throws Exception {getMessage();
​}
​public static void getMessage() throws Exception {//1、获取连接Connection connection = ConnectionUtil.getConnection();//2、声明通道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//4、定义队列的消费者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String msgString = new String(body, "utf-8");System.out.println("接收的消息:" + msgString);}};//5、监听队列/*true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈。*/channel.basicConsume(QUEUE_NAME, true, consumer);}
}
​

2.Work模式

一个生产者,多个消费者,每个消费者获取到的消息唯一,默认轮询获取。

2.1 轮询分发

不关心速度,效率低,平均,123123123

生产者

/*** 〈轮询分发——生产者〉*/
public class Send {private static final String QUEUE_NAME = "my-que";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channelChannel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 0; i < 50; i++) {String msg = "hello " + i;System.out.println("[mq] send:" + msg);channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());Thread.sleep(i * 20);}channel.close();connection.close();}
}

消费者

创建两个消费者

  • 消费者1:每接收一条消息后休眠1秒

  • 消费者2:每接收一条消息后休眠2秒

/*** 〈消费者1〉*/
public class Receive1 {private static final String QUEUE_NAME = "my-que";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channel、Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定义一个消费者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("[1] Receive1 msg:" + msg);try {// 注:消费者2修改为2秒,其他一样Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[1] done");}}};boolean autoAck = true;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
​
}

结果

work模式默认轮询分发,将消息队列中的消息,依次发送给所有消费者。一个消息只能被一个消费者获取。

2.2 公平分发

消费者关闭自动应答,开启手动回执,消费者完成业务接口方法后可以告知消息队列处理完成,消息队列从队列中取一条消息发送给消费者。

效率高的消费者消费消息多。

/** 〈消费者1〉*/
public class Receive1 {private static final String QUEUE_NAME = "my-que";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channel、Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(0,1,false);//定义一个消费者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("[1] Receive1 msg:" + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[1] done");//手动回执// 应答成功,basicAck(消息标识符,是否批量应答)// channel.basicAck(envelope.getDeliveryTag(), true);// 应答失败,basicReject(消息标识符,是否重新发送)channel.basicReject(envelope.getDeliveryTag(), false);}}};boolean autoAck = fasle;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
​
}

3.3 交换器模式(交换机)

在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers

3.发布/订阅模式(Fanout)

未支付的订单--付款:消息(用户编号,商品编号,付款金额)

  • queue-a:修改订单状态 1s

  • queue-b:发消息 1s

  • queue-c:发红包 1s

一个生产者发送的消息会被多个消费者获取(一样的)。发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(binding)的所有的Queue上。这种模式不需要任何Routekey,需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以和多个Exchange绑定。如果接收到消息的Exchange没有与任何Queue绑定,则消息会丢失。

  

生产者

/*** 〈订阅模式——生产者〉*/
public class Send {private static final String EXCHANGE_NAME = "my-fanout-ex";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channelChannel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//分发//发送消息String msg = "hello exchange";System.out.println("[mq] send:" + msg);channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());channel.close();connection.close();}
​
}

消费者

两个消费者绑定不同的队列,绑定相同的交换机

/*** 〈消费者1〉*/
public class Receive1 {private static final String QUEUE_NAME = "my-que-a";private static final String EXCHANGE_NAME = "my-fanout-ex";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channelChannel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");//定义一个消费这Consumer consumer = new DefaultConsumer(channel)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("[1] Receive1 msg:" + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[1] done");}}};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
​
}
/*** 〈消费者2〉*/
public class Receive1 {private static final String QUEUE_NAME = "my-que-b";private static final String EXCHANGE_NAME = "my-fanout-ex";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channelChannel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//绑定到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");//定义一个消费这Consumer consumer = new DefaultConsumer(channel)@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("[1] Receive1 msg:" + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[1] done");}}};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
​
}

两个消费者获得了同一条消息。一个消息从交换机同时发送给了两个队列中,监听这两个队列的消费者消费了这个消息;如果没有队列绑定交换机,则消息将丢失。因为交换机没有存储能力,消息只能存储在队列中。

4.路由模式(Direct)

任何发送到Direct Exchange的消息都会被转发到RouteKey指定的Queue,这种模式下不需要将Exchange进行任何绑定(binding)操作,消息传递时需要一个RouteKey,可以简单的理解为要发送到的队列名字。如果vhost中不存在该队列名,消息会丢失。

也就是让消费者有选择性的接收消息

生产者

/*** 〈路由模式-消息发送者〉*/
public class Send {
​public static final String EXCHANGE_NAME = "my-direct-ex";public static final String ROUTING_KEY = "rt-a";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channelChannel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, "direct");String msg = "route message ->" + ROUTING_KEY;System.out.println("对 " + ROUTING_KEY + " 发送消息:" + msg);channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());//关闭连接channel.close();connection.close();}
​
}

消费者

两个消费者,绑定相同的交换机,不同的队列,不一样的路由

/*** 〈接收消息1〉*/
public class Receive1 {
​public static final String QUEUE_NAME = "my-que-a";public static final String EXCHANGE_NAME = "my-direct-ex";public static final String ROUTING_KEY_A = "rt-a";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channelChannel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//设置预读取数channel.basicQos(1);//绑定交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_A);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println(envelope.getRoutingKey() + " [1] Receive1 msg:" + msg);}};channel.basicConsume(QUEUE_NAME, true, consumer);}
​
}
/*** 〈接收消息2〉*/
public class Receive2 {
​public static final String QUEUE_NAME = "my-que-b";public static final String EXCHANGE_NAME = "my-direct-ex";public static final String ROUTING_KEY_A = "rt-a";public static final String ROUTING_KEY_B = "rt-b";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channelChannel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//设置预读取数channel.basicQos(1);//绑定交换机和路由器,可以绑定多个路由channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_A);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_B);//定义消息消费者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println(envelope.getRoutingKey() + " [2] Receive1 msg:" + msg);}};//接收消息channel.basicConsume(QUEUE_NAME, true, consumer);}
​
}

路由模式,是以路由规则为导向,引导消息存入符合规则的队列中。再由队列的消费者进行消费的。

5.主题模式(Topic)

上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。在进行绑定时要提供一个该队列对应的主题。‘ # ’表示0个或若干个关键字,‘ * ’表示一个关键字。如果Exchange没有发现能够与RouteKey匹配的Queue,消息会丢失。

一个关键字是一个单词:root

/*** 〈主题模式-消息发送者〉*/
public class Send {
​public static final String EXCHANGE_NAME = "my-topic-ex";public static final String ROUTING_KEY = "rt-key";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channelChannel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, "topic");String routingKey = ROUTING_KEY + ".publish";// String routingKey = ROUTING_KEY + ".add";// String routingKey = ROUTING_KEY + ".update";String msg = "route message ->" + routingKey;channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());System.out.println("对 " + routingKey + " 发送消息:" + msg);//关闭连接channel.close();connection.close();}
​
}

消费者

/*** 〈接收消息1〉*/
public class Receive1 {
​public static final String QUEUE_NAME = "my-que-a";public static final String EXCHANGE_NAME = "my-topic-ex";public static final String ROUTING_KEY = "rt-key";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channelChannel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//设置预读取数channel.basicQos(1);//绑定交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".add");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".update");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println(envelope.getRoutingKey() + " [1] Receive1 msg:" + msg);}};channel.basicConsume(QUEUE_NAME, true, consumer);}
​
}
/*** 〈接收消息1〉*/
public class Receive1 {
​public static final String QUEUE_NAME = "my-que-b";public static final String EXCHANGE_NAME = "my-topic-ex";public static final String ROUTING_KEY = "rt-key";
​public static void main(String args[]) throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection();//获取channelChannel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//设置预读取数channel.basicQos(1);//绑定交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".add");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".update");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println(envelope.getRoutingKey() + " [1] Receive1 msg:" + msg);}};channel.basicConsume(QUEUE_NAME, true, consumer);}
​
}

3.4 总结

  • 队列模式:关注是一个队列有几个消费者,发布者向队列发送消息(使用服务器默认得交换机,direct,key=队列名)

    • 简单队列模式:一个生产者一个队列一个消费者

    • Work模式:一个生产者一个队列多个消费者

      • 轮询分发:123123123消息评分分配

      • 公平分发:效率高得多分

  • 交换机模式:关注是接收消息得交换机类型,发布者向交换机发送消息

    • 发布订阅模式:fanout扇形交换机,没有routekey,所有和交换机绑定得队列都接收消息

    • 路由模式:direct直连交换机,有routekey(不能使用通配符),根据routekey对应队列接收消息

    • 主题模式:topic主题交换机,有routekey(使用通配符),根据routekey匹配对应队列接收消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/147168.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

鸿蒙开发(NEXT/API 12)【跨设备互通开发】远场通信服务

跨设备互通提供跨设备的相机、扫描、图库访问能力&#xff0c;平板或2in1设备可以调用手机的相机、扫描、图库等功能。 场景介绍 您通过此能力实现跨设备交互&#xff0c;可以使用其他设备的相机、扫描和图库功能。 约束与限制 需同时满足以下条件&#xff0c;才能使用该功…

COLORmap

在这段MATLAB代码中&#xff0c;surf(peaks)、map的定义以及colormap(map)的调用共同完成了以下任务&#xff1a; 1. **绘制曲面图**&#xff1a; - surf(peaks)&#xff1a;这个函数调用了MATLAB内置的peaks函数来生成数据&#xff0c;并使用surf函数将这些数据绘制成一个…

CSS 选择器的分类与使用要点二

目录 非 VIP 用户可前往公众号进行免费阅读 标签选择器 id 选择器 类选择器 介绍 公共类 CSS 中优先用 class 选择器,慎用 id 选择器 后代选择器 交集选择器 以标签名作为开头 以类名作为开头 连续交集 并集选择器(分组选择器) 通配符* 儿子选择器 >(IE7…

CSS 的继承性、层叠性与权重问题解析

目录 非 VIP 用户可前往公众号进行免费阅读 继承性 层叠性 CSS的权重问题 如果权重一样,以后出现的为准 以权重大的为准 没有选中,权重为0,就近原则 权重只和css顺序有关 非 VIP 用户可前往公众号进行免费阅读 CSS 的继承性、层叠性与权重问题解析本文主要介绍了 C…

AIGC8: 高通骁龙AIPC开发者大会记录B

图中是一个小男孩在市场卖他的作品。 AI应用开发出来之后&#xff0c;无论是个人开发者还是企业开发者。 如何推广分发是面临的大问题。 做出来的东西一定要符合商业规律。否则就是实验室里面的玩物&#xff0c;或者自嗨的东西。 背景 上次是回顾和思考前面两个硬件营销总的…

解决Python Debug没有反应的问题

应该有伙伴和我一样&#xff0c;用的2024版本的VS code&#xff0c;但是用到的python解释器是3.6.x&#xff0c;或者是更旧版本的Python. 想要进行Debug就会在扩展里面安装 一般安装就会安装最新版本&#xff0c;但是debug时又没有反应&#xff0c;其主要原因是Python的版本与…

Gin框架入门(2)--异常捕获与日志实现

异常捕获 Go语言的异常捕获采用的是延迟处理的方法实现的&#xff0c;实际上就是利用defer&#xff0c;panic和recover三个关键字和函数来实现的。 关键字 defer关键字(函数) 这个关键字在控制语句中就有所涉及&#xff0c;本质上是采用一个栈的存储结构&#xff0c;在整个…

时钟的配置

在使用51单片机时&#xff0c;系统使用的时钟源是一个外部晶体振荡器&#xff0c;频率为12M。由于51单片机每个指令周期都是12分频的&#xff0c;所以实际工作频率仅为1M。2440作为一种性能远高于51的Soc&#xff0c;主频肯定要远远高于51&#xff0c;因此2440有着比51单片机复…

yolov8模型在Xray图像中关键点检测识别中的应用【代码+数据集+python环境+GUI系统】

yolov8模型在X yolov8模型在Xray图像中关键点检测识别中的应用【代码数据集python环境GUI系统】 1.背景意义 X射线是一种波长极短、穿透能力极强的电磁波。当X射线穿透物体时&#xff0c;不同密度和厚度的物质会吸收不同程度的X射线&#xff0c;从而在接收端产生不同强度的信号…

pycharm加载虚拟环境及运行代码

pycharm加载虚拟环境及运行代码 pycharm下载地址&#xff1a; https://www.jetbrains.com/pycharm/download/ 1.加载虚拟环境 选择pycharm图标&#xff0c;点击启动。 选择OPEN, 选择工程文件夹&#xff1a; 选择File->setting 选择python 解释器&#xff1a; Project--…

扫码挪车是怎么实现的呢?一篇文章带你了解一下!扫码挪车小程序基础版上线了!!!

挪车小程序系统源码的功能特点 快速定位与挪车请求&#xff1a;车主通过小程序可以快速定位车辆位置&#xff0c;并发送挪车请求。系统会自动将请求发送给附近的车主&#xff0c;提醒其尽快挪车。实时通信与交互&#xff1a;小程序支持实时通信功能&#xff0c;车主之间可以通…

【C++笔记】C++编译器拷贝优化和内存管理

【C笔记】C编译器拷贝优化和内存管理 &#x1f525;个人主页&#xff1a;大白的编程日记 &#x1f525;专栏&#xff1a;C笔记 文章目录 【C笔记】C编译器拷贝优化和内存管理前言一.对象拷贝时的编译器优化二.C/C内存管理2.1练习2.2 C内存管理方式2.3 operator new与operator…

tornado

Tornado通过使用非阻塞网络1/0&#xff0c;可以扩展到数以万计的开放链接&#xff0c;非常适合 长时间轮询&#xff0c;WebSockets和其他需要与每个用户建立长期连接的应用程序。 特点 注重性能优越&#xff0c;速度快解决高并发异步非阻塞websockets 长连接内嵌了HTTP服务器…

十一、 JDK17 新特性梳理

文章目录 为什么是JDK17语法层面新特性1、文本块2 、Switch 表达式增强3、instanceof的模式匹配4、var 局部变量推导 模块化及类封装1、记录类 record2 、隐藏类 Hidden Classes3 、密封类 Sealed Classes4、模块化 Module System1 、什么是模块化2、声明一个module3 、require…

“智能密钥管家”IKE

IKE的出现 上一篇通过IPSec实现了BJ到CS的业务互通&#xff0c;但是是通过手工方式把加密和验证密钥手动配置&#xff0c;为了保障安全性&#xff0c;就需要经常去修改这些密钥&#xff0c;小型场景还好&#xff0c;来来回回就这2个点&#xff0c; 修改起来不算麻烦&#xff…

[Redis] 渐进式遍历+使用jedis操作Redis+使用Spring操作Redis

&#x1f338;个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 &#x1f3f5;️热门专栏: &#x1f9ca; Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm1001.2014.3001.5482 &#x1f355; Collection与…

解决SVN蓝色问号的问题

桌面或文件夹右键&#xff0c;选择TortoiseSVN->Settings打开设置对话框&#xff0c;选择Icon Overlays->Overlay Handlers->取消钩选Unversioned。确定&#xff0c;重启系统即可

CodeGeeX4:程序员的高效助手,多语言代码生成神器!

你是否曾在编写代码时&#xff0c;为复杂的语法、逻辑错误而绞尽脑汁&#xff1f;或是在面对多个编程语言的切换时&#xff0c;感觉脑子快要爆炸&#xff1f;别担心&#xff01;一款全新的多语言代码生成神器——CodeGeeX4&#xff0c;正悄然成为程序员们的“救命稻草”。它不仅…

【工具变量】地市环保法庭试点城市DID数据集(2005-2023年)

数据简介&#xff1a;环保法庭是中国司法体系中专门处理环境资源案件的审判机构&#xff0c;其主要职责包括审理涉及自然环境污染、矿产资源保护、自然资源环境开发等环境资源民事纠纷案件&#xff0c;对不服下级人民法院生效裁判的环境资源民事案件进行审查&#xff0c;以及对…

如何在Chrome最新浏览器中调用ActiveX控件?

小编最近登陆工商银行网上银行&#xff0c;发现工商银行的个人网银网页&#xff0c;由于使用了ActiveX安全控件&#xff0c;导致不能用高版本Chrome浏览器打开&#xff0c;目前只有使用IE或基于IE内核的浏览器才能正常登录网上银行&#xff0c;而IE已经彻底停止更新了&#xff…