
Flink Installation and Programming Practice on Ubuntu 18.04 (Hadoop 3.1.3, Flink 1.9.1)

Note: this article contains many screenshots, so please be patient while they load (reading on a PC is recommended).

Remember to save every file you open and edit.

Step 1: Preparation

This article continues from my earlier tutorial on building the Hadoop cluster, so you should complete all the steps in that tutorial before starting this one.

Note: this experiment requires a virtual machine with more than 4 GB of RAM and more than 20 GB of disk space.
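You can check both with standard Linux commands:

free -h    # total and available memory
df -h      # disk usage and free space per filesystem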

If your VM does not meet these requirements, see the resizing articles in my Ubuntu VM column.

All of the following operations are performed on the Master node.

Step 2: Install Flink

Open a browser inside the VM, go to the address below, and download flink-1.9.1-bin-scala_2.11.tgz:

Index of /dist/flink/flink-1.9.1
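Alternatively, assuming the page above is the Apache release archive (https://archive.apache.org/dist/flink/flink-1.9.1/), the same file can be fetched from the command line:

wget https://archive.apache.org/dist/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz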

 

Then open a terminal and run the following commands:

cd ~/下载    # the "Downloads" folder (~/Downloads on an English-locale system)
sudo tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C /usr/local

Continue with the following commands in the terminal (note: hadoop below is my username; replace it with your own):

cd /usr/local
sudo mv ./flink-1.9.1 ./flink
sudo chown -R hadoop:hadoop ./flink

Then run:

vim ~/.bashrc

Change the environment-variable block added in the earlier chapters to:

export FLINK_HOME=/usr/local/flink
export HIVE_HOME=/usr/local/hive
export PATH=$PATH:/usr/local/hadoop/sbin:/usr/local/hadoop/bin:/usr/local/hbase/bin:$HIVE_HOME/bin:$FLINK_HOME/bin
 

Then run the following to apply the changes:

source ~/.bashrc
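To confirm that the variables took effect, you can run:

echo $FLINK_HOME    # should print /usr/local/flink
which flink         # should print /usr/local/flink/bin/flink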

Start Flink with the following commands:

cd /usr/local/flink
./bin/start-cluster.sh

Check the running processes with the jps command. If a StandaloneSessionClusterEntrypoint and a TaskManagerRunner process appear (alongside Jps and your Hadoop processes), the cluster has started successfully; the Flink web UI should also be reachable at http://localhost:8081.
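A typical check looks like this; the PIDs are illustrative and yours will differ:

jps
3408 StandaloneSessionClusterEntrypoint
3756 TaskManagerRunner
3874 Jps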

The Flink distribution ships with example programs. Here we run the batch WordCount example to test that Flink works:

cd /usr/local/flink/bin
./flink run /usr/local/flink/examples/batch/WordCount.jar
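The example's built-in data set is the same Hamlet soliloquy used later in this article, so the printed (word, count) tuples should include lines such as the following (ordering may differ):

(a,5)
(action,1)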

If output like this appears, the test run succeeded.

Step 3: Writing a WordCount Program

1. Install Maven

Maven was already installed in the previous chapter, so we will not repeat those steps here.

2. Write the code

Continue in the terminal:

cd ~    # go to the user home directory
mkdir -p ./flinkapp/src/main/java

Then:

cd ~/flinkapp/src/main/java
vim WordCountData.java

Add the following content to the file:

package cn.edu.xmu;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Provides the default input text (Hamlet's soliloquy) as a DataSet of lines.
public class WordCountData {
    public static final String[] WORDS = new String[]{
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,",
        "And by opposing end them?--To die,--to sleep,--",
        "No more; and by a sleep to say we end",
        "The heartache, and the thousand natural shocks",
        "That flesh is heir to,--'tis a consummation",
        "Devoutly to be wish'd. To die,--to sleep;--",
        "To sleep! perchance to dream:--ay, there's the rub;",
        "For in that sleep of death what dreams may come,",
        "When we have shuffled off this mortal coil,",
        "Must give us pause: there's the respect",
        "That makes calamity of so long life;",
        "For who would bear the whips and scorns of time,",
        "The oppressor's wrong, the proud man's contumely,",
        "The pangs of despis'd love, the law's delay,",
        "The insolence of office, and the spurns",
        "That patient merit of the unworthy takes,",
        "When he himself might his quietus make",
        "With a bare bodkin? who would these fardels bear,",
        "To grunt and sweat under a weary life,",
        "But that the dread of something after death,--",
        "The undiscover'd country, from whose bourn",
        "No traveller returns,--puzzles the will,",
        "And makes us rather bear those ills we have",
        "Than fly to others that we know not of?",
        "Thus conscience does make cowards of us all;",
        "And thus the native hue of resolution",
        "Is sicklied o'er with the pale cast of thought;",
        "And enterprises of great pith and moment,",
        "With this regard, their currents turn awry,",
        "And lose the name of action.--Soft you now!",
        "The fair Ophelia!--Nymph, in thy orisons",
        "Be all my sins remember'd."
    };

    public WordCountData() {
    }

    public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env) {
        return env.fromElements(WORDS);
    }
}

Save and exit, then continue:

vim WordCountTokenizer.java

Add the following content:

package cn.edu.xmu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Splits each line into lowercase words and emits a (word, 1) tuple per word.
public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String,Integer>> {

    public WordCountTokenizer() {}

    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.toLowerCase().split("\\W+");
        int len = tokens.length;

        for (int i = 0; i < len; i++) {
            String tmp = tokens[i];
            if (tmp.length() > 0) {
                out.collect(new Tuple2<String, Integer>(tmp, Integer.valueOf(1)));
            }
        }
    }
}

Save and exit, then continue:

vim WordCount.java

Add the following content:

package cn.edu.xmu;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.utils.ParameterTool;

public class WordCount {

    public WordCount() {}

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        Object text;

        // If no input path is given, fall back to the data provided by WordCountData
        if (params.has("input")) {
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataset(env);
        }

        AggregateOperator counts = ((DataSet<String>) text)
                .flatMap(new WordCountTokenizer())
                .groupBy(0)
                .sum(1);

        // If no output path is given, print the results to the console
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            env.execute();
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }
}

Save and exit, then continue:

cd ~/flinkapp
vim pom.xml
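The original post shows the pom.xml contents only in a screenshot. As a gap-filling sketch, here is a minimal pom consistent with the JAR name used below (simple-project-1.0.jar) and with the Flink dependencies from the IDEA section later in this article (the groupId is my assumption):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.edu.xmu</groupId>
    <artifactId>simple-project</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
    </dependencies>
</project>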

Now check the application's file structure with the following command:

find .
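Assuming only the three source files above were created, the output should resemble:

.
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/WordCountData.java
./src/main/java/WordCountTokenizer.java
./src/main/java/WordCount.java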

3. Package the Java program with Maven

Continue in the terminal:

/usr/local/maven/bin/mvn package

If the output ends with [INFO] BUILD SUCCESS, packaging succeeded; the JAR is written to ~/flinkapp/target/.

4. Run the program with the flink run command

Finally, submit the generated JAR to Flink with the flink run command (make sure Flink is already running):

/usr/local/flink/bin/flink run --class cn.edu.xmu.WordCount ~/flinkapp/target/simple-project-1.0.jar
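Because the program accepts --input and --output parameters, you can also run the packaged JAR against your own file; the input and output paths below are hypothetical:

/usr/local/flink/bin/flink run --class cn.edu.xmu.WordCount ~/flinkapp/target/simple-project-1.0.jar --input /home/hadoop/words.txt --output /home/hadoop/wordcount-result.txt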

Step 4: Develop and Debug the WordCount Program with IntelliJ IDEA

1. Install IDEA

See the IDEA installation article in my Ubuntu VM column (on my profile page) and follow it; it has screenshots for every step.

2. Before starting this experiment, Flink must be running.

If it is not, the start commands are given earlier in this article.

3. Launch IDEA, create a new project, and click Create.

Open the pom.xml file and paste the following over its original contents:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>dblab</groupId>
    <artifactId>FlinkWordCount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
    </dependencies>

</project>

After saving, click the blue Maven refresh icon to reload the project and download the dependencies; if the icon is not shown, trigger a reload from the Maven tool window instead. You can close the side panel with the ➖ button in its top-right corner.

Next, create a new package named cn.edu.xmu, then create a class in it named WordCountData (press Enter or click Class to confirm).

Paste in the following content:

package cn.edu.xmu;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Provides the default input text (Hamlet's soliloquy) as a DataSet of lines.
public class WordCountData {
    public static final String[] WORDS = new String[]{
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,",
        "And by opposing end them?--To die,--to sleep,--",
        "No more; and by a sleep to say we end",
        "The heartache, and the thousand natural shocks",
        "That flesh is heir to,--'tis a consummation",
        "Devoutly to be wish'd. To die,--to sleep;--",
        "To sleep! perchance to dream:--ay, there's the rub;",
        "For in that sleep of death what dreams may come,",
        "When we have shuffled off this mortal coil,",
        "Must give us pause: there's the respect",
        "That makes calamity of so long life;",
        "For who would bear the whips and scorns of time,",
        "The oppressor's wrong, the proud man's contumely,",
        "The pangs of despised love, the law's delay,",
        "The insolence of office, and the spurns",
        "That patient merit of the unworthy takes,",
        "When he himself might his quietus make",
        "With a bare bodkin? who would these fardels bear,",
        "To grunt and sweat under a weary life,",
        "But that the dread of something after death,--",
        "The undiscovered country, from whose bourn",
        "No traveller returns,--puzzles the will,",
        "And makes us rather bear those ills we have",
        "Than fly to others that we know not of?",
        "Thus conscience does make cowards of us all;",
        "And thus the native hue of resolution",
        "Is sicklied o'er with the pale cast of thought;",
        "And enterprises of great pith and moment,",
        "With this regard, their currents turn awry,",
        "And lose the name of action.--Soft you now!",
        "The fair Ophelia!--Nymph, in thy orisons",
        "Be all my sins remember'd."
    };

    public WordCountData() {
    }

    public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env) {
        return env.fromElements(WORDS);
    }
}

Create the second file, WordCountTokenizer.java, the same way. Its content is:

package cn.edu.xmu;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Splits each line into lowercase words and emits a (word, 1) tuple per word.
public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    public WordCountTokenizer() {}

    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.toLowerCase().split("\\W+");
        int len = tokens.length;

        for (int i = 0; i < len; i++) {
            String tmp = tokens[i];
            if (tmp.length() > 0) {
                out.collect(new Tuple2<String, Integer>(tmp, Integer.valueOf(1)));
            }
        }
    }
}

Create the third file, WordCount.java, the same way, with the following content:

package cn.edu.xmu;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.utils.ParameterTool;

public class WordCount {
    public WordCount() {}

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        Object text;
        // If no input path is given, fall back to the data provided by WordCountData
        if (params.has("input")) {
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataset(env);
        }

        AggregateOperator counts = ((DataSet<String>) text)
            .flatMap(new WordCountTokenizer())
            .groupBy(0)
            .sum(1);

        // If no output path is given, print the results to the console
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            env.execute();
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }
}

When you are done, all three files should sit in the same package.

Now run the WordCount class; the original screenshots showed two equivalent ways (for example, the Run button next to main, or right-clicking the class and choosing Run). Either works.

Once it executes successfully, the word-frequency results appear in the console.

4. Next, compile and package the code into a JAR. (If the menu bar is hidden, click the four-bar menu icon.)

The original screenshots showed IDEA's standard artifact workflow: configure a JAR artifact under File ▸ Project Structure ▸ Artifacts, then build it with Build ▸ Build Artifacts. When the build finishes, the JAR appears under out/artifacts/.

Finally, return to the terminal and run the JAR with the following commands.

Note: hadoop-202202810203 below is my username; replace it with your own.

cd /usr/local/flink
./bin/flink run /home/hadoop-202202810203/IdeaProjects/FlinkWordCount/out/artifacts/FlinkWordCount.jar

Conclusion: with this, all of Prof. Lin Ziyu's lab exercises from the course materials have been covered on my blog, each with step-by-step screenshots. Finished!
