生产者确认
生产者确认就是:发送消息的人,要确保消息发送给了消息队列,分别是确保到了交换机,确保到了消息队列这两步。
1、在发送消息服务的application.yml中添加配置
spring:rabbitmq:publisher-confirm-type: correlated # 异步回调publisher-returns: truetemplate:mandatory: true
2、确保消息到交换机
package cn.zsh.mq.spring;import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.UUID;@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testConfirmCallBack() {// 1、定义消息String message = "ABC";// 设置一个消息的唯一IDCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3、confirm-ackcorrelationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("消息发送异常:" + ex.toString());}@Overridepublic void onSuccess(CorrelationData.Confirm result) {if (result.isAck()) {// 说明到了交换机System.out.println("publish-confirm:ack==消息发送成功:" + correlationData.getId());} else {// 消息没有到交换机System.out.println("publish-confirm:nack==消息发送失败:" + correlationData.getId());}}});// 4、消息发送rabbitTemplate.convertAndSend("191exchange","191",message,correlationData);}}
3、确保消息从交换机路由到队列
创建公开CommonConfig类
package cn.zsh.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;/*** 发送消息到交换机没有到消息队列*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 1、获取RabbitTemplate(获取启动中的Bean的方式)RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 2、设置回调函数rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error("发送消息失败,没到队列===消息:{}, 交换机:{}, 路由Key:{}, 响应CODE:{}, 相应内容:{}", message,exchange,routingKey,replyCode,replyText);}});}
}
消息持久化
消息持久化就是:确保消息不会在交换机或者队列中丢失。
案例:
使用SpringAMQP创建出来的交换机和队列,默认就是做了持久化的
package cn.zsh.mq.config;import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;/*** 创建交换机与队列*/
@Component
public class FoundQueue {@Beanpublic DirectExchange qiuExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("qiu.deirect",true,false);}@Beanpublic Queue piqiuQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("piqiu.queue").build();}
}
消费者确认
消费者确认就是:消费者把消息从队列中获取出来,然后要消费成功,队列中的消息才能被删除掉。
方案一:消费者确认
加入这个配置以后,消费者消费失败,会直接重试或者删除,具体取决于设置的是none还是auto。
默认是none,不建议设置为auto模式因为会一直不断地尝试,这样会导致服务器压力很大。
spring:rabbitmq:listener:simple:acknowledge-mode: auto # none:投递完立马删除 auto:失败后让你再次重试(重新投递到队列)知道成功
方案二:消费者失败重试,重试固定次数后,则删除当前消息
加入这个配置以后,消费者消费失败会重试固定的次数,然后将消息删除。
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-intervalmax-attempts: 3 # 最大重试次数stateless: true # ture:无状态 false:有状态。如果业务中包含事务,这里改成false
方案三:消费者失败重试,重试固定次数后,将当前消息发送给error交换机路由给error队列
加入这个配置之后,重试固定次数后,会将这条消费失败的消息发送给error交换机,路由给error队列。
1、在消费者(消息接收者)中加入配置
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-intervalmax-attempts: 3 # 最大重试次数stateless: true # ture:无状态 false:有状态。如果业务中包含事务,这里改成false
2、创建error交换机和队列并绑定
package cn.zsh.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class ErrorConfig {/*** 定义交换机* @return*/@Beanpublic DirectExchange errorExchange2(){return new DirectExchange("error.direct");}/*** 定义队列* @return*/@Beanpublic Queue errorQueue2(){return new Queue("error.queue");}@Beanpublic Binding bindErrorQueue(DirectExchange errorExchange2,Queue errorQueue2){return BindingBuilder.bind(errorQueue2).to(errorExchange2).with("error");}
}
3、在启动类或者配置类中加入配置
package cn.zsh.mq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct","error");}
}