【ELFK】之消息队列kafka

一、kafka的定义

  • Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域。
  • Kafka 是最初由 Linkedin 公司开发,是一个分布式、支持分区的(partition)、多副本的(replicar 协调的分布式消息中间件系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于 hadoop 的批处理系统、低延迟的实时系统、Spark/Flink 流式处理引擎,nginx 访问日志,消息服务等等,用 scala 语言编写

kafka的特性

  • 高吞吐量、低延迟 : kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。
  • 可扩展性(分布式): kafka集群支持热扩展
  • 持久性、可靠性: 消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性: 允许集群中节点失败
  • 高并发: 支持数千个客户端同时读写。

1、为什么需要消息队列(MQ)⭐⭐⭐

主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。
我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队列常应用于异步处理,流量削峰,应用解耦,消息通讯等场景。

当前比较常见的 MQ 中间件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。

2、使用消息队列的好处⭐⭐⭐

解耦

  • 允许你独立的扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束。

可恢复性

  • 系统的一部分组件失效时,不会影响到整个系统,消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

缓冲

  • 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

灵活性和峰值处理能力

  • 访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

异步通信

  • 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

二、消息队列的模式

1、点对点消息传递模式

(一对一,消费者主动拉取数据,消息收到后消息清除)

  • 消息生产者生产消息发送到消息队列中,然后消息消费者从消息队列中取出并且消费消息。

  • 消息被消费以后,消息队列中不再有存储,所以消息消费者不可能消费到已经被消费的消息。

  • 消息队列支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

  • 该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。

2、发布订阅消息传递模式⭐⭐⭐

(一对多,又叫观察者模式,消费者消费数据之后不会清除消息)

  • 消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
  • 发布/订阅模式是定义对象间一种一对多的依赖关系,使得每当一个对象(目对标象)的状态发生改变,则所有依赖于它的对象(观察者对象)都会得到通知并自动更新。

三、kafka系统基础架构

1、Producer(生产者)

  • 消息的生产者,是消息的入口

2、Broker(实例)

  • 一台kafka服务器就是一个broker,一个集群由多个broker组成。一个broker可以容纳多个topic(主题)

3、Topic(主题)

  • 消息的主题,可以理解成消息的分类,kafka获取到的数据就是按照不同的类型存储在不同的topic主题中。
  • topic主题中有很多的分区。

4、Partition(分区)

  • 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分割为一个或多个 partition,每个 partition 是一个有序的队列。
  • Kafka 只保证 partition 内的记录是有序的,而不保证 topic 中不同 partition 的顺序。

每个 topic 至少有一个 partition,当生产者产生数据的时候,会根据分配策略选择分区,然后将消息追加到指定的分区的队列末尾。

##Partation 数据路由规则:
1.指定了 patition,则直接使用;
2.未指定 patition 但指定 key(相当于消息中某个属性),通过对 key 的 value 进行 hash 取模,选出一个 patition;
3.patition 和 key 都未指定,使用轮询选出一个 patition。

每条消息都会有一个自增的编号,用于标识消息的偏移量,标识顺序从 0 开始。
每个 partition 中的数据使用多个 segment 文件存储。
如果 topic 有多个 partition,消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景下(例如商品秒杀、 抢红包),需要将 partition 数目设为 1。
 

broker 存储 topic 的数据:

  • 如果某 topic 有 N 个 partition,集群有 N 个 broker,那么每个 broker 存储该 topic 的一个 partition。
  • 如果某 topic 有 N 个 partition,集群有 (N+M) 个 broker,那么其中有 N 个 broker 存储 topic 的一个 partition, 剩下的 M 个 broker 不存储该 topic 的 partition 数据。
  • 如果某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。

分区的原因:

  • 便于在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了。
  • 可以提高并发,因为可以以Partition为单位读写了。

(1)Replica(副本)

  • 每一个分区都有多个副本,副本的作用就是备份数据。一个Topic的每个分区都有若干个副本,一个leader和若干个follower。
  • 当主分区故障后,副本进行顶替它的位置。

(2)Leader(领导者)

  • 每个分区有多个副本,其中有且仅有一个作为leader,leader是当前负责数据读写的分区。

(3)follower(追随者)

  • follower跟随leader,所有请求都通过leader路由,数据变更会广播给所有follower,follower和leader保持数据同步,follower只负责备份,不负责数据的读写。
  • 如果leader故障,则从follower中选举出一个新的leader。
  • 当follower挂掉,卡主或者同步太慢,leader会把这个follower从集群列表中删除,重新创建一个follower。

(4) producer

  • 生产者即数据的发布者,该角色将消息 push 发布到 Kafka 的 topic 中。
  • broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中。
  • 生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition。
     

(5)Consumer

消费者可以从 broker 中 pull 拉取数据。消费者可以消费多个 topic 中的数据。

(6)Consumer Group(CG)

  • 消费者组,由多个 consumer 组成。
  • 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。可为每个消费者指定组名,若不指定组名则属于默认的组。
  • 将多个消费者集中到一起去处理某一个 Topic 的数据,可以更快的提高数据的消费能力。
  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,防止数据被重复读取。
  • 消费者组之间互不影响。

(7)offset 偏移量

  • 可以唯一的标识一条消息
  • 偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息(即消费位置)
  • 消费被消费之后,并不会被删除,这样多个业务就可以重复使用kafka的消息。
  • 某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。
  • 消息最终还是会被删除,默认生命周期为1周(168小时)。

(8)Zookeeper

kafka集群依赖zookeeper来存储meta(变化)信息。

  • 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
  • Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中;从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为 __consumer_offsets。
  • zookeeper的作用就是,生产者push数据到kafka集群,就必须要找到kafka集群的节点在哪里,这些都是通过zookeeper去寻找的。消费者消费哪一条数据,也需要zookeeper的支持,从zookeeper获得offset,offset记录上一次消费的数据消费到哪里,这样就可以接着下一条数据进行消费。

四、kafka 工作流程及文件存储机制

  • Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。
  • topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。
  • Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。 消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。
  • 由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。
  • index 和 log 文件以当前 segment 的第一条消息的 offset 命名。

  • “.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

五、数据可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后, 都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

六、数据一致性问题

LEO:指的是每个副本最大的 offset; 
HW:指的是消费者能见到的最大的 offset,所有副本中最小的 LEO。

(1)follower 故障 
follower 发生故障后会被临时踢出 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合),待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

(2)leader 故障 
leader 发生故障之后,会从 ISR 中选出一个新的 leader, 之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。

注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。 

七、ack 应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡选择。

当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

  • 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。当broker故障时有可能丢失数据。
  • 1(默认配置):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果在follower同步成功之前leader故障,那么将会丢失数据。
  • -1(或者是all):producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是如果在 follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复。

三种机制性能依次递减,数据可靠性依次递增。

注:在 0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。在 0.11 及以后版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。

总结:

生产者要推送到kafka集群需要先通过zookeeper确定kafka的位置,消费者消费的数据到哪里也要根据数据在存储zookeeper上的offset,来确定offset偏移量记录上一条消息者消费的数据位置,以便在故障恢复后可以接着下一次数据继续消费

几个kafka服务器就是几个broker,生成推送数据到topic,topic可以被分区多个partition,一个partition可以有多个relica,relica副本可以是一个leader和多个follower,leader负责数据的读写,follower仅对数据进行备份。消费者面向topic进行数据的消费。

​​​​​​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/139067.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

ImageJ查看图像灰度值矩阵及像素编号从0开始

ImageJ查看图像灰度值矩阵 imagej打开一幅图像 然后image —— transform——image to results,等一下就会出现灰度值矩阵 我读取的如下,可以看出,imagej对像素的编号是从0开始的,切记!!!跟C/…

编译原理.龙书学习1

第一章: 编译器:将程序翻译成一种能够被计算机执行的形式 解释器:解释器直接利用用户提供的输入执行源程序中指定的操作 一个编译器的结构 编译器将源程序映射为语义上等价的目标程序,这个映射过程由两部分组成:分析…

MyBatis友人帐之ResultMap及分页

一、ResultMap 1.1查询为null问题 要解决的问题&#xff1a;属性名和字段名不一致 解决方案 方案一&#xff1a;为列名指定别名 , 别名和java实体类的属性名一致 . <select id"selectUserById" resultType"User">select id , name , pwd as passwor…

【PyTorch 攻略 (4/7)】张量和梯度函数

一、说明 W在训练神经网络时&#xff0c;最常用的算法是反向传播。在该算法中&#xff0c;参数&#xff08;模型权重&#xff09;根据损失函数相对于给定参数的梯度进行调整。损失函数计算神经网络产生的预期输出和实际输出之间的差异。 目标是获得尽可能接近零的损失函…

QUIC协议报文解析(三)

在前面的两篇文字里我们简单介绍了QUIC的发展历史&#xff0c;优点以及QUIC协议的连接原理。本篇文章将会以具体的QUIC报文为例&#xff0c;详细介绍QUIC报文的结构以及各个字段的含义。 早期QUIC版本众多&#xff0c;主要有谷歌家的gQUIC&#xff0c;以及IETF致力于将QUIC标准…

打印由数字组成的金字塔图案——python

1222 33333 4444444 555555555打印由数字组成的金字塔图案。但n9时&#xff0c;如下图所示。 输入格式: 输入一个整数n&#xff08;1<A<9&#xff09;。 输出格式: 输出由数字组成的金字塔图案。 输入样例: 在这里给出一组输入。例如&#xff1a; 5输出样例: 在这…

【 2023华为杯C题】大规模创新类竞赛评审方案研究(思路、代码......)

目录 1 题目概述 2 问题 3 极差的定义及标准分的计算方法 4 题目及数据下载 5 思路、代码下载...... 1 题目概述 现在创新类竞赛很多&#xff0c;其中规模较大的竞赛&#xff0c;一般采用两阶段&#xff08;网评、现场评审&#xff09;或三阶段&#xff08;网评、现场评审…

高效畅通的iOS平台S5配置指南

在iOS平台上&#xff0c;使用S5代理ip访问互联网是一种非常有用的技巧。无论是为了保证隐私安全&#xff0c;还是解决网络限制问题&#xff0c;S5代理ip都能为您提供更快、更稳定的互联网访问体验。本文将为您详细介绍如何在iOS平台上配置和使用S5代理ip&#xff0c;让您的网络…

git之撤销工作区的修改和版本回溯

有时候在工作区做了一些修改和代码调试不想要了,可如下做 (1)步骤1:删除目录代码,确保.git目录不能修改 (2)git log 得到相关的commit sha值 可配合git reflog 得到相要的sha值 (3)执行git reset --hard sha值,可以得到时间轴任意版本的代码 git reset --hard sha值干净的代…

【Java 基础篇】Java网络编程实战:P2P文件共享详解

Java网络编程是现代软件开发中不可或缺的一部分&#xff0c;因为它允许不同计算机之间的数据传输和通信。在本篇博客中&#xff0c;我们将深入探讨Java中的P2P文件共享&#xff0c;包括什么是P2P文件共享、如何实现它以及一些相关的重要概念。 什么是P2P文件共享&#xff1f; …

23个销量最高的3D扫描仪【2023】

如果你可以 3D 扫描它&#xff0c;你就可以 3D 打印它。 市场上 3D 扫描仪的种类和质量非常丰富&#xff0c;机器尺寸、功能和价格各异。 这样的选择虽然本身是一件很棒的事情&#xff0c;但也会让从无用的东西中挑选出宝石成为一件苦差事。 推荐&#xff1a;用 NSDT编辑器 快速…

HTTP各版本差异

HTTP1.0 无法复用连接 HTTP1.0为每个请求单独新开一个TCP连接 #mermaid-svg-9N3exXRS4VvT4bWF {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-9N3exXRS4VvT4bWF .error-icon{fill:#552222;}#mermaid-svg-9N3exXRS…

Reinforcement Learning(二)--on-policy和off-policy

1.前言 强化学习&#xff08;Reinforcement learning&#xff0c;RL&#xff09;是机器学习的一个分析&#xff0c;特点是概念多、公式多、入门门槛高&#x1f972;&#xff08;别问我怎么知道的&#xff09;。本篇文章着重讲解RL最重要的概念之一&#xff0c;即on-policy和of…

2023工博会强势回归!智微工业携八大系列重磅亮相

中国国际工业博览会&#xff08;简称"中国工博会"&#xff09;自1999年创办以来&#xff0c;历经二十余年发展创新&#xff0c;通过专业化、市场化、国际化、品牌化运作&#xff0c;已发展成为通过国际展览业协会&#xff08;UFI&#xff09;认证、中国工业领域规模最…

mybatis/mp批量插入非自增主键数据

文章目录 前言一、mp的批量插入是假的二、真正的批量插入1.利用sql注入器处理2.采用自编码,编写xml批量执行生成内容如下: 三 问题问题描述问题原因问题解决粘贴一份,兼容集合替换原有文件 总结自增与非自增区别: 前言 mybatis/mp 在实际开发中是常用的优秀持久层框架,但是在非…

Linux:GlusterFS 集群

GlusterFS介绍 1&#xff09;Glusterfs是一个开源的分布式文件系统,是Scale存储的核心,能够处理千数量级的客户端.在传统的解决 方案中Glusterfs能够灵活的结合物理的,虚拟的和云资源去体现高可用和企业级的性能存储. 2&#xff09;Glusterfs通过TCP/IP或InfiniBand RDMA网络链…

【C++】String类基本接口介绍及模拟实现(多看英文文档)

string目录 如果你很赶时间&#xff0c;那么就直接看我本标题下的内容即可&#xff01;&#xff01; 一、STL简介 1.1什么是STL 1.2STL版本 1.3STL六大组件 1.4STL重要性 1.5如何学习STL 二、什么是string&#xff1f;&#xff1f;&#xff08;本质上是一个类&#xff0…

【Redis】深入探索 Redis 的数据类型 —— 列表 List

文章目录 一、List 类型介绍二、List 类型相关命令2.1 LPUSH 和 RPUSH、LPUSHX 和 RPUSHX2.2 LPOP 和 RPOP、BLPOP 和 BRPOP2.3 LRANGE、LINDEX、LINSERT、LLEN2.4 列表相关命令总结 三、List 类型内部编码3.1 压缩列表&#xff08;ziplist&#xff09;3.2 链表&#xff08;lin…

Git错误解决:如何处理“could not determine hash algorithm“问题

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

openssl创建CA证书教程

配置生成CA证书 总示意图&#xff1a; (1)&#xff0c;通过openssl创建CA证书 第一步&#xff1a;创建一个秘钥&#xff0c;这个便是CA证书的根本&#xff0c;之后所有的东西都来自这个秘钥 # 通过rsa算法生成2048位长度的秘钥 openssl genrsa -out myCA.key 2048 第二步&#…