当前位置: 首页 > news >正文

Spark-Streaming核心编程(四)总结

有状态转化操作 - UpdateStateByKey

功能描述

UpdateStateByKey原语用于在DStream中跨批次维护状态,例如流计算中的累加wordcount

它允许对一个状态变量进行访问和更新,适用于键值对形式的DStream

工作原理

给定一个由(键,事件)对构成的DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数。

构建出一个新的DStream,其内部数据为(键,状态)对。

使用步骤

定义状态:状态可以是一个任意的数据类型。

定义状态更新函数:使用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

配置检查点目录:updateStateByKey需要使用检查点来保存状态。

示例代码

scalaCopy Code

val updateFunc = (values: Seq[Int], state: Option[Int]) => {

val currentCount = values.foldLeft(0)(_ + _)

val previousCount = state.getOrElse(0)

Some(currentCount + previousCount)

}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("update")

val ssc = new StreamingContext(sparkConf, Seconds(5))

ssc.checkpoint("./ck")

val lines = ssc.socketTextStream("node01", 9999)

val words = lines.flatMap(_.split(" "))

val pairs = words.map((_, 1))

val stateDStream = pairs.updateStateByKeyInt](updateFunc)

stateDStream.print()

ssc.start()

ssc.awaitTermination()

窗口操作 - Window Operations

功能描述

窗口操作允许设置窗口的大小和滑动窗口的间隔,以动态地获取当前Streaming的状态。

参数说明

窗口时长:计算内容的时间范围。

滑动步长:触发计算的间隔。

这两者都必须为采集周期大小的整数倍。

示例代码

scalaCopy Code

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("window")

val ssc = new StreamingContext(sparkConf, Seconds(3))

ssc.checkpoint("./ck")

val lines = ssc.socketTextStream("node01", 9999)

val words = lines.flatMap(_.split(" "))

val pairs = words.map((_, 1))

val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))

wordCounts.print()

ssc.start()

ssc.awaitTermination()

DStream输出操作

输出操作的重要性

输出操作指定了对流数据经转化操作得到的数据所要执行的操作。

RDD中的惰性求值类似,如果没有执行输出操作,DStream将不会被求值。

常见的输出操作

print():在驱动结点上打印DStream中每一批次数据的最开始10个元素,用于开发和调试。

saveAsTextFiles(prefix, [suffix]):以text文件形式存储DStream的内容。

saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式存储数据。

saveAsHadoopFiles(prefix, [suffix]):将数据保存为Hadoop文件。

foreachRDD(func):最通用的输出操作,对DStream中的每个RDD运行任意计算。可以将数据推送到外部系统,如MySQL数据库。

使用注意事项

连接操作不能写在driver层面(序列化问题)。

避免在foreach中对每个RDD中的每条数据都创建连接,效率较低。

可以使用foreachPartition在分区层面创建连接。

http://www.xdnf.cn/news/177085.html

相关文章:

  • Revive 中的 Precompile 合约:实现与调用机制
  • 学习海康VisionMaster之路径提取
  • 怎么检测代理IP延迟?如何选择低延迟代理?
  • 《明解C语言入门篇》读书笔记四
  • 总线位宽不变,有效数据位宽变化的缓存方案
  • 颠覆传统微商!开源AI智能名片链动2+1模式S2B2C商城小程序:重构社交电商的“降维打击”革命
  • 常见锁策略
  • 再学GPIO(二)
  • 02 业务流程架构
  • npm、pnpm 和 yarn 包管理工具
  • 【密码学——基础理论与应用】李子臣编著 第八章 SM2公钥密码算法 课后习题
  • LeetCode3☞无重复字符的最长子串
  • 辞九门回忆
  • 深入理解编程中的同步与异步:原理、区别及实战应用
  • Go 语言中的 `select` 语句详解
  • CSS元素动画篇:基于当前位置的变换动画(四)
  • 加密算法 AES、RSA、MD5、SM2 的对比分析与案例(AI)
  • (七)RestAPI 毛子(Http 缓存/乐观锁/Polly/Rate limiting)
  • 【学习笔记1】一站式大语言模型微调框架LLaMA-Factory
  • Vue2 与 Vue3 深度对比与技术解析
  • 黑马点评redis改 part 6
  • 一周学会Pandas2 Python数据处理与分析-Pandas2数据信息查看操作
  • 语音识别质量的跟踪
  • 力扣HOT100之链表:23. 合并 K 个升序链表
  • 树状数组单点操作+前缀K差分->区间K操作 -#131-#132
  • SpringBoot + SSE 实时异步流式推送
  • Linux内核中的编译时安全防护:以网络协议栈控制块校验为例
  • mAh 与 Wh:电量单位的深度解析
  • 【Pandas】pandas DataFrame rtruediv
  • 全网直播推介会,九识智能与申通快递达成全面战略合作