Ubuntu 18.04 (Hadoop 3.1.3): Flink Installation and Programming Practice (Flink 1.9.1)
Note: this article contains many images; please be patient while they load. (Viewing on a computer is recommended.)
Remember to save every file you open and edit.
Step 1: Preparation
This article continues from my earlier Hadoop cluster setup tutorial, so you need to complete all the steps in that tutorial first.
Note that this experiment requires a virtual machine with more than 4 GB of RAM and more than 20 GB of disk space.
If your machine does not meet these requirements, see the resizing guide in my Ubuntu virtual machine column.
All of the following operations are performed on the Master host.
Step 2: Install Flink
Open a browser in the virtual machine, copy in the address below, and click to download:
Index of /dist/flink/flink-1.9.1 (the Apache dist directory listing for Flink 1.9.1)
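Alternatively, you can download the package directly from the terminal with wget (this assumes the Apache archive URL is still reachable):
cd ~/下载    # the "Downloads" folder
wget https://archive.apache.org/dist/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz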
Then open a terminal and enter the following commands:
cd ~/下载
sudo tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C /usr/local
Continue with the following commands in the terminal (note: hadoop below is your own username):
cd /usr/local
sudo mv ./flink-1.9.1 ./flink
sudo chown -R hadoop:hadoop ./flink
Continue in the terminal:
vim ~/.bashrc
Change the previously added content to:
export FLINK_HOME=/usr/local/flink
export HIVE_HOME=/usr/local/hive
export PATH=$PATH:/usr/local/hadoop/sbin:/usr/local/hadoop/bin:/usr/local/hbase/bin:$HIVE_HOME/bin:$FLINK_HOME/bin
Continue in the terminal:
source ~/.bashrc
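As a quick sanity check, you can confirm that the flink command is now on your PATH:
which flink
It should print /usr/local/flink/bin/flink.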
Start Flink with the following commands:
cd /usr/local/flink
./bin/start-cluster.sh
Check the running processes with the jps command:
If you see the following processes, the startup succeeded.
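For a standalone cluster started this way, the list should include the Flink JobManager and TaskManager processes, roughly:
StandaloneSessionClusterEntrypoint
TaskManagerRunner
Jps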
The Flink distribution ships with test examples; you can run the WordCount example program to check that Flink works, with the following commands:
cd /usr/local/flink/bin
./flink run /usr/local/flink/examples/batch/WordCount.jar
If you see the word-frequency output, the run succeeded.
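The job prints one tuple per distinct word in the form (word,count); for example, lines like the following appear (an illustrative excerpt, not the full output):
(action,1)
(arms,1)
(arrows,1)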
Step 3: Implement the WordCount Program
1. Install Maven
Maven was already installed in the previous chapter, so there is no need to install it again here.
2. Write the Code
Continue in the terminal:
cd ~    # enter the user home directory
mkdir -p ./flinkapp/src/main/java
Continue in the terminal (note that the directory was created under the home directory, not /usr/local):
cd ~/flinkapp/src/main/java
vim WordCountData.java
Add the following content to the file:
package cn.edu.xmu;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WordCountData {
    // Default input text: Hamlet's "To be, or not to be" soliloquy
    public static final String[] WORDS = new String[]{
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,",
        "And by opposing end them?--To die,--to sleep,--",
        "No more; and by a sleep to say we end",
        "The heartache, and the thousand natural shocks",
        "That flesh is heir to,--'tis a consummation",
        "Devoutly to be wish'd. To die,--to sleep;--",
        "To sleep! perchance to dream:--ay, there's the rub;",
        "For in that sleep of death what dreams may come,",
        "When we have shuffled off this mortal coil,",
        "Must give us pause: there's the respect",
        "That makes calamity of so long life;",
        "For who would bear the whips and scorns of time,",
        "The oppressor's wrong, the proud man's contumely,",
        "The pangs of despis'd love, the law's delay,",
        "The insolence of office, and the spurns",
        "That patient merit of the unworthy takes,",
        "When he himself might his quietus make",
        "With a bare bodkin? who would these fardels bear,",
        "To grunt and sweat under a weary life,",
        "But that the dread of something after death,--",
        "The undiscover'd country, from whose bourn",
        "No traveller returns,--puzzles the will,",
        "And makes us rather bear those ills we have",
        "Than fly to others that we know not of?",
        "Thus conscience does make cowards of us all;",
        "And thus the native hue of resolution",
        "Is sicklied o'er with the pale cast of thought;",
        "And enterprises of great pith and moment,",
        "With this regard, their currents turn awry,",
        "And lose the name of action.--Soft you now!",
        "The fair Ophelia!--Nymph, in thy orisons",
        "Be all my sins remember'd."
    };

    public WordCountData() {
    }

    // Wrap the default lines in a DataSet for use as input
    public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env) {
        return env.fromElements(WORDS);
    }
}
Save and exit, then continue:
vim WordCountTokenizer.java
Add the following content to the file:
package cn.edu.xmu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    public WordCountTokenizer() {}

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        // Split each line on non-word characters and emit (word, 1) for every non-empty token
        String[] tokens = value.toLowerCase().split("\\W+");
        for (int i = 0; i < tokens.length; i++) {
            String tmp = tokens[i];
            if (tmp.length() > 0) {
                out.collect(new Tuple2<String, Integer>(tmp, Integer.valueOf(1)));
            }
        }
    }
}
Save and exit, then continue:
vim WordCount.java
Add the following content to the file:
package cn.edu.xmu;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;

public class WordCount {
    public WordCount() {}

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // If no input path is specified, use the default data from WordCountData
        DataSet<String> text;
        if (params.has("input")) {
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataset(env);
        }

        AggregateOperator<Tuple2<String, Integer>> counts = text
                .flatMap(new WordCountTokenizer())
                .groupBy(0)
                .sum(1);

        // If no output path is specified, print to the console
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            env.execute();
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }
}
Save and exit, then continue:
cd ~/flinkapp
vim pom.xml
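For reference, a minimal pom.xml consistent with the jar name simple-project-1.0.jar used below might look like this (the groupId is an assumption; the three Flink dependencies match the IDEA project later in this article):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.edu.xmu</groupId>
    <artifactId>simple-project</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
    </dependencies>
</project>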
Then run the following command to check the overall file structure of the application:
find .
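Assuming the files were created as above, the output should look roughly like this (the order may differ):
.
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/WordCountData.java
./src/main/java/WordCountTokenizer.java
./src/main/java/WordCount.java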
3. Package the Java Program with Maven
Continue in the terminal:
/usr/local/maven/bin/mvn package
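The first build downloads many dependencies, so it may take a while. A successful build ends with output like:
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------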
Seeing BUILD SUCCESS means packaging succeeded.
4. Run the Program with flink run
Finally, submit the generated JAR to Flink with the flink run command (make sure Flink has been started):
/usr/local/flink/bin/flink run --class cn.edu.xmu.WordCount ~/flinkapp/target/simple-project-1.0.jar
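Since the program accepts --input and --output parameters, you can also point it at your own text file (the paths below are just examples):
/usr/local/flink/bin/flink run --class cn.edu.xmu.WordCount ~/flinkapp/target/simple-project-1.0.jar --input ~/word.txt --output ~/result.txt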
Step 4: Develop and Debug the WordCount Program with IntelliJ IDEA
1. Install IDEA
Go to my profile page and look in my Ubuntu virtual machine column for the article on installing IDEA; follow its step-by-step screenshots.
2. Before starting this experiment, Flink must be running.
If it is not, the startup commands can be found above.
3. Launch IDEA and create a new project.
Click Create.
Open the pom.xml file.
Paste the following content over the existing content:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>dblab</groupId>
    <artifactId>FlinkWordCount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
    </dependencies>
</project>
If there is no blue refresh icon to click, follow the figure below; if there is, just click it. Press the ➖ in the top-right corner to close the sidebar.
Continue.
Enter the package name cn.edu.xmu.
Continue.
Enter the class name WordCountData.
Then press Enter or click the class; either works.
Paste in the following content:
package cn.edu.xmu;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WordCountData {
    public static final String[] WORDS = new String[]{
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,",
        "And by opposing end them?--To die,--to sleep,--",
        "No more; and by a sleep to say we end",
        "The heartache, and the thousand natural shocks",
        "That flesh is heir to,--'tis a consummation",
        "Devoutly to be wish'd. To die,--to sleep;--",
        "To sleep! perchance to dream:--ay, there's the rub;",
        "For in that sleep of death what dreams may come,",
        "When we have shuffled off this mortal coil,",
        "Must give us pause: there's the respect",
        "That makes calamity of so long life;",
        "For who would bear the whips and scorns of time,",
        "The oppressor's wrong, the proud man's contumely,",
        "The pangs of despised love, the law's delay,",
        "The insolence of office, and the spurns",
        "That patient merit of the unworthy takes,",
        "When he himself might his quietus make",
        "With a bare bodkin? who would these fardels bear,",
        "To grunt and sweat under a weary life,",
        "But that the dread of something after death,--",
        "The undiscovered country, from whose bourn",
        "No traveller returns,--puzzles the will,",
        "And makes us rather bear those ills we have",
        "Than fly to others that we know not of?",
        "Thus conscience does make cowards of us all;",
        "And thus the native hue of resolution",
        "Is sicklied o'er with the pale cast of thought;",
        "And enterprises of great pith and moment,",
        "With this regard, their currents turn awry,",
        "And lose the name of action.--Soft you now!",
        "The fair Ophelia!--Nymph, in thy orisons",
        "Be all my sins remember'd."
    };

    public WordCountData() {
    }

    public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env) {
        return env.fromElements(WORDS);
    }
}
Following the same steps, create the second file, WordCountTokenizer.java.
Its content is as follows:
package cn.edu.xmu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    public WordCountTokenizer() {}

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.toLowerCase().split("\\W+");
        for (int i = 0; i < tokens.length; i++) {
            String tmp = tokens[i];
            if (tmp.length() > 0) {
                out.collect(new Tuple2<String, Integer>(tmp, Integer.valueOf(1)));
            }
        }
    }
}
Following the same steps, create the third file, WordCount.java.
package cn.edu.xmu;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;

public class WordCount {
    public WordCount() {}

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // If no input path is specified, use the default data from WordCountData
        DataSet<String> text;
        if (params.has("input")) {
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataset(env);
        }

        AggregateOperator<Tuple2<String, Integer>> counts = text
                .flatMap(new WordCountTokenizer())
                .groupBy(0)
                .sum(1);

        // If no output path is specified, print to the console
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            env.execute();
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }
}
The final layout: all three files are in the same location.
Continue.
Continue.
Continue.
Method 1:
You can also use the method below. Method 2:
After a successful run, you will see the word-frequency results.
4. Next, compile and package the code into a JAR.
If you do not see the file tree, click the four-bar (hamburger) icon.
Continue.
Continue.
Then you will see:
Finally, return to the terminal and run the JAR with the following command (run it from the /usr/local/flink directory).
Note: hadoop-202202810203 below is this machine's username; replace it with your own.
./bin/flink run /home/hadoop-202202810203/IdeaProjects/FlinkWordCount/out/artifacts/FlinkWordCount.jar