- UDF:一对一的函数【User Defined Functions】
- substr、split、concat、instr、length、from_unixtime
- UDAF:多对一的函数【User Defined Aggregation Functions】 聚合函数
- count、sum、max、min、avg、collect_set/list
- UDTF:一对多的函数【User Defined Tabular Functions】
- explode、json_tuple【解析JSON格式】、parse_url_tuple【解析URL函数】
Spark中支持UDF和UDAF两种,支持直接使用Hive中的UDF、UDAF、UDTF.
pyspark中自定义函数的三种写法:
使用最常用的regeister方式自定义函数
最常用的方式,这种方式编写的函数,既能用于SQL中,也能用于DSL中
语法:
UDF变量名 = spark.udf.register(UDF函数名, 函数的处理逻辑)
定义:spark.udf.register()
UDF变量名:DSL中调用UDF使用的
UDF函数名:SQL中调用UDF使用
案例:
查看以下数据
id name msg
01 周杰伦 150/175
02 周杰 130/185
03 周华健 148/178
04 周星驰 130/175
05 闫妮 110/180将以上数据,通过自定义函数,变为如下数据:
01 周杰伦 150斤/175cm
02 周杰 130斤/185cm
03 周华健 148斤/178cm
第一步 :自定义函数
# 编写一个普通的函数,用于写逻辑
def get_data(str1):
list1 = str1.split("/")
return list1[0] + "斤/" + list1[1] + "cm"
第二步:注册函数
# 定义一个UDF:变量名-dsl = spark.udf.register(函数名-sql, 处理逻辑, 返回值)
# get_new_info 用于 sql 中
# get_info 用于DSL
get_info = spark.udf.register(name="get_new_info", f=lambda oldinfo: get_data(oldinfo))
第三步:使用函数
#使用sql的方式调用
spark.sql("select id,name,get_new_info(msg) from star").show()# 使用dsl的方式调用
# DSL:用变量名
import pyspark.sql.functions as Fnew_df.select(F.col("id"), F.col("name"), F.col("msg"), get_info(F.col("msg")).alias("newinfo")).show()
代码演示 以及解释
import osfrom pyspark.sql import SparkSessionimport pyspark.sql.functions as F
if __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'D:/java/jdk'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()# 第一种方案# format("csv") 是读取文件格式,后面是文件路径 toDF("id","name","msg")是把文件里面的数据源变成指定的字段df = spark.read.format("csv").option("sep", "\t").load("../../datas/function/udf.txt").toDF("id", "name","msg") df.createOrReplaceTempView("t") #给数据源起个名字# 编写sqlspark.sql("""select id,name,concat(split(msg,"/")[0],'斤/',split(msg,"/")[1],'cm')msg from t """).show()# 第二种方案 自定义函数#第一步 定义函数def my_function(msg):return msg.split("/")[0] + "斤/" + msg.split("/")[1] + "cm"# 第二步注册函数my_function2 = spark.udf.register("my_function",my_function)# 第三步调用函数spark.sql("""select id,name,my_function(msg) msg from t""").show()# 自定义函数DSL使用 registerdf.select(F.col("id"),F.col("name"),my_function2(F.col("msg"))).show()spark.stop()