Spark SQL大数据分析快速上手-DataFrame应用体验

【图书介绍】《Spark SQL大数据分析快速上手》-CSDN博客

《Spark SQL大数据分析快速上手》【摘要 书评 试读】- 京东图书

大数据与数据分析_夏天又到了的博客-CSDN博客

本节主要介绍如何使用DataFrame进行编程。

4.1.1  SparkSession

在旧版本中,Spark SQL提供两种SQL查询起始点:一个叫作SQLContext,用于Spark自己提供的SQL查询;一个叫作HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合。因此,在SQLContext和HiveContext上可用的API,在SparkSession上同样可以使用。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。

当我们使用Spark Shell的时候,Spark会自动创建一个叫作spark的SparkSession,就像以前可以自动获取一个sc来表示SparkContext一样,如图4-1所示。

图4-1  自动创建SparkSession

4.1.2  DataFrame应用

Spark SQL的DataFrame API允许我们使用DataFrame而不必去注册临时表或者生成SQL表达式。DataFrame API既有转换操作,也有行动操作;DataSet API则提供了更加函数式的API。

1. 创建DataFrame

有了SparkSession之后,可以通过以下3种方式来创建DataFrame:

  • 通过Spark的数据源来创建。
  • 通过已知的RDD来创建。
  • 通过查询一个Hive表来创建。

Spark支持的数据源如图4-2所示。

图4-2  Spark支持的数据源

通过Spark数据源创建DataFrame的代码如下:

// 读取 JSON 文件
scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/ resources/employees.json")
df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]// 展示结果
scala> df.show
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

其中,employees.json文件内容如下:

{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}
2. DataFrame语法风格

1)SQL语法风格

SQL语法风格是指我们查询数据的时候可以使用SQL语句。这种SQL语句风格的查询必须有临时视图或者全局视图来辅助。

创建视图的数据来源于people.json,其内容如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

创建临时视图的代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/ src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> df.createOrReplaceTempView("people")scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

创建全局视图的代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/ resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> df.createGlobalTempView("people")scala> spark.sql("select * from global_temp.people")
res31: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> res31.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2)DSL语法风格

DataFrame提供一个特定领域语言(domain-specific language,DSL)去管理结构化的数据。可以在Scala、Java、Python和R中使用DSL。使用DSL语法风格就不必创建临时视图了。

(1)查看schema信息,示例代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/ resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

(2)使用DSL查询,示例代码如下:

只查询name列数据:

scala> df.select($"name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|

查询name和age列数据:

scala> df.select("name", "age").show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+

查询name和age + 1的数据:

scala> df.select($"name", $"age" + 1).show
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

查询age大于20的数据:

scala> df.filter($"age" > 21).show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

按照age分组,查看数据条数:

scala> df.groupBy("age").count.show
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
3. RDD和DataFrame的交互

1)从RDD到DataFrame

涉及RDD、DataFrame、DataSet之间的操作时,需要进行导入,即import spark.implicits._。这里的spark不是包名,而是表示SparkSession的那个对象,所以必须先创建SparkSession对象再导入;implicits是一个内部对象。

首先创建一个RDD:

scala> val rdd1 = sc.textFile("/opt/module/spark-local/examples/src/main/resources/people.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/examples/src/main/resources/people.txt MapPartitionsRDD[10] at textFile at <console>:24

然后进行转换,转换有3种方法:手动转换、通过样例类反射转换和通过API的方式转换。

(1)手动转换。

示例代码如下:

scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); (paras(0), paras(1).toInt)})
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:26// 转换为DataFrame的时候手动指定每个数据字段名
scala> rdd2.toDF("name", "age").show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

(2)通过样例类反射转换。

首先创建样例类:

scala> case class People(name :String, age: Int)
defined class People

然后使用样例把 RDD 转换成DataFrame:

scala> val rdd2 = rdd1.map(line => { val paras = line.split(", "); People(paras(0), paras(1).toInt) })
rdd2: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[6] at map at <console>:28scala> rdd2.toDF.show
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

(3)通过API的方式转换。

通过API方式转换不能在spark命令行下进行,需要编写完整的Scala程序代码,示例代码  如下:

代码4-1  DataFrameDemo.scala

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object DataFrameDemo {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Word Count").getOrCreate()val sc: SparkContext = spark.sparkContextval rdd: RDD[(String, Int)] = sc.parallelize(Array(("lisi", 10), ("zs", 20), ("zhiling", 40)))// 映射出来一个 RDD[Row], 因为 DataFrame其实就是 DataSet[Row]val rowRdd: RDD[Row] = rdd.map(x => Row(x._1, x._2))// 创建 StructType 类型val types = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))val df: DataFrame = spark.createDataFrame(rowRdd, types)df.show}
}

2)从DataFrame到RDD

直接调用DataFrame的rdd方法就能完成转换。示例代码如下:

scala> val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at rdd at <console>:25scala> rdd.collect
res0: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/4729.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

磁盘的分区

硬盘管理 硬盘的概念 硬盘是一种计算机的存储设备&#xff0c;通常是由一个或多个磁性盘片组成。硬盘既可以安装在计算机的内部&#xff0c;也可以外接计算机 硬盘主要是用来保存数据的 数据包括&#xff1a;操作系统&#xff0c;应用程序&#xff0c;文档多媒体文件等等 计算…

IEEE TRO综述论文:抓取合成领域的深度学习方法

TRANSACTIONS ON ROBOTICS综述论文&#xff1a;抓取合成领域的深度学习方法抓取是机器人在现实世界中操纵物体的基本技能之一&#xff0c;涉及在一组接触点上施加力和扭矩来控制物体的运动&#xff0c;而近些年深度学习方法的突破性研究使机器人在抓取方面取得了快速进展。近期…

旋转位置编码

1. Transformer为什么需要位置编码 因为 transformer 结构本身是和位置编码无关的&#xff1a; Y T ( X ) F ( A ( X ) ) Y\Tau(X)F(A(X)) YT(X)F(A(X))&#xff0c;其中 A ( ) A() A() 是 attention 变换&#xff0c;只进行了矩阵变换&#xff0c;跟位置无关&#xff0c; …

ssm+vue683基于VUE.js的在线教育系统设计与实现

博主介绍&#xff1a;专注于Java&#xff08;springboot ssm 等开发框架&#xff09; vue .net php phython node.js uniapp 微信小程序 等诸多技术领域和毕业项目实战、企业信息化系统建设&#xff0c;从业十五余年开发设计教学工作 ☆☆☆ 精彩专栏推荐订阅☆☆☆☆☆不…

【Wi-Fi】WiFi IEEE 802.11ad(60 GHz Wi-Fi)知识整理

参考链接 【技术规范】详解IEEE 802.11ad&#xff08;60GHz Wi-Fi&#xff09;技术 - 天线设计 - RF技术社区 IEEE 802.11ad Tutorial | WiGig (60 GHz Technology) basics IEEE 802.11ad&#xff08;60 GHz Wi-Fi&#xff09; IEEE 802.11ad是一种无线网络标准&#xff0c…

苹果MacOS最常用快捷键(一)

1、利用find命令查找文件 可参考链接&#xff1a;find使用_mac find命令-CSDN博客文章浏览阅读3.2k次。find 使用_mac find命令https://blog.csdn.net/poinsettia/article/details/129187641 举例&#xff1a; 2、虚拟机系统将Ctrl设置为苹果的Command键 实际上就是将Ctrl键和…

壁纸鸭 1.1 |提供许多优质壁纸,并且支持本地图片像素化

壁纸鸭是一款不错的壁纸软件&#xff0c;提供简单的分类和搜索功能&#xff0c;无需注册登录即可免费使用。壁纸质量较高&#xff0c;支持将本地图片像素化&#xff0c;为用户提供多样化的壁纸选择。 大小&#xff1a;29M 下载地址&#xff1a; 百度网盘&#xff1a;https://…

对于一个需要渲染300帧的动画项目,云渲染要多久

探讨云渲染动画300帧需要多久的问题时&#xff0c;我们今天来从多个角度进行分析&#xff0c;对于一个需要渲染300帧的动画项目&#xff0c;传统的本地渲染方式可能会因为硬件限制而变得耗时且效率低下。幸运的是&#xff0c;【渲染101】云渲染技术的出现为这一问题提供了解决方…

项目活动进度计算题

六个时间参数①最早开始时间ESmax{紧前工作最早完成时间EF}&#xff08;紧前取大&#xff09; 最早完成时间EFES工期&#xff0c;从左→右计算&#xff0c;累加取大 ②最迟完成时间LFmin{紧后工作最迟开始时间LS}&#xff08;紧后取小&#xff09; 最迟开始时间LSLF-工期&am…

如何查看局域网内的浏览记录?总结五种方法,按步操作!一学就会!「管理小白须知」

如何查看局域网内的浏览记录&#xff1f; 你是否也曾为如何有效监控局域网内的浏览记录而苦恼&#xff1f; 监控局域网内电脑的浏览记录是确保员工工作效率、维护网络安全以及规范上网行为的重要手段。 别担心&#xff0c;今天我们就来聊聊这个话题&#xff0c;为你揭秘五种简…

5本地方法接口本地方法栈

什么是本地方法&#xff1f; 简单地讲&#xff0c;一个 Native Method 是一个 Java 调用非 Java 代码的接囗 在定义一个 native method 时&#xff0c;并不提供实现体&#xff08;有些像定义一个 Java interface&#xff09;&#xff0c;因为其实现体是由非 java 语言在外面实…

飞书 富文本(Markdown)

飞书机器人webhook支持Markdown格式&#xff0c;包括表格 表格 |Syntax | Description |\n|-------- | -------- |\n|Header | Title |\n|Paragraph | Text |参考 富文本&#xff08;Markdown&#xff09;

Django Admin

Django Admin模块是Django框架提供的一个功能强大且易于使用的后台管理工具&#xff0c;它允许开发者通过Web界面来管理网站的后台数据和功能。 主要功能和特点 自动生成管理界面&#xff1a;Django Admin模块可以根据模型类&#xff08;Model&#xff09;自动创建表单和列表视…

金华迪加现场大屏互动系统 mobile.do.php 任意文件上传漏洞复现

0x01 产品描述&#xff1a; ‌ 金华迪加现场大屏互动系统‌是由金华迪加网络科技有限公司开发的一款专注于增强活动现场互动性的系统。该系统设计用于提供高质量的现场互动体验&#xff0c;支持各种大型活动&#xff0c;如企业年会、产品发布会、展览展示等。其主要功能包…

中小企业项目管理软件选择指南:最适合你的工具是什么?

选择适合小团队的项目管理工具时&#xff0c;关键是要根据团队规模、工作流程、预算和功能需求来决定。对于小团队&#xff0c;通常需要简洁、易用、低成本的工具&#xff0c;同时能支持任务分配、进度跟踪、文件共享等基本功能。以下是一些适合小团队使用的免费和开源项目管理…

【C++】C++的单例模式、跟踪内存分配的简单方法

二十四、C的单例模式、跟踪内存分配的简单方法 1、C的单例模式 本小标题不是讨论C的语言特性&#xff0c;而是一种设计模式&#xff0c;用于确保一个类在任何情况下都只有一个实例&#xff0c;并提供一个全局访问点来获取这个实例。即C的单例模式。这种模式常用于资源管理&…

VMware的三种网卡模式

VMware的三种网卡模式 1 桥接模式 虚拟机当作一台物理机,直接连接你物理机所连接的路由器 物理机的网段与虚拟机的网段是一致的,并且该网络下的其他主机可以访问你的虚拟机 2 NAT模式 相当于在你的物理机里接了一个路由器,路由器下游接的是虚拟机 物理机的网段与虚拟机的网段是…

办公类提示词(上)——工作计划、工作总结、讲话稿等

什么是提示词&#xff1f; 提示词的英文是Prompt&#xff0c;是你与人工智能&#xff08;AI&#xff09;进行交流的方式。简单来说&#xff0c;提示词就是你给AI的一段文字或问题&#xff0c;AI根据这段文字或问题来生成回应或完成任务。 举个例子&#xff1a;假设你在使用一…

plt中subplot综合实战

目录 背景介绍实战 背景介绍 下面是一份贸易数据&#xff08;Prod_Trade.xlsx&#xff09;&#xff0c;需要多角度针对2012年数据进行报表分析&#xff0c;需使用subplot分格展示。Prod_Trade的数据结构包括 Date,Order_Class,Sales Transport,Trans_Cost, Region ,Category, …

Matlab 基于声学超表面的深亚波长厚度完美吸收体

传统吸声器的结构厚度与工作波长相当&#xff0c;这在低频范围的实际应用中造成了很大的障碍。我们提出了一种基于超表面的完美吸收器&#xff0c;能够在极低频区域实现声波的全吸收。该超表面具有深亚波长厚度&#xff0c;特征尺寸为k223&#xff0c;由穿孔板和卷曲共面气室组…