Flink 本地启动的多种方式
Application模式通过代码提交到Yarn上启动
//设置Yarn客户端
YarnClient yarnClient = ;
Configuration configuration = new Configuration();
if (customConfiguration != null) {configuration.addAll(customConfiguration);
}
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
configuration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
// 设置flink-dist-???.jar
String distPath = ;
configuration.set(YarnConfigOptions.FLINK_DIST_JAR, distPath);
// 设置需要执行的jar包
String examplePath = ;
configuration.set(PipelineOptions.JARS, Collections.singletonList(examplePath));
FileSystem fileSystem = FileSystem.get(hadoopClusterTest.getConfig());
//设置flink lib
String dirPath = ;
// 上传flink libjar包到hdfs中
fileSystem.copyFromLocalFile(new Path(dirPath), new Path(dirPath));
configuration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(dirPath));
setIfAbsent(configuration, PipelineOptions.JARS, new ArrayList<>());
YarnConfiguration yarnConfiguration = new YarnConfiguration();
YarnClientYarnClusterInformationRetriever yarnClientYarnClusterInformationRetriever =YarnClientYarnClusterInformationRetriever.create(yarnClient);
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(configuration,yarnConfiguration,yarnClient,yarnClientYarnClusterInformationRetriever,true
);
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().setSlotsPerTaskManager(1).createClusterSpecification();ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(new String[]{},// 需要执行的类全名
);
try {// 启动ApplicationClusteryarnClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
} catch (ClusterDeploymentException e) {e.printStackTrace();
}
Session模式通过代码提交到Yarn上启动
public class YarnFlinkSessionTest {ClusterClient<ApplicationId> clusterClient;@Testvoid test() throws ExecutionException, InterruptedException {YarnClient yarnClient = //创建Yarn客户端Configuration configuration = new Configuration();configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY,MemorySize.parse("1024m"));configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,MemorySize.parse("1024m"));configuration.set(YarnConfigOptions.FLINK_DIST_JAR, "${FLINK_HOME}/lib/flink-dist-1.16.2.jar");YarnConfiguration yarnConfiguration = new YarnConfiguration();YarnClientYarnClusterInformationRetriever yarnClientYarnClusterInformationRetriever =YarnClientYarnClusterInformationRetriever.create(yarnClient);YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(configuration,yarnConfiguration,yarnClient,yarnClientYarnClusterInformationRetriever,true);ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().setMasterMemoryMB(1024).setTaskManagerMemoryMB(1024).setSlotsPerTaskManager(1).createClusterSpecification();try {ClusterClientProvider<ApplicationId> applicationIdClusterClientProvider = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);clusterClient = applicationIdClusterClientProvider.getClusterClient();} catch (ClusterDeploymentException e) {e.printStackTrace();}Thread.sleep(10000000);}
}
Flink MiniCluster 提交任务
MiniCluster在start方法中启动QueryService、RPCService、Zookeeper、BlobServer、TaskManager、DispatcherLeader、ResourceManager、DispatcherGateway、WebMonitor进行RPC通信。。
MiniCluster启动后再调用submitJob提交任务
RpcTaskManagerGateway、TaskExecutor
命令行Flink本地Standalone模式启动
运行任务:
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
-
该命令会调用
CliFrontend.main()
方法 -
CliFrontend.main()
方法再调用内部run()
方法,然后调用内部executeProgram()
方法 -
最后
CliFrontend.executeProgram()
调用ClientUtils.executeProgram()
方法. -
最后通过StandloneSessionClusterEntrypoint的main方法启动Flink
RestServerEndpoint在执行start()方法时注册Netty的ChannelHandler,可以通过WebMonitorEndpoint查看具体的Handler类型和实现。
JobManager::onStart -> JobMaster::startJobExecution
官方文档命令行启动
yarn: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/resource-providers/yarn/
kubernetes: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/resource-providers/native_kubernetes/
standalone: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/resource-providers/standalone/overview/