基于YOLOv8的RTSP视频流实时目标检测与告警系统设计与实现(超详细)

前言

在训练模型完成后,想把模型应用起来,比如模型可以部署到项目中,实时接收RTSP视频流进行识别检测,一旦达到自己所设置的置信度阈值(例如大于0.5),系统就会实时把报警信息发送给服务端,由服务端进行实时响应和策略执行,很像是个企业级应用流程 。

比如:我们在yolov8平台训练完“火焰”模型,想要实时检测并把识别结果发送给服务端进行报警,一旦出现火焰,立即发送服务端进行报警。如下图所示:

 智能识别系统(python端)

首先,获取RTSP视屏流,可以是单个视频流也可以是视频流数组,所有常量都可以通过配置文件的形式进行读取。在yolov8网络模型代码基础上进行调用模型,然后对实时的RTSP协议视屏流进行实时识别,一旦发现有异常,比如出现火焰,并且超过置信度阈值(0.5),就立马发送报警信息到服务端,以下是python代码,主要实现:

2.1.模型调用

   # 加载YOLOv8模型model = YOLO('model.pt')

2.2.RTSP协议视频流实时读取

def process_video_stream(rtsp_url):"""从RTSP流中读取视频帧并处理。:param rtsp_url: RTSP流的URL:return: None"""cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)if not cap.isOpened():logging.error(f"无法打开RTSP视频流:{rtsp_url}")return

2.3.异常检测

def start_detection(rtsp_url):"""启动视频流检测线程。:param rtsp_url: RTSP流的URL"""detection_thread = threading.Thread(target=process_video_stream, args=(rtsp_url,))detection_thread.start()

2.4.实时发送报警信息

def send_alarm(alarm_data):"""发送报警信息到服务器。:param alarm_data: 包含标签名称、数量、置信度和图像信息的字典:return: None"""for endpoint in SERVER_ENDPOINTS:try:response = requests.post(endpoint, json=alarm_data)if response.status_code == 200:logging.info(f"报警信息成功发送到 {endpoint}")else:logging.error(f"报警信息发送到 {endpoint} 失败,状态码:{response.status_code}")except Exception as e:logging.error(f"发送报警信息到 {endpoint} 时发生错误:{e}")

2.5.发送报警信息格式

 # 构造报警信息class_names = [model.names[int(cls)] for cls in filtered_classes]alarm_data = {"labels": class_names,"counts": len(filtered_boxes),"confidences": filtered_confidences.tolist(),"image": image_to_base64(frame),"timestamp": timestamp  # 添加时间戳字段}# 记录日志logging.info(f"发送报警信息:标签={class_names}, 时间={timestamp}, 类别={filtered_classes}, 置信度={filtered_confidences.tolist()}")# 发送报警信息send_alarm(alarm_data)

完整代码参考如下: 

import cv2
import torch
import threading
import requests
import time
from datetime import datetime
import json
import logging
import os
from queue import Queue
import base64
from concurrent.futures import ThreadPoolExecutor
from ultralytics import YOLO  # 导入YOLO类# 读取配置文件
def load_config(config_file='config.json'):"""从配置文件中加载配置信息。:param config_file: 配置文件路径:return: 配置信息字典"""try:with open(config_file, 'r', encoding='utf-8') as f:config = json.load(f)return configexcept Exception as e:logging.critical(f"无法加载配置文件:{e}")raise# 将图像转换为Base64编码的字符串
def image_to_base64(image):"""将图像转换为Base64编码的字符串。:param image: 图像数组:return: Base64编码的字符串"""try:_, buffer = cv2.imencode('.jpg', image)return base64.b64encode(buffer).decode('utf-8')except Exception as e:logging.error(f"图像转换失败:{e}")return None# 发送报警信息到服务器
def send_alarm(alarm_data, server_endpoints):"""发送报警信息到服务器。:param alarm_data: 包含标签名称、数量、置信度和图像信息的字典:param server_endpoints: 服务器端点列表:return: None"""headers = {'Content-Type': 'application/json'}try:for endpoint in server_endpoints:response = requests.post(endpoint, json=alarm_data, headers=headers,timeout=10)if response.status_code == 200:logging.info(f"报警信息成功发送到 {endpoint}")else:logging.error(f"报警信息发送到 {endpoint} 失败,状态码:{response.status_code}")except requests.exceptions.RequestException as e:logging.error(f"发送报警信息到 {endpoint} 时发生网络错误:{e}")except Exception as e:logging.error(f"发送报警信息到 {endpoint} 时发生错误:{e}")
# 从RTSP流中读取视频帧并处理
def process_video_stream(rtsp_url, model, config, alarm_queue):"""从RTSP流中读取视频帧并处理。:param rtsp_url: RTSP流的URL:param model: YOLO模型:param config: 配置信息字典:param alarm_queue: 报警队列:return: None"""try:cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)if not cap.isOpened():logging.error(f"无法打开RTSP视频流:{rtsp_url}")return# 初始化重试计数器和重试时间retry_count = 0retry_start_time = time.time()  # 记录开始重试的时间# 初始化 FPS 计算器fps_start_time = time.time()fps_num_frames = 0fps_window = []while True:# 检查是否超过了重试时间if time.time() - retry_start_time > config['total_retry_duration'] * 60:logging.error(f"重试时间超过 {config['total_retry_duration']} 分钟,停止处理。")breaktry:# 读取视频帧ret, frame = cap.read()if not ret:logging.warning(f"无法获取帧数据:{rtsp_url}")# 释放资源并重新打开cap.release()cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)if not cap.isOpened():# 重试连接for _ in range(config['max_retries_per_minute']):if cap.isOpened():breaklogging.error(f"尝试重新打开RTSP视频流:{rtsp_url}")cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)retry_count += 1time.sleep(config['retry_interval'])  # 每次重试间隔时间else:logging.error(f"无法重新打开RTSP视频流:{rtsp_url}")time.sleep(60 - (time.time() - fps_start_time) % 60)  # 等待到下一分钟continuecontinue# 使用YOLOv8进行检测results = model(frame, verbose=False)# 获取检测结果result = results[0]boxes = result.boxes.to(model.device)  # 将结果移动到同一设备confidences = boxes.conf.cpu().numpy()classes = boxes.cls.cpu().numpy().astype(int)# 筛选置信度大于阈值的结果high_conf_indices = confidences > config['confidence_threshold']filtered_boxes = boxes.xyxy[high_conf_indices].cpu().numpy()filtered_classes = classes[high_conf_indices]filtered_confidences = confidences[high_conf_indices]if len(filtered_boxes) > 0:# 当前时间戳current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')  # 格式化日期时间# 构造报警信息class_names = [model.names[int(cls)] for cls in filtered_classes]  # 类名列表alarm_data = {"labels": class_names,"counts": len(filtered_boxes),"confidences": filtered_confidences.tolist(),"image": image_to_base64(frame),"timestamp": current_datetime  # 添加格式化的日期时间字段}# 记录日志logging.info(f"发送报警信息:标签={class_names}, 时间={current_datetime}, 类别={filtered_classes}, 置信度={filtered_confidences.tolist()}")# 将报警信息放入队列alarm_queue.put(alarm_data)# 计算 FPSfps_num_frames += 1if (time.time() - fps_start_time) > config['fps_interval']:fps = fps_num_frames / (time.time() - fps_start_time)fps_window.append(fps)if len(fps_window) > config['fps_window_size']:fps_window.pop(0)avg_fps = sum(fps_window) / len(fps_window)logging.info(f"当前 FPS:{avg_fps:.2f}")fps_start_time = time.time()fps_num_frames = 0except cv2.error as e:logging.error(f"OpenCV错误:{e}")retry_count += 1continueexcept requests.exceptions.RequestException as e:logging.error(f"网络请求错误:{e}")retry_count += 1continueexcept torch.cuda.OutOfMemoryError as e:logging.error(f"CUDA内存不足:{e}")retry_count += 1continueexcept Exception as e:logging.error(f"处理视频流时发生错误:{e}")retry_count += 1continuecap.release()except Exception as e:logging.error(f"处理视频流时发生全局异常:{e}")# 从队列中读取报警信息并发送到服务器
def send_alarms(alarm_queue, server_endpoints):"""从队列中读取报警信息并发送到服务器。:param alarm_queue: 报警队列:param server_endpoints: 服务器端点列表:return: None"""try:while True:alarm_data = alarm_queue.get()send_alarm(alarm_data, server_endpoints)alarm_queue.task_done()except Exception as e:logging.error(f"发送报警信息时发生异常:{e}")# 启动多个视频流的检测
def start_detection(rtsp_urls, config):"""启动多个视频流的检测。:param rtsp_urls: RTSP流的URL列表:param config: 配置信息字典:return: None"""# 创建报警队列alarm_queue = Queue(maxsize=config['max_workers'])# 启动报警发送线程executor = ThreadPoolExecutor(max_workers=config['max_workers'])for _ in range(config['max_workers']):executor.submit(send_alarms, alarm_queue, config['server_endpoints'])# 加载YOLOv8模型,并优先使用GPU,如果没有GPU则使用CPUdevice = 'cuda' if torch.cuda.is_available() else 'cpu'logging.info(f"使用设备:{device}")model = YOLO(config['model_path']).to(device)# 启动多个检测线程for rtsp_url in rtsp_urls:detection_thread = threading.Thread(target=process_video_stream, args=(rtsp_url, model, config, alarm_queue))detection_thread.start()if __name__ == '__main__':# 加载配置文件config = load_config()# 设置日志级别logging.basicConfig(level=config['log_level'], format='%(asctime)s - %(levelname)s - %(message)s')# 直接在主程序中启动多个视频流的检测start_detection(config['rtsp_urls'], config)

配置文件代码:

{"rtsp_urls": ["rtsp://admin:XXXXXX@xx.xx.xx.xx:554/Streaming/Channels/101"],"server_endpoints": ["http://localhost:8088/alarm"],"model_path": "yolov8n.pt","device": "cuda","retry_interval": 6,"max_retries_per_minute": 10,"total_retry_duration": 30,"log_level": "INFO","confidence_threshold": 0.5,"fps_interval": 1,"fps_window_size": 10,"max_workers": 5
}

服务端接收报警信息(Java端)

服务端是Java编写,主要是接收python端发送的报警信息,报警字段主要有:标签名称、置信度大小、图片、数量和时间等字段,python发送时会根据设置的server_endpoints地址进行发送。例如:http://localhost:8088/alarm。

Java服务端代码:

bean类:

package com.wei.demo1.demo;import java.util.Arrays;
import java.util.Date;
import java.util.List;/*** @BelongsProject: demo1* @BelongsPackage: com.wei.demo1.demo* @ClassName AlarmInfo* @Author: weiq* @CreateTime: 2024-09-13  14:01* @Description: TODO* @Version: 1.0*/
public class AlarmInfo {private String[] labels;private Integer  counts;private String[] confidences;private String image;private String timestamp;public String[] getLabels() {return labels;}public void setLabels(String[] labels) {this.labels = labels;}public Integer getCounts() {return counts;}public void setCounts(Integer counts) {this.counts = counts;}public String[] getConfidences() {return confidences;}public void setConfidences(String[] confidences) {this.confidences = confidences;}..........................

处理类:

package com.wei.demo1.demo;
import com.alibaba.fastjson.JSON;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;public class AlarmReceiver {public static void main(String[] args) throws Exception {HttpServer server = HttpServer.create(new InetSocketAddress(8088), 0);server.createContext("/alarm", new AlarmHandler());server.setExecutor(null); // creates a default executorserver.start();System.out.println("Server started and listening on port 8088");}static class AlarmHandler implements HttpHandler {@Overridepublic void handle(HttpExchange exchange) throws IOException {if (!"POST".equals(exchange.getRequestMethod())) {exchange.sendResponseHeaders(405, -1); // Method Not Allowedreturn;}StringBuilder sb = new StringBuilder();try (BufferedReader br = new BufferedReader(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8))) {String line;while ((line = br.readLine()) != null) {sb.append(line);}} catch (IOException e) {System.err.println("Error reading request body: " + e.getMessage());exchange.sendResponseHeaders(500, -1); // Internal Server Errorreturn;}String body = sb.toString();
//            System.out.println("Received POST data: " + body);// 使用FastJSON解析JSON字符串AlarmInfo alarmInfo = null;try {alarmInfo = JSON.parseObject(body, AlarmInfo.class);} catch (Exception e) {System.err.println("Failed to parse JSON object: " + e.getMessage());exchange.sendResponseHeaders(400, 0); // Bad Requestreturn;}System.out.println("Received POST data: " + alarmInfo);// 设置响应头和状态码exchange.getResponseHeaders().add("Content-Type", "application/json");exchange.sendResponseHeaders(200, body.length());// 写入响应体try (OutputStream responseBody = exchange.getResponseBody()) {responseBody.write(body.getBytes(StandardCharsets.UTF_8));}}}
}

测试效果

python端发送报警信息

Java服务端接收报警信息

部署

代码测试通过后,我们就可以进行项目完整打包部署工作了,Python项目一般可以可以通过Pyinstaller组件进行打包部署。

步骤一:安装必要的库

安装Pyinstaller库,命令如下:

pip install Pyinstaller -i https://pypi.tuna.tsinghua.edu.cn/simple

步骤二:创建spec文件

创建一个.spec文件,比如命名为ods.spec,并在其中添加如下内容来指定如何打包您的应用程序。这里的例子假设您的主Python脚本名为ods.py。 

# -*- mode: python ; coding: utf-8 -*-from PyInstaller.utils.hooks import collect_data_files
import ultralyticsblock_cipher = Nonea = Analysis(['ods.py'],pathex=[],binaries=[],datas=collect_data_files('ultralytics'),  # 收集Ultralytics库中的数据文件hiddenimports=[],hookspath=[],hooksconfig={},runtime_hooks=[],excludes=[],win_no_prefer_redirects=False,win_private_assemblies=False,cipher=block_cipher,noarchive=False,
)pyz = PYZ(a.pure, a.zipped_data,cipher=block_cipher)exe = EXE(pyz,a.scripts,a.binaries,a.zipfiles,a.datas,name='ods',debug=False,strip=False,upx=True,runtime_tmpdir=None,icon=['ods.jpg'],console=True )

请确保替换ods.py为您自己的主脚本名,并根据需要调整其他参数。 

步骤三:运行spec文件打包

 运行下面的命令来使用您创建的spec文件来打包应用程序:

pyinstaller main.spec

打包完成后,会自动生成dist文件,里面有一个exe可执行文件,双击打开运行即可。如果出现缺少配置文件的错误,可手动补充添加进去即可。如下图所示:

步骤四:打包运行测试

 双击exe文件,测试运行,Java服务端会实时接收报警的信息,如下图所示:

完成!

后续更新策略执行的代码,敬请期待!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/145552.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

让银河麒麟桌面操作系统V10(SP1)允许ping测

银河麒麟桌面操作系统V10(SP1)使用的防火墙是kylin-firewall,默认情况下“公用网络”是禁ping的,如下图: 可以在“安全中心”->“网络保护”->“配置访问规则”->“专网规则”->找到“icmp”这条规则&…

减少代码错误的方法

最重要的是在写之前多举几个刁钻的例子来理解问题和代码的正确性 如果你给不出反例就说明你还没有理解(有的反例会后来会被证明是错的) 由于递归是把自己的和别人的相关的混合在一起来了,所以举反例的时候要从不同的角度出发。 求割点的&a…

二、电源滤波器

电源滤波器 1、电源滤波的过程分析! 波形形成过程: 2、计算: 滤波电容的容量和耐压值选择。 学习心得

【算法】最长公共子序列(C/C++)

最长公共子序列(LCS,Longest Common Subsequence)问题简称(LCS),是动态规划里面里面的基础算法。它的所解决的问题是,在两个序列中找到一个序列,使得它既是第一个序列的子序列&#…

【计算机网络 - 基础问题】每日 3 题(十三)

✍个人博客:Pandaconda-CSDN博客 📣专栏地址:http://t.csdnimg.cn/fYaBd 📚专栏简介:在这个专栏中,我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话,欢迎点赞👍收藏&…

阿里史上最大规模开源发布,超GPT-4o 、Llama-3.1!

今天凌晨,阿里巴巴官宣了史上最大规模的开源发布,推出了基础模型Qwen2.5、专用于编码Qwen2.5-Coder和数学的Qwen2.5-Math。 这三大类模型一共有10多个版本,包括0.5B、1.5B、3B、7B、14B、32B和72B,适用于个人、企业以及移动端、P…

数字化转型的策略与执行路径

企业在明确数字化转型的目标并评估自身数字化能力之后,必须前瞻性地识别出实现这些目标所需的关键数字化能力。基于这些能力,企业应制定出一套数字化转型战略,确立短期和中长期的转型目标,确保数字技术投资带来价值,而…

vulhub搭建漏洞环境docker-compose up -d命令执行报错以及解决方法汇总

在利用vulhub靶场搭建环境进行漏洞复现时,我们通常要使用这一步命令: docker-compose up -d 但是经常报错,今天我们来说几个常见的报错以及解决方法: 1.报错提示: ERROR: Couldnt connect to Docker daemon at httpdoc…

MySQL_图形管理工具简介、下载及安装(超详细)

课 程 推 荐我 的 个 人 主 页:👉👉 失心疯的个人主页 👈👈入 门 教 程 推 荐 :👉👉 Python零基础入门教程合集 👈👈虚 拟 环 境 搭 建 :&#x1…

如何确保Java程序分发后不被篡改?使用JNI对Java程序进行安全校验

前言 众所周知,Java/Kotlin编译后会编译成smali,使用Jadx这类的反编译工具或者Hook工具就能很轻松的把我们的软件安全校验给破解了。 为了防止这种情况发生,我们一般会将核心代码使用C编写,然后使用JNI技术,使用Java…

对接全国点餐api接口有哪些具体步骤

与第三方餐饮服务提供商进行接口对接可以按照以下步骤进行: 一、前期准备 1.明确需求: 确定你的业务目标和对接口的具体需求。例如,你是希望通过接口获取餐厅信息、菜品列表、价格、库存情况,还是实现订单提交、支付处理、配送…

WAN广域网技术--PPP和PPPoE

广域网基础概述 广域网(Wide Area Network,WAN)是一种覆盖广泛地区的计算机网络,它连接不同地理位置的计算机、服务器和设备。广域网通常用于连接不同城市、州或国家之间的网络,它通过互联网服务提供商(ISP…

中泰免签,准备去泰国旅游了吗?《泰语翻译通》app支持文本翻译和语音识别翻译,解放双手对着说话就能翻译。

泰国是很多中国游客的热门选择,现在去泰国旅游更方便了,因为泰国对中国免签了。如果你打算去泰国,那么下载一个好用的泰语翻译软件是很有必要的。 简单好用的翻译工具 《泰语翻译通》App就是为泰国旅游设计的,它翻译准确&#x…

pg198-jesd204-phy阅读笔记

简介 介绍 JESD204 PHY IP核实现了JESD204的物理接口,简化在发送和接收核心之间共享串行收发器信息通道。此内核一般不单独使用,只能与JESD204或JESD204C内核结合使用(目前不太懂这句话,因为我只看到与TX、RX IP核结合使用&#…

声网SDK脚本运行错误

文章目录 运行步骤无法运行.bat电脑出现警告--更改执行策略若无出现-更新power shell搜索最新版本的 PowerShell安装新版本 仍无法解决-手动下载第三方库 2024-9-9运行步骤 无法运行.bat 电脑出现警告–更改执行策略 若无出现-更新power shell 搜索最新版本的 PowerShell 在…

Java面试篇基础部分-Java线程生命周期

线程的生命周期分别为 新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)这五种状态。   在系统运行过程中有线程不断地被创建,而旧的线程在执行完毕之后被清理,线程通过排队的方式获取共享资源或者锁的时候被阻塞,所以运行中的线程就会在…

让医院更智慧,让决策更容易

依托数字孪生技术,赋能智慧医院,对使用者和决策者带来了众多的优势。数字孪生技术是将物理实体与数字模型相结合,实现实时监测、仿真预测和智能决策的一种先进技术。在智慧医院中应用数字孪生技术,不仅可以提升医疗服务的质量和效…

气势如神助!未来三年最好的投资:找适合自己的稳定的路——早读(逆天打工人爬取热门微信文章解读)

势如破竹!!!冲 引言Python 代码第一篇 洞见 未来三年最好的投资:好好上班第二篇 趋势结尾 偷换概念,贝多芬失聪是后来的事了,从小失聪还能当音乐家怕不是觉醒松果体了 但体会这个意思,玩味一下即…

开放式耳机和入耳式耳机哪个好?2024优质开放式蓝牙耳机推荐

首先,开放式耳机与传统的入耳式耳机相比,最大的特点在于其的舒适性和对听力的保护。因为无需入耳,所以能够有效地减少长时间佩戴导致的耳朵疲劳,同时也避免了直接对耳膜的压迫。但是当然也会有问题出现,开放式耳机别人…

pprof简单使用

1. 什么是 pprof? pprof 是 Go 语言内置的性能分析工具。它能够帮助开发者收集 CPU、内存、goroutine 等资源的使用情况,生成性能报告并提供可视化功能。pprof 提供了全面的性能分析能力,是排查性能瓶颈、优化代码的利器。 2. pprof 使用场…