Flink 03 | 数据流基本操作

图片

Flink数据流结构

DataStream 转换

通常我们需要分析的业务数据可能存在如下问题:

  • 数据中包含一些我们不需要的数据

  • 数据格式不方面分析

因此我们需要对原始数据流进行加工,比如过滤、转换等操作才可以进行数据分析。

Flink DataStream 转换主要作用:对输入的数据流(DataStream)经过各种转换操作以生成新的数据流

操作分类

  • 单条记录操作

    • 比如 Map  、 Fliter

  • 基于窗口 (Window)操作

    • 窗口根据某些特征(例如,过去 5 秒内到达的数据)对所有流事件进行分组

  • 合并数据流

    • union 、join、connect 可以将多个DataStream 合并为一个DataStream 进行分析处理

  • 拆分数据流

    • 将数据流拆分为多个数据流分别对每个数据流进行分析

基本操作

操作描述备注
Map将数据流中每个元素转换为新的元素类似 Java 中 stream.map 操作
Filter筛选只保留符合条件的数据类似 Java 中 stream.filter 操作
FlatMap将一个输入"展开"为多个元素
KeyBy将流逻辑划分为不相交的分区。所有具有相同键的记录都分配到同一个分区。
Reduce对具有相同键的元素进行规约操作,如求和、求最大值

使用示例

Map

将数据流中每个元素转换为新的元素

使用场景很多,主要对原始数据进行加工转换,Java 8 中 stream().map 操作相信大家不陌生, Flink中map 操作类似。

以下展示对数据流中数字取绝对值例子。

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer value) throws Exception {return value >=0 ? value : -value;}
});

Filter

筛选出数据流中符合条件的数据,进行分析, 该操作同样与Java 8 中 stream().filter 类型。

以下代码 保留数据流中正数。

dataStream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value > 0;}
});

FlatMap

该操作将一个输入"展开"为多个元素,简单来说一个对象,变成一个List。

典型例子,将句子拆分为单词

dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out)throws Exception {for(String word: value.split(" ")){out.collect(word);}}
});

Reduce 操作

对具有相同键的元素进行规约操作,如求和、求最大值。单词统计能够很好的展示 Flink 基本操作,包括reduce操作。

数据源进行KeyBy 后, Reduce 操作即 数据流按Key 分组聚合

public class WordCount {  public static void main(String[] args) throws Exception {  // 设置执行环境  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // 从文件中读取文本数据  DataStream<String> text = env.readTextFile("your file");// 使用 flatMap 将文本分割成单词  DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer())  // 使用 keyBy 分组,然后使用 reduce 进行聚合  .keyBy(value->value.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {  @Override  public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {  return new Tuple2<>(value1.f0, value1.f1 + value2.f1);  }  });  // 打印结果  counts.print();  // 执行程序  env.execute("Flink Word Count Example");  }  // 自定义 Tokenizer 用于分割文本  public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {  @Override  public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {  // 使用空格分割字符串  for (String word : value.toLowerCase().split("\\s+")) {  if (word.length() > 0) {  out.collect(new Tuple2<>(word, 1));  }  }  }  }  
}

总结

本文介绍了Flink 数据流基本操作Map/Filter/FlatMap/KeyBy/Reduce 的用法以及使用场景,并通过一个完整的例子展示 这些基本操作同时使用,完成数据分析过程。

对于Flink 一些其他高级操作,会持续更新中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1554822.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Kubernetes-环境篇-01-mac开发环境搭建

1、brew安装 参考知乎文章&#xff1a;https://zhuanlan.zhihu.com/p/111014448 苹果电脑 常规安装脚本&#xff08;推荐 完全体 几分钟安装完成&#xff09; /bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)"苹果电脑 极…

【Android】中级控件

其他布局 相对布局RelativeLayout RelativeLayout下级视图的位置是相对位置&#xff0c;得有具体的参照物才能确定最终位置。如果不设定下级视图的参照物&#xff0c;那么下级视图默认显示在RelativeLayout内部的左上角。用于确定视图位置的参照物分两种&#xff0c;一种是与…

自动驾驶系列—全面解析自动驾驶线控制动技术:智能驾驶的关键执行器

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

JavaScript-上篇

JS 入门 JS概述 JavaScript&#xff08;简称JS&#xff09;是一种高层次、解释型的编程语言&#xff0c;最初由布兰登艾奇&#xff08;Brendan Eich&#xff09;于1995年创建&#xff0c;并首次出现在网景浏览器中。JS的设计初衷是为Web页面提供动态交互功能&#xff…

Leetcode - 140双周赛

目录 一&#xff0c;3300. 替换为数位和以后的最小元素 二&#xff0c;3301. 高度互不相同的最大塔高和 三&#xff0c;3302. 字典序最小的合法序列 四&#xff0c;3303. 第一个几乎相等子字符串的下标 一&#xff0c;3300. 替换为数位和以后的最小元素 本题直接暴力求解&a…

【hot100-java】【将有序数组转换为二叉搜索树】

二叉树篇 BST树 递归直接实现。 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val val; }* TreeNode(int val, TreeNode left, TreeNo…

wpf实现新用户页面引导

第一步 第二部 部分代码: private void show(int xh, FrameworkElement fe, string con, Visibility vis Visibility.Visible) {Point point fe.TransformToAncestor(Window.GetWindow(fe)).Transform(new Point(0, 0));//获取控件坐标点RectangleGeometry rg new Rectangl…

Deathnote解题过程

主机扫描&#xff0c;发现192.168.1.194 arp-scan -l 端口扫描&#xff0c;发现80和22端口 nmap -sS 192.168.1.194 访问80端口发现自动跳转到http://deathnote.vuln/wordpress添加绑定地址就可以访问了 vim /etc/hosts 192.168.1.194 deathnote.vuln 访问发现并没有什么东西…

无人值守安装文件

一般配合BypassUAC 原理 一些 windows 无人值守安装文件中含有用户的明文或 base64 编码后的密文。 无人值守安装文件中的密码不一定是管理员密码&#xff0c;它也可能是普通用户的密码或者服务账户的密码。 不过&#xff0c;在无人值守文件&#xff08;如 sysprep.xml 或 …

11.数据结构与算法-线性表的案例分析与实现

案例2.1-一元多项式的运算 案例2.2-稀疏多项式的计算 图书信息管理系统

2-112基于matlab的协同干扰功率分配模型

基于matlab的协同干扰功率分配模型&#xff0c;带操作界面的功率分配GUI&#xff0c;可以实现对已有功率的分配优化&#xff0c;可以手动输入参数值。4个干扰山区分二批总干扰功率&#xff0c;每个扇区包括威胁总系数、综合压制概率、目标函数增量等。程序已调通&#xff0c;可…

NVIDIA NVLink-C2C

NVIDIA NVLink-C2C 文章目录 前言一、介绍1. 用于定制芯片集成的超快芯片互连技术2. 构建半定制芯片设计3. 使用 NVLink-C2C 技术的产品 二、NVLink-C2C 技术优势1. 高带宽2. 低延迟3. 低功率和高密度4. 行业标准协议 前言 将 NVLink 扩展至芯片级集成 一、介绍 1. 用于定制芯…

【Java的SPI机制】Java SPI机制:实现灵活的服务扩展

在Java开发中&#xff0c;SPI&#xff08;Service Provider Interface&#xff0c;服务提供者接口&#xff09;机制是一种重要的设计模式&#xff0c;它允许在运行时动态地插入或更换组件实现&#xff0c;从而实现框架或库的扩展点。本文将深入浅出地介绍Java SPI机制&#xff…

Linux驱动开发(速记版)--设备模型

第八十章 设备模型基本框架-kobject 和 kset 80.1 什么是设备模型 设备模型使Linux内核处理复杂设备更高效。 字符设备驱动适用于简单设备&#xff0c;但对于电源管理和热插拔&#xff0c;不够灵活。 设备模型允许开发人员以高级方式描述硬件及关系&#xff0c;提供API处理设备…

kaggle实战3RossmanStore商店销售额预测XgBoost解决回归问题案例1

kaggle实战2信用卡反欺诈逻辑回归模型案例1 数据集下载地址 https://download.csdn.net/download/AnalogElectronic/89844637 https://tianchi.aliyun.com/dataset/89785 加载数据 #预测销售额 回归问题 import numpy as np import pandas as pd import matplotlib.pyplot a…

Pikachu-Unsafe FileUpload-客户端check

上传图片&#xff0c;点击查看页面的源码&#xff0c; 可以看到页面的文件名校验是放在前端的&#xff1b;而且也没有发起网络请求&#xff1b; 所以&#xff0c;可以通过直接修改前端代码&#xff0c;删除 checkFileExt(this.value) 这部分&#xff1b; 又或者先把文件名改成…

GS-SLAM论文阅读笔记-MGSO

前言 MGSO首字母缩略词是直接稀疏里程计(DSO)&#xff0c;我们建立的光度SLAM系统和高斯飞溅(GS)的混合。这应该是第一个前端用DSO的高斯SLAM&#xff0c;不知道这个系统的组合能不能打得过ORB-SLAM3&#xff0c;以及对DSO会做出怎么样的改进以适应高斯地图&#xff0c;接下来…

随笔(四)——代码优化

文章目录 前言1.原本代码2.新增逻辑3.优化逻辑 前言 原逻辑&#xff1a;后端data数据中返回数组&#xff0c;数组中有两个对象&#xff0c;一个是属性指标&#xff0c;一个是应用指标&#xff0c;根据这两个指标展示不同的多选框 1.原本代码 getIndicatorRange(indexReportLi…

cherry-markdown开源markdown组件详细使用教程

文章目录 前言开发定位目标调研技术方案前提工作量安排数据库表设计实现步骤1、引入依赖2、实现cherry-markdown的vue组件&#xff08;修改上传接口路径&#xff09;3、支持draw.io组件4、支持展示悬浮目录toc前端使用&#xff1a;编辑状态使用cherry-markdown的vue组件前端使用…

【AUTOSAR 基础软件】COM模块详解(通信)

文章包含了AUTOSAR基础软件&#xff08;BSW&#xff09;中COM模块相关的内容详解。本文从AUTOSAR规范解析&#xff0c;ISOLAR-AB配置以及模块相关代码分析三个维度来帮读者清晰的认识和了解COM这一基础软件模块。文中涉及的ISOLAR-AB配置以及模块相关代码都是依托于ETAS提供的工…