大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink Sink JDBC
  • Flink Sink Kafka

在这里插入图片描述

注意事项

DataSetAPI 和 DataStream API一样有三个部分组成,各部分的作用对应一致,此处不再赘述。

FlinkDataSet

在 Apache Flink 中,DataSet API 是 Flink 批处理的核心接口,它主要用于处理静态数据集。虽然 Flink 的 DataStream API 被广泛用于流式数据处理,但 DataSet API 适用于大规模批处理场景,如数据清洗、ETL、分析等。虽然近年来 Flink 更多地向流处理方向发展,但批处理仍然是数据处理中的一个重要场景。

DataSource

对DataSet批处理而言,较为频繁的操作是读取HDFS中的文件数据,因为这里主要介绍两个 DataSource 组件:

  • 基于集合:fromCollection 主要是为了方便测试
  • 基于文件:readTextFile,基于HDFS中的数据进行计算分析

基本概念

Flink 的 DataSet API 是一个功能强大的批处理 API,专为处理静态、离线数据集设计。DataSet 中的数据是有限的,处理时系统会先等待整个数据集加载完毕。DataSet 可以通过多种方式创建,例如从文件、数据库、集合等加载数据,然后通过一系列转换操作(如 map、filter、join 等)进行处理。

核心特性

  • 支持丰富的转换操作。
  • 提供多种输入输出数据源。
  • 支持复杂的数据类型,包括基本类型、元组、POJO、列表等。
  • 支持优化计划,例如通过 cost-based optimizer 来优化查询执行计划。

DataSet 创建

在 Flink 中,可以通过多种方式创建 DataSet。以下是常见的数据源:

从本地文件读取

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("path/to/file");

从 CSV 文件读取

DataSet<Tuple3<Integer, String, Double>> csvData = env.readCsvFile("path/to/file.csv").types(Integer.class, String.class, Double.class);

从集合中创建

List<Tuple2<String, Integer>> data = Arrays.asList(new Tuple2<>("Alice", 1),new Tuple2<>("Bob", 2)
);
DataSet<Tuple2<String, Integer>> dataSet = env.fromCollection(data);

从数据库中读取

可以通过自定义的输入格式(如 JDBC 输入格式)从数据库中读取数据,虽然 Flink 本身并没有内置 JDBC 源的批处理 API,但可以通过自定义实现。

DataSet 的转换操作(Transformation)

Flink 的 DataSet API 提供了丰富的转换操作,可以对数据进行各种变换,以下是常用的转换操作:
在这里插入图片描述
在这里插入图片描述

Map

将 DataSet 中的每一条记录进行映射操作,生成新的 DataSet。

DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
DataSet<Integer> squaredNumbers = numbers.map(n -> n * n);

Filter

过滤掉不满足条件的记录。

DataSet<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);

FlatMap

类似于 map,但允许一条记录生成多条输出记录。

DataSet<String> lines = env.fromElements("hello world", "flink is great");
DataSet<String> words = lines.flatMap((line, collector) -> {for (String word : line.split(" ")) {collector.collect(word);}
});

Reduce

将数据集根据某种聚合逻辑进行合并

DataSet<Integer> sum = numbers.reduce((n1, n2) -> n1 + n2);

GroupBy 和 Reduce

对数据集进行分组,然后在每个组上执行聚合操作

DataSet<Tuple2<String, Integer>> wordCounts = words.map(word -> new Tuple2<>(word, 1)).groupBy(0).reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1));

Join

类似于 SQL 中的连接操作,连接两个 DataSet。

DataSet<Tuple2<Integer, String>> persons = env.fromElements(new Tuple2<>(1, "Alice"),new Tuple2<>(2, "Bob")
);
DataSet<Tuple2<Integer, String>> cities = env.fromElements(new Tuple2<>(1, "Berlin"),new Tuple2<>(2, "Paris")
);
DataSet<Tuple2<String, String>> personWithCities = persons.join(cities).where(0).equalTo(0).with((p, c) -> new Tuple2<>(p.f1, c.f1));

DataSet 输出

DataSet API 提供多种方式将数据写出到外部系统:

写入文件

wordCounts.writeAsCsv("output/wordcounts.csv", "\n", ",");

写入数据库

虽然 DataSet API 没有直接提供 JDBC Sink,可以通过自定义 Sink 实现写入数据库功能。

打印控制台

wordCounts.print();

批处理的优化

DataSet API 提供了优化机制,通过成本模型和执行计划的分析来优化任务执行。在 Flink 内部,编译器会根据任务定义的转换操作生成一个优化的执行计划,这个过程类似于 SQL 查询优化器的工作原理。

  • DataSet 的分区:Flink 可以根据数据集的分区进行优化。例如,通过 partitionByHash 或 partitionByRange 来手动控制数据的分布方式。
  • DataSet 的缓存:可以通过 rebalance()、hashPartition() 等方法来均衡数据负载,以提高并行度和计算效率。

DataSet API 的容错机制

Flink 的 DataSet API 提供了容错机制,支持在发生故障时重新执行失败的任务。虽然 DataSet API 没有像 DataStream 那样依赖于 Checkpoint 机制,但其批处理特性允许任务从头开始重新执行,确保数据处理的正确性。

DataSet 与 DataStream 的对比

DataSet API 与 DataStream API 之间有一些重要的区别:

请添加图片描述

DataSet API 的未来

需要注意的是,Flink 的官方路线图中已经不再优先开发 DataSet API 的新特性,未来的主要开发将集中在 DataStream API,甚至批处理功能都将通过 DataStream API 来实现。
因此,如果可能,建议新项目尽量使用 DataStream API 来替代 DataSet API。
特别是 Flink 的 Table API 和 SQL API 也适用于批处理和流处理,这些高层 API 提供了更简洁的语法和更强的优化能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1523359.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

手机怎么把wmv转换成mp4格式?视频格式这样做,让你的视频更加通用

“我最近想在手机上编辑视频&#xff0c;但遇到一个问题&#xff0c;就是我有一些wmv格式的视频&#xff0c;想把它们转换成mp4格式&#xff0c;好把它们发布到平台上。但是我不会转格式。请问手机怎么把wmv转换成mp4格式呢&#xff1f;请大家帮帮我。” 格式转换对于没怎么接…

JAVA 二维码生成

1.pom依赖 <dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.3.3</version></dependency><dependency><groupId>com.google.zxing</groupId><artifactId>ja…

四川财谷通抖音小店创新引领新风尚

在数字化浪潮的推动下&#xff0c;电商行业蓬勃发展&#xff0c;抖音小店作为新兴的电商平台&#xff0c;凭借其独特的社交属性和便捷的购物体验&#xff0c;迅速赢得了广大消费者的青睐。在众多抖音小店中&#xff0c;四川财谷通抖音小店以其精准定位、高质量内容、一站式服务…

智慧公厕:城市公共卫生管理的新篇章‌@卓振思众

在快节奏的现代生活中&#xff0c;公共厕所作为城市基础设施的重要组成部分&#xff0c;其使用体验和管理效率直接影响着市民的生活质量与城市形象。随着科技的飞速发展&#xff0c;智慧公厕应运而生&#xff0c;它以一种全新的姿态&#xff0c;为城市公共卫生管理带来了前所未…

鸿蒙Harmony--状态变量更改通知--@Watch装饰器详解

风雨飘摇中&#xff0c;我心起伏&#xff0c; 万丈雄心&#xff0c;却难以施展。 天高地远&#xff0c;路途迷茫&#xff0c; 理想如星&#xff0c;却遥不可及。 千百次跌倒&#xff0c;千百次爬起&#xff0c; 在命运的手掌中&#xff0c;挣扎前行。 谁知我心中的热血滚烫&…

向 ADC 模型和 DAC 建模添加低通滤波器

与单音测试信号相比&#xff0c;双音测试信号可提供更多有关 ADC 性能的信息。您的作者的模型与特定 ADC 的制造商模型非常匹配&#xff0c;因此可以方便地运行误码率模拟。该 ADC 恰好具有非常宽的输入带宽。 对于带宽较低的 ADC&#xff0c;添加如图 1 所示的低通滤波器将提…

用亚马逊AI代码开发助手Amazon Q Developer开发小游戏(中篇)

快用人工智能帮程序员写代码、开发游戏&#xff01;今天小李哥就来介绍亚马逊推出的国际前沿人工智能AI代码开发助手Amazon Q Developer。目前该代码助手在Hugging Face代码生成权威测试集SWE-bench中排名第一&#xff0c;可以根据我们的需求生成整个代码项目&#xff0c;并可以…

IDEA莫名奇妙自动选择光标所在行 -罪魁祸首居然是钉钉

请看受害者视角 作为开发者&#xff0c;工作时基本都会运行钉钉吧。最近&#xff0c;钉钉更新了AI功能&#xff0c;但不知道是不是开发团队平时不使用IDE&#xff0c;竟然让这个AI功能影响到了其他软件&#xff0c;简直让人无语。不仅仅是IDEA受影响&#xff0c;就连WebStorm也…

QQ聊天记录删除了怎么恢复?学会这3个方法,简单又有效

QQ作为我们日常沟通的重要工具之一&#xff0c;其聊天记录往往承载着许多珍贵的记忆和重要的信息。但在操作中我们会不小心删除或丢失这些聊天记录&#xff0c;那么QQ聊天记录删除了怎么恢复就成为我们急切需要解决的问题。先别急&#xff0c;本文就为你介绍3种简单又有效的QQ聊…

【Qt笔记】QListWidget控件详解

目录 引言 一、基本概念和特性 二、基本用法 2.1 创建和初始化 2.2 添加和删除项 2.3 选择和遍历项 三、信号与槽 3.1 itemClicked 3.2 itemDoubleClicked 3.3 itemSelectionChanged 四、自定义项 五、排序和查找 六、代码示例 6.1 头文件 6.2 源文件 6.3 主…

腾讯云TRTC无UI集成——分享屏幕主流、辅流(Vue2+JS+TRTC无UI集成)

先阐述一下问题&#xff0c;在项目中用到腾讯云的TRTC&#xff0c;A端发布A1、A2两个视频源&#xff0c;在B端订阅A1、A2使用两个view进行播放渲染 问题主流视频源和辅流视频源渲染在同一view上&#xff0c;控制台报错 // 播放远端视频 TRTCService.js; setRemoteVideo(view)…

【数据结构入门】排序算法之插入排序与选择排序

目录 前言 一、排序的概念及运用 1.排序的概念 2.排序的运用 3.常见排序算法 二、插入排序与选择排序 2.1插入排序 2.1.1直接插入排序 1&#xff09;基本思想 2&#xff09;具体步骤 3&#xff09;算法特性 4&#xff09;算法实现 2.1.2希尔排序 1) 基本思想 2&…

草原灭火车的功能与性能_鼎跃安全

在内蒙古的草原火灾中&#xff0c;水陆两栖全地形草原灭火车曾多次用于紧急救援。其强大的越野能力和高速反应&#xff0c;使其在广袤的草原上能够迅速到达火场&#xff0c;并使用集成的多功能灭火设备进行灭火作业&#xff0c;有效防止了火灾的进一步蔓延。 水陆两栖全地形草原…

React学习-hooks

官方文档&#xff1a;https://zh-hans.react.dev/reference/react/useActionState 1.useEffect(setup, dependencies?) 1.1 基础使用 //hooks import { useEffect } from "react"; import "./App.css";function App(){useEffect(()>{console.log(us…

redis的共享session应用

项目背景&#xff1a; 该项目背景就是黑马的黑马点评项目。 一&#xff1a;基于Session实现验证码登录流程 基本的登录流程我们做了很多了。这个是短信登录流程 其实和普通的登录流程就多了一个生成验证码&#xff0c;并将验证码保存在session中&#xff0c;并且呢&#xf…

vue3中使用supermap icilent3d for cesium

记录从头开始学习supermap icilent3d fro cesium 1.新建vue3项目 npm create vitelatest 添加这个&#xff0c;自动打开浏览器 2.使用supermap icilent3d for Cesium 复制这个Cesium&#xff0c;放到pulibc目录下面 然后分别引入css和js 然后就可以使用了&#xff0c;但是会…

Oracle 客户端 PL/SQL Developer 15.0.4 安装与使用

目录 官网下载与安装 切换中文与注册 连接Oracle数据库 tnsnames.ora 文件使用 Oracle 客户端 PL/SQL Developer 12.0.7 安装、数据导出、Oracle 执行/解释计划、for update。 官网下载与安装 1、官网&#xff1a;https://www.allroundautomations.com/products/pl-sql-d…

【STM32】通用定时器TIM(输出比较)

本篇博客重点在于标准库函数的理解与使用&#xff0c;搭建一个框架便于快速开发 目录 前言 输出比较简介 PWM简介 输出比较配置 初始化IO口 输出比较初始化 输出比较代码 PWM.h PWM.c main.c 应用案例 前言 建议先阅读这篇博客&#xff0c;理解时基单元的配置 【…

CDGA|数据治理:构建高效数据管理体系的实践路径

在当今数字化时代&#xff0c;数据已成为企业最宝贵的资产之一&#xff0c;其质量、安全性和有效利用率直接影响着企业的决策能力、运营效率和市场竞争力。因此&#xff0c;数据治理作为确保数据质量、促进数据价值最大化的关键环节&#xff0c;其重要性日益凸显。本文将从几个…

机械学习—零基础学习日志(概率论总笔记1)

概率论的起源 在历史上有明确记载的最早研究随机性的数学家是帕斯卡和费马。帕斯卡就是最早发明机械计算机的那位数学家&#xff0c;他并不是赌徒&#xff0c;但是他有些赌徒朋友&#xff0c;那些人常常玩一种掷骰子游戏&#xff0c;游戏规则是由玩家连续掷4次骰子&#xff0c…