Spark SQL DSL

1、 Spark sql   -- 代替hive的(并非完全代替)  

(1) Spark sql 和 hive 区别 :

     两者都是写sql的,区别是计算引擎不一样  

 hive        -- 计算引擎是MapReduce ,是通过MR做计算的

 Spark sql   -- 计算引擎是Saprk Core,是通过Spark Core做计算的

     Spark sql 功能比 hive 强大 :   并非只能写sql

 hive只能在shell行写sql

 spark可以在代码中写sql  

(2) Spark sql结构 :

1、 Data Source API(读数据) :   可以读取 csv(文本文件)、 json、 jdbc 等各种各样的数据做处理

2、 Data Frame API(提供了两个API):

        Dataframe DSL      -- 写代码      (DSL :  类SQL语法,与SQL差不多,但它是代码)

    Spark SQL and HQL  -- 写SQL

(3) DataFrame :   数据框(二维的表结构,类似hive的一张表)

    写SQL的前提 :  有表

DataFrame 是基于 RDD 做了封装, 在上面提供了 列名和列类型 的概念,即表的结构的概念。

          可以基于 DataFrame 去写 SQL 。

2、 写Spark SQL :   

在spark sql中, shuffle之后分区数不是由前面的RDD决定的,而是有默认值, 默认200个。 可以指定参数修改。

    (1) 导入Spark SQL依赖         -- 在Spark项目的pom文件中加入

<!--  Spark sql核心依赖  -->

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.4.5</version>

        </dependency>

(2) 在Spark项目下创建sql包    -- 新的模块一定要新建新的包  

*项目名称一定要小写,多个单词之间用-分割 : s1-v1_1.2   

*包名也要小写,一般是公司域名倒写 : com.shujia.spark

(3) 创建Spark sql环境 :

    val spark: SparkSession = SparkSession

.builder()    // 构建

.appName("wordCount")

.master("local")

// 设置 sparkSQL 在 shuffle 之后 DF 的分区数,默认是200

            .config("spark.sql.shuffle.partitions", 1)

.getOrCreate()   // 当前环境有SparkSession就获取, 反之则创建

(4) 返回值不再是 RDD,  而是 DataFrame (DF)

    查看数据不再是 foreach(),  而是 show()

(5) 针对于sql语句有多行的情况, 可以使用 """ """ 格式书写

val wordCountDF = spark.sql(

"""

|select word,count(1) as c from (

|select explode(split(line,',')) word from lines

|) as d

|group by word

|""".stripMargin)       // stripMargin :  删除"|"  并合并以上sql语句

(6) 创建 DataFrame 的方式:

 1、  读取 csv 格式的数据创建 DF

    val studentDF: DataFrame = spark

    .read

  .format("csv")

  .option("sep", ",")     //列的分割方式

  .schema("id STRING, name STRING, age INT, gender STRING, clazz STRING")  // 指定字段名和字段类型, 必须按照数据顺序指定

  .load("data/students.txt")     //指定读取的路径

 2、  读取 json 格式的数据构建 DF

      (spark 会自动解析json格式)

val studentJsonDF: DataFrame = spark

  .read

  .format("json")

  .load("data/a.json")

         3、  读取 jdbc 数据构建 DF

              (通过网络远程读取 mysql 中的数据,  需要添加mysql依赖)

  

    val jdbcDF: DataFrame = spark.read

  .format("jdbc")

  .option("url", "jdbc:mysql://master:3306")

  .option("dbtable", "bigdata.students")

  .option("user", "root")

  .option("password", "123456")

  .load()

4、  读取 parquet 格式的数据构建 DF

             (parquet格式的数据中自带 列名 和 列类型,

             parquet会对数据进行压缩, 体积变小, 解压和压缩需要时间)

// 保存一个parquet格式的文件

studentDF

  .write

  .format("parquet")

  .mode(SaveMode.Overwrite)

  .save("data/parquet")

// 读取parquet格式的数据

val parquetDF: DataFrame = spark

  .read

  .format("parquet")

  .load("data/parquet")

3、 DSL语法   -- 类sql语法

    // spark sql 中必须要导入隐式转换, 才可以使用 $方法 获取列对象

import spark.implicits._

//导入 DSL 所有的函数

    import org.apache.spark.sql.functions._       

(1) show   :   查看前面20条数据,  相当于action算子

                   action算子  -- 每一个Action算子都会触发一个job

(2) select :   选择字段,  和 sql 中 select 是一样

(3) $ : 是一个方法,作用是通过列名获取列的对象

studentDF.select($"id", $"age" + 2 as "age").show()

(4) where :  过滤数据

    = : 赋值    == : 判断    === : 等于

(5) group by :   分组

(6) agg :   分组之后进行聚合计算

            只能在分组后使用, 即一般跟在group函数后面

studentDF

  .groupBy($"clazz")

  // 分组之后做聚合计算   -- 可以写多个

  .agg(count($"clazz") as "c", avg($"age") as "avgAge")

  .show()

(7) join :   表关联

(8) 开窗函数     --  统计每个班级总分前2的学生   

    withColumn  :   给 DF 增加新的列

joinDF

  // 按照 id 和 班级 分组

  .groupBy($"id", $"clazz")

  // 对分数求和

  .agg(sum($"sco") as "sumSco")

  // 使用开窗函数                  -- row_number() over (partition by clazz order by sumSco desc)

//    .select($"id", $"clazz", $"sumSco", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc) as "r")

  // 在前面 DF 的基础上增加列 ( 上面的简写, 省去写 $"id", $"clazz", $"sumSco" 步骤, 直接将 "r" 加在 "sumSco" 后面  )

  .withColumn("r", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc))

  // 取 班级前2

  .where($"r" <= 2).show()

(9) orderBy :  排序

DSL 语法 与 SQL 的异同 :

1、 DSL 和 SQL 功能相同, 但写法不同, 代码更简洁

    2、 DSL 不需要做 子查询

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/3083.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

vscode | 开发神器vscode快捷键删除和恢复

目录 快捷键不好使了删除快捷键恢复删除的快捷键 在vscode使用的过程中&#xff0c;随着我们自身需求的不断变化&#xff0c;安装的插件将会持续增长&#xff0c;那么随之而来的就会带来一个问题&#xff1a;插件的快捷键重复。快捷键重复导致的问题就是快捷键不好使了&#xf…

C++优选算法九 链表

一、常用技巧 画图&#xff01;直观形象&#xff0c;便于理解。引入虚拟“头”结点。不吝啬空间。快慢双指针&#xff1a; 判环 找链表中环的入口 找链表中倒数第n个结点 二、常用操作 创建一个新结点尾插头插 三、示例题目 1.两数相加. - 力扣&#xff08…

计算机网络:网络层 —— 虚拟专用网 VPN

文章目录 虚拟专用网 VPN 概述内联网 VPN外联网 VPN 虚拟专用网 VPN 概述 虚拟专用网&#xff08;Virtual Private Network&#xff0c;VPN&#xff09;&#xff1a;利用公用的因特网作为本机构各专用网之间的通信载体&#xff0c;这样形成的网络又称为虚拟专用网。 出于安全…

Web 安全基础知识梳理大全,零基础入门到精通,收藏这篇就够了

一、各种linux虚拟机忘记密码 1、红帽忘记密码修改root密码 1 在重启的时候 e 进入 2 在linux16 后面找到UTF-8 在后面加 rd.break 然后ctrlx 3 这时候可以输入mount 看一下 会发现根为 /sysroot/ 没有w权限&#xff0c;只有ro权限 4 输入 mount -o remount,r…

非凸科技助力第49届ICPC亚洲区域赛(成都)成功举办

10月26日-27日&#xff0c;由电子科技大学承办、非凸科技与华为共同支持的第49届ICPC国际大学生程序设计竞赛亚洲区域赛&#xff08;成都&#xff09;在郫都区体育中心体育馆顺利举行。非凸科技期待与产学研各界专家、青年才俊一起&#xff0c;推动基础科学理论研究的重大突破&…

ssm051网上医院预约挂号系统+jsp(论文+源码)_kaic

本科毕业设计论文 题目&#xff1a;网上医院预约挂号系统设计与实现 系 别&#xff1a; XX系&#xff08;全称&#xff09; 专 业&#xff1a; 软件工程 班 级&#xff1a; 软件工程15201 学生姓名&#xff1a; 学生学号&#xff1a; 指导教师&#xff1a…

EtherCAT转ModbusTCP相关技术

EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关https://item.taobao.com/item.htm?ftt&id822721028899 MS-GW15 概述 MS-GW15 是 EtherCAT 和 Modbus TCP 协议转换网关&#xff0c;为用户提供一种 PLC 扩展的集成解决方案&#xff0c;可以轻松容易将 Modbu…

qt QTextStream详解

1、概述 QTextStream类是Qt框架中用于处理文本输入输出的类。它提供了一种方便的方式&#xff0c;可以从各种QIODevice&#xff08;如QFile、QBuffer、QTcpSocket等&#xff09;中读取文本数据&#xff0c;或者将文本数据写入这些设备中。QTextStream能够自动处理字符编码的转…

大数据-201 数据挖掘 机器学习理论 - 决策树 局部最优 剪枝 分裂 二叉分裂

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…

项目_Linux_网络编程_私人云盘

概述 项目功能总述&#xff1a; 该项目使用TCP进行通信&#xff0c;实现文件的上传和下载。云盘的文件同步有手动同步、实时同步、定时同步这三种。本项目主要实现的是手动同步的功能&#xff0c;重点训练在如何使用TCP进行文件传输。 选择TCP的原因&#xff1a; 文件的传输…

细腻的链接:C++ list 之美的解读

细腻的链接&#xff1a;C list 之美的解读 前言&#xff1a; 小编在前几日刚写过关于vector容器的内容&#xff0c;现在小编list容器也学了一大部分了&#xff0c;小编先提前说一下学这部分的感悟&#xff0c;这个部分是我学C以来第一次感到有难度的地方&#xff0c;特别是在…

Java之包,抽象类,接口

目录 包 导入包 静态导入 将类放入包 常见的系统包 抽象类 语法规则 注意事项&#xff1a; 抽象类的作用 接口 实现多个接口 接口间的继承 接口使用实例 &#xff08;法一&#xff09;实现Comparable接口的compareTo()方法 &#xff08;法二&#xff09;实现Comp…

qt QDragEnterEvent详解

1、概述 QDragEnterEvent是Qt框架中用于处理拖放进入事件的一个类。当用户将一个拖拽对象&#xff08;如文件、文本或其他数据&#xff09;拖动到支持拖放操作的窗口部件&#xff08;widget&#xff09;上时&#xff0c;系统会触发QDragEnterEvent事件。这个类允许开发者在拖拽…

永恒之蓝漏洞复现

永恒之蓝漏洞复现 1 实验准备 1台靶机 win7 关闭防火墙 控制面板->系统和安全->Windows 防火墙 192.168.184.131 1台攻击者 kali 192.168.184.129 2 实施攻击 kali操作 1.输入msfconsole回车 2.搜索ms17_010模块 msf6 > search ms17_010 3.选择编号为3的模块 use 3…

c++拷贝构造函数

1.拷贝构造函数 拷贝构造函数的调用时机 class A { public://默认构造函数A(){m_Hp 100;cout << "A默认构造函数调用完毕" << endl;}//有参构造函数A(int hp){m_Hp hp;cout << "A有参构造函数调用完毕" << endl;}A(const A&…

排序算法的分类、时间空间复杂度

排序是计算机科学和数学中的基本操作&#xff0c;有多种不同的方式&#xff0c;每种方式都有其特定的时间复杂度和空间复杂度。以下是对排序方式的分类及其时间复杂度和空间复杂度的详细分析&#xff1a; 一、排序方式的分类 排序方式主要分为两大类&#xff1a;比较排序和非…

【MMAN-M2】基于缺失模态编码器的多多头关注网络

abstract&#xff1a; 多模态融合是多模态学习领域的研究热点。以往的多模态融合任务大多是基于完整模态的。现有的缺失多模态融合研究没有考虑模态的随机缺失&#xff0c;缺乏鲁棒性。大多数方法都是基于缺失模态和非缺失模态之间的相关性&#xff0c;而忽略了缺失模态的语境…

【AI绘画】Stable Diffusion 基础教程! 如何写出好的prompt,一些技巧和原则

前言 Stable Diffusion 教程-中文 Ask AI for ART Original txt2img and img2img modes 基础模式之 文生图/图生图 基础入门部分 所有的AI设计工具&#xff0c;安装包、模型和插件&#xff0c;都已经整理好了&#xff0c;&#x1f447;获取~ 输入一段话&#xff0c;生成一…

C++ —— 网络通信

之前在Linux系统下介绍了多种实现网络通信的方式&#xff0c;从本文开始后面的文章将在Windows系统下用C为大家介绍技术&#xff0c;敬请期待~。 话不多说&#xff0c;直接进入正文&#xff0c;我们知道&#xff0c;要完成网络通信要用到非常多的函数&#xff0c;并且函数的参数…

FPGA视频GTH 8b/10b编解码转PCIE3.0传输,基于XDMA中断架构,提供工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐我已有的PCIE方案我已有的 GT 高速接口解决方案 3、PCIE基础知识扫描4、工程详细设计方案工程设计原理框图输入Sensor之-->芯片解码的HDMI视频数据组包基于GTH高速接口的视频传输架构GTH IP 简介GTH 基本结构GTH 发送和接收处理…