FlinkSql读取外部Mysql和HBase数据库的方法(scala)

我的Flink版本为1.13.6

<flink.version>1.13.6</flink.version>

FlinkSql读取外部的MySQL是走的JDBC所以需要以下两个依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>1.13.6</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency>

读取HBase需要如下依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

我们首先在虚拟机的MySQL中随意找一张表,我以我这边的ebs库的customer表为例:

表内容如下:

在scala工程中建立一个类,内容如下:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject MysqlJdbc {def main(args: Array[String]): Unit = {val settings = EnvironmentSettings.newInstance().inStreamingMode().build()val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)tabEnv.executeSql("""|CREATE TABLE person(|gender string,|country string,|job string,|credit_type string,|customer_id int ,|first_name string,|last_name string,|email string,|address string,|`language` string,|credit_no string|)|WITH (|'connector' = 'jdbc',|'url' = 'jdbc:mysql://single01:3306/ebs',|'driver' = 'com.mysql.cj.jdbc.Driver',|'username' = 'root',|'password' = 'sakura20031204',|'table-name' = 'customer'|)|""".stripMargin)tabEnv.sqlQuery("""|SELECT * FROM person||""".stripMargin).execute().print()}
}

在flinkSQL中,我建立一张虚拟表person,然后在字段列表中,每一个字段名的数据类型都必须和MySQL中源数据的数据类型相匹配,否则会报错。

其中一些参数解释如下:

'connector' = 'jdbc'

指定连接器类型为jdbc。

'url' = 'jdbc:mysql://single01:3306/ebs'

指定所要连接的MySQL服务器地址以及库名。

'driver' = 'com.mysql.cj.jdbc.Driver'

指定MySQL驱动包。

'username' = 'root'

'password' = 'sakura20031204'

指定登录MySQL用户的账号和密码,我这边为了方便使用的是root用户,实际使用时不能使用root。要保证该用户有读取你指定的那张表的权限。

'table-name' = 'customer'

指定表的名字。

结果如下(截图只有部分内容,因为字段列表长):

同样的,在HBase中先找一张表,我这里以我这边的hbase_test:tranfer_from_mysql为例:

HBase shell中看表内容比较费劲,这张表大致内容有一个列族baseinfo 内容是:

<age INT, gender STRING,name STRING,phone STRING>

在scala工程中建立一个类,内容如下:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject HBase {def main(args: Array[String]): Unit = {val settings = EnvironmentSettings.newInstance().inStreamingMode().build()val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)tabEnv.executeSql("""|CREATE TABLE person(|id INT,|baseinfo ROW<age INT, gender STRING,name STRING,phone STRING>|) WITH (|'connector' = 'hbase-2.2',|'table-name' = 'hbase_test:tranfer_from_mysql',|'zookeeper.quorum' = 'single01:2181',|'zookeeper.znode.parent' = '/hbase'|)|""".stripMargin)tabEnv.sqlQuery("""|SELECT id, baseinfo.age, baseinfo.gender, baseinfo.name, baseinfo.phone|FROM person|WHERE baseinfo.gender = 'female'|and baseinfo.age > 20|""".stripMargin).execute().print()}
}

其中一些参数内容解释如下:

'connector' = 'hbase-2.2'

指定使用的连接器类型,这里是 HBase 的版本 2.2。

'table-name' = 'hbase_test:tranfer_from_mysql'

指定在 HBase 中要访问的表的名称。

'zookeeper.quorum' = 'single01:2181'

指定 HBase 使用的 ZooKeeper 集群的地址和端口。

'zookeeper.znode.parent' = '/hbase'

指定 ZooKeeper 中的父节点(znode)。

最终结果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/5433.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

使用Rust实现http/https正向代理

相关库的安装 利用vcpkg安装openssl库 vcpkg install openssl:x64-windows并设置openssl库位置的环境变量 $Env:OPENSSL_DIR"D:/vcpkg/packages/openssl_x64-windows/"安装openssl软件&#xff0c;因为需要利用openssl生成自签名证书 Cargo依赖 [dependencies] …

vue3如何使用pinia设置全局状态,附常见面试题

1. stores/index.ts 文件 在 index.ts 中创建 store 实例并封装了注册逻辑&#xff0c;这样可以方便地将整个 Pinia 实例注册到 Vue 应用中。代码如下&#xff1a; import type { App } from vue import { createPinia } from piniaconst store createPinia()// 全局注册 st…

【微知】Nvida Mellanox网卡中速率SDR、DDR、QDR、FDR、EDR、HDR、NDR全称与速率?

文章目录 综述背景全称早期速率&#xff1a;中期当前 其他 综述 Single Data Rate (SDR) 10Gbps Double Data Rate (DDR) 20Gbps Quad Data Rate (QDR) 40Gbps Fourteen Data Rate (FDR) 56Gbps Enhanced Data Rate (EDR) 100Gbps High Data Rate (HDR) 200Gbps Next Data Rat…

融合虚拟化与容器技术,打造灵活又安全的AI算力服务

随着人工智能技术的不断进步&#xff0c;AI企业在迅速推进大模型业务时&#xff0c;往往会倾向于采用容器化的轻量部署方案。相较于传统的虚拟机部署&#xff0c;容器化在快速部署、资源利用、环境一致性和自动化编排等方面具备显著优势。 然而&#xff0c;容器技术所固有的隔…

Hunyuan-Large:推动AI技术进步的下一代语言模型

腾讯近期推出了基于Transformer架构的混合专家&#xff08;MoE&#xff09;模型——Hunyuan-Large&#xff08;Hunyuan-MoE-A52B&#xff09;。该模型目前是业界开源的最大MoE模型之一&#xff0c;拥有3890亿总参数和520亿激活参数&#xff0c;展示了极强的计算能力和资源优化优…

岛屿数量 广搜版BFS C#

和之前的卡码网深搜版是一道题 力扣第200题 99. 岛屿数量 题目描述 给定一个由 1&#xff08;陆地&#xff09;和 0&#xff08;水&#xff09;组成的矩阵&#xff0c;你需要计算岛屿的数量。岛屿由水平方向或垂直方向上相邻的陆地连接而成&#xff0c;并且四周都是水域。…

本地使用conda创建django虚拟环境

1、首先本地安装好conda。 2、创建django的虚拟环境 conda create -n django # 这里的 django只是虚拟的名称&#xff0c;自己随便名字就行&#xff0c;只要你自己知道这个是django的虚拟环境就行。 3、安装成功&#xff0c;查看虚拟环境 conda env list 4、激活虚拟环境…

rabbitMQ

官网&#xff1a;https://www.rabbitmq.com/ 一 介绍与安装 1 安装 我们同样基于Docker来安装RabbitMQ&#xff0c;使用下面的命令即可&#xff1a; docker run \-e RABBITMQ_DEFAULT_USERitheima \-e RABBITMQ_DEFAULT_PASS123321 \-v mq-plugins:/plugins \--name rabbi…

reg注册表研究与物理Hack

reg注册表研究与物理Hack 声明&#xff1a;内容的只是方便各位师傅学习知识&#xff0c;以下网站只涉及学习内容&#xff0c;其他的都与本人无关&#xff0c;切莫逾越法律红线&#xff0c;否则后果自负。 目录 reg注册表研究与物理HackWindows注册表修改注册表实现应用程序开机…

【黑盒测试】等价类划分法及实例

本文主要介绍黑盒测试之等价类划分法&#xff0c;如什么是等价类划分法&#xff0c;以及如何划分&#xff0c;设计等价类表。以及关于三角形案例的等价类划分法。 文章目录 一、什么是等价类划分法 二、划分等价类和列出等价类表 三、确定等价类的原则 四、建立等价类表 …

适用于个人或团队的文档管理和知识库系统,NAS快速部署『BookStack』

适用于个人或团队的文档管理和知识库系统&#xff0c;NAS快速部署『BookStack』 哈喽小伙伴们好&#xff0c;我是Stark-C~ 知识库对于很多需要和文字打交道的个人或者团队都不陌生对吧&#xff1f;对于我们个人来说&#xff0c;它可以将常用的学习资料、工作笔记、项目计划和…

delphi fmx android 自动更新(一)

12.2 android10测试通过 一,安卓权限设置 1,REQUEST_INSTALL_PACKAGES 权限 2,INTERNET 权限 3,READ_EXTERNAL_STORAGE 权限 4,WRITE_EXTERNAL_STORAGE 权限 5,READ_PHONE_STATE 二,安卓下载过程 一般是从http下载安装包 apk 所以,如果是http 则,manife…

《JVM第7课》堆区

文章目录 1.概念2.指定堆大小3.新生代和老年代3.1 新生代3.2 老年代3.3 动画演示 4.分代收集理念 1.概念 堆是JVM中最重要的一块区域&#xff0c;JVM规范中规定所有的对象和数组都应该存放在堆中&#xff0c;在执行字节码指令时&#xff0c;会把创建的对象存入堆中&#xff0c…

【笔记】自动驾驶预测与决策规划_Part6_不确定性感知的决策过程

文章目录 0. 前言1. 部分观测的马尔可夫决策过程1.1 POMDP的思想以及与MDP的联系1.1.1 MDP的过程回顾1.1.2 POMDP定义1.1.3 与MDP的联系及区别POMDP 视角MDP 视角决策次数对最优解的影响 1.2 POMDP的3种常规解法1.2.1 连续状态的“Belief MDP”方法1. 信念状态的定义2. Belief …

Spring Boot框架下的知识管理与多维分类

4 系统设计 系统分析接下来的操作步骤就是系统的设计&#xff0c;这部分内容也是不能马虎对待的。因为生活都是在不断产生变化&#xff0c;人们需求也是在不断改变&#xff0c;开发技术也是在不断升级&#xff0c;所以程序也需要考虑在今后可以方便进行功能扩展&#xff0c;完成…

LeetCode17. 电话号码的字母组合(2024秋季每日一题 59)

给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有它能表示的字母组合。答案可以按 任意顺序 返回。 给出数字到字母的映射如下&#xff08;与电话按键相同&#xff09;。注意 1 不对应任何字母。 示例 1&#xff1a; 输入&#xff1a;digits “23” 输出&#xff1a;[“…

Nature Methods | 基于流形约束的RNA速度推断精准解析细胞周期动态调节规律

生信碱移 VeloCycle算法 VeloCycle&#xff1a;基于流形约束的RNA速度推断在细胞周期动态中的精准解析 今天给各位老铁们分享一篇于2024年10月31号发表在 Nature Methods [IF: 36.1] 的文章&#xff1a;"Statistical inference with a manifold-constrained RNA velocity…

Spring挖掘:(AOP篇)

学习AOP时,我们首先来了解一下何为AOP 一. 概念 AOP&#xff08;面向切面编程&#xff0c;Aspect Oriented Programming&#xff09;是一种编程技术&#xff0c;旨在通过预编译方式或运行期动态代理实现程序功能的统一管理和增强。AOP的主要目标是在不改变原有业务逻辑代码的…

【机器学习】k最近邻分类

&#x1f4dd;本文介绍 本文为作者阅读鸢尾花书籍以及一些其他资料的k最近邻分类后&#xff0c;所作笔记 &#x1f44b;作者简介&#xff1a;一个正在积极探索的本科生 &#x1f4f1;联系方式&#xff1a;943641266(QQ) &#x1f6aa;Github地址&#xff1a;https://github.com…

《深度学习》bert自然语言处理框架

目录 一&#xff0c;关于bert框架 1、什么是bert 2、模型结构 自注意力机制&#xff1a; 3、预训练任务 4、双向性 5、微调&#xff08;Fine-tuning&#xff09; 6、表现与影响 二、Transformer 1、传统RNN网络计算时存在的问题 1&#xff09;串联 2&#xff09;并行…