python学习之路 - PySpark快速入门

目录

  • 一、PySpark实战
    • 1、前言介绍
    • 2、基础准备
        • a、pySpark库的安装
        • b、构建pySpark执行环境入口对象
        • c、pySpark编程模型
    • 3、数据输入
        • a、python数据容器转RDD对象
        • b、读取文件内容转RDD对象
    • 4、数据计算
        • a、map算子
        • b、flatMap算子
        • c、reduceByKey算子
        • d、综合案例
        • e、filter算子
        • f、distinct算子
        • g、sortBy算子
    • 5、数据输出
        • a、collect算子
        • b、reduce算子
        • c、take算子
        • e、count算子
        • f、saveAsTextFile算子

一、PySpark实战

1、前言介绍

Spark:Spark是用于大规模数据处理的统一分析引擎,是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB、EB等海量数据

pySpark:Spark对python的支持,就体现在python的第三方库pySpark上

2、基础准备

a、pySpark库的安装

命令:pip install pyspark
不知道操作步骤的可以看此文章 第六节 安装第三方python包

b、构建pySpark执行环境入口对象
from pyspark import SparkConf,SparkContext
#创建SparkConf类对象   setMaster:设置运行模式   setAppName:当前spark类的名称
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")       
#基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
#打印pySpark的运行版本
print(sc.version)
#停止Sparkcontext对象的运行
sc.stop()
c、pySpark编程模型
  • 数据输入:通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象
  • 数据处理:通过RDD类对象的成员方法完成各种数据计算的需求
  • 数据输出:将处理完成后的RDD对象调用各种成员方法完成写出文件等操作

3、数据输入

a、python数据容器转RDD对象
  • 支持的数据容器有:list,tuple,set,dict,str
  • str容器会输出单个字符,字典容器会输出所有key,其他容器会输出原本内容
from pyspark import SparkConf,SparkContext#定义数据容器
list = ['1', '2', '3']
tuple = ('1', '2', '3')
set = {'1', '2', '3'}
dict = {'1': 'abc', '2': 'def', '3': 'ghi'}
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#将数据容器转换为RDD
rdd = sc.parallelize(dict)
print(rdd.collect())
sc.stop()
b、读取文件内容转RDD对象
  • 文件的每一行会变为一个元素

如创建一个文件,内容如图。
用下面代码取文件内容转换为RDD对象并输出

在这里插入图片描述

from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.textFile(文件地址)
print(rdd.collect())
sc.stop()输出结果为:
['这是一个文件内容。', '但是', '这才是第三行内容', '你猜这是第几行', '对了,这是第五行']

4、数据计算

RDD中内置了丰富的成员方法,也叫“算子”

a、map算子
  • 功能:将RDD的数据一条一条处理(处理的逻辑是基于map算子中接收的处理函数),返回新的RDD
  • 多个map方法之间可以链式调用

案例1:将list中的每个元素都乘以10

from pyspark import SparkConf,SparkContext
#如果报错Python worker failed to connect back,需要引入os设置python安装位置
import os
os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list = ['1', '2', '3']
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
#写法一
# rdd2 = rdd.map(lambda x:int(x) * 10)#写法二
def func(x):return int(x) * 10
rdd2 = rdd.map(func)
rdd2 = rdd.map(func)
print(rdd2.collect())
sc.stop()结果为:
[10, 20, 30]

案例2:将list中的每个元素都先乘以10,再加上5,分为两个map写

from pyspark import SparkConf,SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list = ['1', '2', '3']
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
rdd2 = rdd.map(lambda x:int(x) * 10).map(lambda x: int(x) + 5)		#这里支持链式调用
print(rdd2.collect())
sc.stop()
b、flatMap算子
  • 功能:对RDD执行map操作,然后解除嵌套操作
  • 解除嵌套:假如输入的list的多层嵌套的,那么最后的结果全部元素都为list的一层

案例:将多层嵌套的 list 取出所有元素放到一层中

from pyspark import SparkConf,SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list = [[1,2,3],[4,5,6],[7,8]]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
#写法一
rdd2 = rdd.flatMap(lambda x: x)
print(rdd2.collect())
sc.stop()结果为:
[1, 2, 3, 4, 5, 6, 7, 8]
c、reduceByKey算子
  • 功能:针对KV型的RDD,自动按照key分组,然后根据提供的聚合逻辑,完成组内数据(value)的聚合操作
  • KV型的RDD其实就是二元元组,比如:[(‘a’,1) , (‘b’,1) , (‘c’,1)],每个元组中第一个值为key,第二个值为value

案例:将男女分组,并且计算两组的分数总和

from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list = [('男', 99), ('男', 88), ('女', 77), ('女', 66), ('男', 55)]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(list)
# 写法一
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
sc.stop()结果为:
[('女', 143), ('男', 242)]
d、综合案例

读取文件内容,统计各个元素出现次数,文件内容如下:

在这里插入图片描述

from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list1 = []
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#读取文件内容
rdd = sc.textFile("C://Users//HLY//Desktop//test.txt")
#先将内容切割成单个元素并一层展示,再将元素设置成二元元组,最后将元素分组统计
rdd2 = rdd.flatMap(lambda x : x.strip().split(" ")).map(lambda x : (x,1)).reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
sc.stop()结果为:
[('test2', 3), ('test3', 4), ('test', 3), ('test1', 3), ('test4', 4), ('test5', 4)]
e、filter算子
  • 功能:过滤想要的数据进行保留

案例:过滤出所有偶数

from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list1 = [1,2,3,4,5,6,7,8,9,10]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.filter(lambda x : x % 2 == 0)
print(rdd2.collect())
sc.stop()结果为:
[2, 4, 6, 8, 10]
f、distinct算子
  • 功能:对RDD中的数据进行去重,返回新的RDD

案例:对已有的列表进行去重

from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list1 = [1,1,2,2,2,2,3,3,3,4,4,4]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.distinct()
print(rdd2.collect())
sc.stop()结果为:
[1, 2, 3, 4]
g、sortBy算子
  • 功能:对RDD数据进行排序,基于指定的排序依据
  • 参数:
    • func:告知RDD是对那个数据进行排序,比如lambda x:x[1] 表示对rdd中第二列元素进行排序
    • ascending:True升序,False降序
    • numPartitions:用多少分区排序,单个分区时传1

案例:对给出的二元集合根据第二个元素进行降序排列

from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list1 = [('test', 1), ('test1', 5), ('test3', 2), ('test4', 4), ('test5', 8), ('test6', 7), ('test7', 6)]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.sortBy(lambda x: x[1], ascending=False)
print(rdd2.collect())
sc.stop()结果为:
[('test5', 8), ('test6', 7), ('test7', 6), ('test1', 5), ('test4', 4), ('test3', 2), ('test', 1)]

5、数据输出

a、collect算子
  • 功能:将RDD各个分区内的数据统一收集到Driver当中,形成一个List对象

案例:输出RDD的内容

from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list1 = [1,2,3,4,5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
print(rdd.collect())
sc.stop()结果为:
[1,2,3,4,5]
b、reduce算子
  • 对RDD的全部数据按照传入的逻辑进行聚合,返回一个数字

案例:计算列表中的所有元素和

from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
num = rdd.reduce(lambda a, b: a + b)
print(num)
sc.stop()结果为:
15
c、take算子
  • 功能:取RDD的前N个元素,组成 List 返回

案例:取出列表前3个元素

from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
list = rdd.take(3)
print(list)
sc.stop()结果为:
[1, 2, 3]
e、count算子
  • 功能:计算RDD有多少条数据,返回一个数字

案例:获取列表中的元素个数

from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
num = rdd.count()
print(num)
sc.stop()结果为:
5
f、saveAsTextFile算子
  • 功能:将RDD的数据写入文本文件中
  • 执行此方法需要安装hadoop环境,具体配置过程可以看 这篇文章
  • 其输出内容是根据区分决定的,有多少分区就会输出多少个文件。内容会均匀分摊到各个文件中。分区数默认与电脑的CPU内核一致

案例1:输出列表内容到各个文件中

from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"] = "D:/hadoop/hadoop-3.0.0"list1 = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()

在这里插入图片描述
结果会生成16个内容文件和2个状态文件,并且16个内容文件中每个文件中都有一个数字

案例2:将列表内容输出到一个文件中

#方法一:配置全局并行度为1from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"] = "D:/hadoop/hadoop-3.0.0"list1 = [1, 2, 3, 4, 5]
#设置全局并行度为1
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app").set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()
#方法二:设置分区数为1from pyspark import SparkConf, SparkContext
import osos.environ["PYSPARK_PYTHON"] = "F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"] = "D:/hadoop/hadoop-3.0.0"list1 = [1, 2, 3, 4, 5]
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)
#设置分区数为1
rdd = sc.parallelize(list1,numSlices= 1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()

在这里插入图片描述

最后会生成1个结果文件,3个其他文件,并且内容都会在 part-00000 文件中显示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1523166.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Flutter修改Android包名

一、前言 我在将Android打包上传到google商店的时候提示我“com.example”已受到限制,请换一个软件包名称。“的错误。因此我们需要去修改flutter的Android包名。 二、操作流程 1.修改路径 android ——> app ——> src ——> debug ——> AndroidMa…

K12智慧校园云平台源码,智慧校园小程序源码,支持PC+小程序,提供丰富的API接口,支持和其他系统的融合对接

智慧校园平台是目前教育信息化领域的热点之一。随着数字化转型的加速,越来越多的学校开始寻求解决方案,以提高教育管理的效率和质量。 智慧校园电子班牌系统是一种集成信息化技术、物联网、智能化的教育管理解决方案,它在校园内实现了信息共…

信捷 XD PLC 双精度浮点数的初始化及传输

在用信捷XDH PLC进行运动控制时,加减速时间是个64位的双精度的浮点数,那么如果不在人机界面写到PLC,PLC自身也是可以初始化的,比如0.005,怎么办呢。 用FLT指令把 整数出单精度浮点数,然后EDIV指令把两个单精度浮点数相…

冲击大厂算法面试=>链表专题【链表反转之局部反转升级版】

目录标题 多重局部反转之K 个一组翻转链表上代码题解呀实在不会的时候记住 多重局部反转之K 个一组翻转链表 上代码 整个函数通过不断地检查剩余节点数量和进行局部反转,实现了链表的分组反转,最后返回反转后的链表。这种方法有效地利用了额外的 pre 和…

idea插件【1】Smart Tomcat

一、简介 在开发过程中除了springboot项目支持jar运行,很多场景下需要使用到tomcat外置服务部署,此时我们可以使用idea插件Smart Tomcat (Smart Tomcat 插件是一个用于简化与 Tomcat 服务器交互的工具,它提供了一些额外的功能来增…

opencv之几何变换

文章目录 1 前言2 线性几何变换的主要类型2.1 平移 (Translation):2.1.1 定义2.1.2代码 2.2 缩放 (Scaling):2.2.1 定义2.2.2 代码 2.3 旋转 (Rotation):2.3.1 定义2.3.2 代码 2.4 仿射变换 (Affine Transformation):2.4.1 定义2.…

全国地市未来产业水平数据集(2008-2023年)

未来产业,作为驱动经济社会高质量发展的核心引擎,是指依托科技创新和模式创新,引领全球新一轮科技革命和产业变革,具有前瞻性、先导性、战略性的新兴产业领域。也是实现生产力大解放,推动生产力质的跃迁并形成新质生产…

wacat - 一款开源随机测试工具

想象一下,你离开电脑一会儿去拿一杯咖啡。与此同时,你的猫走过键盘,引发了一些混乱。 wacat 应用程序: • 访问你的网页应用的根网址 • 随机访问应用中的每个链接 • 在表单中添加随机文本输入 • 从下拉菜单、复选框等中选择…

使用Redis如何实现集群会话同步?

使用Redis如何实现集群会话同步? 1、为什么选择Redis?2、如何实现?1. 环境准备2. 配置Web服务器3. 测试与验证4. 监控与优化 💖The Begin💖点点关注,收藏不迷路💖 在分布式Web应用中&#xff0c…

springboot高校实验室教学管理系统的设计和实现

基于springbootvue高校实验室教学管理系统的设计和实现(源码L文ppt)4-045 4 系统总体设计 此次高校实验室教学管理系统通过springboot框架。springboot适合快速构建Web应用。springboot将B/S设计模式中的视图分成了View模块和Template模块两部分,将动态的逻辑处理…

51单片机.之蜂鸣器振动播放歌曲

蜂鸣器发声是通过喇叭振动发声的&#xff0c;通电产生磁场&#xff0c;磁铁吸收&#xff0c;而振动。不断释放&#xff0c;吸收。 1、蜂鸣器发声&#xff0c;播放不同频率的声音逐渐变尖 #include<reg52.h>sbit BUZZ P1^6;unsigned char T0RH0; unsigned char T0RL0; v…

SpringCloud开发实战(二):通过RestTemplate实现远程调用

目录 SpringCloud开发实战&#xff08;一&#xff09;&#xff1a;搭建SpringCloud框架 RestTemplate介绍 RestTemplate 是 Spring 框架中的一个类&#xff0c;它用于促进 HTTP 请求的发送和接收&#xff0c;并且简化了与 RESTful 服务的交互。RestTemplate 提供了许多便利的方…

Redis Zset 类型:Score 属性在数据排序中的作用

Zset 有序集合 一 . zset 的引入二 . 常见命令2.1 zadd、zrange2.2 zcard2.3 zcount2.4 zrevrange、zrangebyscore2.5 zpopmax、zpopmin2.6 bzpopmax、bzpopmin2.7 zrank、zrevrank2.8 zscore2.9 zrem、zremrangebyrank、zremrangebyscore2.10 zincrby2.11 集合间操作交集 : zi…

【算法】PageRank

一、引言 PageRank是由谷歌创始人拉里佩奇和谢尔盖布林在斯坦福大学读研究生时发明的一种算法&#xff0c;用于衡量网页的重要性。它基于一个简单的假设&#xff1a;更重要的网页会有更多的链接指向它。 二、算法原理 PageRank算法的核心思想是&#xff0c;一个网页的重要性可以…

沸点 | LDBC 第18届 TUC 会议召开,专家孙宇熙受邀参加并发表演讲

图数据管理领域国际权威组织LDBC&#xff08;Linked Data Benchmark Council&#xff09;于8月30日至31日在广州举办了第18届LDBC TUC会议。作为图数据库领域的创新引领者&#xff0c;嬴图受邀参加此次盛会&#xff0c;国际高性能计算与存储系统专家、大数据专家、图专家及嬴图…

【从零开始学爬虫】采集58同城房源数据

本文以采集北京市58同城房源数据为例进行演示&#xff1a; l 采集网站 【场景描述】采集58同城房源数据。 【使用工具】前嗅ForeSpider数据采集系统 http://www.forenose.com/view/commodity/forespider.html 【入口网址】 https://bj.58.com/xiaoqu/?PGTID0d000000-000…

三、数组————相关概念详解

数组 前言一、数据理论基础二、数组常用操作2.1 初始化数组2.2 访问数组中的元素2.3 插入元素2.4 删除元素 三、数组扩展3.1 遍历数组3.2 数组扩容 总结1、数组的优点2、数组的不足 前言 在数据结构中&#xff0c;数组可以算得上最基本的数据结构。数组可以用于实现栈、队列、…

YoloV10改进策略:卷积篇|基于PConv的二次创新|附结构图|性能和精度得到大幅度提高(独家原创)

文章目录 摘要论文指导PConv在论文中的描述改进YoloV10的描述改进代码与结构图改进方法测试结果总结摘要 在PConv的基础上做了二次创新,创新后的模型不仅在精度和速度上有了质的提升,还可以支持Stride为2的降采样。 改进方法简单高效,需要发论文的同学不要错过! 论文指导…

机器学习实战篇——肿瘤良性/恶性分类器(二元逻辑回归)

机器学习之实战篇——肿瘤良性/恶性分类器&#xff08;二元逻辑回归&#xff09; 前言数据集和实验文件下载相关文章推荐实验过程导入相关模块数据预处理手写二元逻辑回归模型&#xff08;小批量梯度下降&#xff09;sklearn逻辑回归器 前言 实验中难免有许多缺陷和错误&#…

Mac M1安装Hive

一、下载解压Hive 1.官网地址 https://dlcdn.apache.org/hive/ 2.选择对应版本进行下载&#xff0c;这里我以3.1.3为例&#xff1b; 3.下载好后&#xff0c;进行解压&#xff0c;并重命名为hive-3.1.3&#xff0c;放到资源库目录下&#xff1b; 二、配置系统环境 1.打开~/…