五.海量数据实时分析-FlinkCDC+DorisConnector实现数据的全量增量同步

前言

前面四篇文字都在学习Doris的理论知识,也是比较枯燥,当然Doris的理论知识还很多,我们后面慢慢学,本篇文章我们尝试使用SpringBoot来整合Doris完成基本的CRUD。

由于 Doris 高度兼容 Mysql 协议,两者在 SQL 语法方面有着比较强的一致性,另外 Mysql 客户端也是 Doris 官方选择的客户端。因此,如需对 Mysql 进行数据分析,使用 Doris 的迁移成本较低。但是对于数据规模特别大的情况下Mysql的方式还是不太建议的,所以这里还会介绍一种 方式就是通过CDC+DorisConnector

一.整合Mybatis操作Doris

1.准备数据库

CREATE TABLE `doris_test` (`id` int NULL COMMENT "id",`price` decimal(10,2) NULL COMMENT "价格",`title` varchar(20)NULL COMMENT "标题") ENGINE=OLAPDUPLICATE KEY(`id`)DISTRIBUTED BY HASH(`id`) BUCKETS 1PROPERTIES ("replication_num" = "1");
Query OK, 0 rows affected (0.06 sec)

2.搭建SpringBoot项目

第一步:创建SpringBoot项目,导入依赖,主要是Mybatis相关的依赖

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.1</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--如果要用传统的xml或properties配置,则需要添加此依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId></dependency><!--mybatisplus持久层依赖--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.3.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.70</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.23</version></dependency></dependencies>

3.整合Mybatis

创建好启动类,实体类,服务层,持久层,SQL映射文件等,这里和我们操作Mysql没有任何区别.

启动类如下,通过 @MapperScan 来扫描Mapper接口

@SpringBootApplication
@MapperScan(basePackages = "cn.whale.mapper")
public class DorisApplication {public static void main(String[] args) {SpringApplication.run(DorisApplication.class,args);}
}

服务层和持久层代码如下 , 省略了部分代码

@Service
public class DorisServiceImpl implements DorisService {@AutowiredDorisMapper dorisMapper;@Overridepublic List<Order> listDoris() {return dorisMapper.listDoris();}@Overridepublic int add(Order order) {return dorisMapper.add(order);}
}public interface DorisMapper {List<Order> listDoris();int add(Order order);}

下面是sql映射文件,和操作数据库没有任何区别

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.whale.mapper.DorisMapper"><select id="listDoris" resultType="cn.whale.domain.Order">select id,price,title from orders</select><insert id="add" parameterType="cn.whale.domain.Order">INSERT INTO orders(id,price,title) VALUES(#{id},#{price},#{title})</insert>
</mapper>

4.编写配置文件

server:port: 8080
spring:#Doris数据库连接配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://192.168.220.253:9030/app_db?characterEncoding=utf-8&useSSL=falseusername: rootpassword: 123456type: com.alibaba.druid.pool.DruidDataSourceinitial-size: 500min-idle: 500max-active: 500#mybatis的相关配置
mybatis:mapper-locations: classpath:mapper/*.xmltype-aliases-package: com.wudl.doris.domain.DorisTest

5.编写测试类

注入Service,完成查询和添加的测试

package cn.whale.service;import cn.whale.DorisApplication;
import cn.whale.domain.Order;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.math.BigDecimal;
import java.util.List;@RunWith(SpringRunner.class)
@SpringBootTest(classes= DorisApplication.class)
public class DorisServiceTest {@Autowiredprivate DorisService dorisService;@Testpublic void listDoris() {List<Order> orders = dorisService.listDoris();orders.stream().forEach(System.out::println);}@Testpublic void addDoris() throws InterruptedException {for (int i = 100 ; i>0 ; i--){Order order = new Order();order.setId(Long.valueOf(i));order.setPrice(new BigDecimal(i));order.setTitle(i+"元");dorisService.add(order);}Thread.sleep(100000);}
}

二.SpringBoot整合FlinkCDC+doris-connector实时同步

上面通过Mysql客户端的方式来进行数据的同步方式虽然使用起来比较简单,但是不适合大量数据的实时同步,性能不是很好。Doris官方提供了doris-connector进行Doris读写,那么我们可以通过FlinkCDC监听Mysql数据变更,然后同步到SpringBoot项目中,对数据进行处理后,然后通过doris-connector同步到Doris。虽然我们之前也介绍过直接通过FlinkCDC同步Mysql到Doris,那种方式我们没办法对数据进行处理。

1.搭建项目导入依赖

版本兼容
在这里插入图片描述

  • flink-connector-mysql-cdc :flinkCDC,实时监听Mysql增量或者全量同步
  • flink-doris-connector :用来读写Doris的驱动
<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.13.6</flink.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.13</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><!--mysql -cdc--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.18</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.6</version><scope>provided</scope></dependency><!-- flink doris connector --><dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.13_2.12</artifactId><version>1.0.3</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.13.6</version><scope>provided</scope></dependency></dependencies>

2.定义消息序列化器

定义序列化器,当拿到Mysql的数据后我们可以通过虚拟化器对数据进行格式化


/*** @desc mysql消息读取自定义序列化**/
public class MysqlDeserialization implements DebeziumDeserializationSchema<String> {public static final String TS_MS = "ts_ms";public static final String BIN_FILE = "file";public static final String POS = "pos";public static final String CREATE = "CREATE";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";public static final String UPDATE = "UPDATE";/**** 反序列化数据,转为变更JSON对象*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) {//拿到数据Struct struct = (Struct) sourceRecord.value();//输出数据collector.collect(getJsonObject(struct, AFTER).toJSONString());}/**** 从原数据获取出变更之前或之后的数据*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}
}

3.Mysql监听器

监听器的作用主要是监听Mysql的数据变更后,通过序列化器把数据进行处理后,然后同步到Doris中


/*** @desc mysql变更监听**/
@Component
public class MysqlEventListener implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//设置Mysql数据源DebeziumSourceFunction<String> dataChangeInfoMySqlSource = buildDataChangeSource();DataStream<String> streamSource = env.addSource(dataChangeInfoMySqlSource, "mysql-source").setParallelism(1);//streamSource.addSink(dataChangeSink);Properties pro = new Properties();pro.setProperty("format", "json");pro.setProperty("strip_outer_array", "true");//配置Doris信息streamSource.addSink(DorisSink.sink(DorisExecutionOptions.builder().setBatchSize(3).setBatchIntervalMs(10L).setMaxRetries(3).setStreamLoadProp(pro).setEnableDelete(true).build(),DorisOptions.builder().setFenodes("192.168.220.253:8030").setTableIdentifier("app_db.orders").setUsername("root").setPassword("123456").build()));env.execute("mysql-stream-cdc");}/*** 构造变更数据源*/private DebeziumSourceFunction<String> buildDataChangeSource() {return MySqlSource.<String>builder().hostname("192.168.220.253").port(3307).databaseList("app_db").tableList("app_db.orders").username("root").password("123456")/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)* latest:只进行增量导入(不读取历史变化)* timestamp:指定时间戳进行数据导入(大于等于指定时间读取数据)*/.startupOptions(StartupOptions.latest())//序列化.deserializer(new MysqlDeserialization()).serverTimeZone("GMT+8").build();}
}

4.最后编写一个启动类

@SpringBootApplication
public class FlinkCdcApplication {public static void main(String[] args) {SpringApplication.run(FlinkCdcApplication.class, args);}}

5.准备好数据库

我的Mysql使用的是 :app_db.orders 对应的是Doris中的 app_db.orders ,字段类型,字段名需要一致哦。然后启动项目,修改Mysql的数据,Doris会自动同步过去

三.其他同步方式

Doris还支持其他的数据读写方式,具体的可以根据官网案例去尝试地址:https://doris.apache.org/zh-CN/docs/1.2/ecosystem/spark-doris-connector

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1549087.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

G2O (General Graph Optimization)

前言 以高翔的《视觉SLAM14讲》中的 g2o 拟合曲线为例&#xff0c;讲解 g2o 的使用。源文件为 g2oCurveFitting.cpp。 #include <iostream> #include <g2o/core/g2o_core_api.h> #include <g2o/core/base_vertex.h> #include <g2o/core/base_unary_edge.…

一文读懂:监督式微调(SFT)

监督式微调 (Supervised fine-tuning)&#xff0c;也就是SFT&#xff0c;就是拿一个已经学了不少东西的大型语言模型&#xff0c;然后用一些特定的、已经标记好的数据来教它怎么更好地完成某个特定的任务。就好比你已经学会了做饭&#xff0c;但是要特别学会怎么做川菜&#xf…

计算机网络的整体认识---网络协议,网络传输过程

计算机网络背景 网络发展 独立模式: 计算机之间相互独立; 网络互联: 多台计算机连接在一起, 完成数据共享; 局域网LAN: 计算机数量更多了, 通过交换机和路由器连接在一起; 广域网WAN: 将远隔千里的计算机都连在一起;所谓 "局域网" 和 "广域网" 只是一个相…

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-09-28

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-09-28 目录 文章目录 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-09-28目录前言1. Cognitive phantoms in LLMs through the lens of latent variables摘要研究背景问题与挑战创新点算法模型实验效果…

如何调整云桌面安装的虚拟机分辨率?

如何调整云桌面安装的虚拟机分辨率&#xff1f; 1. 编辑GRUB配置文件2. 修改分辨率3. 更新GRUB4. 重启虚拟机 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在云桌面环境中&#xff0c;虚拟机分辨率过低且无法调整时&#xff0c;可以通过以…

【React】react项目中的redux使用

1. store目录结构设计 2. react组件中使用store中的数据——useSelector 3. react组件中修改store中的数据——useDispatch 4. 示例 react-basic\src\store\moduels\counterStore.js import { createSlice } from reduxjs/toolkitconst counterStore createSlice({name: cou…

LeetCode讲解篇之15. 三数之和

文章目录 题目描述题解思路题解代码 题目描述 题解思路 这道题如果我们直接使用三层循环暴力搜索&#xff0c;时间复杂度是O(n3)&#xff0c;大概率会超时 那还有更优解吗&#xff0c;答案是绝对的&#xff0c;查询搜索想要优化&#xff0c;就要思考如何进行排除法加速搜索过…

OIDC6-OIDC 授权流程类型

OpenID Connect&#xff08;OIDC&#xff09;支持三种主要的授权流程&#xff08;Authorization Flow&#xff09;&#xff0c;分别是授权码流程&#xff08;Authorization Code Flow&#xff09;、隐式流程&#xff08;Implicit Flow&#xff09;和混合流程&#xff08;Hybrid…

OpenCV视频I/O(6)检查视频捕获对象是否已成功打开的函数isOpened()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 如果视频捕获已经初始化&#xff0c;则返回 true。 如果之前调用 VideoCapture 构造函数或 VideoCapture::open() 成功&#xff0c;则该方法返回…

File systems

inode descriptor 文件系统中核心的数据结构就是inode和file descriptor。后者主要与用户进程进行交互。 inode&#xff0c;这是代表一个文件的对象&#xff0c;并且它不依赖于文件名。实际上&#xff0c;inode是通过自身的编号来进行区分的&#xff0c;这里的编号就是个整数…

游戏厅计时器ps5计算时间的软件 佳易王电玩计时计费管理系统操作教程

一、前言 游戏厅计时器ps5计算时间的软件 佳易王电玩计时计费管理系统操作教程 软件为绿色免安装版&#xff0c;解压即可使用。 二、软件程序教程 计时的时候&#xff0c;点击 开始计时按钮 开台后可设置定时语音提醒的时间 时间设置好后&#xff0c;点击 开启提醒 即可 三、…

C++远端开发环境安装(centos7)

使用VMWare安装centos7 启用网卡设备 修改文件/etc/sysconfig/network-scripts/ifcfg-ens33中的ONBOOTyes 重启网络服务 systemctl restart network 配置yum仓库 直接将如下内容覆盖原有的/etc/yum.repos.d/CentOS-Base.repo文件 清理yum缓存 yum clean all 刷新yum y…

深度学习常见术语介绍

文章目录 数据集&#xff08;Dataset&#xff09;特征&#xff08;Feature&#xff09;标签&#xff08;Label&#xff09;训练集&#xff08;Training Set&#xff09;测试集&#xff08;Test Set&#xff09;验证集&#xff08;Validation Set&#xff09;模型&#xff08;Mo…

急!现在转大模型还来得及吗?零基础入门到精通,收藏这一篇就够了

大模型的出现&#xff0c;让行内和行外大多数人都感到非常焦虑。 行外很多人想了解却感到无从下手&#xff0c;行内很多人苦于没有硬件条件无法尝试。想转大模型方向&#xff0c;相关的招聘虽然层出不穷&#xff0c;但一般都要求有大模型经验。而更多的人&#xff0c;则一直处…

黑马程序员pink前端查漏补缺笔记,耗时6天,针对必要案例进行练习

HTML 1&#xff09;插件 自动闭合标签&#xff0c;修改开标签时闭标签跟着变&#xff08;微信开发者工具没有这个功能&#xff09; 主题 保存格式化 浏览器打开 实时刷新&#xff0c;不用按浏览器的刷新按钮 win←/→ 快速分屏 2&#xff09;初始结构标签 文档类型声明标签…

社交电商团购平台构建策略与用户体验设计思路

一、电商拼团系统开发思路 1、需求分析&#xff1a; 深入理解业务需求&#xff1a;与商家深入沟通&#xff0c;明确其业务目标和需求&#xff0c;包括拼团模式的具体要求&#xff08;如参与人数、时间限制、价格策略等&#xff09;、用户参与限制、商品管理、订单处理、支付流…

828华为云征文|部署敏捷项目管理系统工具 ZenTao

828华为云征文&#xff5c;部署敏捷项目管理系统工具 ZenTao 一、Flexus云服务器X实例介绍二、Flexus云服务器X实例配置2.1 重置密码2.2 服务器连接2.3 安全组配置2.4 Docker 环境搭建 三、Flexus云服务器X实例部署 ZenTao3.1 ZenTao 介绍3.2 ZenTao 部署3.3 ZenTao 使用 四、总…

P1303 A*B Problem Python题解

A*B Problem 题目背景 高精度乘法模板题。 题目描述 给出两个非负整数&#xff0c;求它们的乘积。 输入格式 输入共两行&#xff0c;每行一个非负整数。 输出格式 输出一个非负整数表示乘积。 样例 #1 样例输入 #1 1 2样例输出 #1 2提示 每个非负整数不超过 1 0…

万字面试题大模型面试,最全八股和答案

自ChatGPT开启大模型时代以来&#xff0c;大模型正迎来飞速发展&#xff0c;现在从事大模型开发相关工作可谓是处在时代的风口。那么大模型面试需要哪些技能和技巧呢&#xff0c;本文详细整理了全套的面试问题及答案&#xff0c;希望对大家有所帮助&#xff01; 目录 大模型&a…

平衡二叉搜索树插入的实现

前言 因为二叉搜索树在插入的时候最坏的情况可能会变成一条单一链表&#xff0c;从而使查找或者插入的时候消耗大量的时间。所以为了解决这一情况诞生了平衡二叉搜索树&#xff0c;其作用是为了减少二叉搜索树的整体高度&#xff0c;从而使查找插入删除的效率提高。 一、平衡二…