DAG作用
在ETL工具中,一般使用DAG图来进行任务的配置,将任务配置在有向无环图中,执行时候从首层节点,依次往下,下层节点的执行依赖于父节点是否执行完毕的状态,当最后一层的节点执行完成之后,整个DAG图执行完成。在实际使用DAG图过程中,我们一般会手动添加虚拟的单一头结点和尾结点,这样能保证DAG图的开始和结束都只有一个节点,方便判断状态。
本文会构建个简单DAG图,包含DAG并行执行,DAG判断执行完毕,获取DAG结果,来示例DAG。
节点定义
@Getter
@Setter
public class DagTaskNode implements Serializable, Runnable {private static final long serialVersionUID = 3637220767545316789L;String id; //节点idString nodeName;//节点名称DagTaskNode previews;//前置节点List<DagTaskNode> tails = new LinkedList<>();volatile boolean finished;//节点是否执行完成volatile boolean succeed;//节点是否执行成功Map<String, Object> nodeResultMap = new HashMap<>();//节点输出数据JSONObject exeJson;//节点配置数据/*** 节点的具体执行** @throws Exception*/public void executeNode() throws Exception {while (previews != null && !previews.isFinished()) {synchronized (previews) {previews.wait();}}//前置节点都执行完了,可以开始执行本节点任务了//本节点执行完了,通知下synchronized (this) {this.notifyAll();}}@Overridepublic void run() {try {executeNode();this.succeed = true;} catch (Exception e) {//一些失败的处理....} finally {this.finished = true;}}
}
- 节点包含必要的状态:完成情况,成功失败情况。
- 另外因为子节点依赖父节点的执行完成,每个结点在执行之前,都要判断父节点是否执行完成。
- 本节点执行完成需要通知其他所有等待的下级结点
DAG图的构建
定义任务的关键数据:
/*** 任务关键数据*/
@Data
public class ETLTaskInfo implements Serializable {private static final long serialVersionUID = -8965767304205724195L;String id;String name;String exeJson;//配置json
}
定义边的关键数据:
/*** 边关键数据*/
@Data
public class ETLTaskEdgeInfo implements Serializable {private static final long serialVersionUID = 293185776150464728L;String id; //边idString taskId; // from任务idString nextTaskId; //to任务id
}
构建整个DAG图过程:
List<ETLTaskInfo> taskInfoList = new LinkedList<>();//这里数据一般是读取的配置List<ETLTaskEdgeInfo> edgeInfoList = new LinkedList<>();//这里数据一般是读取的配置Collection<DagTaskNode> roots = new LinkedList<>(); //头结点们Map<String, DagTaskNode> nodeMap = new HashMap<>();//构建图关系for (ETLTaskInfo task : taskInfoList) {DagTaskNode taskNode = new DagTaskNode();taskNode.setId(task.getId());taskNode.setNodeName(task.getName());taskNode.setExeJson(JSON.parseObject(task.getExeJson()));nodeMap.put(task.getId(), taskNode);}List<DagTaskNode> dagRoots = new LinkedList<>();//构建边关系for (ETLTaskInfo task : taskInfoList) {DagTaskNode taskNode = nodeMap.get(task.getId());//前置节点设置String prevId = CollectionUtils.isEmpty(edgeInfoList) ?null :edgeInfoList.stream().filter(edge -> task.getId().equals(edge.getNextTaskId())).map(ETLTaskEdgeInfo::getTaskId).findFirst().orElse(null);if (prevId == null) {dagRoots.add(taskNode);} else {taskNode.setPreviews(nodeMap.get(prevId));}//后置节点设置List<String> tailIds = CollectionUtils.isEmpty(edgeInfoList) ?null :edgeInfoList.stream().filter(edge -> task.getId().equals(edge.getId())).map(ETLTaskEdgeInfo::getNextTaskId).collect(Collectors.toList());if (!CollectionUtils.isEmpty(tailIds)) {for (String id : tailIds) {List<DagTaskNode> tails = taskNode.getTails();tails.add(nodeMap.get(id));}}}DagExecutor.bfsWithMoreRoots(roots); //处理图任务
节点执行
public class DagExecutor {public DagExecuteResult bfsWithMoreRoots(Collection<DagTaskNode> roots) {if (CollectionUtils.isEmpty(roots)) {return null;}DagExecuteResult dagExecuteResult = new DagExecuteResult();Queue<DagTaskNode> allTask = new LinkedBlockingQueue<>(); //所有任务队列Queue<DagTaskNode> queue = new LinkedBlockingQueue<>(); //等待处理队列queue.addAll(roots);while (!queue.isEmpty()) {int size = queue.size();for (int i = 0; i < size; i++) {DagTaskNode cur = queue.poll();try {allTask.add(cur);//线程池执行节点...} catch (Exception e) {// handle error}if (CollectionUtils.isEmpty(cur.getTails())) {continue;}queue.addAll(cur.getTails());}}while (!allTask.stream().allMatch(DagTaskNode::isSucceed)) {try {Thread.sleep(5);} catch (InterruptedException e) {//处理下}}//完成了的话if (allTask.stream().allMatch(DagTaskNode::isSucceed)) {//都成功了//处理结果赋值} else {//有失败的任务//处理结果赋值}return dagExecuteResult;}
}public class DagExecuteResult {
}
- 节点的执行为了提升效率,采用线程池进行执行;
- 节点自身的执行依赖于父节点的状态,这个已经在节点执行内部进行了判断;