FlinkSql读取kafka数据流的方法(scala)

我的scala版本为2.12

<scala.binary.version>2.12</scala.binary.version>

我的Flink版本为1.13.6

<flink.version>1.13.6</flink.version>

FlinkSql读取kafka数据流需要如下依赖:

    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

我们首先建一个kafka主题person_test。然后建立一个scala类作为kafka的生产者,示例内容如下:

package cha01import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import java.util.Properties
import Util._import scala.util.Randomobject FlinkSqlKafkaConnectorProducer {def main(args: Array[String]): Unit = {val producerConf = new Properties()producerConf.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092")producerConf.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10")producerConf.setProperty(ProducerConfig.LINGER_MS_CONFIG,"50")producerConf.setProperty(ProducerConfig.RETRIES_CONFIG,"2")producerConf.setProperty(ProducerConfig.ACKS_CONFIG,"1")producerConf.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer")producerConf.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val topic = "person_test"val producer:KafkaProducer[Integer,String] = new KafkaProducer(producerConf);val rand = new Random()for(i <- 1 to 2000){val line: String = s"$i,Origami$i,${rand.nextInt(30)+18},${if (rand.nextInt(10) >=8) "Male" else "Female"}"val record: ProducerRecord[Integer, String] =new ProducerRecord[Integer, String](topic, 0, System.currentTimeMillis(), i, line)producer.send(record)Thread.sleep(50+rand.nextInt(500))}producer.flush()producer.close()}
}

此kafka生产者会随机生产出一系列类似以下内容的数据,类型为csv:

id,name,age,gender
1,Origami1,25,Female
2,Origami2,30,Male
3,Origami3,22,Female

随后再在工程中建立一个scala类,内容示例如下:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject CSVKafkaSystem {def main(args: Array[String]): Unit = {val settings = EnvironmentSettings.newInstance().inStreamingMode().build()val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)tabEnv.executeSql("""|CREATE TABLE person(|id INT,|name STRING,|age INT,|gender STRING|) WITH (|'connector' = 'kafka',|'topic'= 'person_test',|'properties.bootstrap.servers' = 'single01:9092',|'properties.group.id' = 'person_test_group',|'scan.startup.mode' = 'earliest-offset',|'format' = 'csv',|'csv.ignore-parse-errors' = 'true',|'csv.field-delimiter' = ','|)|""".stripMargin)tabEnv.sqlQuery("SELECT * FROM person").execute().print()}
}

其中的一些参数解释如下:'

connector' = 'kafka'

指定连接器类型为kafka

'topic'= 'person_test'

指定要读取的kafka主题为person_test

'properties.bootstrap.servers' = 'single01:9092'

指定kafka所在的服务器的ip地址以及端口号

'properties.group.id' = 'person_test_group'

指定 Kafka 消费者组的 ID为person_test_group

'scan.startup.mode' = 'earliest-offset'

指定了控制 Flink 从 Kafka 中读取数据时的起始位置

  • earliest-offset 表示从 Kafka 中每个分区的最早消息开始读取。
  • latest-offset 表示从 Kafka 中每个分区的最新消息开始读取。
  • group-offsets 表示使用 Kafka 消费者组的偏移量来恢复上次消费的位置

'format' = 'csv'

指定了 kafka 消息的格式为csv

'csv.ignore-parse-errors' = 'true'

指定了忽略解析异常的信息

'csv.field-delimiter' = '

指定 CSV 数据中字段的分隔符为,

可以看到最终结果如下,数据在源源不断的生成,flinkSQL也在源源不断的读取表内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/13269.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

力扣 LeetCode 19. 删除链表的倒数第N个结点(Day2:链表)

解题思路&#xff1a; 快慢指针 class Solution {public ListNode removeNthFromEnd(ListNode head, int n) {ListNode dummy new ListNode(-1);dummy.next head;ListNode fast dummy;ListNode slow dummy;for (int i 0; i < n; i) {fast fast.next;}while (fast.ne…

提升法律文书处理效率的秘密武器:开源文档比对工具解析

本篇文章介绍了一款针对律师行业的免费开源文档比对工具&#xff0c;旨在解决法律文档的多版本比对难题。通过逐字、逐句精确比对、语义分析、批量处理等核心功能&#xff0c;该工具可高效识别文本差异&#xff0c;提升文书审查效率并降低错误风险。它支持多种文件格式&#xf…

linux命令详解,openssl+历史命令详解

openssl openssl是一个开源的加密工具包&#xff0c;提供了各种加密、解密、签名、验证等功能 openssl passwd -1 123password表示这个命令用于处理密码相关的操作&#xff0c;-1参数指定使用MD5加密算法对密码“123”进行加密处理。MD5是一种常用的哈希算法&#xff0c;它将…

轻松理解操作系统 - Linux的虚拟文件系统是如何简化我们的使用的?

在前面几期&#xff0c;我们不仅了解了 Linux文件系统 是如何在硬盘等储存介质上保存文件的&#xff1a; 什么是软硬链接 文件的“身份证” - inode 真正储存文件的地方 - 数据块 文件系统的心脏 - 超级块 以及了解了 Linux系统 中具体都有一些什么文件&#xff1a; Linu…

LeetCode【0019】删除链表的倒数第N个结点

本文目录 1 中文题目2 求解方法&#xff1a;虚拟头节点和快慢指针2.1 方法思路2.2 Python代码2.3 复杂度分析 3 题目总结 1 中文题目 给定一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 示例&#xff1a; 链表如上&#xff1a; 输入&a…

【JavaSE】多线程案例---阻塞队列

1. 阻塞队列 阻塞队列是一种特殊的队列&#xff0c;也遵守 " 先进先出 " 的原则。 阻塞队列是一种线程安全的数据结构&#xff0c;并且具有以下特性&#xff1a; 1. 当队列为满时&#xff0c;继续进行入队列操作就会阻塞&#xff0c;直到有其他线程从队列中取走元素…

SQL练习(2)

题源&#xff1a;牛客官网 选择题 假设创建新用户nkw&#xff0c;现在想对于任何IP的连接&#xff0c;仅拥有user数据库里面的select和insert权限&#xff0c;则列表语句中能够实现这一要求的语句是&#xff08;&#xff09; A grant select ,insert on *.* to nkw% B grant…

Hyper-v中ubuntu与windows文件共享

Hyper-v中ubuntu与windows文件共享 前言相关链接第一步--第一个链接第二步--第二个链接测试与验证 前言 关于Hyper-V的共享我搞了好久&#xff0c;网上的很多教程太过冗余&#xff0c;我直接采用最简单的办法吧 相关链接 Hyper-V中Ubuntu 同windows系统共享文件夹-百度经验 …

【TCP零窗口问题】

零窗口问题说明 零窗口问题(Zero Window Problem)是指在TCP连接中,当接收方的接收缓冲区已满时,无法接受新的数据。此时,接收方会向发送方发送一个窗口大小为0的TCP消息,告知其暂停发送数据,直到接收方释放出缓冲区空间。这种情况在高负载或接收方处理能力不足时比较常见…

Oracle OCP认证考试考点详解082系列19

题记&#xff1a; 本系列主要讲解Oracle OCP认证考试考点&#xff08;题目&#xff09;&#xff0c;适用于19C/21C,跟着学OCP考试必过。 91. 第91题&#xff1a; 题目 解析及答案&#xff1a; 关于 Oracle 数据库中的索引及其管理&#xff0c;以下哪三个陈述是正确的&#x…

2445.学习周刊-2024年45周

一片树叶展示了秋天的美 ✍优秀博文 数据仓库如何划分主题域在忙碌的工作中如何保持信息的输入&#xff1f;PC小米妙享安装解锁流转补丁智能数据建设与治理Dataphin对方讲话不要乱插嘴轩师处世之道 ✍实用工具 typing-practice云搭 自动化巡检系统 ✍精彩言论 话说的越快、…

关于解决使用VMWare内的虚拟机无法识别USB问题小结

目录 前言 0. 查看是不是没有开启USB3.0的支持 1. 检查一下是否禁用了VMWare USB服务 2. 无奈之举 前言 笔者今天帮助一位同志解决了VMWare内的虚拟机不识别挂载设备的办法。这里对笔者使用的排查手段做一个总结。 0. 查看是不是没有开启USB3.0的支持 我们的第一件事情就…

【364】基于springboot的高校科研信息管理系统

摘 要 信息数据从传统到当代&#xff0c;是一直在变革当中&#xff0c;突如其来的互联网让传统的信息管理看到了革命性的曙光&#xff0c;因为传统信息管理从时效性&#xff0c;还是安全性&#xff0c;还是可操作性等各个方面来讲&#xff0c;遇到了互联网时代才发现能补上自古…

RN codegen编译报错

react-native codegen 编译报错 error: redefinition of ‘NativeAccessibilityInfoSpecJSI’ class JSI_EXPORT NativeAccessibilityInfoSpecJSI : public JavaTurboModule 解决&#xff1a; codegen不能和项目本身一起编译&#xff0c;先执行./gradlew clean&#xff0c;然…

大数据技术之Hadoop :我是恁爹

就如上图中的技术分类&#xff0c;大数据技术主要解决的就是海量数据的存储和计算问题。 这两个问题的解决方案最先被 Google 被提出&#xff0c;用于解决 Google 搜索引擎海量的网页存储和索引的构建。对应的技术就是日后被人所熟知的 HDFS 和 MapReduce。 不关注大数据的可…

ATAT-mcsqs生成准随机结构(SQS)更新

通常使用第一性原理计算某些多元素占据原胞中同一位置的结构会优先考虑使用准随机结构&#xff08;special quasirandom structure&#xff0c;SQS&#xff09;来进行模拟建模。此篇教程意在整理一个较为简便的操作流程&#xff0c;以供参考。 合金理论自动化工具包(ATAT)1是一…

人际交往中,想要有好人缘,需做到“三要”,做到一个,也是好事

人际交往中&#xff0c;想要有好人缘&#xff0c;需做到“三要”&#xff0c;做到一个&#xff0c;也是好事 在这个世上&#xff0c;每个人都是一座孤岛&#xff0c;但通过人际交往这座桥梁&#xff0c;我们能够彼此相连&#xff0c;共同编织出一张温暖的社会网络。 好人缘&a…

政务数据治理专栏开搞!

写在前面 忙忙碌碌干了一年政务数据治理的工作&#xff0c;从法人数据到自然人&#xff0c;从交通到地理信息等等&#xff0c;突发想法开一个专栏讲一讲政务数据遇到的问题&#xff0c;以及治理的成效&#xff0c;或许有朋友爱看。 政务数据&#xff0c;又称之为政务数据资源&a…

Linux最深刻理解页表于物理内存

目录 物理内存管理 页表设计 物理内存管理 如果磁盘上的内容加载到物理内存上&#xff0c;每次io都会按照4kb的方式进行加载(可能不同版本系统有些区别)。所以我们的物理内存上的内容也是4个字节进行管理的。 而每个页框都需要我们进行管理。所以自然物理内存就会对页框进行先…

一键高效管理:苹果手机如何一键删除照片

在我们的日常生活中&#xff0c;苹果手机不仅是沟通的工具&#xff0c;更是捕捉和保存生活瞬间的重要设备。随着时间的推移&#xff0c;数以千计的照片积累在设备中&#xff0c;这不仅占用了大量的存储空间&#xff0c;也可能影响设备的性能。本文将详细介绍苹果手机如何一键删…