1. 7种工作模式介绍
1.1 Simple(简单模式)
- P: ⽣产者,也就是要发送消息的程序
- C: 消费者,消息的接收者
- Queue: 消息队列(图中⻩⾊背景部分)类似⼀个邮箱,可以缓存消息;⽣产者向其中投递消息,消费者从 其中取出消息
特点:
- ⼀个⽣产者P,⼀个消费者C, 消息只能被消费⼀次.
- 也称为点对点(Point-to-Point)模式.
适⽤场景:消息只能被单个消费者处理
1.2 WorkQueue(⼯作队列)
- ⼀个⽣产者P,多个消费者C1,C2.
- 在多个消息的情况下,WorkQueue会将消息分派给不同的消费者,每个消费者都会接收到不同的消息.
特点:
消息不会重复,分配给不同的消费者.
适⽤场景:集群环境中做异步处理
1.3 Publish/Subscribe(发布/订阅)
- X表⽰交换机
- ⼀个⽣产者P,多个消费者C1,C2,X代表交换机消息复制多份,每个消费者接收相同的消息
- ⽣产者发送⼀条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者
适合场景:消息需要被多个消费者同时接收的场景.如:实时通知或者⼴播消息
Exchange:
作⽤:
⽣产者将消息发送到Exchange,由交换机将消息按⼀定规则路由到⼀个或多个队列中(上图中⽣产者将消息投递到队列中,实际上这个在RabbitMQ中不会发⽣.)
RabbitMQ交换机有四种类型:fanout,direct,topic,headers,不同类型有着不同的路由策略:
①Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)
②Direct:定向,把消息交给符合指定routingkey的队列(Routing模式)
③Topic:通配符,把消息交给符合routingpattern(路由模式)的队列(Topics模式)
④headers类型的交换器不依赖于路由键的匹配规则来路由消息,⽽是根据发送的消息内容中的 headers属性进⾏匹配.headers类型的交换器性能会很差,⽽且也不实⽤,基本上不会看到它的存在.
- Exchange(交换机)只负责转发消息,不具备存储消息的能⼒,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失
- RoutingKey: 路由键.⽣产者将消息发给交换器时,指定的⼀个字符串,⽤来告诉交换机应该如何处理这个消息.
- BindingKey:绑定.RabbitMQ中通过Binding(绑定)将交换器与队列关联起来,在绑定的时候⼀般会指定⼀个BindingKey, 这样RabbitMQ就知道如何正确地将消息路由到队列了.
- 在使⽤绑定的时候,需要的路由键是BindingKey.
- 在发送消息的时候,需要的路由键是RoutingKey.
1.4 Routing(路由模式)
- 路由模式是发布订阅模式的变种,在发布订阅基础上,增加路由key
- Exchange根据RoutingKey的规则, 将数据筛选后发给对应的消费者队列
适合场景:需要根据特定规则分发消息的场景.
1.5 Topics(通配符模式)
Topics和Routing的基本原理相同
即:⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列
不同之处是:routingKey的匹配⽅式不同,Routing模式是相等匹配,topics模式是通配符匹配.
适合场景:需要灵活匹配和过滤消息的场景
在topic类型的交换机在匹配规则上,有些要求:
①RoutingKey是⼀系列由点( . )分隔的单词,⽐如" " q uick.orange.rabbit " stock.usd.nyse ","
②BindingKey 和RoutingKey⼀样,也是点( . )分割的字符串.
③Binding Key中可以存在两种特殊字符串,⽤于模糊匹配
- *表⽰⼀个单词
- #表⽰多个单词(0-N个)
比如:
• BindingKey为"d.a.b"会同时路由到Q1和Q2
• BindingKey为"d.a.f"会路由到Q1
1.6 RPC(RPC通信)
- 客⼾端发送消息到⼀个指定的队列,并在消息属性中设置replyTo字段,这个字段指定了⼀个回调队 列,⽤于接收服务端的响应.
- 服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列
- 客⼾端在回调队列上等待响应消息.⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应.
1.7 Publisher Confirms(发布确认)
Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。
在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.
- ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后,发布的每⼀条消 息都会获得⼀个唯⼀的ID,⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.
- 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者 (包含消息的唯⼀ID),表明消息已经送达.
通过PublisherConfirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收,从⽽避免消息丢失 的问题.
适⽤场景:对数据安全性要求较⾼的场景.⽐如⾦融交易,订单处理
2. 使用案例
public class Constants {public static final String HOST = "119.91.154.99";public static final int PORT = 5672;public static final String USER_NAME = "xuexue";public static final String PASSWORD = "xuexue";public static final String VIRTUAL_HOST = "bit";//工作队列模式public static final String WORK_QUEUE = "work.queue";//发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange";public static final String FANOUT_QUEUE1 = "fanout.queue1";public static final String FANOUT_QUEUE2 = "fanout.queue2";//路由模式public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";//通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE1 = "topic_queue1";public static final String TOPIC_QUEUE2 = "topic_queue2";//rpc 模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";//publisher confirmspublic static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3";//推拉模式public static final String MESSAGE_QUEUE = "message.queue";}
2.1 简单模式
无
2.2 WorkQueues(⼯作队列)
生产者:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//4. 发送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...."+i;channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息发送成功~");//5. 资源释放channel.close();connection.close();}
}
消费者(两个一样):
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};//默认交换机,RoutingKey=队列名称channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//5. 资源释放channel.close();connection.close();}
}
2.3 Publish/Subscribe(发布/订阅)
生产者:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3.声明交换机channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);//4. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//5.交换机和队列绑定channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");//6. 发布消息String msg = "hello fanout....";channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}
消费者:
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);}
}
2.4 Routing(路由模式)
生产者:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//4. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//5. 绑定交换机和队列channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");//6. 发送消息String msg = "hello direct, my routingkey is a....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());String msg_b = "hello direct, my routingkey is b....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());String msg_c = "hello direct, my routingkey is c....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}
消费者:
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = ((Connection) connection).createChannel();//3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = ((Connection) connection).createChannel();//3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);}
}
2.5 Topics(通配符模式)
生产者:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = ((Connection) connection).createChannel();//3. 声明交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);//4. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//5. 绑定交换机和队列channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");//6. 发送消息String msg = "hello topic, my routingkey is ae.a.f....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes()); //转发到Q1String msg_b = "hello topic, my routingkey is ef.a.b....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //转发到Q1和Q2String msg_c = "hello topic, my routingkey is c.ef.d....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//转发Q2System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}
消费者:
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);}
}
2.6 RPC(RPC通信)
客户端:
/*** 1.发送请求* 2.接收响应*/
public class Client {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3.声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//4. 发送请求String msg = "hello rpc...";//设置请求的唯一标识String correlationID = UUID.randomUUID().toString();//设置请求的相关属性AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE)//指定了⼀个回调队列,服务端处理后,会把响应结果发送到这个队列.build();//默认交换机,RoutingKey=队列名称//props属性channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());//4. 接收响应(校验ID)//使用阻塞队列, 来存储响应信息//不适用阻塞队列,很快就接收响应,但是响应还没有传过来final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String respMsg = new String(body);System.out.println("接收到回调消息:"+respMsg);if (correlationID.equals(properties.getCorrelationId())){//如果唯⼀标识正确, 放到阻塞队列中response.offer(respMsg);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);//获取回调的结果String result = response.take();System.out.println("[RPC Client 响应结果]:"+ result);}}
服务端:
/*** 1.接收请求* 2.发送响应*/public class Server {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 接收请求//设置服务端同时最多只能获取⼀个消息channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//将请求转化为stringString request = new String(body, "UTF-8");System.out.println("接收到请求:" + request);//响应String response = "针对request:" + request + ", 响应成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());//消息应答channel.basicAck(envelope.getDeliveryTag(), false);}};//false:手动确认channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);}
}
RabbitMQ消息确定机制
在RabbitMQ中,basicConsume⽅法的autoAck参数⽤于指定消费者是否应该⾃动向消息队列确认 消息
- ⾃动确认(autoAck=true):消息队列在将消息发送给消费者后,会⽴即从内存中删除该消息.这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
- ⼿动确认(autoAck=false):消息队列在将消息发送给消费者后,需要消费者显式地调⽤basicAck ⽅法来确认消息.⼿动确认提供了更⾼的可靠性,确保消息不会被意外丢失,适⽤于消息处理重要且需 要确保每个消息都被正确处理的场景
2.7 Publisher Confirms(发布确认)
消息丢失⼤概分为三种情况:
1. ⽣产者问题.因为应⽤程序故障,⽹络抖动等各种原因,⽣产者没有成功向broker发送消息.
2. 消息中间件⾃⾝问题.⽣产者成功发送给了Broker,但是Broker没有把消息保存好,导致消息丢失.
3. 消费者问题.Broker发送消息到消费者,消费者在消费消息时,因为没有处理好,导致broker将消费 失败的消息从队列中删除了.
针对问题1,可以采⽤发布确认(PublisherConfirms)机制实现:
⽣产者将信道设置成confirm(确认)模式,⼀旦信道进⼊confirm模式,所有在该信道上⾯发布的消息都 会被指派⼀个唯⼀的ID(从1开始),⼀旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID)
这就使得⽣产者知道消息已经正确到达⽬的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写⼊磁盘之后发出
broker回传给⽣产者的确认消息中deliveryTag 包含了确认消息的序号,此外broker也可以设置channel.basicAck⽅法中的multiple参数,表⽰到这个序号之前的所有消息都已经得到了处理
发送⽅确认机制最⼤的好处在于它是异步的,⽣产者可以同时发布消息和等待信道返回确认消息.
当消息最终得到确认之后,⽣产者可以通过回调⽅法来处理该确认消息.
如果RabbitMQ因为⾃⾝内部错误导致消息丢失,就会发送⼀条nack(Basic.Nack)命令,⽣产者同样可以在回调⽅法中处理该nack命令.
public class PublisherConfirms {private static final Integer MESSAGE_COUNT = 100;static Connection createConnection() throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {//Strategy #1: Publishing Messages Individually//单独确认//publishingMessagesIndividually();//Strategy #2: Publishing Messages in Batches//批量确认//publishingMessagesInBatches();//Strategy #3: Handling Publisher Confirms Asynchronously//异步确认handlingPublisherConfirmsAsynchronously();}/*** 异步确认*/private static void handlingPublisherConfirmsAsynchronously() throws Exception {try (Connection connection = createConnection()){//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);//4. 监听confirm//创建有序集合.中存储的是未确认的消息IDlong start = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple){//<=都清除//headSet返回<n的集合,但是这条也要被删除confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}//业务需要根据实际场景进行处理, 比如重发, 此处代码省略}});//5.发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;//获取要发送消息的序号long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());//放confirmSeqNo.add(seqNo);}while (!confirmSeqNo.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}/*** 批量确认*/private static void publishingMessagesInBatches() throws Exception{try(Connection connection = createConnection()) {//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//4. 发送消息, 并进行确认long start = System.currentTimeMillis();int batchSize = 100;int outstandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());outstandingMessageCount++;if (outstandingMessageCount==batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if (outstandingMessageCount>0){channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}/*** 单独确认*/private static void publishingMessagesIndividually() throws Exception {try (Connection connection = createConnection()){//1.开启新道Channel channel = connection.createChannel();//2.设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);//4.发送信息,并等待确认long start = System.currentTimeMillis();for(int i =0; i< MESSAGE_COUNT;i++){String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());//等待确认channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}}
异步确认:
Channel接⼝提供了⼀个⽅法addConfirmListener.这个⽅法可以添加ConfirmListener回调接⼝
deliveryTag 表⽰发送消息的序号
multiple 表⽰是否批量确认
我们需要为每⼀个Channel维护⼀个已发送消息的序号集合.当收到RabbitMQ的confirm回调时,从集 合中删除对应的消息.当Channel开启confirm模式后,channel上发送消息都会附带⼀个从1开始递增的 deliveryTag序号. 可以使⽤SortedSet的有序性来维护这个已发消息的集合.
- 当收到ack时,从序列中删除该消息的序号.如果为批量确认消息,表⽰⼩于等于当前序号 deliveryTag的消息都收到了,则清除对应集合
- 当收到nack时,处理逻辑类似,不过需要结合具体的业务情况,进⾏消息重发等操作.
对比:
消息数越多,异步确认的优势越明显
3. Spring Boot整合RabbitMQ
创建项⽬时, 加⼊依赖:
添加配置:
详情代码看idea
3.1 工作队列模式
声明队列(@Bean交给spring进行管理):
生产者:
消费者:
① Message message
②String message
此处返回的是message的具体内容
channel:
3.2 Publish/Subscribe(发布订阅模式)
声明队列,交换机:
绑定:
这里@Qualifier指定绑定的队列和交换机
发送消息:
3.3 Routing(路由模式)
此时绑定需要指定routingkey
注意:放到路径里,需要使用注解@PathVariable
3.4 Topics(通配符模式)
4. 基于SpringBoot+RabbitMQ完成应⽤通信
需求:
⽤⼾下单成功之后, 通知物流系统, 进⾏发货
订单系统:生产者
物流系统:消费者
创建项目
创建⼀个空的项⽬ rabbitmq-communication(其实就是⼀个空的⽂件夹),将两个项⽬放在⼀个项⽬中
生产者
配置
spring:rabbitmq:addresses: amqp://xuexue:xuexue@119.91.154.99:5672/order
server:port: 9090
声明队列
@Configuration
public class RabbitMQConfig {@Bean("orderQueue")public Queue orderQueue(){return QueueBuilder.durable("order.create").build();}
}
发送订单消息
@RequestMapping("/order")
@RestController
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/create")public String create(){//发送消息String orderId = UUID.randomUUID().toString();rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderId);return "下单成功";}}
启动服务,观察结果
119.91.154.99:15672
消费者
配置
spring:rabbitmq:addresses: amqp://xuexue:xuexue@119.91.154.99:5672/order
server:port: 8080
监听队列
@Component
public class OrderListener {@RabbitListener(queues = "order.create")public void handMessage(String orderInfo){System.out.println("接收到订单消息:"+orderInfo);}}
结果
发送消息格式为对象
如果通过 RabbitTemplate 发送⼀个对象作为消息, 我们需要对该对象进⾏序列化.
Spring AMQP推荐使⽤JSON序列化,Spring AMQP提供了 Jackson2JsonMessageConverter,我们需要把⼀个 MessageConverter 设置 到 RabbitTemplate 中
JSON序列化(生产者和消费者都要添加)
@Configuration
public class RabbitMQConfig {@Bean("orderQueue")public Queue orderQueue(){return QueueBuilder.durable("order.create").build();}@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter jackson2JsonMessageConverter){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());return rabbitTemplate;}
}
定义对象
@Data
public class OrderInfo { private String orderId;private String name;
}
生产者
@RequestMapping("/order")
@RestController
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/create")public String create(){//发送消息String orderId = UUID.randomUUID().toString();rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderId);return "下单成功";}@RequestMapping("/create2")public String create2(){//发送消息OrderInfo orderInfo = new OrderInfo();orderInfo.setOrderId( UUID.randomUUID().toString());orderInfo.setName("价格"+new Random().nextInt(100));rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderInfo);return "下单成功";}
}
查看消息:
消费者
@Component
@RabbitListener(queues = "order.create")
public class OrderListener {@RabbitHandlerpublic void handMessage(String orderInfo){System.out.println("接收到订单消息String:"+orderInfo);}@RabbitHandlerpublic void handMessage(OrderInfo orderInfo){System.out.println("接收到订单消息OrderInfo:"+orderInfo);}
}
@RabbitListener(queues="order.create")可以加在类上,也可以加在方法上,⽤于定于⼀个类或者方法作为消息的监听器
@RabbitHandler是一个方法级别的注解,使用它该方法被调用处理特定的消息