Kafka 之批量消息发送消费

前言:

前面我们分享了 Kafka 的一些基础知识,以及 Spring Boot 集成 Kafka 完成消息发送消费,本篇我们来分享一下 Kafka 的批量消息发送消费。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 消息批量发送

Kafka 没有提供批量发送消息的 API,Kafka 的方式是提供一个 RecordAccumulator 消息收集器,将发送给同一个 Topic 同一个 Partition 的消息先缓存起来,当其达到某些条件后,才会一次性的将消息提交给 Kafka Broker。

Kafka 消息的批量发送主要跟以下三个参数有关:

  • batch.size:批量发送消息的大小,默认 16KB,产生的消息达到这个数量后,即刻触发消息批量提交到 Kafka Broker。
  • buffer.memory:生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数,模式是 32 MB,如果超过这个数量,即刻触发消息批量提交到 Kafka Broker。
  • linger.ms:批量发送的的最大时间间隔,单位是毫秒,当达到配置的时间之后,会立刻触发消息批量提交大 Kafka Broker。

以上三个条件满足一个就会触发消息的批量提交。

官方文档传送门

Kafka 批量消息 参数配置

上面我们分析了 Kafka 没有提供批量发送的 API,而是使用了三个参数来控制批量发送的,换句话说,其实我们每次使用 Kafka 发送消息的时候都是批量发送,Kafka 批量发送消息的代码没有什么特殊之处,只需要对上面解释的三个参数进行按需配置即可,本案例的配置如下:

#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000

Kafka 批量消息 Producer 代码演示

Kafka 批量发送消息代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @ClassName: MyKafkaBatchProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("开始消息发送,当前时间:{}", dateStr);for (int a = 0; a < 1000; a++) {this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");}log.info("完成消息发送,当前时间:{}", dateStr);}}

在 Kafka 发送完成消息后,我们记录了当前时间,这个时间是用来证明消息是被批量发送的。

Kafka 批量消息 Consumer 代码演示

Kafka 批量消息的代码也没有什么特殊之处,还是使用 @KafkaListener注解来监听消息,只不过参数变成了 List<ConsumerRecord<String, String>> 类型,然后我们在配置中配置了批量消费的模式,批量消费的配置如下:

#Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息
spring.kafka.listener.type = batch

Consumer 代码如下:

package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;/*** @ClassName: MyKafkaBatchConsumer * @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchConsumer {@KafkaListener(id = "my-kafka-consumer-01",groupId = "my-kafka-consumer-groupId-01",topics = "my-topic",containerFactory = "myContainerFactory",properties = {"max.poll.records:10"})public void listen(List<ConsumerRecord<String, String>> consumerRecords) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("my-kafka-consumer-groupId-01 消息消费成功,当前时间:{},消息size:{}", dateStr, consumerRecords.size());for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {String value = consumerRecord.value();log.info("消息内容:{}",value);}}}

这里我们使用了 properties 这个属性配置,后面详细讲解。

** Kafka 批量消息验证**

触发消息发送消费结果如下:

2024-10-27 15:27:17.563  INFO 18320 --- [nio-8086-exec-2] c.o.s.k.producer.MyKafkaBatchProducer    : 完成消息发送,当前时间:2024-10-27 15:27:17
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:27:22,消息size:10
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第0条 kafka 消息
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第1条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第2条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第3条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第4条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第5条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第6条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第7条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第8条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第9条 kafka 消息

2024-10-27 15:27:17 完成消息发送,:2024-10-27 15:27:22 完成消息消费,时间间隔是 5秒,消息是 10 条,符合预期。

我们修改配置再次演示,将批量发送消息的时间间隔改为 10 秒,同时一次性发送 1000 条消息,是消息的总大小大于 1KB。

#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000

调整消息发送端代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/*** @ClassName: KafkaProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("开始消息发送,当前时间:{}", dateStr);for (int a = 0; a < 1000; a++) {this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");}log.info("完成消息发送,当前时间:{}", dateStr);}}

触发消息发送消费结果如下:

2024-10-27 15:41:39.530  INFO 17440 --- [nsumer-01-2-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10
2024-10-27 15:41:39.530  INFO 17440 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10

可以看到消息发送和消息消费几乎是同时进行的,因为这里我们打印的是时间只有秒,是看不出差异的,但是也可以根据这个结果看出,消费者并没有等到 10秒后才开始消费,是因为批量发送消息的大小大于了1KB 就触发了批量消息的提交,符合上面我们说的三个条件满足其中一个就触发批量消息提交到 Kafka Broker,结果符合预期。

关于 buffer-memory 这个参数这里不做验证了,有兴趣的朋友可以自己去验证哈。

spring.kafka.consumer.max-poll-records 参数讨论

spring.kafka.consumer.max-poll-records 表示一次调用 poll() 操作时返回的最大记录数,默认为 500 条,上面的案例中我们使用了 properties = {“max.poll.records:10”} 这个配置,其实这个配置也是配置批量拉去消息的最大数量,我们配置的是 10,日志记录每次最多拉去的数量就是 10,使用 properties 的配置方式可以覆盖掉项目配置文件中的配置,也就是局部配置覆盖全局配置,这样做的好处是显而易见的,我们可以针对每个消费端按需做出灵活配置。

总结:本篇简单分享了 Kafka 批量发送消息消费的一些案例,希望可以帮助到有需要的朋友,分享有错误的地方也欢迎大家提出纠正。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/3442.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

交替传译收费标准

交替传译是一种高端口服务&#xff0c;常用于国际会议、商务洽谈、学术交流等多语言会议场合&#xff0c;演讲者的发言一般不超过15分钟&#xff0c;交替传译员和演讲者采取接力式交替发言&#xff0c;在这种模式下&#xff0c;口译员需要具备优秀的记忆能力和翻译功底。其价格…

灵动AI视频:重塑视频创作,智启无限灵感!

&#x1f680; 在这个视觉为王的时代&#xff0c;视频创作已成为展现创意与才华的重要舞台。然而&#xff0c;繁琐的剪辑流程、有限的创意资源往往成为制约创作者发挥的瓶颈。灵动AI视频&#xff0c;一款集智能、高效、创意于一体的视频编辑神器&#xff0c;正为视频创作领域带…

生物信息学R语言

检查R语言安装包和依赖 .libPaths() 这里有一个简单的生物信息学分析案例&#xff0c;使用R语言处理基因表达数据。这个示例中&#xff0c;我们将导入模拟的基因表达数据&#xff0c;进行数据预处理&#xff08;如归一化&#xff09;&#xff0c;并使用主成分分析&#xff08…

基于VsCode platformio的stm32开发环境搭建

背景 VsCode作为当下流行的编辑器&#xff0c;且不单单是一个编辑器里面集成了很多插件&#xff0c;使用这些插件可以完成很多功能。 STM32开发环境除了KEIL与IAR&#xff0c;其实还有很多其他的开方方式&#xff0c;ST官方提供了很多的开发软件&#xff0c;基于Eclipse也可以…

【题解】【排序】—— [NOIP2017 普及组] 图书管理员

【题解】【排序】—— [NOIP2017 普及组] 图书管理员 [NOIP2017 普及组] 图书管理员题目背景题目描述输入格式输出格式输入输出样例输入 #1输出 #1 提示 1.思路解析2.AC代码 [NOIP2017 普及组] 图书管理员 通往洛谷的传送门 题目背景 NOIP2017 普及组 T2 题目描述 图书馆中…

华为和思科的配置

vrrp和mstp 思路 vrrp是用来虚拟网关&#xff0c;噢&#xff0c;是虚拟一条虚拟网关 优先级&#xff0c;priority越大越优先&#xff0c;优先级相同&#xff0c;哪个的路由器的vrrp先起来&#xff0c;谁就是主 mstp是快速生成树协议&#xff0c;防止环路用的 优先级越小越优…

React 前端如何通过组件完成 “下载 Excel模板” 和 “上传 Excel 文件并读取内容生成可使用的对象数组”

文章目录 一、Excel 模板下载01、代码示例 二、Excel 文件上传01、文件展示02、示例代码03、前端样式展示04、数据结果展示 三、完整代码 本文的业务需求是建立在批量导入数据的情况下&#xff0c;普通组件只能少量导入&#xff0c;数据较多的情况都会选择 Excel 数据导入&…

『统计检验』一篇文章入门置信区间

文章目录 置信区间点估计和区间估计置信度置信区间的计算置信区间计算的具体例子 参考文献 置信区间 置信区间是总体参数落在测量结果周围的程度 点估计和区间估计 点估计&#xff1a;通过样本数据估计总体参数 ⇒ \Rightarrow ⇒使用样本统计量&#xff08;如样本均值、样本…

ESRALLY安装与使用

ESRALLY安装与使用 geonames、geopoint:都是和地理位置相关的,如果需要测试ES在地理位置处理的性能可以选用 http_logs:是http_server的,如果要测服务器日志、redis日志、apache日志可以选用 说明:esrally 自带的测试数据即为 rally_track 文件夹中的内容,主要包括: Ge…

SpringMvc day1101

ok了家人们&#xff0c;今天我们继续 studying springMvc&#xff0c;let‘me see see 四.SSM整合 SpringMVC Spring MyBatis WebConfig SpringConfigMybatisConfig SpringMvcSupport jdbc.properties 表现层 业务层持久层 EmpController EmpServiceEmpMapper EmpServiceIm…

关于基于 GA102 核心的显卡及主要参数

基于 GA102 核心的显卡的主要参数&#xff1a; 主要用途 高端游戏, 专业图形处理 高端游戏, 专业图形处理 高端游戏, 专业图形处理 高端游戏, 专业图形处理 专业图形处理, 数据中心 数据中心, AI 计算 解释 CUDA 核心数&#xff1a;更多的 CUDA 核心意味着更强的并行计算能力。…

C++ 多态 (详解)

多态的概念 通俗来说&#xff0c;就是多种形态&#xff0c;具体点就是去完成某个行为&#xff0c;当不同的对象去完成时会产生出不同的状态。举个栗子&#xff1a;比如买票这个行为&#xff0c;当普通人买票时&#xff0c;是全价买票&#xff1b;学生买票时&#xff0c;是半价…

雷池社区版新版本功能防绕过人机验证解析

前两天&#xff0c;2024.10.31&#xff0c;雷池社区版更新7.1版本&#xff0c;其中有一个功能&#xff0c;新增请求防重放 更新记录&#xff1a;hhttps://docs.waf-ce.chaitin.cn/zh/%E7%89%88%E6%9C%AC%E6%9B%B4%E6%96%B0%E8%AE%B0%E5%BD%95 仔细研究了这个需求&#xff0c;…

省级-社会保障水平数据(2007-2022年)

社会保障水平是一个综合性的概念&#xff0c;它不仅涉及到一个国家或地区的社会保障制度覆盖范围&#xff0c;还包括了提供的保障种类与水平&#xff0c;以及这些制度在满足公民基本生活需求方面的能力。 2007-2022年省级-社会保障水平数据.zip资源-CSDN文库https://download.…

如何搭建汽车行业AI知识库:定义+好处+方法步骤

在汽车行业&#xff0c;大型车企面临着员工众多、价值链长、技术密集和知识传播难等挑战。如何通过有效的知识沉淀与应用&#xff0c;提升各部门协同效率&#xff0c;快速响应客户咨询&#xff0c;降低销售成本&#xff0c;并开启体系化、可持续性的知识管理建设&#xff0c;成…

【C++篇】数据之林:解读二叉搜索树的优雅结构与运算哲学

文章目录 二叉搜索树详解&#xff1a;基础与基本操作前言第一章&#xff1a;二叉搜索树的概念1.1 二叉搜索树的定义1.1.1 为什么使用二叉搜索树&#xff1f; 第二章&#xff1a;二叉搜索树的性能分析2.1 最佳与最差情况2.1.1 最佳情况2.1.2 最差情况 2.2 平衡树的优势 第三章&a…

如何在Linux下部署自己的ZFile开源网盘

ZFile 项目介绍 ZFile是一个功能强大、灵活的开源网盘系统&#xff0c;为用户提供安全便捷的文件存储和共享方案。 项目概述 ZFile由ZFile, Inc.开发和维护&#xff0c;基于Docusaurus构建。其用户友好的界面支持多种文件存储和共享功能&#xff0c;并具备高度的可定制性和扩…

平替、超越Jira?18 个最佳 Jira 替代方案【开源+免费+付费】

Jira 是一种流行的项目管理工具&#xff0c;被团队广泛用于跟踪和管理他们的任务、问题和项目。 打个不太恰当的比喻&#xff0c;Jira &#xff0c;她就是项目管理家的单反。 如果您正在寻找 Jira 的替代方案&#xff0c;本文介绍了 18个最重要的 Jira 替代方案&#xff0c;可以…

Nuxt.js 应用中的 nitro:build:public-assets 事件钩子详解

title: Nuxt.js 应用中的 nitro:build:public-assets 事件钩子详解 date: 2024/11/5 updated: 2024/11/5 author: cmdragon excerpt: nitro:build:public-assets 是 Nuxt 3 中的一个生命周期钩子,在复制公共资产之后调用。该钩子使开发者能够在构建 Nitro 服务器之前,对…

02_CC2530 + LED流水灯

CC2530 LED流水灯 前言 ​ 在搭建ZigBee定位系统前&#xff0c;先通过几个基础案例熟悉CC2530的一些外设和寄存器编程方式。CC2530基础篇由LED流水灯(按键控制启停、定时器中断方式)、定时器与Delay_ms延时函数、Uart串口通信三章组成。 按键控制启停–通用I/O中断 硬件电…