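MapReduce is a programming model for processing and generating large data sets, used mainly for parallel computation over large-scale (TB-level) data. This article describes how MapReduce tasks are used in DolphinScheduler: the difference between GenericOptionsParser and plain args, a full explanation of the hadoop jar command parameters, MapReduce example code, and how to configure and run a MapReduce task in DolphinScheduler.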
GenericOptionsParser vs. args
GenericOptionsParser is used as follows:
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
What does GenericOptionsParser do internally? Let's read the source.
1. The constructor
public GenericOptionsParser(Configuration conf, String[] args) throws IOException {
    this(conf, new Options(), args);
}
2. Follow the this(...) call
public GenericOptionsParser(Configuration conf, Options options, String[] args) throws IOException {
    this.conf = conf;
    parseSuccessful = parseGeneralOptions(options, args);
}
3. Look at parseGeneralOptions
private boolean parseGeneralOptions(Options opts, String[] args) throws IOException {
    opts = buildGeneralOptions(opts);
    CommandLineParser parser = new GnuParser();
    boolean parsed = false;
    try {
        commandLine = parser.parse(opts, preProcessForWindows(args), true);
        processGeneralOptions(commandLine);
        parsed = true;
    } catch (ParseException e) {
        LOG.warn("options parsing failed: " + e.getMessage());
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("general options are: ", opts);
    }
    return parsed;
}
4. Look at GnuParser
package org.apache.commons.cli;

import java.util.ArrayList;
import java.util.List;

@Deprecated
public class GnuParser extends Parser {
    .......
}
The Parser from org.apache.commons.cli should look familiar. For details, see https://segmentfault.com/a/1190000045394541.
5. Look at the processGeneralOptions method
private void processGeneralOptions(CommandLine line) throws IOException {
    if (line.hasOption("fs")) {
        FileSystem.setDefaultUri(conf, line.getOptionValue("fs"));
    }

    if (line.hasOption("jt")) {
        String optionValue = line.getOptionValue("jt");
        if (optionValue.equalsIgnoreCase("local")) {
            conf.set("mapreduce.framework.name", optionValue);
        }
        conf.set("yarn.resourcemanager.address", optionValue, "from -jt command line option");
    }

    if (line.hasOption("conf")) {
        String[] values = line.getOptionValues("conf");
        for (String value : values) {
            conf.addResource(new Path(value));
        }
    }

    if (line.hasOption('D')) {
        String[] property = line.getOptionValues('D');
        for (String prop : property) {
            String[] keyval = prop.split("=", 2);
            if (keyval.length == 2) {
                conf.set(keyval[0], keyval[1], "from command line");
            }
        }
    }

    if (line.hasOption("libjars")) {
        // for libjars, we allow expansion of wildcards
        conf.set("tmpjars", validateFiles(line.getOptionValue("libjars"), true), "from -libjars command line option");

        // setting libjars in client classpath
        URL[] libjars = getLibJars(conf);
        if (libjars != null && libjars.length > 0) {
            conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));
            Thread.currentThread().setContextClassLoader(
                new URLClassLoader(libjars, Thread.currentThread().getContextClassLoader()));
        }
    }

    if (line.hasOption("files")) {
        conf.set("tmpfiles", validateFiles(line.getOptionValue("files")), "from -files command line option");
    }

    if (line.hasOption("archives")) {
        conf.set("tmparchives", validateFiles(line.getOptionValue("archives")), "from -archives command line option");
    }

    conf.setBoolean("mapreduce.client.genericoptionsparser.used", true);

    // tokensFile
    if (line.hasOption("tokenCacheFile")) {
        String fileName = line.getOptionValue("tokenCacheFile");
        // check if the local file exists
        FileSystem localFs = FileSystem.getLocal(conf);
        Path p = localFs.makeQualified(new Path(fileName));
        localFs.getFileStatus(p);
        if (LOG.isDebugEnabled()) {
            LOG.debug("setting conf tokensFile: " + fileName);
        }
        UserGroupInformation.getCurrentUser().addCredentials(Credentials.readTokenStorageFile(p, conf));
        conf.set("mapreduce.job.credentials.binary", p.toString(), "from -tokenCacheFile command line option");
    }
}
In short, it takes the fs, jt, D, libjars, files, archives, and tokenCacheFile options and writes them into Hadoop's Configuration. That is what GenericOptionsParser is for.
What about args? If you work with the raw args directly, you have to parse options such as fs, jt, D, libjars, files, archives, and tokenCacheFile yourself.
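To make the difference concrete, here is a minimal sketch of what parsing by hand could look like when only the raw args are available. It is not Hadoop code: the class ManualArgsSketch and its fields are hypothetical, a plain Map stands in for Configuration, and only the space-separated "-D key=value" form is handled. Every other generic option would need similar code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: a hand-rolled stand-in for the "-D key=value" handling of GenericOptionsParser. */
final class ManualArgsSketch {

    /** Properties collected from "-D key=value" pairs (a stand-in for Configuration). */
    final Map<String, String> properties = new LinkedHashMap<>();

    /** Everything that was not recognised as a "-D" option, in original order. */
    final List<String> remainingArgs = new ArrayList<>();

    static ManualArgsSketch parse(String[] args) {
        ManualArgsSketch result = new ManualArgsSketch();
        for (int i = 0; i < args.length; i++) {
            if ("-D".equals(args[i]) && i + 1 < args.length) {
                // Same split as processGeneralOptions: only the first '=' separates key and value.
                String[] keyval = args[++i].split("=", 2);
                if (keyval.length == 2) {
                    result.properties.put(keyval[0], keyval[1]);
                }
            } else {
                result.remainingArgs.add(args[i]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ManualArgsSketch parsed = parse(new String[] {
            "-D", "mapreduce.job.queuename=test", "/input", "/output"});
        System.out.println("properties    = " + parsed.properties);
        System.out.println("remainingArgs = " + parsed.remainingArgs);
    }
}
```

With GenericOptionsParser none of this is needed: the properties end up in conf and getRemainingArgs() returns only the application arguments.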
Full explanation of the hadoop jar parameters
hadoop jar wordcount.jar org.myorg.WordCount \
    -fs hdfs://namenode.example.com:8020 \
    -jt resourcemanager.example.com:8032 \
    -D mapreduce.job.queuename=default \
    -libjars /path/to/dependency1.jar,/path/to/dependency2.jar \
    -files /path/to/file1.txt,/path/to/file2.txt \
    -archives /path/to/archive1.zip,/path/to/archive2.tar.gz \
    -tokenCacheFile /path/to/credential.file \
    /input /output
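This command will:
- Use hdfs://namenode.example.com:8020 as the file system the job is submitted against
- Use resourcemanager.example.com:8032 as the YARN ResourceManager
- Submit the job to the default queue
- Use /path/to/dependency1.jar and /path/to/dependency2.jar as dependencies
- Distribute the local files /path/to/file1.txt and /path/to/file2.txt (note: these are local files)
- Distribute and unpack /path/to/archive1.zip and /path/to/archive2.tar.gz
- Distribute the credentials file /path/to/credential.file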
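The options in this command line up with the Configuration keys written by processGeneralOptions. The lookup table below is only a reading aid built from that source excerpt; the class GenericOptionKeys is hypothetical, and the fs.defaultFS entry assumes that FileSystem.setDefaultUri stores the URI under that key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical reading aid: generic option name -> Configuration key it ends up in. */
final class GenericOptionKeys {

    static Map<String, String> optionToConfKey() {
        Map<String, String> keys = new LinkedHashMap<>();
        keys.put("-fs", "fs.defaultFS"); // assumption: set via FileSystem.setDefaultUri
        keys.put("-jt", "yarn.resourcemanager.address");
        keys.put("-libjars", "tmpjars");
        keys.put("-files", "tmpfiles");
        keys.put("-archives", "tmparchives");
        keys.put("-tokenCacheFile", "mapreduce.job.credentials.binary");
        return keys;
    }

    public static void main(String[] args) {
        // -D is not listed: it sets whatever key the user supplies.
        optionToConfKey().forEach((option, key) -> System.out.println(option + " -> " + key));
    }
}
```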
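MR examples
The classic WordCount example
// WCMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key,
                       Text value,
                       Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // Split the line on whitespace and emit (word, 1) for each field
        String[] fields = value.toString().split("\\s+");
        for (String field : fields) {
            word.set(field);
            context.write(word, one);
        }
    }
}

// WCReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key,
                          Iterable<IntWritable> values,
                          Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // Sum the counts emitted for this word
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

// WCJob.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WCJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // TODO To access a remote HDFS from a local machine, set the HDFS root URI; otherwise only the local file system is accessible
        // conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");

        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();

        for (String arg : args) {
            System.out.println("arg :" + arg);
        }
        for (String remainingArg : remainingArgs) {
            System.out.println("remainingArg :" + remainingArg);
        }

        if (remainingArgs.length < 2) {
            throw new RuntimeException("input and output path must set.");
        }

        Path outputPath = new Path(remainingArgs[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        boolean exists = fileSystem.exists(outputPath);
        // Delete the output directory if it already exists
        if (exists) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "MRWordCount");
        job.setJarByClass(WCJob.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}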
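For readers who want to check the counting logic without a cluster, the following is a minimal plain-Java sketch of the same map and reduce steps. It uses no Hadoop classes; LocalWordCount is a hypothetical name, and unlike WCMapper it skips the empty field produced by lines that start with whitespace.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical local stand-in for the WCMapper + WCReducer pair; no Hadoop involved. */
final class LocalWordCount {

    static Map<String, Integer> count(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // "Map" step: the same whitespace split as WCMapper, one (word, 1) per field.
            for (String field : line.split("\\s+")) {
                if (field.isEmpty()) {
                    continue; // a line with leading whitespace yields an empty first field
                }
                // "Reduce" step folded in: add up the 1s per word, as WCReducer does.
                counts.merge(field, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {hadoop=1, hello=2, world=1}
        System.out.println(count(Arrays.asList("hello world", "hello hadoop")));
    }
}
```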
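File distribution
// ConfigMapper.java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ConfigMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private List<String> whiteList = new ArrayList<>();
    private Text text = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // Get the files that were passed when the job was submitted
        URI[] files = context.getCacheFiles();
        if (files != null && files.length > 0) {
            // Read the file contents
            File configFile = new File("white.txt"); // the file name must match the name of the file that was passed
            try (BufferedReader reader = new BufferedReader(new FileReader(configFile))) {
                String line = null;
                while ((line = reader.readLine()) != null) {
                    whiteList.add(line);
                }
            }
        }
    }

    @Override
    protected void map(LongWritable key,
                       Text value,
                       Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] datas = line.split("\\s+");
        // Keep only the fields that appear in the whitelist
        List<String> whiteDatas = Arrays.stream(datas).filter(data -> whiteList.contains(data)).collect(Collectors.toList());
        for (String data : whiteDatas) {
            text.set(data);
            context.write(text, NullWritable.get());
        }
    }
}

// ConfigJob.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ConfigJob {
    public static void main(String[] args) throws Exception {
        // Set the user name
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");

        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        if (remainingArgs.length < 2) {
            throw new RuntimeException("input and output path must set.");
        }

        Path outputPath = new Path(remainingArgs[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        boolean exists = fileSystem.exists(outputPath);
        // Delete the output directory if it already exists
        if (exists) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "MRConfig");
        job.setJarByClass(ConfigJob.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setMapperClass(ConfigMapper.class);

        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}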
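The filtering step in ConfigMapper.map can be tried in isolation. The sketch below is plain Java with no Hadoop classes; WhiteListFilterSketch is a hypothetical name, and it swaps the List used above for a HashSet, which is a design variation rather than the author's code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical stand-alone version of the whitelist filter used in ConfigMapper.map. */
final class WhiteListFilterSketch {

    static List<String> filter(String line, Set<String> whiteList) {
        // Same whitespace split as the mapper, then keep only whitelisted fields (duplicates preserved).
        return Arrays.stream(line.split("\\s+"))
                .filter(whiteList::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Set<String> whiteList = new HashSet<>(Arrays.asList("hadoop", "spark"));
        // prints [hadoop, spark]
        System.out.println(filter("hello hadoop hello spark flink", whiteList));
    }
}
```

A Set makes each contains lookup constant time on average, which may matter when the whitelist is large, since map is called once per input line.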
Using MR in DolphinScheduler
Setting up a YARN test queue
Locate the capacity-scheduler.xml file in the YARN configuration directory. It is usually found under $HADOOP_HOME/etc/hadoop/.
Edit capacity-scheduler.xml
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,test</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.test.capacity</name>
    <value>30</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.test.maximum-capacity</name>
    <value>50</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.test.user-limit-factor</name>
    <value>1</value>
</property>
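The capacities of the queues directly under root must add up to 100, so yarn.scheduler.capacity.root.default.capacity has to be lowered to match (for example, to 70).
Refresh the queue configuration: yarn rmadmin -refreshQueues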
Workflow definition settings
Execution result
Offline task instance
YARN job view
Source code analysis
org.apache.dolphinscheduler.plugin.task.mr.MapReduceArgsUtils#buildArgs
String others = param.getOthers();
// TODO If the queue was not specified in the form -D mapreduce.job.queuename, use the queue name entered directly in the "Yarn queue" input box on the page
if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) {
    String yarnQueue = param.getYarnQueue();
    if (StringUtils.isNotEmpty(yarnQueue)) {
        args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));
    }
}

// TODO This is the "option parameters" input box on the page
// -conf -archives -files -libjars -D
if (StringUtils.isNotEmpty(others)) {
    args.add(others);
}
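The precedence between the two input boxes can be checked with a small stand-alone sketch. This is not the DolphinScheduler source: QueueArgsSketch and queueAndOtherArgs are hypothetical names, and the values "-D" and "mapreduce.job.queuename" for the constants D and MR_YARN_QUEUE are assumptions inferred from the comment in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone version of the queue handling shown in buildArgs. */
final class QueueArgsSketch {

    // Assumed values of the constants referenced in the excerpt.
    static final String D = "-D";
    static final String MR_YARN_QUEUE = "mapreduce.job.queuename";

    static List<String> queueAndOtherArgs(String others, String yarnQueue) {
        List<String> args = new ArrayList<>();
        // A queue given in the option parameters wins over the "Yarn queue" input box.
        boolean queueInOthers = others != null && others.contains(MR_YARN_QUEUE);
        if (!queueInOthers && yarnQueue != null && !yarnQueue.isEmpty()) {
            args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));
        }
        // The option parameters are appended unchanged, as one string.
        if (others != null && !others.isEmpty()) {
            args.add(others);
        }
        return args;
    }

    public static void main(String[] args) {
        // prints [-Dmapreduce.job.queuename=test]
        System.out.println(queueAndOtherArgs("", "test"));
        // prints [-D mapreduce.job.queuename=default]
        System.out.println(queueAndOtherArgs("-D mapreduce.job.queuename=default", "test"));
    }
}
```

In other words, the queue from the page is only a fallback: as soon as the option parameters mention mapreduce.job.queuename, the "Yarn queue" value is ignored.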
Reposted from Journey. Original link: https://segmentfault.com/a/1190000045403915
This article was published with support from 白鲸开源科技.