Flink的环境搭建及使用

在idea中创建一个Maven项目,导入Flink的依赖,在代码中创建Flink环境,编写代码.

如果不想去找flink依赖,就去flink官网,提供了一个mvn的命令,快速下载在本地构建一个flink的项目,可以直接从这个项目的pom.xml文件中拿到依赖配置

一、环境搭建

pom.xml文件的依赖导入

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.15.4</flink.version><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies>


二、使用Flink

以WordCount为例:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class Demo1WordCount {public static void main(String[] args) throws Exception {//1、创建flink的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度,一个并行度对应一个taskenv.setParallelism(2);//修改数据从上游发送到下游的缓存时间env.setBufferTimeout(2000);/** 无界流*///2、读取数据//nc -lk 8888DataStream<String> linesDS = env.socketTextStream("master", 8888);//一行转换成多行DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> out) throws Exception {for (String word : line.split(",")) {//将数据发送到下游out.collect(word);}}});//转换成kv格式DataStream<Tuple2<String, Integer>> kvDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {//返回一个二元组return Tuple2.of(word, 1);}});//按照单词进行分组//底层是hash分区KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> kv) throws Exception {return kv.f0;}});//统计数量DataStream<Tuple2<String, Integer>> countDS = keyByDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> kv1,Tuple2<String, Integer> kv2) throws Exception {int count = kv1.f1 + kv2.f1;return Tuple2.of(kv1.f0, count);}});//打印结果countDS.print();//3、启动flinkenv.execute("wc");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/3638.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

TypeError: Cannot read properties of undefined (reading ‘__asyncLoader‘)

项目场景&#xff1a; vue3element-plus 项目场景&#xff1a;vue3element-plustsvite的技术栈开发的后台&#xff0c;一个后台列表页面&#xff0c;使用了ElTable组件 问题描述 页面提示报一个好像是异步的问题 runtime-core.esm-bundler.js:2261 Uncaught (in promise) Ty…

SAP ABAP开发学习——BADI增强操作步骤示例1

SAP ABAP开发学习——第三代增强&#xff08;BADI)-CSDN博客 SAP ABAP开发学习——BADI增强操作步骤示例2-CSDN博客 创建物料MM01的增强 首先进入SE24 打断点&#xff0c;运行事务MM01,启动debug,获得增强的名字 F8依次获得下一个增强的名字 继续获得增强 进入选择视图以及销…

odrive代码阅读笔记

电机参数 电流环带宽 atan2 // based on https://math.stackexchange.com/a/1105038/81278 float fast_atan2(float y, float x) {// a : min (|x|, |y|) / max (|x|, |y|)float abs_y fabsf(y);float abs_x fabsf(x);// inject FLT_MIN in denominator to avoid division …

C++多线程常见的数据竞争模式及示例分析

一、简单竞争 最简单的数据竞争是最常见的一种&#xff1a;两个线程在没有任何同步的情况下访问一个内置类型的变量。很多时候&#xff0c;这种竞争是良性的&#xff08;代码统计一些允许不精确的统计信息&#xff09;。 int var;void Thread1() { // 在一个线程中运行。var;…

Jest进阶知识:测试快照 - 确保组件渲染输出正确

在 React 应用开发中&#xff0c;确保组件的渲染输出正确是一项重要的测试任务。快照测试是一种有效的方法&#xff0c;可以帮助开发者捕捉并验证组件的渲染输出&#xff0c;确保其在不同的情况下保持一致。 什么是快照测试&#xff1f; 快照测试的基本思想是&#xff1a; 首…

【AI落地应用实战】HivisionIDPhotos AI证件照制作实践指南

最近在网上发现了一款轻量级的AI证件照制作的项目&#xff0c;名为HivisionIDPhotos。它利用AI模型实现对多种拍照场景的识别、抠图与证件照生成&#xff0c;支持轻量级抠图、多种标准证件照和排版照生成、纯离线或端云推理、美颜等功能。此外&#xff0c;项目还提供了Gradio D…

基于SSM的在线作业管理系统 -octopus-master(源码+调试)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。你想解决的问题&#xff0c;今天给大家介绍…

【SPIE单独出版审核,见刊检索稳定!】2024年遥感技术与图像处理国际学术会议(RSTIP 2024,11月29-12月1日)

2024年遥感技术与图像处理国际学术会议&#xff08;RSTIP 2024&#xff09; 2024 International Conference on Remote Sensing Technology and Image Processing 官方信息 会议官网&#xff1a;www.rstip.org 时间地点&#xff1a;2024年11月29-12月1日 | 中国大理 三轮截…

青少年编程能力等级测评CPA Python编程(一级)

青少年编程能力等级测评CPA Python编程(一级) &#xff08;考试时间90分钟&#xff0c;满分100分&#xff09; 一、单项选择题&#xff08;共20题&#xff0c;每题3.5分&#xff0c;共70分&#xff09; 下列语句的输出结果是&#xff08; &#xff09;。 print(35*2) A&a…

数学篇 - 微分(求导)的基本法则与行列式

一、常数及基本函数的求导规则 常数的导数&#xff1a; ( C ) ′ 0 (C)0 (C)′0 幂函数的导数&#xff1a; ( x μ ) ′ μ x μ − 1 (x^\mu)\mu x^{\mu-1} (xμ)′μxμ−1 三角函数正弦、余弦函数的导数&#xff1a; ( s i n x ) ′ c o s x (sin\ x)cos\ x (sin x)′…

玄机-应急响应- Linux入侵排查

一、web目录存在木马&#xff0c;请找到木马的密码提交 到web目录进行搜索 find ./ type f -name "*.php" | xargs grep "eval(" 发现有三个可疑文件 1.php看到密码 1 flag{1} 二、服务器疑似存在不死马&#xff0c;请找到不死马的密码提交 被md5加密的…

H.266与H.265、AV1、H.264对比

好多开发者希望搞清楚H.266&#xff08;Versatile Video Coding&#xff0c;VVC&#xff09;、H.265&#xff08;High Efficiency Video Coding&#xff0c;HEVC&#xff09;、AV1、H.264&#xff08;Advanced Video Coding&#xff09;四者区别&#xff0c;本文从压缩效率、画…

【征程 6 工具链性能分析与优化-1】编译器预估 perf 解读与性能分析

01 引言 本篇文章中&#xff0c;我们将首先介绍 layerdetails 中的参数信息&#xff0c;然后将结合实例分析如何利用 layerdetails 来分析模型的性能瓶颈&#xff0c;进而对模型的性能进行优化。 02 layerdetails 中信息解读 征程 6 工具链目前提供了两种方式生成性能评估报…

有线电视 1.27.5 | 完全免费的电视直播应用,频道丰富,画质清晰

有线电视是一款针对智能电视和电视盒子开发的在线观看电视应用软件。该软件最大的特色是完全免费&#xff0c;并且支持几乎国内所有的电视台&#xff0c;无论是央视频道还是省卫视频道应有尽有。为了更好地服务用户&#xff0c;有线电视还对电视频道进行了分类&#xff0c;包含…

ML2001-1 机器学习/深度学习 Introduction of Machine / Deep Learning

图片说明来自李宏毅老师视频的学习笔记&#xff0c;如有侵权&#xff0c;请通知下架 影片参考 【李宏毅】3.第一节 - (上) - 机器学习基本概念简介_哔哩哔哩_bilibili 1. 机器学习的概念与任务类型 概念&#xff1a;机器学习近似于寻找函数&#xff0c;用于处理不同类型的任…

Java项目实战II基于Java+Spring Boot+MySQL的植物健康系统(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 基于Java、…

推荐一款面向增材制造的高效设计平台:nTopology

nTopology是一款面向增材制造的高效设计平台&#xff0c;平台预置了大量增材制造常用的设计工具包&#xff0c;工程师通过调用若干个预置工具包、或自主开发定制的工具包&#xff0c;建立一个工作流&#xff0c;实现复杂几何结构的参数化设计。nTopology集合了的强大几何建模和…

CreateEvent使用笔记

一、前言 开发中上位机获取或设置下位机参数的接口&#xff0c;有阻塞、非阻塞两种&#xff1a; 1、API非阻塞&#xff0c;异步回调返回结果 2、API阻塞&#xff0c;超时或直接返回结果 对于应用层调用者来说&#xff0c;阻塞API更方便&#xff0c;而要实现阻塞API在windows可使…

从“点”到“面”,热成像防爆手机如何为安全织就“透视网”?

市场上测温产品让人眼花缭乱&#xff0c;通过调研分析&#xff0c;小编发现测温枪占很高比重。但是&#xff0c;测温枪局限于显示单一数值信息&#xff0c;无法直观地展示物体的整体温度分布情况&#xff0c;而且几乎没有功能拓展能力。以AORO A23为代表的热成像防爆手机改变了…

代码随想录一刷——454.四数相加II

我们现在前2个数组中&#xff0c;统计元素之和以及出现的次数&#xff08;用map&#xff09;&#xff0c;随后再另外2个数组中遍历看上面元素之和的相反数是否存在于map中即可。 C&#xff1a; class Solution { public: int fourSumCount(vector<int>& nums1, ve…