【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式

一、背景:

CDC数据中包含了,数据的变更过程。当CDC写入传统数据库最终每一个primary key下会保存一条数据。当然可以使用特殊手段保存多分记录但是显然造成了数据膨胀。
另外数据湖Hudi(0.13.1)是不支持保存所有Changelog其Compaction机制会清除所有旧版本的内容。Iceberg支持TimeTravel,能查到某个时间点的数据状态,但是不能列举的单条记录的Change过程。
所以目前只能手动实现。
其实,实现思路很简单,将原PrimaryKey+Cdc的 ts_ms 一起作为新表的 PrimaryKey就可以了。但需要注意的是一条数据可能变更很多次,但一般需要保存近几次的变更,所以就需要删除部分旧变更记录。ts_ms 就是CDC数据中记录的日志实际产生的时间,具体参见debezium 。如果原表primarykey是联合主键,即有多个字段共同组成,则最好将这些字段拼接为一个字符串,方便后续关联。

本文思路
CDC --写入-> Phoenix + 定期删除旧版本记录

CDC数据写入略过,此处使用SQL模拟写入。

二、Phoenix旧版记录删除(DEMO)

phoenix doc

bin/sqlline.py www.xx.com:2181
-- 直接创建phoenix表
create table TEST.TEST_VERSION(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS)
) VERSIONS=5;

再去hbase shell中查看,hbase 关联表已经有phoenix创建了。

hbase(main):032:0> desc "TEST:TEST_VERSION"
Table TEST:TEST_VERSION is ENABLED
TEST:TEST_VERSION, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRe
gionObserver|805306366|', coprocessor$3 => '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|80
5306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.builder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix
.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', T
TL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPE
N => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
-- 在phoenix中向表插入数据
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:00:00'),'zhangsan');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:00:00'),'lisi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 12:00:00'),'wangwu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 13:00:00'),'zhaoliu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 14:00:00'),'liuqi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 15:00:00'),'sunba');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 07:00:00'),'sunyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 08:00:00'),'chaoyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:00:00'),'xuri');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:30:00'),'chenxi');
-- OK再查询一下数据插入情况
SELECT * FROM TEST.TEST_VERSION;

以下假设每个PrimaryKey需要保留最新的3版本数据。所以红色框内是需要删除的数据。
在这里插入图片描述

现在需要使用row_number的函数给每个primarykey的不通version数据标识。但是phoenix并没有开窗函数。只有agg聚合函数。
phoenix对SQL的限制还是比较多的如:
(1)join 非等值连接不支持,如on a.id>s.id 是不支持的,也不支持数组比较连接,如on a.id = ARRAY[1,2,3]。 会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)where exists 格式的非等值连接不支持。select ... from A where exists (select 1 from B where A.id>B.id) 是不支持的。会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)没有开窗window函数
(3)DELETE FROM不支持JOIN

最终发下有一下函数可用
(1)NTH_VALUE 获取分组排序的第N个值。 返回原值的类型。
(2)FIRST_VALUESLAST_VALUES 获取分区排序后的前、后的N个值,返回ARRAY类型。
此三个函数官网doc中,案例是这样的 FIRST_VALUES( name, 3 ) WITHIN GROUP (ORDER BY salary DESC) 是全局分组,而实际使用中是需要搭配 GROUP BY 使用的。

所以可以获取到

-- 方案一:使用NTH_VALUE获取阈值
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES-- 方案二:使用FIRST_VALUES获取到一个ARRAY 
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS);

由于phoenix支持行子查询,以下是官方案例。这样就能绕过不使用DELETE … JOIN了。

Row subqueries
A subquery can return multiple fields in one row, which is considered returning a row constructor. The row constructor on both sides of the operator (IN/NOT IN, EXISTS/NOT EXISTS or comparison operator) must contain the same number of values, like in the below example:
SELECT column1, column2
FROM t1
WHERE (column1, column2) IN(SELECT column3, column4FROM t2WHERE column5 = ‘nowhere’);
This query returns all pairs of (column1, column2) that can match any pair of (column3, column4) in the second table after being filtered by condition: column5 = ‘nowhere’.

最终实现删除 除N个较新的以外的所有旧版本数据, SQL如下:

-- NTH_VALUE方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES
);-- FIRST_VALUES方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS)
);

删除后效果:
在这里插入图片描述

参考文章:
Phoenix实践 —— Phoenix SQL常用基本语法总结小记
Phoenix 对 Hbase 中表的映射
phoenix使用详解
Phoenix 简介及使用方式
phoenix创建映射表和创建索引、删除索引、重建索引

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/140460.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【操作系统笔记一】程序运行机制CPU指令集

内存地址 指针 / 引用 指针、引用本质上就是内存地址&#xff0c;有了内存地址就可以操作对应的内存数据了。 不同的数据类型 字节序 大端序&#xff08;Big Endian&#xff09;&#xff1a;字节顺序从低地址到高地址顺序存储的字节序小端序&#xff08;Little Endian&#…

【MySQL】 MySQL索引事务

文章目录 &#x1f6eb;索引&#x1f38d;索引的概念&#x1f333;索引的作用&#x1f384;索引的使用场景&#x1f340;索引的使用&#x1f4cc;查看索引&#x1f4cc;创建索引&#x1f332;删除索引 &#x1f334;索引保存的数据结构&#x1f388;B树&#x1f388;B树&#x…

10.5 串联型稳压电路(1)

稳压管稳压电路输出电流较小&#xff0c;输出电压不可调&#xff0c;不能满足很多场合下的应用。串联型稳压电路以稳压管稳压电路为基础&#xff0c;利用晶体管的电流放大作用&#xff0c;增大负载电流&#xff1b;在电路中引入深度电压负反馈使输出电压稳定&#xff1b;并且&a…

RabbitMQ快速入门——消费者

public class Consumer_HelloWorld {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory new ConnectionFactory();//2.设置参数factory.setHost("172.16.98.133"); ip 默认值 localhostfac…

【论文写作】符号:矩阵、向量的乘法、内积、点积等

【论文写作】符号&#xff1a;矩阵、向量乘法、内积、点积等 文章目录 【论文写作】符号&#xff1a;矩阵、向量乘法、内积、点积等1. 矩阵乘法1.1 矩阵乘积1.2 矩阵哈德玛乘积1.3 矩阵克罗内克积 2. 向量乘法2.1 向量点积、内积2.2 向量Hadamard积2.3 向量外积2.4 向量叉积 1.…

预训练相关知识

1、上下文无关语义表示方式存在问题 语义不同的词具有相同的表示&#xff0c;&#xff08;apple 电子产品苹果/水果苹果&#xff09; 容易出现oov问题 2、神经语言编码器 2.1、序列模型 cnn/rnn等&#xff0c;捕获局部信息和序列依赖信息&#xff0c;无法捕获长距离依赖。易训…

罗德里格斯公式

1.点乘 A ⃗ ⋅ B ⃗ ∣ A ⃗ ∣ ∣ B ⃗ ∣ c o s ⟨ A ⃗ , B ⃗ ⟩ \vec{A} \cdot \vec{B} \left | \vec{A} \right | \left | \vec{B} \right | cos\left \langle \vec{A}, \vec{B} \right \rangle A ⋅B ​A ​ ​B ​cos⟨A ,B ⟩ 对应几何意义&#xff1a;向量 A ⃗…

STL-常用容器

string容器 string构造函数 string本质&#xff1a;类 string和char*区别&#xff1a; char* 是一个指针 string是一个类&#xff0c;类内部封装了char*&#xff0c;管理这个字符串&#xff0c;是一个char*型的容器。 特点&#xff1a; string类内部封装了很多成员方法 …

Android 12,调用系统库libft2.so 遇到的各种问题记录

问题前提,Android 12系统,vendor静态库中调用 libft2.so。(vendor静态库中调用libft2.so会简单点,没这么麻烦) 【问题1】 (native:vendor) can not link against libft2 (native:platform) 本地debug尝试修改: 为了本地环境debug调试方便,我找了个 mk文件,在里面添加了…

Centos安装显卡

1、安装基础环境 yum -y install epel-release yum -y install gcc kernel-devel kernel-headers 2.对应内核版本 yum info kernel-devel kernel-headers Cat /proc/version 3、yum安装版本不对应。则去官网手动下载 离线安装对应的rpm&#xff1a; https://pkgs.org/dow…

RPC框架学习

一、设计目标 RPC 框架的目标就是让远程服务调用更加简单、透明&#xff0c;RPC 框架负责屏蔽底层的传输方式&#xff08;TCP 或者 UDP&#xff09;、序列化方式&#xff08;XML/Json/ 二进制&#xff09;和通信细节。服务调用者可以像调用本地接口一样调用远程的服务提供者&a…

eCharts实现漏斗图

需求&#xff1a;像漏斗那样进行层层筛选&#xff0c;进行一个可直接看见过程的图表 效果&#xff1a; 代码&#xff1a; option { //标题title: {text: Funnel,left: left,top: bottom}, //刷新加下载toolbox: {orient: vertical,top: center,feature: {restore: {},saveA…

Spring Cloud版本选择

SpringCloud版本号由来 SpringCloud的版本号是根据英国伦敦地铁站的名字进行命名的&#xff0c;由地铁站名称字母A-Z依次类推表示发布迭代版本。 SpringCloud和SpringBoot版本对应关系 注意事项&#xff1a; 其实SpringBoot与SpringCloud需要版本对应&#xff0c;否则可能会造…

资料分析笔记

统计术语 现期&#xff1a;现在的时间 基期&#xff1a;之前的时间 现期量 基期量 增长量&#xff08;有正负&#xff09; 增长率 【增幅、增速、r】&#xff08;有正负&#xff09; 同比&#xff1a;例&#xff1a;2014年5月 和 2013年5月 环比&#xff1a;例&#xff1a;20…

《算法竞赛·快冲300题》每日一题:“矩阵”

《算法竞赛快冲300题》将于2024年出版&#xff0c;是《算法竞赛》的辅助练习册。 所有题目放在自建的OJ New Online Judge。 用C/C、Java、Python三种语言给出代码&#xff0c;以中低档题为主&#xff0c;适合入门、进阶。 文章目录 题目描述题解C代码Java代码Python代码 “ 质…

YOLOv5如何训练自己的数据集

文章目录 前言1、数据标注说明2、定义自己模型文件3、训练模型4、参考文献 前言 本文主要介绍如何利用YOLOv5训练自己的数据集 1、数据标注说明 以生活垃圾数据集为例子 生活垃圾数据集&#xff08;YOLO版&#xff09;点击这里直接下载本文生活垃圾数据集 生活垃圾数据集组成&…

005:vue2使用vue-type-writer实现打字机效果

Vue Type Writer是一个Vue.js 2打字机效果组件&#xff0c;支持像打字机一样模仿键入文本。 文章目录 1. 效果2. 安装使用 1. 效果 2. 安装使用 npm 安装 npm install vue-type-writer --save完整代码 <template><div class"app-container home"><…

面向使用者的git与gerrit相关笔记

git与gerrit相关笔记 前言一、gerrit是什么&#xff1f;二、一些配置1.先配置全局email 和name2.gerrit配置ssh key3.可能遇到的问题 三、提交代码和合并冲突常用Git命令三件套严格的要求 总结 前言 本文是介绍什么是gerrit和工作中git与gerrit相关的命令来避免一些提交代码的…

JavaScript中的代理对象(proxy)

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 创建代理对象⭐ 使用代理对象⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友…

负载均衡 —— SpringCloud Netflix Ribbon

Ribbon 简介 Ribbon 是 Netfix 客户端的负载均衡器&#xff0c;可对 HTTP 和 TCP 客户端的行为进行控制。为 Ribbon 配置服务提供者地址后&#xff0c;Ribbon 就可以基于某种负载均衡算法自动帮助服务消费者去请求。Ribbon 默认提供了很多负载均衡算法&#xff0c;例如轮询、随…