Spark 共享变量:广播变量与累加器解析

Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客

Spark 的Standalone集群环境安装与测试-CSDN博客

PySpark 本地开发环境搭建与实践-CSDN博客

Spark 程序开发与提交:本地与集群模式全解析-CSDN博客

Spark on YARN:Spark集群模式之Yarn模式的原理、搭建与实践-CSDN博客

Spark 中 RDD 的诞生:原理、操作与分区规则-CSDN博客

Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解-CSDN博客

RDD 算子全面解析:从基础到进阶与面试要点-CSDN博客

PySpark 数据处理实战:从基础操作到案例分析-CSDN博客

Spark 的容错机制:保障数据处理的稳定性与高效性-CSDN博客

目录

一、需求背景

二、广播变量(Broadcast Variables)

(一)功能

(二)语法 / 用法

(三)示例代码修改

(四)本质与优势

三、累加器(Accumulators)

(一)需求示例

(二)原理与功能

(三)使用方法与示例代码修改

四、总结


        在 Spark 大数据处理框架中,共享变量是一个非常重要的概念。当我们处理一些涉及到不同计算节点(Executor)需要访问相同数据的场景时,共享变量就发挥了关键作用。本文将深入探讨 Spark 中的广播变量和累加器,包括它们的使用场景、原理以及如何在实际代码中应用。

一、需求背景

        假设我们有一份用户数据(user.tsv),其中包含用户的一些基本信息如用户 id、用户名、年龄和城市 id,同时我们还有一个城市字典(city_dict),它存储了城市 id 与城市名称的对应关系。我们的目标是将这两份数据进行处理,得到包含用户完整信息(用户 id、用户名、年龄、城市 id、城市名称)的结果集。

user.tsv数据如下

user001 陆家嘴 18 2
user002 羊毛 20 5
user003 爱丽丝 22 6
user004 蒸饭 24 8
user005 淘米 26 1
user006 小笼包 28 7
user007 凉粉 30 4
user008 泡腾片 25 10
user009 炒米 27 3
user010 颖火虫 29 9

 city中的字典如下

city_dict = {1: "北京",2: "上海",3: "广州",4: "深圳",5: "苏州",6: "无锡",7: "重庆",8: "厦门",9: "大理",10: "成都"}

示例代码如下

import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)fileRdd = sc.textFile("../datas/user.tsv")city_dict = {1: "北京",2: "上海",3: "广州",4: "深圳",5: "苏州",6: "无锡",7: "重庆",8: "厦门",9: "大理",10: "成都"}def getLine(line):list01 = line.split(" ")cityName = city_dict.get(int(list01[3]))# print(cityName)return line + " " + cityNamemapRdd = fileRdd.map(getLine)mapRdd.foreach(print)# 使用完后,记得关闭sc.stop()

结果如下

user007 凉粉 30 4 深圳
user008 泡腾片 25 10 成都
.....

        在 Spark 中,user_rdd 的计算处理在 Executor 中进行,而 city_dict 的数据存储在 Driver 的内存中。这就引发了一个问题:计算过程中每个 Task 是如何获取 city_dict 的数据呢?如果 city_dict 的数据量很大(例如 1G),每个 Task 都要从 Driver 中下载一份(假设存在多个 Task 导致总下载量达到 6G),那么网络传输的开销将非常大,性能会变得很差。

二、广播变量(Broadcast Variables)

(一)功能

        广播变量的主要功能就是将一个变量元素广播到每台 Worker 节点的 Executor 中。这样一来,每个 Task 就可以直接从本地读取数据,从而大大减少网络传输的 I/O。

(二)语法 / 用法

在 Spark 中使用广播变量,首先需要创建一个广播变量对象。例如:

broadcastValue = sc.broadcast(city_dict)

        这里的 sc 是 SparkContext 对象,city_dict 是我们想要广播的数据(在这个例子中是城市字典)。创建广播变量后,在需要使用该数据的地方,可以通过 broadcastValue.value 来获取广播的数据。

此链接是官方给的API文档:RDD Programming Guide - Spark 3.5.3 Documentation

(三)示例代码修改

        在我们的用户数据处理示例中,原始代码在处理每个用户数据行时,需要获取对应的城市名称。修改后的代码如下:

import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)fileRdd = sc.textFile("../datas/user.tsv",2)city_dict = {1: "北京",2: "上海",3: "广州",4: "深圳",5: "苏州",6: "无锡",7: "重庆",8: "厦门",9: "大理",10: "成都"}# 将一个变量广播出去,广播到executor中,不是task中city_dict_broad = sc.broadcast(city_dict)def getLine(line):list01 = line.split(" ")#cityName = city_dict.get(int(list01[3]))# 使用广播变量的变量获取数据cityName = city_dict_broad.value.get(int(list01[3]))# print(cityName)return line + " " + cityNamemapRdd = fileRdd.map(getLine)mapRdd.foreach(print)# 释放广播变量city_dict_broad.unpersist()# 使用完后,记得关闭sc.stop()

(四)本质与优势

广播变量本质上是一种优化手段。它的优势主要体现在两个方面:

  1. 减少数据传输量:通过广播一个 Driver 中较大的数据,可以减少每次从 Driver 复制的数据量,降低网络 I/O 损耗,从而提高整体性能。
  2. 优化表连接:在两张表进行 Join 操作时,如果一张表较小,可以将小表进行广播,然后与大表的每个部分进行 Join,这样就可以避免 Shuffle Join(Reduce Join),进一步提升性能。

需要注意的是,广播变量是只读变量,不能被修改

三、累加器(Accumulators)

(一)需求示例

        假设我们有搜狗日志的数据,现在需要统计 10 点搜索的数据一共有多少条。如果按照常规的方式编写代码,可能会出现问题。例如:

import os
import reimport jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \.filter(lambda line:len(re.split("\s+",line)) == 6) \.map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)# 统计一天每小时点击量并按照点击量降序排序_sum = 0def sumTotalLine(tuple1):global _sum # 把_sum 设置为全局变量timeStr = tuple1[0] # 10:19:18if timeStr[0:2] == '10':_sum += 1mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))print(_sum) # 结果是0# 使用完后,记得关闭

        在 Spark 中,上述代码最终结果会是 0。因为 sum = 0 是在 Driver 端的内存中的,Executor 中程序对其进行累加操作并不能改变 Driver 端的结果。

(二)原理与功能

        累加器的功能是实现分布式的计算。它在每个 Task 内部构建一个副本进行累加,并且在最后返回每个 Task 的结果并进行合并。

官方API截图

(三)使用方法与示例代码修改

在 Spark 中使用累加器,首先需要创建一个累加器对象:

accumulator = sc.accumulator(0)

然后在需要进行计数累加的地方使用 accumulator.add(1)。例如:

def getLines(line, accumulator):accumulator.add(1)# 对用户数据 RDD 进行处理并统计数据量
fileRdd.foreach(lambda line: getLines(line, accumulator))

最后可以通过 accumulator.value 获取累加的结果。完整代码如下:

import os
import reimport jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)accCounter = sc.accumulator(0)mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \.filter(lambda line:len(re.split("\s+",line)) == 6) \.map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)# 统计一天每小时点击量并按照点击量降序排序#_sum = 0def sumTotalLine(tuple1):#global _sum # 把_sum 设置为全局变量timeStr = tuple1[0] # 10:19:18if timeStr[0:2] == '10':accCounter.add(1)mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))print(accCounter.value) # 104694# 假如我不知道累加器这个操作,这个题目怎么做?print(mapRdd.filter(lambda tuple1: tuple1[0][0:2] == '10').count())# 使用完后,记得关闭sc.stop()

四、总结

        Spark 中的广播变量和累加器是处理分布式计算中共享数据问题的有效工具。广播变量主要用于在多个 Task 之间共享只读数据,减少网络传输开销;累加器则用于实现分布式环境下的计数或累加操作,确保在不同 Task 中的计算结果能够正确地合并到 Driver 端。在实际的 Spark 大数据处理项目中,合理地运用广播变量和累加器能够显著提高程序的性能和计算的准确性。

        希望通过本文的介绍,读者能够对 Spark 中的共享变量有更深入的理解,并能够在自己的项目中熟练运用广播变量和累加器来优化数据处理流程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/11866.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

基于Matlab 疲劳驾驶检测

Matlab 疲劳驾驶检测 课题介绍 该课题为基于眼部和嘴部的疲劳驾驶检测。带有一个人机交互界面GUI,通过输入视频,分帧,定位眼睛和嘴巴,通过眼睛和嘴巴的张合度,来判别是否疲劳。 二、操作步骤 第一步:最…

强化学习不愧“顶会收割机”!2大创新思路带你上大分,毕业不用愁!

强化学习之父Richard Sutton悄悄搞了个大的,提出了一个简单思路:奖励聚中。这思路简单效果却不简单,等于是给几乎所有的强化学习算法上了一个增强buff,所以这篇论文已经入选了首届强化学习会议(RLC 2024)&a…

个人记录。改错huggingface,离线使用

huggingface_hub.utils._errors.LocalEntryNotFoundError: Connection error, and we cannot find the requested files in the disk cache. Please try again or make sure your Internet connection is on. 下载 true改false

【计算机网络】网络框架

一、网络协议和分层 1.理解协议 什么是协议?实际上就是约定。如果用计算机语言进行表达,那就是计算机协议。 2.理解分层 分层是软件设计方面的优势(低耦合);每一层都要解决特定的问题 二、网络传输基本流程 1.预备…

C++练习 字符串反转

从界面上输入一个C风格的字符串&#xff0c;如果输入的是"abc"&#xff0c;反转后"cba"。 要求&#xff1a; 1&#xff09;反转的结果存放在另一字符串中。 2&#xff09;原地反转&#xff0c;不借助其它的字符串。 #include <iostream> using n…

Postman常见问题及解决方法

软件测试资料领取&#xff1a;[内部资源] 想拿年薪40W的软件测试人员&#xff0c;这份资料必须领取~ 软件测试面试刷题工具&#xff1a;软件测试面试刷题【800道面试题答案免费刷】 1、网络连接问题 如果Postman无法发送请求或接收响应&#xff0c;可以尝试以下操作&#xf…

LED和QLED的区别

文章目录 1. 基础背光技术2. 量子点技术的引入3. 色彩表现4. 亮度和对比度5. 能效6. 寿命7. 价格总结 LED和 QLED都是基于液晶显示&#xff08;LCD&#xff09;技术的电视类型&#xff0c;但它们在显示技术、色彩表现和亮度方面有一些关键区别。以下是两者的详细区别&#xff…

光流法(Optical Flow)

一、简介 光流法&#xff08;Optical Flow&#xff09;是一种用于检测图像序列中像素运动的计算机视觉技术。其基于以下假设&#xff1a; 1.亮度恒定性假设&#xff1a;物体在运动过程中&#xff0c;其像素值在不同帧中保持不变。 2.空间和时间上的连续性&#xff1a;相邻像素之…

400. 第 N 位数字

目录 题目解法 题目 给你一个整数 n &#xff0c;请你在无限的整数序列 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, …] 中找出并返回第 n 位上的数字。 解法 class Solution { public:int findNthDigit(int n) {int low 1, high 9;while (low < high) {int mid (high - lo…

FlinkPipelineComposer 详解

FlinkPipelineComposer 详解 原文 背景 在flink-cdc 3.0中引入了pipeline机制&#xff0c;提供了除Datastream api/flink sql以外的一种方式定义flink 任务 通过提供一个yaml文件&#xff0c;描述source sink transform等主要信息 由FlinkPipelineComposer解析&#xff0c…

10款音频剪辑推荐!!你的剪辑好帮手!!

在如今的数据化浪潮中&#xff0c;工作已经采用了线上线下相结合。我的工作就需要借助一些剪辑工具&#xff0c;来实现我对音频工具的剪辑。我初次接触到音频剪辑也是因为工作需求&#xff0c;从起初我只是一个音频剪辑的小白&#xff0c;这些工具的协助。吸引着我。对于这些工…

智能检测技术与传感器(热电传感器四个定律)

热电传感器&#xff1a; 两种不同的导体两端相互紧密地连接在一起&#xff0c;组成一个闭合回路。当两接点温度不等时&#xff08;设 &#xff09;&#xff0c;回路中就会产生大小和方向与导体材料及两接点的温度有关的电动势&#xff0c;从而形成电流&#xff0c;这种现象称为…

Ubuntu 20.04配置ollama并下载安装调用本地大语言模型

Ubuntu 20.04配置ollama并下载安装调用本地大语言模型 ollama 介绍(来自ChatGPT)主要特点 ollama开发环境预配置ollama在ubuntu下的安装直接安装压缩包安装创建开机ollama的脚本启动ollama ollama在ubuntu下的运行 ollama 介绍(来自ChatGPT) Ollama 是一种新的本地语言模型管理…

多点支撑:滚珠导轨的均匀分布优势!

滚珠导轨的滚珠稳定性可以有效保持滚珠导轨的稳定运行&#xff0c;减少滚珠脱落的风险&#xff0c;确保设备的长期稳定性和可靠性。事实上&#xff0c;滚珠导轨的滚珠稳定性主要依赖于以下几个方面&#xff1a; 1、精密的制造工艺&#xff1a;滚珠导轨的导轨和滑块通常采用高精…

轻松搭建在线文档管理系统:BookStack的Docker部署与远程访问指南

前言 本文将介绍如何在Linux系统上利用Docker本地部署在线文档管理系统BookStack&#xff0c;并通过cpolar内网穿透工具实现异地远程访问&#xff0c;无需公网IP或复杂的路由器设置。 BookStack是一个开源的知识管理平台&#xff0c;基于Laravel Vue.js构建。它提供了一个简…

【代码及应用】10个最常用的Python包!

世界上有超过200,000个Python程序包&#xff08;这只是基于官方的Python程序包索引PyPI托管的程序包&#xff09;这就引出了一个问题&#xff1a;拥有这么多的软件包&#xff0c;每个Python程序员都需要学习哪些软件包是最重要的&#xff1f; 包含编程资料、学习路线图、源代码…

Java面试要点01- 基本数据类型与包装类详解

本文目录 一、引言二、基本数据类型详解2.1 数值类型2.2 代码示例 三、包装类详解3.1 包装类介绍3.2 包装类的主要用途3.3 代码示例 四、注意事项和最佳实践4.1 数值计算注意事项4.2 包装类使用建议 五、面试重点详解5.1 基本类型和包装类的区别5.2 自动装箱和拆箱的原理5.3 In…

铠侠代理商 | KIOXIA SLC闪存选型和应用

一、铠侠&#xff08;KIOXIA&#xff09;的SLC闪存系列 铠侠SLC NAND可以高速写入大量数据&#xff0c;具有高的擦写次数耐久性和可靠性的1位/单元非易失性存储器。铠侠SLC NAND闪存产品系列具有多种容量和封装形式的选择&#xff0c;可满足嵌入式市场的不同需求。 铠侠的SLC…

ts定义接口返回写法

接口&#xff08;未进行ts定义&#xff09; export async function UserList(params: {// keyword?: string;current?: number;pageSize?: number;},// options?: { [key: string]: any }, ) {return request<API1.UserList>(http://geek.itheima.net/v1_0/mp/artic…

#多语言爬取京东价格信息 python 比价api接入指南

以下是使用 Python 接入京东价格信息比价 API 的一般指南&#xff1a; 寻找合适的比价 API 服务&#xff1a; 市面上有一些第三方数据服务提供商提供京东比价 API。这些服务通常需要你注册账号并申请 API Key 和 API Secret 等凭证&#xff0c;以便进行接口调用。你可以根据自己…