Java版-速通ETL工具中简单的DAG执行实现

DAG作用

在这里插入图片描述

在ETL工具中,一般使用DAG图来进行任务的配置,将任务配置在有向无环图中,执行时候从首层节点,依次往下,下层节点的执行依赖于父节点是否执行完毕的状态,当最后一层的节点执行完成之后,整个DAG图执行完成。在实际使用DAG图过程中,我们一般会手动添加虚拟的单一头结点和尾结点,这样能保证DAG图的开始和结束都只有一个节点,方便判断状态。

本文会构建个简单DAG图,包含DAG并行执行,DAG判断执行完毕,获取DAG结果,来示例DAG。

节点定义

@Getter
@Setter
public class DagTaskNode implements Serializable, Runnable {private static final long serialVersionUID = 3637220767545316789L;String id; //节点idString nodeName;//节点名称DagTaskNode previews;//前置节点List<DagTaskNode> tails = new LinkedList<>();volatile boolean finished;//节点是否执行完成volatile boolean succeed;//节点是否执行成功Map<String, Object> nodeResultMap = new HashMap<>();//节点输出数据JSONObject exeJson;//节点配置数据/*** 节点的具体执行** @throws Exception*/public void executeNode() throws Exception {while (previews != null && !previews.isFinished()) {synchronized (previews) {previews.wait();}}//前置节点都执行完了,可以开始执行本节点任务了//本节点执行完了,通知下synchronized (this) {this.notifyAll();}}@Overridepublic void run() {try {executeNode();this.succeed = true;} catch (Exception e) {//一些失败的处理....} finally {this.finished = true;}}
}
  • 节点包含必要的状态:完成情况,成功失败情况。
  • 另外因为子节点依赖父节点的执行完成,每个结点在执行之前,都要判断父节点是否执行完成。
  • 本节点执行完成需要通知其他所有等待的下级结点

DAG图的构建

定义任务的关键数据:

/*** 任务关键数据*/
@Data
public class ETLTaskInfo implements Serializable {private static final long serialVersionUID = -8965767304205724195L;String id;String name;String exeJson;//配置json
}

定义边的关键数据:

/*** 边关键数据*/
@Data
public class ETLTaskEdgeInfo implements Serializable {private static final long serialVersionUID = 293185776150464728L;String id; //边idString taskId; // from任务idString nextTaskId; //to任务id
}

构建整个DAG图过程:

List<ETLTaskInfo> taskInfoList = new LinkedList<>();//这里数据一般是读取的配置List<ETLTaskEdgeInfo> edgeInfoList = new LinkedList<>();//这里数据一般是读取的配置Collection<DagTaskNode> roots = new LinkedList<>(); //头结点们Map<String, DagTaskNode> nodeMap = new HashMap<>();//构建图关系for (ETLTaskInfo task : taskInfoList) {DagTaskNode taskNode = new DagTaskNode();taskNode.setId(task.getId());taskNode.setNodeName(task.getName());taskNode.setExeJson(JSON.parseObject(task.getExeJson()));nodeMap.put(task.getId(), taskNode);}List<DagTaskNode> dagRoots = new LinkedList<>();//构建边关系for (ETLTaskInfo task : taskInfoList) {DagTaskNode taskNode = nodeMap.get(task.getId());//前置节点设置String prevId = CollectionUtils.isEmpty(edgeInfoList) ?null :edgeInfoList.stream().filter(edge -> task.getId().equals(edge.getNextTaskId())).map(ETLTaskEdgeInfo::getTaskId).findFirst().orElse(null);if (prevId == null) {dagRoots.add(taskNode);} else {taskNode.setPreviews(nodeMap.get(prevId));}//后置节点设置List<String> tailIds = CollectionUtils.isEmpty(edgeInfoList) ?null :edgeInfoList.stream().filter(edge -> task.getId().equals(edge.getId())).map(ETLTaskEdgeInfo::getNextTaskId).collect(Collectors.toList());if (!CollectionUtils.isEmpty(tailIds)) {for (String id : tailIds) {List<DagTaskNode> tails = taskNode.getTails();tails.add(nodeMap.get(id));}}}DagExecutor.bfsWithMoreRoots(roots); //处理图任务

节点执行

public class DagExecutor {public DagExecuteResult bfsWithMoreRoots(Collection<DagTaskNode> roots) {if (CollectionUtils.isEmpty(roots)) {return null;}DagExecuteResult dagExecuteResult = new DagExecuteResult();Queue<DagTaskNode> allTask = new LinkedBlockingQueue<>(); //所有任务队列Queue<DagTaskNode> queue = new LinkedBlockingQueue<>(); //等待处理队列queue.addAll(roots);while (!queue.isEmpty()) {int size = queue.size();for (int i = 0; i < size; i++) {DagTaskNode cur = queue.poll();try {allTask.add(cur);//线程池执行节点...} catch (Exception e) {// handle error}if (CollectionUtils.isEmpty(cur.getTails())) {continue;}queue.addAll(cur.getTails());}}while (!allTask.stream().allMatch(DagTaskNode::isSucceed)) {try {Thread.sleep(5);} catch (InterruptedException e) {//处理下}}//完成了的话if (allTask.stream().allMatch(DagTaskNode::isSucceed)) {//都成功了//处理结果赋值} else {//有失败的任务//处理结果赋值}return dagExecuteResult;}
}public class DagExecuteResult {
}
  • 节点的执行为了提升效率,采用线程池进行执行;
  • 节点自身的执行依赖于父节点的状态,这个已经在节点执行内部进行了判断;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/33756.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Web安全深度剖析

1.Web安全简介 ​ 攻击者想要对计算机进行渗透&#xff0c;有一个条件是必须的&#xff0c;就是攻击者的计算机与服务器必须能够正常通信&#xff0c;服务器与客户端进行通信依靠的就是端口。 ​ 如今的web应该称之为web应用程序&#xff0c;功能强大&#xff0c;离不开四个要…

策略模式的理解和实践

在软件开发中&#xff0c;我们经常遇到需要在不同算法之间进行选择的情况。这些算法可能实现相同的功能&#xff0c;但使用不同的方法或逻辑。为了增强代码的可维护性和可扩展性&#xff0c;我们可以使用设计模式来优化这些算法的实现和管理。策略模式&#xff08;Strategy Pat…

在 Linux 环境下搭建 OpenLab Web 网站并实现 HTTPS 和访问控制

实验要求 综合练习&#xff1a;请给openlab搭建web网站 ​ 网站需求&#xff1a; ​ 1.基于域名[www.openlab.com](http://www.openlab.com)可以访问网站内容为 welcome to openlab!!! ​ 2.给该公司创建三个子界面分别显示学生信息&#xff0c;教学资料和缴费网站&#xff0c…

Java开发利器:IDEA的安装与使用(下)

文章目录 8. 快捷键的使用8.1 常用快捷键8.2 查看快捷键8.3 自定义快捷键8.4 使用其它平台快捷键 9. IDEA断点调试(Debug)9.1 为什么需要Debug9.2 Debug的步骤9.3 多种Debug情况介绍9.3.1 行断点9.3.2 方法断点9.3.3 字段断点9.3.4 条件断点9.3.5 异常断点9.3.6 线程调试9.3.7 …

非对称任意进制转换器(安卓)

除了正常进制转换&#xff0c;还可以输入、输出使用不同的数字符号&#xff0c;达成对数值进行加密的效果 点我下载APK安装包 使用unity开发。新建一个c#代码文件&#xff0c;把代码覆盖进去&#xff0c;再把代码文件添加给main camera即可。 using System.Collections; usin…

神经网络入门实战:(十四)pytorch 官网内置的 CIFAR10 数据集,及其网络模型

(一) pytorch 官网内置的网络模型 图像处理&#xff1a; Models and pre-trained weights — Torchvision 0.20 documentation (二) CIFAR10数据集的分类网络模型&#xff08;仅前向传播&#xff09;&#xff1a; 下方的网络模型图片有误&#xff0c;已做修改&#xff0c;具…

linux 系列服务器 高并发下ulimit优化文档

系统输入 ulimit -a 结果如下 解除 Linux 系统的最大进程数 要解除或提高 Linux 系统的最大进程数&#xff0c;可以修改 ulimit 设置和 /etc/security/limits.conf 文件中的限制。 临时修改 ulimit 设置 可以使用 ulimit 命令来查看和修改当前会话的最大进程数&#xff1a; 查…

Elasticsearch数据迁移(快照)

1. 数据条件 一台原始es服务器&#xff08;192.168.xx.xx&#xff09;&#xff0c;数据迁移后的目标服务器&#xff08;10.2.xx.xx&#xff09;。 2台服务器所处环境&#xff1a; centos7操作系统&#xff0c; elasticsearch-7.3.0。 2. 为原始es服务器数据创建快照 修改elas…

基于 SpringBoot 构建校园失物招领智能平台:优化校园失物处理流程

4系统设计 4.1系统概要设计 本文通过B/S结构(Browser/Server,浏览器/服务器结构)开发的该校园失物招领系统&#xff0c;B/S结构的优点很多&#xff0c;例如&#xff1a;开发容易、强的共享性、便于维护等&#xff0c;只要有网络&#xff0c;用户可以随时随地进行使用。 系统工作…

图解SSL/TLS 建立加密通道的过程

众所周知&#xff0c;HTTPS 是 HTTP 安全版&#xff0c;HTTP 的数据以明文形式传输&#xff0c;而 HTTPS 使用 SSL/TLS 协议对数据进行加密&#xff0c;确保数据在传输过程中的安全。 那么&#xff0c;HTTPS 是如何做到数据加密的呢&#xff1f;这就需要了解 SSL/TLS 协议了。 …

HTTP协议图--HTTP 工作过程

HTTP请求响应模型 HTTP通信机制是在一次完整的 HTTP 通信过程中&#xff0c;客户端与服务器之间将完成下列7个步骤&#xff1a; 建立 TCP 连接 在HTTP工作开始之前&#xff0c;客户端首先要通过网络与服务器建立连接&#xff0c;该连接是通过 TCP 来完成的&#xff0c;该协议…

BurpSuite工具-Proxy代理用法(抓包、改包、放包)

一、Burp Suite 项目管理 二、Proxy&#xff08;代理抓包模块&#xff09; 1. 简要说明 1.1. Intercept&#xff08;拦截&#xff09; 1.2. HTTP History&#xff08;HTTP 历史&#xff09; 1.3. WebSockets History&#xff08;WebSocket 历史&#xff09; 1.4. Options…

前端测试框架 jasmine 的使用

最近的项目在使用AngulaJs,对JS代码的测试问题就摆在了面前。通过对比我们选择了 Karma jasmine ,使用 Jasmine做单元测试 &#xff0c;Karma 自动化完成&#xff0c;当然了如果使用 Karma jasmine 前提是必须安装 Nodejs。 安装好 Nodejs &#xff0c;使用 npm 安装好必要…

Blender均匀放缩模型

解决办法&#xff1a; 首先选中模型&#xff0c;按下“s”键&#xff0c;如下图所示&#xff0c;此时模型根据鼠标的移动放缩 或者在按下“s”后输入数值&#xff0c;再按回车键Enter&#xff0c;模型会根据你该数值进行均匀放缩 指定放大2倍结果——

TCP/IP 协议图--计算机网络体系结构分层

计算机网络体系结构分层 计算机网络体系结构分层 不难看出&#xff0c;TCP/IP 与 OSI 在分层模块上稍有区别。OSI 参考模型注重“通信协议必要的功能是什么”&#xff0c;而 TCP/IP 则更强调“在计算机上实现协议应该开发哪种程序”

hive 行转列

行转列的常规做法是&#xff0c;group bysum(if())【或count(if())】 建表: CREATE TABLE table2 (year INT,month INT,amount DOUBLE );INSERT INTO table2 (year, month, amount) VALUES(1991, 2, 1.2),(1991, 3, 1.3),(1991, 4, 1.4),(1992, 1, 2.1),(1992, 2, 2.2),(1992…

5G Multicast/Broadcast Services(MBS)相关的Other SI都有哪些?

系统消息分为Minimum SI 和other SI&#xff0c;其中Minimum SI 包括MIB和SIB1&#xff0c;Minimum SI包含初始访问所需的基本信息和获取任何其他 SI 的信息。 而随着3GPP引入的技术越来越多&#xff0c;例如sidelink&#xff0c;NTN&#xff0c;MBS broadcast/multicast以及A…

6. 一分钟读懂“抽象工厂模式”

6.1 模式介绍 书接上文&#xff0c;工厂方法模式只能搞定单一产品族&#xff0c;遇到需要生产多个产品族时就歇菜了。于是&#xff0c;在需求的“花式鞭策”下&#xff0c;程序员们再次绷紧脑细胞&#xff0c;创造出了更强大的抽象工厂模式&#xff0c;让工厂一次性打包多个产品…

Ignis如何将Tokenization解决方案应用于RWA和实体经济

随着区块链技术的发展&#xff0c;代币化&#xff08;Tokenization&#xff09;逐渐成为连接数字经济与实体经济的重要桥梁。尤其是RWA&#xff08;真实世界资产&#xff09;的概念&#xff0c;近年来成为金融行业的热议话题。Ignis作为Jelurida公司推出的公链平台&#xff0c;…

sql删除冗余数据

工作或面试中经常能遇见一种场景题&#xff1a;删除冗余的数据&#xff0c;以下是举例介绍相应的解决办法。 举例&#xff1a; 表结构&#xff1a; 解法1&#xff1a;子查询 获取相同数据中id更小的数据项&#xff0c;再将id不属于其中的数据删除。-- 注意&#xff1a;mysql中…