Kafka--关于broker的夺命连环问

目录

1、zk在kafka集群中有何作用 

2、简述kafka集群中的Leader选举机制

3、kafka是如何处理数据乱序问题的。

 4、kafka中节点如何服役和退役

4.1 服役新节点

1)新节点准备

2)执行负载均衡操作

4.2 退役旧节点

5、Kafka中Leader挂了,Follower挂了,然后再启动,数据如何同步?  

6、kafka中初始化的时候Leader选举有一定的规律,如何打破这个规律呢? 

        7、kafka是如何做到高效读写

        8、Kafka集群中数据的存储是按照什么方式存储的?

        9、简述kafka中的数据清理策略。

        10、kafka中是如何快速定位到一个offset的。


1、zk在kafka集群中有何作用 


在zookeeper中存储着kafka集群中的相关信息:

(1)kafka/brokers/ids:{0,1,2} 记录集群中有哪些服务器

(2)kafka/brokers/topics/first/partition/0/state:{Leader:0,isr:{0,1,2}} 记录集群中不同分区谁是Leader以及可用的服务器

(3)kafka/controller {"borkerid:0"}辅助Leader的选举

图解: 

2、简述kafka集群中的Leader选举机制


总结: 

        集群启动后,集群中的broker会在zk中进行注册,谁先进行注册,那么先注册的broker中的Controller就会被选举为Controller Leader,负责监听broker节点的变化以及Leader的选举,随后controller将节点信息上传到zk中,其他的controller从zk中同步相关信息。当broker中的Leader挂掉时,controller会监听到节点的变化,从而获取isr,从isr存活的副本中,按照AR中的排序优先选择靠前的副本成为新的Leader,并及时更新ISR。

图解: 

3、kafka是如何处理数据乱序问题的。

 开启幂等性

 4、kafka中节点如何服役和退役

服役:执行负载均衡操作(1.创建均衡主题 2.生成副本均衡计划(设置--broker-list "0,1,2,3") 3.创建副本存储计划(将未来分区策略放在一个json文件中) 4.执行副本存储计划(运行json文件))

退役:同上 在第二步2.生成副本均衡计划中设置--broker-list "0,1,2"

4.1 服役新节点

1)新节点准备

(1)关闭 bigdata03,进行一个快照,并右键执行克隆操作。

(2)开启 bigdata04,并修改 IP 地址。

vi /etc/sysconfig/network-scripts/ifcfg-ens33修改完记得重启网卡:
systemctl restart network

(3)在 bigdata04 上,修改主机名称为 bigdata04。

hostname bigdata04    # 临时修改

[root@bigdata04 ~]# vim /etc/hostname

bigdata04

还要记得修改 /etc/hosts文件,并进行同步

修改bigdata01的hosts 文件,修改完之后,记得同步一下192.168.52.11 bigdata01
192.168.52.12 bigdata03
192.168.52.13 bigdata02
192.168.52.14 bigdata04xsync.sh /etc/hosts
scp -r /etc/hosts root@bigdata04:/etc/

(4)重新启动 bigdata03、bigdata04。

(5)修改 bigdata04 中 kafka 的 broker.id 为 3。

进入bigdata04的kafka中,修改里面的配置文件   config/server.properties

(6)删除 bigdata04 中 kafka 下的 datas 和 logs。

rm -rf datas/* logs/*

(7)启动 bigdata01、bigdata02、bigdata03 上的 kafka 集群。

先启动zk集群

xcall.sh zkServer.sh stop
xcall.sh zkServer.sh start

启动kafka集群(只能启动三台)

kf.sh start 

(8)单独启动 bigdata04 中的 kafka。

bin/kafka-server-start.sh -daemon ./config/server.properties

查看kafka集群first主题的详情:

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --topic first --describe

发现副本数并没有增加。

由于我之前创建first这个主题的时候只有一个副本,不是三个副本,所以呢,演示效果不佳。

kafka-topics.sh --bootstrap-server bigdata01:9092 --topic third --create --partitions 3 --replication-factor 3

2)执行负载均衡操作

(1)创建一个要均衡的主题

创建一个文件:vi topics-to-move.json
写上如下代码,如果多个topic 可以使用,分隔
{"topics": [{"topic": "third"}],"version": 1
}

(2)生成一个负载均衡的计划

在创建的时候,记得启动bigdata04节点,否则计划中还是没有bigdata04

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

未来的分区策略拷贝一份:

{"version":1,"partitions":[{"topic":"abc","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"abc","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"abc","partition":2,"replicas":[0,2,3],"log_dirs":["any","any","any"]}]}

(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)。

vi increase-replication-factor.json
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]}]}
以上这个内容来自于第二步的执行计划。

(4)执行副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --execute

(5)验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --verify

如果不相信添加成功,可以查看first节点的详情:

4.2 退役旧节点

1)执行负载均衡操作

先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。

(1)创建一个要均衡的主题

kafka下添加文件:vim topics-to-move.json
添加如下内容:
{"topics": [{"topic": "abc"}],"version": 1
}

(2)创建执行计划。

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate

(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)。

添加文件: vi increase-replication-factor.json 
添加如下代码:
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}

(4)执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --execute

(5)验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop11:9092 --reassignment-json-file increase-replication-factor.json --verify

2)执行停止命令

在 bigdata04上执行停止命令即可。

bin/kafka-server-stop.sh

5、Kafka中Leader挂了,Follower挂了,然后再启动,数据如何同步?  

总结: 

follower故障:

        当follower发生故障时,会被临时踢出ISR队列,此时的Leader和其他的follower会继续接收信息。当follower恢复后,会读取磁盘中记录的上次的HW,并将logs文件中高于HW的部分全部截掉,从HW开始从Leader中同步信息。当该follower中的LEO大于等于partition中的HW时,又重新加入ISR队列。

Leader故障:

        Leader故障后,会从ISR中重新选举出新的Leader。为了保证数据的一致性,follower会从logs文件中把高于Leader的部分全部截掉,然后重新从Leader中同步信息。

详解: 

LEO演示-- 每一个副本最后的偏移量offset + 1

HW(高水位线 High Water) 演示:所有副本中,最小的LEO

        由于数据同步的时候先进入Leader,随后同步给Follower,假如Follower挂掉了,Leader和其他的Follower 继续往前存储数据,挂掉的节点从ISR集合中剔除,此时挂掉的Follower又重启了,它会先从上一次挂掉的节点的HW开始同步数据,直到追上最后一个Follower为止,此时会重新回归ISR。

6、kafka中初始化的时候Leader选举有一定的规律,如何打破这个规律呢? 

  简述:可以通过创建副本计划(编写json文件),指定副本在broker中的存储位置,然后执行副本计划(运行json文件),打破Leader的选举规律。 

手动调整分区副本存储的步骤如下:

(1)创建一个新的 topic,名称为 three。

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 4 --replication-factor 2 --topic three

(2)查看分区副本存储情况。

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic three

(3)创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。

vi increase-replication-factor.json

输入如下内容:

{ "version":1, "partitions":[{"topic":"three","partition":0,"replicas":[0,1]}, {"topic":"three","partition":1,"replicas":[0,1]}, {"topic":"three","partition":2,"replicas":[1,0]}, {"topic":"three","partition":3,"replicas":[1,0]}] 
}

(4)执行副本存储计划。

屁股决定脑袋

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --execute

(5)验证副本存储计划。

bin/kafka-reassign-partitions.sh -- bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --verify

(6)查看分区副本存储情况

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic three

7、kafka是如何做到高效读写

1)Kafka 本身是分布式集群,可以采用分区技术,并行度高

2)读数据采用稀疏索引,可以快速定位要消费的数据。(mysql中索引多了之后,写入速度就慢了)

3)顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端, 为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

4)页缓存 + 零拷贝技术

零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用 走应用层,传输效率高

PageCache页缓存:Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用

8、Kafka集群中数据的存储是按照什么方式存储的?

总结: 

        Topic是逻辑概念,partition是物理概念,每一partition对应一个log(日志文件),该log文件中存储的就是producer生产的数据。Prodecer生产的数据会被不断地追加写入到log文件的末尾,为了防止log文件过大而导致数据定位效率低下,kafka采取了分片和索引的机制,将每个partition分为多个segment。每个segment包括:".index"文件、".log"文件、".timeindex"文件,这些文件位于一个文件夹下面,该文件夹的明明规则为:topic名称+分区序号。

 图解:

9、简述kafka中的数据清理策略。

1)delete 日志删除:将过期数据删除

log.cleanup.policy = delete 所有数据启用删除策略

(1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。

log.retention.bytes,默认等于-1,表示无穷大。

思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?

2)compact 日志压缩(合并的意思,不是真的压缩)

compact日志压缩:对于相同key的不同value值,只保留最后一个版本。

log.cleanup.policy = compact 所有数据启用压缩策略

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。

这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。

比如:张三 去年18岁,今年19岁,这种场景下可以进行压缩。

10、kafka中是如何快速定位到一个offset的。

总结: 

        首先根据目标offset定位segment文件,找到小于等于目标offset的最大offset对应的索引,定位到log文件,向下遍历找到目标record。

图解: 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/13367.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Web项目版本更新及时通知

背景 单页应用,项目更新时,部分用户会出更新不及时,导致异常的问题。 技术方案 给出版本号,项目每次更新时通知用户,版本已经更新需要刷新页面。 版本号更新方案版本号变更后通知用户哪些用户需要通知?…

Android音视频直播低延迟探究之:WLAN低延迟模式

Android WLAN低延迟模式 Android WLAN低延迟模式是 Android 10 引入的一种功能,允许对延迟敏感的应用将 Wi-Fi 配置为低延迟模式,以减少网络延迟,启动条件如下: Wi-Fi 已启用且设备可以访问互联网。应用已创建并获得 Wi-Fi 锁&a…

Appium配置2024.11.12

百度得知:谷歌从安卓9之后不再提供真机layout inspector查看,仅用于支持ide编写的app调试用 所以最新版android studio的android sdk目录下已经没有了布局查看工具... windows x64操作系统 小米k30 pro手机 安卓手机 Android 12 第一步&#xff1a…

前端使用Canvas实现网页电子签名(兼容移动端和PC端)

实现效果: 要使用Canvas实现移动端网页电子签名,可以按照以下步骤: 在HTML文件中创建一个Canvas元素,并设置其宽度和高度,以适配移动设备的屏幕大小。 // 创建一个canvas元素 let canvas document.createElement(&q…

使用 Python 实现高效网页爬虫——从获取链接到数据保存

前言 在这个时代,网络爬虫已成为数据分析与信息收集不可或缺的技术之一。本文将通过一个具体的Python项目来介绍如何构建一个简单的网络爬虫,它能够自动抓取指定网站的文章链接、标题、正文内容以及图片链接,并将这些信息保存为CSV文件。 目标网站 一、准备工作 在开始编…

跟着尚硅谷学vue2—进阶版4.0—Vuex1.0

5. Vuex 1. 理解 Vuex 1. 多组件共享数据-全局事件总线实现 红线是读&#xff0c;绿线是写 2. 多组件共享数据-vuex实现 vuex 不属于任何组件 3. 求和案例-纯vue版 核心代码 1.Count.vue <template><div><h1>当前求和为&#xff1a;{{ sum }}</h1&…

HTML之列表

练习题&#xff1a; 图所示为一个问卷调查网页&#xff0c;请制作出来。要求&#xff1a;大标题用h1标签&#xff1b;小题目用h3标签&#xff1b;前两个问题使用有序列表&#xff1b;最后一个问题使用无序列表。 代码&#xff1a; <!DOCTYPE html> <html> <he…

如何编写jenkins的流水线

如何编写jenkins的流水线 我们为什么需要编写流水线&#xff1f;新建一个jenkins pipeline的item初识pipeline界面pipeline代码关于取值声明和定义工具使用数据结构 我们为什么需要编写流水线&#xff1f; 这里假如你已经安装了好了jenkins&#xff0c;并且能够正常启动它。 通…

项目管理人员的自我评估与职业目标设定

在当今快速发展的商业环境中&#xff0c;项目管理人员的职业规划至关重要。它不仅涉及到个人职业发展的方向、目标和路径选择&#xff0c;还包括如何提升自身的专业技能、管理能力和行业知识。项目管理人员需要明确自己的职业目标、制定合理的职业发展计划、不断学习新知识和技…

状态空间方程离散化(Matlab符号函数)卡尔曼

// 卡尔曼滤波(4)&#xff1a;扩展卡尔曼滤波 - 知乎 // // matlab 连续系统状态空间表达式的离散化&状态转移矩阵求解_matlab状态方程离散化-CSDN博客 // // // %https://blog.csdn.net/weixin_44051006/article/details/107007916 clear all; clc; syms R1 R2 C1 C…

ubuntu24.04播放语音视频

直接打开ubuntu自带的video播放.mp4文件&#xff0c;弹窗报错如下&#xff1a; 播放此影片需要插件 MPEG-4 AAC 编码器安装方式&#xff1a; sudo apt install gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly sudo apt install ffmpeg验证AA…

音视频入门基础:MPEG2-TS专题(4)——使用工具分析MPEG2-TS传输流

一、引言 有很多工具可以分析MPEG2-TS文件/流&#xff0c;比如Elecard Stream Analyzer、PROMAX TS Analyser、easyice等。下面一一对它们进行简介&#xff08;个人感觉easyice功能更强大一点&#xff09;。 二、Elecard Stream Analyzer 使用Elecard Stream Analyzer工具可以…

C++基础 抽象类 类模板 STL库 QT环境

一、抽象类 1、纯虚函数 在多态中&#xff0c;通常父类中虚函数的实现是毫无意义的&#xff0c;主要都是调用子类重写的内容&#xff0c;因此可以将虚函数改为纯虚函数。 语法&#xff1a; virtual 返回值类型 函数名 (参数列表) 0; 2. 抽象类 1) 概念 有纯虚函数所在的类…

c语言选择排序

选择排序思想&#xff1a; 反复地从未排序部分选择最小&#xff08;或最大&#xff09;的元素&#xff0c;将其放到已排序部分的末尾&#xff1b; 首先用一个变量min来保存数组第一个元素的下标&#xff0c;然后用这个下标访问这个元素&#xff0c;将这个元素与它后面的元素相…

数字后端教程之Innovus report_property和get_property使用方法及应用案例

数字IC后端实现Innovus中使用report_property可以报告出各种各样object的属性&#xff0c;主要有cell&#xff0c;net&#xff0c;PG Net&#xff0c;Pin&#xff0c;时钟clock&#xff0c;时序库lib属性&#xff0c;Design属性&#xff0c;timing path&#xff0c;timin arc等…

Golang | Leetcode Golang题解之第560题和为K的子数组

题目&#xff1a; 题解&#xff1a; func subarraySum(nums []int, k int) int {count, pre : 0, 0m : map[int]int{}m[0] 1for i : 0; i < len(nums); i {pre nums[i]if _, ok : m[pre - k]; ok {count m[pre - k]}m[pre] 1}return count }

OpenCompass 评测 InternLM-1.8B 实践

1. API评测 首先我们创建模型的配置文件&#xff0c;我们需要定义评测模型的类型&#xff0c;是OpenAISDK类型&#xff0c;然后是模型名称&#xff0c;请求地址和API等等内容。然后我们通过命令行python run.py --models puyu_api.py --datasets demo_cmmlu_chat_gen.py --deb…

【算法】——二分查找合集

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 零&#xff1a;二分查找工具 1&#xff1a;最基础模版 2&#xff1a;mid落点问题 一&#xff1a;最…

Python 的 Pygame 库,编写简单的 Flappy Bird 游戏

Pygame 是一个用 Python 编写的开源游戏开发框架&#xff0c;专门用于编写 2D 游戏。它提供了丰富的工具和功能&#xff0c;使得开发者能够快速实现游戏中的图形渲染、声音播放、输入处理和动画效果等功能。Pygame 非常适合初学者和想要快速创建游戏原型的开发者。 Pygame 的主…

Ubuntu上搭建Flink Standalone集群

Ubuntu上搭建Flink Standalone集群 本文部分内容转自如下链接。 环境说明 ubuntu 22.06 先执行apt-get update更新环境 第1步 安装JDK 通过apt自动拉取 openjdk8 apt-get install openjdk-8-jdk执行java -version&#xff0c;如果能显示Java版本号&#xff0c;表示安装并…