项目实战总结-Kafka实战应用核心要点

Kafka实战应用核心要点

  • 一、前言
  • 二、Kafka避免重复消费
    • 2.1 消费者组机制
    • 2.2 幂等生产者
    • 2.3 事务性生产者/消费者
    • 2.4 手动提交偏移量
    • 2.5 外部存储管理偏移量
    • 2.6 去重逻辑
    • 2.7 幂等消息处理逻辑
    • 2.8 小结
  • 三、Kafka持久化策略
    • 3.1 持久化文件
    • 3.2 segment 分段策略
    • 3.3 数据文件刷盘策略
    • 3.4 日志清理策略
    • 3.5 Kafka消息查找策略
  • 四、Kafka零复制(Zero-copy)
  • 五、Kafka设计实现延迟消息
  • 六、Kafka与ZooKeeper依赖性

一、前言

在这里插入图片描述
记录Kafka在项目中应用的核心要点,面试可食用。

二、Kafka避免重复消费

在 Apache Kafka 应用于项目中时,避免重复消费是个重要且常见的问题,尤其是在处理消息时需要确保每条消息只被处理一次。总结而言,避免重复消费的方式有七种:

2.1 消费者组机制

Kafka消费者组(Consumer Group)机制可以确保每个分区的消息只被一个消费者实例消费。通过合理的分区和消费者组设计,可以避免同一消息被多个消费者重复消费。
优点:

  • 简单易用,Kafka内置支持。
  • 适用于简单的负载均衡和扩展。

缺点:

  • 不能完全避免重复消费,比如在消费者重启或重新平衡的过程中可能会有些消息被重复消费。
  • 需要额外处理消费者重平衡带来的复杂性。

2.2 幂等生产者

Kafka 0.11.0版本引入幂等生产者(Idempotent Producer),可确保相同的消息在网络或其他错误导致重试时不会被重复写入Kafka。
启用幂等生产者只需要在生产者配置中设置enable.idempotence=true。幂等生产者确保消息在网络或其他错误导致重试时不会被重复写入 Kafka,通过为每个消息分配唯一的序列号来实现幂等性。
配置修改:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

优点:

  • 简化生产者端的去重逻辑。
  • 可以确保消息在Kafka中只写入一次。

缺点:

  • 需要Kafka 0.11.0及以上版本。
  • 在某些情况下可能会增加生产者的延迟。

2.3 事务性生产者/消费者

Kafka支持事务性消息,允许生产者和消费者在一个事务中一起工作。生产者可以将一组消息作为一个事务写入Kafka,消费者也可以在一个事务中读取和处理消息。这样可确保消息处理的原子性和一致性。要使用事务性生产者,需要配置transactional.id
配置修改:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

优点:

  • 提供强一致性保证。
  • 避免消息处理中的部分提交问题。

缺点:

  • 复杂度较高,需Kafka 0.11.0及以上版本。
  • 性能开销较大,适用于对一致性要求高的场景。

2.4 手动提交偏移量

Kafka消费者默认会自动提交偏移量(auto commit),为更好地控制消息处理和偏移量提交,可关闭自动提交(enable.auto.commit=false),并在确保消息处理成功后手动提交偏移量。这可通过commitSync()commitAsync()方法来实现。
配置修改:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息}consumer.commitSync();
}

优点:

  • 精细控制偏移量提交时机,确保消息处理成功后才提交。
  • 提高处理的可靠性。

缺点:

  • 增加消费者代码的复杂性。
  • 如果处理逻辑很慢,可能导致偏移量提交延迟。

2.5 外部存储管理偏移量

在某些特定场景下,可将偏移量存储在外部存储(如数据库)中,而不是依赖 Kafka的内部偏移量管理。这样可在消息处理和偏移量提交之间建立更强的关联,确保只有当消息处理成功后才更新偏移量。
优点:

  • 可以在消息处理和偏移量提交之间建立更强的关联。
  • 灵活性高,可根据业务需求自定义偏移量管理。

缺点:

  • 需要额外的存储和管理逻辑。
  • 增加系统的复杂性。

2.6 去重逻辑

在消息处理逻辑中引入去重机制。
例如,可以使用消息的唯一标识符(如消息ID)在处理前检查是否已经处理过该消息,从而避免重复处理。
优点:

  • 灵活性高,可根据业务逻辑自定义去重策略。
  • 适用于需要严格去重的场景。

缺点:

  • 需要额外的存储和管理去重信息。
  • 增加处理逻辑的复杂性。

2.7 幂等消息处理逻辑

设计消息处理逻辑时,尽量使其成为幂等操作,即相同的消息即使被处理多次也不会产生副作用。
例如,在数据库操作时,可以使用UPSERT操作(更新插入)来确保数据的一致性。
优点:

  • 简化重复消费问题的处理。适
  • 用于可以设计为幂等操作的业务场景。

缺点:

  • 并不是所有业务逻辑都能设计为幂等操作。
  • 需要仔细设计和验证处理逻辑的幂等性。

2.8 小结

对于大多数场景,结合使用消费者组、手动提交偏移量和幂等处理逻辑可以有效避免重复消费,而在需要更严格一致性的场景下,可以考虑使用幂等生产者和事务性消息
具体选择方案取决于具体的应用场景和需求。

三、Kafka持久化策略

Kafka实际上就是日志消息存储系统, 根据offset获取对应的消息,消费者获取到消息之后该消息不会立即从mq中移除,而是继续存储在磁盘中

3.1 持久化文件

topic有分区(partition)的概念,Kafka 会将topic分成多个不同的分区,生产者往同一个topic发送的消息最终是发送到不同的分区里面,每个分区中拆分成多个不同的segment文件存储日志。
每个segment文件包含:

  • .index 文件 (消息偏移量索引文件)
  • .log 文件(消息物理存放文件)
  • .timeindex文件(时间索引文件)

每个segment文件容量最大默认为500MB,如果超过500MB就生成新的 segment文件,且文件命名后几位表示上个segment文件最后offset值,如:segment01 、segment500 、segment1000
由此可知:一个topic里的消息是由该topic下所有分区里的消息组成的。在同一个分区内部,消息是有序的,而不同分区之间,消息是不能保证有序的

存储的消息日志文件在 server.properties 配置文件的 log.dirs 参数指定的目录下,以" t o p i c − topic- topicpartition"为名称的目录:
在这里插入图片描述
注:由于每个分区都有leader的概念,而不同分区的leader可能位于不同的broker上,除leader外,分区还有副本(replica)的概念,因此每个broker只会存储分区leader或副本位于该broker中的topic的消息。

3.2 segment 分段策略

在 server.properties 配置文件中,分段文件配置默认是500MB ,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件),文件较多时性能会稍微降低。下面是相关配置参数:

##日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment
log.roll.hours=72
##segment的索引文件最大尺寸限制,即时log.segment.bytes没达到,也会生成一个新的segment
log.index.size.max.bytes=10*1024*1024
##控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
log.segment.bytes=1024*1024*1024

3.3 数据文件刷盘策略

当把数据写入到文件系统之后,数据其实在操作系统的page cache里面,并没有刷到磁盘上去。若此时操作系统宕机,数据就会丢失。
这里可根据消息的数量log.flush.interval.messages和时间log.flush.interval.ms进行配置,如果时间设置的过大,有没达到指定的数量的情况下,如果系统宕机,数据就会丢失。
Kafka官方并不建议通过Broker端的log.flush.interval.messageslog.flush.interval.ms来强制写盘,认为数据的可靠性应该通过Replica来保证,而强制Flush数据到磁盘会对整体性能产生影响。

##每当producer写入10000条消息时,刷数据到磁盘 配置
log.flush.interval.messages=10000
##每间隔5秒钟时间,刷数据到磁盘
log.flush.interval.ms=5000

3.4 日志清理策略

##   是否开启日志清理
log.cleaner.enable=true
##  日志清理运行的线程数
log.cleaner.threads = 2
##  日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖,默认 delete
log.cleanup.policy = delete
##  数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略
##  log.retention.bytes和 log.retention.minutes或 log.retention.hours任意一个达到要求,都会执行删除
log.retention.minutes=300
log.retention.hours=24
##   topic每个分区的最大文件大小,-1没有大小限制
log.retention.bytes=-1
##  文件大小检查的周期时间,是否触发 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=5minutes

3.5 Kafka消息查找策略

前文提到每个segment file有命名规则,且在.index文件中,存储的是key-value格式的,key代表在.log中按顺序开始顺序消费的offset值,value代表该消息的物理消息存放位置。
但是在.index中不是对每条消息都做记录,它是每隔一些消息记录一次,避免占用太多内存。即使消息不在index记录中,在已有的记录中查找,范围也大大缩小。
kafka就是利用分段+索引的方式来解决查找效率的问题,kafka没有对每个文件建立索引,而是利用kafka 消息写入磁盘的顺序性,对其中部分的消息建立偏移量索引和时间戳索引,这就是稀疏索引,目的是节约空间的资源,定位到相邻.log文件,再根据顺序遍历查找,此方式的时间复杂度是O(n)。
其中,偏移量索引源码:

offsetIndex.append(largestOffset, physicalPosition)def append(offset: Long, position: Int) {inLock(lock) {// 索引位置mmap.putInt(relativeOffset(offset))// 日志位置mmap.putInt(position)_entries += 1_lastOffset = offset}
}// 用当前offset减去基准offset
def relativeOffset(offset: Long): Int = {val relativeOffset = offset - baseOffset
}

时间戳索引源码:

timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false) {inLock(lock) {if (timestamp > lastEntry.timestamp) {// 添加时间戳mmap.putLong(timestamp)// 添加相对位移(偏移量索引)mmap.putInt(relativeOffset(offset))_entries += 1_lastEntry = TimestampOffset(timestamp, offset)}}
}

Kafka使用改进版的二分查找,改的不是二分查找的内部,而是把所有索引项分为热区和冷区 这个改进可以让查询热数据部分时,遍历的Page永远是固定的,这样能避免缺页中断。
整体流程:
在这里插入图片描述

四、Kafka零复制(Zero-copy)

Kafka信息复制的原因:确保任何已发布的消息不会丢失,并且可以在机器错误、程序错误或更常见些的软件升级中使用。
Kafka之所以能够快速地处理大量数据,其中一个重要原因就是采用零拷贝(Zero-copy)技术。Kafka采用两种零拷贝技术来提高性能:mmap(memory-map)sendfile
主要有两个大的场景:

  • Broker 读写.index文件,用 mmap零复制
  • Broker 向Consumer发消息,用 sendfile 零复制

mmap (memory-map):把文件映射到进程的虚拟内存空间。通过对这段内存的读取和修改,可以实现对文件的读取和修改,而不需要用read和write系统调用。
sendfile:直接在内核完成输入和输出,不需要拷贝到用户空间再写出去。

五、Kafka设计实现延迟消息

Kafka延时操作的实现方式:基于时间戳的延时和基于特殊Topic的延时
(1)基于时间戳的延时:通过设置消息的时间戳来实现延时操作。Producer在发送消息时,可以为消息设置一个未来的时间戳,指定消息在该时间点之后才能被消费者消费。Kafka会根据消息的时间戳进行延时推送,直到时间点到达后才将消息发送给消费者。
(2)基于特殊Topic的延时:通过创建专门的延时Topic来实现延时操作。可以将需要延时的消息发送到延时Topic中,然后设置一个定时任务来定期检查延时Topic中的消息,并将到期的消息转发到目标Topic供消费者消费。

简单步骤:

1.创建正常的topic(即即时消费的消息)。
2.创建延迟的topic,并设置合适的副本因子和参数以支持延迟消费。
3.发送消息到正常的topic,同时指定消息需要被延迟消费。
4.使用Kafka的消费者API从延迟topic拉取消息并处理。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class DelayedMessageProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);// 正常的topicString immediateTopic = "immediate-messages";// 延迟的topicString delayedTopic = "delayed-messages";// 消息内容String value = "This is a delayed message";// 延迟消费的时间,例如10秒long delayTime = 10000;// 发送消息到延迟的topicproducer.send(new ProducerRecord<>(delayedTopic, 0, System.currentTimeMillis() + delayTime, value));producer.close();}
}

六、Kafka与ZooKeeper依赖性

从Kafka 2.8版本开始,Kafka提供KRaft模式,需要配置Quorm控制器,可以在没有ZooKeeper的情况下运行Kafka集群。
之前版本,Zookeeper是Kafka的核心组件之一,负责集群元数据的管理和控制器的选举等任务。Zookeeper存储和管理着Kafka的元数据信息和配置信息,包括broker的IP地址、端口号、主题分区的分配方案等。此外,Zookeeper还帮助Kafka集群实现自动故障转移和负载均衡等功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1544282.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

迎国庆-为祖国庆生python、Java、C各显神通

" 金秋送爽&#xff0c;丹桂飘香“&#xff0c;我们即将即将迎来祖国母亲的华诞&#xff01;&#xff01; 七十余载风雨兼程&#xff0c;无数先辈以热血铸就辉煌&#xff0c;换来了今日的繁荣昌盛。从东方破晓的第一缕曙光&#xff0c;到星辰大海的无限探索&#xff0c;中…

git 删除 git push 失败的记录

文章目录 问题分析 问题 git push 失败后如何清理 commit 提交的内容 当我们 git push 失败后&#xff0c;如果下次有新的改动需要push时&#xff0c;会出现如下报错 分析 找到需要回退的那次commit的 哈希值 git log然后就回退到了指定版本&#xff0c;这个时候再把新修改…

解析rss链接数据,来长期把某博客数据订阅到自己的网站

目的 当我们打开这个订阅链接&#xff0c;会看到我们的文章信息以xml的形式呈现到浏览器页面中&#xff0c;怎么直接在我们自己的网站中&#xff0c;将这个链接的数据转为我们熟悉的json数据&#xff0c;然后渲染到自己的网站中呢 技术栈 react hookstypescriptwebpack 核心…

【C++掌中宝】深入理解函数重载:概念、规则与应用

文章目录 引言1. 什么是函数重载&#xff1f;2. 为什么需要函数重载&#xff1f;3. 编译器如何解决命名冲突&#xff1f;4. 为什么返回类型不参与重载&#xff1f;5. 重载函数的调用匹配规则6. 编译器如何解析重载函数的调用&#xff1f;7. 重载的限制与注意事项8. 总结结语 引…

柯桥小语种学习之语言交流 | 德语餐厅用语

01 一、入座与点餐 1. Guten Tag! Ein Tisch fr zwei Personen, bitte.&#xff08;你好&#xff01;请给我们一张两人桌。&#xff09; 2. Knnen wir hier sitzen?&#xff08;我们可以坐这里吗&#xff1f;&#xff09; 3. Die Speisekarte, bitte.&#xff08;请给我菜…

在Windows系统上安装的 zlib C++ 库

在Windows系统上安装的 zstd C 库 项目地址步骤步骤一步骤二步骤三如果生成过程中遇到如下错误: 效果 项目地址 https://github.com/madler/zlib 可以发现这个项目有CMakeLists.txt文件,那就比较好搞了 步骤 步骤一 git clone gitgithub.com:madler/zlib.git步骤二 cd zli…

丢失照片/消息/文件,当发现没有备份 Android 手机数据时急救方法

当人们发现他们没有备份 Android 手机数据时&#xff0c;通常为时已晚。但是&#xff0c;我们都不想永久丢失珍贵的照片&#xff0c; 消息和其他文件。这就是为什么您应该检查 遵循 5 大免费 Android 数据恢复工具和最佳替代品 他们。 排名前五的免费 Android 数据恢复软件 1.奇…

黑芝麻A1000-Ubuntu20.04(九)yolov5从训练到板端运行过程详解

宿主机&#xff1a;台式电脑 Ubuntu20.04 开发板&#xff1a;A1000&#xff08;烧录版本SDK v2.3.1.2&#xff09; 模型转换容器&#xff1a;bsnn-tools-container-stk-4.2.0 编译容器&#xff1a;a1000b-sdk-fad-2.3.1.2 yolov5使用工程&#xff1a;黑芝麻根据https://github.…

PHP探索校园新生态校园帮小程序系统小程序源码

探索校园新生态 —— 校园帮小程序系统&#xff0c;让生活更精彩&#xff01; &#x1f331;【开篇&#xff1a;走进未来校园&#xff0c;遇见新生态】&#x1f331; 你是否厌倦了传统校园的繁琐与单调&#xff1f;是否渴望在校园里也能享受到便捷、智能的生活体验&#xff1…

3d可视化图片:通过原图和深度图实现

1、depthy 在线体验demo: https://depthy.stamina.pl/#/ 也可以docker安装上面服务: docker run --rm -t -i -p 9000:9000 ndahlquist/depthy http://localhost:90001)首先传原图 2)再传对应深度图 3)效果 </ifra

网络事件管理

网络事件管理是运行组织 IT 网络不可或缺的一部分&#xff0c;网络事件管理的最终目标很简单&#xff1a;在发生中断时尽快恢复服务或功能。但是为了高效和一致地进行&#xff0c;IT 运营团队需要时刻保持警惕&#xff0c;不断了解网络事件&#xff0c;并且必须系统地遵循一套程…

opencv4.5.5 GPU版本编译

一、安装环境 1、opencv4.5.5 下载地址&#xff1a;https://github.com/opencv/opencv/archive/refs/tags/4.5.5.ziphttps://gitee.com/mirrors/opencv/tree/4.5.0 2、opencv-contrib4.5.5 下载地址&#xff1a;https://github.com/opencv/opencv_contrib/archive/refs/tags/4…

ToB项目身份认证AD集成(二):一分钟搞定window server 2003部署AD域服务并支持ssl加密(多图保姆教程+证书脚本)

在ToB的应用开发中&#xff0c;往往需要集成AD域控实现身份认证&#xff0c;同时也算是近期工作的总结&#xff0c;之前已介绍了基础的AD、Ldap&#xff0c;本文主要介绍如何大家一个本地的测试环境。 相关系列&#xff1a; ToB项目身份认证AD集成&#xff08;一&#xff09;&a…

【JavaSE】-- 类和对象(1)

文章目录 1. 面向对象的初步认知1.1 什么是面向对象1.2 面向对象与面向过程 2. 类的定义和使用2.1 简单认识类2.2 类的定义格式 3. 类的实例化3.1 什么是实例化3.2 类和对象的说明 4. this引用4.1 为什么要有this引用4.2 什么是this引用4.3 this引用的特性 5. 对象的构造及初始…

增强GPT4v的Grounding能力,video-level

开源链接&#xff1a; appletea233/AL-Ref-SAM2: AL-Ref-SAM 2: Unleashing the Temporal-Spatial Reasoning Capacity of GPT for Training-Free Audio and Language Referenced Video Object Segmentation (github.com) In this project, we propose an Audio-Language-Refe…

Spring Boot中实现一个递归获取省市区行政区划代码

Spring Boot中实现一个递归获取省市区行政区划代码 写于20240924 10:23 在Spring Boot中实现一个递归获取省市区行政区划代码的功能&#xff0c;可以按照以下步骤进行。我们将使用Spring Data JPA来与数据库交互&#xff0c;并构建一个递归的方法来获取层级数据。 首先这里数据…

11周年 | 初心不改,焕新前行,奔赴下一个10年!

2024年8月13日&#xff0c;爱加密正式迎来了11岁生日&#xff0c;在爱加密肩负着崇高使命踏浪而行的10年间&#xff0c;蓝绿色的品牌标识一直伴于左右。随着时代的变迁以及市场需求的不断变化&#xff0c;企业同样也需要在品牌上做出创新递进&#xff0c;从而更加适应市场竞争的…

数据科学的秘密武器:defaultdict——Python字典的自动化填充神器,让数据结构更灵活

目录 什么是defaultdict 引入动机 创建与初始化 工作原理 自定义默认值函数 注意事项 使用案例 使用场景 1: 计数 使用场景 2: 分组数据 使用场景 3: 嵌套字典结构 进阶案例使用 进阶案例 1: 使用 defaultdict 实现词频统计并排序 进阶案例 2: 使用 defaultdict 实…

OpenCSG推出StarShip SecScan:AI驱动的软件安全革新

OpenCSG 导读 如今&#xff0c;IT 技术迅速发展&#xff0c;软件安全不仅是企业稳健运营的基础&#xff0c;更是整个社会经济体系安全的保障。加强软件安全&#xff0c;尤其是在开发阶段识别和修补漏洞&#xff0c;是企业必须重视的问题。国际数据公司&#xff08;IDC&#xf…

MyBatis 入门教程-搭建入门工程

Maven作为一个优秀的项目构建和管理工具,在日常的开发中被大多数开发者使用,后续的项目也是基于Maven来构建。 创建一个Maven项目 利用IDEA创建项目工具来创建一个Maven项目 添加MyBatis的依赖 这里可以从Maven仓库地址中进行查看, https://mvnrepository.com/ 从这里可…