【技术解析】Dolphinscheduler实现MapReduce任务的高效管理

MapReduce是一种编程模型,用于处理和生成大数据集,主要用于大规模数据集(TB级数据规模)的并行运算。本文详细介绍了Dolphinscheduler在MapReduce任务中的应用,包括GenericOptionsParser与args的区别、hadoop jar命令参数的完整解释、MapReduce实例代码,以及如何在Dolphinscheduler中配置和运行MapReduce任务。

GenericOptionsParser vs args区别

GenericOptionsParser 如下:

GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();

查看 GenericOptionsParser 源码做了什么?

1、构造方法
public GenericOptionsParser(Configuration conf, String[] args) throws IOException {this(conf, new Options(), args); 
}2、点击 this
public GenericOptionsParser(Configuration conf,Options options, String[] args) throws IOException {this.conf = conf;parseSuccessful = parseGeneralOptions(options, args);
}3、查看 parseGeneralOptions
private boolean parseGeneralOptions(Options opts, String[] args)throws IOException {opts = buildGeneralOptions(opts);CommandLineParser parser = new GnuParser();boolean parsed = false;try {commandLine = parser.parse(opts, preProcessForWindows(args), true);processGeneralOptions(commandLine);parsed = true;} catch(ParseException e) {LOG.warn("options parsing failed: "+e.getMessage());HelpFormatter formatter = new HelpFormatter();formatter.printHelp("general options are: ", opts);}return parsed;
}4、看 GnuParser 
package org.apache.commons.cli;import java.util.ArrayList;
import java.util.List;@Deprecated
public class GnuParser extends Parser {
.......
}org.apache.commons.cli Parser,是不是有点熟悉?对,请参考 https://segmentfault.com/a/1190000045394541 这篇文章吧5、看 processGeneralOptions 方法
private void processGeneralOptions(CommandLine line) throws IOException {if (line.hasOption("fs")) {FileSystem.setDefaultUri(conf, line.getOptionValue("fs"));}if (line.hasOption("jt")) {String optionValue = line.getOptionValue("jt");if (optionValue.equalsIgnoreCase("local")) {conf.set("mapreduce.framework.name", optionValue);}conf.set("yarn.resourcemanager.address", optionValue, "from -jt command line option");}if (line.hasOption("conf")) {String[] values = line.getOptionValues("conf");for(String value : values) {conf.addResource(new Path(value));}}if (line.hasOption('D')) {String[] property = line.getOptionValues('D');for(String prop : property) {String[] keyval = prop.split("=", 2);if (keyval.length == 2) {conf.set(keyval[0], keyval[1], "from command line");}}}if (line.hasOption("libjars")) {// for libjars, we allow expansion of wildcardsconf.set("tmpjars",validateFiles(line.getOptionValue("libjars"), true),"from -libjars command line option");//setting libjars in client classpathURL[] libjars = getLibJars(conf);if(libjars!=null && libjars.length>0) {conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));Thread.currentThread().setContextClassLoader(new URLClassLoader(libjars, Thread.currentThread().getContextClassLoader()));}}if (line.hasOption("files")) {conf.set("tmpfiles", validateFiles(line.getOptionValue("files")),"from -files command line option");}if (line.hasOption("archives")) {conf.set("tmparchives", validateFiles(line.getOptionValue("archives")),"from -archives command line option");}conf.setBoolean("mapreduce.client.genericoptionsparser.used", true);// tokensFileif(line.hasOption("tokenCacheFile")) {String fileName = line.getOptionValue("tokenCacheFile");// check if the local file existsFileSystem localFs = FileSystem.getLocal(conf);Path p = localFs.makeQualified(new Path(fileName));localFs.getFileStatus(p);if(LOG.isDebugEnabled()) {LOG.debug("setting conf tokensFile: " + fileName);}UserGroupInformation.getCurrentUser().addCredentials(Credentials.readTokenStorageFile(p, conf));conf.set("mapreduce.job.credentials.binary", p.toString(),"from -tokenCacheFile command line option");}
}原理是把 fs、jt、D、libjars、files、archives、tokenCacheFile 相关参数放入到 Hadoop的 Configuration中了,终于清楚 GenericOptionsParser是干什么的了

args呢?如果要使用args,以上这种 fs、jt、D、libjars、files、archives、tokenCacheFile 是需要自己解析的。

Hadoop jar完整参数解释

hadoop jar wordcount.jar org.myorg.WordCount \-fs hdfs://namenode.example.com:8020 \-jt resourcemanager.example.com:8032 \-D mapreduce.job.queuename=default \-libjars /path/to/dependency1.jar,/path/to/dependency2.jar \-files /path/to/file1.txt,/path/to/file2.txt \-archives /path/to/archive1.zip,/path/to/archive2.tar.gz \-tokenCacheFile /path/to/credential.file \/input /output

这条命令会:

  1. 将作业提交到 hdfs://namenode.example.com:8020 文件系统
  2. 使用 resourcemanager.example.com:8032 作为 YARN ResourceManager
  3. 提交到 default 队列
  4. 使用 /path/to/dependency1.jar 和 /path/to/dependency2.jar 作为依赖
  5. 分发本地文件 /path/to/file1.txt 和 /path/to/file2.txt,注意 : 是本地文件哦
  6. 解压并分发 /path/to/archive1.zip 和 /path/to/archive2.tar.gz
  7. 分发凭证文件 /path/to/credential.file

    MR实例

    WordCount经典示例

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private IntWritable one = new IntWritable(1);private Text word = new Text();@Overrideprotected void map(LongWritable key,Text value,Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {String[] fields = value.toString().split("\\s+");for (String field : fields) {word.set(field);context.write(word, one);}}
}public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();@Overrideprotected void reduce(Text key,Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}
}public class WCJob {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();// TODO 如果要是本地访问远程的hdfs,需要指定hdfs的根路径,否则只能访问本地的文件系统
//        conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);String[] remainingArgs = optionParser.getRemainingArgs();for (String arg : args) {System.out.println("arg :" + arg);}for (String remainingArg : remainingArgs) {System.out.println("remainingArg :" + remainingArg);}if (remainingArgs.length < 2) {throw new RuntimeException("input and output path must set.");}Path outputPath = new Path(remainingArgs[1]);FileSystem fileSystem = FileSystem.get(conf);boolean exists = fileSystem.exists(outputPath);// 如果目标目录存在,则删除if (exists) {fileSystem.delete(outputPath, true);}Job job = Job.getInstance(conf, "MRWordCount");job.setJarByClass(WCJob.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setMapperClass(WCMapper.class);job.setReducerClass(WCReducer.class);FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

文件分发

public class ConfigMapper extends Mapper<LongWritable, Text, Text, NullWritable> {private List<String> whiteList = new ArrayList<>();private Text text = new Text();@Overrideprotected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {// 获取作业提交时传递的文件URI[] files = context.getCacheFiles();if (files != null && files.length > 0) {// 读取文件内容File configFile = new File("white.txt"); // 文件名要与传递的文件名保持一致try (BufferedReader reader = new BufferedReader(new FileReader(configFile))){String line = null;while ((line = reader.readLine()) != null) {whiteList.add(line);}}}}@Overrideprotected void map(LongWritable key,Text value,Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {String line = value.toString();String[] datas = line.split("\\s+");List<String> whiteDatas = Arrays.stream(datas).filter(data -> whiteList.contains(data)).collect(Collectors.toList());for (String data : whiteDatas) {text.set(data);context.write(text , NullWritable.get());}}
}public class ConfigJob {public static void main(String[] args) throws Exception {// 设置用户名System.setProperty("HADOOP_USER_NAME", "root");Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);String[] remainingArgs = optionParser.getRemainingArgs();if (remainingArgs.length < 2) {throw new RuntimeException("input and output path must set.");}Path outputPath = new Path(remainingArgs[1]);FileSystem fileSystem = FileSystem.get(conf);boolean exists = fileSystem.exists(outputPath);// 如果目标目录存在,则删除if (exists) {fileSystem.delete(outputPath, true);}Job job = Job.getInstance(conf, "MRConfig");job.setJarByClass(ConfigJob.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);job.setMapperClass(ConfigMapper.class);FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

Dolphinscheduler MR使用

Yarn test队列设置

YARN 的配置目录中找到 capacity-scheduler.xml 文件。通常位于 $HADOOP_HOME/etc/hadoop/ 目录下。

修改 capacity-scheduler.xml

<property><name>yarn.scheduler.capacity.root.queues</name><value>default, test</value>
</property><property><name>yarn.scheduler.capacity.root.test.capacity</name><value>30</value>
</property><property><name>yarn.scheduler.capacity.root.test.maximum-capacity</name><value>50</value>
</property><property><name>yarn.scheduler.capacity.root.test.user-limit-factor</name><value>1</value>
</property>

刷新队列配置 yarn rmadmin -refreshQueues

流程定义设置

file

执行结果

file

离线任务实例

file

YARN作业展示

file

源码分析

org.apache.dolphinscheduler.plugin.task.mr.MapReduceArgsUtils#buildArgs

String others = param.getOthers();
// TODO 这里其实就是想说,没有通过 -D mapreduce.job.queuename 形式指定队列,是用页面上直接指定队列名称的,页面上 Yarn队列 输入框
if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) {String yarnQueue = param.getYarnQueue();if (StringUtils.isNotEmpty(yarnQueue)) {args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));}
}// TODO 这里就是页面上,选项参数 输入框
// -conf -archives -files -libjars -D
if (StringUtils.isNotEmpty(others)) {args.add(others);
}

转载自Journey 原文链接:https://segmentfault.com/a/1190000045403915

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/18853.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Linux :进程间通信之管道

一、进程间通信 1.1 是什么和为什么 1、进程间通信是什么&#xff1f;&#xff1f; ——>两个或多个进程实现数据层面的交互&#xff0c;但是由于进程独立性的存在&#xff0c;导致通信的成本比较高。 2、既然通信成本高&#xff0c;那为什么还要通信呢&#xff1f;&…

Vue基础(2)_el和data的两种写法

举例&#xff1a; <div id"root"><h1>你好&#xff0c;{{name}}</h1> </div> el和data的2种写法 1.el有2种写法 (1).new Vue时候配置el属性。 // 第一种写法&#xff1a;new Vue时候配置el属性。// 优点&#xff1a;简单、直接new Vue({e…

【汇编语言】数据处理的两个基本问题(二) —— 解密汇编语言:数据长度与寻址方式的综合应用

文章目录 前言1. 指令要处理的数据有多长&#xff1f;1.1 通过寄存器指明数据的尺寸1.1.1 字操作1.1.2 字节操作 1.2 用操作符X ptr指明内存单元的长度1.2.1 访问字单元1.2.2 访问字节单元1.2.3 为什么要用操作符X ptr指明 1.3 其他方法 2. 寻址方式的综合应用2.1 问题背景&…

c++多态(深度刨析)

C系列-----多态 文章目录 C系列-----多态前言一、多态的概念二、多态的定义及实现2.1、多态构成的条件2.1.1、虚函数2.1.2、虚函数的重写 2.2、C11 override 和 final2.3、重载、覆盖(重写)、隐藏(重定义)的对比2.4、抽象类2.5、 接口继承和实现继承 三、多态的原理3.1、虚函数…

FPGA开发技能(9)快速生成约束XDC文件

文章目录 1.从Cadence导出csv约束文件2.python程序将csv导出为xdc文件。3.python生成exe4.exe使用注意事项5.传送门 前言&#xff1a; 作为一名FPGA工程师&#xff0c;通常公司会对该岗位的人有一定的硬件能力的要求&#xff0c;最基础的就是需要依据原理图的设计进行FPGA工程内…

css uniapp背景图宽度固定高度自适应可以重复

page {height: 100%;background-image: url(https://onlinekc.a.hlidc.cn/uploads/20241115/350f94aaf493d05625a7ddbc86c7804e.png);background-repeat: repeat;background-size: contain;} 如果不要重复 把background-repeat: repeat;替换background-repeat: no-repeat;

Stable Diffusion核心网络结构——U-Net

​ &#x1f33a;系列文章推荐&#x1f33a; 扩散模型系列文章正在持续的更新&#xff0c;更新节奏如下&#xff0c;先更新SD模型讲解&#xff0c;再更新相关的微调方法文章&#xff0c;敬请期待&#xff01;&#xff01;&#xff01;&#xff08;本文及其之前的文章均已更新&a…

学习threejs,使用AnimationMixer实现变形动画

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️THREE.AnimationMixer 动画…

【Linux】指令 + 重定向操作

Linux基本指令 一.Linux基本指令1.mv&#xff08;重要&#xff09;2.cat3.more和less&#xff08;重要&#xff09;4.head和tail5.date6.cal7.find&#xff08;重要&#xff09; 二.Linux相关知识点1. Linux系统中&#xff1a;一切皆文件2. 重定向操作1. 输出重定向2. 追加重定…

SpringBoot源码解析(四):解析应用参数args

SpringBoot源码系列文章 SpringBoot源码解析(一)&#xff1a;SpringApplication构造方法 SpringBoot源码解析(二)&#xff1a;引导上下文DefaultBootstrapContext SpringBoot源码解析(三)&#xff1a;启动开始阶段 SpringBoot源码解析(四)&#xff1a;解析应用参数args 目录…

Vue3.0 + Ts:动态设置style样式 ts 报错

error TS2322: Type ‘{ width: string; left: string; ‘background-color’: unknown; ‘z-index’: number; }’ is not assignable to type ‘StyleValue’ 在 vue3.0 ts 项目中&#xff0c;动态设置样式报错 在 Vue 3 TypeScript 项目中&#xff0c;当你使用 :style 绑…

跨平台WPF框架Avalonia教程 十六

SelectableTextBlock 可选文本块 SelectableTextBlock 块是一个用于显示文本的标签&#xff0c;允许选择和复制文本。它可以显示多行&#xff0c;并且可以完全控制所使用的字体。 有用的属性​ 您可能最常使用这些属性&#xff1a; 属性描述SelectionStart当前选择的起始字…

【MySQL】库的基础操作入门指南

&#x1f351;个人主页&#xff1a;Jupiter. &#x1f680; 所属专栏&#xff1a;MySQL入门指南&#xff1a;从零开始的数据库之旅 欢迎大家点赞收藏评论&#x1f60a; 目录 ☁创建数据库语法说明&#xff1a; 创建数据库案例 &#x1f308;字符集和校验规则查看系统默认字符集…

数据仓库数据湖湖仓一体解决方案

一、资料介绍 数据仓库与数据湖是现代数据管理的两大核心概念。数据仓库是结构化的数据存储仓库&#xff0c;用于支持企业的决策分析&#xff0c;其数据经过清洗、整合&#xff0c;以固定的模式存储&#xff0c;适合复杂查询。数据湖则是一个集中存储大量原始数据的存储库&…

人工智能英伟达越来越“大”的GPU

英伟达&#xff1a;让我们遇见越来越“大”的GPU 在2024年台北ComputeX大会上&#xff0c;英伟达CEO黄仁勋发表了题为《揭开新工业革命序幕》的演讲。他手持一款游戏显卡(很有可能是4090),自豪地宣称&#xff1a;“这是目前最先进的游戏GPU。”紧接着&#xff0c;他走到一台DGX…

知识库搭建:高科技行业的智慧基石与未来展望

一、引言 在科技日新月异的今天&#xff0c;知识密集型作业已成为高科技企业竞争力的核心。面对快速的技术迭代和激烈的市场竞争&#xff0c;如何高效地管理和运用知识资源&#xff0c;成为高科技企业必须面对的挑战。知识库&#xff0c;作为知识管理的核心平台&#xff0c;正…

算法编程题-删除子文件夹

算法编程题-删除子文件夹 原题描述设计思路代码实现复杂度分析 前一段时间面试字节的时候&#xff0c;被问到gin框架的路由结构。gin框架的路由结构采用的一般是前缀树来实现&#xff0c;于是被要求手写前缀树来实现路由的注册和查找。 本文以 leetcode 1233为例介绍一下前缀树…

利用SSH中的弱私钥

import paramiko import argparse import os from threading import Thread, BoundedSemaphore # 设置最大连接数 maxConnections 5 # 创建一个有界信号量&#xff0c;用于控制同时进行的连接数 connection_lock BoundedSemaphore(valuemaxConnections) # 用于控制是否停止所…

力扣整理版七:二叉树(待更新)

满二叉树&#xff1a;如果一棵二叉树只有度为0的结点和度为2的结点&#xff0c;并且度为0的结点在同一层上&#xff0c;则这棵二叉树为满二叉树。深度为k&#xff0c;有2^k-1个节点的二叉树。 完全二叉树&#xff1a;在完全二叉树中&#xff0c;除了最底层节点可能没填满外&am…

如何使用可靠UDP协议(KCP)

希望这篇文章&#xff0c;对学习和使用 KCP 协议的读者&#xff0c;有帮助。 1. KCPUDP 流程图 2. 示例代码 #include <iostream>int main() {// 代码太多&#xff0c;暂存仓库return 0; } 具体使用&#xff0c;请参考代码仓库&#xff1a;https://github.com/ChivenZha…