MQ基础篇
1.初识MQ
1.同步调用
概念:
同步调用是一种程序执行方式,在调用一个函数或服务时,调用方会一直等待被调用方执行完成并返回结果,才会继续执行后续代码 ,期间调用线程处于阻塞状态。
同步调用的优势:
- 时效性强,等待到结果后才返回。
同步调用的问题:
- 拓展性差
- 性能下降
- 级联失败问题
2.异步调用
概念:
异步调用是一种程序执行机制,调用方发出请求后,无需等待被调用方处理完成并返回结果,就能继续执行后续代码 。它基于消息通知的方式,涉及消息发送者、消息接收者和消息代理三个角色。
异步调用通常是基于消息通知的方式,包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用者
- 消息接收者:接收和处理消息的人,就是原来的服务提供者
- 消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器
异步调用的优势: - 耦合度低,拓展性强
- 异步调用,无需等待,性能好
- 故障隔离,下游服务故障不影响上游业务
- 缓存消息,流量削峰填谷
异步调用的问题:
- 不能立即得到调用结果,时效性差
- 不确定下游业务执行是否成功
- 业务安全依赖于Broker的可靠性
3.MQ技术选型
MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
Broker:核心组件,在生产者和消费者间起中介作用,负责接收、存储和转发消息
2.RabbitMQ
安装部署
docker run \-e RABBITMQ_DEFAULT_USER=wang \-e RABBITMQ_DEFAULT_PASS=123 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hm-net\-d \rabbitmq:3.8-management
消息发送的注意事项有哪些?
- 交换机只能路由消息,无法存储消息
- 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定
3.Java客户端
1.快速入门
① 引入spring-boot-starter-amqp依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
② 配置rabbitmq服务端信息
spring:rabbitmq:host: 192.168.88.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码
③ 利用RabbitTemplate发送消息
@SpringBootTest
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {//1.队列名String queueName = "simple.queue";//2.消息String message = "hello,spring amqp!";//3.发送消息rabbitTemplate.convertAndSend(queueName, message);}}
④ 利用@RabbitListener注解声明要监听的队列,监听消息
@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String message) {log.info("Simple queue: {}", message);}
}
2.WorkQueue
实现一个队列绑定多个消费者
Work模型的使用:
- 多个消费者绑定到一个队列,可以加快消息处理速度
- 同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) {System.out.println("消费者1接收到消息:" + message + "," + LocalDateTime.now());
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) {System.err.println("消费者2接收到消息:" + message + "," + LocalDateTime.now());
}
@Test
public void testWorkQueue() {//1.队列名String queueName = "work.queue";for(int i=1;i<=50;i++){//2.消息String message = "hello,spring amqp!"+i;//3.发送消息rabbitTemplate.convertAndSend(queueName, message);}
}
listener:simple:prefetch: 1
3.Fanout交换机
交换机的作用:
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- FanoutExchange的会将消息路由到每个绑定的队列
发送消息到交换机的API:
@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "itcast.fanout";// 消息String message = "hello, everyone!";// 发送消息,参数分别是:交换机名称、RoutingKey(暂时为空)、消息rabbitTemplate.convertAndSend(exchangeName, "", message);
}
Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {log.info("消费者1监听到 fanout.queue1的消息: {}", message);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {log.info("消费者2监听到 fanout.queue2的消息: {}", message);
}
@Test
public void testFanoutQueue() {//1.交换机名String exchangeName = "hmall.fanout";//2.消息String message = "hello,everyone!";//3.发送消息rabbitTemplate.convertAndSend(exchangeName,"", message);
}
4.Direct交换机
Direct交换机与Fanout交换机的差异:
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同RoutingKey,则与Fanout功能类似
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String message) {log.info("消费者1监听到 direct.queue1的消息: {}", message);
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String message) {log.info("消费者2监听到 direct.queue2的消息: {}", message);
}
@Test
public void testDirectQueue() {//1.交换机名String exchangeName = "hmall.direct";//2.消息String message = "hello,blue!";//3.发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
5.Topic交换机
Topic交换机相比Direct交换机的差异:
- Topic的RoutingKey和bindingKey可以是多个单词,以 . 分割
- Topic交换机与队列绑定时的bindingKey可以指定通配符
- #: 代表0个或多个词
- *: 代表1个词
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String message) {log.info("消费者1监听到 topic.queue1的消息: {}", message);
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String message) {log.info("消费者2监听到 topic.queue2的消息: {}", message);
}
@Test
public void testTopicQueue() {//1.交换机名String exchangeName = "hmall.topic";//2.消息String message = "hello,blue!";//3.发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
6.声明队列交换机
@Configuration
public class FanoutConfiguration {@Beanpublic FanoutExchange fanoutExchange() {
// return new FanoutExchange("hmall.fanout");return ExchangeBuilder.fanoutExchange("hamll.fanout").build();}@Beanpublic Queue fanoutQueue1() {
// return new Queue("fanout.queue1");return QueueBuilder.durable("fanout.queue1").build();}@Beanpublic Binding fanoutQueueBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2() {
// return new Queue("fanout.queue1");return QueueBuilder.durable("fanout.queue2").build();}@Beanpublic Binding fanoutQueueBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1", durable = "true"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String message) {log.info("消费者1监听到 direct.queue1的消息: {}", message);
}
7.消息转换器
建议采用JSON序列化代替默认的JDK序列化,要做两件事情:
在publisher和consumer中都要引入jackson依赖:
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
在publisher和consumer中都要配置MessageConverter:
@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}
@Test
public void testSendObject() {//1.准备消息Map<String, Object> msg=new HashMap<>(2);msg.put("name","Jack");msg.put("age",18);//3.发送消息rabbitTemplate.convertAndSend("object.queue",msg);
}