Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)

在这里插入图片描述
                       星光下的赶路人star的个人主页

                      世间真正温煦的春色,都熨帖着大地,潜伏在深谷

文章目录

  • 1、输出算子(Sink)
    • 1.1 连接到外部系统
    • 1.2 输出到文件
    • 1.3 输出到Kafka
    • 1.4 输出到MySQL(JDBC)
    • 1.4 自定义Sink输出

1、输出算子(Sink)

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部储存,为外部应用提供支持。
在这里插入图片描述

1.1 连接到外部系统

Flink的DataStream API专门提供了向外部提供写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的。Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

stream.addSink(new SinkFunction());

addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

Flink1.12开始,同样重构了Sink架构,

stream.sinkTo()

当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

在这里插入图片描述
我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。

除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
在这里插入图片描述
除此以外,就需要用户自定义实现sink连接器了。

1.2 输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint,否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");// 输出到文件系统FileSink<String> fieSink = FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("atguigu-").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))// 文件滚动策略:  1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}
}

1.3 输出到Kafka

(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码

输出无key的record:

public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次,必须开启checkpoint(后续章节介绍)env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** Kafka Sink:* TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可* 1、开启checkpoint(后续介绍)* 2、设置事务前缀* 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("atguigu-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

自定义序列化器,实现带key的record:

public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** 如果要指定写入kafka的key,可以自定义序列化器:* 1、实现 一个接口,重写 序列化 方法* 2、指定key,转成 字节数组* 3、指定value,转成 字节数组* 4、返回一个 ProducerRecord对象,把key、value放进去*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setRecordSerializer(new KafkaRecordSerializationSchema<String>() {@Nullable@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord<>("ws", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("atguigu-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}

运行代码,在Linux主机启动一个消费者,查看是否收到数据

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws

1.4 输出到MySQL(JDBC)

(1)添加依赖

<!--mysql驱动 -->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency>

(2)启动MySQL,在test库下建表

CREATE TABLE `ws` (`id` varchar(100) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)输出到MySQL的示例代码

public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法: addsink* 2、JDBCSink的4个参数:*    第一个参数: 执行的sql,一般就是 insert into*    第二个参数: 预编译sql, 对占位符填充值*    第三个参数: 执行选项 ---》 攒批、重试*    第四个参数: 连接选项 ---》 url、用户名、密码*/SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor,如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("000000").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());sensorDS.addSink(jdbcSink);env.execute();}
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

1.4 自定义Sink输出

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。
在这里插入图片描述
                      您的支持是我创作的无限动力

在这里插入图片描述
                      希望我能为您的未来尽绵薄之力

在这里插入图片描述
                      如有错误,谢谢指正;若有收获,谢谢赞美

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/142790.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

[密码学入门]仿射密码(Affine)

加密算法y(axb)mod N 解密算法x*(y-b)mod N(此处的为a关于N的乘法逆元&#xff0c;不是幂的概念&#xff09; 如何求&#xff0c;涉及的知识挺多&#xff0c;还没想好怎么写&#xff0c;丢番图方程&#xff0c;贝祖定理&#xff08;又译裴蜀定理&#xff09;&#xff0c;扩展欧…

IO流————

一、字符流 前面我们学习了字节流,使用字节流可以读取文件中的字节数据。但是如果文件中有中文,使用字节流来读取,就有可能读到半个汉字的情况,这样会导致乱码。虽然使用读取全部字节的方法不会出现乱码,但是如果文件过大又不太合适。 所以Java专门为我们提供了另外一种…

opencv: 解决保存视频失败的问题

摘要&#xff1a;opencv能读取视频&#xff0c;但保存视频时报错。 一、首先要确保已经下载了openh264.dll文件&#xff0c;否则保存的视频无法打开&#xff0c;详细可以浏览这个&#xff1a;opencv&#xff1a;保存视频。 二、保存视频时出现一下问题&#xff1a; OpenCV:…

vue_Delete `␍`eslint(prettier/prettier)

Delete ␍eslint(prettier/prettier) 错误的解决方案 问题背景 在Windows笔记本上新拉完代码&#xff0c;在执行pre-commit时&#xff0c;出现如下错误&#xff1a; Delete ␍eslint(prettier/prettier)问题根源 罪魁祸首是git的一个配置属性&#xff1a;core.autocrlf 由于…

【空间-光谱联合注意网络:多时相遥感图像】

A Spatial–Spectral Joint Attention Network for Change Detection in Multispectral Imagery &#xff08;一种用于多光谱图像变化检测的空间-光谱联合注意网络&#xff09; 变化检测是通过比较双时相图像来确定和评估变化&#xff0c;这是遥感领域的一项具有挑战性的任务…

Oracle VM VirtualBox安装并下载安装CentOS7

Oracle VM VirtualBox安装并下载安装CentOS7 Oracle VM VirtualBox下载CentOS创建虚拟机 Oracle VM VirtualBox VM下载链接 https://www.oracle.com/cn/virtualization/virtualbox/ 点击链接直接下载就行&#xff0c;下载完默认安装或者更改一下安装目录。 下载CentOS http://…

快速排序与冒泡排序以及代码

快速排序 快速排序&#xff08;Quicksort&#xff09;是一种常用的排序算法&#xff0c;它基于分治的思想。 时间复杂度&#xff1a;O&#xff08;nlogn&#xff09; 空间复杂度&#xff1a;O&#xff08;logn&#xff09; 快速排序的基本思想如下&#xff1a; 选择一个元素…

vue3 + mark.js | 实现文字标注功能

页面效果 具体实现 新增 1、监听鼠标抬起事件&#xff0c;通过window.getSelection()方法获取鼠标用户选择的文本范围或光标的当前位置。2、通过 选中的文字长度是否大于0或window.getSelection().isCollapsed (返回一个布尔值用于描述选区的起始点和终止点是否位于一个位置&…

设计模式 - 代理模式

目录 一. 前言 二. 实现 三. 静态代理和动态代理 一. 前言 代理模式&#xff08;Proxy Pattern&#xff09;&#xff0c;为某个对象提供一种代理以控制对对象的访问。即客户端可通过代理对象间接访问目标对象&#xff0c;同时可限制、增强、修改目标对象的一些特性。访问者不…

KF32A学习笔记(一):工程导入、编译烧录方法(KF32 IDE+ KF32 PRO)

目录 概述KF32 IDE打开现有项目工程1.工程导入2.编译工程3.下载程序 KF32 PRO 概述 本文主要是对KF32A150芯片程序的编译、烧录方法进行说明。针对开发过程中的编译烧录和无代码情况下的烧录两种场景&#xff0c;需要安装ChipON PRO KF32和ChipON IDE KF32两个上位机工具&…

华为云智能化组装式交付方案 ——金融级PaaS业务洞察及Web3实践的卓越贡献

伴随信息技术与金融业务加速的融合&#xff0c;企业应用服务平台&#xff08;PaaS&#xff09;已从幕后走向台前&#xff0c;成为推动行业数字化转型的关键力量。此背景下&#xff0c;华为云PaaS智能化组装式交付方案闪耀全场&#xff0c;在近日结束的华为全联接大会 2023上倍受…

vue若依前端项目搭建

1.项目搭建 首先进入到你需要创建的项目目录下面&#xff0c;然后输入命令vue create .创建项目 接下来选择手动搭建&#xff0c;然后把下面图片中的内容选上 再然后继续配置一些参数信息 接下来运行npm run serve项目就启动起来了 2.配置登录界面文件 首先修改src/router…

数据分享|R语言逻辑回归、线性判别分析LDA、GAM、MARS、KNN、QDA、决策树、随机森林、SVM分类葡萄酒交叉验证ROC...

全文链接:http://tecdat.cn/?p27384 在本文中&#xff0c;数据包含有关葡萄牙“Vinho Verde”葡萄酒的信息&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 介绍 该数据集&#xff08;查看文末了解数据获取方式&#xff09;有1599个观测值和12个变量&#xf…

使用EPPlus实现C#控件Excel文件内容导入转换

使用EPPlus实现C#控件Excel文件内容导入转换 1.添加EPPlus库 在使用EPPlus库时&#xff0c;你需要确保在项目中添加了正确的引用。你可以通过以下方式添加引用&#xff1a; 打开你的项目。 在“解决方案资源管理器”中&#xff0c;右键单击“引用”并选择“管理NuGet程序包”…

作为一名独立开发者,如何获取客户?

很多程序员想成为一名独立开发者&#xff0c;从事自由职业&#xff0c;最大的困难在于如何赚钱&#xff0c;进一步来说&#xff0c;就是如何找到自己的客户&#xff0c;有很多开发者拥有丰富的经验&#xff0c;优秀的能力&#xff0c;但无法吸引客户。这篇文章的灵感正是为此而…

Python实现猎人猎物优化算法(HPO)优化随机森林回归模型(RandomForestRegressor算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 猎人猎物优化搜索算法(Hunter–prey optimizer, HPO)是由Naruei& Keynia于2022年提出的一种最新的…

一创聚宽的实盘就要关闭了,有没有好用的实盘平台推荐

挺多的&#xff0c;比较普遍的是QMT和Ptrade&#xff0c;python语言&#xff0c;易上手&#xff0c;通用性好&#xff0c;要说适用性可以考虑Ptrade&#xff0c;问一下你的客户经理有没有&#xff0c;用Ptrade的券商也多&#xff0c;如果之前用一创聚宽你可以无缝切换&#xff…

单链表详细解析|画图理解

前言&#xff1a; 在前面我们学习了顺序表&#xff0c;相当于数据结构的凉菜&#xff0c;今天我们正式开始数据结构的硬菜了&#xff0c;那就是链表&#xff0c;链表有多种结构&#xff0c;但我们实际中最常用的还是无头单向非循环链表和带头双向循环链表&#xff0c;我们今天先…

ELF文件结构

目录 ELF文件类型 ELF文件结构 通过链接视角分析目标文件 ELF文件头(ELF Header) 节头表 .text代码节 .data 数据节 .rodata 只读数据节 .bss节 其他常见的节 字符串节 符号表 重定位表 通过运行视角分析目标文件 本节内容&#xff1a; ELF文件类型ELF文件结构 …

san.js源码解读之工具(util)篇——splitStr2Obj函数

一、 源码解析 /*** 将字符串逗号切分返回对象** param {string} source 源字符串* return {Object}*/ function splitStr2Obj(source) {var result {};each( // 2source.split(,), // 1function (key) { // 3result[key] key;});return result; }把字符串通过 split 函数以…