嵌入式C++、InfluxDB、Spark、MQTT协议、和Dash:树莓派集群物联网数据中心设计与实现(代码示例)

1. 项目概述

随着物联网技术的快速发展,如何高效地收集、存储和分析海量IoT设备数据成为一个重要课题。本文介绍了一个基于树莓派集群搭建的小型物联网数据中心,实现了从数据采集到分析可视化的完整流程。

该系统采用轻量级组件,适合资源受限的边缘计算环境。主要功能包括:

  • 通过MQTT协议采集传感器数据
  • 使用Kafka进行数据传输
  • InfluxDB存储时序数据
  • Spark进行数据处理
  • Grafana可视化展示
  • Flask提供Web API接口

2. 系统设计

2.1 硬件架构

  • 3个树莓派4B作为工作节点
  • 1个树莓派4B作为主节点
  • 1个外接硬盘用于数据存储

2.2 软件架构

  • 数据采集:Mosquitto MQTT Broker
  • 数据传输:Apache Kafka
  • 数据存储:InfluxDB
  • 数据处理:Apache Spark
  • 数据分析:Jupyter Notebook, Pandas
  • 可视化:Grafana
  • 应用层:Flask

3. 代码实现

3.1 数据采集层

在数据采集层,我们使用MQTT协议来收集传感器数据。MQTT是一种轻量级的发布/订阅消息传输协议,非常适合物联网应用。

import paho.mqtt.client as mqtt
import json
import loggingclass IoTDataCollector:def __init__(self, broker_address, broker_port=1883):self.client = mqtt.Client()self.client.on_connect = self._on_connectself.client.on_message = self._on_messageself.broker_address = broker_addressself.broker_port = broker_portself.logger = logging.getLogger(__name__)def _on_connect(self, client, userdata, flags, rc):"""当客户端连接到MQTT代理时调用此函数。它订阅了所有传感器主题。"""self.logger.info(f"Connected with result code {rc}")client.subscribe("sensors/#")def _on_message(self, client, userdata, msg):"""当收到消息时调用此函数。它解析JSON消息并处理数据。"""try:payload = json.loads(msg.payload.decode())self.logger.info(f"Received data: {payload} on topic {msg.topic}")# 这里可以添加数据处理逻辑,例如将数据传递给Kafkaexcept json.JSONDecodeError:self.logger.error(f"Failed to parse message: {msg.payload}")def start(self):"""启动MQTT客户端并开始监听消息。"""self.client.connect(self.broker_address, self.broker_port, 60)self.client.loop_forever()# 使用示例
if __name__ == "__main__":logging.basicConfig(level=logging.INFO)collector = IoTDataCollector("localhost")collector.start()

这段代码展示了如何创建一个MQTT客户端来接收IoT设备数据。它包含了错误处理和日志记录,这在实际应用中非常重要。

3.2 数据传输层

在数据传输层,我们使用Apache Kafka来处理高吞吐量的实时数据流。Kafka提供了可靠的消息队列服务,支持数据持久化和多订阅者模式。

from kafka import KafkaProducer
import json
import logging
from retry import retryclass KafkaDataProducer:def __init__(self, bootstrap_servers):self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers,value_serializer=lambda v: json.dumps(v).encode('utf-8'))self.logger = logging.getLogger(__name__)@retry(exceptions=Exception, tries=3, delay=1, backoff=2)def send_data(self, topic, data):"""发送数据到指定的Kafka主题。包含重试机制以提高可靠性。"""future = self.producer.send(topic, data)try:record_metadata = future.get(timeout=10)self.logger.info(f"Sent data to Kafka: topic={record_metadata.topic}, "f"partition={record_metadata.partition}, "f"offset={record_metadata.offset}")except Exception as e:self.logger.error(f"Error sending data to Kafka: {e}")def flush(self):"""刷新并等待所有未完成的消息请求完成。在关闭生产者之前调用此方法很重要。"""self.producer.flush()def close(self):"""关闭Kafka生产者。这会确保所有未完成的消息请求在关闭之前完成。"""self.producer.close()# 使用示例
if __name__ == "__main__":logging.basicConfig(level=logging.INFO)producer = KafkaDataProducer(['localhost:9092'])sensor_data = {'sensor_id': 1, 'temperature': 25.5, 'humidity': 60, 'timestamp': '2023-05-20T10:00:00Z'}try:producer.send_data('sensor_data', sensor_data)finally:producer.flush()producer.close()

这个Kafka生产者类(KafkaDataProducer)提供了以下关键功能:

  1. 可靠的消息发送:使用send_data方法发送数据到Kafka,包含了异常处理和日志记录。

  2. 重试机制:通过@retry装饰器实现了自动重试,提高了系统的容错能力。

  3. 异步操作:Kafka生产者的发送操作是异步的,使用future.get()等待发送结果。

  4. 资源管理:提供了flushclose方法,确保在关闭生产者之前所有消息都被正确处理。

使用Kafka作为数据传输层有以下优势:

  • 高吞吐量:Kafka能够处理大量的实时数据流。
  • 可靠性:支持数据复制和持久化,确保数据不会丢失。
  • 可扩展性:可以轻松扩展以处理增加的数据量。
  • 灵活性:支持多个生产者和消费者,适合复杂的数据流处理场景。

在物联网数据中心中,Kafka可以作为数据采集层和数据存储层之间的缓冲,解耦系统组件,提高整体系统的可靠性和扩展性。

3.3 数据存储层

对于数据存储层,我们选择使用InfluxDB,这是一个专门为时间序列数据优化的数据库,非常适合存储IoT传感器数据。以下是InfluxDB写入器的实现:

from influxdb import InfluxDBClient
from datetime import datetime
import loggingclass InfluxDBWriter:def __init__(self, host, port, database):self.client = InfluxDBClient(host=host, port=port)self.database = databaseself.logger = logging.getLogger(__name__)self._create_database()def _create_database(self):"""如果数据库不存在,则创建数据库。"""if self.database not in self.client.get_list_database():self.client.create_database(self.database)self.client.switch_database(self.database)def write_data(self, measurement, tags, fields):"""将数据点写入InfluxDB。:param measurement: 测量的名称(类似于表名):param tags: 标签数据(用于索引):param fields: 字段数据(实际的度量值)"""json_body = [{"measurement": measurement,"tags": tags,"time": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),"fields": fields}]try:self.client.write_points(json_body)self.logger.info(f"Data written to InfluxDB: {json_body}")except Exception as e:self.logger.error(f"Error writing to InfluxDB: {e}")def query_data(self, query):"""从InfluxDB查询数据。:param query: InfluxQL查询字符串:return: 查询结果"""try:result = self.client.query(query)return list(result.get_points())except Exception as e:self.logger.error(f"Error querying InfluxDB: {e}")return Nonedef close(self):"""关闭InfluxDB客户端连接。"""self.client.close()# 使用示例
if __name__ == "__main__":logging.basicConfig(level=logging.INFO)influx_writer = InfluxDBWriter('localhost', 8086, 'iot_data')try:# 写入数据measurement = "temperature"tags = {"sensor_id": "1", "location": "room1"}fields = {"value": 25.5}influx_writer.write_data(measurement, tags, fields)# 查询数据query = 'SELECT * FROM temperature WHERE time > now() - 1h'result = influx_writer.query_data(query)print(f"Query result: {result}")finally:influx_writer.close()

这个InfluxDB写入器类(InfluxDBWriter)提供了以下主要功能:

  1. 数据库初始化:在构造函数中,它会检查指定的数据库是否存在,如果不存在则创建。

  2. 数据写入write_data方法用于将数据点写入InfluxDB。它接受测量名称、标签和字段作为参数,并自动添加时间戳。

  3. 数据查询query_data方法允许执行InfluxQL查询,返回查询结果。

  4. 错误处理:所有的数据库操作都包含了异常处理和日志记录,提高了代码的健壮性。

  5. 资源管理:提供了close方法来正确关闭数据库连接。

使用InfluxDB作为时间序列数据存储有以下优势:

  • 高性能:InfluxDB针对时间序列数据进行了优化,能够高效地处理大量的写入和查询操作。
  • 灵活的数据模型:支持标签和字段的概念,允许灵活地组织和查询数据。
  • 强大的查询语言:InfluxQL提供了丰富的查询功能,包括聚合、降采样等。
  • 数据保留策略:可以设置数据的自动过期和删除策略,方便管理长期数据。

3.4 数据处理层

在数据处理层,我们使用Apache Spark进行大规模数据处理。Spark是一个强大的分布式计算引擎,适合处理大量的IoT数据。

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *class SparkDataProcessor:def __init__(self, app_name="IoTDataProcessor"):self.spark = SparkSession.builder \.appName(app_name) \.getOrCreate()def process_temperature_data(self, input_path, output_path):"""处理温度数据:计算每个传感器的平均温度"""schema = StructType([StructField("sensor_id", StringType(), True),StructField("timestamp", TimestampType(), True),StructField("temperature", FloatType(), True)])df = self.spark.read.json(input_path, schema=schema)result = df.groupBy("sensor_id") \.agg(avg("temperature").alias("avg_temperature"))result.write.csv(output_path, header=True, mode="overwrite")def stop(self):"""停止SparkSession"""self.spark.stop()# 使用示例
if __name__ == "__main__":processor = SparkDataProcessor()try:processor.process_temperature_data("input_data/*.json", "output_data/avg_temperatures")finally:processor.stop()

这个Spark处理器展示了如何使用PySpark处理IoT数据。它读取JSON格式的温度数据,计算每个传感器的平均温度,并将结果保存为CSV文件。

3.5 数据分析层

在数据分析层,我们使用Python的数据分析库Pandas和机器学习库Scikit-learn来进行数据分析和预测。

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
import joblibclass DataAnalyzer:def __init__(self, data_path):self.data = pd.read_csv(data_path)self.model = Nonedef preprocess_data(self):"""数据预处理:处理缺失值,转换日期等"""self.data['timestamp'] = pd.to_datetime(self.data['timestamp'])self.data['hour'] = self.data['timestamp'].dt.hourself.data = self.data.dropna()def train_model(self):"""训练一个简单的线性回归模型来预测温度"""X = self.data[['hour', 'humidity']]y = self.data['temperature']X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)self.model = LinearRegression()self.model.fit(X_train, y_train)score = self.model.score(X_test, y_test)print(f"Model R2 score: {score}")def save_model(self, path):"""保存训练好的模型"""joblib.dump(self.model, path)# 使用示例
if __name__ == "__main__":analyzer = DataAnalyzer("sensor_data.csv")analyzer.preprocess_data()analyzer.train_model()analyzer.save_model("temperature_prediction_model.joblib")

3.6 可视化层

在可视化层,我们使用Plotly库来创建交互式的数据可视化。这里我们创建一个简单的仪表板来展示传感器数据。

import plotly.graph_objs as go
import plotly.express as px
import pandas as pd
from dash import Dash, dcc, html
from dash.dependencies import Input, Outputclass IoTDashboard:def __init__(self, data_path):self.df = pd.read_csv(data_path)self.app = Dash(__name__)self.setup_layout()def setup_layout(self):self.app.layout = html.Div([html.H1("IoT Sensor Dashboard"),dcc.Graph(id='temperature-graph'),dcc.Graph(id='humidity-graph'),dcc.Interval(id='interval-component',interval=5*1000,  # in millisecondsn_intervals=0)])@self.app.callback(Output('temperature-graph', 'figure'),Input('interval-component', 'n_intervals'))def update_temperature_graph(n):fig = px.line(self.df, x='timestamp', y='temperature', color='sensor_id',title='Temperature Over Time')return fig@self.app.callback(Output('humidity-graph', 'figure'),Input('interval-component', 'n_intervals'))def update_humidity_graph(n):fig = px.scatter(self.df, x='temperature', y='humidity', color='sensor_id',title='Temperature vs Humidity')return figdef run(self):self.app.run_server(debug=True)# 使用示例
if __name__ == '__main__':dashboard = IoTDashboard("sensor_data.csv")dashboard.run()

这个仪表板使用Dash创建了一个web应用,展示了温度随时间的变化以及温度与湿度的关系。

4. 项目总结

本项目展示了如何构建一个基于树莓派集群的物联网数据中心。我们涵盖了从数据采集到数据分析和可视化的整个流程:

  1. 使用MQTT协议采集传感器数据
  2. 通过Kafka进行数据传输
  3. 用InfluxDB存储时序数据
  4. 利用Spark进行大规模数据处理
  5. 使用Pandas和Scikit-learn进行数据分析和预测
  6. 最后通过Plotly和Dash创建交互式仪表板

这个系统具有以下优点:

  • 可扩展性:可以轻松添加更多的传感器和处理节点
  • 实时性:能够实时处理和展示数据
  • 灵活性:各个组件可以独立升级和替换
  • 分析能力:支持复杂的数据处理和机器学习任务

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1487928.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

分类常用的评价指标-二分类/多分类

二分类常用的性能度量指标 精确率、召回率、F1、TPR、FPR、AUC、PR曲线、ROC曲线、混淆矩阵 「精确率」查准率 PrecisionTP/(TPFP) 「召回率」查全率RecallTP/(TPFN) 「真正例率」即为正例被判断为正例的概率TPRTP/(TPFN) 「假正例率」即为反例被判断为正例的概率FPRFP/(TNFP)…

Java代码基础算法练习-数值求和-2024.07.25

任务描述: 现有一串字符(长度不超过255个字符),需对其中的数值字符求和(需转换成整型进行计算)。 解决思路: 输入字符串,循环对每个字符否为数字,转换整型并求和 转换整型有以下的方式 1. su…

当 Nginx 出现请求的重复提交,如何处理?

🍅关注博主🎗️ 带你畅游技术世界,不错过每一次成长机会! 文章目录 当 Nginx 出现请求的重复提交,如何处理?一、理解请求重复提交的来龙去脉二、请求重复提交可能带来的麻烦三、解决方案之“一夫当关”——…

文件包涵条件竞争(ctfshow82)

Web82 利用 session.upload_progress 包含文件漏洞 <!DOCTYPE html> <html> <body> <form action"https://09558c1b-9569-4abd-bf78-86c4a6cb6608.challenge.ctf.show//" method"POST" enctype"multipart/form-data"> …

【YashanDB知识库】yasdb jdbc驱动集成BeetISQL中间件,业务(java)报autoAssignKey failure异常

问题现象 BeetISQL中间件版本&#xff1a;2.13.8.RELEASE 客户在调用BeetISQL提供的api向yashandb的表中执行batch insert并将返回sequence设置到传入的java bean时&#xff0c;报如下异常&#xff1a; 问题的风险及影响 影响业务流程正常执行&#xff0c;无法获得batch ins…

【BUG】已解决:IndexError: positional indexers are out-of-bounds

IndexError: positional indexers are out-of-bounds 目录 IndexError: positional indexers are out-of-bounds 【常见模块错误】 【解决方案】 原因分析 解决方法 示例代码 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博…

HarmonyOS入门-状态管理

View(UI)&#xff1a;UI渲染&#xff0c;指将build方法内的UI描述和Builder装饰的方法内的UI描述映射到界面。 State&#xff1a;状态&#xff0c;指驱动UI更新的数据。用户通过触发组件的事件方法&#xff0c;改变状态数据。状态数据的改变&#xff0c;引起UI的重新渲染。 装…

2024权益商城系统网站源码

2024权益商城系统源码&#xff0c;支持多种支付方式&#xff0c;后台商品管理&#xff0c;订单管理&#xff0c;串货管理&#xff0c;分站管理&#xff0c;会员列表&#xff0c;分销日志&#xff0c;应用配置。 上传到服务器&#xff0c;修改数据库信息&#xff0c;导入数据库…

四、GD32 MCU 常见外设介绍 (7) 7.I2C 模块介绍

7.1.I2C 基础知识 I2C(Inter-Integrated Circuit)总线是一种由Philips公司开发的两线式串行总线&#xff0c;用于内部IC控制的具有多端控制能力的双线双向串行数据总线系统&#xff0c;能够用于替代标准的并行总线&#xff0c;连接各种集成 电路和功能模块。I2C器件能够减少电…

deepin深度操作系统安装教程(完整安装步骤·详细图文教程)

官方下载教程 一、概述 如果您首次使用deepin ISO镜像文件来安装deepin系统&#xff0c;无论您之前是否有安装过Windows电脑系统或者Debian、Ubuntu等其他Linux发行版桌面操作系统&#xff0c;我们都建议您先阅读本文档再安装。安装时&#xff0c;您可以选择只安装deepin系统…

Angular由一个bug说起之八:实践中遇到的一个数据颗粒度的问题

互联网产品离不开数据处理&#xff0c;数据处理有一些基本的原则包括&#xff1a;准确性、‌完整性、‌一致性、‌保密性、‌及时性。‌ 准确性&#xff1a;是数据处理的首要目标&#xff0c;‌确保数据的真实性和可靠性。‌准确的数据是进行分析和决策的基础&#xff0c;‌因此…

思维(交互题),CF 1990E2 - Catch the Mole(Hard Version)

一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 E2 - Catch the Mole(Hard Version) 二、解题报告 1、思路分析 考虑每次误判都会让鼹鼠上升一层&#xff0c;相应的&#xff0c;最外层的一层结点都没用了 由于数据范围为5000&#xff0c;我们随便找个叶子…

OSPF概述

OSPF OSPF属于内部网关路由协议【IGP】 用于单一自治系统【Autonomous System-AS】内决策路由 自治系统【AS】 执行统一路由策略的一组网络设备的组合 OSPF概述 为了适应大型的网络&#xff0c;OSPF在AS内划分多个区域 每个OSPF路由器只维护所在区域的完整的链路状态信息 …

微服务实战系列之玩转Docker(五)

前言 在我们日常的工作生活中&#xff0c;经常听到的一句话&#xff1a;“是骡子是马拉出来遛遛”。目的是看一个人/物是不是名副其实。我们在使用docker时&#xff0c;也要看看它究竟是如何RUN起来的。当面试官问你的时候&#xff0c;可以如是回答&#xff0c;保你“一文通关…

prometheus tsdb索引布局及查询流程

prometheus 磁盘布局 采集到的数据每两个小时形成一个block。每个block由一个目录组成&#xff0c;并存放在data路径下。该目录包含一个包含该时间窗口的所有时间序列样本的块子目录、一个元数据文件和一个索引文件&#xff08;将metric_name和label索引到目录下的时间序列&am…

导航不是GPS吗,有人用北斗吗?

在现代生活中&#xff0c;提到导航&#xff0c;人们脑海中最先浮现的往往是GPS。然而&#xff0c;近年来&#xff0c;中国自主研发的北斗导航系统&#xff08;BeiDou Navigation Satellite System, BDS&#xff09;正在迅速崛起&#xff0c;逐步占据全球导航市场的一席之地&…

SQL-REGEX-常见正则表达式的使用

SQL-REGEX-常见正则表达式的使用 在SQL中&#xff0c;正则表达式&#xff08;Regex&#xff09;的使用可以帮助进行更灵活和精确的模式匹配和数据筛选。不同的数据库管理系统对于正则表达式的支持略有差异&#xff0c;但大体都是相似的。 Tips&#xff1a; 模式描述匹配内容…

洗地机哪个牌子好?推荐四款口碑最好的洗地机

在追求高效、便捷的现代居家环境中&#xff0c;洗地机已然跃升为家庭清洁的新风尚。面对市场上琳琅满目的洗地机产品&#xff0c;洗地机哪个牌子好&#xff1f;如何筛选出那些既拥有卓越清洁能力&#xff0c;又兼备智能化操作及高用户满意度的佼佼者&#xff0c;成为了消费者关…

计算机视觉与图像分类:技术原理、应用与发展前景

引言 随着科技的不断进步&#xff0c;计算机视觉逐渐成为了人工智能领域的重要分支之一。计算机视觉旨在让计算机具备“看懂”图像和视频的能力&#xff0c;从而理解和分析视觉信息。作为计算机视觉中的一个关键任务&#xff0c;图像分类涉及将输入的图像归类到预定义的类别中&…

基于Delaunay三角网的边缘检测

1、背景介绍 Delaunay三角网是一种在平面上对一组点构造三角网格的方法&#xff0c;其中任何点都不在由其周围点形成的任何三角形的外接圆内部。这种方法确保了三角形尽可能接近等边三角形&#xff0c;从而避免了狭长的三角形。如下图所示&#xff0c;为利用平面上点集构建生成…