Kafka 为什么这么快?

Kafka 是一款性能非常优秀的消息队列,每秒处理的消息体量可以达到千万级别。今天来聊一聊 Kafka 高性能背后的技术原理。

1 批量发送

Kafka 收发消息都是批量进行处理的。我们看一下 Kafka 生产者发送消息的代码:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;try {//省略前面代码Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);//把消息追加到之前缓存的这一批消息上RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);//积累到设置的缓存大小,则发送出去if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}return result.future;// handling exceptions and record the errors;// for API exceptions return them in the future,// for other exceptions throw directly} catch /**省略 catch 代码*/
}

从代码中可以看到,生产者调用 doSend 方法后,并不会直接把消息发送出去,而是把消息缓存起来,缓存消息量达到配置的批量大小后,才会发送出去。

注意:从上面 accumulator.append 代码可以看到,一批消息属于同一个 topic 下面的同一个 partition。

Broker 收到消息后,并不会把批量消息解析成单条消息后落盘,而是作为批量消息进行落盘,同时也会把批量消息直接同步给其他副本。

消费者拉取消息,也不会按照单条进行拉取,而是按照批量进行拉取,拉取到一批消息后,再解析成单条消息进行消费。

使用批量收发消息,减轻了客户端和 Broker 的交互次数,提升了 Broker 处理能力。

2 消息压缩

如果消息体比较大,Kafka 消息吞吐量要达到千万级别,网卡支持的网络传输带宽会是一个瓶颈。Kafka 的解决方案是消息压缩。发送消息时,如果增加参数 compression.type,就可以开启消息压缩:

public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//开启消息压缩props.put("compression.type", "gzip");Producer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key1", "value1");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {logger.error("sending message error: ", e);} else {logger.info("sending message successful, Offset: ", metadata.offset());}}});producer.close();
}

如果 compression.type 的值设置为 none,则不开启压缩。那消息是在什么时候进行压缩呢?前面提到过,生产者缓存一批消息后才会发送,在发送这批消息之前就会进行压缩,代码如下:

public RecordAppendResult append(TopicPartition tp,long timestamp,byte[] key,byte[] value,Header[] headers,Callback callback,long maxTimeToBlock) throws InterruptedException {// ...try {// ...buffer = free.allocate(size, maxTimeToBlock);synchronized (dq) {//...RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);if (appendResult != null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...return appendResult;}//这批消息缓存已满,这里进行压缩MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));dq.addLast(batch);incomplete.add(batch);// Don't deallocate this buffer in the finally block as it's being used in the record batchbuffer = null;return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);}} finally {if (buffer != null)free.deallocate(buffer);appendsInProgress.decrementAndGet();}
}

上面的 recordsBuilder 方法最终调用了下面 MemoryRecordsBuilder 的构造方法。

public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,byte magic,CompressionType compressionType,TimestampType timestampType,long baseOffset,long logAppendTime,long producerId,short producerEpoch,int baseSequence,boolean isTransactional,boolean isControlBatch,int partitionLeaderEpoch,int writeLimit) {//省略其他代码this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}

上面的 wrapForOutput 方法会根据配置的压缩算法进行压缩或者选择不压缩。目前 Kafka 支持的压缩算法包括:gzip、snappy、lz4,从 2.1.0 版本开始,Kafka 支持 Zstandard 算法。

在 Broker 端,会解压 header 做一些校验,但不会解压消息体。消息体的解压是在消费端,消费者拉取到一批消息后,首先会进行解压,然后进行消息处理。

因为压缩和解压都是耗费 CPU 的操作,所以在开启消息压缩时,也要考虑生产者和消费者的 CPU 资源情况。

有了消息批量收集和压缩,kafka 生产者发送消息的过程如下图:

图片

3 磁盘顺序读写

顺序读写省去了寻址的时间,只要一次寻址,就可以连续读写。

在固态硬盘上,顺序读写的性能是随机读写的好几倍。而在机械硬盘上,寻址时需要移动磁头,这个机械运动会花费很多时间,因此机械硬盘的顺序读写性能是随机读写的几十倍。

Kafka 的 Broker 在写消息数据时,首先为每个 Partition 创建一个文件,然后把数据顺序地追加到该文件对应的磁盘空间中,如果这个文件写满了,就再创建一个新文件继续追加写。这样大大减少了寻址时间,提高了读写性能。

4 PageCache

在 Linux 系统中,所有文件 IO 操作都要通过 PageCache,PageCache 是磁盘文件在内存中建立的缓存。当应用程序读写文件时,并不会直接读写磁盘上的文件,而是操作 PageCache。

图片

应用程序写文件时,都先会把数据写入 PageCache,然后操作系统定期地将 PageCache 的数据写到磁盘上。如下图:

图片

而应用程序在读取文件数据时,首先会判断数据是否在 PageCache 中,如果在则直接读取,如果不在,则读取磁盘,并且将数据缓存到 PageCache。

图片

Kafka 充分利用了 PageCache 的优势,当生产者生产消息的速率和消费者消费消息的速率差不多时,Kafka 基本可以不用落盘就能完成消息的传输。

5 零拷贝

Kafka Broker 将消息发送给消费端时,即使命中了 PageCache,也需要将 PageCache 中的数据先复制到应用程序的内存空间,然后从应用程序的内存空间复制到 Socket 缓存区,将数据发送出去。如下图:

图片

Kafka 采用了零拷贝技术把数据直接从 PageCache 复制到 Socket 缓冲区中,这样数据不用复制到用户态的内存空间,同时 DMA 控制器直接完成数据复制,不需要 CPU 参与。如下图:

图片

Java 零拷贝技术采用 FileChannel.transferTo() 方法,底层调用了 sendfile 方法。

6 mmap

Kafka 的日志文件分为数据文件(.log)和索引文件(.index),Kafka 为了提高索引文件的读取性能,对索引文件采用了 mmap 内存映射,将索引文件映射到进程的内存空间,这样读取索引文件就不需要从磁盘进行读取。如下图:

图片

7 总结

本文介绍了 Kafka 实现高性能用到的关键技术,这些技术可以为我们学习和工作提供参考。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1541095.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Mysql进阶——1

一.MySQL程序简介 本章介绍 MySQL 命令⾏程序以及在运⾏这些程序时指定选项的⼀般语法。 对常⽤程序进⾏详细的讲解&#xff0c;包括它们的选项。 MySQL安装完成通常会包含如下程序&#xff1a; • Linux系统程序⼀般在 /usr/bin⽬录下&#xff0c;可以通过命令查看&#x…

MySQL篇(索引)(持续更新迭代)

目录 一、简介 二、有无索引情况 1. 无索引情况 2. 有索引情况 3. 优劣势 三、索引结构 1. 简介 2. 存储引擎对于索引结构的支持情况 3. 为什么InnoDB默认的索引结构是Btree而不是其它树 3.1. 二叉树&#xff08;BinaryTree&#xff09; 3.2. 红黑树&#xff08;RB&a…

让模型评估模型:构建双代理RAG评估系统的步骤解析

在当前大语言模型(LLM)应用开发的背景下,一个关键问题是如何评估模型输出的准确性。我们需要确定哪些评估指标能够有效衡量提示(prompt)的效果,以及在多大程度上需要对提示进行优化。 为解决这一问题,我们将介绍一个基于双代理的RAG(检索增强生成)评估系统。该系统使用生成代理…

MySQL练手题--日期连续类型(困难)

一、准备工作 Create table If Not Exists Failed (fail_date date); Create table If Not Exists Succeeded (success_date date); Truncate table Failed; insert into Failed (fail_date) values (2018-12-28); insert into Failed (fail_date) values (2018-12-29); inser…

攻防世界-1-misc

下载附件&#xff0c;提示需要密码 提示密码是出题人的生日&#xff0c;这里可以自己定义一个关于生日的字典&#xff0c;使用字典生成工具&#xff0c;直接生成字典。&#xff08;我用的是19000101至20231231字典进行的爆破测试&#xff09; 使用archpr软件&#xff0c;和刚刚…

k8s下的网络通信与调度

目录 一、k8s网络通信 1、k8s通信整体架构 2、flannel网络插件 &#xff08;1&#xff09;flannel跨主机通信原理 &#xff08;2&#xff09;flannel支持的后端模式 3、calico网络插件 &#xff08;1&#xff09;简介 &#xff08;2&#xff09;网络架构 &#xff08;…

Css_动态渐变圆圈旋转效果

1、效果图 2、实现代码 <template><div class"box"><div class"line"></div><div class"lineNew"></div></div> </template><script lang"ts" setup></script><styl…

C语言 | Leetcode C语言题解之第421题数组中两个数的最大异或值

题目&#xff1a; 题解&#xff1a; const int HIGH_BIT 30;struct Trie {// 左子树指向表示 0 的子节点struct Trie* left;// 右子树指向表示 1 的子节点struct Trie* right; };struct Trie* createTrie() {struct Trie* ret malloc(sizeof(struct Trie));ret->left re…

天润融通创新功能,将无效会话转化为企业新商机

“您好&#xff0c;请问有什么可以帮您&#xff1f;” “......” 一个新的咨询会话进来&#xff0c;但客户却并不说话&#xff0c;这种情况客服人员肯定不会陌生&#xff0c;它一般被称为“无效会话”。 如今“无效会话”越来越多&#xff0c;已经成为困扰无数企业的难题。…

数学建模 第二讲 - 初等建模

绪论 主要内容:介绍以下几个初等模型&#xff0c;椅子问题、席位分配问题、行走步长问题、实物交换模型。 主要目的:体会数学建模的形式多样性与方法多样性&#xff0c;了解建模思想&#xff0c;着重理解由现实问题向数学问题的转化过程。 一、椅子问题 问题 四条腿长度相等…

Flat File端口更新:如何实现嵌套结构

Flat File端口可以实现平面文件和XML文件的互相转换&#xff0c;本文主要介绍在知行之桥EDI系统8971及更高版本中&#xff0c;Flat File端口如何支持类似EDI嵌套结构的转换。 Flatfile端口如何自定义嵌套结构 下载示例工作流以及示例文件 打开知行之桥EDI系统&#xff0c;创建…

2024年中国研究生数学建模竞赛ABCDEF题【附带解题思路代码+结果】

2024年中国研究生数学建模竞赛D题 点击链接加入群聊【2024华为杯数学建模助攻资料】&#xff1a;http://qm.qq.com/cgi-bin/qm/qr?_wv1027&kxtS4vwn3gcv8oCYYyrqd0BvFc7tNfhV7&authKeyedQFZne%2BzvEfLEVg2v8FOm%2BWNg1V%2Fiv3H4tcE6X%2FW6lCmkhaSaZV4PwQ%2FOVPDtF%2B…

css实现居中的方法

水平居中 1. 行内设置text-align 给父元素设置text-align为center&#xff0c;一般用于实现文字水平居中 2. 给当前元素设置margin&#xff1a;0 auto 原理&#xff1a;块级独占一行&#xff0c;表现为在水平方向上占满整个父容器&#xff0c;当水平方向padding&#xff0c;…

算法-Init

&#xff08;1&#xff09;有限性&#xff08;Finiteness&#xff09;&#xff1a;算法必 需在有限步骤内结束&#xff1b; &#xff08;2&#xff09;确定性&#xff08;Definiteness&#xff09;&#xff1a;算法的每一个步骤必须清晰无歧义地定义&#xff1b; &#xff08;3…

2024年Q3国际信息系统安全认证联盟(ISC2)内部研讨会要点分享

2024年是CISSP认证成立30周年&#xff0c;这是一项具有里程碑意义的成就&#xff0c;代表了CISSP在网络安全领域的卓越、创新和领导力。博主于今年9月份参加了ISC2&#xff08;国际信息系统安全认证联盟&#xff09;组织的2024年第3季度内部网络研讨会&#xff0c;针对会议中的…

国标视频流媒体服务GB28181和Ehome等多协议接入的Liveweb方案详解

Liveweb视频融合/汇聚云平台基于“云-边-端”一体化架构&#xff0c;部署轻量简单、功能灵活多样&#xff0c;平台可支持多协议&#xff08;GB28181/RTSP/Onvif/海康SDK/Ehome/大华SDK/RTMP推流等&#xff09;、多类型设备接入(IPC/NVR/监控平台)&#xff0c;在视频能力上&…

Python 二级考试

易错点 电脑基础知识 定义学生关系模式如下&#xff1a;Student &#xff08;S#&#xff0c; Sn&#xff0c; Ssex&#xff0c;class&#xff0c;monitorS#&#xff09;&#xff08;其属性分别为学号、学生名、性别、班级和班长学号&#xff09; 在关系模式中&#xff0c;如果…

.NET内网实战:通过FSharp白名单执行命令

01阅读须知 此文所节选自小报童《.NET 内网实战攻防》专栏&#xff0c;主要内容有.NET在各个内网渗透阶段与Windows系统交互的方式和技巧。 02基本介绍 本文内容部分节选自小报童《.NET 通过Fsharp执行命令绕过安全防护》我们会长期更新&#xff01; 03编码实现 Fsi.exe 是…

信息安全工程师(9)网络信息安全管理内容与方法

前言 网络信息安全管理是确保网络资产&#xff08;包括网络设备、网络通信协议、网络服务及网络管理&#xff09;的安全性、可用性、完整性和可控性的重要工作。 一、网络信息安全管理内容 数据安全&#xff1a; 保密性&#xff1a;确保数据不被未经授权的第三方获取。完整性&a…

go的结构体、方法、接口

结构体&#xff1a; 结构体&#xff1a;不同类型数据集合 结构体成员是由一系列的成员变量构成&#xff0c;这些成员变量也被称为“字段” 先声明一下我们的结构体&#xff1a; type Person struct {name stringage intsex string } 定义结构体法1&#xff1a; var p1 P…