【大数据学习 | kafka】消费者的分区分配规则

1. 概述

上面我们提到过,消费者有的时候会少于或者多于分区的个数,那么如果消费者少了有的消费者要消费多个分区的数据,如果消费者多了,有的消费者就可能没有分区的数据消费。

那么这个关系是如何分配的呢?

现在我们知道kafka中存在一个coordinator可以管理这么一堆消费者,它可以帮助一个组内的所有消费者进行分区的分配和对应。

通过coordinator进行协调

这个分配规则分为以下几种。

2. range分配器

按照范围形式进行分配分区数量

# 为了演示分区的分配效果我们创建一个topic_d,设定为7个分区
[hexuan@hadoop106 bin]$ kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_f --partitions 7 --replication-factor 2
consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {}});

然后改版订阅代码,subscribe订阅信息的时候展示出来分区的对应映射关系,这个只是一个监控的作用没有其他的代码影响ConsumerRebalanceListener增加监视。

其中存在两个比较直观的方法

onPartitionsRevoked回收的分区。

onPartitionsAssigned分配的分区。

能够直观展示在分区分配的对应关系

其中我们需要知道两个比较重要的参数。

参数解释
offsets.topic.num.partitions__consumer_offset这个topic的分区数量默认50个
heartbeat.interval.ms消费者和协调器的心跳时间 默认3s
session.timeout.ms消费者断开的超时时间 默认45s,最小不能小于6000
partition.assignment.strategy设定分区分配策略

也就是说我们想要直观查看消费者变化后的映射对应关系需要停止消费者以后45s才可以,这个在代码中我们需要人为设定小点,更加快速查看变化

代码测试原理

首先设定topic_d的分区为7个,然后启动一个组内的两个消费者,可以看到他们的分配关系在onPartitionsAssigned这个方法中打印出来,同时我们关闭一个消费者,可以看到onPartitionsRevoked可以展示回收的分区,onPartitionsAssigned以及这个方法中分配的分区

整体代码如下:

package com.hainiu.kafka.consumer;/*** ClassName : rangeAssigned* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/4 22:04* Version 1.0*/
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;public class rangeAssigned {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,RangeAssignor.class.getName());//设定分区分配策略为rangepro.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,6000);//设定consumer断开超时时间最小不能小于6sKafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<String> topics = Arrays.asList("topic_f");consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {for (TopicPartition partition : partitions) {System.out.println("revoke-->"+partition.topic()+"-->"+partition.partition());}}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {for (TopicPartition partition : partitions) {System.out.println("assign-->"+partition.topic()+"-->"+partition.partition());}}});while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}

我们执行两个实例,两个实例代表两个消费者位于同一个组中,那么两个消费者的分配关系按照,范围进行分割

consumer0[0,1,2,3] consumer1[4,5,6]

执行第一个实例的时候,无需回收,并且七个分区都分配给第一个消费者实例。

执行第二个消费者的时候,需要对第一个消费者实例进行回收分区:

revoke-->topic_f-->0
revoke-->topic_f-->1
revoke-->topic_f-->2
revoke-->topic_f-->3
revoke-->topic_f-->4
revoke-->topic_f-->5
revoke-->topic_f-->6

然后由于一个消费者组中有两个消费者实例,则将分区重新分配个两个消费者实例。

因为coordinator的分配规则基于eager协议,这个协议的规则就是当分配关系发生变化的时候要全部回收然后打乱重分。

consumer1分配分区情况:

assign-->topic_f-->0
assign-->topic_f-->1
assign-->topic_f-->2
assign-->topic_f-->3

consumer2分配分区情况:

assign-->topic_f-->4
assign-->topic_f-->5
assign-->topic_f-->6

缺点:

这个协议只是按照范围形式进行重新分配分区,会造成单个消费者的压力过大问题,多个topic就会不均匀。

一个消费者组消费多个topic时可能会造成数据倾斜。

比如该消费者组有两个消费者:consumer1和consumer2。该消费者组消费两个topic分区:topic_1, topic_2,且假设两个topic都有7个分区,那么range分配规则可能会这么干:

consumer1分配topic_1-0,topic_1-1,topic_1-2, topic_1-3,topic_2-0,topic_2-1,topic_2-2, topic_2-3。

consumer3分配topic_1-4,topic_1-5,topic_1-6,topic_2-4,topic_2-5,topic_2-6。

consumer1要消费8个分区的数据,而consumer2要消费6个分区的数据,

当一个消费者出现消费多个topic主题的时候就可能出现这种数据倾斜的情况。

3. roundRobin轮训分配策略

轮训形式分配分区,一个消费者一个分区

整体代码如下:

pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,RoundRobinAssignor.class.getName());

设定分配规则为roundRobin的

启动一个的效果:

assign-->topic_f-->0
assign-->topic_f-->1
assign-->topic_f-->2
assign-->topic_f-->3
assign-->topic_f-->4
assign-->topic_f-->5
assign-->topic_f-->6

启动两个应用

第一个消费者consumer实例:

回收所有的七个分区:

revoke-->topic_f-->0
revoke-->topic_f-->1
revoke-->topic_f-->2
revoke-->topic_f-->3
revoke-->topic_f-->4
revoke-->topic_f-->5
revoke-->topic_f-->6

再被分配到3个分区:

assign-->topic_f-->1
assign-->topic_f-->3
assign-->topic_f-->5

第二个消费者consumer2实例:

assign-->topic_f-->0
assign-->topic_f-->2
assign-->topic_f-->4
assign-->topic_f-->6

优点:

同range方式相比,在多个topic的情况下,可以保证多个consumer负载均衡

分配规则如上图,一人一个轮训形式

consumer0 [0 2 4 6 1 3 5]

consumer1 [1 3 5 0 2 4 6]

缺点

不管是range的还是roundRobin的分配方式都是全量收回打乱重新分配,这样的效率太低,所以我们使用下面的粘性分区策略。

4. sticky粘性分区

粘性分区它的重新分区原理和原来的roundRobin的分区方式差不多,但是又不相同,主要是分区逻辑一样,但是重新分配分区的时候优先保留原分区,然后重新分配其他分区,从而不需要全部打乱重分,减少重新分配分区消耗

第二次启动

第三次

分区分配方式一样,但是如果重新分配的话会有很多原来分区的预留,重新分配新的分区

# 为了演示效果再次创建新的topic topic_g 七个分区
kafka-topics.sh --bootstrap-server hadoop106:9092 --topic topic_g --create --partitions 7 --replication-factor 2

然后让复制代码,修改订阅两个topic

  pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,StickyAssignor.class.getName());
//修改为粘性分区List<String> topics = Arrays.asList("topic_f","topic_g");
//订阅两个topic

并且运行应用实例分别运行1 ,2 ,3 多种个数的实例

执行第一个消费者实例consumer1,无需回收,分配14个分区(topic_f和topic_g都是七个分区)

assign-->topic_f-->0
assign-->topic_f-->1
assign-->topic_f-->2
assign-->topic_f-->3
assign-->topic_f-->4
assign-->topic_f-->5
assign-->topic_f-->6
assign-->topic_g-->0
assign-->topic_g-->1
assign-->topic_g-->2
assign-->topic_g-->3
assign-->topic_g-->4
assign-->topic_g-->5
assign-->topic_g-->6

执行第二个消费者实例consumer2时:

consumer1:

回收了14个分区:

revoke-->topic_f-->0
revoke-->topic_f-->1
revoke-->topic_f-->2
revoke-->topic_f-->3
revoke-->topic_f-->4
revoke-->topic_f-->5
revoke-->topic_f-->6
revoke-->topic_g-->0
revoke-->topic_g-->1
revoke-->topic_g-->2
revoke-->topic_g-->3
revoke-->topic_g-->4
revoke-->topic_g-->5
revoke-->topic_g-->6

优先保留原来分区,所以分配七个分区:

assign-->topic_f-->0
assign-->topic_f-->1
assign-->topic_f-->2
assign-->topic_f-->3
assign-->topic_f-->4
assign-->topic_f-->5
assign-->topic_f-->6

consumer2:

分配了七个分区;

assign-->topic_g-->0
assign-->topic_g-->1
assign-->topic_g-->2
assign-->topic_g-->3
assign-->topic_g-->4
assign-->topic_g-->5
assign-->topic_g-->6

执行第三个实例consumer3:

consumer1将会回收七个分区,consumer2将会回收七个分区。

14 / 3 = 4 ---->  4 + 1       4  +  1       4

comsumer1将会被分配:[0, 1, 2, 3, 4]

consumer2将会被分配 : [0, 1, 2, 3, 4]

consumer3将会被分配:[5, 6, 5, 6]

尽量不改变原分区的规则的前提下进行分区分配。

以上三种都基于eager协议,也就是想要重新分配分区一定要将原来的所有分区回收,全部打乱重新,即使保留原来的分区规则,也需要全部都回收分区,这样效率非常低下,最后一种CooperativeSticky分区策略完全打破以上三种的分区关系。

5. CooperativeSticky分区

以粘性为主,但是不全部收回分区,只是将部分需要重新分配的分区重新调配,效率高于以上三种分区策略。

 pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,CooperativeStickyAssignor.class.getName());
//设定分区策略

运行两个实例,查看控制台信息发现:

运行第一个消费者实例:

consumer1:

分配了14个分区:

assign-->topic_f-->0
assign-->topic_f-->1
assign-->topic_f-->2
assign-->topic_f-->3
assign-->topic_f-->4
assign-->topic_f-->5
assign-->topic_f-->6
assign-->topic_g-->0
assign-->topic_g-->1
assign-->topic_g-->2
assign-->topic_g-->3
assign-->topic_g-->4
assign-->topic_g-->5
assign-->topic_g-->6

运行第二个消费者实例:

consumer1:

回收7个分区:与前三种分区规则不同,前三种是分配分区的时候将所有分区全部收回

revoke-->topic_g-->0
revoke-->topic_g-->1
revoke-->topic_g-->2
revoke-->topic_g-->3
revoke-->topic_g-->4
revoke-->topic_g-->5
revoke-->topic_g-->6

详细信息:

	Assigned partitions:                       [topic_f-0, topic_f-1, topic_f-2, topic_f-3, topic_f-4, topic_f-5, topic_f-6]Current owned partitions:                  [topic_f-0, topic_f-1, topic_f-2, topic_f-3, topic_f-4, topic_f-5, topic_f-6]Added partitions (assigned - owned):       []Revoked partitions (owned - assigned):     []

consumer2:

分配七个分区:

assign-->topic_g-->0
assign-->topic_g-->1
assign-->topic_g-->2
assign-->topic_g-->3
assign-->topic_g-->4
assign-->topic_g-->5
assign-->topic_g-->6

详细情况:

	Assigned partitions:                       [topic_g-0, topic_g-1, topic_g-2, topic_g-3, topic_g-4, topic_g-5, topic_g-6]Current owned partitions:                  []Added partitions (assigned - owned):       [topic_g-0, topic_g-1, topic_g-2, topic_g-3, topic_g-4, topic_g-5, topic_g-6]Revoked partitions (owned - assigned):     []

整个分区分配规则和粘性分区策略一致,但是并不需要收回全部分区。

系统默认分区分配规则为:。

range+CooperativeSticky。

范围分区为主,优先粘性并且不急于eager协议。

6. 指定分区消费

在计算处理过程中,有时候我们需要指定一个消费者组消费指定的分区,计算其中的数据,这个时候以上的所有分区策略都不符合我们人为的要求,我们需要指定相应的分区进行消费。

consumer.assign();
//用指定的方式定向消费相应的分区数据

整体代码如下:

package com.hainiu.kafka.consumer;/*** ClassName : rangeAssigned* Package : com.hainiu.kafka.consumer* Description** @Author HeXua* @Create 2024/11/4 22:04* Version 1.0*/
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;public class Assigned {public static void main(String[] args) {Properties pro = new Properties();pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());pro.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());pro.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,6000);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);List<TopicPartition> list = Arrays.asList(new TopicPartition("topic_f", 0),new TopicPartition("topic_g", 0));consumer.assign(list);while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> it = records.iterator();while(it.hasNext()){ConsumerRecord<String, String> record = it.next();System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());}}}
}

我们只消费topic_e的0号分区和topic_d的0号分区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/8182.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【开发】关于Java中String与Integer的小小知识点(使用等号对比引用对象)

一个很简单的小知识点 我们都知道&#xff0c;如果使用对比包装类型或对象&#xff0c;那么比较的都是两者之间的地址&#xff08;指针或句柄&#xff09;&#xff0c;而非对象本身&#xff0c;那么且看下方的代码。 public class A {public static void main(String[] args)…

2025年山东省考报名流程图解

2025年山东公务员考试备考开始 为大家整理了从笔试到录用的全部流程&#xff0c;希望可以帮助到你们&#xff01;参考2024年山东省考公告整理&#xff0c;请以最新公告为准&#xff01; 一、阅读公告和职位表 二、职位查询 三、网上报名 四、确认缴费 五、网上打印准考证 六、参…

网络安全入门篇之详细学习路线

什么是网络安全 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 无论网络、Web、移动、桌面、云等哪个领域&#xff0c;都有攻与防两面性&…

什么是大数据治理?在企业数字化转型过程中有什么用?

建设背景 有效的数据治理不仅能够确保数据的安全和质量&#xff0c;还能为企业提供深入的业务洞察&#xff0c;推动决策制定和创新。数据治理是数字化转型的基础&#xff0c;是数据资源成为数据资产的基础&#xff0c;只有经过了数据治理&#xff0c;相应的数据资源才能产生价…

Kalshi PK Polymarket,谁更胜一筹

https://kalshi.com https://polymarket.com/ 在刚过去的2024 美大选中&#xff0c;这两个网站可谓风光无限。这两者究竟有何区别呢&#xff0c;今天咱们一起来扒一扒。 Kalshi与Polymarket主要有以下区别&#xff1a; 监管与合法性方面&#xff1a; Kalshi&#xff1a;经过美…

UI测试还在Selenium,难怪你会被淘汰

一、前言 在UI自动化测试的领域中&#xff0c;Selenium无疑是一颗璀璨的明星&#xff0c;它以其强大的浏览器自动化能力&#xff0c;长期以来一直是众多测试工程师的首选工具。它很经典&#xff0c;地位也毋庸置疑&#xff0c;但也是过去式了&#xff0c;现在我采用的自动化方…

基于ssm的网上药房管理系统的设计与实现(源码+LW+调试)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。今天给大家介绍一篇基于java的ssm网上药房管…

godot——主题、Theme、StyleBox

我刚开始被这些术语吓到了&#xff0c;一直不敢去接触它们&#xff0c;都用的默认样式。现在好不容易有点思路了&#xff0c;记录下来。 下面看看怎么自定义样式。 1.先新建一个Theme 2.再次点击创建好的Theme 得到 图1 这样一个面板。&#xff08;看不懂没事&#xff0c;继…

如何利用Python API接口实战中高效地获取商品详情信息

在电商数据分析和商品信息集成领域&#xff0c;高效地获取商品详情信息是至关重要的。本文将介绍如何使用Python结合API接口&#xff0c;从淘宝/天猫平台获取商品详情信息&#xff0c;并提供实战代码示例。 一、理解API接口的重要性 API&#xff08;应用程序编程接口&#xff…

【Linux】编辑器vim 与 编译器gcc/g++

目录 一、编辑器vim&#xff1a; 1、对vim初步理解&#xff1a; 2、vim的模式&#xff1a; 3、进入与退出&#xff1a; 4、vim命令模式下的指令集&#xff1a; 移动光标&#xff1a; 删除&#xff1a; cv&#xff1a; 撤销&#xff1a; 其他&#xff1a; 5、vim底行模…

不支持UEFI的老显卡修改vBIOS进行支持

前段时间要在办公室玩恐怖黎明,但是联想自带的GT730实在是有点慢,后来闲鱼收了一张HD7750,虽然也是老掉牙,但是性能也有3成提升,聊胜于无吧.但是存在HD7750不支持UEFI的问题.具体表现为: 系统是win11未进系统时,什么都不显示,不能进BIOS.刚换卡未装驱动的时候,即使已经进入系统…

qt QWheelEvent详解

1、概述 QWheelEvent是Qt框架中用于处理鼠标滚轮事件的一个类。当用户滚动鼠标滚轮时&#xff0c;Qt会生成一个QWheelEvent事件&#xff0c;并将其发送到相应的窗口或控件。开发者可以通过重载窗口或控件的wheelEvent()方法来响应这个事件&#xff0c;并执行相应的操作&#x…

如何训练最懂您企业的AI助理?

随着人工智能技术的蓬勃发展&#xff0c;企业级AI助理已成为提升工作效率、优化客户服务体验的重要利器。这些智慧化的系统&#xff0c;通过模拟人类的认知功能&#xff0c;能够轻松应对复杂的数据分析、自动化繁琐的日常任务&#xff0c;甚至为企业决策提供有力支持。拥有一款…

2024网鼎杯web1+re2 wp

这两道题属于比较简单的&#xff0c;顺道说一下&#xff0c;今年的题有点抽象&#xff0c;web不是misc&#xff0c;re不是web的&#xff0c;也有可能时代在进步&#xff0c;现在要求全栈✌了吧 web1 最开始被强网的小浣熊带偏思路了&#xff0c;进来疯狂找sql注入&#xff0c…

【Android 系统中使用CallStack类来追踪获取和操作调用栈信息】

Android系统CallStack类的使用 定义使用方法使用场景注意事项应用举例 定义 在 Android 系统中&#xff0c;CallStack 类是一个用于获取和操作调用栈信息的工具类。这个类通常用于调试和日志记录&#xff0c;以帮助开发者了解函数调用的顺序和位置。以下是您提供的代码片段的解…

Vue 组件通信-自定义事件(七)

一、组件自定事件概念 自己定义的事件&#xff0c;包含事件名&#xff0c;事件回调等&#xff0c;定义好之后去给组件使用。也是一种组件的通信方式&#xff0c;适用于子组件传递给父组件。 二、 组件自定义事件实现子传父 1、在父组件中给子组件绑定一个自定义事件 在子组件标…

解决Qt Creator调试“warning: GDB: Failed to set controlling terminal“

本文档详细介绍了在Qt环境中遇到GDB提示Failed to set controlling terminal错误时的解决方案&#xff0c;步骤包括勾选Run in Terminal选项。适合开发者在调试过程中遇到此类问题时参考。 &"warning: GDB: Failed to set controlling terminal: \345\257\271\350\25…

nginx 部署2个相同的vue

起因&#xff1a; 最近遇到一个问题&#xff0c;在前端用nginx 部署 vue&#xff0c; 发现如果前端有改动&#xff0c;如果不适用热更新&#xff0c;而是直接复制项目过去&#xff0c;会404 因此想到用nginx 负载两套相同vue项目&#xff0c;然后一个个复制vue项目就可以了。…

城镇保障性住房管理:SpringBoot技术探索

4系统概要设计 4.1概述 本系统采用B/S结构(Browser/Server,浏览器/服务器结构)和基于Web服务两种模式&#xff0c;是一个适用于Internet环境下的模型结构。只要用户能连上Internet,便可以在任何时间、任何地点使用。系统工作原理图如图4-1所示&#xff1a; 图4-1系统工作原理…

ssm075学生信息管理系统+jsp(论文+源码)_kaic

毕 业 设 计&#xff08;论 文&#xff09; 学生信息管理系统设计与实现 摘 要 传统办法管理学生信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff…