RabbitMQ应用

1. 7种工作模式介绍

1.1  Simple(简单模式)

  • P: ⽣产者,也就是要发送消息的程序
  • C: 消费者,消息的接收者
  • Queue: 消息队列(图中⻩⾊背景部分)类似⼀个邮箱,可以缓存消息;⽣产者向其中投递消息,消费者从 其中取出消息

特点:

  • ⼀个⽣产者P,⼀个消费者C, 消息只能被消费⼀次.
  • 也称为点对点(Point-to-Point)模式.

适⽤场景:消息只能被单个消费者处理

1.2 WorkQueue(⼯作队列)

 

  • ⼀个⽣产者P,多个消费者C1,C2.
  • 在多个消息的情况下,WorkQueue会将消息分派给不同的消费者,每个消费者都会接收到不同的消息.

特点:

消息不会重复,分配给不同的消费者.

适⽤场景:集群环境中做异步处理

1.3 Publish/Subscribe(发布/订阅)

 

  • X表⽰交换机
  • ⼀个⽣产者P,多个消费者C1,C2,X代表交换机消息复制多份,每个消费者接收相同的消息
  • ⽣产者发送⼀条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者

适合场景:消息需要被多个消费者同时接收的场景.如:实时通知或者⼴播消息

Exchange:

作⽤:

⽣产者将消息发送到Exchange,由交换机将消息按⼀定规则路由到⼀个或多个队列中(上图中⽣产者将消息投递到队列中,实际上这个在RabbitMQ中不会发⽣.)

RabbitMQ交换机有四种类型:fanout,direct,topic,headers,不同类型有着不同的路由策略:

①Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)

②Direct:定向,把消息交给符合指定routingkey的队列(Routing模式)

③Topic:通配符,把消息交给符合routingpattern(路由模式)的队列(Topics模式)

④headers类型的交换器不依赖于路由键的匹配规则来路由消息,⽽是根据发送的消息内容中的 headers属性进⾏匹配.headers类型的交换器性能会很差,⽽且也不实⽤,基本上不会看到它的存在.

  • Exchange(交换机)只负责转发消息,不具备存储消息的能⼒,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失
  • RoutingKey: 路由键.⽣产者将消息发给交换器时,指定的⼀个字符串,⽤来告诉交换机应该如何处理这个消息.
  • BindingKey:绑定.RabbitMQ中通过Binding(绑定)将交换器与队列关联起来,在绑定的时候⼀般会指定⼀个BindingKey, 这样RabbitMQ就知道如何正确地将消息路由到队列了.
  • 在使⽤绑定的时候,需要的路由键是BindingKey.
  • 在发送消息的时候,需要的路由键是RoutingKey.

1.4 Routing(路由模式)

 

  • 路由模式是发布订阅模式的变种,在发布订阅基础上,增加路由key
  • Exchange根据RoutingKey的规则, 将数据筛选后发给对应的消费者队列

适合场景:需要根据特定规则分发消息的场景.

1.5 Topics(通配符模式)

 

Topics和Routing的基本原理相同

即:⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列

不同之处是:routingKey的匹配⽅式不同,Routing模式是相等匹配,topics模式是通配符匹配.

适合场景:需要灵活匹配和过滤消息的场景

在topic类型的交换机在匹配规则上,有些要求:

①RoutingKey是⼀系列由点( . )分隔的单词,⽐如" " q uick.orange.rabbit " stock.usd.nyse ","

②BindingKey 和RoutingKey⼀样,也是点( . )分割的字符串.

③Binding Key中可以存在两种特殊字符串,⽤于模糊匹配

  •  *表⽰⼀个单词
  •  #表⽰多个单词(0-N个)

比如:

• BindingKey为"d.a.b"会同时路由到Q1和Q2

• BindingKey为"d.a.f"会路由到Q1

1.6 RPC(RPC通信)

 

  • 客⼾端发送消息到⼀个指定的队列,并在消息属性中设置replyTo字段,这个字段指定了⼀个回调队 列,⽤于接收服务端的响应.
  • 服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列
  • 客⼾端在回调队列上等待响应消息.⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应.

1.7 Publisher Confirms(发布确认)

 

Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。

在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.

  • ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后,发布的每⼀条消 息都会获得⼀个唯⼀的ID,⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.
  • 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者 (包含消息的唯⼀ID),表明消息已经送达.

通过PublisherConfirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收,从⽽避免消息丢失 的问题.

适⽤场景:对数据安全性要求较⾼的场景.⽐如⾦融交易,订单处理

 

2. 使用案例

public class Constants {public static final String HOST = "119.91.154.99";public static final int PORT = 5672;public static final String USER_NAME = "xuexue";public static final String PASSWORD = "xuexue";public static final String VIRTUAL_HOST = "bit";//工作队列模式public static final String WORK_QUEUE = "work.queue";//发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange";public static final String FANOUT_QUEUE1 = "fanout.queue1";public static final String FANOUT_QUEUE2 = "fanout.queue2";//路由模式public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";//通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE1 = "topic_queue1";public static final String TOPIC_QUEUE2 = "topic_queue2";//rpc 模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";//publisher confirmspublic static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3";//推拉模式public static final String MESSAGE_QUEUE = "message.queue";}

2.1 简单模式

无 

2.2 WorkQueues(⼯作队列)

生产者:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列   使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//4. 发送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...."+i;channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息发送成功~");//5. 资源释放channel.close();connection.close();}
}

消费者(两个一样): 

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列   使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};//默认交换机,RoutingKey=队列名称channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//5. 资源释放channel.close();connection.close();}
}

2.3 Publish/Subscribe(发布/订阅)

生产者: 

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3.声明交换机channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);//4. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//5.交换机和队列绑定channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");//6. 发布消息String msg = "hello fanout....";channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}

消费者:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);}
}

2.4 Routing(路由模式)

生产者:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//4. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//5. 绑定交换机和队列channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");//6. 发送消息String msg = "hello direct, my routingkey is a....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());String msg_b = "hello direct, my routingkey is b....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());String msg_c = "hello direct, my routingkey is c....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}

消费者: 

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = ((Connection) connection).createChannel();//3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}

public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = ((Connection) connection).createChannel();//3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);}
}

2.5 Topics(通配符模式)

生产者:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = ((Connection) connection).createChannel();//3. 声明交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);//4. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//5. 绑定交换机和队列channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");//6. 发送消息String msg = "hello topic, my routingkey is ae.a.f....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes());  //转发到Q1String msg_b = "hello topic, my routingkey is ef.a.b....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //转发到Q1和Q2String msg_c = "hello topic, my routingkey is c.ef.d....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//转发Q2System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}

消费者: 

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);}
}

2.6 RPC(RPC通信)

客户端:

/*** 1.发送请求* 2.接收响应*/
public class Client {public  static void main(String[] args) throws IOException, TimeoutException, InterruptedException, IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3.声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//4. 发送请求String msg = "hello rpc...";//设置请求的唯一标识String correlationID = UUID.randomUUID().toString();//设置请求的相关属性AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE)//指定了⼀个回调队列,服务端处理后,会把响应结果发送到这个队列.build();//默认交换机,RoutingKey=队列名称//props属性channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());//4. 接收响应(校验ID)//使用阻塞队列, 来存储响应信息//不适用阻塞队列,很快就接收响应,但是响应还没有传过来final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String respMsg = new String(body);System.out.println("接收到回调消息:"+respMsg);if (correlationID.equals(properties.getCorrelationId())){//如果唯⼀标识正确, 放到阻塞队列中response.offer(respMsg);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);//获取回调的结果String result = response.take();System.out.println("[RPC Client 响应结果]:"+ result);}}

服务端:

/*** 1.接收请求* 2.发送响应*/public class Server {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 接收请求//设置服务端同时最多只能获取⼀个消息channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//将请求转化为stringString request = new String(body, "UTF-8");System.out.println("接收到请求:" + request);//响应String response = "针对request:" + request + ", 响应成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());//消息应答channel.basicAck(envelope.getDeliveryTag(), false);}};//false:手动确认channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);}
}

RabbitMQ消息确定机制

在RabbitMQ中,basicConsume⽅法的autoAck参数⽤于指定消费者是否应该⾃动向消息队列确认 消息

  • ⾃动确认(autoAck=true):消息队列在将消息发送给消费者后,会⽴即从内存中删除该消息.这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
  • ⼿动确认(autoAck=false):消息队列在将消息发送给消费者后,需要消费者显式地调⽤basicAck ⽅法来确认消息.⼿动确认提供了更⾼的可靠性,确保消息不会被意外丢失,适⽤于消息处理重要且需 要确保每个消息都被正确处理的场景

2.7 Publisher Confirms(发布确认)

消息丢失⼤概分为三种情况:

1. ⽣产者问题.因为应⽤程序故障,⽹络抖动等各种原因,⽣产者没有成功向broker发送消息.

2. 消息中间件⾃⾝问题.⽣产者成功发送给了Broker,但是Broker没有把消息保存好,导致消息丢失.

3. 消费者问题.Broker发送消息到消费者,消费者在消费消息时,因为没有处理好,导致broker将消费 失败的消息从队列中删除了.


针对问题1,可以采⽤发布确认(PublisherConfirms)机制实现:

⽣产者将信道设置成confirm(确认)模式,⼀旦信道进⼊confirm模式,所有在该信道上⾯发布的消息都 会被指派⼀个唯⼀的ID(从1开始),⼀旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID)

这就使得⽣产者知道消息已经正确到达⽬的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写⼊磁盘之后发出

broker回传给⽣产者的确认消息中deliveryTag 包含了确认消息的序号,此外broker也可以设置channel.basicAck⽅法中的multiple参数,表⽰到这个序号之前的所有消息都已经得到了处理

发送⽅确认机制最⼤的好处在于它是异步的,⽣产者可以同时发布消息和等待信道返回确认消息.

当消息最终得到确认之后,⽣产者可以通过回调⽅法来处理该确认消息.

如果RabbitMQ因为⾃⾝内部错误导致消息丢失,就会发送⼀条nack(Basic.Nack)命令,⽣产者同样可以在回调⽅法中处理该nack命令. 

public class PublisherConfirms {private static final Integer MESSAGE_COUNT = 100;static Connection createConnection() throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {//Strategy #1: Publishing Messages Individually//单独确认//publishingMessagesIndividually();//Strategy #2: Publishing Messages in Batches//批量确认//publishingMessagesInBatches();//Strategy #3: Handling Publisher Confirms Asynchronously//异步确认handlingPublisherConfirmsAsynchronously();}/*** 异步确认*/private static void handlingPublisherConfirmsAsynchronously() throws Exception {try (Connection connection = createConnection()){//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);//4. 监听confirm//创建有序集合.中存储的是未确认的消息IDlong start = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple){//<=都清除//headSet返回<n的集合,但是这条也要被删除confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}//业务需要根据实际场景进行处理, 比如重发, 此处代码省略}});//5.发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;//获取要发送消息的序号long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());//放confirmSeqNo.add(seqNo);}while (!confirmSeqNo.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}/*** 批量确认*/private static void publishingMessagesInBatches() throws Exception{try(Connection connection = createConnection()) {//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//4. 发送消息, 并进行确认long start = System.currentTimeMillis();int batchSize = 100;int outstandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());outstandingMessageCount++;if (outstandingMessageCount==batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if (outstandingMessageCount>0){channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}/*** 单独确认*/private static void publishingMessagesIndividually() throws Exception {try (Connection connection = createConnection()){//1.开启新道Channel channel = connection.createChannel();//2.设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);//4.发送信息,并等待确认long start = System.currentTimeMillis();for(int i =0; i< MESSAGE_COUNT;i++){String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());//等待确认channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}}

异步确认:

Channel接⼝提供了⼀个⽅法addConfirmListener.这个⽅法可以添加ConfirmListener回调接⼝

deliveryTag 表⽰发送消息的序号

multiple 表⽰是否批量确认

我们需要为每⼀个Channel维护⼀个已发送消息的序号集合.当收到RabbitMQ的confirm回调时,从集 合中删除对应的消息.当Channel开启confirm模式后,channel上发送消息都会附带⼀个从1开始递增的 deliveryTag序号. 可以使⽤SortedSet的有序性来维护这个已发消息的集合.

  • 当收到ack时,从序列中删除该消息的序号.如果为批量确认消息,表⽰⼩于等于当前序号 deliveryTag的消息都收到了,则清除对应集合
  • 当收到nack时,处理逻辑类似,不过需要结合具体的业务情况,进⾏消息重发等操作.

对比:

消息数越多,异步确认的优势越明显

 

3. Spring Boot整合RabbitMQ

 创建项⽬时, 加⼊依赖:

 

添加配置:

详情代码看idea

 

 

3.1 工作队列模式

 

声明队列(@Bean交给spring进行管理):

生产者:

 

消费者:

① Message message

②String message 

 

此处返回的是message的具体内容

channel:

 

3.2  Publish/Subscribe(发布订阅模式)

 

声明队列,交换机:

绑定:

这里@Qualifier指定绑定的队列和交换机

 

发送消息:

 

 

 

3.3 Routing(路由模式)

 

此时绑定需要指定routingkey 

 

注意:放到路径里,需要使用注解@PathVariable

 

 

3.4 Topics(通配符模式)

 

 

 

 

4. 基于SpringBoot+RabbitMQ完成应⽤通信

需求:

⽤⼾下单成功之后, 通知物流系统, 进⾏发货

 

订单系统:生产者

物流系统:消费者

创建项目

创建⼀个空的项⽬ rabbitmq-communication(其实就是⼀个空的⽂件夹),将两个项⽬放在⼀个项⽬中

生产者

配置 

spring:rabbitmq:addresses: amqp://xuexue:xuexue@119.91.154.99:5672/order
server:port: 9090

声明队列

@Configuration
public class RabbitMQConfig {@Bean("orderQueue")public Queue orderQueue(){return QueueBuilder.durable("order.create").build();}
}

发送订单消息

 

@RequestMapping("/order")
@RestController
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/create")public String create(){//发送消息String orderId = UUID.randomUUID().toString();rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderId);return "下单成功";}}

启动服务,观察结果 

119.91.154.99:15672

 

 

 

 

消费者

配置

spring:rabbitmq:addresses: amqp://xuexue:xuexue@119.91.154.99:5672/order
server:port: 8080

 

监听队列 

@Component
public class OrderListener {@RabbitListener(queues = "order.create")public void handMessage(String orderInfo){System.out.println("接收到订单消息:"+orderInfo);}}

 结果

  

发送消息格式为对象

如果通过 RabbitTemplate 发送⼀个对象作为消息, 我们需要对该对象进⾏序列化.

Spring AMQP推荐使⽤JSON序列化,Spring AMQP提供了 Jackson2JsonMessageConverter,我们需要把⼀个 MessageConverter 设置 到 RabbitTemplate 中

JSON序列化(生产者和消费者都要添加)

@Configuration
public class RabbitMQConfig {@Bean("orderQueue")public Queue orderQueue(){return QueueBuilder.durable("order.create").build();}@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter jackson2JsonMessageConverter){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());return rabbitTemplate;}
}

定义对象

@Data
public class OrderInfo  { private String orderId;private String name;
}

生产者

@RequestMapping("/order")
@RestController
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/create")public String create(){//发送消息String orderId = UUID.randomUUID().toString();rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderId);return "下单成功";}@RequestMapping("/create2")public String create2(){//发送消息OrderInfo orderInfo  = new OrderInfo();orderInfo.setOrderId( UUID.randomUUID().toString());orderInfo.setName("价格"+new Random().nextInt(100));rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderInfo);return "下单成功";}
}

查看消息: 

消费者 

@Component
@RabbitListener(queues = "order.create")
public class OrderListener {@RabbitHandlerpublic void handMessage(String orderInfo){System.out.println("接收到订单消息String:"+orderInfo);}@RabbitHandlerpublic void handMessage(OrderInfo orderInfo){System.out.println("接收到订单消息OrderInfo:"+orderInfo);}
}

@RabbitListener(queues="order.create")可以加在类上,也可以加在方法上,⽤于定于⼀个类或者方法作为消息的监听器

@RabbitHandler是一个方法级别的注解,使用它该方法被调用处理特定的消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/7567.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

从Java中使用new 关键字创建对象开始,深度剖析对象结构与存储

文章目录 1.对象结构2.扩展补充3.小结 1.对象结构 在介绍之前.先来看一个java高频面试题&#xff0c;new String(hello") 创建了几个对象&#xff1f; 1.这里分情况讨论&#xff0c;如果hello已经在常量池中存在&#xff0c;那么就是在堆中创建1个对象&#xff0c;并返回…

ThreadLocal 的原理和使用场景

1.ThreadLocal是什么 ThreadLocal 是 Java 提供的一个用于线程存储本地变量的类。它为每个线程提供独立的变量副本&#xff0c;确保变量在多线程环境下的线程安全。每个线程访问 ThreadLocal 时&#xff0c;都会有自己专属的变量副本&#xff0c;互不干扰&#xff0c;避免了并…

qt QColorDialog详解

1、概述 QColorDialog是Qt框架中的一个对话框类&#xff0c;专门用于让用户选择颜色。它提供了一个标准的颜色选择界面&#xff0c;其中包括基本的颜色选择器&#xff08;如调色板和颜色轮&#xff09;、自定义颜色输入区域以及预定义颜色列表。QColorDialog支持RGB、HSV和十六…

关于Redis

Redis 基础 什么是 Redis&#xff1f; Redis &#xff08;REmote DIctionary Server&#xff09;是一个基于 C 语言开发的开源 NoSQL 数据库&#xff08;BSD 许可&#xff09;。与传统数据库不同的是&#xff0c;Redis 的数据是保存在内存中的&#xff08;内存数据库&#xf…

linux nvidia/cuda安装

1.查看显卡型号 lspci |grep -i vga2.nvidia安装 2.1在线安装 终端输入&#xff08;当显卡插上之后&#xff0c;系统会有推荐的安装版本&#xff09; ubuntu-drivers devices可得到如下内容 vendor : NVIDIA Corporation model : TU104GL [Tesla T4] driver : nvid…

uniapp 实现瀑布流

效果演示 组件下载 瀑布流布局-waterfall - DCloud 插件市场

了解聚簇索引和非聚簇索引

在关系型数据库中,索引是提高查询效率的重要手段。索引类似于书籍中的目录,能够帮助数据库快速定位到所需的数据。而在数据库中,最常用的两种索引类型是聚簇索引(Clustered Index)和非聚簇索引(Non-clustered Index)。本文将详细介绍这两种索引类型,帮助读者更好地理解…

CODESYS可视化桌面屏保-动态气泡制作详细案例

#一个用于可视化(HMI)界面的动态屏保的详细制作案例程序# 前言: 在工控自动化设备上,为了防止由于人为误触发或操作引起的故障,通常在触摸屏(HMI)增加屏幕保护界面,然而随着PLC偏IT化的发展,在控制界面上的美观程度也逐渐向上位机或网页前端方面发展,本篇模仿Windows…

【数据结构 队列】超详细理解例题

数据结构 队列 前言队列的定义队列的概念队列的基本操作 队列用C语言实现Queue.hQueue.ctext.c 队列 VS 栈队列的应用 你好&#xff0c;这里是新人 Sunfor 这篇是我最近对于数据结构 队列的学习心得和错题整理 有任何错误欢迎指正&#xff0c;欢迎交流&#xff01; 会持续更新…

VSCode + linux 远程免密登录

目录 一. VS Code端1. 安装插件Remote - SSH2. 配置config文件3. 公钥生成 二、远程服务器端1. 将生成的公钥发送到远程服务器 三、连接1. 准备就绪后&#xff0c;VSCode连接 一. VS Code端 1. 安装插件Remote - SSH 2. 配置config文件 Host H5WebHostName xx.xx.xx.xxUser ro…

简单分享一下淘宝商品数据自动化抓取的技术实现与挑战

在电子商务领域&#xff0c;数据是驱动决策的关键。淘宝作为国内最大的电商平台之一&#xff0c;其商品数据对电商从业者来说具有极高的价值。然而&#xff0c;从淘宝平台自动化抓取商品数据并非易事&#xff0c;涉及多重技术和法律挑战。本文将从技术层面分析实现淘宝商品数据…

初识网络编程

目录 前言相关名词解释应用层协议——HTTP传输层协议socketTCP帧头格式三次握手、四次挥手 UDPTCP的socket实现 参考博文 前言 刚碰到网络编程&#xff0c;会出现一堆协议、概念、这层次那技术的&#xff0c;头都大了&#xff0c;还是得总结总结…… 相关名词解释 ✨✨网络…

JRTPLIB中的RTPSession与OnSSRCCollision:深入解析SSRC冲突处理机制

JRTPLIB中的RTPSession与OnSSRCCollision:深入解析SSRC冲突处理机制 一、RTP与SSRC基础1.1 RTP简介1.2 SSRC的作用二、JRTPLIB与RTPSession2.1 JRTPLIB概述2.2 RTPSession类三、SSRC冲突与OnSSRCCollision3.1 SSRC冲突的原因3.2 OnSSRCCollision回调函数3.3 OnSSRCCollision的…

【数据集】【YOLO】【目标检测】口罩佩戴识别数据集 1971 张,YOLO佩戴口罩检测算法实战训练教程!

数据集介绍 【数据集】口罩佩戴检测数据集 1971 张&#xff0c;目标检测&#xff0c;包含YOLO/VOC格式标注。 数据集中包含1种分类&#xff1a;{0: face_mask}&#xff0c;佩戴口罩。 数据集来自国内外图片网站和视频截图。 检测场景为城市街道、医院、商场、机场、车站、办…

实测讯飞智作,一张照片定制属于自己的数字人

Datawhale亲测 AI应用&#xff1a;讯飞智作 只用一张照片&#xff0c;就可以定制属于自己的数字人。 这是大模型给数字人领域带来的最新震撼。 就在两周前的 AI 开发者 Talk 合肥站活动上&#xff0c;我们 Datawhale 的一名小伙伴玉鑫化身成数字人亮相大屏幕&#xff0c;为参加…

乡村景区一体化系统(门票,餐饮,便利店,果园,娱乐,停车收费

一、一体化优势 1. 提升游客体验&#xff1a;游客可以通过一个系统方便地完成各种消费和预订&#xff0c;无需在不同的地方分别处理&#xff0c;节省时间和精力&#xff0c;使游玩过程更加顺畅和愉快。 2. 提高管理效率&#xff1a;景区管理者能够在一个平台上集中管理多个业…

安卓编程最方便的读写资料类SharedPreferences,多个APP共享

本文介绍Android平台进行数据存储的五大方式,分别如下: 1 使用SharedPreferences存储数据 2 文件存储数据 3 SQLite数据库存储数据 4 使用ContentProvider存储数据 5 网络存储数据 下面详细讲解这五种方式的特点 第一种&#xff1a; 使用SharedPreferences存储数据 …

[Docker#1] 专栏前言 | 亿级高并发架构演进之路

目录 目标 一.前期演进 1. 单机架构 2. 应用数据分离架构 3. 应用集群架构 4. 读写分离/主从分离架构 5. 冷热分离架构 二. 架构 分布式数据库架构 微服务架构 容器编排架构 三. An Internet Application Architecture 理解 上层传输 框架 数据处理 主要思想 …

初识AI大模型,ollama使用,llama factory大模型微调,lama.cpp模型转换guff

最近了解了下生成式AI对话&#xff0c;下面是自己的一些尝试记录。 ollama 安装及使用 1、安装 我是在windows环境下安装的&#xff0c;很简单&#xff0c;访问&#xff1a;https://ollama.com/ &#xff0c;下载windows安装包&#xff0c;打开安装就行了。 cmd输入ollama -v检…

Mybatis、Mybatis-Plus 调用同一个组件的查询时遇到的坑

Mybatis、Mybatis-Plus 调用同一个组件的查询时遇到的坑 Mybais-plus配置了驼峰自动命名&#xff0c;所以不需要在SQL里转化查询。