3.DataFrame的相关API
操作DataFrame一般有二种操作方案:一种为【DSL方式】,另一种为【SQL方式】
SQL方式: 通过编写SQL语句完成统计分析操作DSL方式: 特定领域语言,使用DataFrame特有的API完成计算操作,也就是代码形式从使用角度来说: SQL可能更加的方便一些,当适应了DSL写法后,你会发现DSL要比SQL更好用从Spark角度来说: 更推荐使用DSL方案,此种方案更加利于Spark底层的优化处理
3.1 SQL相关的API
-
创建一个视图/表
df.createTempView('视图名称'): 创建一个临时的视图(表名)df.createOrReplaceTempView('视图名称'): 创建一个临时的视图(表名),如果视图存在,直接替换注意: 临时视图仅能在当前这个Spark Session的会话中使用df.createGlobalTempView('视图名称'): 创建一个全局视图,运行在一个Spark应用中多个spark会话中都可以使用。在使用的时候必须通过 global_temp.视图名称 方式才可以加载到。较少使用
-
执行SQL语句
spark.sql('书写SQL')
3.2 DSL相关的API
官网链接: Spark SQL — PySpark 3.1.2 documentation
-
select():类似于SQL中select, SQL中select后面可以写什么, 这样同样也一样
-
distinct(): 去重后返回一个新的DataFrame
-
withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源
-
withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
-
alias(): 返回设置了别名的新DataFrame
-
agg():执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可,比较通用
-
where()和filter():用于对数据进行过滤操作, 一般在spark SQL中主要使用where
-
groupBy():使用指定的列对DataFrame进行分组,方便后期对它们进行聚合
-
orderBy():返回按指定列排序的新DataFrame
-
limit() : 返回指定数目的结果集
-
show():用于展示DF中数据, 默认仅展示前20行
-
参数1:设置默认展示多少行 默认为20
-
参数2:是否为阶段列, 默认仅展示前20个字符数据, 如果过长, 不展示(一般不设置)
-
-
printSchema():用于打印当前这个DF的表结构信息
DSL主要支持以下几种传递的方式: str | Column对象 | 列表str格式: '字段'Column对象: DataFrame含有的字段 df['字段']执行过程新产生: F.col('字段')列表: ['字段1','字段2'...][df['字段1'],df['字段2']]
为了能够支持在编写Spark SQL的DSL时候,在DSL中使用SQL函数,专门提供一个SQL的函数库。直接加载使用
链接: Spark SQL, Built-in Functions
导入这个函数库: import pyspark.sql.functions as F通过F调用对应的函数即可,常见函数如下:F.explode()F.split()F.count()F.sum()F.avg()F.max()F.min()...
4.Spark SQL词频统计
准备一个words.txt的文件,words.txt文件的内容如下:
hadoop hive hadoop sqoop hivesqoop hadoop zookeeper hive huehue sqoop hue zookeeper hivespark oozie spark hadoop ooziehive oozie spark hadoop
需求分析:
1- 扫描文件将每行内容切分得到单个的单词
2- 组织DataFrame的数据结构,分别利用SQL风格和DSL风格完成每个单词个数统计
3- 要求最后结果有两列:一列是单词,一列是次数
代码实现:
# 导包
import os
from pyspark.sql import SparkSession,functions as F
# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
# 1.创建spark对象
# appName:应用程序名称 master:提交模式
# getOrCreate:在builder构建器中获取一个存在的SparkSession,如果不存在,则创建一个新的
spark = SparkSession.builder.appName('sparksql_demo').master('local[*]').getOrCreate()
# 2.通过read读取外部文件方式创建DF对象
df = spark.read\
.format('text')\
.schema('words string')\
.load('file:///export/data/spark_project/spark_sql/data/data3.txt')
print(type(df))
# 需求: 从data3.txt读取所有单词,然后统计每个单词出现的次数
# 3.SQL风格
# 方式1: 使用子查询方式
# 先创建临时视图,然后通过sql语句查询展示
df.createTempView('words_tb')
qdf = spark.sql(
"select words,count(1) as cnt from (select explode(split(words,' ')) as words from words_tb) t group by words"
)
print(type(qdf))
qdf.show()
# # 方式2: 使用侧视图
# qdf = spark.sql(
# "select t.words,count(1) as cnt from words_tb lateral view explode(split(words,' ')) t as words group by t.words"
# )
print(type(qdf))
qdf.show()
# 4.DSL风格
# 方式1: 分组后直接用count()统计
df.select(
F.explode(F.split('words', ' ')).alias('words')
).groupBy('words').count().show()
# 方式1升级版:通过withColumnRenamed修改字段名
df.select(
F.explode(F.split('words', ' ')).alias('words')
).groupBy('words').count().withColumnRenamed('count','cnt').show()
# 方式2: 分组后用agg函数
df.select(
F.explode(F.split('words', ' ')).alias('words')
).groupBy('words').agg(
F.count('words').alias('cnt')
).show()
# 方式3: 直接使用withColum
df.withColumn(
'words',
F.explode(F.split('words', ' '))
).groupBy('words').agg(
F.count('words').alias('cnt')
).show()
# 5.释放资源
spark.stop()