Kafka 如何保证消息不丢失?【消息手动 ACK】

前言:

Kafka 作为一个 MQ 它肯定会有消息丢失的场景,那我们如何做到让 Kafka 的消息不丢失呢?本篇我们来剖析一下 Kafka 如何做到消息不丢失。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 之批量消息发送消费

Kafka 之消息广播消费

Kafka 之消息并发消费

Kafka 之顺序消息

Kafka 之事务消息

Kafka 如何保证消息不丢失?

Kafka 有生产者、Broker、Consumer,这三个环节都可能有消息丢失的情况发生,下面我们就从这三个方面来分析 Kafka 是如何保证消息不丢失的。

生产者:

生产者发送消息到 Kafka 集群的时候,可能会因为网络等其他原因导致发送失败,因此我们可以需要一个机制告诉我们消息是否发送成功,如果没有发送成功就一直发送,直到消息发送成功为止,我们常用的 send(msg) 方法其实是异步发送,发送完消息后会立即返回,我们并不知道消息是否发送成功,为了保证消息一定能够发送成功,建议使用同步发送 send(msg).get() 方法或者带有回调的 send(msg,callback) 方法。

同时我们可以对生产者增加一些配置来保证消息不丢失,配置如下:

#0:表示消息发送后立即返回 无需等待 Leader 的任何确认 1:表示消息写入了 Leader 副本 -1: 表示需要等到消息写入到所有 ISR 同步副本中
spring.kafka.producer.acks = 1
#生产消息发送的重试次数
spring.kafka.producer.retries = 3

spring.kafka.producer.acks 各个值的含义如下:

  • acks =0:表示生产者不需要等待任何 Broker 确认收到消息的回复,就可以继续发送下一条消息,性能最高,但是最容易丢消息,可以用在对性能要求很高,但对数据丢失不敏感的情况可以用这种。
  • acks =1:需要保证 Leader 已经成功将消息写入本地 文件,但是不需要等待所有 ISR副本(同步副本)是否成功写入,就可以继续发送下一条消息,这种情况下,如果 ISR副本(同步副本)没有成功备份数据,而此时 Leader又挂掉,则消息会丢失。
  • acks =-1:需要 Leader 及其所有的 ISR副本(同步副本)都成功写入日志,才可以继续发送下一条消息,这种策略会保证只要有一个副本存活就不会丢失数据,最大程度的保证了消息不会丢失。

Broker:

Broker 合理的使用持久化机制,ISR 副本同步机制可以最大程度的保证消息不丢失。

  • 持久化存储:Kafka 使用持久化来存储消息,让消息在写入 Kafka 的时候被写入磁盘,这种方式可以防止消息因为节点宕机而丢失。
  • ISR 副本复制机制:Kafka 使用 ISR 副本同步机制来保证消息不丢失,ISR 副本同步机制可以让一个分区有多个副本,且副本可以分布在不同的节点上,当某个节点宕机后,其他节点可以继续提供服务,保证消息不丢失。

消费者:

做为消费者只需要保证能够正确的消费消息,并正确的提交消息 offset 即可,Kafka 会记录每个消费者的偏移量,消费者每次消费消息的时候,都会将偏移量向后移动,当消费者挂掉或者 Kafka 宕机的时候,Kafka 会将该消费者的所消费的分区偏移量保存下来,当故障恢复后,消费者可以继续从上一次的偏移量开始消费,为了保证消息不丢失,我们使用手动提交偏移量即可,避免拉取了消息后,业务逻辑没有处理完的时候消费者挂掉了,但是提交了偏移量,导致消息丢失。

Consumer 需要关闭自动提交并开启手动提交,具体配置如下:

#消息 ACK 模式 有7种
spring.kafka.listener.ack-mode = manual
#是否开启手动提交 默认自动提交
spring.kafka.consumer.enable-auto-commit = false

Kafka 消息手动 ACK 案例演示

在演示 Kafka 手动 ACK 之前我们先了解一下 Kafka 的几种 ACK 的含义,也就是 AckMode 的枚举值的含义。

public static enum AckMode {RECORD,BATCH,TIME,COUNT,COUNT_TIME,MANUAL,MANUAL_IMMEDIATE;private AckMode() {}
}
  • RECORD:每一条记录被消费者消费之后提交。
  • BATCH:当每一批 poll() 的消息被消费者处理之后提交,频率取决于 poll 的调用频率,是 Kafka 的默认提交方式,BATCH 模式适用于需要提高处理效率的场景,例如批量处理大量消息以减少网络传输和系统调用的开销。
  • TIME:当每一批 poll()的数据被消费者处理之后,距离上次提交时间大于TIME时提交,如果当前时间有消息正在处理,则等当前消息处理完成在提交。
  • COUNT:当每一批 poll()的数据被消费者监处理之后,被处理消息数量大于等于 COUNT 时提交,如果当前时间有消息正在处理,则等当前消息处理完成在提交。
  • COUNT_TIME:TIME 或 COUNT 满足其中一个就提交。
  • MANUAL:当每一批 poll()的数据被消费者监处理之后,手动调用 Acknowledgment.acknowledge() 先将 offset 存放到 map 本地缓存,在下一次 poll 之前从缓存拿出来批量提交。
  • MANUAL_IMMEDIATE:当每一批 poll()的数据被消费者监处理之后,手动调用 Acknowledgment.acknowledge()后立即提交。

Kafka Producer

Kafka Producer 的代码同样很简单,这里我们使用了前面分享的同步、异步发送的的代码,具体如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.text.SimpleDateFormat;
import java.util.Date;/*** @ClassName: SyncKafkaProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 同步发送消息*/
@Slf4j
@Component
public class SyncKafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;//同步发送消息 public void sendSyncMessage(String message) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());//同步发送消息try {kafkaTemplate.send("sync-topic", message).get();} catch (Exception e) {e.printStackTrace();}log.info("完成消息发送,当前时间:{}", dateStr);}//异步发送消息public void sendAsyncMessage(String message) {try {//同步发送消息ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send("sync-topic", message);listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("消费发送失败");}@Overridepublic void onSuccess(SendResult<String, String> stringStringSendResult) {log.info("消息发送成功");}});} catch (Exception e) {e.printStackTrace();}}}

Consumer 代码演示

Consumer 我们还是使用 @KafkaListener 来完成消息监听,在 Consumer 代码中,我们刻意模拟了除0异常。

package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;/*** @ClassName: ManualAckKafkaConsumer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 手动 ACK 消息消费*/
@Slf4j
@Component
public class ManualAckKafkaConsumer {@KafkaListener(id = "my-kafka-manual-consumer",groupId = "my-kafka-consumer-manual-groupId-01",topics = "sync-topic",containerFactory = "myContainerFactory")public void listen(String message, Acknowledgment acknowledgment) {log.info("Manual ACK 消息消费成功,消息内容:{}", message);int a = 1 / 0;//手动提交 ACKacknowledgment.acknowledge();}}

结果验证

2024-10-28 17:41:01.568  INFO 17764 --- [-consumer-0-C-1] c.o.s.k.consumer.ManualAckKafkaConsumer  : Manual ACK 消息消费成功,消息内容:我是一条同步消息

结果符合预期,测试的是如果没有关闭除0 异常,客户端会不停的消费这条消息,因此我们在消息消费失败的时候也要注意做出合理处理,例如加入死信队列,避免消息一直在被消费而占用系统资源。

总结:本篇简单分享了 Kafka 如何保证消息不丢失,并分享了对应的手动 ACK 的代码案例,需要注意的是 Kafka 无法做到消息 100% 不丢失,至于 Kafka 为什么没办法做到消息 100% 不丢失,后面会做分享,欢迎持续关注。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/5758.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

推荐一款便捷的图像处理工具:Photo Collage Maker

Photo Collage Maker是一款便捷的图像处理工具&#xff0c;能够对图像进行拼接和剪辑&#xff0c;帮助用户轻松实现各类图像效果的添加。该软件支持图片框的添加以及图片分享功能&#xff0c;适合用于制作照片拼贴、个性化相册、美丽的剪贴簿等创意项目。 软件特点 简单易用 …

yolo v5 开源项目

项目地址&#xff1a;https://gitcode.net/EricLee/yolo_v5

《化纤与纺织技术》是什么级别的期刊?是正规期刊吗?能评职称吗?

问题解答 问&#xff1a;《化纤与纺织技术》是不是核心期刊&#xff1f; 答&#xff1a;不是&#xff0c;是知网收录的第一批认定学术期刊。 问&#xff1a;《化纤与纺织技术》级别&#xff1f; 答&#xff1a;省级。主管单位&#xff1a;广东粤能&#xff08;集团&#xf…

Python 爬取大量数据如何并发抓取与性能优化

Python 并发抓取与性能优化 在进行网络爬虫开发时&#xff0c;爬取大量数据可能非常耗时。尤其是在处理许多网页或 API 请求时&#xff0c;逐个请求速度会非常慢。为了解决这个问题&#xff0c;我们可以通过并发抓取提高爬取效率。同时&#xff0c;通过性能优化来进一步减少耗…

Centos开机自启动脚本示例

本文建议创建一个sh文件管理自启动的各项内容&#xff0c;再将sh文件设置开机启动 在/root/autoshell下创建一个autostart.sh&#xff0c;内容如下 #!/bin/bash # description:开机自启脚本# 启动mongodb sh /root/software/mongodb-linux-x86_64-rhel70-4.0.6/bin/mongod --c…

猫头虎分享: AI设计利器 Recraft详解与基础使用教程

&#x1f981;猫头虎分享&#xff1a;AI设计利器 Recraft——全面解析与教程 大家好&#xff0c;我是猫头虎&#xff01;今天为大家带来一款非常炙手可热的 AI 设计工具 —— Recraft 的深度介绍与详细教程。这款工具自推出以来&#xff0c;就迅速获得了全球设计师的青睐。那么…

Spring AI : 让ChatGPT成为你构建应用的核心亮点

本文是一篇介绍spring ai的文章&#xff0c;主要介绍了生成文本内容&#xff0c;以及读取图片中内容两个能力。 之所以介绍这两个能力&#xff0c;是因为 大模型目前最适合做的事情有两个&#xff1a; 1&#xff09; 非结构化数据的结构化&#xff08;图片转文字&#xff0c;…

Qt(openCV的应用)

1. OpenCV简介 OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是一个开源的计算机视觉库&#xff0c;它提供了丰富的图像处理和计算机视觉功能。该库由英特尔公司发起&#xff0c;并在 BSD 许可证下发布&#xff0c;因此它是免费的&#xff0c;且开放源代…

Excel快速转换文档word工具

【注意事项&#xff1a;】 1、目前支持Win10/11 x64操作系统&#xff0c;已亲测可正常运行。 2、工具解压后在 \excel_docx\excel\目录中提供了转换前的标准模板“testcase.xlsx”&#xff0c;测试童鞋在使用Zentao、JIRA等测试工具导出excel&#xff08;.xlsx后缀&#xff0…

uniapp 集成 uview

注意&#xff1a;HBuildX新建项目时必须选择vue2版本&#xff0c;vue3会不支持uview 下载安装方式&#xff1a; uview安装网站&#xff1a;uView2.0重磅发布&#xff0c;利剑出鞘&#xff0c;一统江湖 - DCloud 插件市场 配置&#xff1a; 1.安装sass插件 // 安装sass npm i …

想要搭建陪玩系统小程序,这几点不容忽视,陪玩系统源码框架

随着互联网经济的持续稳定发展&#xff0c;游戏市场的“封印”逐渐被打开&#xff0c;搭建陪玩平台成为一个新的热点。提起陪玩系统相信大家也不陌生&#xff0c;漫漫单排路如果有一个大神能带自己躺赢那是再好不过了&#xff0c;于是陪玩系统运营而生。想要搭建陪玩平台&#…

【论文笔记】Dense Connector for MLLMs

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: Dense Connector for MLLM…

[论文阅读]A Survey of Embodied Learning for Object-Centric Robotic Manipulation

Abstract --以对象为中心的机器人操纵的Embodied learning是体现人工智能中一个快速发展且具有挑战性的领域。它对于推进下一代智能机器人至关重要&#xff0c;最近引起了人们的极大兴趣。与数据驱动的机器学习方法不同&#xff0c;具身学习侧重于通过与环境的物理交互和感知反…

vscode的一些使用心得

问题1&#xff1a;/home目录空间有限 连接wsl或者remote的时候&#xff0c;会在另一端下载一个.vscode-server&#xff0c;vscode的插件都会安装进去&#xff0c;导致空间增加很多&#xff0c;可以选择更换这个文件的位置 参考&#xff1a;https://blog.csdn.net/weixin_4389…

Vue前端开发之自定义动画样式

在上一小节中&#xff0c;我们介绍了动画的实现源于6个类别样式&#xff0c;它们的名称默认前缀是一个“v”字母或者指定的名称&#xff0c;如“sc”&#xff0c;其实&#xff0c;也可以不使用这些固定的类别样式&#xff0c;开发者可以自定义任意的类别样式&#xff0c;供动画…

【709】基于SSM+vue的“萌宠小知识”网站设计与实现

摘 要 如今社会上各行各业&#xff0c;都喜欢用自己行业的专属软件工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。新技术的产生&#xff0c;往往能解决一些老技术的弊端问题。因为传统萌宠小知识信息管理难度大&#xff0c;容错率低&#xff…

《机器学习by周志华》学习笔记-神经网络-05RBF径向基函数网络

1、背景 1988年,Broomhead和Lowe用径向基函数(Radialbasis function, RBF)提出分层网络的设计方法,从而将神经网络的设计与数值分析和线性适应滤波相挂钩。 2、概念 RBF(Radial Basis Function,径向基函数)网络是一种单隐层前馈神经网络,即该网络只有3层,输入层、隐含…

11.1组会汇报-基于区块链的安全多方计算研究现状与展望

基础知识 *1.背书&#xff0c;这个词源来自银行票据业务&#xff0c;是指票据转让时&#xff0c;原持有人在票据背面加盖自己的印鉴&#xff0c;证明该票据真实有效、如果有问题就可以找原持有人。 区块链中的背书就好理解了。可以简单的理解为验证交易并声明此交易合法&…

MSC“名实之辩”:精准鉴定只为精准治疗

前 言 MSC是一群来源广泛、能够体外增殖分化的异质性细胞。MSC具有免疫调节、促进组织修复等作用&#xff0c;应用于多种疾病的治疗。由于科学进程、习惯等原因&#xff0c;MSC具有多种名称。自先秦以来&#xff0c;就有“名实之辩”&#xff0c;今日我们就讲一讲MSC的名称与…

CST联合Isight进行天线DOE设计

本期我们转载一篇国外工程师Matthias MEIENHOFER&#xff0c;利用CST和Isight联合进行DOE设计的案例。 本文通过模拟设计一个双频带&#xff08;GSM和WLAN&#xff09;天线来研究天线的几何尺寸变化和性能的关系。如果我们改变天线里的某些宽度或长度参数&#xff0c;天线的性…