2023_Spark_实验十一:RDD基础算子操作

一、RDD的练习可以使用两种方式

  1. 使用Shell
  2. 使用IDEA

二、使用Shell练习RDD

当你打开 Spark 的交互式命令行界面(也就是 Spark shell)的时候,它已经自动为你准备好了一个叫做 sc 的特殊对象,这个对象是用来和 Spark 集群沟通的。你不需要,也不应该自己再创建一个这样的对象。

如果你想告诉 Spark 用哪个计算机或者计算机集群来执行你的命令,可以通过 --master 这个选项来设置。比如,你想在本地计算机上只用四个核心来运行,就可以在命令里加上 --master local[4]

$ ./bin/spark-shell --master local[4]

如果你有一些自己的代码打包成了 JAR 文件,想要在 Spark shell 里用,可以通过 --jars 选项,后面跟上你的 JAR 文件名,用逗号分隔,来把它们加入到可以识别的路径里。

$ ./bin/spark-shell --master local[4] --jars code.jar

此外,如果你需要一些额外的库或者 Spark 的扩展包,可以通过 --packages 选项,后面跟上这些库的 Maven 坐标(一种常用的依赖管理方式),用逗号分隔,来添加它们。假设你需要的包是 org.apache.spark:spark-mllib_2.13:3.4.1,这是Spark的机器学习库。

$ ./bin/spark-shell --master local[4] --packages "org.apache.spark:spark-mllib_2.13:3.4.1"

简单来说,这些选项就是让你告诉 Spark 怎么运行你的代码,以及在哪里找到运行代码所需要的资源。

RDD基础

// 从array中创建RDD
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
distData.foreach(println)// 读取文件创建RDD
val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
println(totalLength)// 数据持久化
lineLengths.persist()
print(lineLengths.reduce((a, b) => a + b))// 对象的函数
object MyFunctions {def func1(s: String): String = { s"打印RDD中的字符串,包含的字符串有: $s" }
}
val myRdd = lines.flatMap(lines => lines.split(" "))
myRdd.map(MyFunctions.func1).foreach(println)import org.apache.spark.rdd.RDD
// 类的函数
class MyClass extends Serializable {def func1(s: String): String = { f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s" }def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}val f1 = new MyClass()
f1.doStuff(myRdd).foreach(println)// 类的应用
class MyClass2 extends Serializable {val field = "你好,测试案例..."def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
val f2 = new MyClass2()
f2.doStuff(myRdd).foreach(println)// Pair RDD应用
val lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")
val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
// 交换键和值的位置
val swappedCounts = counts.map(_.swap)
// 先根据值排序(降序),然后根据键排序(升序)
val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)
val CountsDescondvalue = sortedByValueThenKeyDesc .map(_.swap)
CountsDescondvalue .collect()// 广播变量 Broadcast Variables
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.valueval accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value

三、使用IDEA练习RDD

基于Spark3.4.1,IDEA练习基础的RDD

package testimport org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession/*** @projectName GNUSpark20204  * @package test  * @className test.RDD_spark341  * @description ${description}  * @author pblh123* @date 2024/9/26 23:08* @version 1.0**/object RDD_spark341 extends App {//  创建SparkSession sparkcontextval spark = SparkSession.builder.appName("RDD_spark341").master("local[2]").getOrCreate()val sc: SparkContext = spark.sparkContext//  spark代码主体// 从array中创建RDDval data = Array(1, 2, 3, 4, 5)val distData = sc.parallelize(data)distData.foreach(println)// 读取文件创建RDDval lines = sc.textFile("D:\\PycharmProjects\\2024\\pyspark\\datas\\word.txt")val lineLengths = lines.map(s => s.length)val totalLength = lineLengths.reduce((a, b) => a + b)println(totalLength)// 数据持久化lineLengths.persist()print(lineLengths.reduce((a, b) => a + b))// 对象的函数object MyFunctions {def func1(s: String): String = {s"打印RDD中的字符串,包含的字符串有: $s"}}val myRdd = lines.flatMap(lines => lines.split(" "))myRdd.map(MyFunctions.func1).foreach(println)import org.apache.spark.rdd.RDD// 类的函数class MyClass extends Serializable {def func1(s: String): String = {f"在MyClass类中,打印RDD中的字符串,包含的字符串有: $s"}def doStuff(rdd: RDD[String]): RDD[String] = {rdd.map(func1)}}val f1 = new MyClass()f1.doStuff(myRdd).foreach(println)// 类的应用class MyClass2 extends Serializable {val field = "你好,测试案例..."def doStuff(rdd: RDD[String]): RDD[String] = {rdd.map(x => field + x)}}val f2 = new MyClass2()f2.doStuff(myRdd).foreach(println)// Pair RDD应用val pairs = lines.flatMap(_.split(" ")).map(s => (s, 1))val counts = pairs.reduceByKey((a, b) => a + b)// 交换键和值的位置val swappedCounts = counts.map(_.swap)// 先根据值排序(降序),然后根据键排序(升序)val sortedByValueThenKeyDesc = swappedCounts.sortByKey(ascending = false)val CountsDescondvalue = sortedByValueThenKeyDesc.map(_.swap)println(CountsDescondvalue.collect())// 广播变量 Broadcast Variablesval broadcastVar = sc.broadcast(Array(1, 2, 3))println(broadcastVar.value)val accum = sc.longAccumulator("My Accumulator")sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))println(accum.value)//  关闭sparkSesssion sparkcontextsc.stop()spark.stop()}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1547401.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【高频SQL基础50题】1-5

目录 1.可回收且低脂的产品 2. 使用唯一标识码替换员工ID 3.有趣的电影 4.每位教师所教授的科目种类的数量 5.每位经理的下属员工数量 1.可回收且低脂的产品 查询题。 # Write your MySQL query statement below SELECT product_id FROM Products WHERE low_fats"…

Seagull远程获取通讯录APP/相册/短信/双端/全开源海外版本

Seagull海外版远程获取工具,全开源,企业管理,内部采集等应用市场,请勿违法使用,禁止任何商业用途,仅供学习研究。 PHP版本:7.4 伪静态:TP 目录:public 后台&#xff…

C--结构体和位段的使用方法

各位看官如果您觉得这篇文章对您有帮助的话 欢迎您分享给更多人哦 感谢大家的点赞收藏评论,感谢您的支持!!! 一:结构体 首先结构体我们有一个非常重要的规则 非常重要: 我们允许在初始化时自动将字符串字面…

JUC高并发编程3:线程间通信

1 线程间通信 线程间通信的模型有两种:共享内存和消息传递,以下方式都是基本这两种模型来实现的。我们来基本一道面试常见的题目来分析 场景:两个线程,一个线程对当前数值加 1,另一个线程对当前数值减 1,要求用线程间…

使用离火插件yoloV8数据标注,模型训练

1. 启动 2.相关配置 2.1 data.yaml path: D:/yolo-tool/yaunshen-yolov8/YOLOv8ys/YOLOv8-CUDA10.2/1/datasets/ceshi001 train: images val: images names: [蔡徐坤,篮球] 2.2 cfg.yaml # Ultralytics YOLOv8, GPL-3.0 license # Default training settings and hyp…

为什么你应该将你的营销材料本地化为俄语:释放新的机会

在当今高度互联的世界中,企业不断寻求新市场以扩大其全球足迹。一个经常被忽视但充满未开发潜力的市场是俄罗斯。全球有超过2.6亿俄语使用者,将您的营销材料翻译成俄语并本地化不仅是一个明智之举,也是迈向强大经济集团和获得竞争优势的重要一…

Docker安装nacos最新版本(图文教程)

Nacos(Naming And Configuration Service)是阿里巴巴开源的一个动态服务发现、配置管理和服务管理平台。Nacos 提供了一套简单易用的服务发现、配置管理、动态 DNS 服务以及服务健康检查的解决方案,广泛应用于微服务架构中。 一、拉取镜像 docker pull nacos/nacos-server:…

@Lazy注解原理

目录 Lazy作用在类上Lazy注解作用在字段上Lazy注解标记的字段或方法中的参数何时触发加载AOP代理中的TargetSource对象为什么使用了 Lazy 之后,就能解决循环依赖问题,正常启动了呢?案例Resource对Lazy注入的处理 参考: https://b…

微服务——服务保护(Sentinel)(一)

1.雪崩问题 级联失败或雪崩问题指的是在微服务架构中,由于服务间的相互依赖和调用,当一个服务出现故障时,会引起调用它的服务也出现故障,进而引发整个调用链路的多个服务都出现故障,最终导致整个系统崩溃的现象。 产生…

【笔记】Dynamic Taint Analysis 动态污点分析

Dynamic Taint Analysis 动态污点分析 什么是动态污点分析?为什么要搞动态污点分析? “污点”指的是什么? DTA中的“污点”指代的是不可信的输入,比如用户输入、网络请求、文件数据等。比方说,如果把程序看作一个城市&…

使用 Visily.ai 进行应用界面设计

在现代应用开发中,快速创建高保真线框图和原型是一个巨大的优势。Visily.ai 是一个利用人工智能帮助你实现这一目标的在线工具。本文将介绍如何使用 Visily.ai 进行应用界面设计。 什么是 Visily.ai? Visily.ai 是一个 AI 驱动的 UI 设计工具&#xff…

嵌入式硬件工程师与嵌入式软件工程师的区别(详细版)

嵌入式硬件工程师与嵌入式软件工程师的区别(详细版) 这里写目录标题 嵌入式硬件工程师与嵌入式软件工程师的区别(详细版)什么是嵌入式硬件工程师?什么是嵌入式软件工程师?嵌入式硬件工程师与嵌入式软件工程…

css 下拉框展示:当hover的时候展示下拉框 z-index的用法解释

代码如下&#xff1a; <template><div class"outer"><div class"left"></div><div class"aTest2"><div class"box">显示方框</div><div class"aTest3"></div></…

【SQL】指定时间段的下单产品

目录 语法 需求 示例 分析 代码 语法 SUM(column_name) SUM 是一个聚合函数&#xff08;Aggregate Function&#xff09;&#xff0c;用于计算数字列中值的总和。当你需要对表中的某一列数值进行求和时&#xff0c;SUM 函数就显得非常有用。它通常与 GROUP BY 语句一起使用…

运算符两边的数据类型

6-3 类型转换 1.非赋值运算的类型转换 &#xff08;1&#xff09;水平方向的转换&#xff1a;所有的char型和short型自动地转换成int 型&#xff0c;所有的unsigned short 型自动地转换成unsigned型&#xff0c;所有的long型自动地转换成unsigned long 型&#xff0c;所有的f…

exBase

1.准备工作 1.端口配置 下列为默认端口号&#xff0c;若部分端口号已被占用&#xff0c;用户可以根据实际情况进行修改。 端口号 说明 31030 exBase默认端口 31003 配置库默认端口 2181 zookeeper默认端口 9092 kafka默认端口 8091 metaNode的RPC端口 8092 node…

毕业论文写作全攻略,让你轻松过关!

姐妹们&#xff0c;毕业论文是大学旅程的最后一站&#xff0c;也是展示我们学术成果的重要时刻。但是&#xff0c;毕业论文该怎么写呢&#xff1f;别担心&#xff0c;我来告诉你&#xff01;&#x1f4da; writehelp智能写作辅导&#xff1a;http://www.writehelp.vip/?sid17…

线性基学习DAY2

今天是第二题学习线性基&#xff0c;让我对线性基的认识更多了&#xff0c;线性基其实就是去处理整个区间异或最值问题的 我们来看一下昨天的一道题 P4570 [BJWC2011] 元素 昨天其实这题我尝试了两次&#xff0c;一种是普通消元去求解&#xff0c;另一种是高斯消元去求解&…

异地如何进行跨地区协作传输文件?

跨区域协作现在是很多企业的常态了&#xff0c;无论是跨国公司还是国内多地区运营的企业&#xff0c;高效、可靠的文件传输协作都是业务顺利进行的关键。然而&#xff0c;异地传输文件常常面临诸多挑战&#xff0c;如何选择合适的工具和服务成为企业必须考虑的问题。 异地传输文…

【ADC】ΔΣ ADC 中数字滤波器的延迟以及 SAR ADC 与 ΔΣ ADC 的差异对比总结

本文学习于TI 高精度实验室课程&#xff0c;深入探讨 delta-sigma 转换器中使用的数字滤波器。具体来说&#xff0c;本文将重点介绍数字滤波器如何引入延迟&#xff0c;因为这是 SAR 和 delta-sigma ADC 之间的显著差异。 文章目录 一、低延迟数字滤波器二、高延迟数字滤波器三…