Hudi第二章:集成Spark

系列文章目录

Hudi第一章:编译安装
Hudi第二章:集成Spark


文章目录

  • 系列文章目录
  • 前言
  • 一、安装Spark
    • 1、安装Spark
    • 2.安装hive
  • 二、spark-shell
    • 1.启动命令
    • 2.插入数据
    • 3.查询数据
      • 1.转换DF
      • 2.查询
    • 3.更新
    • 4.时间旅行
    • 5.增量查询
    • 6.指定时间点查询
    • 7.删除数据
      • 1.获取总行数
      • 2.取其中2条用来删除
      • 3.将待删除的2条数据构建DF
      • 4.执行删除
      • 5.统计删除数据后的行数,验证删除是否成功
  • 三、Spark-SQL
    • 1.启动Spark-sql
    • 2.建表
      • 1.创建非分区表
      • 2.创建分区表
      • 3.在已有的hudi表上创建新表
      • CTAS
    • 2.插入数据
    • 3.查询
    • 4.时间旅行
    • 5.更新数据
      • 1.update
      • 2.MergeInto
    • 5.删除数据
    • 6.覆盖表
    • 7.修改表
    • 8.修改分区
  • 总结


前言

Hudi可以使用Spark作为搜索引擎。我们写博客记录一下,不知道一次能不能写完。


一、安装Spark

1、安装Spark

只需要简单的上传解压再添加环境变量即可。不做过多演示,具体可以看我之前的博客。
spark第一章:环境安装
spark版本我选用的是3.2。在这里留一个官方的下载地址。
spark-3.2.2-bin-hadoop3.2.tgz
然后我们从编译好的hudi文件夹中,将spark与hudi连接的jar包放入spark中。

cp /opt/software/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar /opt/module/spark-3.2.2/jars/

然后需要启动hadoop

2.安装hive

后边hudi会依赖hive的Metastore和HiveServer2
Hive3第一章:环境安装

二、spark-shell

其中大部分命令和Spark很接近,建议学过Spark-shell之后再来学习这一部分。

1.启动命令

#针对Spark 3.2
spark-shell \--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

2.插入数据

dataGen.generateInserts是hudi提供的测试数据生成api,以下是固定写法

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGeneratorval inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))df.write.format("hudi").options(getQuickstartWriteConfigs).option(PRECOMBINE_FIELD_OPT_KEY, "ts").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").option(TABLE_NAME, tableName).mode(Overwrite).save(basePath)

说一下这几个参数。
RECORDKEY_FIELD_OPT_KEY:可以理解为MYSQL里的主键。
RECORDKEY_FIELD_OPT_KEY:预聚合字段,当主键相同时,以该字段大小决定,一般用ts字段,也就是时间戳。
PARTITIONPATH_FIELD_OPT_KEY:分区字段
TABLE_NAME:表名称
可以新开一个窗口在本地看一下
在这里插入图片描述

3.查询数据

1.转换DF

val tripsSnapshotDF = spark.read.format("hudi").load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

2.查询

spark.sql(“select fare, begin_lon, begin_lat, uuid, ts from hudi_trips_snapshot where fare > 20.0”).show()
在这里插入图片描述

3.更新

和插入数据差不多,但是需要把mode从Overwrite换成Append。将其从覆盖编程追加

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").options(getQuickstartWriteConfigs).option(PRECOMBINE_FIELD_OPT_KEY, "ts").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").option(TABLE_NAME, tableName).mode(Append).save(basePath)

更新之后我们再次查询。

val tripsSnapshotDF = spark.read.format("hudi").load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")spark.sql("select fare, begin_lon, begin_lat, uuid, ts from  hudi_trips_snapshot where fare > 20.0").show()

在这里插入图片描述
可以看到ts明显增大

4.时间旅行

当数据不断更新时,我们该如何寻找更新前的数据。这个在MYSQL数据库中是没有的,但hudi有,我们只需要找到当初更新数据的时间戳即可。

spark.sql("select _hoodie_commit_time, ts, uuid, fare from  hudi_trips_snapshot").show()

因为我们只有两次提交,所以我们只有两种时间戳
在这里插入图片描述
这就是最简单的年月日时分秒。
现在我们回到第一次提交时的数据。

val tripsSnapshotDF1 = spark.read.format("hudi").option("as.of.instant", "20230927201447123").load(basePath)tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot1")

现在在新的虚拟表中查询。

spark.sql("select fare, begin_lon, begin_lat, uuid, ts from  hudi_trips_snapshot1 where fare > 20.0").show()

随便找一条对比
在这里插入图片描述
可以看到和之前的第一条是一样的。
时间旅行还可以这样写
spark.read.
format(“hudi”).
option(“as.of.instant”, “2023-09-27 20:14:47:123”).
load(basePath)
效果和上边一样。

5.增量查询

查询某一次提交之后的数据。
现在我在插入三次数据。
重复执行三次

df.write.format("hudi").options(getQuickstartWriteConfigs).option(PRECOMBINE_FIELD_OPT_KEY, "ts").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").option(TABLE_NAME, tableName).mode(Append).save(basePath)

然后重新生成虚拟表

spark.read.format("hudi").load(basePath).createOrReplaceTempView("hudi_trips_snapshot")

因为每次提交,查询时间会被覆盖,所以我们选择从本地获取。
在这里插入图片描述
咱们选择第四次之后的数据

val beginTime = "20230927210631014"# 增量查询表
val tripsIncrementalDF = spark.read.format("hudi").option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental").show()

在这里插入图片描述
可以看到都是第四次之后的数据。

6.指定时间点查询

增量查询可以查询某一次提交之后的数据,指定时间点查询可以查询,一段时间内的数据。

val beginTime = "000" 
val endTime = "20230927210631014"val tripsPointInTimeDF = spark.read.format("hudi").option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).option(END_INSTANTTIME_OPT_KEY, endTime).load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where").show()

在这里插入图片描述
都是endTime之前的。

7.删除数据

1.获取总行数

spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

在这里插入图片描述

2.取其中2条用来删除

val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

3.将待删除的2条数据构建DF

val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

4.执行删除

df.write.format("hudi").options(getQuickstartWriteConfigs).option(OPERATION_OPT_KEY,"delete").option(PRECOMBINE_FIELD_OPT_KEY, "ts").option(RECORDKEY_FIELD_OPT_KEY, "uuid").option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").option(TABLE_NAME, tableName).mode(Append).save(basePath)

5.统计删除数据后的行数,验证删除是否成功

val roAfterDeleteViewDF = spark.read.format("hudi").load(basePath)roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")// 返回的总行数应该比原来少2行
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

在这里插入图片描述

三、Spark-SQL

1.启动Spark-sql

#针对Spark 3.2
spark-sql \--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

2.建表

单独创建一个数据库,用作学习。

create database spark_hudi;
use spark_hudi;

1.创建非分区表

hudi中默认分为cow和mor两种表,他们后台的存储方式不太一样,但是前端看起来没区别。
创建一个cow表,默认primaryKey ‘uuid’,不提供preCombineField

create table hudi_cow_nonpcf_tbl (uuid int,name string,price double
) using hudi;

创建一个mor非分区表

create table hudi_mor_tbl (id int,name string,price double,ts bigint
) using hudi
tblproperties (type = 'mor',primaryKey = 'id',preCombineField = 'ts'
);

2.创建分区表

创建一个cow分区外部表,指定primaryKey和preCombineField

create table hudi_cow_pt_tbl (id bigint,name string,ts bigint,dt string,hh string
) using hudi
tblproperties (type = 'cow',primaryKey = 'id',preCombineField = 'ts')
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

3.在已有的hudi表上创建新表

create table hudi_existing_tbl0 using hudi
location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';create table hudi_existing_tbl1 using hudi
partitioned by (dt, hh)
location 'file:///tmp/hudi/dataframe_hudi_pt_table';

因为实际路径上并没有数据,所以就不创建了。

CTAS

Create Table As Select
为了提高向hudi表加载数据的性能,CTAS使用批量插入作为写操作,所以也可以用来插入数据。

通过CTAS创建cow非分区表,不指定preCombineField

create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 1 as id, 'a1' as name, 10 as price;

在这里插入图片描述
通过CTAS创建cow分区表,指定preCombineField

create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;

通过CTAS从其他表加载数据
了解即可

# 创建内部表
create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet';# 通过CTAS加载数据
create table hudi_ctas_cow_pt_tbl2 using hudi location 'file:/tmp/hudi/hudi_tbl/' options (type = 'cow',primaryKey = 'id',preCombineField = 'ts')
partitioned by (datestr) as select * from parquet_mngd;

2.插入数据

向非分区表插入数据

insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

向分区表动态分区插入数据

insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;

向分区表静态分区插入数据

insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;

3.查询

和基本的SQL语句一样

select name,price from hudi_cow_nonpcf_tbl;

在这里插入图片描述

4.时间旅行

建一张新表

create table hudi_cow_pt_tbl1 (id bigint,name string,ts bigint,dt string,hh string
) using hudi
tblproperties (type = 'cow',primaryKey = 'id',preCombineField = 'ts')
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl1';

插入一条数据并查询

insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2023-09-29', '10';
select * from hudi_cow_pt_tbl1;

在这里插入图片描述

现在我们更新这条数据再次查询。

insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2023-09-29', '10';
select * from hudi_cow_pt_tbl1;

在这里插入图片描述
可以看到第二次的ts更大,所以name已经更新,现在我们进行时间旅行,找到刚刚的时间戳。
在这里插入图片描述

select * from hudi_cow_pt_tbl1 timestamp as of '20230929200405253';

在这里插入图片描述
这就可以查询到之前的数据。

5.更新数据

1.update

hudi也是可以使用update更新数据的。
先查看一下

select * from hudi_mor_tbl ;

在这里插入图片描述
在更新数据。

update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
select * from hudi_mor_tbl ;

在这里插入图片描述

2.MergeInto

这个语法有点类似于join,用于两张表的拼接。
创建一张表,并插入数据。

create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

我们将新表的内容插入hudi_mor_tbl

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *;

查看hudi_mor_tbl。

select * from hudi_mor_tbl ;

在这里插入图片描述

5.删除数据

delete from hudi_mor_tbl where id = 1;
select * from hudi_mor_tbl ;

在这里插入图片描述

6.覆盖表

insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
select * from hudi_mor_tbl ;

在这里插入图片描述

7.修改表

修改语法
– Alter table name
ALTER TABLE oldTableName RENAME TO newTableName

– Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)

– Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType

– Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = ‘value’)

这么我们修改表名做个实例。
在这里插入图片描述

ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl1;

在这里插入图片描述

8.修改分区

show partitions hudi_cow_pt_tbl1;

在这里插入图片描述

alter table hudi_cow_pt_tbl1 drop partition (dt='2023-09-29', hh='10');
show partitions hudi_cow_pt_tbl1;

在这里插入图片描述


总结

这一次就写到这里,东西比较多,关于Spark的东西还要在写一次。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/145529.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

linux Mysql 8.0.16 安装搭建

文章目录 Mysql 搭建一、安装包下载二、创建用户组用户和修改权限三、配置my.cnf Mysql 搭建 一、安装包下载 mysql 下载地址:https://downloads.mysql.com/archives/community/ 这里有所有的mysql的版本,下载自己需要的版本,我们这里下载 …

视频高效剪辑,批量调整视频速度,让视频更加精彩

你是否曾经需要调整多个视频的速度,但却苦于手动操作效率低下?如果你也遇到了这样的问题,那么是时候采取行动,使用一款高效易用的视频处理工具了。 首先,我们要进入好简单批量智剪,并在板块栏里选择“任务…

搭建自己的搜索引擎之五

一、前言 接上文 搭建自己的搜索引擎之四,下面继续介绍茴香豆茴字的另外两种写法。 二、Jest Jest是ES的Java Http Rest客户端,它主要是为了弥补以前ES自有API缺少HttpRest接口客户端的不足,但因为现在ES官方已经提供了RestClient ,该项目已…

cJSON.c 在mfc中编译失败报 lnk2005错误

问题一、在MFC工程中导入cJson.c 编译时报以下错误: 严重性 代码 说明 项目 文件 行 禁止显示状态 错误 C1853 “x64\Release\xxx.pch”预编译头文件来自编译器的早期版本,或者预编译头为 C 而在 C 中使用它(或相反) xxx …

【广州华锐互动】鱼类授精繁殖VR虚拟仿真实训系统

随着科技的不断发展,虚拟现实技术在各个领域的应用越来越广泛。在养殖业中,VR技术可以帮助养殖户进行家鱼授精实操演练,提高养殖效率和繁殖成功率。本文将介绍利用VR开展家鱼授精实操演练的方法和应用。 首先,我们需要了解家鱼授精…

WorkPlus私有化部署IM即时通讯平台,构建高效安全的局域网办公环境

随着数字化转型的加速,政府机构与企业对高效、安全的即时通讯和协作工具的需求日益增长。企业微信和钉钉作为当前市场上较为常见的通讯工具,虽然在一定程度上满足了企业内部协作的需求,但仍存在一些问题,如数据安全性、私有化部署…

N 皇后问题

N 皇后问题研究的是如何将 N 个皇后放置在 N x N 的棋牌上,并且使皇后彼此之间不能相互攻击。 国际象棋的规则,皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子 解决思路是:剪枝 回溯方法 解决问题 (1).使用二维数组创建棋牌格子 g…

C++与QML交互总结二

目录 1.CPP调用QML 1.1 QMetaObject::invokeMethod调用 1.2 CPP中的信号绑定qml中的槽 2.QML调用CPP 2.1 QML单实例注册 2.2 将类对象注册到QML的上下文中 2.3 QML信号调用CPP槽 3.QML中注入一个cpp实例 3.1qmlRegisterType 3.2QML_ELEMENT 4.附加属性: QML_ATTACHE…

面试题:说一下SpringBoot的自动配置原理

文章目录 引言工作原理剖析EnableAutoConfiguration自动配置生效总结 引言 不论在工作中,亦或是求职面试,Spring Boot已经成为我们必知必会的技能项。除了某些老旧的政府项目或金融项目持有观望态度外,如今的各行各业都在飞速的拥抱这个已经…

c==ubuntu+vscode debug redis7源码

新建.vscode文件夹,创建launch.json和tasks.json {"version": "0.2.0","configurations": [{"name": "C/C Launch","type": "cppdbg","request": "launch","prog…

Flink CDC MySQL同步MySQL错误记录

1、启动 Flink SQL [appuserwhtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh2、新建源表 问题1:Encountered “(” 处理方法:去掉int(11),改为int Flink SQL> CREATE TABLE t_user ( > uid int(11) NOT NULL AUTO_INCREMENT COMME…

1 论文笔记:Efficient Trajectory Similarity Computation with ContrastiveLearning

2022CIKM 1 intro 1.1 背景 轨迹相似度计算是轨迹分析任务(相似子轨迹搜索、轨迹预测和轨迹聚类)最基础的组件之一现有的关于轨迹相似度计算的研究主要可以分为两大类: 传统方法 DTW、EDR、EDwP等二次计算复杂度O(n^2)缺乏稳健性 会受到非…

【Linux】进程控制基础知识

目录 一,fack回顾 二,进程终止 1.进程终止,操作系统做了什么? 2.进程终止,常见的方式 1.main函数的,return 返回码 2. exit()函数 三,进程等待 1. 回收进程方法 (1. wait…

Node.js 学习笔记

小插件Template String Converter 当输入${}时,自动为其加上 反引号 一、node入门 node.js是什么 node的作用 开发服务器应用 开发工具类应用 开发桌面端应用 1.命令行工具 命令的结构 常用命令 切换到D盘——D: 查看D盘目录——dir 切换工作目录——c…

FFmpeg 命令:从入门到精通 | FFmpeg 音视频处理流程

FFmpeg 命令:从入门到精通 | FFmpeg 音视频处理流程 FFmpeg 命令:从入门到精通 | FFmpeg 音视频处理流程实例 FFmpeg 命令:从入门到精通 | FFmpeg 音视频处理流程 实例 ffmpeg -i test_1920x1080.mp4 -acodec copy -vcodec libx264 -s 1280x…

ElasticSearch - 基于 DSL 、JavaRestClient 实现数据聚合

目录 一、数据聚合 1.1、基本概念 1.1.1、聚合分类 1.1.2、特点 1.2、DSL 实现 Bucket 聚合 1.2.1、Bucket 聚合基础语法 1.2.2、Bucket 聚合结果排序 1.2.3、Bucket 聚合限定范围 1.3、DSL 实现 Metrics 聚合 1.4、基于 JavaRestClient 实现聚合 1.4.1、组装请求 …

XSS详解

XSS一些学习记录 XXS短标签、属性、事件、方法短标签属性事件函数弹窗函数一些对于绕过有用的函数一些函数使用payload收集 浏览器编码问题XML实体编码URL编码JS编码混合编码 一些绕过方法利用constructor原型污染链构造弹框空格绕过圆括号过滤绕过其他的一些绕过 参考 XXS短标…

Zygisk-IL2CppDumper对抗方案

众所周知,Unity引擎中有两种脚本编译器,分别是 Mono 和 IL2CPP 。这两种脚本编译器各有优势,同时也存在一些安全性问题,本文将从游戏安全角度对其进行分析并提供对策。 Mono 是由跨平台的开源.NET 实现,它允许开发者使…

关于 自定义的RabbitMQ的RabbitMessageContainer注解-实现原理

概述 RabbitMessageContainer注解 的主要作用就是 替换掉Configuration配置类中的各种Bean配置; 采用注解的方式可以让我们 固化配置,降低代码编写复杂度、减少配置错误情况的发生,提升编码调试的效率、提高业务的可用性。 为什么说“降低…

PMSM——转子位置估算基于QPLL

文章目录 前言仿真模型观测器速度观测位置观测转矩波形电流波形 前言 今后是电机控制方向的研究生的啦,期待有同行互相交流。 仿真模型 观测器 速度观测 位置观测 转矩波形 电流波形