Spark+实例解读

第一部分 Spark入门

学习教程:Spark 教程 | Spark 教程

Spark 集成了许多大数据工具,例如 Spark 可以处理任何 Hadoop 数据源,也能在 Hadoop 集群上执行。大数据业内有个共识认为,Spark 只是Hadoop MapReduce 的扩展(事实并非如此),如Hadoop MapReduce 中没有的迭代查询和流处理。然而Spark并不需要依赖于 Hadoop,它有自己的集群管理系统。更重要的是,同样数据量,同样集群配置,Spark 的数据处理速度要比 Hadoop MapReduce 快10倍左右。

Spark 的一个关键的特性是数据可以在内存中迭代计算,提高数据处理的速度。虽然Spark是用 Scala开发的,但是它对 Java、Scala、Python 和 R 等高级编程语言提供了开发接口。

第二部分 SparkCore

2 RDD

2.1 转换算子-map

map是将RDD的数据一条条处理,返回新的RDD

# 定义方法
def add(data):return data*10
print(rdd.map(add).collect)
# 定义lamabda表达式
rdd.map(lambda data:data*10)
2.2 转换算子-flatMap

flatMap对RDD执行map操作,然后执行解除嵌套操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)}.reduceByKey(_ + _).map { case ((feature, label), num) =>(feature, List((label, num))) //feature,label,cnt}.reduceByKey(_ ::: _).mapValues { x =>val size_entro = x.map(_._2).sumval res = x.map(_._2.toDouble / size_entro).map { t =>-t * (Math.log(t) / Math.log(2))}.sumsize_entro * res}.mapValues { x => x / size }.map(_._2).sum

2.3转换算子-reduceByKey

针对KV型RRDD自动按照key进行分组,然后按照提供的聚合逻辑,对组内数据value完成聚合操作

rdd.reduceByKey(func)

      val clickStat = joinDf.where(F.col("active_type")==="click").rdd.map(row => {val mapInfo = Option(row.getMap[String,Double](row.fieldIndex(feat)))mapInfo match {case Some(x) => xcase _ => null}}).filter(_!=null).flatMap(x=>x).reduceByKey(_+_)
2.4 转换算子-mapValues

针对二元元组RDD,对其内部的二元元组的value进行map操作

rdd = sc.parallelize([('a',1),('a',11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10)
    data.map { case (label, feature) => ((feature, label), 1)}.reduceByKey(_ + _).map { case ((feature, label), num) =>(feature, List((label, num))) //feature,label,cnt}.reduceByKey(_ ::: _).mapValues { x =>val size_entro = x.map(_._2).sumval res = x.map(_._2.toDouble / size_entro).map { t =>-t * (Math.log(t) / Math.log(2))}.sumsize_entro * res}.mapValues { x => x / size }.map(_._2).sum
2.5 转换算子-groupBy

将RDD的数据进行分组

rdd.groupBy(func)

rdd = sc.parallelize([('a',1),('a',11),('b',1)])
# 通过这个函数确认按照谁来分组(返回谁即可)
print(rdd.groupBy(lambda x:x[0]).collect())
print(rdd.groupBy(lambda x:x[0]).collect())
# 结果为:
​
    val userContentListHis = spark.thriftSequenceFile(inpath_his, classOf[LongVideoUserContentStat]).map(l=>{(l.getUid,l.getContent_properties.get(0).getId)}).toDF("uid", "docid").groupBy($"uid")
2.6 转换算子-filter

过滤想要的数据进行保存

rdd = sc.parallelize([1,2,3,4,5,6])
rdd.filter(lamdba x:x%2 == 1) # 只保留奇数
    val treatmentUser = spark.read.option("header", false).option("sep", "\t").csv(inpath).select("_c0").withColumnRenamed("_c0", "userid").withColumn("flow", getexpId($"userid")).filter($"flow" >= start and $"flow" <= end).select("userid").dropDuplicates()
2.7 转换算子-其他算子
distinct算子
rdd.distinct() 一般不写去重分区val userContentHis = hisPathList.map(path =>{val hisData = spark.thriftSequenceFile(path, classOf[LongVideoUserContentStat])println(s"hisData ==>${hisData.count()}")hisData}).reduce(_ union _).distinct().repartition(partition)
union算子 
2个rdd合并成一个rdd:rdd.union(other_rdd)
只合并不去重  rdd的类型不同也是可以合并的
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([1,2,3,4])
rdd3 = rdd1.union(rdd2)
2.8 算子面试题
1.groupByKey和reduceByKey的区别:
groupByKey仅仅有分组功能而已,reduceByKey除了分组还有聚合作用,是一个分组+聚合一体化的算子. 分组前先聚合再shuffle,预聚合,被shuffle的数据极大的减少,提升了性能.数据量越大,reduceByKey的性能优势也就越大.
​
2.rdd的分区数怎么查看? 
通过getNumPartitions API查看,返回int
​
3.Transformation和Action的区别:
转换算子的返回值100%是rdd,而Action算子不一定.转换算子是懒加载的,只有遇到Action才会执行
​
4.哪两个算子不经过Driver直接输出?
foreach 和 saveAsTextFile

3 RDD的持久化

3.1 RDD的持久化

rdd是过程数据 rdd进行相互迭代计算,执行开启时,新的RDD生成,老的RDD消失

3.2 RDD的缓存

val rawLog = profilePushLogReader(spark, date, span).persist()
3.3 RDD的checkPoint

也是将RDD的数据保存起来,仅支持磁盘存储,被认为是安全的, 不保留血缘关系

3.4 缓存面试题

4 案例

4.1 搜素引擎日志分析案例
4.2
4.3 ....
4.4 计算资源面试题
1.如何尽量提升任务计算的资源?
计算cpu核心和内存量,通过--executor-memory指定executor内存,通过--executor-cores指定executor的核心数
​

5 广播变量 累加器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1488721.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

C语言常见字符函数和字符串函数精讲

目录 引言 一、字符函数 1.字符分类函数 2.字符转换函数 二、字符串函数 1.gets、puts 2.strlen 3.strcpy 4.strncpy 5.strcat 6.strncat 7.strcmp 8.strncmp 9.strstr 10.strchr 11.strtok 12.strlwr 13.strupr 引言 在C语言编程中&#xff0c;字符函数…

Rancher 快照备份至 S3 及备份恢复

AWS S3&#xff08;Simple Storage Service&#xff09;是亚马逊云服务提供的一种高度可扩展、安全且经济高效的对象存储服务。它允许用户在任何位置存储和检索任意数量的数据,非常适合存储和分发静态文件、备份数据以及作为数据湖的存储层。 集群备份 一、创建S3桶 1、登录…

PyTorch学习(1)

PyTorch学习&#xff08;1&#xff09; CIFAR-10数据集-图像分类 数据集来源是官方提供的&#xff1a; torchvision.datasets.CIFAR10()共有十类物品&#xff0c;需要用CNN实现图像分类问题。 代码如下&#xff1a;(CIFAR_10_Classifier_Self_1.py) import torch import t…

【Linux】玩转操作系统,深入刨析进程状态与调度机制

目录 1. 进程排队2. 进程状态的表述2.1. 进程状态2.2 运行状态2.3. 阻塞状态2.4. 挂起状态 3. Linux下具体的进程状态3.1. 运行状态R3.2. 可中断睡眠状态S3.3. 不可中断睡眠状态D3.4. 停止状态T3.5. 死亡状态X3.6. 僵尸状态Z 4. 孤儿进程5. 优先级6. Linux的调度与切换6.1. 四个…

打破自闭症束缚:儿童康复案例揭秘

在浩瀚的康复领域中&#xff0c;有这样一所机构&#xff0c;它如同温暖的阳光&#xff0c;穿透自闭症的阴霾&#xff0c;为无数家庭带来了希望与光明。这&#xff0c;就是星启帆——国内规模较大的全寄宿制自闭症儿童康复机构&#xff0c;一个专注于中重度广泛性发育障碍儿童康…

ffmpeg更改视频的帧率

note 视频帧率调整 帧率(fps-frame per second) 例如&#xff1a;原来帧率为30&#xff0c;调整后为1 现象&#xff1a;原来是每秒有30张图像&#xff0c;调整后每秒1张图像&#xff0c;看着图像很慢 实现&#xff1a;在每秒的时间区间里&#xff0c;取一张图像…

MySQL之视图和索引

新建数据库 插入数据 处理表 1. 2. 3. mysql> alter table sc add unique index SC_INDEX (sno asc,cno asc); 4. mysql> create view stu_info as select student.sno,ssex,sc.cno,score from student join sc on student.snosc.sno; 5. mysql> drop index S…

JavaScript——变量与运算符、输入输出、判断、循环

文章目录 前言概述使用 js从文件引入 js 代码importjs 的作用变量计算输入格式化输出保留小数向上取整&#xff0c;向下取整条件判断循环总结 前言 为了监督自己的进度&#xff0c;把学习任务一点点都写出来&#xff0c;写多少就算多少&#xff0c;不求完美&#xff0c;只求完…

Adobe正通过数字体验改变世界

在当今这个数字化飞速发展的时代&#xff0c;Adobe公司正以其创新的技术和卓越的产品引领着创意设计领域的变革。从Adobe发布的生成式AI工具&#xff08;Adobe Firefly&#xff09;&#xff0c;到Illustrator和Photoshop的新AI功能&#xff0c;再到广受认可的Adobe国际认证&…

架构师第二周作业

目录 1.总结Dockerfile的指令和Docker的网络模式 1.1 Dockerfile指令 1.1.1 FROM &#xff1a;指定基础镜像&#xff0c;必须放在Dockerfile文件第一个非注释行 1.1.2 LABEL : 指定镜像元数据&#xff0c;如&#xff1a;镜像作者等 1.1.3 RUN &#xff1a;执行shell命令 1…

Python编程入门指南:从基础到高级

Python编程入门指南&#xff1a;从基础到高级 一、Python编程语言简介 1. Python是什么&#xff1f; Python是一门广泛使用的计算机程序编程语言&#xff0c;由荷兰人吉多范罗苏姆&#xff08;Guido van Rossum&#xff09;于1991年首次发行。Python是一种解释型、交互式、面…

抖音短视频seo矩阵系统源代码搭建---基于PHP语言开发部署

随着短视频市场的爆发式增长&#xff0c;越来越多的企业开始寻求在短视频领域建立自己的品牌形象&#xff0c;增加用户粘性和获取更多流量。为此&#xff0c;一套高效的抖音短视频seo矩阵系统源代码显得尤为重要。本文将介绍基于PHP语言的抖音短视频seo矩阵系统源代码开发&…

数据结构(5):树和二叉树

1 树的定义 1.1 树的基本概念 分支可以称为边&#xff0c;结点可以用于存放数据结构。 除了根节点&#xff0c;其他节点只有一个前驱&#xff01;&#xff01;&#xff01;&#xff01; 互不相交也就是 只有一个前驱结点&#xff01; 树应用的很广的 1.2 结点之间的关系 直接…

Infuse Pro for Mac全能视频播放器

Mac分享吧 文章目录 效果一、下载软件二、开始安装1、双击运行软件&#xff0c;将其从左侧拖入右侧文件夹中&#xff0c;等待安装完毕2、应用程序显示软件图标&#xff0c;表示安装成功 三、运行测试安装完成&#xff01;&#xff01;&#xff01; 效果 一、下载软件 下载软件…

什么是公司自建企业邮箱?自建企业邮箱有什么用?

什么是公司自建企业邮箱&#xff1f;公司自建企业邮箱有什么用途&#xff1f;一是品牌统一&#xff1b;二是安全性增强&#xff1b;三是定制化功能&#xff1b;四是控制与灵活性等等。哪些企业适合自建企业邮箱呢&#xff1f;本篇文章将为您一一解释。 一、什么是公司自建企业…

《Milvus Cloud向量数据库指南》——SPLADE:基于BERT的Learned稀疏向量技术深度解析

在自然语言处理(NLP)领域,随着深度学习技术的飞速发展,预训练语言模型如BERT(Bidirectional Encoder Representations from Transformers)已成为推动研究与应用进步的重要基石。BERT通过其强大的上下文感知能力,在多项NLP任务中取得了显著成效,尤其是在文本表示和语义理…

Cannot access org.springframework.context.ConfigurableApplicationContext

Cannot access org.springframework.context.ConfigurableApplicationContext SpringApplication.run曝红 解决方案&#xff1a; File -> Invalidate Cache and Restart 如果对你有用就点个赞&#xff01;

Platform Designer 自定义IP(用于纯RTL设计)

在开始菜单找到Quartus Prime工具&#xff0c;点击并打开。 点击Quartus菜单File——New&#xff1a; 选择Verilog HDL File&#xff0c;点击OK&#xff1a; 这是新建的.v文件如下&#xff1a; 在新建的.v文件中键入如下Verilog代码&#xff1a; module mux2x1( //模块的开头…

vue element-ui日期控件传参

前端&#xff1a;Vue element-ui <el-form-item label"过期时间" :rules"[ { required: true, message: 请选择过期时间, trigger: blur }]"><el-date-picker v-model"form.expireTime" type"date" format"yyyy-MM-dd&…

计算机实验室排课查询小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;学生管理&#xff0c;教师管理&#xff0c;实验室信息管理&#xff0c;实验室预约管理&#xff0c;取消预约管理&#xff0c;实验课程管理&#xff0c;实验报告管理&#xff0c;报修信息管理&#xff0…