flink设置保存点和恢复保存点

增加了hdfs

package com.qyt;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;/*** DataStreamSource API使用*/
public class StreamWordCount {public static void main(String[] args) throws Exception {//TODO 1、获取流的类final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();System.setProperty("HADOOP_USER_NAME", "root");env.enableCheckpointing(3000);// 配置存储检查点到文件系统env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://hadoop01:9000/flink"));env.getCheckpointConfig().setCheckpointTimeout(2000l);env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//TODO 2、获取无界流DataStreamSource<String> stringDataStreamSource = env.socketTextStream("192.168.1.10", 9000, "\n");//TODO 3 ETL//TODO 3.1 转换成二元数组,简单ETL的过程SingleOutputStreamOperator<Tuple2<String, Integer>> process = stringDataStreamSource.process(new ProcessFunction<String, Tuple2<String, Integer>>() {@Overridepublic void processElement(String value, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {Tuple2<String, Integer> tuple2 = Tuple2.of(word, 1);out.collect(tuple2);}}}).uid("etl");//TODO 3.1 分组KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = process.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});//TODO 3.2 聚合计算SingleOutputStreamOperator<Tuple2<String, Integer>> sum = tuple2StringKeyedStream.sum(1);//TODO 4、打印sum.print();//TODO 5、无界流需要这个不断执行的方法env.execute();}
}

要增加hadoop客户端的使用

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns="http://maven.apache.org/POM/4.0.0"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.4</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>
</project>

提交flink集群

#生成对应的任务
./flink run -m 192.168.1.161:8081 -c com.qyt.StreamWordCount /root/soft/flink-demo-1.0-SNAPSHOT.jar
# 恢复上一次保存点,bc5fae2e282247486003ed259f2f37a7为jobID
./flink run -s hdfs://hadoop01:9000/flink/bc5fae2e282247486003ed259f2f37a7/chk-33 -m 192.168.1.161:8081 -c com.qyt.StreamWordCount /root/soft/flink-demo-1.0-SNAPSHOT.jar

查看对应的jobId
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1549717.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

精通推荐算法32:行为序列建模总结

1 行为序列建模总体架构 2 行为序列整体总结 用户行为序列建模是推荐算法中至关重要的一环&#xff0c;也是目前较为核心和前沿的研究方向。其主要分为短序列建模和长序列建模两大方向。短序列建模又主要分为池化和序列化两种方式&#xff0c;其中池化包括Sum-Pooling、Averag…

信道衰落的公式

对于天线&#xff1a; 对于天线的面积计算&#xff1a; 天线的接收功率密度&#xff1a; 天线的接收功率&#xff1a; 移动无线信道&#xff08;I&#xff09; (xidian.edu.cn)https://web.xidian.edu.cn/zma/files/20150710_153736.pdf 更加常用的考虑了额外的信道衰落pathlo…

甘肃辣椒油:舌尖上的热辣诱惑

&#x1f4a5;宝子们&#xff0c;今天必须要给你们安利甘肃食家巷的辣椒油&#x1f336;️&#xff01;✨甘肃辣椒油&#xff0c;那可是有着独特魅力的美食瑰宝&#x1f60d;。它以其鲜艳的色泽、浓郁的香气和醇厚的辣味&#xff0c;瞬间点燃你的味蕾&#x1f525;。&#x1f3…

《Spring Boot应用进阶:打造优雅的错误处理机制与全局异常拦截器》

文章目录 自定义异常类AppException封装业务有关的枚举类AppExceptionCodeMsg全局异常拦截器Handler响应类模板Resp案例展示 || Demo项目结构pom依赖DemoController实际执行结果 Demo案例Git地址 | Gitee 本文主要介绍自己在工作中在处理抛出异常类和封装响应类处理的模板总结。…

【matlab画多纵坐标图像】

一 、什么是多纵坐标图像 多纵坐标图像是一种在同一个坐标系中&#xff0c;使用多个纵坐标轴来表示不同的数据指标的图像。在多纵坐标图中&#xff0c;每个纵坐标轴可以有不同的刻度和单位&#xff0c;用于表示不同的数据范围。这样可以方便地比较不同指标的变化趋势&#xff0…

【C语言】单片机map表详细解析

1、RO Size、RW Size、ROM Size分别是什么 首先将map文件翻到最下面&#xff0c;可以看到 1.1 RO Size&#xff1a;只读段 Code&#xff1a;程序的代码部分&#xff08;也就是 .text 段&#xff09;&#xff0c;它存放了程序的指令和可执行代码。 RO Data&#xff1a;只读…

供应链 | 顶刊POMS论文精读:交易成本经济学(TCE)——供应链效率理论

编者按 供应链效率提升指南&#xff1a;不可不知的TCE理论视角 本文为Production and Operations Management 期刊论文&#xff0c;原文信息&#xff1a; Ketokivi, M., & Mahoney, J. T. (2020). Transaction cost economics as a theory of supply chain efficiency. …

自然资源部最新Nature正刊!!!

2024年8月21日&#xff0c;国际顶级期刊《Nature》发表了自然资源部第二海洋研究所李家彪院士为通讯作者&#xff0c;张涛为第一作者的论文“超慢速扩张加克洋中脊的高变化岩浆增生”。这一成果颠覆了国际海洋学术界半个多世纪以来一直认为的超慢速扩张洋中脊岩浆供给极度贫瘠的…

9.28 Qt界面

#include "widget.h"Widget::Widget(QWidget *parent): QWidget(parent) {this->setWindowTitle("Plane");this->setWindowIcon(QIcon("C:/Users/EDY/Desktop/递送发送.png"));QPushButton *btn1new QPushButton;this->setFixedSize(64…

[SAP ABAP] 锁对象

在SAP中使用锁对象&#xff0c;用于避免在数据库中插入或更改数据时出现不一致的情况 1.创建锁对象 数据准备 学校表(ZDBT_SCH_437) 使用事务码SE11创建锁对象 点击"锁对象"单选按钮&#xff0c;输入以E开头的锁定对象的名称&#xff0c;然后点击创建按钮 锁对象名…

施工现场安全帽监控预警#YOLO视觉 ai视频识别安全帽监测系统

在建筑工地上&#xff0c;安全始终是首要任务。为了提高工地安全&#xff0c;引入了安全帽监控预警系统&#xff0c;这是一项创新技术&#xff0c;利用人工智能和视频识别技术来监测工地上的安全帽佩戴情况。 这个系统的主要工作原理是在工地高危区域门口部署安全帽识别系统&a…

前端使用xlsx-js-style导出Excel,带样式,并处理合并单元格边框显示不全和动态插入表头解决

一、在学习之前&#xff0c;先给出一些学习/下载地址&#xff1a; xlsx-js-style下载地址 https://github.com/gitbrent/xlsx-js-style 或者 https://www.npmjs.com/package/xlsx-js-style SheetJS中文教程&#xff1a; https://xlsx.nodejs.cn/docs/csf/cell 二、先看样…

图文深入理解Oracle Network配置管理(二)

本篇图文深入介绍Oracle Network配置管理。 Oracle网络配置的目的 为了方便对Oracle 数据库进行管理&#xff0c;一般以下情况应该对Oracle进行网络配置。 • 在客户端对服务器端数据库进行管理&#xff08;网络客户端管理&#xff09; • 在一台服务器上管理多个数据库&…

fmql之Linux内核定时器

内容依然来自于正点原子。 Linux内核时间管理 内容包括&#xff1a; 系统频率设置节拍率&#xff1a;高节拍率的优缺点全局变量jiffies绕回的概念&#xff08;溢出&#xff09;API函数&#xff08;处理绕回&#xff09; HZ为每秒的节拍数 Linux内核定时器 内容包括&#xf…

基于python的爱心代码游戏实现 面试最常见问题(源码+内容介绍)

开头附上工作招聘面试必备问题噢~~包括综合面试题、无领导小组面试题资源文件免费&#xff01;全文6000干货。 工作招聘无领导小组面试全攻略最常见面试题&#xff08;第一部分&#xff09;共有17章可用于国企私企合资企业工作招聘面试面试必备心得面试总结资源-CSDN文库https…

【重学 MySQL】四十一、子查询举例与分类

【重学 MySQL】四十一、子查询举例与分类 引入子查询在SELECT子句中引入子查询在FROM子句中引入子查询在WHERE子句中引入子查询注意事项 子查询分类标量子查询列子查询行子查询表子查询 子查询注意事项子查询的位置子查询的返回类型别名的使用性能考虑相关性错误处理逻辑清晰 总…

一文带你读懂分库分表,分片,Sharding的许多概念

一文带你读懂分库分表,分片,Sharding的许多概念 分库是将一个库拆分为多个库&#xff0c;分表就是将一个表拆分为多个表。分库分表有垂直拆分和水平拆分。垂直拆分一般是按照业务将表分到不同的库中&#xff08;此种不在本发的讨论范围&#xff09;。水平拆分是将表的数据拆分…

Java---异常及处理

一.异常 1.概念 程序的非正常执行。高级语言都有异常处理机制&#xff08;C&#xff0c;Java&#xff09; 2.一般处理异常的方法 Scanner sc new Scanner(System.in);System.out.println("请输入一个数字:");String s sc.nextLine();if (s.matches("[0-9]&qu…

ViTamin——视觉-语言时代的可扩展视觉模型设计

人工智能咨询培训老师叶梓 转载标明出处 尽管视觉-语言模型&#xff08;VLMs&#xff09;已经取得了显著的成就&#xff0c;但在图像编码器的选择上&#xff0c;传统的视觉Transformer&#xff08;ViT&#xff09;依然是主流。尽管Transformer在文本编码领域已经证明了其有效性…

【C++笔记】初始模版和STL简介

【C笔记】初始模版和STL简介 &#x1f525;个人主页&#xff1a;大白的编程日记 &#x1f525;专栏&#xff1a;C笔记 文章目录 【C笔记】初始模版和STL简介前言一.初始模版1.1泛型编程1.2函数模版1.3类模板 二.STL简介2.1什么是STL2.2STL的版本2.3STL的六大组件2.4STL的重要…