大数据flink篇之二-基础实例wordcount

flink既支持批数据处理,也支持流数据处理。flink1.12版本后,批流进行了api统一。开发语言可以选择java和scala,这里选择java。下面以wordcount为例,讲解flink编程的流程。
开发前提:

  • idea
  • maven
  • jdk 1.8

一、maven依赖

<properties><flink.version>1.15.0</flink.version><flink.scala.version>2.12</flink.scala.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_${flink.scala.version}</artifactId><version>${flink.version}</version></dependency></dependencies>

二、数据结构

wordcount.txt数据结构:
zhangsan,lisi,wangwu
ajdhaj,hdgaj,zhangsan
lisi,wangwu

三、DataSet API读取离线文件实现wordcount统计

package com.first.example;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.*;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.netty4.io.netty.util.internal.StringUtil;
import org.apache.flink.util.Collector;/*** @author xxxx* @date 2023-09-26 12:48*/
public class DataSetWordCount {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> dataSource = env.readTextFile("E:\\Centos\\flinkExercise\\data\\wordcount.txt");FlatMapOperator<String, Tuple2<String, Integer>> tuple2FlatMapOperator = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] split = s.split(",");for (String s1 : split) {if (!StringUtils.isEmpty(s1)) {collector.collect(new Tuple2<>(s1, 1));}}}});AggregateOperator<Tuple2<String, Integer>> total = tuple2FlatMapOperator.groupBy(0).sum(1);//打印输出total.print();}
}

编程模式从获取执行环境env开始; readTextFile读取文件;flatMap对数据进行处理;最后经过groupBy和sum进行统一处理。
结果:
(ajdhaj,1)
(wangwu,2)
(lisi,2)
(hdgaj,1)
(zhangsan,2)

四、DataStream API以流执行方式实现wordcount统计

package com.first.example;import org.apache.commons.compress.archivers.StreamingNotSupportedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @date 2023-09-26 13:21*/
public class StreamingWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> textFile = senv.readTextFile("E:\\Centos\\flinkExercise\\data\\wordcount.txt");DataStream<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = textFile.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] split = s.split(",");for (String s1 : split) {if (!StringUtils.isEmpty(s1)) {collector.collect(Tuple2.of(s1, 1));}}}});tuple2SingleOutputStreamOperator.keyBy(ele -> ele.f0).sum(1).print(">>>");senv.execute("first streaming....");}
}

执行过程:
获取执行环境;读取文件转为流;经过flatMap转换;然后通过keyBy和sum算子进行聚合操作。

结果:

:6> (lisi,1)
:4> (wangwu,1)
:4> (zhangsan,1)
:6> (hdgaj,1)
:6> (lisi,2)
:4> (zhangsan,2)
:4> (wangwu,2)
:2> (ajdhaj,1)

批流对比结果发现:批是整体一个输出,流是按数据逐条统计输出

五、使用批流统一api批数据统计

flink 1.12后使用批流统一api进行处理

package com.first.example;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** 批流完全联合统一* @date 2023-09-26 15:38*/
public class BatchStreamWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();// 默认为流,此处设为批senv.setRuntimeMode(RuntimeExecutionMode.BATCH);DataStreamSource<String> textFile = senv.readTextFile("E:\\Centos\\flinkExercise\\data\\wordcount.txt");DataStream<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = textFile.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] split = s.split(",");for (String s1 : split) {if (!StringUtils.isEmpty(s1)) {collector.collect(Tuple2.of(s1, 1));}}}});tuple2SingleOutputStreamOperator.keyBy(ele -> ele.f0).sum(1).print(">>>");senv.execute("first streaming....");}
}

与流处理过程基本一致,添加了运行环境设置:
senv.setRuntimeMode(RuntimeExecutionMode.BATCH);

结果为:

:2> (ajdhaj,1)
:4> (wangwu,2)
:4> (zhangsan,2)
:6> (lisi,2)
:6> (hdgaj,1)

下一节讲解Flink的安装和部署。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/145004.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【Java】复制数组的四种方式

1. System.arraycopy() 用来将一个数组的&#xff08;一部分&#xff09;内容复制到另一个数组里面去。 定义&#xff1a; void arraycopy(Object src, int srcPos, Object dest, int destPos, int length);例&#xff1a; int[] arr1 { 1, 2, 3, 4, 5 }; int[] arr2 new…

SpringBoot全局异常处理源码

SpringBoot全局异常处理源码 一、SpringMVC执行流程二、SpringBoot源码跟踪三、自定义优雅的全局异常处理脚手架starter自定义异常国际化引入封装基础异常封装基础异常扫描器&#xff0c;并注册到ExceptionHandler中项目分享以及改进点 一、SpringMVC执行流程 今天这里叙述的全…

基于Java的药品管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09;有保障的售后福利 代码参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作…

Java微信分享接口开发

概述 微信JS-SDK实现自定义分享功能&#xff0c;分享给朋友&#xff0c;分享到朋友圈 详细 概述 概述 微信公众平台开始支持前端网页&#xff0c;大家可能看到很多网页上都有分享到朋友圈&#xff0c;关注微信等按钮&#xff0c;点击它们都会弹出一个窗口让你分享和关注&…

前端uniapp防止页面整体滑动页面顶部以上,设置固定想要固定区域宽高

解决&#xff1a;设置固定想要固定区域宽高 目录 未改前图未改样式改后图改后样式 未改前图 未改样式 .main {display: flex;flex-direction: row;// justify-content: space-between;width: 100vw;// 防止全部移动到上面位置&#xff01;&#xff01;&#xff01;&#xff01…

【C++的OpenCV】第十三课-OpenCV基础强化(一):绝对有用!Mat相关的一系列知识(基础->进阶)

&#x1f389;&#x1f389;&#x1f389; 欢迎各位来到小白 p i a o 的学习空间&#xff01; \color{red}{欢迎各位来到小白piao的学习空间&#xff01;} 欢迎各位来到小白piao的学习空间&#xff01;&#x1f389;&#x1f389;&#x1f389; &#x1f496;&#x1f496;&…

在nodejs中如何防止ssrf攻击

在nodejs中如何防止ssrf攻击 什么是ssrf攻击 ssrf&#xff08;server-side request forgery&#xff09;是服务器端请求伪造&#xff0c;指攻击者能够从易受攻击的Web应用程序发送精心设计的请求的对其他网站进行攻击。(利用一个可发起网络请求的服务当作跳板来攻击其他服务)…

C++核心编程--继承篇

4.6、继承 继承是面向对象三大特征之一 有些类与类之间存在特殊的关系&#xff0c;例如下图中&#xff1a; ​ 我们发现&#xff0c;定义这些类的定义时&#xff0c;都拥有上一级的一些共性&#xff0c;还有一些自己的特性。那么我们遇到重复的东西时&#xff0c;就可以考虑使…

大数据Flink(八十九):Temporal Join(快照 Join)

文章目录 Temporal Join(快照 Join) Temporal Join(快照 Join) Temporal Join 定义(支持 Batch\Streaming):Temporal Join 在离线的概念中其实是没有类似的 Join 概念的,但是离线中常常会维护一种表叫做 拉链快照表,使用一个明细表去 join 这个 拉链快照表 的 join …

GD32F10x的输出模式

1. 单片机型号的识别。 2. GPIO的输出模式。 1. 开漏模式 2.推挽模式 3.复用开漏模式 4.复用推挽模式。 开漏模式&#xff1a;&#xff08;写入位设置&#xff0c;输出数据寄存器来控制MOS&#xff09; 只有N-MOS管导通。PMOS不导通。 当N-MOS的栅极为0&#xff0c;N-MOS管…

SQL血缘解析原理

根据sql解析获取到表到表, 字段到字段间的关系,即血缘关系。实际上这是从sql文本获取到数据流的过程。 大致步骤如下&#xff1a; 1.sql文本进行词法分析 2.sql语法分析获取到AST抽象语法树 3.访问AST抽象语法树根据语法结构推测出数据的流向,例如create as select from 这种结…

排序:败者树和置换选择排序(解决外部排序中的优化问题)

1.算法目的&#xff08;败者树&#xff09; 解决多路平衡归并带来的问题。 在外部排序中&#xff0c;使用k路平衡归并策略, 选出一个最小元素需要对比关键字(k-1)次&#xff0c; 导致内部归并所需时间增加。&#xff08;可用“败者树”进行优化&#xff09; 2.败者树的定义 …

高德地图根据两点的经纬度计算两点之间的距离(修正版)

SQL语句可以用来计算两个经纬度之间的距离。下面是一个示例的SQL语句&#xff1a; SELECT id, ( 6371 * ACOS( COS( RADIANS( lat1 ) ) * COS( RADIANS( lat2 ) ) * COS( RADIANS( lng2 ) - RADIANS( lng1 ) ) SIN( RADIANS( lat1 ) ) * SIN( RADIANS( lat2 ) ) ) ) AS dista…

ROS系统读取USB相机图像数据

ROS系统读取USB相机图像数据 前言usb_cam 功能包下载与编译摄像头选择连接摄像头可配置参数 前言 usb_cam功能包简介 为了丰富机器人与外界的交互方式&#xff0c;已经增加了与机器人的语音交互方式&#xff0c;不仅使机器人能够说话发声&#xff0c;还能听懂我们说的话&#…

泽众APM性能监控软件

泽众Application Performance Management&#xff08;简称APM&#xff09;是一款专业的性能监控工具&#xff0c;可以对全链路如Web服务器、应用服务器、数据库服务器等进行实时监控&#xff0c;并以图表化的形式直观地呈现监控数据&#xff0c;为系统性能优化和定位问题提供准…

3 OpenCV两张图片实现稀疏点云的生成

前文&#xff1a; 1 基于SIFT图像特征识别的匹配方法比较与实现 2 OpenCV实现的F矩阵RANSAC原理与实践 1 E矩阵 1.1 由F到E E K T ∗ F ∗ K E K^T * F * K EKT∗F∗K E 矩阵可以直接通过之前算好的 F 矩阵与相机内参 K 矩阵获得 Mat E K.t() * F * K;相机内参获得的方式…

ThreeJS-3D教学四-光源

three模拟的真实3D环境&#xff0c;一个非常炫酷的功能便是对光源的操控&#xff0c;之前教学一中已经简单的描述了多种光源&#xff0c;这次咱们就详细的讲下一些最常见的光源&#xff1a; AmbientLight 该灯光在全局范围内平等地照亮场景中的所有对象。 该灯光不能用于投射阴…

蓝桥杯每日一题2023.9.29

蓝桥杯大赛历届真题 - C&C 大学 B 组 - 蓝桥云课 (lanqiao.cn) 题目描述1 题目分析 看见有32位&#xff0c;我们以此为入手点&#xff0c; B代表字节1B 8b b代表位&#xff0c;32位即4个字节 (B) 1KB 1024B 1MB 1024KB (256 * 1024 * 1024) / 4 67108864 故答案…

Spring | 基于SpringBoot的多数据源实战 - 使用seata实现多数据源的全局事务管理

Spring | 基于SpringBoot的多数据源实战 - 使用seata实现多数据源的全局事务管理 引言1.1 多数据源的必要性1.2 多数据源的应用场景 实战演示2.1 创建实体类2.2 配置数据源2.3 实现数据源配置类2.4 配置Repository类2.5 运行与验证 事务管理与数据一致性3.1 事务管理3.2 使用Se…

MySQL 索引底层 B+Tree 原理解析

目录 一、前言二、B-Tree 和 BTree 的区别三、InnoDB 和 MyISAM 存储引擎索引存储区别MyISAMInnoDB 四、InnoDB 联合索引底层数据结构五、MySQL 中三次磁盘IO最大能检索多少数据 一、前言 索引是帮助高效获取数据排好序的数据结构&#xff0c;任何数据库都会使用到索引&#x…