kafka消费数据太慢了,给优化下

原代码

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1024"); // 最小获取1KB的数据props.put("fetch.max.wait.ms", "500"); // 最大等待500ms// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息; 如果消息处理逻辑允许,可以批量处理消息,而不是逐条处理while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));if (!records.isEmpty()) {for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 批量提交偏移量consumer.commitSync();}}}}
}  

在 Kafka 消费者代码中,有几个地方可以进行优化以提高消费速度:

增加消费者线程:你已经增加了消费者的数量,这是一个很好的开始。确保每个消费者线程都在不同的分区上消费,以实现并行处理。

调整 fetch.min.bytes 和 fetch.max.wait.ms:这两个参数控制了消费者从 Kafka
服务器获取数据的最小字节数和最大等待时间。如果你的消息较小,增加 fetch.min.bytes
可能会导致消费者等待更长时间以收集足够的数据,这可能会降低消费速度。如果你的消息较大,可以适当增加这个值。

异步提交偏移量:你当前使用的是 consumer.commitSync() 进行同步提交偏移量,这会阻塞当前线程直到提交完成。可以考虑使用
consumer.commitAsync() 异步提交,这样可以不阻塞消费线程。

增加 max.poll.records:poll
方法有一个参数可以设置一次拉取的最大记录数。如果设置得太低,可能会导致频繁的网络请求,增加延迟。

合理设置 auto.commit.interval.ms:自动提交偏移量的间隔设置得太短可能会导致不必要的性能开销,可以适当增加这个值。

使用多线程处理消息:如果你的消息处理逻辑是 CPU 密集型的,可以考虑使用线程池来并行处理消息。

监控和调优:使用 Kafka 的监控工具来观察消费者的行为,找出瓶颈并进行调优。

下面是根据上述建议优化后的代码示例:

public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "114.15.78.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "5000"); // 增加自动提交偏移量的间隔props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1"); // 减少最小获取字节数props.put("fetch.max.wait.ms", "100"); // 减少最大等待时间props.put("max.poll.records", "500"); // 增加一次拉取的最大记录数// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {processRecords(records); // 异步处理消息consumer.commitAsync(); // 异步提交偏移量}}}private void processRecords(ConsumerRecords<String, String> records) {// 异步处理消息的逻辑for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());// 这里可以添加消息处理逻辑,例如使用线程池并行处理}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/13393.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

并发基础:(淘宝笔试题)三个线程分别打印 A,B,C,要求这三个线程一起运行,打印 n 次,输出形如“ABCABCABC....”的字符串

🚀 博主介绍:大家好,我是无休居士!一枚任职于一线Top3互联网大厂的Java开发工程师! 🚀 🌟 在这里,你将找到通往Java技术大门的钥匙。作为一个爱敲代码技术人,我不仅热衷于探索一些框架源码和算法技巧奥秘,还乐于分享这些宝贵的知识和经验。 💡 无论你是刚刚踏…

字节、快手、Vidu“打野”升级,AI视频小步快跑

文&#xff5c;白 鸽 编&#xff5c;王一粟 继9月份版本更新之后&#xff0c;光锥智能从生数科技联合创始人兼CEO唐家渝朋友圈获悉&#xff0c;Vidu大模型将于本周再次进行版本升级&#xff0c;Vidu-1.5版本即将上线。 此版本更新方向仍是重点延伸大模型的泛化能力和主体…

redis实现消息队列的几种方式

一、了解 众所周知&#xff0c;redis是我们日常开发过程中使用最多的非关系型数据库&#xff0c;也是消息中间件。实际上除了常用的rabbitmq、rocketmq、kafka消息队列&#xff08;大家自己下去研究吧~模式都是通用的&#xff09;&#xff0c;我们也能使用redis实现消息队列。…

JVM(一、基础知识)

JVM虚拟机的灵魂三问 JVM是什么&#xff1f; 广义上是一种规范&#xff0c;狭义上的是JDK中的JVM虚拟机&#xff0c;虚拟机模拟计算机的组成部分&#xff0c;可以运行我们写的应用程序&#xff0c;是对操作系统的一层抽象&#xff0c;把我们的应用程序和操作系统解耦&#xff0…

问题分析与解决:Android开机卡动画问题分析

1. 问题背景及描述 在一个android设备的开发的项目中遇到了一个比较典型的问题:在主板贴片完成后,首次刷入androdi固件验证时,遇到了按键出发开机后,系统启动到android动画界阶段时一直循环卡在此阶段,无法进入桌面。如下如所示: 此问题在许多android项目的首次点亮阶段均…

视频会议接入GB28181视频指挥调度,语音对讲方案

传统的视频会议指挥调度系统目前主流的互联网会议大部分都是私有协议&#xff0c;功能都很独立。目前主流的视频监控国标都最GB平台&#xff0c;新的需求要求融合平台要接入监控等设备&#xff0c;并能实现观看监控接入会议&#xff0c;实时语音设备指挥现场工作人员办公实施。…

跟着尚硅谷学vue2—进阶版1.0—组件化编程

2. Vue 组件化编程 1. 传统方式和使用组件方式编写的对比 1. 传统方式编写应用 2. 使用组件方式编写应用 2. 模块与组件、模块化与组件化 1. 模块 理解: 向外提供特定功能的 js 程序, 一般就是一个 js 文件为什么: js 文件很多很复杂作用: 复用 js, 简化 js 的编写, 提高 j…

WebRTC视频 01 - 视频采集整体架构

一、前言&#xff1a; 我们从1对1通信说起&#xff0c;假如有一天&#xff0c;你和你情敌使用X信进行1v1通信&#xff0c;想象一下画面是不是一个大画面中有一个小画面&#xff1f;这在布局中就叫做PIP&#xff08;picture in picture&#xff09;&#xff1b;这个随手一点&am…

【大数据学习 | HBASE高级】rowkey的设计,hbase的预分区和压缩

1. rowkey的设计 ​ RowKey可以是任意字符串&#xff0c;最大长度64KB&#xff0c;实际应用中一般为10~100bytes&#xff0c;字典顺序排序&#xff0c;rowkey的设计至关重要&#xff0c;会影响region分布&#xff0c;如果rowkey设计不合理还会出现region写热点等一系列问题。 …

Spring Boot编程训练系统:架构设计与实现技巧

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理编程训练系统的相关信息成为必然。开发合适…

刘知远LLM——大模型微调:prompt-learningdelta tuning

文章目录 背景&概览Prompt-learningdelta tuning增量式指定式重参数化式 OpenPrompt工具包 对应视频P41-P57 如何高效使用大模型&#xff1f;涉及到NLP的前沿技术&#xff0c;如prompt-learning&delta tuning。 prompt-learning对学习大模型范式的改变&#xff0c;del…

Spring Boot编程训练系统:性能优化实践

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了编程训练系统的开发全过程。通过分析编程训练系统管理的不足&#xff0c;创建了一个计算机管理编程训练系统的方案。文章介绍了编程训练系统的系统分析部分&…

电子应用产品设计方案-4:基于物联网和人工智能的温度控制器设计方案

一、概述 本温度控制器旨在提供高精度、智能化、远程可控的温度调节解决方案&#xff0c;适用于各种工业和民用场景。 二、系统组成 1. 传感器模块 - 采用高精度的数字式温度传感器&#xff0c;如 TMP117&#xff0c;能够提供精确到 0.01C 的温度测量。 - 配置多个传感器分布在…

如何在 Ubuntu 24.04 上安装和配置 Fail2ban ?

确保你的 Ubuntu 24.04 服务器的安全是至关重要的&#xff0c;特别是如果它暴露在互联网上。一个常见的威胁是未经授权的访问尝试&#xff0c;特别是通过 SSH。Fail2ban 是一个强大的工具&#xff0c;可以通过自动阻止可疑活动来帮助保护您的服务器。 在本指南中&#xff0c;我…

同三维T610UDP-4K60 4K60 DP或HDMI或手机信号采集卡

1路DP/HDMI/TYPE-C&#xff08;手机/平板等&#xff09;视频信号输入1路MIC1路LINE OUT,带1路HDMI环出&#xff0c;USB免驱&#xff0c;分辨率4K60&#xff0c;可采集3路信号中其中1路&#xff0c;按钮切换&#xff0c;可采集带TYPE-C接口的各品牌手机/平板/笔记本电脑等 同三维…

Kafka--关于broker的夺命连环问

目录 1、zk在kafka集群中有何作用 2、简述kafka集群中的Leader选举机制 3、kafka是如何处理数据乱序问题的。 4、kafka中节点如何服役和退役 4.1 服役新节点 1&#xff09;新节点准备 2&#xff09;执行负载均衡操作 4.2 退役旧节点 5、Kafka中Leader挂了&#xff0c;…

Web项目版本更新及时通知

背景 单页应用&#xff0c;项目更新时&#xff0c;部分用户会出更新不及时&#xff0c;导致异常的问题。 技术方案 给出版本号&#xff0c;项目每次更新时通知用户&#xff0c;版本已经更新需要刷新页面。 版本号更新方案版本号变更后通知用户哪些用户需要通知&#xff1f;…

Android音视频直播低延迟探究之:WLAN低延迟模式

Android WLAN低延迟模式 Android WLAN低延迟模式是 Android 10 引入的一种功能&#xff0c;允许对延迟敏感的应用将 Wi-Fi 配置为低延迟模式&#xff0c;以减少网络延迟&#xff0c;启动条件如下&#xff1a; Wi-Fi 已启用且设备可以访问互联网。应用已创建并获得 Wi-Fi 锁&a…

Appium配置2024.11.12

百度得知&#xff1a;谷歌从安卓9之后不再提供真机layout inspector查看&#xff0c;仅用于支持ide编写的app调试用 所以最新版android studio的android sdk目录下已经没有了布局查看工具... windows x64操作系统 小米k30 pro手机 安卓手机 Android 12 第一步&#xff1a…

前端使用Canvas实现网页电子签名(兼容移动端和PC端)

实现效果&#xff1a; 要使用Canvas实现移动端网页电子签名&#xff0c;可以按照以下步骤&#xff1a; 在HTML文件中创建一个Canvas元素&#xff0c;并设置其宽度和高度&#xff0c;以适配移动设备的屏幕大小。 // 创建一个canvas元素 let canvas document.createElement(&q…