从kafka和zookeeper中获取生产和消费偏移量

从kafka和zookeeper中获取生产和消费偏移量

  • 特殊说明

    • 该命令是使用python进行编译,需要使用centos7系统上进行使用。
  • 命令详细

[root@mongodb_1 get_offsets_num]# ./get_offsets_num -h
usage: get_offsets_num [-h] [-k KAFKA_HOST] [-z ZOOKEEPER_HOST][-m INTERVAL_MINUTES]Usage of argparseoptional arguments:-h, --help            show this help message and exit-k KAFKA_HOST, --kafka_host KAFKA_HOST需要输入kafka:端口-z ZOOKEEPER_HOST, --zookeeper_host ZOOKEEPER_HOST需要输入zookeeper:端口-m INTERVAL_MINUTES, --Interval_minutes INTERVAL_MINUTES间隔分钟
  • 命令执行
[root@mongodb_1 get_offsets_num]# ./get_offsets_num_v2.py  -k 10.130.25.77:9092 -z 10.130.25.79:2181  
Interval 1 minutes sleep
=======================================================================================
kafka offsets: agent 2574552 2574552
zookeeper offsets: agent 2574552 2574552
agent kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
kafka offsets: record 89110 89110
zookeeper offsets: record 89110 89110
record kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
  • 代码详情
#!/usr/local/python3/bin/python3
import os, time,json,argparse
from kazoo.client import KazooClient
from kafka3 import KafkaConsumer, TopicPartitiondef get_zoo_consumer_info(Topology):Topology_num = 0zk_cli.start()path = "/stormOffset/" + Topology + "/partition_0"if zk_cli.exists(path):str_data, stat = zk_cli.get(path)str_data = json.loads(str_data)Topology_num =  str_data.get("offset")#print("zookeeper now " + path + " offsets: " + str(Topology_num) )else:   print("Path " + path  + " does not exist.")return Topology_numdef get_kafka_consumer_info(server, topic):partition = 0tp = TopicPartition(topic, partition)end_offset = server.end_offsets([tp])[tp]#print("kafka topic " + topic + " partition " + str(partition) + " offsets: " + str(end_offset))return end_offsetif  __name__ == '__main__':parser = argparse.ArgumentParser(description='Usage of argparse')parser.add_argument('-k','--kafka_host', type=str, default="10.130.25.77:9092",help='需要输入kafka:端口')parser.add_argument('-z','--zookeeper_host', type=str, default="10.130.25.79:2181",help='需要输入zookeeper:端口')parser.add_argument('-m','--Interval_minutes', type=int, default="1",help='间隔分钟')args = parser.parse_args()kafka_host= args.kafka_hostzookeer_host= args.zookeeper_hostKafka_production_topics = "agent,record"Zoo_consumption_topics= "agentTopology,recordTopology"Interval_minutes = args.Interval_minutestry:zk_cli = KazooClient(hosts=zookeer_host)#print("init zookeeper " + zookeer_host + " conn ok")except Exception as e:print("init zookeeper conn error: "+ str(e))try:#kafka_server = KafkaConsumer(bootstrap_servers=kafka_host)kafka_server = KafkaConsumer(bootstrap_servers=kafka_host)#print("init kafka " + kafka_host + "  conn ok")except Exception as e:print("init kafka conn error: "+ str(e))zoo_offset = {}kafka_offset = {}Kafka_production_topics_list = Kafka_production_topics.split(",")Kafka_production_topics_list_2  =  Kafka_production_topics.split(",")Zoo_consumption_topics_list = Zoo_consumption_topics.split(",")Zoo_consumption_topics_list_2 =   Zoo_consumption_topics.split(",")for i in range(0,len(Kafka_production_topics_list)):kafka_topics = Kafka_production_topics_list.pop()get_kafka_offset_num = get_kafka_consumer_info(kafka_server,kafka_topics)kafka_offset[kafka_topics]=get_kafka_offset_numzoo_topics = Zoo_consumption_topics_list.pop()get_zoo_offset_num = get_zoo_consumer_info(zoo_topics)zoo_offset[zoo_topics]= get_zoo_offset_numprint("Interval " + str(Interval_minutes) + " minutes sleep")print("=======================================================================================")time.sleep(int(Interval_minutes) * 60)for i in range(0,len(Kafka_production_topics_list_2)):kafka_topics = Kafka_production_topics_list_2.pop()get_kafka_offset_num = get_kafka_consumer_info(kafka_server,kafka_topics)last_kafka_num = kafka_offset.get(kafka_topics)minutes_kafka_offset_num = get_kafka_offset_num - last_kafka_numzoo_topics = Zoo_consumption_topics_list_2.pop()get_zoo_offset_num = get_zoo_consumer_info(zoo_topics)last_zoo_num =  zoo_offset.get(zoo_topics)minutes_zoo_offset_num = get_zoo_offset_num - last_zoo_numDifference = minutes_kafka_offset_num - minutes_zoo_offset_numprint("kafka offsets:",kafka_topics,get_kafka_offset_num,last_kafka_num)print("zookeeper offsets:",kafka_topics,get_zoo_offset_num,last_zoo_num)print(kafka_topics  + " kafka offsets num: " + str(minutes_kafka_offset_num) + " storm offsets num: " + str(minutes_zoo_offset_num) + " Actual consumption: " + str(Difference))print("=======================================================================================")zk_cli.stop()# 关闭消费者连接kafka_server.close()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1543946.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Springcloud框架-能源管理系统-能源管理系统源码-能源在线监测平台-双碳平台

一、介绍 基于SpringCloud的能管管理系统-能源管理平台源码-能源在线监测平台-双碳平台源码-SpringCloud全家桶-能管管理系统源码 有需者咨询,非诚勿扰; 二、软件架构 二、功能介绍 三、数字大屏展示 四、数据采集原理 五、软件截图

Windows11系统安装,配置CUDA、cuDNN等

已经有大几年没有安装过 Windows 的系统了,今天因为黑神话悟空,准备把 Win 11 装一台,玩玩游戏,顺便把一些 CUDA 相关的异步解析项目也安装到 Window 上。 下载安装 PE 因为十几年前,只会用 PE 装系统,所…

XSS闯关小游戏(前13关)

挖掘思路 1.存在可控参数 2.页面存在回显 3.使用带有特殊字符的语句去测试&#xff0c;网站是否进行了实例化 ( 例如 ">123 ) 4.构造闭合&#xff0c;实现payload的逃逸 1 name处参数可控&#xff0c;直接打即可 2 这里知道<>被实体编码了 再测试">1…

DANN GRL

域自适应是指在目标域与源域的数据分布不同但任务相同下的迁移学习&#xff0c;从而将模型在源域上的良好性能迁移到目标域上&#xff0c;极大地缓解目标域标签缺失严重导致模型性能受损的问题。 介绍一篇经典工作 DANN &#xff1a; 模型结构 在训练阶段需要预测如下两个任务…

langchain的构成

1.简介 langchain的构成其包含langchain-core,langchain-community,langchain,langgraph,langserve,langSmith。 2&#xff0c;构件的详解 ‌LangChain Core‌ ‌LangChain Core‌是LangChain框架的核心组成部分&#xff0c;它包含了不同组件的基本抽象以及将它们组合在一起…

【每天学个新注解】Day 4 Lombok注解简解(三)—@NonNull

我们在之前的三天学了Lombok常用的注解&#xff1a; 【每天学个新注解】Day 1 Lombok注解简解&#xff08;〇&#xff09;—Getter、Setter、ToString、EqualsAndHashCode、Constructor 【每天学个新注解】Day 2 Lombok注解简解&#xff08;一&#xff09;—Data、Build、Value…

Kubernetes调度单位Pod

Kubernetes调度单位Pod 1 Pod简介 不直接操作容器container。 一个 pod 可包含一或多个容器&#xff08;container&#xff09;&#xff0c;它们共享一个 namespace&#xff08;用户&#xff0c;网络&#xff0c;存储等&#xff09;&#xff0c;其中进程之间通过 localhost 本地…

iOS 巨魔技巧:一键汉化巨魔商店

嘿&#xff0c;这是黑猫。iOS 巨魔商店一直都有个严重的问题&#xff1a;界面纯英文&#xff0c;不支持简体中文。 当然了&#xff0c;在IT行业&#xff0c;英语是通用语言。但是&#xff0c;既然巨魔/越狱面向普罗大众的技术&#xff0c;那么做好语言适配&#xff0c;还是很关…

idea插件开发系列1-环境搭建

前言 还记着10多年前有幸接触了eclipse插件开发&#xff0c;10多年后的今天有开发了idea的插件&#xff0c;真是一个轮回&#xff01; 为什么要学习idea插件开发呢&#xff1f; 目前公司使用自己的MVC框架&#xff0c;没有相应的idea插件支持&#xff08;如类似mybatis插件可…

Redis简单介绍与安装应用

在当今的互联网时代&#xff0c;数据的快速存取和处理变得至关重要。Redis&#xff0c;作为一种高性能的键值存储系统&#xff0c;已经成为许多开发者和企业的首选。本文将简要介绍Redis的基本概念、工作原理以及其在实际应用中的一些典型用例。 一、简介 1&#xff09;概念 …

centos7 docker部署nacos

1. 一行代码安装git yum -y install git 2. 下载最新版nacos源码&#xff1a; git clone https://github.com/nacos-group/nacos-docker.git 进入nacos-docker文件 cd nacos-docker 3.docker安装数据库Mysql8 按这个来就行&#xff0c;非常好 Docker安装mysql8-超详细、每…

记某学校小程序漏洞挖掘

前言&#xff1a; 遇到一个学校小程序的站点&#xff0c;只在前端登录口做了校验&#xff0c;后端没有任何校验&#xff0c;奇葩弱口令离谱进去&#xff0c;站点里面越权泄露敏感信息&#xff0c;接管账号等漏洞&#xff01;&#xff01;&#xff01; 渗透思路 1.绕过前端 …

docker 创建showdoc服务 showdoc容器部署教程

1. 下载最新版本镜像 # 按照最新版本 docker pull star7th/showdoc 2. 创建映射文件夹&#xff1a; # 创建文件夹 mkdir -p /data/showdoc_data# 可写权限 chmod 777 /data/showdoc_data 3.创建容器命令&#xff1a; docker run -d --name showdoc --userroot --privileged…

DoppelGanger++:面向数据库重放的快速依赖关系图生成

doi&#xff1a;DoppelGanger: Towards Fast Dependency Graph Generation for Database Replay&#xff0c;点击前往 文章目录 1 简介2 架构概述3 依赖关系图3.1 符号和问题定义3.2 无 IT(k) 图3.3 无 OT 图表3.4 无 OTIT 图表3.5 无 IT[OT] 图表3.6 输出确定性保证 4 重复向后…

go-admin-ui的菜单分割线设计思路和代码实现

在菜单管理添加分割线&#xff0c;类似这种&#xff1a; 思路&#xff1a;利用空间结构和数据特点来唯一区分出分割线&#xff0c;来划分业务区域 <template><div><h1>Split Line Controller</h1><ul><li v-for"route in displayedRout…

Thinkphp5x远程命令执行 靶场攻略

环境配置 靶场&#xff1a;vulhub/thinkphp/5-rce docker-compose up -d #启动环境 漏洞复现 1.访问靶场&#xff1a;http://172.16.1.198:8080/ 2.远程命令执⾏ POC&#xff1a; ?sindex/think\app/invokefunction&functioncall_user_func_array&vars[0]system…

网安新声 | 黎巴嫩BP机爆炸事件带来的安全新挑战与反思

网安加社区【网安新声】栏目&#xff0c;汇聚网络安全领域的权威专家与资深学者&#xff0c;紧跟当下热点安全事件、剖析前沿技术动态及政策导向&#xff0c;以专业视野和前瞻洞察&#xff0c;引领行业共同探讨并应对新挑战的策略与可行路径。 9月17日&#xff0c;黎巴嫩境内发…

项目实战:lngress搭建Nginx+WP论坛+MariaDB

1. 网站架构 本次部署形式完全舍弃 Docker&#xff0c;将所有应用都置于Kubernetes&#xff0c;采用 Deployment 而非单 Pod 部署&#xff0c;稳定性得到升级。 2. 部署 MariaDB [rootk8s-master ~]# mkdir tdr [rootk8s-master ~]# cd tdr/ &#xff08;1&#xff09;定义 …

YOLOv8改进,YOLOv8添加STA注意机制(超级令牌注意力机制,CVPR2023),并二次创新C2f结构,助力涨点

改进前训练结果: 改进后训练结果: 摘要 在Transformer架构引入“超级令牌”(Super Token)的机制,旨在解决浅层网络中过多冗余的局部特征捕捉问题。传统的Transformer在捕捉长程依赖性方面表现出色,但在浅层网络中,由于局部特征冗余,导致了计算效率的低下。为了解决…

Nginx基础详解1(单体部署与集群部署、负载均衡、正反代理、nginx安装)

本阶段的任务 1.学会集群的操作概念 2.完成对Nginx的入门操作 3.使用Nginx实现集群和负载均衡 4.使用Nginx实现高可用的方案 目录 1.单体部署与集群部署 1.1单体部署的概念 1.2单体部署的优缺点 1.3集群部署的概念 1.4集群部署的优缺点 1.5集群部署需要注意的点 1.…