当前位置: 首页 > news >正文

MQ基础篇

1.初识MQ

1.同步调用

概念:
同步调用是一种程序执行方式,在调用一个函数或服务时,调用方会一直等待被调用方执行完成并返回结果,才会继续执行后续代码 ,期间调用线程处于阻塞状态。

同步调用的优势:

  • 时效性强,等待到结果后才返回。

同步调用的问题:

  • 拓展性差
  • 性能下降
  • 级联失败问题

2.异步调用

概念:
异步调用是一种程序执行机制,调用方发出请求后,无需等待被调用方处理完成并返回结果,就能继续执行后续代码 。它基于消息通知的方式,涉及消息发送者、消息接收者和消息代理三个角色。

异步调用通常是基于消息通知的方式,包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用者
  • 消息接收者:接收和处理消息的人,就是原来的服务提供者
  • 消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器
    在这里插入图片描述
    异步调用的优势:
  • 耦合度低,拓展性强
  • 异步调用,无需等待,性能好
  • 故障隔离,下游服务故障不影响上游业务
  • 缓存消息,流量削峰填谷

异步调用的问题:

  • 不能立即得到调用结果,时效性差
  • 不确定下游业务执行是否成功
  • 业务安全依赖于Broker的可靠性

3.MQ技术选型

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

Broker:核心组件,在生产者和消费者间起中介作用,负责接收、存储和转发消息

2.RabbitMQ

安装部署

docker run \-e RABBITMQ_DEFAULT_USER=wang \-e RABBITMQ_DEFAULT_PASS=123 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hm-net\-d \rabbitmq:3.8-management

在这里插入图片描述

消息发送的注意事项有哪些?

  • 交换机只能路由消息,无法存储消息
  • 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定

3.Java客户端

1.快速入门

① 引入spring-boot-starter-amqp依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

② 配置rabbitmq服务端信息

spring:rabbitmq:host: 192.168.88.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

③ 利用RabbitTemplate发送消息

@SpringBootTest
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {//1.队列名String queueName = "simple.queue";//2.消息String message = "hello,spring amqp!";//3.发送消息rabbitTemplate.convertAndSend(queueName, message);}}

④ 利用@RabbitListener注解声明要监听的队列,监听消息

@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String message) {log.info("Simple queue: {}", message);}
}

2.WorkQueue

实现一个队列绑定多个消费者

Work模型的使用:

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) {System.out.println("消费者1接收到消息:" + message + "," + LocalDateTime.now());
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) {System.err.println("消费者2接收到消息:" + message + "," + LocalDateTime.now());
}
@Test
public void testWorkQueue() {//1.队列名String queueName = "work.queue";for(int i=1;i<=50;i++){//2.消息String message = "hello,spring amqp!"+i;//3.发送消息rabbitTemplate.convertAndSend(queueName, message);}
}
listener:simple:prefetch: 1

3.Fanout交换机

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • FanoutExchange的会将消息路由到每个绑定的队列

发送消息到交换机的API:

@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "itcast.fanout";// 消息String message = "hello, everyone!";// 发送消息,参数分别是:交换机名称、RoutingKey(暂时为空)、消息rabbitTemplate.convertAndSend(exchangeName, "", message);
}

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式
在这里插入图片描述

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {log.info("消费者1监听到 fanout.queue1的消息: {}", message);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {log.info("消费者2监听到 fanout.queue2的消息: {}", message);
}
@Test
public void testFanoutQueue() {//1.交换机名String exchangeName = "hmall.fanout";//2.消息String message = "hello,everyone!";//3.发送消息rabbitTemplate.convertAndSend(exchangeName,"", message);
}

4.Direct交换机

在这里插入图片描述
Direct交换机与Fanout交换机的差异:

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同RoutingKey,则与Fanout功能类似
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String message) {log.info("消费者1监听到 direct.queue1的消息: {}", message);
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String message) {log.info("消费者2监听到 direct.queue2的消息: {}", message);
}
@Test
public void testDirectQueue() {//1.交换机名String exchangeName = "hmall.direct";//2.消息String message = "hello,blue!";//3.发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

5.Topic交换机

在这里插入图片描述
Topic交换机相比Direct交换机的差异:

  • Topic的RoutingKey和bindingKey可以是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #: 代表0个或多个词
  • *: 代表1个词
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String message) {log.info("消费者1监听到 topic.queue1的消息: {}", message);
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String message) {log.info("消费者2监听到 topic.queue2的消息: {}", message);
}
@Test
public void testTopicQueue() {//1.交换机名String exchangeName = "hmall.topic";//2.消息String message = "hello,blue!";//3.发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

6.声明队列交换机

@Configuration
public class FanoutConfiguration {@Beanpublic FanoutExchange fanoutExchange() {
//        return new FanoutExchange("hmall.fanout");return ExchangeBuilder.fanoutExchange("hamll.fanout").build();}@Beanpublic Queue fanoutQueue1() {
//        return new Queue("fanout.queue1");return QueueBuilder.durable("fanout.queue1").build();}@Beanpublic Binding fanoutQueueBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2() {
//        return new Queue("fanout.queue1");return QueueBuilder.durable("fanout.queue2").build();}@Beanpublic Binding fanoutQueueBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1", durable = "true"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String message) {log.info("消费者1监听到 direct.queue1的消息: {}", message);
}

7.消息转换器

建议采用JSON序列化代替默认的JDK序列化,要做两件事情:

在publisher和consumer中都要引入jackson依赖:

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

在publisher和consumer中都要配置MessageConverter:

@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}
@Test
public void testSendObject() {//1.准备消息Map<String, Object> msg=new HashMap<>(2);msg.put("name","Jack");msg.put("age",18);//3.发送消息rabbitTemplate.convertAndSend("object.queue",msg);
}
http://www.xdnf.cn/news/16975.html

相关文章:

  • LoRA(Low - Rank Adaptation,低秩自适应 )微调技术改进创新点
  • 并发设计模式实战系列(1):半同步/半异步模式
  • day45——非递减数列(LeetCode-665)
  • QT项目打包
  • Multi Agents Collaboration OS:文档合规性及质量检测助手设计及实践
  • 【KWDB 创作者计划】_算法篇---Stockwell变换
  • OpenAI重返巅峰:o3与o4-mini引领AI推理新时代
  • 面试经验杂谈
  • onlyoffice关闭JWT后依然报错如何解决?
  • Maven 简介(图文)
  • 文件系统的npu和内核的npu有什么区别
  • 双层Key缓存
  • 【dify实战】agent结合deepseek实现基于自然语言的数据库问答、Echarts可视化展示、Excel报表下载
  • 数据结构--并查集-高效处理连通性问题
  • windows测试
  • Android 13 关闭屏幕调节音量大小
  • LabVIEW油气井井下集成监测系统
  • 神经网络优化 - 小批量梯度下降
  • 消防营区管控:从智能仓储、装备管理、应急物资调用等多维度出发
  • 每日一题(8) 求解矩阵最小路径和问题
  • android的配置检查 查看安卓设备配置
  • LeetCode:DFS综合练习
  • 在服务器上安装redis
  • Kaamel隐私与安全分析报告:Apple Intelligence隐私保护机制
  • MySQL 表varchar字段长度估算
  • 预防网站被劫持
  • WordPress自定义页面与文章:打造独特网站风格的进阶指南
  • 嵌入式C语言位操作的几种常见用法
  • springcloud alibaba
  • FreeRTOS菜鸟入门(七)·创建任务·静态任务创建