(Python) Structured Streaming读取Kafka源实时处理图像

Producer.py

import cv2
from kafka import KafkaProducer
import os
import os.path as osp# Kafka 服务器地址
bootstrap_servers = ['xxx.xxx.xxx.xxx:9092'] #terminal运行ifconfig可以查看localhost# 创建 Kafka 生产者
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)# 图片路径
image_path = 'your_path'files_and_folders = os.listdir(image_path)
for i in files_and_folders:try:# 读取图片image_data = cv2.imread(osp.join(image_path, i))if image_data is None:continue  # 如果图片读取失败,则跳过# 将图片数据转换为字节流_, buffer = cv2.imencode('.jpg', image_data)byte_data = buffer.tobytes()# 将字节流发送到 Kafkaproducer.send('image_test', byte_data)except Exception as e:print(f"Error processing image {i}: {e}")continue  # 即使发生错误,也继续处理下一个文件# 等待所有消息被发送
producer.flush()

从本地文件夹中读取几张图像模拟输入,采用CV的方法进行读取,将数据转化为字节流发送

Consumer.py

from pyspark.sql import SparkSession
import numpy as np
import cv2
import os
import matplotlib.pyplot as plt# 初始化 SparkSession
os.environ["SPARK_HOME"] = r"/usr/local/spark"
os.environ["PYSPARK_PYTHON"] = r"/usr/local/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = r"/usr/local/anaconda3/bin/python3"
os.environ["JAVA_HOME"] = r"/usr/lib/jvm/java-8-openjdk-amd64"spark = SparkSession.builder \.appName("KafkaSparkStreaming") \.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3") \.getOrCreate()
sc = spark.sparkContextsave_path = "save_path"
# Kafka 服务器地址和主题
bootstrap_servers = 'xxx.xxx.xxx.xxx:9092'
kafka_topic = 'image_test'# 创建 Kafka 源
kafka_df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", bootstrap_servers) \.option("subscribe", kafka_topic) \.load()# 将二进制数据转换为图像
def binary_to_image(binary_data):# 将二进制数据转换为 numpy 数组image_array = np.frombuffer(binary_data, np.uint8)# 使用 OpenCV 解码图像image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)return imagedef plt_show_image(image_list, batch_id):for i, image in enumerate(image_list):# 将BGR图像转换为RGB图像image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)plt.imshow(image)plt.title(f'Image {i + 1} in Batch {batch_id}')plt.axis('off')  # 不显示坐标轴plt.show()def transform_to_gray(sc, img_list):"""将原图像转为灰度图:param sc: sparkContext:param img_list::return: img_list"""list_rdd = sc.parallelize(img_list)result = list_rdd.map(lambda img: cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)).collect()return resultdef process_batch(batch_df, batch_id):print(f" batch ID: {batch_id}")# 将二进制数据转换为图像image_list = batch_df.rdd.map(lambda row: binary_to_image(row['value'])).collect()# 转换为灰度图像result = transform_to_gray(sc, image_list)# 展示图像plt_show_image(result, batch_id)query = kafka_df.select("value") \.writeStream \.foreachBatch(process_batch) \.outputMode("append") \.start()
query.awaitTermination()

环境配置正常可以不用os.environ[]配置临时环境 

笔者尝试直接用CV2展示图像,但只能展示处理后的第一批的图像,且会影响重新运行Producer接收数据,于是改用Plt进行展示,由于Producer采用CV的方式读取图片,其为BGR图像,而Plt为RGB图像,需要先进行一步转化

本案例中采用简单的灰度转化进行测试

附上原图和结果:

文件夹下有两张图片,这里只展示一张

重新运行一次Producer

两个微批的两张照片都成功并接收

附上更多的图像处理方法以供测试

def process_clahe(image):b, g, r = cv2.split(image)clahe = cv2.createCLAHE(clipLimit=2, tileGridSize=(8, 8))clahe_b = clahe.apply(b)clahe_g = clahe.apply(g)clahe_r = clahe.apply(r)filtered_image = cv2.merge((clahe_b, clahe_g, clahe_r))return filtered_imagedef process_hole(image):gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)# 转换为二值图ret, thresh1 = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)# 转换为布尔值thresh1 = thresh1 > 1# 去除外部噪点stage1 = morphology.remove_small_objects(thresh1, min_size=256, connectivity=2)stage2 = morphology.remove_small_holes(stage1, area_threshold=5000, connectivity=1)stage2 = stage2.astype('uint8')filtered_image = cv2.cvtColor(stage2 * 255, cv2.COLOR_GRAY2RGB)# # 转换为灰度图# gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)# # 转换为二值图# ret, thresh = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)## # 去除外部噪点(使用开运算)# kernel = np.ones((3, 3), np.uint8)# opening = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)## # 去除小孔(使用闭运算)# kernel_large = np.ones((20, 20), np.uint8)  # 根据需要调整内核大小# closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel_large)## # 将处理后的图像转换回原始图像的大小和类型# stage2 = cv2.resize(closing, (image.shape[1], image.shape[0]), interpolation=cv2.INTER_AREA)# stage2 = stage2.astype('uint8')## # 将单通道图像转换为三通道图像# filtered_image = cv2.cvtColor(stage2, cv2.COLOR_GRAY2RGB)return filtered_imagedef filter_image(sc, img_list, mode):"""按照mode处理图像:param sc: sparkContext:param img_list::param mode: 处理图像模式1.GaussianBlur2.medianBlur3.SHARPEN4.CLAHE5.Hole:return: img_list"""list_rdd = sc.parallelize(img_list)global result# 高斯滤波if mode == "GaussianBlur":image_rdd = list_rdd.map(lambda img: cv2.GaussianBlur(img, (5, 5), 0))result = image_rdd.collect()# 中值滤波if mode == "medianBlur":image_rdd = list_rdd.map(lambda img: cv2.medianBlur(img, 5))result = image_rdd.collect()# 锐化if mode == "SHARPEN":kernel = np.array([[0, -1, 0],[-1, 5, -1],[0, -1, 0]], dtype=int)# 4邻域模板# kernel = np.array([# [0, -1, 0],# [-1, 4, -1],# [0, -1, 0]], dtype=int)# 拉普拉斯模板# kernel = np.array([# [-1, -1, -1],# [-1, 8, -1],# [-1, -1, -1]], dtype=int)# 8邻域模板# kernel = np.array([# [-1, -1, -1],# [-1, 9, -1],# [-1, -1, -1]], dtype=int)image_rdd = list_rdd.map(lambda img: cv2.filter2D(img, -1, kernel))result = image_rdd.collect()# CLAHEif mode == "CLAHE":image_rdd = list_rdd.map(process_clahe)result = image_rdd.collect()# 孔洞填充if mode == "Hole":image_rdd = list_rdd.map(process_hole)result = image_rdd.collect()return result

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1541989.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

什么是 GPT?通过图形化的方式来理解 Transformer 架构

Predict, sample, repeat 预测、取样、重复 GPT 是 Generative Pre-trained Transformer 的缩写。首个单词较为直接,它们是用来生成新文本的机器人。“Pre-trained” 指的是模型经历了从大量数据中学习的过程,这个词暗示了该模型还有进一步在特定任务中…

开关柜设备红外检测数据集

开关柜设备红外检测数据集 包含以下2个数据文件: /train:训练集 /valid:验证集 /test:测试集 README.txt:数据说明 【数据说明】检测目标以Pascal VOC格式进行标注,对每个图像进行以下预处理,统…

极度精简 Winows11 系统镜像!Tiny11 2311下载 - 支持苹果 M 芯片 Mac 安装 (ARM 精简版)!

最新推出的 Tiny11 是一款极端精简版 Windows 11 系统镜像,针对苹果 M 芯片 Mac 用户(ARM 架构)提供良好支持。Tiny11 内置了众多优化特性,如更小的安装体积和更快的启动速度,特别适合有特殊需求或老机型的用户。用户可…

华为HarmonyOS地图服务 7- 在地图上绘制标记

场景介绍 本章节将向您介绍如何在地图的指定位置添加标记以标识位置、商家、建筑等。 点标记用来在地图上标记任何位置,例如用户位置、车辆位置、店铺位置等一切带有位置属性的事物。Map Kit提供的点标记功能(又称 Marker)封装了大量的触发事件,例如点击事件、长按事件、…

基于YOLO算法的网球运动实时分析-击球速度测量-击球次数(附源码)

这个项目通过分析视频中的网球运动员来测量他们的速度、击球速度以及击球次数。该项目使用YOLO(You Only Look Once)算法来检测球员和网球,并利用卷积神经网络(CNNs)来提取球场的关键点。此实战项目非常适合提升您的机…

VsCode C语言 SDL包配置 2024.9

写这篇文章的起因是,最近我需要使用 SDL 包,我懒得下载V-studio ,所以直接在VsCode 里配置C环境。我搞了好几个小时,啥都弄好了,但是一直被下面几个问题缠绕导致demo启动不了,现在我记录一下这奇葩的解决过程。所有路径…

Qt Debugging帮助文档

Qt中给断点添加条件: 示例1: 当i10时,程序中断 但不知道为什么,46行的条件没有生效,47行的条件生效了 给断点添加忽略次数: 在程序停止之前忽略该断点200次。 Breakpoints (Debugging with GDB)

Apache Doris 实践

Apache Doris 实践 官方使用指南:https://doris.incubator.apache.org/zh-CN/docs/install/source-install/compilation-with-docker/ 手动安装 下载二进制安装包https://apache-doris-releases.oss-accelerate.aliyuncs.com/apache-doris-2.1.5-bin-x64.tar.gz …

华润电力最新校招社招润择认知能力测评:逻辑推理数字计算语言理解高分攻略

​ 尊敬的求职者们, 在您准备加入华润电力这个大家庭之前,了解其招聘测评的详细流程和要求是至关重要的。以下是我们为您整理的测评系统核心内容,希望对您的求职之旅有所帮助。 测评系统概览 华润电力的招聘测评系统旨在全面评估求职者的认…

机器学习04-逻辑回归(python)-02原理与损失函数

1. 逻辑回归概念 逻辑回归(Logistic Regression) 是一种 分类模型,主要用于解决 二分类问题(即分成两类,如是否通过、是否患病等)。逻辑回归的目标是根据输入的特征预测一个 概率,这个概率值介于…

计算机毕业设计hadoop+spark+hive新能源汽车销售数据分析系统 二手车销量分析 新能源汽车推荐系统 可视化大屏 汽车爬虫 机器学习

《HadoopSparkHive新能源汽车销售数据分析系统》开题报告 一、选题背景与意义 1.1 选题背景 随着全球对环境保护意识的增强和能源结构的转型,新能源汽车市场迅速崛起。新能源汽车的销售数据不仅反映了市场趋势和消费者偏好,还为企业决策、政府监管和政…

微服务——网关登录校验(一)

1.网关登录校验 微服务中的网关登录校验是微服务架构中常见的一种安全机制,用于在请求到达微服务之前,对用户的身份进行验证,确保只有合法的用户才能访问相应的服务。 在微服务架构中,每个微服务都是独立部署的,它们之…

(C++17) optional 的 3 种用法

文章目录 *️⃣前言*️⃣3 种主流用法1️⃣函数返回值2️⃣函数参数3️⃣类成员 ⭐END🌟跋🌟交流方式 *️⃣前言 在 C17 中标准化了 std::optional。该类型可以容纳一种类型,且判断是否有无。 若使用的标准在低于 C17 则可以使用 Abseil 的…

浅谈递推法

递推法 递推法是一种数学方法,用于通过利用已知的初始条件和递推关系来计算要求中的每一项。以数列来举例,在递推法中,它的思想很简单:我们首先知道数列的第一项(初始条件),然后通过一个规律&a…

GEE 数据集:人类造成的热带潮湿森林退化程度的估计

目录 简介 摘要 代码 结论 数据和代码 引用 网址推荐 0代码在线构建地图应用 机器学习 人类造成的热带潮湿森林退化程度超出了先前的估计 简介 选择性采伐、火灾和边缘效应造成的热带森林退化是碳和生物多样性损失的主要驱动因素1,2,3,其年增长率可与森林砍伐相媲美…

Golang | Leetcode Golang题解之第424题替换后的最长重复字符

题目: 题解: func characterReplacement(s string, k int) int {cnt : [26]int{}maxCnt, left : 0, 0for right, ch : range s {cnt[ch-A]maxCnt max(maxCnt, cnt[ch-A])if right-left1-maxCnt > k {cnt[s[left]-A]--left}}return len(s) - left }f…

【算法题】63. 不同路径 II-力扣(LeetCode)-”如果起点有障碍物,那么便到不了终点“

【算法题】63. 不同路径 II-力扣(LeetCode)-”如果起点有障碍物,那么便到不了终点“ 1.题目 下方是力扣官方题目的地址 63. 不同路径 II 一个机器人位于一个 m x n 网格的左上角 (起始点在下图中标记为 “Start” )。 机器人每次只能向下…

【全网最全】2024年华为杯研赛A题成品论文获取入口(后续会更新)

您的点赞收藏是我继续更新的最大动力! 一定要点击如下的卡片,那是获取资料的入口! 点击链接加入【2024华为杯研赛资料汇总】:https://qm.qq.com/q/hMgWngXvcQhttps://qm.qq.com/q/hMgWngXvcQ你是否在寻找数学建模比赛的突破点&am…

BUUCTF逆向wp [WUSTCTF2020]Cr0ssfun

第一步 查壳,本题是64位,无壳。 第二步 查看主函数,点开看主函数,没什么东西。 左边表里面看到好几个i开头的函数(红色方框里面),点开看后每个函数的最后末尾(图中红色椭圆圈那里&a…

(笔记自用)位运算总结+LeetCode例题:颠倒二进制位+位1的个数

一.位运算总结: 在解题之前理解一下为什么需要位运算?它的本质是什么? 力扣上不少位运算相关的题,并且很多题也会用到位运算的技巧。这又是为什么? 位运算的由来 在计算机里面,任何数据最终都是用数字来表示的&…