当前位置: 首页 > news >正文

PySpark中DataFrame应用升阶及UDF使用

目录

  • 1. 加载数据
  • 2. 列常见操作
    • 2.1 添加新列
    • 2.2 重命名列
    • 2.3 删除指定列
    • 2.4 修改数据
  • 3 空值处理
    • 3.1 丢弃空值
    • 3.2 空值填充
  • 4 聚合操作
    • 4.1 分组聚合
  • 5 用户自定义函数(UDF)
    • 5.1 传统UDF函数
    • 5.2 Pandas UDF(向量化UDF)
  • 参考资料

在这里插入图片描述



import findspark
findspark.init() 
from pyspark.sql import SparkSession
# 创建 Spark 会话
spark = SparkSession.builder \.appName("Test PySpark") \.master("local[*]") \.getOrCreate()
sc=spark.sparkContext
sc.master
'local[*]'

1. 加载数据

mes_df = spark.read.csv('spark练习数据.csv',inferSchema=True,header=True)
mes_df.columns
['销售年份','车辆用途','电池形状','动力类型','电池类型','行驶里程(km)','容量保持率(%)','电量(KWh)','质量(kg)','车辆数(辆)']
print(mes_df.printSchema())
mes_df.show(3)
root|-- 销售年份: integer (nullable = true)|-- 车辆用途: string (nullable = true)|-- 电池形状: string (nullable = true)|-- 动力类型: string (nullable = true)|-- 电池类型: string (nullable = true)|-- 行驶里程(km): integer (nullable = true)|-- 容量保持率(%): double (nullable = true)|-- 电量(KWh): double (nullable = true)|-- 质量(kg): double (nullable = true)|-- 车辆数(辆): integer (nullable = true)None
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
|销售年份|  车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       50000|         92.2|    128.0|  1525.0|         5|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       85000|         91.6|    128.0|  1525.0|         5|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       30000|         94.3|    120.4|  1455.0|         5|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
only showing top 3 rows

2. 列常见操作

2.1 添加新列

mes_df2 = mes_df.withColumn('功率',mes_df['电量(KWh)']/mes_df['质量(kg)'])
mes_df2.show(3)
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|销售年份|  车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|              功率|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       50000|         92.2|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       85000|         91.6|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       30000|         94.3|    120.4|  1455.0|         5|0.0827491408934708|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
only showing top 3 rows

2.2 重命名列

mes_df2 = mes_df2.withColumnsRenamed({'功率':'功率(wh)'})
mes_df2.show(5)
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows

2.3 删除指定列

mes_df3 = mes_df2.drop('电池类型','动力类型')
mes_df3.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows

2.4 修改数据

mes_f = mes_df3.replace({'私人乘用车':'私人'})
mes_f.show(3)
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|销售年份|车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|          功率(wh)|
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|    2013|    私人|    方形|       50000|         92.2|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|    私人|    方形|       85000|         91.6|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|    私人|    方形|       30000|         94.3|    120.4|  1455.0|         5|0.0827491408934708|
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
only showing top 3 rows

3 空值处理

3.1 丢弃空值

mes_df4 = mes_df3.dropna(how = 'all',subset=['车辆用途'])
mes_df4.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows

3.2 空值填充

mes_df5 = mes_df4.fillna({'销售年份':1970,'车辆用途':'私人乘用车'})
mes_df5.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows

4 聚合操作

4.1 分组聚合

g_df = mes_df.groupBy('销售年份').agg({'电量(KWh)':'sum','质量(kg)':'sum'})
# g_df = g_df.withColumnRenamed({'sum(质量(kg))':'总质量','sum(电量(KWh))':'总电量'})
g_df =  g_df.withColumnRenamed('sum(质量(kg))','总质量').withColumnRenamed('sum(电量(KWh))','总电量')
g_df.show(3)
+--------+--------------------+-------------------+
|销售年份|              总质量|             总电量|
+--------+--------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|
|    2015|           2551879.0| 240302.29999999897|
|    2013|             20373.6| 1867.0999999999992|
+--------+--------------------+-------------------+
only showing top 3 rows
g_df.show()
+--------+--------------------+-------------------+
|销售年份|       sum(质量(kg))|     sum(电量(KWh))|
+--------+--------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|
|    2015|           2551879.0| 240302.29999999897|
|    2013|             20373.6| 1867.0999999999992|
|    2014|            220269.0| 20847.199999999968|
|    2019| 2.346486377000002E8|3.259483540000003E7|
|    2016| 4.531763910000002E7|  4402030.500000004|
|    2017|1.7762183689999998E8|1.887332539999987E7|
+--------+--------------------+-------------------+

5 用户自定义函数(UDF)

PySpark 有两种 UDF:传统UDF(非向量化UDF) 和 Pandas UDF(向量化UDF)

传统UDF(非向量化UDF) :通过Python函数逐行处理数据,使用pyspark.sql.functions.udf注册

  • 优点:
  • 适合简单逻辑(如字符串处理、数值转换)
  • 在所有Spark版本(≥1.3)中均可使用
  • 逐行调试方便,便于通过print或日志逐行调试逻辑。
  • 缺点:
  • 性能差,高延迟,尤其在大数据集上可能成为瓶颈。
  • 需手动处理Spark数据类型与Python类型的映射,易因类型不匹配出错。
  • 无法批量处理数据,无法利用现代CPU的SIMD指令加速。

Pandas UDF(向量化UDF):将整个列或分块数据转换为Pandas Series/DataFrame,批量处理。基于Apache Arrow的批量处理模式,使用pyspark.sql.functions.pandas_udf定义。

  • 优点:
  • 高性能,利用Pandas向量化操作,数据通过Arrow以零拷贝方式传输,减少序列化开销。
  • 支持复杂操作,适合处理整列或分组数据(如时间窗口计算、分组聚合)
  • Arrow优化内存布局,减少内存占用。
  • 缺点:
  • 依赖Arrow和Pandas。需要安装PyArrow和Pandas,且版本需与Spark兼容。
  • 调试困难,批量处理逻辑出错时,难以定位具体行的问题。
  • 型处理隐式,需熟悉Pandas与Spark类型的隐式转换规则,类型错误可能更隐晦。

5.1 传统UDF函数

from pyspark.sql.functions import udf # 传统udfdef powerCal(num,num2):return num/num2# udf创建
power_udf = udf(powerCal,DoubleType())
g_df2 = g_df.withColumn('功率',power_udf(g_df['总电量'],g_df['总质量']))
g_df2.show()
+--------+--------------------+-------------------+-------------------+
|销售年份|              总质量|             总电量|               功率|
+--------+--------------------+-------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|0.12600067844570834|
|    2015|           2551879.0| 240302.29999999897|0.09416680806574253|
|    2013|             20373.6| 1867.0999999999992|0.09164310676561822|
|    2014|            220269.0| 20847.199999999968|0.09464427586269501|
|    2019| 2.346486377000002E8|3.259483540000003E7|0.13890911841419995|
|    2016| 4.531763910000002E7|  4402030.500000004|0.09713724252682886|
|    2017|1.7762183689999998E8|1.887332539999987E7|0.10625565937945705|
+--------+--------------------+-------------------+-------------------+

5.2 Pandas UDF(向量化UDF)

from pyspark.sql.functions import pandas_udf # 向量化udfdef powerCal2(num,num2):return num/num2# udf创建
power_pudf = pandas_udf(powerCal,DoubleType())
g_df3 = g_df.withColumn('功率',power_pudf(g_df['总电量'],g_df['总质量']))
g_df3.show()
+--------+--------------------+-------------------+-------------------+
|销售年份|              总质量|             总电量|               功率|
+--------+--------------------+-------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|0.12600067844570834|
|    2015|           2551879.0| 240302.29999999897|0.09416680806574253|
|    2013|             20373.6| 1867.0999999999992|0.09164310676561822|
|    2014|            220269.0| 20847.199999999968|0.09464427586269501|
|    2019| 2.346486377000002E8|3.259483540000003E7|0.13890911841419995|
|    2016| 4.531763910000002E7|  4402030.500000004|0.09713724252682886|
|    2017|1.7762183689999998E8|1.887332539999987E7|0.10625565937945705|
+--------+--------------------+-------------------+-------------------+
spark.stop()

参考资料

《Python+Spark 2.0+Hadoop机器学习与大数据实战》, 林大贵,清华大学出版社,2017-12,9787302490739

http://www.xdnf.cn/news/175213.html

相关文章:

  • Cad求多段线中心点(顶点平均值) C#
  • 利用脚本搭建私有云平台,部署云平台,发布云主机并实现互连和远程连接
  • Arduino 入门学习笔记(五):KEY实验
  • 3G大一下安卓考核题解
  • 多节点同步协同电磁频谱监测任务分配方法简要介绍
  • CDA Edit 的设计
  • 【C到Java的深度跃迁:从指针到对象,从过程到生态】第四模块·Java特性专精 —— 第十五章 泛型:类型系统的元编程革命
  • 编译原理实验 之 Tiny C语言编译程序实验 语法分析
  • 量子力学:量子通信
  • 人工智能时代的网络安全威胁
  • 全自动部署到远程服务器
  • 8.0 西门子PLC的S7通讯解析
  • 欧空局的P 波段雷达卫星即将升空
  • python pyplot 输出支持中文
  • Linux常用命令23——usermod
  • 关于堆栈指针的那些事 | bootloader 如何跳转app
  • react的 Fiber 节点的链表存储
  • 学生公寓限电模块控制柜是如何实现智能限电功能?
  • 【八股消消乐】发送请求有遇到服务不可用吗?如何解决?
  • 项目代码生成工具
  • 【技术追踪】基于扩散模型的脑图像反事实生成与异常检测(TMI-2024)
  • 【计算机视觉】CV实战项目- Four-Flower:基于TensorFlow的花朵分类实战指南
  • HarmonyOS NEXT:多设备的自由流转
  • 前端Vue项目处理跨域请求问题解决方案(后端未加cors),前端调后端
  • 深入探索Python Pandas:解锁数据分析的无限可能
  • go语言八股文(四)
  • WGS84(GPS)、火星坐标系(GCJ02)、百度地图(BD09)坐标系转换Java代码
  • 电池管理系统
  • Linux文件管理(3)
  • SpringMVC 静态资源处理 mvc:default-servlet-handler