目录
一、关于自定义函数
1、自定义函数分为:
2、pyspark中自定义函数的三种写法:
二、 regeister方式自定义函数(SQL和DSL中使用)
三、udf注册方式定义UDF函数(DSL中使用)
一、关于自定义函数
1、自定义函数分为:
- UDF:一对一的函数【User Defined Functions】
- substr、split、concat、instr、length、from_unixtime
- UDAF:多对一的函数【User Defined Aggregation Functions】 聚合函数
- count、sum、max、min、avg、collect_set/list
- UDTF:一对多的函数【User Defined Tabular Functions】
- explode、json_tuple【解析JSON格式】、parse_url_tuple【解析URL函数】
Spark中支持UDF和UDAF两种,本文主要介绍Pyspark中的UDF定义的方式!
2、pyspark中自定义函数的三种写法:
二、 regeister方式自定义函数(SQL和DSL中使用)
语法:UDF变量名 = spark.udf.register(UDF函数名, 函数的处理逻辑
UDF变量名:DSL中调用UDF使用的
UDF函数名:SQL中调用UDF使用
举例数据:
id name msg
01 周杰伦 150/175
02 周杰 130/185
03 周华健 148/178
04 周星驰 130/175
05 闫妮 110/180将以上数据,通过自定义函数,变为如下数据:
01 周杰伦 150斤/175cm
02 周杰 130斤/185cm
03 周华健 148斤/178cm
SQL中使用:
#SQL中使用# 1、编写一个普通的函数,用于写逻辑
def myfunc(msg):list1=msg.split("/")return list1[0]+"斤/"+list1[1]+"cm"
# 2、注册函数
myfunc2=spark.udf.register('myfunc1', myfunc)
# 3、使用函数
spark.sql("select id,name,myfunc1(msg) msg from a").show()
DSL中使用:
# DSL中使用# 1、编写一个普通的函数,用于写逻辑
def myfunc(msg):list1=msg.split("/")return list1[0]+"斤/"+list1[1]+"cm"
# 2、注册函数
myfunc2=spark.udf.register('myfunc1', myfunc)
# 3、使用函数
df.select(F.col("id"),F.col("name"),myfunc2(F.col("msg"))).show()
完整python代码:
import osfrom pyspark.sql import SparkSession
import pyspark.sql.functions as F"""
------------------------------------------Description : TODO:SourceFile : _13-自定义函数Author : songDate : 2024/11/6
-------------------------------------------
"""
if __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_201'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/B/05-Hadoop/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取sparkSession对象spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()df=spark.read.format('csv').option('sep','\t').option('header','true').load('../../datas/a.txt')df.createOrReplaceTempView('a')# 第一种方案spark.sql("select id,name,concat(split(msg,'/')[0],'斤/',split(msg,'/')[1],'cm') msg from a").show()# 第二种方案def myfunc(msg):list1=msg.split("/")return list1[0]+"斤/"+list1[1]+"cm"myfunc2=spark.udf.register('myfunc1', myfunc)spark.sql("select id,name,myfunc1(msg) msg from a").show()# 第三种方案df.select(F.col("id"),F.col("name"),myfunc2(F.col("msg"))).show()spark.stop()
三、udf注册方式定义UDF函数(DSL中使用)
语法:UDF变量名 = F.udf(函数的处理逻辑, 返回值类型)
不常用,只能用于DSL开发中
# 编写一个普通的函数,用于写逻辑
def myfunc(msg):list1=msg.split("/")return list1[0]+"斤/"+list1[1]+"cm"
# 注册函数
myfunc2=F.udf(myfunc,StringType())
# 使用函数
df.select(F.col("id"),F.col("name"),myfunc2(F.col("msg"))).show()
完整python代码:
import osfrom pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType"""
------------------------------------------Description : TODO:SourceFile : _13-自定义函数Author : songDate : 2024/11/6
-------------------------------------------
"""
if __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_201'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/B/05-Hadoop/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取sparkSession对象spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()df=spark.read.format('csv').option('sep','\t').option('header','true').load('../../datas/a.txt')df.createOrReplaceTempView('a')# spark.sql("select * from a").show()def myfunc(msg):list1=msg.split("/")return list1[0]+"斤/"+list1[1]+"cm"myfunc2=F.udf(myfunc,StringType())df.select(F.col("id"),F.col("name"),myfunc2(F.col("msg"))).show()spark.stop()
注意:最常用的还是 regeister方式自定义函数 因为该方式可以在SQL中使用!!!