kafka使用指南

文章目录

  • 前言
  • 特点
  • 架构
  • 一、zookeeper安装配置
  • 二、kafka安装配置
  • 三、快去试一下吧!下一章:kafka命令之分区接入创建删除


前言

随着大数据时代的到来,高吞吐量的分布式发布订阅消息系统kafka得到了极大的应用,它具有高吞吐量、

特点

  • 高吞吐量:10w/s+
  • 持久化消息:磁盘
  • 分布式
  • 支持‌Hadoop数据并行加载

架构


一、zookeeper安装配置

为什么要安装zookeeper?Zookeeper是一个分布式的协调服务,主要用于维护集群的元数据信息和配置信息。Kafka集群依赖于Zookeeper来存储和管理Kafka的元数据信息和配置信息
1.安装路径

版本地址
3.8.4wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gzhttps://zookeeper.apache.org/releases.html尾部有bin是编译后的包,不带bin的未编译的包;使用已编译的包
  1. 安装配置
mkdir /opt/module  # 新建目录用于保存自定义数据
wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz -C /opt/module/
cd /opt/module
mv apache-zookeeper-3.8.4-bin zookeeper-3.8.4
# mkdir /opt/module/zookeeper-3.8.4/zkData  # 用于存放zookeeper数据,会自动创建
cd /opt/module/zookeeper-3.8.4/conf
cp zoo_sample.cfg zoo_sample.cfg.bak
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg  # 为什么?系统默认读取zoo.cfg文件
"""   zoo.cfg 详情(英译中)   """
# 通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
tickTime=2000
# LF初始通信时限。用于集群,slave首次跟随leader,leader最长等待时间(tickTime的数量)
initLimit=10
# LF同步通信时限。主leader从slave同步限制(用于集群),Leader主节点与从节点之间发送消息,请求和应答时间长度。超时放弃从节点
syncLimit=5
# 保存Zookeeper中的数据 注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录
dataDir=/opt/module/zookeeper-3.8.4/zkData
# 客户端连接端口,通常不做修改
clientPort=2181
# 单个客户端最大连接数,默认60,0为不限制,通过IP来区分不同的客户端
#maxClientCnxns=60
# 保留文件的数量,默认3个,正常此参数被注释掉,未被启用,一般配合autopurge.purgeInterval使用
#autopurge.snapRetainCount=3
# 自动清理快照文件和事务日志的频率,默认0不开启自动清理,单位为小时
#autopurge.purgeInterval=1
# 存储事务日志的目录,不指定代表存放在dataDir目录中
#dataLogDir=xxx## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
"""
  1. 配置环境变量(依赖java,因此也需要JAVA_HOME配置)
export ZK_HOME=/opt/module/zookeeper-3.8.4
export PATH=$PATH:$ZK_HOME/bin
  1. 操作指令
cd /opt/module/zookeeper-3.8.4/bin
chmod +x zkServer.sh
# 启动服务
zkServer.sh start
# 查看服务
zkServer.sh status
# jps是jdk提供的一个查看当前java进程的小工具,更详细可以使用jps -ml,其它参数自行百度吧
# 查看启动效果, 结果见'->74221 QuorumPeerMain'
jps
->76435 Jps
->74221 QuorumPeerMain  # 这个就是zookeeper启动后的服务
->2981 Main
# 停止服务
zkServer.sh stop
# 启动客户端,如果是它机,用zkCli.sh -server ip:port
zkCli.sh

在这里插入图片描述

  1. 集群
    假设集群机Leader、Follower1、Follower2
# 分别在集群机群上安装配置zookeeper,同上
# 因是集群,需要配置通信。在各集群服务器数据目录配置myid文件,内容分别为1,2,3(快捷配置可以使用xsync)!以Leader为例
cd /opt/module/zookeeper-3.8.4/zkData
vim myid  # 内容见'->'
->1
# 集群服务器分别配置文件zoo.cfg追加集群内容,vim zoo.cfg如下,以Leader为例
server.1=Leader:2888:3888
server.2=Follower2:2888:3888
server.3=Follower3:2888:3888
"""配置参数解读
格式:server.A=B:C:D
A 是一个数字,表示这个是第几号服务器; 集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比 较从而判断到底是哪个 server
B 是这个服务器的地址
C 是这个服务器Follower与集群中的Leader服务器交换信息的端口
D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口
"""
# 接下来就是分别启动服务了,这个上面讲了启动关闭命令,就不书写了,不知道的话可以参考上文。分别启动很麻烦,可不可以做个批量脚本关闭集群
"""在Follower2机器上编写批量集群操作脚本zk.sh
#!/bin/bashcase $1 in
"start")for i in hadoop102 hadoop103 hadoop104doecho ---------- zookeeper $i 启动 ------------ssh $i "/opt/module/zookeeper-3.7.1/bin/zkServer.sh start"done
;;
"stop")for i in hadoop102 hadoop103 hadoop104doecho ---------- zookeeper $i 停止 ------------ ssh $i "/opt/module/zookeeper-3.7.1/bin/zkServer.sh stop"done
;;
"status")for i in hadoop102 hadoop103 hadoop104doecho ---------- zookeeper $i 状态 ------------ ssh $i "/opt/module/zookeeper-3.7.1/bin/zkServer.sh status"done
;;
esac
"""
# 配置可执行权限
chmod u+x zk.sh
启动/关停
zk.sh start/stop
  1. 引用zookeeper help

二、kafka安装配置

1.安装路径

版本地址
3.8.4https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgzhttps://kafka.apache.org/downloads.htmllinux下载安装分为源码和二进制两种形式。源码可以对代码定制化,一般要求较高。使用二进制的即可
  1. 安装配置
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
tar -zxvf kafka_2.13-3.8.0.tgz -C /opt/module/
cd /opt/module
ln -s kafka_2.13-3.8.0/ kafka
# mdkir /opt/module/kafka/logs  # 存放kafka日志,会自动创建
cd /opt/module/kafka/config
vim server.properties  # kafka配置
"""   server.properties 详情(英译中)   """
############################# Server Basics #############################
# 当前机器在集群中的唯一标识,和zookeeper的myid性质一样
broker.id=0############################# Socket Server Settings #############################
# 当前kafka服务启动的主机地址,单机可使用localhost,集群
host.name=localhost# 当前kafka对外提供服务的端口默认是9092
port=9092# 监听地址和端口,与(host.name&port)组合互斥 
# listeners=PLAINTEXT://localhost:9092# 这个是borker进行网络处理的线程数
num.network.threads=3# 这个是borker进行I/O处理的线程数
num.io.threads=8# 发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=102400# kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400# 这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
socket.request.max.bytes=104857600############################# Log Basics #############################
# 存储日志文件的目录列表,以逗号分隔。上面参数num.io.threads值大于此处目录数。
log.dirs=/opt/module/kafka/logs# 默认的分区数,一个topic默认1个分区数
num.partitions=1# 每个数据目录用来日志恢复的线程数目
num.recovery.threads.per.data.dir=1# 是否启用log压缩,一般不用启用,启用的话可以提高性能
# log.cleaner.enable=false############################# Internal Topic Settings  #############################
# 偏移量主题的副本因子,小于1会增加数据丢失的风险!偏移量主题中的数据只会被复制到 Kafka 集群中的一个节点
offsets.topic.replication.factor=1
# 事务状态日志的复制因子副本数,小于1会增加数据丢失的风险!该日志只会被复制到一个 Kafka 集群中的另一个节点
transaction.state.log.replication.factor=1
# 事务状态日志的同步因子副本数,小于1会增加数据丢失的风险!只要有一个副本参与同步,Kafka 就会视写入操作为成功
transaction.state.log.min.isr=1############################# Log Flush Policy #############################
# 日志数据写入磁盘之前累积在内存中的消息数量。达到或超过阈值时,Kafka会触发日志刷新,将数据写入磁盘
#log.flush.interval.messages=10000# 日志数据写入磁盘之前,可以在日志中累积的时间。超过时间间隔时(无论日志消息数量到没到阈值),Kafka会触发日志刷新,将数据写入磁盘
#log.flush.interval.ms=1000############################# Log Retention Policy #############################
# 默认消息的最大持久化时间,168小时,7天
log.retention.hours=168# 每个分区中消息日志文件阈值。当一个日志文件的大小达到或超过这个阈值时,Kafka会根据配置的日志保留策略(如时间或大小)来决定是否删除旧的日志文件以释放空间
#log.retention.bytes=1073741824# 这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
#log.segment.bytes=1073741824# 每隔300000毫秒去检查上面配置的log失效时间
log.retention.check.interval.ms=300000############################# Zookeeper #############################
# 设置zookeeper的连接端口。格式:ip:port,如果集群的话,格式(ip1:port1,ip2:port2,ip3:port3)
zookeeper.connect=localhost:2181# 设置zookeeper的连接超时时间
zookeeper.connection.timeout.ms=18000############################# Group Coordinator Settings ############################## 消费者组初始再平衡的延迟时间。在Kafka中,当消费者组内的消费者数量发生变化(如有新的消费者加入或有消费者退出),会触发再平衡(rebalance)操作,即重新分配分区给消费者以确保负载均衡
group.initial.rebalance.delay.ms=0
  1. . 配置环境变量(依赖java,因此也需要JAVA_HOME配置)
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
  1. 操作指令
# 当logs无权限访问的话,可以sudo chown -R 用户名:用户组 /opt/module/kafka/logs
# 启动服务,后台启动,可使用nohup、&组合
kafka-server-start.sh /opt/module/kafka/config/server.properties
# nohup kafka-server-start.sh /opt/module/kafka/config/server.properties > /dev/null 2>&1 &
# jps是jdk提供的一个查看当前java进程的小工具,更详细可以使用jps -ml,其它参数自行百度吧
# 查看启动效果, 结果见'->44012 kafka'
jps
->76435 Jps
->74221 QuorumPeerMain  # 这个就是zookeeper启动后的服务
->44012 kafka  # 这个就是kafka启动后的服务
->2981 Main
# 多种形式查看启动效果
ps -ef |grep kafka
ls -l /proc/44012/cwd
lsof -i:9092
# 停止服务
kill -9 44012
  1. 集群
    假设集群机kafka0、kafka1、kafka2
# 分别在集群机群上安装配置kafka,同上"""
因是集群,需要配置通信
1、在各集群服务器kafka配置文件server.properties中修改broker.id值,分别为0,1,2(快捷配置可以使用xsync)!
2、在各集群服务器kafka配置文件server.properties中修改zookeeper.connect值为kafka0-ip:2181,kafka1-ip:2181,kafka-ip-ip:2181(快捷配置可以使用xsync)!
"""
# 接下来就是分别启动服务了,这个上面讲了启动关闭命令,就不书写了,不知道的话可以参考上文。分别启动很麻烦,可不可以做个批量脚本关闭集群
"""在kafka1机器上编写批量集群操作脚本kf.sh
# 后续补充
"""
# 配置可执行权限
chmod u+x kf.sh
启动/关停
kf.sh start/stop

三、快去试一下吧!下一章:kafka命令之分区接入创建删除


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1974.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Windows 服务器中用户的分类

Windows 服务器中用户的分类 本地用户(只能在本地登录)如果你的服务器升级为域成员服务器,即刻失去本地服务。 漫游用户(域用户就是漫游用户,可用在域内的任何一个设备上、且在权限允许的范围内进行登录和资源使用。 …

基于YOLO11/v10/v8/v5深度学习的建筑墙面损伤检测系统设计与实现【python源码+Pyqt5界面+数据集+训练代码】

《博主简介》 小伙伴们好,我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 👍感谢小伙伴们点赞、关注! 《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发…

Sublime Text 的PHP格式化插件phpfmt 的 setting 配置参数说明

phpfmt.sublime-settings 是 Sublime Text 中 phpfmt 插件的配置文件,用于定义代码格式化的各种参数。以下是一些常见的配置参数及其说明: 1、version 指定配置文件的版本,根据 phpfmt 插件的版本,此值可能有所不同。 2、php_b…

Oracle视频基础1.2.1练习

1.2.1 需求: 完整格式查看所有用户进程判断oracle启动状态 连接sqlplus不登陆 以sysdba身份登陆,通过登陆信息判断oracle启动状态 启动数据库,查系统全局区动态组件表 使用shell,启动监听然后返回sql ps -ef sqlplus /nolog con…

Ajax学习

目录 一、是什么 二、jQuery.ajax 三、初实现 四、再实现 五、应用 一、是什么 AJAX:Asynchronous JavaScript and XML(异步的JavaScript和XML) 是一种在无需重新加载整个网页的情况下,能够更新部分网页的技术 应用&#…

音频中sample rate是什么意思?

‌sample rate‌在数字信号处理中,指的是‌采样频率‌,即每秒钟从连续信号中抽取的样本数量。采样频率越高,信号的还原度越高,但同时也会增加计算负担和存储需求‌。 实际应用场景 在音频处理中,设置合适的采样率可以…

RabbitMQ客户端应用开发实战

这一章节我们将快速完成RabbitMQ客户端基础功能的开发实战。 一、回顾RabbitMQ基础概念 这个RabbitMQ的核心组件,是进行应用开发的基础。 二、RabbitMQ基础编程模型 RabbitMQ提供了很多种主流编程语言的客户端支持。这里我们只分析Java语言的客户端。 上一章节提…

【光交换器件】

一、ROADM ROADM节点通常由波长选择开关(WSS)和其他模块组成。 ROADM分类 光网络交叉能力分类 Colorless(波长无关) Directionless(方向无关) Contentionless(竞争无关) Flexi-G…

docker的安装配置与基本简单命令

目录 1.docker简介 2.docker安装 2.1使用root用户登陆 更新yum源 2.2安装依赖 2.3设置yum源 更新yum源索引 2.4安装docker 2.5启动并且设置开机自启动 2.6验证安装是否成功 2.7配置docker加速器 2.8重启docker服务 3.docker简单使用 3.1下载镜像 3.2列出…

第72期 | GPTSecurity周报

GPTSecurity是一个涵盖了前沿学术研究和实践经验分享的社区,集成了生成预训练Transformer(GPT)、人工智能生成内容(AIGC)以及大语言模型(LLM)等安全领域应用的知识。在这里,您可以找…

Fx-LMS 单片机

功能:主动降噪控制器 开发板连接麦克风,通过ADC或其他方式采集声音信号。采集到的声音信号经过开发板内置的Fx-LIIS主动降噪算法处理,生成反向声波信号,并通过DAC输出至扬声器进行播放。通过反向声波与原声波叠加,达到…

web前端3D旋转相册(附完整代码)

效果图 当鼠标移动到目标时&#xff0c;外层的图片会张开&#xff0c;外面的图片修改透明度为0.5达到想要的效果。 完整代码 HTML部分 这里用的是绝对路径&#xff0c;一般建议使用相对路径&#xff08;..代表上一级&#xff0c;.代表当前文件夹&#xff09; <!DOCTYPE …

新老项目不同node版本,使用nvm控制node版本切换(mac、window)

window系统电脑的链接&#xff1a;https://blog.csdn.net/qq_40269801/article/details/136450961 以下是mac版本的操作方式&#xff1a; 1、打开终端 克隆 NVM 仓库&#xff1a; git clone https://github.com/nvm-sh/nvm.git ~/.nvm 2、运行安装脚本&#xff1a; cd ~/.n…

2024双11买什么东西比较好?2024年双十一必买清单大全!

2024年双十一到底哪些东西值得入手&#xff1f;今天总结了超全双十一值得入手的单品汇总&#xff01;随着一年一度的双十一购物狂欢节临近&#xff0c;各大电商平台纷纷推出各种促销活动&#xff0c;吸引了无数消费者的关注。在这场全民参与的购物盛宴中&#xff0c;如何在众多…

vi —— 终端中的编辑器

目标 vi 简介打开和新建文件三种工作模式常用命令分屏命令常用命令速查图 01. vi 简介 1.1 学习 vi 的目的 在工作中&#xff0c;要对 服务器 上的文件进行 简单 的修改&#xff0c;可以使用 ssh 远程登录到服务器上&#xff0c;并且使用 vi 进行快速的编辑即可常见需要修改…

安装fpm,解决*.deb=> *.rpm

要从生成 .deb 包转换为 .rpm 包&#xff0c;可以按照以下步骤修改打包脚本 1. 使用 fpm 工具 fpm 是一个强大的跨平台打包工具&#xff0c;可以将 .deb 包重新打包成 .rpm&#xff0c;也可以直接从源文件打包成 .rpm。 安装 fpm sudo apt-get install ruby-dev sudo gem in…

web文件包含include

php伪协议 在 PHP 中&#xff0c;伪协议&#xff08;Pseudo Protocols&#xff09; 也被称为 流包装器&#xff0c;这些伪协议以 php:// 开头&#xff0c;后面跟着一些参数&#xff0c;用于指定 要执行的操作 或 需要访问的资源。 伪协议表明这些协议并不是一个 真实的外部协议…

QChart中柱形图的简单使用并实现【Qt】

预备工作 如果qt没下载去下载一个&#xff0c;下载太慢了可以试试它[点击跳转]  (https://blog.csdn.net/qq_19319481/article/details/131655379)。   如果已经下载了qt发现自己的组件中没有QCharts&#xff0c;可以去试试它点击跳转。 都搞定了以后在pro文件里面添加QT …

视频点播系统扩展示例

更多的前端页面&#xff08;如视频详情页、用户注册页等&#xff09;。更复杂的业务逻辑&#xff08;如视频评论、搜索功能等&#xff09;。安全性和权限管理&#xff08;如用户角色管理、权限控制等&#xff09;。其他技术细节&#xff08;如文件上传、分页查询等&#xff09;…

KVM虚拟机迁移:无缝迁徙,重塑云上未来

作者简介&#xff1a;我是团团儿&#xff0c;是一名专注于云计算领域的专业创作者&#xff0c;感谢大家的关注 座右铭&#xff1a; 云端筑梦&#xff0c;数据为翼&#xff0c;探索无限可能&#xff0c;引领云计算新纪元 个人主页&#xff1a;团儿.-CSDN博客 目录 前言&#…