Apache Filnk----入门

文章目录

  • Flink 概述
    • Flink 是什么
    • 有界流和无界流
    • 有状态流处理
    • Flink 特点
    • Flink vs SparkStreaming
    • Flink 分层API
  • Flink 快速上手
    • WordCount 代码编写
      • 批处理
      • 流处理
      • 读取socket文本流

Flink 概述

Flink 是什么

在这里插入图片描述

有界流和无界流

  • 无界数据流:
    • 有定义流的开始,但没有定义流的结束;
    • 它们会无休止的产生数据:
    • 无界流的数据必须持续处理,即数据被摄取后需要立刻处理我们不能等到所有数据都到达再处理,因为输入是无限的。
  • 有界数据流:
    • 有定义流的开始,也有定义流的结束:
    • 有界流可以在摄取所有数据后再进行计算:
    • 有界流所有数据可以被排序,所以并不需要有序摄取:
    • 有界流处理通常被称为批处理

有状态流处理

把流处理需要的额外数据保存成一个状态,然后针对这条数据进行处理,并且更新状态。这就是所谓的"有状态的流处理"

在这里插入图片描述

  • 状态在内存中:优点:速度快 ;缺点:可靠性差
  • 状态在分布式系统中:优点:可靠性高;缺点:速度慢

Flink 特点

  • 高吞吐和低延迟。每秒处理数百万个事件,毫秒级延迟。
  • 结果的准确性。Flink提供了事件时间(event-time)和处理时间(processing-time)语义对于乱序事件流,事件时间语义仍然能提供一致且准确的结果。
  • 精确一次(exactly-once)的状态一致性保证。
  • 可以连接到最常用的外部系统,如Kata、Hive、JDBC、HDFS、Redis等
  • 高可用。本身高可用的设置,加上与K8S,YARN和Mesos的紧密集成,再加上从故障中快速恢复和动态扩展任务的能力,Fhnk能做到以极少的停机时间7x24全天候运行。

Flink vs SparkStreaming

在这里插入图片描述

Flink 分层API

在这里插入图片描述

  • 有状态流处理:通过底层API(处理函数),对最原始数据加工处理。底层API与DataSstrea API相集成,可以处理复杂的计算。
  • DataStream API(流处理)和DataSet API(批处理)封装了底层处理函数,提供了通用的模块,比如转换(transormations,包括map、flatmap等),连接(joms),聚合(aggregations),窗口(windows)操作等。注意:Flimk1.12以后,DataStream API已经实现真正的流批一体,所以DataSet API已经过时
  • Table API 是以表为中心的声明式编程,其中表可能会动态变化。Ible API遵循关系模型:表有二维数据结构,类似于关系数据库中的表;同时API提供可比较的操作,例如select、project、jomn、group-by、aggregate等。我们可以在表与 DataStream/Dataset 之间无缝切换,以允许程序将 Table AP与DataStream 以及 DataSet 混合使用。
  • SOL这一层在语法与表达能力上与 Table API类似,但是是以SOL查询表达式的形式表现程序。SOL抽象与Table API交互密切,同上心能有难区的时SOL查询可以直接在Table API定义的表上执行。

Flink 快速上手

  • 在IDEA中创建Maven项目

  • 添加项目依赖

    <properties><flink.version>1.17.0</flink.version>
    </properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>
    </dependencies>
    

WordCount 代码编写

批处理

批处理基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。

1)数据准备
(1)在工程根目录下新建一个input文件夹,并在下面创建文本文件words.txt
(2)在words.txt中输入一些文字,例如:

hello flink
hello world
hello java

2)代码编写
(1)在com.yudan.wc包下新建Java类BatchWordCount,在静态main方法中编写代码。具体代码实现如下:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据  按行读取(存储的元素就是每行的文本)DataSource<String> lineDS = env.readTextFile("input/words.txt");// 3. 转换数据格式FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1L));}}});// 4. 按照 word 进行分组UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);// 5. 分组内聚合统计AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}

(2)输出

(flink,1)
(world,1)
(hello,3)
(java,1)

从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

流处理

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Arrays;public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文件DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

输出:

3> (java,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
13> (flink,1)
9> (world,1)
  • 主要观察与批处理程序BatchWordCount的不同:
    • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。
    • 转换处理之后,得到的数据对象类型不同。
    • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
    • 代码末尾需要调用env的execute方法,开始执行任务。

读取socket文本流

在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听socket端口,然后向该端口不断的发送数据。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Arrays;public class SocketStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文本流:hadoop102表示发送端主机名、7777表示端口号DataStreamSource<String> lineStream = env.socketTextStream("hadoop102", 7777);// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(data -> data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}

Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

因为对于flatMap里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1486951.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

ts一些解决vscode飘红的方法

1、查看是否有些ts的数据类型定义问题&#xff0c;属性缺少或者属性类型不对 把对应属性加上即可 2、在飘红的代码前面设置// ts-ignore忽略此行校验&#xff08;不过一般不建议用这个方法&#xff09; 3、移除高版本不用的属性&#xff08;版本属性兼容问题&#xff09; 原因…

PP-Human行为识别(RTSP协议视频流实时检测)

基于PaddleDetection本地实现PP-Human行为识别模块&#xff08;RTSP协议视频流实时检测&#xff09; 项目介绍环境准备1. Anaconda 创建环境2. 获取 PaddleDetection3. 获取 [MediaMTX](https://github.com/bluenviron/mediamtx/releases/tag/v1.8.4)4. FFmpeg 获取5. VLC 获取…

.NET开源、简单、实用的数据库文档生成工具

前言 今天大姚给大家分享一款.NET开源&#xff08;MIT License&#xff09;、免费、简单、实用的数据库文档&#xff08;字典&#xff09;生成工具&#xff0c;该工具支持CHM、Word、Excel、PDF、Html、XML、Markdown等多文档格式的导出&#xff1a;DBCHM。 支持的数据库 Sq…

IEEE官方列表会议 | 第三届能源与环境工程国际会议(CFEEE 2024)

会议简介 Brief Introduction 2024年第三届能源与环境工程国际会议(CFEEE 2024) 会议时间&#xff1a;2024年12月2日-4日 召开地点&#xff1a;澳大利亚凯恩斯 大会官网&#xff1a;CFEEE 2024-2024 International Conference on Frontiers of Energy and Environment Engineer…

Android APP 音视频(01)MediaCodec解码H264码流

说明&#xff1a; 此MediaCodec解码H264实操主要针对Android12.0系统。通过读取sd卡上的H264码流Me获取视频数据&#xff0c;将数据通过mediacodec解码输出到surfaceview上。 1 H264码流和MediaCodec解码简介 1.1 H264码流简介 H.264&#xff0c;也被称为MPEG-4 AVC&#xff…

uni-app 影视类小程序开发从零到一 | 开源项目分享

引言 在数字娱乐时代&#xff0c;对于电影爱好者而言&#xff0c;随时随地享受精彩影片成为一种日常需求。分享一款基于 uni-app 开发的影视类小程序。它不仅提供了丰富的影视资源推荐&#xff0c;还融入了个性化知乎日报等内容&#xff0c;是不错的素材&#xff0c;同时对电影…

就业管理功能概述:构建智慧校园企业招聘平台

在智慧校园整体解决方案中&#xff0c;就业管理模块连接着学校与企业两端&#xff0c;更成为学生们步入社会、开启职业生涯梦想的关键门户。这一功能的核心价值&#xff0c;在于它如何巧妙地运用科技的力量&#xff0c;简化招聘流程&#xff0c;提升招聘效率&#xff0c;同时为…

5G赋能车联网,无人驾驶引领未来出行

无人驾驶车联网应用已成为智能交通领域的重要发展趋势。随着无人驾驶技术的不断进步和5G网络的广泛部署&#xff0c;5G工业路由器在无人驾驶车联网中的应用日益广泛&#xff0c;为无人驾驶车辆提供了稳定、高效、低时延的通信保障。 5G工业路由器的优势 低时延&#xff1a;5G网…

Python教程(一):环境搭建及PyCharm安装

目录 引言1. Python简介1.1 编译型语言 VS 解释型语言 2. Python的独特之处3. Python应用全览4. Python版本及区别5. 环境搭建5.1 安装Python&#xff1a; 6. 开发工具&#xff08;IDE&#xff09;6.1 PyCharm安装教程6.2 永久使用教程 7. 编写第一个Hello World结语 引言 在当…

Open3D 可视化窗口中查看点的坐标数据

目录 一、概述 1.1实现步骤 1.2应用 二、代码实现 2.1关键函数 2.2完整代码 三、实现效果 3.1选取点 3.2数据显示 前期试读&#xff0c;后续会将博客加入下列链接的专栏&#xff0c;欢迎订阅 Open3D与点云深度学习的应用_白葵新的博客-CSDN博客 一、概述 可以使用Op…

Java语言程序设计基础篇_编程练习题**15.19 (游戏:手眼协调)

**15.19 (游戏:手眼协调) 请编写一个程序&#xff0c;显示一个半径为10像素的实心圆&#xff0c;该圆放置在面板上的随机位置&#xff0c;并填充随机的顔色&#xff0c;如图15-29b所示。单击这个圆时&#xff0c;它会消失&#xff0c;然后在另一个随机的位置显示新的随机颜色的…

【工具】轻松转换JSON与Markdown表格——自制Obsidian插件

文章目录 一、插件简介二、功能详解三、使用教程四、插件代码五、总结 一、插件简介 JsonMdTableConverter是一款用于Obsidian的插件&#xff0c;它可以帮助用户在JSON格式和Markdown表格之间进行快速转换。这款插件具有以下特点&#xff1a; 轻松识别并转换JSON与Markdown表格…

Java | Leetcode Java题解之第278题第一个错误的版本

题目&#xff1a; 题解&#xff1a; public class Solution extends VersionControl {public int firstBadVersion(int n) {int left 1, right n;while (left < right) { // 循环直至区间左右端点相同int mid left (right - left) / 2; // 防止计算时溢出if (isBadVers…

【linux驱动开发】卸载驱动时报错:Trying to free already-free IRQ 0

【linux驱动开发】free_irq时报错:Trying to free already-free IRQ 0 卸载驱动时报错Trying to free already-free IRQ 0 第一次加载卸载驱动没有任何问题。第二次加载驱动&#xff0c;按键中断触发失效&#xff0c;卸载驱动时报错:Trying to free already-free IRQ 0 看了…

牛客周赛50轮+cf955+abc363

D-小红的因式分解_牛客周赛 Round 50 (nowcoder.com) 思路&#xff1a; 巨蠢的题目&#xff0c;ax^2bxca1*a2*x^2(b1*a2b2*a1)xb1*b2&#xff0c;即&#xff1a; aa1*a2,ba1*b2a2*b1,cb1*b2 数据范围很小&#xff0c;直接暴力枚举吧&#xff08;注意条件&#xff09; 代码…

简单使用SpringMVC写一个图书管理系统的登入功能和图书展示功能

准备好前端的代码 这里已经准备好了前端的代码&#xff0c;这里仅仅简单的介绍登入功能&#xff0c;和展示图书列表的功能。 如图&#xff1a; 如上图所示&#xff0c;这里的前端代码还是比较多的&#xff0c;在这里我介绍&#xff0c;login.html还有book_list.html这两个。 l…

【快速逆向四/无过程/有源码】浙江工商职业技术学院 统一身份认证

逆向日期&#xff1a;2024.07.23 使用工具&#xff1a;Node.js 加密方法&#xff1a;RSAUtils 文章全程已做去敏处理&#xff01;&#xff01;&#xff01; 【需要做的可联系我】 AES解密处理&#xff08;直接解密即可&#xff09;&#xff08;crypto-js.js 标准算法&#xf…

万界星空科技MES系统的智能排产功能

万界星空科技MES系统通过一系列先进的手段和算法进行智能排产&#xff0c;这些手段确保了生产过程的优化和效率的提升。 1、智能分析&#xff1a; MES系统通过收集和分析生产过程中的数据&#xff0c;能够对生产过程进行智能分析。这包括分析哪些工序需要生产&#xff0c;哪些…

01 RabbitMQ:简单介绍

01 RabbitMQ&#xff1a;简单介绍 1. 简单介绍1.1. 什么是消息队列&#xff1f;1.2. 底层实现两大主流方式1.3. 两大主流方式对比1.4. 各个MQ产品的对比 2. RabbitMQ简介1.2. 官网1.3. 体系结构1.3.1. Producer1.3.2. Consumer1.3.3. Connection1.3.4. Channel1.3.5. Broker1.3…

【iOS】——属性关键字

属性关键字的类型 在iOS中属性关键字分为四种类型&#xff1a; 可访问性: readonly ,readwrite原子性 &#xff1a; atomic &#xff0c;nonatomic内存管理 &#xff1a; retain/strong/copy&#xff0c; assign/unsafe_unretained&#xff0c;weak方法命名&#xff1a;sette…