Flink_DataStreamAPI_源算子Source

Flink_DataStreamAPI_源算子Source

  • 1从集合中读取数据
  • 2从文件读取数据
  • 3从Socket读取数据
  • 4从Kafka读取数据
  • 5从数据生成器读取数据
  • Flink支持的数据类型
    • 1)Flink的类型系统
    • 2)Flink支持的数据类型
    • 3)类型提示(Type Hints)

1从集合中读取数据

最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();List<Integer> data = Arrays.asList(1, 22, 3);DataStreamSource<Integer> ds = env.fromCollection(data);stream.print();env.execute();
}

2从文件读取数据

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。读取文件,需要添加文件连接器依赖:

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt")).build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file").print();env.execute();
}

说明:
参数可以是目录,也可以是文件;还可以从HDFS目录下读取,使用路径hdfs://…;
路径可以是相对路径,也可以是绝对路径;
相对路径是从系统属性user.dir获取路径:idea下是project的根目录,standalone模式下是集群节点根目录;

3从Socket读取数据

不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。
我们之前用到的读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

DataStream<String> stream = env.socketTextStream("localhost", 7777);

4从Kafka读取数据

Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。
所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version>
</dependency>
public class SourceKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setTopics("topic_1").setGroupId("atguigu").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()) .build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");stream.print("Kafka");env.execute();}
}

5从数据生成器读取数据

Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。1.17提供了新的Source写法,需要导入依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency>
public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource =new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:"+value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();env.execute();}
}

Flink支持的数据类型

1)Flink的类型系统

Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

2)Flink支持的数据类型

对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:

(1)基本类型
所有Java基本类型及其包装类,再加上Void、String、Date、BigDecimal和BigInteger。

(2)数组类型
包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。

(3)复合数据类型
-Java元组类型(TUPLE):这是Flink内置的元组类型,是Java API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段。
-Scala 样例类及Scala元组:不支持空字段。
-行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
-POJO:Flink自定义的类似于Java bean模式的类。

(4)辅助类型
Option、Either、List、Map等。

(5)泛型类型(GENERIC)
Flink支持所有的Java类和Scala类。不过如果没有按照上面POJO类型的要求来定义,就会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由Flink本身序列化的,而是由Kryo序列化的。
在这些类型中,元组类型和POJO类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为Flink的POJO类型。
Flink对POJO类型的要求如下:
-类是公有(public)的
-有一个无参的构造方法
-所有属性都是公有(public)的
-所有属性的类型都是可以序列化的

3)类型提示(Type Hints)

Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
为了解决这类问题,Java API提供了专门的“类型提示”(type hints)。
回忆一下之前的word count流处理程序,我们在将String类型的每个词转换成(word, count)二元组后,就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

Flink还专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的DataStream里元素的类型。

returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/14525.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

KPaaS洞察|异构系统中用户角色与权限分类及管理解决方案

多个异构系统的使用已经成为企业常态。每个系统通常有自己独立的用户角色和权限设置&#xff0c;导致权限管理复杂且容易出现冲突。如何在多个异构系统中统一、有效地进行用户角色和权限管理&#xff0c;已成为企业保障数据安全和提升管理效率的关键挑战。通过集中式权限管理平…

食品配送管理系统(源码+文档+部署+讲解)

食品配送管理系统是成品商业化项目&#xff0c;系统可基于源码二开。 系统概述 餐饮食品配送&#xff0c;包含配送人APP、下单APP、管理端等&#xff0c;实现订餐、配餐&#xff0c;用于食品店、中央厨房等订餐、团餐业务 本项目名称为食品配送系统&#xff0c;是针对食品配…

领夹麦克风哪个品牌音质最好?无线领夹麦克风可以唱歌吗?

随着短视频和直播行业的蓬勃发展&#xff0c;无线领夹麦克风已成为内容创作者提升音质体验的关键一环。但遗憾的是&#xff0c;市场上充斥着太多夸大其词、华而不实的宣传&#xff0c;诸如“一键降噪”、“无损传输”等概念满天飞&#xff0c;让消费者难以分辨真伪。许多朋友在…

大模型学习笔记------BLIP模型的再思考

大模型学习笔记------BLIP模型的再思考 1、BLIP推理---如何进行“图生文”2、BLIP推理---如何进行视觉问答&#xff08;Visual Question Answering, VQA&#xff09;3、BLIP推理---如何进行图文检索&#xff08;Image-text retrieval&#xff09;任务4、总结 上一篇文章上文中讲…

c# 调用c++ 的dll 出现找不到函数入口点

今天在调用一个设备的dll文件时遇到了一点波折&#xff0c;因为多c 不熟悉&#xff0c;调用过程张出现了找不到函数入口点&#xff0c;一般我们使用c# 调用c 文件&#xff0c;还是比较简单。 [DllImport("AtnDll2.dll",CharSet CharSet.Ansi)]public static extern …

H5BuildX发行uniapp h5版本的正确姿势

在manifest.json中配置基础路径 在上传到服务器后&#xff0c;需要将打包后的文件夹修改为基础路径中相同的文件名 否则网页的css、js等资源文件会因为路径问题始终访问不了

C++(Qt)软件调试---符号转换工具cv2pdb (24)

C(Qt)软件调试—符号转换工具cv2pdb &#xff08;24&#xff09; 文章目录 C(Qt)软件调试---符号转换工具cv2pdb &#xff08;24&#xff09;[toc]1、概述&#x1f41c;2、下载cv2pdb&#x1fab2;3、qt程序使用cv2pdb&#x1f9a7;1.1 方法1&#xff1a;命令行1.2 方法2&#…

MySQL技巧之跨服务器数据查询:高级篇-先调用A数据库的MySql存储过程再复制到B数据库的表中

MySQL技巧之跨服务器数据查询&#xff1a;高级篇-先调用A数据库的MySql存储过程再复制到B数据库的表中 基础篇已经描述&#xff1a;借用微软的SQL Server ODBC 即可实现MySQL跨服务器间的数据查询。 而且还介绍了如何获得一个在MS SQL Server 可以连接指定实例的MySQL数据库的…

ReactPress与WordPress:两大开源发布平台的对比与选择

ReactPress与WordPress&#xff1a;两大开源发布平台的对比与选择 在当今数字化时代&#xff0c;内容管理系统&#xff08;CMS&#xff09;已成为各类网站和应用的核心组成部分。两款备受欢迎的开源发布平台——ReactPress和WordPress&#xff0c;各自拥有独特的优势和特点&am…

Python多进程间通讯(包含共享内存方式)

文章目录 1 通过非共享内存配合队列方式2 通过共享内存配合队列方式 注&#xff1a;本博文测试环境为Linux系统。 1 通过非共享内存配合队列方式 下面是一个常见的生产者与消费者的模式示例&#xff0c;这里分别启动了两个子进程&#xff0c;一个为生产者&#xff08;producer…

djang5 官网_polls_app_05( 关于代码测试)

这篇教程从 教程4 结束的地方开始。已经构建了一个网络投票应用程序&#xff0c;现在将为其创建一些自动化测试。 1. 原因&#xff1a; 雅各布卡普兰-莫斯&#xff08;Jacob Kaplan-Moss&#xff09;&#xff0c;Django的原始开发者之一&#xff0c;说过&#xff1a;“没有测…

准双向/弱上拉(标准8051输出模式)、仅为输入(高阻)、开漏输出、推挽输出、上拉电阻、下拉电阻都是什么?

准双向/弱上拉&#xff08;标准8051输出模式&#xff09;&#xff1a; 弱上拉&#xff1a;即输出的1驱动能力是有限的 准双向&#xff1a;可以输入也可以输出 为什么是弱上拉呢&#xff1f; 当三极管断开的时候&#xff0c;“内部输入”处应该是高电平&#xff08;前提的后端…

Linux高阶——1110—死锁问题原子访问线程控制与调度线程同步

目录 1、旋转锁 2、死锁问题 死锁问题举例&#xff1a; 1、双线程死锁 代码 成功截图 2、单线程死锁 死锁问题处理&#xff1a; 死锁问题预防&#xff1a; 有向图 3、原子访问 1、原子访问概念 2、原子访问可用函数 原代码 未加锁代码输出 修改后代码 修改后截…

python入门3

IDE的概念 IDE(Integrated Development Environment)又被称为集成开发环境。说白了&#xff0c;就是有一款图形化界面的软件&#xff0c;它集成了编辑代码&#xff0c;编译代码&#xff0c;分析代码&#xff0c;执行代码以及调试代码等功能。在我们Python开发中&#xff0c;最常…

Ollama—87.4k star 的开源大模型服务框架!!

这一年来&#xff0c;AI 发展的越来越快&#xff0c;大模型使用的门槛也越来越低&#xff0c;每个人都可以在自己的本地运行大模型。今天再给大家介绍一个最厉害的开源大模型服务框架——ollama。 项目介绍 Ollama 是一个开源的大语言模型&#xff08;LLM&#xff09;服务工具…

mysql中的EXISTS和NOT EXISTS使用详解

本文来编写一个实例说下mysql中的EXISTS和NOT EXISTS使用详解 文章目录 exists用法SQL中in, not in, exists, not exists的区别使用实例本文小结 exists用法 exists: 如果括号内子查询语句返回结果不为空&#xff0c;说明where条件成立&#xff0c;就会执行主SQL语句。如果括号…

海量数据去重的哈希与布尔过滤器

目录 散列表 hash与平衡二叉树比较: 散列表组成: hash函数 作用&#xff1a; 怎么选择hash&#xff1a; 选择标准: 常用hash: hash的操作: hash冲突 产生原因 如何描述冲突程度: 解决冲突: 在合理范围内:used < size: 不在合理范围内&#xff08;used > s…

快速掌握——python类 封装[私有属性方法]、继承【python进阶】(内附代码)

1.类的定义 与 实例化对象 在python中使用class关键字创建一个类。 举例子 class Stu(object):id 1001name 张三def __init__(self):passdef fun1(self):pass# 实例化对象 s1 Stu() s2 Stu() print(s1.name) print(s2.name) 第一个方法 __init__是一种特殊的方法&#x…

PO 证书链

提到服务器间证书交换会不会头大,这两天遇到一个B2B接口的通讯证书问题,借机涨姿势,分享之 通常服务器之间通讯证书使用有两种方式: 如果不是生产机,可以简单的使用自签名证书,自签名证书就是下面这两个信息相同,都是自己,工具这里就不介绍了,多的是。双方互换证书,你…

HarmonyOS App 购物助手工具的开发与设计

文章目录 摘要引言功能需求分析技术方案与设计架构设计技术选型 代码示例Demo数据抓取模块数据存储模块历史价格查询和数据可视化模块完整界面布局和调用示例代码详解 QA环节总结参考资料 摘要 随着促销活动的增多&#xff0c;用户面临真假折扣的困惑&#xff0c;特别是在一些…