kafka客户端消费者吞吐量优化

问题背景

业务场景

mq消息消费实时性要求不高,期望可以牺牲一部分实时性,换取吞吐量,例如:数据库单条insert优化为batchInsert。优化后结果不符合预期:消费者消费消息的batchSize远小于实际配置的max.poll.records,导致在批量消息达到时想要聚合mq批量操作业务数据效果与单条处理效果类似。于是翻查影响kafka吞吐量的相关配置

原因分析

Kafka 的 poll() 方法返回的消息数量与 batch.size 参数并不是直接相关的,影响 Kafka 消费者 poll() 时能获取消息数量的因素有很多。让我们逐步分析这些因素,并探讨可能的优化方法。

影响 poll() 返回消息数量的因素

  1. **max.poll.records**** 设置**
    • max.poll.records 限制了每次 poll() 获取的消息最大数量。即使 batch.size 设置得较大,如果 max.poll.records 较小,poll() 获取的消息数依然受限制。
  2. 消息生产速度
    • 如果生产者写入 Kafka 的速度较慢,消费端的 poll() 方法在调用时可能没有足够的消息积累,因此无法返回足够多的消息。
    • 解决方案:监控生产者的写入速度和 Kafka 的积压情况,确保有足够的消息被生产和累积。
  3. 分区数量与消费者线程数
    • 如果 Kafka topic 的分区数量较少,且每个消费者线程处理一个或多个分区,那么分区中的消息总量可能不足,导致每次 poll() 返回的消息数较少。
    • 解决方案:确保 topic 分区数量合理,并且适当增加消费者实例,以提高并行处理能力。
  4. **fetch.min.bytes**** 和 **fetch.max.wait.ms** 设置**
    • fetch.min.bytes:消费者从 Kafka 获取消息时,设置了一个最小的字节数,当数据量不足时,Kafka 将等待更多的数据写入。
    • fetch.max.wait.ms:当 fetch.min.bytes 的数据量未达到时,Kafka 消费者会等待一定的时间再返回消息。如果设置过短,可能会在消息不足时提早返回。
    • 优化思路:增加 fetch.min.bytes 值,使消费者等待更多数据积累后再返回。同时适当调整 fetch.max.wait.ms,确保不会过早返回消息。
  5. 消费者的 **poll()** 调用频率
    • 消费者应用程序调用 poll() 的频率也影响返回的消息数量。如果频繁调用 poll(),每次返回的消息可能较少。
    • 解决方案:适当调整 poll() 的调用频率,确保消费者等待足够的消息后再调用。
  6. **session.timeout.ms**** 和 **heartbeat.interval.ms** 设置**
    • 如果这些参数配置不当,Kafka 消费者可能会因为过频繁发送心跳而导致每次 poll() 间隔较短,未能积累足够的消息。
    • 解决方案:增加 session.timeout.msheartbeat.interval.ms 的值,允许消费者有足够时间 poll() 更多消息。

优化建议

  1. **增加 ****max.poll.records**
    • max.poll.records 设置得更大,以确保每次 poll() 尽可能多地返回消息。例如,尝试将其从默认的 500 增加到 1000 或更大。
  2. **调整 **fetch.min.bytes** 和 ****fetch.max.wait.ms**
    • 可以将 fetch.min.bytes 增加到 1MB 或 5MB,这样消费者将会等待 Kafka 收集到足够的消息再返回。也可以适当增加 fetch.max.wait.ms,让 Kafka 多等待一段时间再返回消息。
  3. 监控消费者调用的频率
    • 适当降低 poll() 的调用频率,确保 Kafka 消费者有时间积累足够的数据。
  4. 增加分区数
    • 确保 Kafka topic 有足够的分区,使得每个分区中可以累积足够的消息。此外,可以考虑增加消费者数量来并行处理消息。

通过这些配置调整,你可以增加每次 poll() 获取的消息数量,从而提高批量处理效率。

参数含义

**fetch.max.wait.ms**

  • 描述: fetch.max.wait.ms 是 Kafka 消费者端的配置,表示当消费者从 Kafka broker 请求消息时,如果可用的数据量不足 fetch.min.bytes,消费者最多会等待的时间(毫秒)。当超出这个时间后,即使没有足够的数据,也会返回当前已经积累的消息。
  • 用途:
    • 这个参数主要用于优化消费者在没有足够数据的情况下的等待时间。通过设定一个合理的 fetch.max.wait.ms,消费者可以等待更多数据积累来提高吞吐量,但不会因数据不足无限等待。
    • 如果消息到达频率低,消费者就会等待 fetch.max.wait.ms 毫秒后返回;如果数据积累足够快,消费者会尽早返回。
  • 默认值: 通常默认值是 500 毫秒。
  • 场景: 应用于从本地 Kafka broker 拉取消息的延迟和吞吐量控制。

**remote.fetch.max.wait.ms**

  • 描述: remote.fetch.max.wait.ms 是 Kafka 远程存储机制相关的一个参数。远程存储是在 Kafka 3.0 引入的架构优化,允许将过期的日志段(log segments)存储在远程存储介质上,如云存储(Amazon S3,Google Cloud Storage 等),而不是一直保留在本地磁盘上。remote.fetch.max.wait.ms 控制 Kafka 代理从远程存储拉取过期日志段时,最大等待的时间。
  • 用途:
    • 当消费者尝试读取的数据已经从本地磁盘迁移到远程存储时,Kafka 代理会从远程存储系统中拉取该段数据。remote.fetch.max.wait.ms 就是用来限制 Kafka 在从远程存储读取数据时的最大等待时间。
    • 远程存储拉取通常比本地拉取要慢,因为涉及外部存储系统,所以这个参数用于优化消费者在这种情况下的性能。
  • 场景: 应用于 Kafka 从远程存储拉取过期消息的等待时间控制。

fetch.min.bytes

fetch.min.bytes 是 Kafka 消费者端的一个配置参数,它用于控制每次从 Kafka broker 拉取数据时的最小数据量。这个参数决定了消费者拉取消息时的行为,影响数据批量处理的效率和延迟。

参数作用:
  • 功能:指定 Kafka broker 每次返回给消费者的消息的最小字节数。当消费者发起拉取请求时,Kafka broker 会等待消息积累到至少 fetch.min.bytes 指定的字节数后,再将数据返回给消费者。
  • 如果 broker 在 fetch.max.wait.ms 时间内没有积累到足够的数据(即 fetch.min.bytes),它会返回当前可用的数据量,即使小于 fetch.min.bytes
默认值:
  • 默认值为 1,意味着 broker 不需要等待消息积累到一定的字节数,只要有消息(即使只有一条消息),就可以立即返回给消费者。
使用场景:
  1. 高吞吐量场景
    • 如果你的应用需要批量处理 Kafka 消息,可以将 fetch.min.bytes 设置得大一些,确保每次拉取的数据足够多,以减少频繁的网络请求。
    • 适合需要处理大批量数据的系统,比如数据分析或日志处理系统。这时,你可以设置较高的 fetch.min.bytes,让 Kafka broker 等待更多消息积累后再返回。
  2. 低延迟场景
    • 如果你希望 Kafka 消息能尽快被消费,不希望消费者等待消息积累,你可以将 fetch.min.bytes 保持默认值(1)。这时,broker 会在有数据可供消费时尽快返回,减少延迟。
    • 适合需要快速响应的系统,比如实时监控或流数据处理。

如果poll(100ms),fetch.max.wait.ms=500ms,那么100ms后mq未达到fetch.min.bytes。客户端会得到当前的records吗?

不会

例子

假设有以下配置:

  • fetch.min.bytes = 1MB
  • fetch.max.wait.ms = 500ms
  • 消费者调用了 poll() 向 broker 请求数据。

情况1:Broker 上的消息量 < 1MB

  • 当消费者请求数据时,broker 上只有 500KB 的消息。
  • Broker 会等到 fetch.max.wait.ms(500ms),试图等待更多消息的到达。
  • 如果在 500ms 内消息累积达到了 1MB,broker 会立即返回这 1MB 的消息。
  • 如果 500ms 过去了,仍然没有足够的消息(比如只有 700KB),broker 会返回这些 700KB 的消息。

情况2:Broker 上的消息量 >= 1MB

  • 如果消费者请求时,broker 上已经有 1MB 或更多消息,broker 会立即返回这些消息,不再等待 fetch.max.wait.ms

**fetch.max.wait.ms**** 与 **fetch.min.bytes** 配合的意义:**

  • **fetch.min.bytes** 设置了消费者希望每次拉取的最小数据量,这样可以避免频繁拉取少量消息,提高吞吐量。
  • **fetch.max.wait.ms** 防止消费者因为等不到足够多的消息而无限期等待,设置了一个时间上限。如果在这个时间内没有足够的数据,broker 仍然会返回已有的数据,避免消费者一直没有数据处理。

fetch.max.wait.ms 与 poll的超时应该相等比较合理,这样poll不会在mq消息量不足的时候拉到空数据空跑浪费cpu资源?

不是

理解两者的关系:

  1. **fetch.max.wait.ms**
    • 这是 Kafka broker 在消息不足时等待积累更多数据的最大时间。如果在这段时间内没有更多数据到达,broker 会返回已经积累的消息(即使小于 fetch.min.bytes)。
    • 主要目的是避免过于频繁的拉取请求,减少网络传输的开销,增加单次拉取的消息量。
  2. **poll()**** 超时时间**:
    • 这是 Kafka 消费者客户端在本地等待 broker 返回数据的最大时间。
    • 如果在 poll() 超时时间内 broker 没有返回数据,poll() 会返回空结果,并且消费者会继续下一轮 poll()
    • 主要目的是控制消费者的等待时间,以确保在没有数据的情况下不会阻塞太久。

两者的配合:

  • **fetch.max.wait.ms****poll()** 超时时间设置为相等:如果 poll() 的超时时间等于 fetch.max.wait.ms,理论上可以避免 poll() 过早返回空数据,因为 broker 正在等待积累足够的数据。

这种配置的好处是可以减少消费者空跑的概率,尤其是在消息量较小的场景中。它确保了在 poll() 的等待时间内,broker 至少有足够的时间来积累消息,最大限度地提高单次拉取的数据量。

  • 实际应用中的权衡
    • 在实际场景中,**fetch.max.wait.ms** 设置略小于 **poll()** 超时时间可能是更合理的选择。例如,你可以设置 fetch.max.wait.ms 为 500ms,poll() 超时时间为 600ms。这种配置让 broker 有足够时间积累数据,并且消费者 poll() 方法也有足够时间等待 broker 返回数据。
    • 如果 poll() 超时时间与 fetch.max.wait.ms 完全相等,有时可能会导致 poll() 稍微早于 broker 返回数据,从而造成一些无效的 poll() 调用。稍微延长 poll() 时间可以避免这一问题。

其他因素影响:

  • 消息吞吐量和延迟:过长的 fetch.max.wait.mspoll() 时间会增加数据的累积量,但也可能增加处理延迟。如果应用对实时性要求较高,可能需要缩短这两个时间,使得消费者更频繁地获取消息。
  • CPU 和网络资源:延长这两个时间可以减少空跑和频繁的拉取请求,从而节省 CPU 和网络资源。但如果设置过长,可能会造成消费者响应不及时,特别是当消息积压严重时。

示例:

假设 fetch.max.wait.ms 设置为 500ms,poll() 超时时间设置为 600ms:

  • 如果在 500ms 内 broker 积累了足够消息,broker 会立即返回数据,消费者的 poll() 将会在接收到消息后立刻处理。
  • 如果 broker 在 500ms 内没有积累到足够的消息,它会返回当前可用的数据,poll() 超时不会过早触发,确保了消费者不会空跑。
  • 如果 poll() 超时时间设置得比 fetch.max.wait.ms 短,消费者可能会在 broker 还未返回数据之前超时,导致空轮询。

总结:

  • 一般建议:可以将 poll() 超时时间设置得稍微大于 fetch.max.wait.ms,这样可以确保 broker 有足够时间积累消息,同时避免 poll() 过早超时。
  • 合理的设置fetch.max.wait.ms = 500ms,poll() 超时 = 600ms。这样 broker 可以最大限度积累数据,消费者也有足够时间等待数据返回,避免空跑。

这种策略会帮助你在减少空轮询提高批量处理效率之间找到平衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/5040.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

精美的Python Rich

今天给大家推荐一个非常精美的终端工具 - Python Rich Rich 是一个专为 Python 开发者打造的终端美化库&#xff0c;能让你的控制台输出内容更具视觉效果&#xff01;通过简单易用的 Rich API&#xff0c;可以快速为终端文本添加颜色和样式&#xff0c;让原本单调的输出变得丰…

【react框架之dvajs】dva数据流你可能还不知道的subscriptions隐藏的秘密

Subscriptions 是一种从 源 获取数据的方法&#xff0c;它来自于 elm。 语义是订阅&#xff0c;用于订阅一个数据源&#xff0c;然后根据条件 dispatch 需要的 action。数据源可以是当前的时间、服务器的 websocket连接、keyboard 输入、geolocation 变化、history 路由变化等等…

基于单片机的燃气报警阀门系统

本设计基于单片机的燃气报警阀门系统&#xff0c;燃气报警阀门系统采用STM32主控制器为核心芯片&#xff0c;外围电路由燃气传感器、OLED液晶显示模块、按键模块、蜂鸣器报警模块、电磁阀以及SIM800模块等模块组成。燃气传感器模块负责采集燃气浓度数据&#xff0c;采集完成由S…

python怎么去掉换行符

换行符与其他字符并没有区别&#xff0c;由于换行符总是最后一个字符&#xff0c;所以直接选择除去最后一个字符的所有字符即可。 x abc\n x[:-1] 也可以使用字符串的strip()方法 但是strip()方法除了会去掉换行符&#xff0c;还会去掉空格等其他字符。 x.strip()

Webserver(4.4)多进程/多线程实现并发服务器

目录 多进程实现并发服务器多线程实现并发服务器TCP状态转换 多进程实现并发服务器 要实现TCP服务器处理并发的任务&#xff0c;使用多线程或者多进程来解决 一个父进程&#xff0c;多个子进程 父进程负责等待并接受客户端的连接 子进程&#xff1a;完成通信&#xff0c;接收一…

Pinterest会成为亚马逊的新流量入口吗?

Pinterest 作为一个以图片分享为主的社交媒体平台&#xff0c;全球月活跃用户约为 4.368亿。同时&#xff0c;Pinterest 的用户群体以女性为主&#xff0c;占比高达 70% 以上&#xff0c;且多数是 18 岁到 44 岁之间的中高收入人群&#xff0c;具有较强的购买力和消费能力。对于…

SpeechT5 模型

微软开源的 SpeechT5 语音模型&#xff0c;主要包括以下功能 语音转文字&#xff1a;用于自动语音识别&#xff08;ASR&#xff09;。文字转语音&#xff1a;用于合成音频&#xff08;TTS&#xff09;。语音转语音&#xff1a;用于不同声音之间的转换或进行语音增强。 T5 网络…

.NET 8 中 Entity Framework Core 的使用

本文代码&#xff1a;https://download.csdn.net/download/hefeng_aspnet/89935738 概述 Entity Framework Core (EF Core) 已成为 .NET 开发中数据访问的基石工具&#xff0c;为开发人员提供了强大而多功能的解决方案。随着 .NET 8 和 C# 10 中引入的改进&#xff0c;开发人…

我要精通前端-块级元素和行内元素再度深入学习笔记

真的发现前端天天增删改查&#xff0c;真的是问一些比较细节的知识&#xff0c;我真的懂么 1、块级元素间的margin会重叠&#xff0c; <div class"head"></div> <div class"content"></div>.head {margin: 5px;border: 10px sol…

sparkSQL的UDF,最常用的regeister方式自定义函数和udf注册方式定义UDF函数 (详细讲解)

- UDF&#xff1a;一对一的函数【User Defined Functions】 - substr、split、concat、instr、length、from_unixtime - UDAF&#xff1a;多对一的函数【User Defined Aggregation Functions】 聚合函数 - count、sum、max、min、avg、collect_set/list - UDTF&#xff1a;…

[SAP ABAP] 面向对象程序设计-类和对象

面向对象开发的特点&#xff1a;封装、继承和多态 什么是类和对象&#xff1f; 类(CLASS)是创建对象的模板&#xff0c;对象(OBJECT)是类的实例 一个类可以创建多个对象 类 > 类型 对象 > 个体 在ABAP语言中&#xff0c;定义一个类&#xff0c;需要包含定义(defin…

需求不明确时如何设计测试用例?

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 1、与产品澄清问题 需求不明确时&#xff0c;首先&#xff0c;应弄明白&#xff0c;需求有哪些模块及功能算法不明确&#xff1f; 需求有问题找相关负责人沟通…

C++:多态中的虚/纯虚函数,抽象类以及虚函数表

我们在平时&#xff0c;旅游或者是坐高铁或火车的时候。对学生票&#xff0c;军人票&#xff0c;普通票这些概念多少都有些许耳闻。而我们上篇文章也介绍过了继承与多继承。如果这些票我们都分别的去写一个类&#xff0c;当然很冗余&#xff0c;这里我们便可以去使用继承&#…

Sun Solaris开机自启配置

Sun Solaris 开机自启配置 1. 运行级别定义&#xff08;rc0.d — rcS.d&#xff09; Linux/Solaris系统启动相关目录、脚本说明&#xff1a; init: 系统启动超级进程inittab: 进程启动配置init.d: 启动脚本存放目录rc0---rc6: 运行级别目录rcS: 单用户模式启动脚本 Linux/S…

机器学习—例子:图像识别

在上篇文章中&#xff0c;在一个需求预测示例中看到了神经网络是如何工作的&#xff0c;那么如何将类似类型的想法应用于计算机视觉应用程序。 如果你正在开发人脸识别应用程序&#xff0c;让我们深入研究一下。假设一个神经网络将这样的图片作为输入&#xff0c;并输出图片中…

微服务系列五:避免雪崩问题的限流、隔离、熔断措施

目录 实验环境说明 前言 一、一片小雪花引起的雪崩&#xff01; 1.1 雪崩问题&#xff08;级联失败问题&#xff09;示意图 1.2 雪崩问题的产生原因与解决策略 二、雪崩问题的具体解决策略 2.1 请求限流 2.2 线程隔离 2.3 服务熔断 2.4 总结——具体解决策略 三、微…

C语言之写一个修改数组内容的函数

问题代码: 函数ltrim是为了消除buf字符数组中左边空格&#xff0c; memmove函数介绍 如果对c语言指针运用非常熟练的人,结合函数功能就会发现这个代码非常的傻逼&#xff0c;你会发现为什么需要返回&#xff0c;buf不用接收返回值&#xff0c;执行这个函数后buf中的内容就已经…

第二十七章 Vue异步更新之$nextTick

目录 一、概述 二、完整代码 2.1. main.js 2.2. App.vue 一、概述 需求&#xff1a;编辑标题, 弹出显示编辑框自动聚焦 1. 点击编辑&#xff0c;显示编辑框 2. 让编辑框&#xff0c;立刻获取焦点 我们常规的思路可能会编写如下代码来实现&#xff1a; 问题&#xff1a…

【含文档】基于ssm+jsp的IT论坛系统(含源码+数据库+lw)

1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: apache tomcat 主要技术: Java,Spring,SpringMvc,mybatis,mysql,vue 2.视频演示地址 3.功能 系统定义了三个…

【运维心得】按任何键都不能进BIOS三步解决

目录 第一步 键盘 第二步 工具 第三步 短路 估计经常搞运维的朋友&#xff0c;会经常碰到这个问题。 第一步 键盘 这个现象出现&#xff0c;首先要确定开机时&#xff0c;屏幕上是否会显示提示字符&#xff1f;比如F2、F10、DEL键之类的&#xff0c;如果有&#xff0c;那么就…