其实这一篇我有简单提到这个MapReduse的概念,但是只是粗略的讲解,可以去大致看一眼MapReduse跟HDFS、YARN的关系:
大数据之——Hadoop的HDFS、YARN、MapReduce_大数据hdfs-CSDN博客
(另外注意,这一篇文章需要有一定java基础,本文大量用到java,不再过多解释java原理)
一、MapReduce是啥?
Hadoop里的HDFS是负责【存储】
Hadoop里的YARN是负责【任务资源调度】
那么Hadoop里的MapReduce就是负责【计算、统计】
MapReduce有两个过程阶段:【Map】和【Reduce】
【Map】阶段就是将海量大数据拆分成若干份小数据,多个程序同时并行计算出自己的结果
【Reduce】阶段就是将【Map】阶段求得的所有结果汇总,最终得到想要的数据结果
上面的解释可能还是比较模糊难懂,我一个人理解举一个通俗的例子:
假设有一个搜索引擎网站希望统计近段时间用户搜索的关键词权重,那不可能放一个服务器里一个程序去计算吧?每天有几千亿条搜索数据呢!
;
那就把他们分散到多个服务器来分别统计,然后各个服务器统计出自己的结果(将数据分解成键值对),然后全部汇总到一个服务器,这部就能得出各个搜索关键词的权重了吗
二、MapReduce的大概逻辑
首先注意,MapReduce统计的数据就是以【键值对】形式表示的(键值对自己查,这里不过多解释)
那么我用图画来解释一下MapReduce的简单流程:
【Map阶段】将数据分成【key:数据,value:1】的形式
【Reduce阶段】将重复【key】的数据整合成【key:数据,value:N】的最终结果
;
其中Map阶段和Reduce阶段都是通过JAVA的MapReduce的程序执行,Map阶段的JAVA程序叫MapTask,Reduce阶段的JAVA程序叫ReduceTask
但是简单流程里我们只知道要分成<key, value>的形式,到底详细的流程应该是怎么转变的呢?
只需记住,数据主要经过三种变化:<k1, v1> ———> <k2, v2> ———> <k3, v3> (key简写(k),value简写(v))
首先前期我们读取数据文件(excel、mysql、csv...文件),得到<k1, v1>。
这里的key暂时只是文件每一行数据距离第一行的字节偏移量(这个k1是默认生成的,没有什么意义)
value只是一行文本,每一行整行就为一个value
;
然后Map阶段主要是两个步骤来形成 :<k2, v2>【一拆一合】
【拆】:将<k1, v1>的一行文本数据,根据特征、分隔符分出来我们要的明确的N多个数据,作为key2,每个key2的value2都统计是1
【合】:将这些N个Key2统计,重复出现key2的次数就整合进一个集合,最终变成一个key2对应集合value2<1,1,1......>
(通俗理解:v1拆完变成了k2,v2是根据k2统计的数值)
;
最后Reduce阶段,将<k2, v2>的value2集合数据值再次统计,将各个集合统计出总和,最后得到<key: 数据,value: N>的完整<k3, v3>结果
;
【完整流程】
【YARN】的集群MapReduce工作流程:
三、MapReduce的java代码编写逻辑
根据我们我们前面的流程我们可以知道,我们需要关心的部分就是中间的java程序的代码编写,那么这里就分析一下代码的逻辑
1、Map阶段分2步
1)读取文件获取<k1, v1>
我们要读取文件,要创建【TextInputFormat】这个接口子类输入、读取文件,并将<k1, v1>给Map
2)写Map逻辑,得出<k2, v2>
接收到<k1, v1>,自定义Map逻辑,将<k1, v1>【一拆一合】形成<k2, v2>
2、Shuffle阶段4步骤
- 对输出的 Key-Value 对进行分区
- 对不同分区的数据按照相同的 Key 排序
- (可选)对分组过的数据初步规约,降低数据的网络拷贝
- 对数据进行分组,相同 Key的 Value 放入一个集合中
3、Reduce阶段3步
1)第一步
对多个 Map 任务的结果进行排序以及合并,编写 Reduce 函数实现自己的逻辑,对输入的<key-Value 进行处理,转为新的 Key-Value(K3和V3)
2)第二步
输出设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据
四、WordCount程序代码编写
WordCount算是大数据计算领域经典的入门案例,相当于Hello World,就是最经典的一个MapReduce的案例实践 ———— 统计数据中所有单词出现的次数
牢记三大核心:
map阶段的核心:把输入的数据经过切割,全部标记1,因此输出就是<单词,1>。
;
shuffle阶段核心:经过MR程序内部自带默认的排序分组等功能,把key相同的单词会作为一组数据构成新的kv对。
;
reduce阶段核心:处理shuffle完的一组数据,该组数据就是该单词所有的键值对。对所有的1进行累加求和,就是单词的总次数。
1、前期准备
1)虚拟机本地创建一个【数据文件】作为数据来源
到虚拟机,不管什么系统只要是Linux系统就行,创建并进入 【/export/servers】路径
注意这是根目录下,要切换成root用户才有权限(【sudo -i】)
然后创建 “wordcount.txt”文件 并用编辑器写入内容
复制粘贴下面内容进入 “wordcount.txt”文件
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop
2)上传这个文件到HDFS
首先先启动HDFS
上传文件到hadoop(记得先在hadoop创建一个放这个文件的路径)
2、代码编写
JAVA的MapReduce的代码编写:
1)创建maven管理的java项目
这里我不打算细讲maven,因为我之前做过java后端开发,所以之前已经配置好了maven,没了解过maven的去我这篇文章了解一下:
后端之路第一站——Maven_java后端项目maven配置-CSDN博客
那么如果之前已经配置过maven管理的,基本流程就是【新建项目】——>【选择maven构建系统】完事了
但是这里有一个很致命的事情一定要在开头就说清楚!!!!!!!!!!!!!!
我以我的性命担保,按照我的方法来,一定一定一定一定一定一定!!!!创建项目时要选【老版本】的java环境,也就是JDK,如果你实在想不到用什么版本的,就按照我图片的来,选择【1.8版本】,绝对百分之一百不会错!!!!!
(1.8版本的JDK的安装的方式)
点击下面百度网盘分享的文件:(提取码:44hp)
百度网盘 请输入提取码
这个本地安装教程我就不不再这讲了,默认各位都是有java基础的,实在不会就去这个博客下看:JDK1.8安装和环境配置教程(2024年10月26日)_jdk1.8下载-CSDN博客
2)在本地引入hadoop依赖
之前我们在配置虚拟机的时候知道,要想用虚拟机里用java写hadoop代码,就得安装hadoop然后导入一大堆jar包进编译器
但是学过java后端的人都知道,maven管理的项目很简单,直接把配置代码写进【pom.xml】里,点一下“maven”按钮就开始自动下载安装导入了
那么我们只需要把下面这堆代码复制进【pom.xml】文件里
<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.30</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>compile</scope></dependency></dependencies>
另外再加下面这一行,意思是打包成一个不包含所有依赖包的整体项目的jar包,会体积更轻
<packaging>jar</packaging>
最后点击右上角的【maven】更新按钮,一键导入
3)首先创建Mapper类
首先先创建一个在【src/main/java】路径下创建一个【xxx.xxx.xxx】的路径,“.”是层级关系的意思,从左到右一层一层往里包含,创建好后在里面写我们的java代码
创建一个【Mapper类】用来写map阶段的代码,你可以自定义名字【xxxMapper】,这里我们写的案例是“wordcount”那就叫【WordCountMapper】
4)开始代码逻辑编写
总体思路就是创建三个类:【Mapper】【Reducer】【执行主类:job类】
【Mapper】类创建
首先extends继承【Mapper】这个父类,注意导包要导下图的这个依赖包
然后@Overrider重写Mapper父类的map方法(敲除map这个单词就有提示了,直接选)
首先解释Mapper这个父类的泛型,Mapper是一个java泛型类型数据,它里面的4个泛型类型分别指代的就是map阶段的【k1, v1, k2, v2】四个数据的类型
;
那么根据我们前面的分析我们可以知道【k1, v1】对应的java的数据类型应该是【Long, String】;然后【k2, v2】对应的数据类型应该是【String, Long】
但是,mapreduse有他自己的框架,人家不用java的Long、String,他非要自己整个LongWritable代替Long、Text代替String,那没法子,我们只能跟着改咯
;
你可以理解为原本应该写成Mapper<Long, String, String, Long>变成了下面这样
然后解释一下Mapper类的这个【map方法】,这个方法就是为了将<k1,v1> 转变成——> <k2, v2>,因此他就要接收<k1, v1>这两参数
那么第1个参数key就是代表接收到的k1,第2个参数value就是代表接收到的v1,第3个参数则是用Mapper的context这个内部类,来处理上下文关系,将拆分好的k1、k2组合成<k2, v2>写入文中,用它就不需要我们在关心中间的逻辑了
接下来就是怎么去写【map方法】里的逻辑了,那么我们的首先目的就是要把<k1, v1>的v1拆分成一个一个我们要的确切的单词数据(k1只是一个偏移量,对我们的意义不大可以不管了)
那么第一步就是把【代表v1的第2个value】拆分
但是因为value是Text类型的,java的String才能用split分割字符串
所以要先把value转成String类型,再根据我们自定义的“,”逗号分隔符拆分字符串数据
接着第二步,循环我们拿到的【v1数据数组】,然后遍历它,通过【context.write(Text数据 ,LongWritable统计数值)】方法将<v1> 变成——> <k2, v2>
;
其中这个方法要穿的第一个参数是我们的(Text类型的)每一个 v1,第二个参数是(LongWritable类型的)的统计数值,我们说过map阶段我们只需要将每一个出现的key都统计为1就行。
;
但是注意我们刚刚拆分得到的【v1数据数组】是String类型,【1】是Long、int类型,那么就要分别用【new Text( String字符串值 )】和【new LongWritable( Long数字 )】的方式将他们数据类型强制转化
但是这有一个弊端,就是我们在循环【v1数据数组】时,每遍历一次就要创建一个Text( )对象和一个LongWritable( )对象,有点浪费
;
那么我们还有一个方案,开始创建一个Text( )对象和一个LongWritable( )对象,然后在循环里只需要调用他两的【Text对象.set( String字符串值 )】【LongWritable对象.set( Long数字 )】也能达到强制数据类型转换的效果
至此Mapper类的代码就写完了,完整代码:
package org.czm.mapreduce;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 四个泛型解释:* KEYIN: 【K1的类型】LongWritable类型(这是mapreduce自定义的变量,对应java的Long)* VALUEIN: 【V1的类型】Text类型(这是mapreduce自定义的变量,对应java的String)** KEYOUT: 【K2的类型】Text类型(这是mapreduce自定义的变量,对应java的Long)* VALUEOUT: 【V2的类型】LongWritable类型(这是mapreduce自定义的变量,对应java的String)* java的话就应该理解为Mapper<Long, String, Long, String>* mapreduce的话就应该理解为Mapper<LongWritable, Text, Text, LongWritable>*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {/*** map方法就是将【K1、V1】变成【K2、V2】* @param key:K1(偏移量)* @param value:V1(每一行的文本数据)* @param context(表示上下文对象)*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {//没用删掉super.map(key, value, context);//1、拆分【V1】,就是把每一行的文本数据(类似:hello,world,hadoop)拆分String[] words = value.toString().split(",");//2、遍历拆分好的数组,组装【K2、V2】Text text = new Text();LongWritable longWritable = new LongWritable();for(String word : words) {//text跟longWritable对象的【.set()】方法跟创建他两的对象再转数据类型是一样的text.set(word);longWritable.set(1);context.write(text, longWritable);}}
}
【Reducer】类创建
跟mapper一样,首先extends继承【Reducer】这个父类,注意导包要导下图的这个依赖包
然后@Overrider重写Mapper父类的reduece方法(敲出reduce这个单词就有提示了,直接选)
跟Mapper同理,Reducer也是一个java泛型类型数据,它里面的4个泛型类型分别指代的就是reduce阶段的【k2, v2, k3, v3】四个数据的类型
另外注意一点,我们在map阶段将<k1, v1>变成了<k2: 单词 , v2: 1 >的形式,但是Mapper类还会对它们再进行一次处理,得出新的<k2: 单词 , v2: <1,1,1...> >这种新形式的<k2, v2>
;
因此Reduce的【reduce方法】里可以发现,
第1个参数代表新的k2;
第2个参数代表新的v2,但是不是数值类型而是一个 "Interable<>可迭代" 类型数据,你可以理解 "Interable<>" 就等于数组、集合、哈希表这样的数据容器,那么我们的新v2就是集合,因此用"Interable<>"接收;
第3个参数conText就还是跟mapper一样,代表上下文对象,只不过Reducer的conText是帮我们把<k2, v2>变成<k3, v3>
接下来就是怎么去写【reduce方法】里的逻辑了,那么我们的首先目的就是要把<k2, v2>的v2的集合总和求出来,就能得到每个单词的统计结果了
那么跟map方法逻辑不同的就是,reduce的逻辑是遍历对应所有k2的v2,然后只需要把v2的集合总数通过累加器得出总和就行了
;
(另外这里有个逻辑我当时不明白,为什么统计每个集合的总和之后,【countText.write()】只在外面执行一次,而不是也在循环里把每一个key的v3写入文中?
;
原理是这个reduce方法会本身就会在外面被循环执行,每次传一个k2进来,然后value就是k2的v2集合,我们的循环是把集合总和求出来,然后最红结果在外面通过【countText.write()】组成<k3, v3>)
Reducer类完整代码:
package org.czm.mapreduce;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** 四个泛型解释:* KEYIN: 【K2的类型】LongWritable类型(这是mapreduce自定义的变量,对应java的Long)* VALUEIN: 【V2的类型】Text类型(这是mapreduce自定义的变量,对应java的String)** KEYOUT: 【K3的类型】Text类型(这是mapreduce自定义的变量,对应java的Long)* VALUEOUT: 【V3的类型】LongWritable类型(这是mapreduce自定义的变量,对应java的String)*/
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {/*** reduce方法:将新得到的【K2,V2】变成【K3,V3】,并写入上下文* @param key:新K2* @param values:新V2(集合)* @param context:表示上下文对象*/@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {//1、遍历集合,将每个V2的<1,1,1...>这样的集合数据相加,得到V3long count = 0;for (LongWritable value : values) {//这里因为每个成员都是LongWritable,不能直接跟long类型相加//.get()方法就可以获得正常数字count += value.get();}//2、将新的【K2、V2】转为【K3、V3】context.write(key, new LongWritable(count));}
}
【job执行类】创建
首先创建一个【JobMain】类
然后这个类需要先extends继承【Configured】这个父类,还要implements实现【Tool】这个接口(别选错了包了,记得是hadoop包的噢)
然后实现重写Tool接口的【run】方法
然后在【run】方法下面再写一个【main】方法,这就是【JobMain】的入口启动方法,这整个项目由【JobMain】启动,【JobMain】又由【main】方法启动
【main】方法要做的事很简单,就是利用【ToolRunner这个类】的【run方法】去触发一个job任务启动
首先创建【ToolRunner这个类的run方法】
然后传入三个参数:
1、第一个是Configuration对象类型的数据,不用管是干啥的,直接新建这个对象然后传入就完事
2、第二个参数是要的是一个Tool接口实现类,那么我们这个JobMain不就是个Tool的实现子类嘛,直接搞一个new JobMain()完事,不用管为什么
3、第三个参数要一个字符串数组,那直接把main方法的args给它,我也不知道为什么,搞就完事
然后抛一下异常就行了
然后要写一个System.exit()方法等待job任务执行完退出程序,这里System.exit()方法需要一个数字参数:0或1,那么刚好ToolRunner.run()方法会返回一个数字结果,0就是失败、1就是成功,那么我们就要根据job任务的状态来让System.exit()方法退出程序。
接着写上面【run】方法
总体来看就是:【创建job对象】——>【创建并配置job任务】
那么先看【创建job对象】
注意别导错包
【Job.getInstance()】可以返回一个job对象,但是这个方法需要两个参数:
;
第一个参数是下面的main方法传进去的那个【configuration】,这个值是存放到configured这个父类的,然后JobMain类又继承了configured父类,那么就要通过【super.getConf()】方法取出【configuration】
;
第二个参数就是指代你这个job任务的名字,随便取就行
然后看【创建并配置job任务】
一共八步:
1、设置读取文件的类型为TextInputFormat类,然后设置读取文件的路径
2、设置mapper的类型:指定使用我们刚刚写的WordMapper类,然后指定k2、v2的类型
3、4、5、6、都是suffle阶段的分区、排序、规约、分组
7、设置reduce的类型:指定使用我们刚刚写的WordReducer类,然后指定k3、v3的类型
8、设置输出文件的类型为TextOutputFormat类,以及输出的文件的路径
1、设置读取文件的类型为TextInputFormat类,然后设置读取文件的路径
这里读取文件的路径代表我们要读取hadoop的hdfs文件系统哪里的文件,前面我们创建了【/wordcount】这层目录,里面也有一个【wordcount.txt】文件,只需要写【/wordcount】就会自动扫描到里面的【wordcount.txt】文件
;
然后前面的【hdfs://localhost:9000】这个前缀,是我们在当时伪分布式安装hadoop式配置的端口路径,在这个文件能看到
2、设置mapper的类型:指定使用我们刚刚写的WordMapper类,然后指定k2、v2的类型
注意导入Text类
3、4、5、6、都是suffle阶段的分区、排序、规约、分组(暂时先跳过不管)
7、设置reduce的类型:指定使用我们刚刚写的WordReducer类,然后指定k3、v3的类型
8、设置输出文件的类型为TextOutputFormat类,以及输出的文件的路径
最后,我们main方法里的Runner.run()方法不是有一个数值返回值判断job任务是否执行成功吗?正好job有一个【job.waitForCompletion(true);】方法判断job任务是否执行成功,但是因为返回的是boolean布尔值,所以在run方法最后return那里我们用个三元表达式,通过【job.waitForCompletion(true);】的结果返回0或1。
完整代码:
package org.czm.mapreduce;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class JobMain extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {// 1、创建job任务对象// 第一个参数是:// 下面main方法的ToolRunner.run()方法把Configuration这个对象其实是传给【父类Configured】保存起来// 所以这里要通过super.getConf()获取【父类Configured】的Configuration// 第二个参数是:// 随便一个名字,代表你这个Job任务的名字Job job = Job.getInstance(super.getConf(), "wordcount");// 2、创建Job任务的8步// 第一步:设置读取文件的类型为TextInputFormat类,然后设置读取文件的路径// 这里设置的是输入的路径,也就是MapReduce程序读取我们上传到hdfs文件系统的数据的路径job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/wordcount"));// 第二步:设置mapper的类型:// (1)就是我们写好的WordMapper类// (2)设置mapper的输出k2类型// (3)设置mapper的输出v2类型job.setMapperClass(WordCountMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);// 第三、第四、第五、第六步// suffle阶段的分区、排序、规约、分组......暂时省略可以不写// 第七步:设置reduce的类型:// (1)就是我们写好的WordReducer类// (2)设置reduce的输出k3类型// (3)设置reduce的输出v3类型job.setReducerClass(WordCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);// 第八步:设置输出文件的类型为TextOutputFormat类,以及输出的文件的路径// 这里设置的是输出的路径,也就是最终结果输出到hdfs文件系统的数据的路径,没有就自动创建一个job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/wordcount_output"));// 最后返回一个布尔值判断job任务执行是否成功boolean b = job.waitForCompletion(true);// run方法最终是根据job任务是否执行成功,要返回一个int类型的结果,那么就根据job布尔判断写一个三元判断return b? 0 : 1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//ToolRunner.run()方法其实就是执行上面run方法,只不过间接隔了一层Configured父类//这个方法的三个参数:// 第1个参数要一个Configuration对象,这个对象其实是传给父类Configured保存起来,上面的run要通过super.getConf()获取// 第2个参数要一个Tool对象,那么JobMain就继承实现了Tool接口,所以直接new JobMain()就行// 第3个参数直接给它这个args字符串数组就行(不用管为啥,记住就行)int run = ToolRunner.run(configuration, new JobMain(), args);//这个意思就是程序执行完自动退出,根据run的结果,不管run是1(成功)还是0(失败)都退出System.exit(run);}}
五、wordcount项目到Linux系统运行
1、将本地java编写的MapReduce项目打包
maven管理的java项目很简单,直接点右边侧边栏的maven,然后找到【package】点两下就开始打包了
把上面图片里打包好的jar包文件复制一份,我们还是用共享文件夹传到虚拟机里,或者你用vwtools直接复制粘贴也行,反正最终就是把这个压缩包给它粘贴到虚拟机Linux系统的【~/Downloads】这层目录下
然后在终端控制中,通过下面的命令将这个jar包运行起来
hadoop jar [你刚刚打包的mapreuce项目jar包路径] [你的项目里src/java/下的三层路径,一直到JobMain]//比如你的jar包文件名叫[xxx.jar],你的JobMain的java项目路径是[xxx.xxx.xxx.JobMain]
//那就【hadoop jar ~/Downloads/xxx.jar xxx.xxx.xxx.JobMain】
2、前提准备(运行出错的情况的人要看)
(虚拟机安装JDK 11 和 hadoop 3.3以上)
但是同志们!不出意外的话肯定会出意外!
如果你之前虚拟机里的Java环境JDK是很老很老的版本(比1.8还老),然后你本地写的java项目的JDK就算是1.8或者很高的版本(一般1.8不会出错),那么当Linux虚拟机运行你本地计算机上传上的MapReduce项目时就会报错了
因为版本不兼容!!!我简单来说就是应该让我们本地计算机的java项目的JDK应该足够老,虚拟机Linux系统的JDK应该足够新!!因为新版本的JDK能向老版本的JDK兼容
那么这里注意了:
【hadoop 3.3以下】的环境只能配对【JDK11以下】的版本;
【hadoop 3.3以上】的环境只能配对【JDK11以上】的版本
简单说句人话就是:既然我们本地的JDK比较新,或者是即使我们已经用了【1.8】版本JDK,写的java代码还是显示版本比虚拟机的JDK过高的话,那我们至少要将我们的 Java 运行时环境升级到至少 Java 17,也就是换成【JDK11】
那么我现在要做的第一步:安装【11版本的JDK】
;
去到Oracle官网下载:Java Downloads | Oracle 中国
;
然后还是那套流程,把安装包放到虚拟机Linux系统的【~/Downloads】下,然后再通过【sudo tar -zxf ~/Downloads/jdk-11.0.25 linux-x64 bin.tar.gz -C /usr/lib/jvm】解压安装到【/usr/lib/jvm】目录
;
接下来还是跟之前一开始配置java环境一样的流程,设置环境变量
可以用【vim编辑器】来改写【~/.bashrc】这个配置文件
输入【vim ~/.bashrc】
然后把第一行 【export JAVA_HOME=】后面那部分换成你现在要换的JDK版本的安装路径,比如我刚刚安装的是 “jdk-11.0.25”,那我就【export JAVA_HOME=/usr/lib/jvm/jsk-11.0.25】
;
最后修改完之后,输入【source ~/.bashrc】进行重启配置
然后输入【java -version】应该就能看到当前java环境以及变成我们所要的JDK版本了
然后我们来开始安装适配【11版本JDK】的【hadoop】
前面说过了【JDK 11以上】的版本要适配【hadoop 3.3以上】
那么开始安装【hadoop 3.3以上】
;
这个我看了一下,综合评判下来,我还是觉得自己本机下载安装包,再移到虚拟机解压的这种老办法稳妥一点,毕竟我自己这样比较熟不容易错
;
首先,安装地址我们用清华大学的国内镜像网址进行安装hadoop安装包:
Index of /apache/hadoop/commonhttps://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/直接自己选一个3.3以上的版本进行安装
;
然后还是那套操作:移到共享文件夹 ——> 移到【~/Downloads】
把【~/Downloads】的hadoop压缩包解压到【/usr/local】,执行这个命令:【sudo tar -zxf [hadoop的路径] -C [/usr/local]】
;
接下来就是切换hadoop仪式:
先停掉hadoop服务,输入【/usr/local/hadoop/sbin/stop-all.sh】
然后我们把之前不用版本的hadoop换掉,你有两个选择,你可以输入下面这个命令把原来的hadoop备份起来,换成叫 “hadoop-old”留着
sudo mv /usr/local/hadoop /usr/local/hadoop-old
或者直接输入下面命令删除
sudo rm -rf /usr/local/hadoop
然后,我们把想要换成的【hadoop 3.3以上】的变成现在系统的【hadoop】,就执行下面的命令:【sudo mv [/usr/local/你的hadoop全名] /usr/local/hadoop】
;
然后我们需要给这个hadoop这个文件夹设置一下权限,记住一定要是hadoop用户下进行操作,然后设置了权限之后以后我们才有权限编辑配置hadoop的配置文件
【sudo chown -R hadoop /usr/local/hadoop】
;
最后输入【cd /usr/local/hadoop】,再输入【./bin/hadoop version】查看是否安装成功,能出现版本信息就行
最后就是配置【hadoop】!!
接下来就是伪分布式配置文件步骤,那么伪分布式的配置记住了,就配置这两个文件:
【/usr/local/hadoop/etc/hadoop/】路径下的【core-site.xml】 和 【hdfs-site.xml】
;
确保【core-site.xml】文件里有配置下面的代码
<property><name>hadoop.tmp.dir</name><value>file:/usr/local/hadoop/tmp</value><description>Abase for other temporary directories.</description></property><property><name>fs.defaultFS</name><value>hdfs://localhost:9000</value></property>
;
确保【hdfs-site.xml】有配置下面的代码:
<property><name>dfs.replication</name><value>1</value></property><property><name>dfs.namenode.name.dir</name><value>file:/usr/local/hadoop/tmp/dfs/name</value></property><property><name>dfs.datanode.data.dir</name><value>file:/usr/local/hadoop/tmp/dfs/data</value></property>
;
以防万一,有的人还会出现后期hadoop找不到JAVA环境的情况,我们再到【/usr/local/hadoop/etc/hadoop/】,编辑【hadoop-env.sh】这个配置文件,执行下面命令:
【cd /usr/local/hadoop/etc/hadoop/】
【vim hadoop-env.sh】
然后添加一句【export JAVA_HOME=[你的java安装路径,要跟你~/.bashrc的java路径一样]】
最后重启设置
【source ~/.bashrc】
;
然后再对NameNode进行格式化:【cd /usr/local/hadoop】——>【./bin/hdfs namenode -format】
;
接着启动整个Hadoop
进入到【/usr/local/hadoop】路径
——然后执行【./sbin/start-all】
——或者分别先执行【./sbin/start-dfs.sh】再执行【./sbin/start-yarn.sh】
cd /usr/local/hadoop ./sbin/start-all.sh
或者
cd /usr/local/hadoop ./sbin/start-dfs.sh ./sbin/start-yarn.sh
;
输入【jps】看看有没有显示多个节点
;
最后打开火狐浏览器输入【localhost:9870】看看有没有正常显示下图
以及点击【Utilities】——>【Browse the file system】有没有hdfs的文件系统目录显示
3、重新打包测试
再经历完上面极度痛苦折磨的过程之后,我们再试一下我们的MapReduce项目能否运行
重新检查一下本地的wordcount.txt还在不在
然后在我们新的hadoop 3.3.5的文件系统上创建【/wordcount】目录,并将wordcount.txt重新上传到hdfs文件系统上
然后【hadoop jar [我们写的MapReduce项目的jar包] [这个java项目里的JobMain类]】来启动这个项目
然后,出现上面图片那样就是执行成功了
我们可以从本地通过hdfs的命令来查看刚刚运行完【WordCount项目的JobMain类】之后输出的结果文件,就在hdfs的【/wordcount_output】下的【part-r-00000】,不确定路径的话可以先用【hadoop fs -ls [路径]】或【hdfs dfs -ls [路径]】命令一层一层的找
确定了路径之后输入【hadoop fs -cat /wordcount_output/part-r-00000】或【hdfs dfs -cat /wordcount_output/part-r-00000】来查看文件内容
或者更直观的话,取到浏览器【localhost:9870】,然后点击【Utilities】——>【Browse the file system】,输入框输入【/wordcount_output】后点击【part-r-00000】这个文件
六、wordcount项目结果运行在本地(测试常用)
刚刚那种方式我们是直接将文件上传到虚拟机的Linux系统的HDFS文件系统,并将输出结果也运行到HDFS了
但是在测试阶段,我们通常只会将文件放在本地,不然一直上传到虚拟机、再上传到HDFS、在输入命令一大堆的很麻烦
那么我们只需要在【JobMain】里将输入跟输出的路径更改成本地计算机的路径就行了
1、前提准备
1)在本地window系统配置hadoop环境
这一步我另外写了一篇文章,我怕这里看着太臃肿,请各位先前往这篇文章配置好自己的window的hadoop环境先
大数据之——Window电脑本地配置hadoop系统(100%包避坑!!方便日常测试,不用再去虚拟机那么麻烦)-CSDN博客
2)准备好本地的wordcount文件
首先本地随便找个地方,创建一个【input】输入路径
【output】输出目录我们不用创建,因为等会我们运行mapreduce的时候,产生输出文件时它会自动生成
再在里面创建一个【wordcount.txt】输入来源文件
3)修改MapReduce的java代码
ok,接下来这波,我们只需要做2件事了
1、修一下JobMain里输入输出的路径
换成本地的就行,注意在路径前加上【file:///】,然后后面的路径里【/】换成【\\】
【输入】
【输出】
现在运行已经没问题了,直接运行
然后回到文件夹就能看到输出文件了
JobMain源代码:
package org.czm.mapreduce;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class JobMain extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {// 1、创建job任务对象// 第一个参数是:// 下面main方法的ToolRunner.run()方法把Configuration这个对象其实是传给【父类Configured】保存起来// 所以这里要通过super.getConf()获取【父类Configured】的Configuration// 第二个参数是:// 随便一个名字,代表你这个Job任务的名字Job job = Job.getInstance(super.getConf(), "wordcount");// 2、创建Job任务的8步// 第一步:设置读取文件的类型为TextInputFormat类,然后设置读取文件的路径// 这里设置的是输入的路径,也就是MapReduce程序读取我们上传到hdfs文件系统的数据的路径job.setInputFormatClass(TextInputFormat.class);//TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/wordcount"));TextInputFormat.addInputPath(job, new Path("file:///F:\\编程学习资料\\大数据\\mapreduce\\input"));// 第二步:设置mapper的类型:// (1)就是我们写好的WordMapper类// (2)设置mapper的输出k2类型// (3)设置mapper的输出v2类型job.setMapperClass(WordCountMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);// 第三、第四、第五、第六步// suffle阶段的分区、排序、规约、分组......暂时省略可以不写// 第七步:设置reduce的类型:// (1)就是我们写好的WordReducer类// (2)设置reduce的输出k3类型// (3)设置reduce的输出v3类型job.setReducerClass(WordCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);// 第八步:设置输出文件的类型为TextOutputFormat类,以及输出的文件的路径// 这里设置的是输出的路径,也就是最终结果输出到hdfs文件系统的数据的路径,没有就自动创建一个job.setOutputFormatClass(TextOutputFormat.class);//TextOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/wordcount_output"));TextOutputFormat.setOutputPath(job, new Path("file:///F:\\编程学习资料\\大数据\\mapreduce\\output"));// 最后返回一个布尔值判断job任务执行是否成功boolean b = job.waitForCompletion(true);// run方法最终是根据job任务是否执行成功,要返回一个int类型的结果,那么就根据job布尔判断写一个三元判断return b? 0 : 1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//ToolRunner.run()方法其实就是执行上面run方法,只不过间接隔了一层Configured父类//这个方法的三个参数:// 第1个参数要一个Configuration对象,这个对象其实是传给父类Configured保存起来,上面的run要通过super.getConf()获取// 第2个参数要一个Tool对象,那么JobMain就继承实现了Tool接口,所以直接new JobMain()就行// 第3个参数直接给它这个args字符串数组就行(不用管为啥,记住就行)int run = ToolRunner.run(configuration, new JobMain(), args);//这个意思就是程序执行完自动退出,根据run的结果,不管run是1(成功)还是0(失败)都退出System.exit(run);}}
2、配置日志
但是光上面那样,控制台会有一些刺眼的红色警告,虽然不影响运行,但是看着难受
那么首先:
在你的 pom.xml
文件中添加 Logback 的依赖:
<dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version>
</dependency>
在你的项目根目录下创建一个 logback.xml
文件,内容如下:
<configuration><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern></encoder></appender><root level="debug"><appender-ref ref="STDOUT" /></root>
</configuration>
最后在resource目录下创建一个【log4j.properties】文件
下入如下内容
# configure logging for testing: optionally with log file
#log4j.rootLogger=debug,appender
log4j.rootLogger=info,appender
#log4j.rootLogger=error,appender#\u8F93\u51FA\u5230\u63A7\u5236\u53F0
log4j.appender.appender=org.apache.log4j.ConsoleAppender
#\u6837\u5F0F\u4E3ATTCCLayout
log4j.appender.appender.layout=org.apache.log4j.TTCCLayout
重新运行,爽
七、wordcount项目小问题
1、有的时候打包会出问题
可以在JobMain添加这一句就行了
2、如果你执行一次,就已经在指定输出位置产生输出文件,再次执行就会报错
那么执行用指令删除原来的输出文件即可
但是老重新删除也麻烦,那么我们可以写一段代码,提前判断输出路径是否已经有输出文件存在
3、重新打包
先clean一下
然后package打包
最新JobMain源代码:
package org.czm.mapreduce;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.net.URI;public class JobMain extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {// 1、创建job任务对象// 第一个参数是:// 下面main方法的ToolRunner.run()方法把Configuration这个对象其实是传给【父类Configured】保存起来// 所以这里要通过super.getConf()获取【父类Configured】的Configuration// 第二个参数是:// 随便一个名字,代表你这个Job任务的名字Job job = Job.getInstance(super.getConf(), "wordcount");//如果打包运行出错,则需要加该配置job.setJarByClass(JobMain.class);// 2、创建Job任务的8步// 第一步:设置读取文件的类型为TextInputFormat类,然后设置读取文件的路径// 这里设置的是输入的路径,也就是MapReduce程序读取我们上传到hdfs文件系统的数据的路径job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/wordcount"));//TextInputFormat.addInputPath(job, new Path("file:///F:\\编程学习资料\\大数据\\mapreduce\\input"));// 第二步:设置mapper的类型:// (1)就是我们写好的WordMapper类// (2)设置mapper的输出k2类型// (3)设置mapper的输出v2类型job.setMapperClass(WordCountMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);// 第三、第四、第五、第六步// suffle阶段的分区、排序、规约、分组......暂时省略可以不写// 第七步:设置reduce的类型:// (1)就是我们写好的WordReducer类// (2)设置reduce的输出k3类型// (3)设置reduce的输出v3类型job.setReducerClass(WordCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);// 第八步:设置输出文件的类型为TextOutputFormat类,以及输出的文件的路径// 这里设置的是输出的路径,也就是最终结果输出到hdfs文件系统的数据的路径,没有就自动创建一个job.setOutputFormatClass(TextOutputFormat.class);Path outPut_path = new Path("hdfs://localhost:9000/wordcount_output");TextOutputFormat.setOutputPath(job, outPut_path);//TextOutputFormat.setOutputPath(job, new Path("file:///F:\\编程学习资料\\大数据\\mapreduce\\output"));//但是每次都要删除输出路径很麻烦,那么我们直接先提前判断这个路径是否存在就行//获取FileSystemFileSystem fileSystem = FileSystem.get(new URI("hdfs://localhost:9000"), new Configuration());boolean pathExisted = fileSystem.exists(outPut_path);//如果路径存在,则删除if (pathExisted) {//删除目标目录fileSystem.delete(outPut_path, true);}// 最后返回一个布尔值判断job任务执行是否成功boolean b = job.waitForCompletion(true);// run方法最终是根据job任务是否执行成功,要返回一个int类型的结果,那么就根据job布尔判断写一个三元判断return b? 0 : 1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//ToolRunner.run()方法其实就是执行上面run方法,只不过间接隔了一层Configured父类//这个方法的三个参数:// 第1个参数要一个Configuration对象,这个对象其实是传给父类Configured保存起来,上面的run要通过super.getConf()获取// 第2个参数要一个Tool对象,那么JobMain就继承实现了Tool接口,所以直接new JobMain()就行// 第3个参数直接给它这个args字符串数组就行(不用管为啥,记住就行)int run = ToolRunner.run(configuration, new JobMain(), args);//这个意思就是程序执行完自动退出,根据run的结果,不管run是1(成功)还是0(失败)都退出System.exit(run);}}