1、kafka消息发送的流程?
在消息发送时涉及到了两个线程,main 线程 和 sender 线程 ,在main线程里面创建了一个双端队列(RecordAccumulator) ,当双端队列里面的信息满足 一定的条件后, sender线程会拉取双端队列里面的信息,sender线程会不断的拉取信息发送给Kafka集群。
在sender线程里面有 batch.size 文件 和 linger.ms文件,
batch.size文件:当双端队列里面的消息达到16K时,会进行拉取
linger.ms文件:当双端队列里面的消息停留一定的时间后进行拉取。默认是 0ms。
一般来说,linger.ms 的时间设置为 5ms~100ms之间的效率最高
在sender线程拉取的消息默认缓冲是5个,可以通过 buffer.memory 文件进行修改,当leader 应答之后,会把缓冲的消息给删除掉
2、Kafka 的设计架构你知道吗?
kafka里面一个topic 可以分为多个分区(partition),并为了提高可用性,给每一个partition 添加了若干个副本,其中 主副本为 leader ,副副本为 follower ,当leader不可用的时候,follower 会成为新的 leader,然后为了配合分区的设计,有了消费者组,由多个消费者组成。
3、Kafka 分区的目的?
1、为了合理利用存储资源。每个 partition 在一个 Broker 上面存储,可以把海量的数据按照分区切割出来存储在多台Broker 上。可以实现 负载均衡 的效果
2、提高并行度,生产者可以根据分区的个数发送数据;消费者也可以以分区为单位进行消费数据
4、你知道 Kafka 是如何做到消息的有序性?
单分区可以保证数据的有序性,多分区无法保证数据的有序性,除非是把数据拉取到消费者端进行排序,但那样效率太低了,还不如单分区。
单分区如何保证数据的有序性
1)开启幂等性
sender 里面的消息缓冲(max.in.flight.requests.per.connection)可以设置为 <= 5
2)为开启幂等性
sender 里面的 max.in.flight.requests.per.connection 设置为 1
5、ISR、OSR、AR 是什么?
AR = ISR + OSR
AR:AR里面包括所有的副本,不管是存活的还是死掉的都
ISR:里面只包括与leader 保存同步的存活的副本
OSR:里面只含有挂掉的 副本
6、Kafka 在什么情况下会出现消息丢失
当 acks 设置为 0 时:
当生产者发送过来数据之后,不需要等数据落盘应答,那么当发送了两条数据之后,leader直接挂掉了,但是数据还没有来得及持久化,那么数据就丢失了
当 acks 设置为 1 时:
当生产者发送过来数据,leader 收到数据 并且应答完毕了,但是还没有来得及向 follow 同步数据,这时生产者就以为数据以及保存好了,但是这时候 leader 突然挂掉 , follow 成为了新的 leader ,但是 follower 里面并没有之前的数据,那么就数据丢失了
当 acks 设置为 -1 时:
1)ISR 同步副本集合收缩:
当 一个分区里面只剩下一个 leader 之后(其他副本因为各种原因挂掉了),那么在刚应答完毕之后,实际上的数据都会只保存在这一个 leader 里面,生产者就会以为保存好了,但是 leader突然挂掉了,新的 leader里面 就没有数据刚保存的这个数据了
2)数据未完全同步到所有副本时的故障:
在 ack = -1 时,生产者会等待所有 ISR 成员的应答。但在应答成功之前,如果刚刚收到数据的副本发生崩溃,且数据尚未持久化到磁盘,数据也会丢失。
7、怎么尽可能保证 Kafka 的可靠性
数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
8、Kafka中如何做到数据唯一,即数据去重?
kafka数据为什么会出现数据重复:
生产者发出一条消息,Broker 落盘以后因为网络等种种原因,发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的 Exception 重试消息导致消息重复。
如何做到数据去重:
开启幂等性和事务
9、生产者如何提高吞吐量?
batch.size : 可以修改批次大小,默认是 16K
linger.ms : 等待时间可以修改为 5~100 ms
compression.type : 压缩 snappy
RecordAccumulator : 缓冲区大小修改为 64M
10、zk在kafka集群中有何作用
1、管理集群
Kafka通过zk 来进行分布式协调和管理。Kafka需要zk集群实现元数据的管理和控制
2、元数据管理
集群中所有的 broker 信息、topic 和 partition 的状态信息都会存储在 zk 节点上。
3、复制进行leader 选举
broker 在启动时会去 zk 创建 controller ,第一个创建成功的就会被指定为控制器
11、简述kafka集群中的Leader选举机制
当一个 topic 启动后,会在 broker 里面创建多个分区,创建的分区会在 zk 里面进行注册,每个分区里面有一个 controller,每个分区的 controller 会去抢先在 zk 里面注册,哪个分区先注册完,这个分区就是 leader
12、kafka是如何处理数据乱序问题的。
(2)开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。
(1)未开启幂等性
max.in.flight.requests.per.connection需要设置为1。
13、kafka中节点如何服役和退役
服役节点:
先打开Kafka集群,再单独打开第四台虚拟机的 Kafka,
首先要创建一个要均衡的主题,
vi topics-to-move.json
生成一个负载均衡的计划,
bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
然后把未来计划拷贝到集群,
vi increase-replication-factor.json
最后执行副本计划
bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --execute
退役节点:
(1)创建一个要均衡的主题
vim topics-to-move.json
(2)创建执行计划
bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)
vi increase-replication-factor.json
(4)执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop11:9092 --reassignment-json-file increase-replication-factor.json --execute
(5)验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop11:9092 --reassignment-json-file increase-replication-factor.json --verify
14、Kafka中Leader挂了,Follower挂了,然后再启动,数据如何同步?
重新启动的副本会首先与新的leader对比数据日志,找出自己缺少的部分。只同步缺少的数据部分,这样可以减少数据传输量,加快同步速度。补齐之后会重新写入 ISR 里面
15、kafka中初始化的时候Leader选举有一定的规律,如何打破这个规律呢?
(1)创建一个新的 topic,名称为 three。
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 4 --replication-factor 2 --topic three
(2)查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic three
(3)创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。
vi increase-replication-factor.json