【如何使用Python操作Kafka】

如何使用Python操作Kafka

1、安装kafka
pip install kafka
2、直接编写Kafka工具脚本
kafka_tools.py
# -*- coding: utf-8 -*-import json
from kafka import KafkaConsumer, KafkaProducerclass KProducer:def __init__(self, username_ip, topic):"""kafka 生产者:param bootstrap_servers: 地址:param topic:  topic"""bootstrap_servers = Config.get('kafka:{}:bootstrap_servers'.format(username_ip))self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers,value_serializer=lambda m: json.dumps(m).encode('ascii'), )  # json 格式化发送的内容self.topic = topicdef sync_producer(self, data_li):"""同步发送 数据:param data_li:  发送数据:return:"""for data in data_li:future = self.producer.send(self.topic, data)record_metadata = future.get(timeout=10)  # 同步确认消费partition = record_metadata.partition  # 数据所在的分区offset = record_metadata.offset  # 数据所在分区的位置print('save success, partition: {}, offset: {}'.format(partition, offset))def asyn_producer(self, data_li):"""我目前优先使用的是此方案异步发送数据:param data_li:发送数据:return:"""for data in data_li:self.producer.send(self.topic, data)self.producer.flush()  # 批量提交# self.producer.close()  # 提交完成关闭  不需要关闭注释此行def asyn_producer_callback(self, data_li):"""异步发送数据 + 发送状态处理:param data_li:发送数据:return:"""for data in data_li:self.producer.send(self.topic, data).add_callback(self.send_success).add_errback(self.send_error)self.producer.flush()  # 批量提交def send_success(self, *args, **kwargs):"""异步发送成功回调函数"""print('save success')returndef send_error(self, *args, **kwargs):"""异步发送错误回调函数"""print('save error')returndef close_producer(self):try:self.producer.close()except:passclass KConsumer:def __init__(self, username_ip, topic, offset_reset):bootstrap_servers = Config.get('kafka:{}:bootstrap_servers'.format(username_ip))self.bootstrap_servers = bootstrap_serversself.offset_reset = offset_resetself.topic = topicdef read_kafka(self):consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers,auto_offset_reset=self.offset_reset)return consumer
3、使用此工具

demo_kafka.py

# -*- coding: utf-8 -*-
from kafka_tools import KProducer, KConsumerdef data_read_kafka(username_ip, topic, offset_reset):"""读取卡夫卡数据username_ip: 你的kafka的地址topic:你的topicoffset_reset:消费方式 LATEST 获取当前偏移量最新消息  EARLIEST  从头开始获取信息"""consumer = KConsumer(username_ip, topic, offset_reset).read_kafka()for message in consumer:data_info = message.value.decode('utf-8')print(data_info)def send_kafka(username_ip, topic, data_list):"""向kafka写入数据username_ip: 你的kafka的地址topic:你的topicdata_list: 需要发送的数据列表"""KProducer(username_ip, topic).asyn_producer(data_list)

关注公众号

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/144497.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【算法篇】栈与队列类(笔记)

目录 一、用栈实现队列 二、用队列实现栈 三、有效的括号 四、删除字符串中的所有相邻重复项 五、逆波兰表达式求值 六、滑动窗口最大值 七、前 K 个高频元素 一、用栈实现队列 232. 用栈实现队列 - 力扣(LeetCode)https://leetcode.cn/proble…

[PTA]7-6 吃火锅

[PTA]7-6 吃火锅 以上图片来自微信朋友圈:这种天气你有什么破事打电话给我基本没用。但是如果你说“吃火锅”,那就厉害了,我们的故事就开始了。 本题要求你实现一个程序,自动检查你朋友给你发来的信息里有没有 chi1 huo3 guo1。 …

手写Spring

简单实现Spring基于注解配置 ComponentScan Target(ElementType.TYPE) Retention(RetentionPolicy.RUNTIME) public interface ComponentScan {String value() default ""; } 相当于component-scan HspSpringConfig ComponentScan(value "spring.write.com…

两个指令反过来说大模型就理解不了啦?或许该让第三者插足啦 -通过引入中间LLM预处理用户输入以提高多任务处理能力

今天就遇到有点儿dt的问题,利用大模型顺利通了自定义的工具调用(并没有用到tools功能,而是通过prompt强制输出),单个单个的没问题哈,但是多个一起就出现问题了 我说“关闭电脑PC1, 打开第2台电脑” 它看不懂…

安卓实现导入Excel文件

使用简化版的jar包 api files(libs/poi-3.12-android-a.jar) api files(libs/poi-ooxml-schemas-3.12-a.jar) 导入遇到了两个兼容问题 1.build.gradle文件里面 android { 要添加 packagingOptions {exclude META-INF/INDEX.LIST } 2.加载大文件要在清单文件里面加androi…

网络变压器HR911130C的使用注意点

HR911130C的使用,需要2个注意点: 1)数据线data0、data2、data3是相邻的引脚,但是data1是 不相邻的两个引脚,注意看下面的电路图,所以绘图时需要注意 2)LED灯的连接 11脚、12脚,连…

快手可灵AI全球升级1.5模型:引入“运动笔刷”功能 画质大幅提升

9月19日,快手公司宣布其可灵AI模型进行了全球范围内的重磅升级,推出了1.5版本。新版本在多个方面实现了显著提升,包括视频画质、动态效果、美学表现、运动合理性以及语义理解等。 新升级的1.5模型支持在高品质模式下直接输出1080p高清视频&am…

【CSS】一行三个盒子 每个盒子都是16:9

padding-top 属性接受百分比值时,其百分比是基于父元素的宽度来计算的,而不是自身元素的宽度 aspect-ratio 更方便&#xff0c;但存在兼容性问题 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name&quo…

字符设备驱动 — 4 异常与中断

异常与中断 中断属于异常的一种 异常会直接打断 CPU 的运行&#xff0c;而各种中断会被传到中断控制器&#xff0c;由中断控制器来选择优先级最高的中断并通知 CPU 处理流程 arm 对异常&#xff08;中断&#xff09;处理流程&#xff1a; 初始化&#xff1a; 设置中断源&…

水经微图PC版5.0.0即将内测

让GIS更简单高效&#xff01; 水经微图&#xff08;以下称“微图”&#xff09;PC版5.0.0即将内测&#xff0c;这是一个基于WeMapEngine开发的全新版本。 关于什么是WeMapEngine&#xff0c;请从《WeMapEngine可快速构建的GIS应用功能》一文中了解。 微图5.0.0功能界面 水经…

【分享】“可恶”的运算放大器电容负载

他们说如果使用放大器驱动电容负载(图 1、CLOAD)&#xff0c;一个不错的经验是采用一个 50 或 100 欧的电阻器 (RISO) 将放大器与电容器隔开。这个附加电阻器可能会阻止运算放大器振荡。 图 1.支持电容负载的放大器可能需要在放大器输出与负载电容器之间连接一个电阻器。 使用…

STM32—I2C通信外设

1.I2C外设简介 STM32内部集成了硬件I2C收发电路&#xff0c;可以由硬件自动执行时钟生成、起始终止条件生成、应答位收发、数据收发等功能&#xff0c;减轻CPU的负担支持多主机模型&#xff08;可变多主机&#xff09;支持7位/10位地址模式&#xff08;11110......)支持不同的通…

JavaWeb JavaScript 11.XML —— 配置文件

生活想埋没我&#xff0c;没想到我是颗种子 —— 24.9.19 一、XML 1.什么是XML XML是EXtensible Markup Languge的缩写&#xff0c;翻译过来就是可扩展标记语言。所以很明显&#xff0c;XML和HTML一样都是标记语言&#xff0c;也就是说它们的基本语法都是标签 可扩展 三个字…

OpenCV基础入门30讲(Python)——第二讲 图像色彩转换

常见的几种颜色类型介绍 1、彩色图像&#xff08;Color Image&#xff0c;BGR&#xff09; 数据类型&#xff1a;uint8通道数&#xff1a;3&#xff08;BGR&#xff1a;蓝色、绿色、红色&#xff09;描述&#xff1a;彩色图像有三个通道&#xff0c;每个通道的值范围是 0 到 …

【图书推荐】《Autodesk Inventor 2024入门与案例实战(视频教学版)》

本书重点 配套示例文件、PPT课件、教学视频、电子教案、课程标准、骄婿大纲、模拟试题、作者微信群答疑服务。 内容简介 《Autodesk Inventor 2024入门与案例实战&#xff1a;视频教学版》以Autodesk Inventor 2024为平台&#xff0c;重点介绍Autodesk Inventor 2024中文版的…

洗衣机制造5G智能工厂物联数字孪生平台,推进制造业数字化转型

洗衣机制造业作为传统制造业的重要组成部分&#xff0c;通过引入5G智能工厂物联数字孪生平台&#xff0c;加速推进自身的数字化转型进程。这一创新模式不仅极大地提升了生产效率&#xff0c;还深刻改变了产品的设计、生产、管理及运维流程&#xff0c;为行业带来了前所未有的竞…

[数据集][目标检测]手机识别检测数据集VOC+YOLO格式9997张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;9997 标注数量(xml文件个数)&#xff1a;9997 标注数量(txt文件个数)&#xff1a;9997 标注…

saltstack企业实战

saltstack官网最新文档 saltstack架构设计 saltstack 高可用方案&#xff1a;Salt官网是有 HARebalance minion配置里写多个master地址 failover&#xff08;syndic&#xff09; 架构 操作系统&#xff1a;CentOS7.6salt版本&#xff1a;3000.3 多master https://www.cn…

【贪心算法】贪心算法一

贪心算法一 1.柠檬水找零2.将数组和减半的最少操作次数3.最大数4.摆动序列 点赞&#x1f44d;&#x1f44d;收藏&#x1f31f;&#x1f31f;关注&#x1f496;&#x1f496; 你的支持是对我最大的鼓励&#xff0c;我们一起努力吧!&#x1f603;&#x1f603; 1.柠檬水找零 题目…

【2023工业异常检测文献】SimpleNet

SimpleNet:ASimpleNetworkforImageAnomalyDetectionandLocalization 1、Background 图像异常检测和定位主要任务是识别并定位图像中异常区域。 工业异常检测最大的难题在于异常样本少&#xff0c;一般采用无监督方法&#xff0c;在训练过程中只使用正常样本。 解决工业异常检…