kafka 消费者线程安全问题详细探讨

内容概要

图片

主要内容

常见错误案例

下面这段代码大概逻辑

  • 初始化时 实例化KafkaConsumer, 开启线程拉取消息并且处理

  • 资源释放回调 停止线程、调用kafkaConsumer.close进行资源释放

表面上没有问题,但实际上可能出现线程安全问题,因为poll 和 close 两个操作可能同时执行,因此存在线程安全问题, 如何修改,读者自己思考下。

    @PostConstructpublic void consumer(){kafkaConsumer = new KafkaConsumer(getConfig());kafkaConsumer.subscribe(Arrays.asList("test_partition_num"));new Thread(new Runnable() {@Overridepublic void run() {while(running){ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));records.forEach(record->{System.out.println(" partition =" + record.partition()  +" offset  = " + record.offset() + " value = " + record.value());});}}}).start();}@PreDestroypublic void close(){running = false;if(kafkaConsumer != null){kafkaConsumer.close();}}

消费者非线程安全代码解读

kafka生成者是线程安全的,但消费者是非线程安全的。KafkaConsumer

  • 相关操作前

    • 调用acquire()方法,校验线程安全问题,如果发现其他线程也在操作,则直接抛出异常。

  • 操作完成后

    • 调用release()清除痕迹

acquire()相对于加锁,release()相当于释放锁。

参看poll 方法实现,一目了然。

    private void acquire() {long threadId = Thread.currentThread().getId();if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");} else {this.refcount.incrementAndGet();}}private void release() {if (this.refcount.decrementAndGet() == 0) {this.currentThread.set(-1L);}}

图片

poll源码

如何实现消费者多线程消费消息呢

思路1

每次实例化一个 KafkaConsumer

这种方式实现简单,但每次都需要建立TCP 链接


思路2

相关操作方法 加上  synchronized,获取使用Lock 加锁保证线程安全

这种方式性能较差

思路3

拉取消息使用一个线程, 消息处理使用多线程

因为通常拉取消息比较快,消息处理比较耗时,由于消息处理不涉及KafkaConsumer 相关API 操作,因此不存在线程安全问题。这种方式建议消息位移设置自动提交,否则编程复杂度较高。

示例代码

ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));executorService.execute(()->{//处理消息records.forEach(record->{System.out.println(" partition =" + record.partition()  +" offset  = " + record.offset() + " value = " + record.value());});
});

旁敲侧击 举一反三

面试题回顾 Dubbo 线程模型

通常我们线程分为两类

  • IO 线程:负责网络通信的读写操作,接收和发送请求与响应。

  • 业务线程:处理具体的业务逻辑,避免因业务处理耗时过长而阻塞 IO 线程。


Dubbo 线程模型有几种你还记得否?该如何选择?

  • AllDispatcher:所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等。

  • DirectDispatcher:所有消息都不派发到线程池,全部在 IO 线程上直接执行。

  • MessageOnlyDispatcher:只有请求和响应消息派发到线程池,其它连接断开、心跳等消息直接在 IO 线程上执行。

  • ExecutionDispatcher:只把请求消息派发到线程池,响应和其它连接、断开、心跳等消息直接在 IO 线程上执行。

其实选择的依据 业务处理的快慢,如果业务处理很快则建议让业务处理逻辑放到 IO线程中执行,这样避免线程上下文切换影响性能。反之则处理逻辑需要放到具体的业务线程中执行。

一般来说业务执行需要查询数据库,绝大数场景建议使用默认的 AllDispatcher 

是不是又和我一起温故知新了,加油吧 少年 !!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1543686.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Jetpack Compose 核心组件(Text, Images, Buttons)(6)

导读大纲 1.1 基本组件介绍1.2 Text1.2.1 基本用法1.2.2 设计文字风格 1.3 Image组件1.3.1 从各种来源加载图片1.3.2 关键属性1.3.3 如何加载和显示不同类型的图像1.3.4 内容描述和无障碍访问: 1.4 Button组件1.4.1 基本用法1.4.2 装饰和自定义1.4.3 处理按钮点击1.4.4 重要考虑…

基于python深度学习遥感影像地物分类与目标识别、分割实践技术

我国高分辨率对地观测系统重大专项已全面启动&#xff0c;高空间、高光谱、高时间分辨率和宽地面覆盖于一体的全球天空地一体化立体对地观测网逐步形成&#xff0c;将成为保障国家安全的基础性和战略性资源。未来10年全球每天获取的观测数据将超过10PB&#xff0c;遥感大数据时…

JS惰性函数两种实现方式

惰性函数的本质就是函数重写&#xff0c;所谓惰性载入&#xff0c;指函数执行的分支只会发生一次。那什么时函数重写呢&#xff1f;由于一个函数可以返回另一个函数&#xff0c;因此可以用新的函数在覆盖旧的函数。 惰性函数有两种实现方式&#xff1a; 1、在函数被调用时&am…

案例研究丨国控星鲨利用DataEase释放数据潜能,重塑业务视野

国药控股星鲨制药&#xff08;厦门&#xff09;有限公司&#xff08;以下简称为国控星鲨&#xff09;始创于1952年&#xff0c;前身为厦门鱼肝油厂&#xff0c;距今已经有70余年历史&#xff0c;是国家商务部认定的“中华老字号”企业。2011年&#xff0c;国药控股与厦门轻工集…

2024年国庆小长假即将来临,陪猫咪的同时应该如何清浮毛

在父母眼中我们是不是永远都长不大&#xff1f;每次和他们讨论一点事情就开始吵起来。这不&#xff0c;前两天想着和好久不见的朋友去见面&#xff0c;出门前还要被逼问一番。 去到朋友家&#xff0c;发现朋友养了两只可爱的小猫&#xff0c;一时心动上头&#xff0c;我也转身…

通信工程学习:什么是MANO管理编排

MANO&#xff1a;管理编排 MANO&#xff1a;Management and Network Orchestration&#xff08;管理和网络编排&#xff09;在网络功能虚拟化&#xff08;NFV&#xff09;架构中扮演着至关重要的角色。MANO是一个由多个功能实体组合而成的层次&#xff0c;这些功能实体负责管理…

地图定位流程

用户端在小程序认证通过后会自动进行定位&#xff0c;也可以在首页手动定位&#xff0c;定位成功后用户在查询家政服务项目时会根据定位的城市查询该城市有哪些服务项目。 高德地图配置 小程序端的定位是通过手机的定位模块进行定位&#xff0c;定位成功获取经纬度坐标&#x…

吸烟行为检测、重点区域吸烟检测、吸烟检测算法样本标注

吸烟检测算法主要用于公共场所、工作场所和家庭环境中的吸烟行为监控&#xff0c;通过图像识别技术来检测和识别吸烟行为&#xff0c;以确保环境卫生和公共安全。这种技术可以帮助管理者实时监控吸烟行为&#xff0c;及时采取措施&#xff0c;减少二手烟的危害。 一、技术实现…

55 循环神经网络RNN的实现_by《李沐:动手学深度学习v2》pytorch版

系列文章目录 文章目录 系列文章目录循环神经网络的从零开始实现[**独热编码**]初始化模型参数循环神经网络模型预测[**梯度裁剪**]训练小结练习 循环神经网络的从零开始实现 import math import torch from torch import nn from torch.nn import functional as F from d2l i…

大数据系统调优:从DAG到单机

目标&#xff1a;优化T10的时效性全局DAG调度层优化&#xff1a;提前任务开始时间&#xff1a; 1. 优化慢结点&#xff1a;T10依赖了T4,T7,T8, 其中T8为瓶颈&#xff0c;如果T8能提前点完成&#xff0c;T10可以早点开始&#xff0c;就能早点完成 2. 快结点做更多预计算…

WEB领域是不是黄了还是没黄

进入2024年后&#xff0c;WEB领域大批老表失业&#xff0c;一片哀嚎&#xff0c;个个饿的鬼叫狼嚎&#xff0c;为啥呢&#xff0c;下面是我个人的见解和看法。 中国程序员在应用层的集中 市场需求&#xff1a;中国的互联网行业在过去几年中经历了爆炸性增长&#xff0c;尤其是…

python和pyqt-tools安装位置

一.python的安装位置 1.查询安装的python的位置 先查询python&#xff0c;然后输入import sys和sys.path 二.python-tools的安装位置 找到python的文件后按下图路径即可查到tools的文件

UEFI EDK2框架学习 (一)

01 Shell界面打印 执行qemu指令后 qemu-system-x86_64 -drive ifpflash,formatraw,fileOVMF.fd -nographic -net none出现shell界面 02 在UEFI shell中创建APP 创建SimplestApp文件夹以及SimplestApp.c、SimplestApp.inf cd edk2 mkdir SimplestAppuuidgen // generate …

论文不会写快来看!分享4款ai改写论文软件

在当今学术研究和写作领域&#xff0c;AI论文改写工具已经成为不可或缺的助手。这些工具不仅能够帮助研究人员提高写作效率&#xff0c;还能确保论文的质量和原创性。以下是四款值得推荐的AI改写论文软件&#xff0c;其中特别推荐千笔-AIPassPaper。 千笔-AIPassPaper 传送门&…

【在.net6和WPF框架下进行海康SDK开发】(一)如何引用Dll

最近有个上位机项目&#xff0c;需要将海康VisionMaster的部分功能嵌入到统一的界面。项目使用WPFdotNet6开发&#xff0c;UI库使用HandyControl。 先说下需求&#xff0c;在某个TabItem内嵌入一个UserControl&#xff0c;UserContr内嵌入VisionMaster运行界面。 本以为按照海康…

开关频率与谐振频率对应的模态图

当fsfr时 当fr2<fs<fr1时 当fs>fr1时 开关频率对应输入电压的频率 谐振频率对应的是谐振电流的频率

人工智能之计算机视觉的发展历程与相关技术内容,相应的模型介绍

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下人工智能之计算机视觉的发展历程与相关技术内容&#xff0c;相应的模型介绍。本文围绕计算机视觉这一领域&#xff0c;以问答的形式呈现了关键问题及详细解答。内容涵盖计算机视觉的基本概念、技术原理、应用场景等…

Java刷题知识总结(一)

1.局部变量参与运算前是必须要初始化的&#xff0c;比如下面的代码就会编译出错&#xff0c;提示y必须要初始化。 public static void main(String[] args) {int x 1;int y;int z x y; } 2.ArrayList和Vector主要区别是什么&#xff1f; A Vector与ArrayList一样&#xf…

浮动静态路由

浮动静态路由 首先我们知道静态路由的默认优先级是60&#xff0c;然后手动添加一条静态路由优先级为80的路由作为备份路由。当主路由失效的备份路由就会启动。 一、拓扑图 二、基本配置 1.R1: <Huawei>system-view [Huawei]sysname R1 [R1]interface GigabitEthernet…

嵌入式linux方向细分工作岗位分析

大家好,今天主要给大家分享一下,linux方向细分的工作岗位有哪些?,为即将进入linux领域的开发者指明方向。 第一:总结分布 第二:Linux BSP工程师岗位 工作内容: 1、开发和维护Linux系统的板级支持包(BSP),包括启动加载程序、设备驱动、文件系统等。 2、负责解决硬件和软…