深度了解flink(十一) 心跳机制详解

前言

在Flink的各个服务组件中,ResourceManager、JobMaster、TaskExecutor三者之间存在相互检测的心跳机制:ResourceManager会主动发送心跳请求探测JobMaster、TaskExecutor是否存活;JobMaster也会主动发送心跳请求探测TaskExecutor是否存活,以便进行任务重启或者失败处理。

核心组件

HeartbeatMonitor

接口,定义心跳状态信息相关的接口,管理需要心跳通信相关组件的心跳状态

HeartbeatTarget

作用

1.发起心跳的请求

2.接收心跳并进行响应

3.心跳信息中的负载可以添加额外的信息

4.维护ResourceId和HeartBeatMonitor的Map集合映射

UML

HeartbeatManager

接口,能够停止/启动 心跳的监听,并且能够在心跳超时进行上报

HeartbeatManagerImpl

HeartbeatManager的实现类,提供了组件监听的方法,心跳目标对象注册后会将监控对象信息方法Map中

HeartbeatManagerSenderImpl

继承了HeartbeatManagerImpl,实现了Runnable接口,能够周期性的调度进行心跳的检测

初始化

HeartbeatManagerSenderImpl(long heartbeatPeriod,long heartbeatTimeout,int failedRpcRequestsUntilUnreachable,ResourceID ownResourceID,HeartbeatListener<I, O> heartbeatListener,ScheduledExecutor mainThreadExecutor,Logger log,HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {super(heartbeatTimeout,failedRpcRequestsUntilUnreachable,ownResourceID,heartbeatListener,mainThreadExecutor,log,heartbeatMonitorFactory);this.heartbeatPeriod = heartbeatPeriod;mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);}

在初始化的时候会被线程池提交一个立刻执行的任务,从而进入到run方法

@Overridepublic void run() {if (!stopped) {log.debug("Trigger heartbeat request.");for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {requestHeartbeat(heartbeatMonitor);}getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);}}

作用

1.进行状态判断

2.遍历心跳请求目标

3.发起心跳请求

4.以固定心跳周期的定时任务进行调度

HeartbeatServices

ClusterEntrypoint是集群启动的入口,启动的过程中initializeServices进行了服务的初始化,也包含心跳服务HeartbeatServices的初始化。

初始化HeartbeatServices调用方法栈如下:

ClusterEntrypoint#runCluster->ClusterEntrypoint#initializeServices->ClusterEntrypoint#createHaServices->HeartbeatServices#fromConfiguration

作用

负责创建HeartbeatManagerSenderImpl对象

@Overridepublic <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(ResourceID resourceId,HeartbeatListener<I, O> heartbeatListener,ScheduledExecutor mainThreadExecutor,Logger log) {return new HeartbeatManagerSenderImpl<>(heartbeatInterval,heartbeatTimeout,failedRpcRequestsUntilUnreachable,resourceId,heartbeatListener,mainThreadExecutor,log);}

组件心跳服务启动

之前分析Flink Rpc底层原理,Flink的组件都继承自EndPoint,组件初始化后会执行start方法,给组件本身发送Start 控制类消息,从而进入到onStart方法,组件的心跳服务一般都在onStart方法进行启动

ResourceManager

onStart方法

组件启动进入到onStart方法,该方法中对ResourceManager的服务进行初始化

@Overridepublic final void onStart() throws Exception {try {log.info("Starting the resource manager.");//初始化ResourceManager的服务startResourceManagerServices();startedFuture.complete(null);} catch (Throwable t) {final ResourceManagerException exception =new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()),t);onFatalError(exception);throw exception;}}

startResourceManagerServices 启动心跳服务

private void startResourceManagerServices() throws Exception {try {jobLeaderIdService.start(new JobLeaderIdActionsImpl());registerMetrics();startHeartbeatServices();slotManager.start(getFencingToken(),getMainThreadExecutor(),resourceAllocator,new ResourceEventListenerImpl(),blocklistHandler::isBlockedTaskManager);delegationTokenManager.start(this);initialize();} catch (Exception e) {handleStartResourceManagerServicesException(e);}}

ResourceManager负责计算资源的分配,JobMaster解析作业后会向ResouceManager申请资源,ReouceManger收到申请后会分配TaskManager Slot资源给用于运行任务,所以ReouceManager和这两个组件需要维持心跳,startHeartbeatServices方法内就进行了心跳管理对象的初始化

 private void startHeartbeatServices() {//taskmanager 心跳管理对象 底层使用map进行存储taskManagerHeartbeatManager =heartbeatServices.createHeartbeatManagerSender(resourceId,new TaskManagerHeartbeatListener(),getMainThreadExecutor(),log);//jobMaster 心跳管理对象 底层使用map进行存储jobManagerHeartbeatManager =heartbeatServices.createHeartbeatManagerSender(resourceId,new JobManagerHeartbeatListener(),getMainThreadExecutor(),log);}

heartbeatServices创建HeartbeatManagerSender内部就启动一个定时任务调度,遍历这两个对象的Map集合,因为ResourceManager初始化后还没组件进行心跳注册,此时为空跑。

JobMaster

onStart方法

@Override
protected void onStart() throws JobMasterException {try {startJobExecution();} catch (Exception e) {final JobMasterException jobMasterException =new JobMasterException("Could not start the JobMaster.", e);handleJobMasterError(jobMasterException);throw jobMasterException;}
}

心跳服务初始化,方法调用栈如下

startJobExecution->startJobMasterServices->this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);this.resourceManagerHeartbeatManager =createResourceManagerHeartbeatManager(heartbeatServices);
 private void startJobMasterServices() throws Exception {try {this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);this.resourceManagerHeartbeatManager =createResourceManagerHeartbeatManager(heartbeatServices);// start the slot pool make sure the slot pool now accepts messages for this leaderslotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());// job is ready to go, try to establish connection with resource manager//   - activate leader retrieval for the resource manager//   - on notification of the leader, the connection will be established and//     the slot pool will start requesting slots//选举ResourceManager的leaderresourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());} catch (Exception e) {handleStartJobMasterServicesError(e);}}

初始化了TaskManager和ResourceManager的心跳管理服务

resourceManagerLeaderRetriever.start 获取ResourceManager的Leader对象,获取成功后监听器进行回调

@Overridepublic void start(LeaderRetrievalListener listener) {checkNotNull(listener, "Listener must not be null.");synchronized (startStopLock) {checkState(!started, "StandaloneLeaderRetrievalService can only be started once.");started = true;// directly notify the listener, because we already know the leading JobManager's// addresslistener.notifyLeaderAddress(leaderAddress, leaderId);}}

监听器回调方法对状态进行校验,然后会进入到rpc连接的方法

private void openRpcConnectionTo(String leaderAddress, JobMasterId jobMasterId) {Preconditions.checkState(currentJobMasterId == null && rpcConnection == null,"Cannot open a new rpc connection if the previous connection has not been closed.");currentJobMasterId = jobMasterId;//封装rpc连接信息rpcConnection =new JobManagerRegisteredRpcConnection(LOG, leaderAddress, jobMasterId, rpcService.getScheduledExecutor());LOG.info("Try to register at job manager {} with leader id {}.",leaderAddress,jobMasterId.toUUID());//开始和ResourceManager建立Rpc连接rpcConnection.start();}

RegisteredRpcConnection#start方法开始对rpc连接进行注册

public void start() {//连接状态校验checkState(!closed, "The RPC connection is already closed");checkState(!isConnected() && pendingRegistration == null,"The RPC connection is already started");//创建新的注册对象,这一步生成注册信息final RetryingRegistration<F, G, S, R> newRegistration = createNewRegistration();if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {newRegistration.startRegistration();} else {// concurrent start operationnewRegistration.cancel();}}

createNewRegistration方法如下,

 private RetryingRegistration<F, G, S, R> createNewRegistration() {//初始化注册信息RetryingRegistration<F, G, S, R> newRegistration = checkNotNull(generateRegistration());CompletableFuture<RetryingRegistration.RetryingRegistrationResult<G, S, R>> future =newRegistration.getFuture();//回调方法future.whenCompleteAsync(//代码省略);return newRegistration;}
}

generateRegistration会创建RetryingRegistration,并定义了注册的方法

protected RetryingRegistration<ResourceManagerId,ResourceManagerGateway,JobMasterRegistrationSuccess,RegistrationResponse.Rejection>generateRegistration() {return new RetryingRegistration<ResourceManagerId,ResourceManagerGateway,JobMasterRegistrationSuccess,RegistrationResponse.Rejection>(log,getRpcService(),"ResourceManager",ResourceManagerGateway.class,getTargetAddress(),getTargetLeaderId(),jobMasterConfiguration.getRetryingRegistrationConfiguration()) {//定义了异步方法@Overrideprotected CompletableFuture<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway,ResourceManagerId fencingToken,long timeoutMillis) {Time timeout = Time.milliseconds(timeoutMillis);return gateway.registerJobMaster(jobMasterId,jobManagerResourceID,jobManagerRpcAddress,jobID,timeout);}};}

ResourceManagerGateway是ResourceManager的动态代理对象,执行gateway.registerJobMaster这个方法会远程调用ResourceManager的registerJobMaster方法,ResourceManager#registerJobMaster如下,

public CompletableFuture<RegistrationResponse> registerJobMaster(final JobMasterId jobMasterId,final ResourceID jobManagerResourceId,final String jobManagerAddress,final JobID jobId,final Time timeout) {CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class);CompletableFuture<RegistrationResponse> registrationResponseFuture =jobMasterGatewayFuture.thenCombineAsync(jobMasterIdFuture,(JobMasterGateway jobMasterGateway, JobMasterId leadingJobMasterId) -> {if (Objects.equals(leadingJobMasterId, jobMasterId)) {return registerJobMasterInternal(jobMasterGateway,jobId,jobManagerAddress,jobManagerResourceId);} else {final String declineMessage =String.format("The leading JobMaster id %s did not match the received JobMaster id %s. "+ "This indicates that a JobMaster leader change has happened.",leadingJobMasterId, jobMasterId);log.debug(declineMessage);return new RegistrationResponse.Failure(new FlinkException(declineMessage));}},getMainThreadExecutor());}

会走到registerJobMasterInternal这个方法

private RegistrationResponse registerJobMasterInternal(final JobMasterGateway jobMasterGateway,JobID jobId,String jobManagerAddress,ResourceID jobManagerResourceId) {//代码省略// jobmastet 作为监听对象存入map中jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new JobMasterHeartbeatSender(jobMasterGateway));return new JobMasterRegistrationSuccess(getFencingToken(), resourceId);}

jobManagerHeartbeatManager.monitorTarget 最终ResouceManager会把JobMaster作为监控对象存在Map中,后续调度遍历map发送心跳请求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/14191.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

华为策略路由配置

一、本地策略路由 要求&#xff1a; 长度为64~1400字节的报文走g0/0/0链路 长度为1401~1500字节的报文走g0/0/1链路 1.启动设备 2.配置IP地址 [AR1]int g0/0/0 [AR1-GigabitEthernet0/0/0]ip add 150.1.1.1 24 [AR1-GigabitEthernet0/0/0]int g0/0/1 [AR1-GigabitEther…

Html Area 图像映射可点击区域 实现响应式图像映射

Html Area 图像映射可点击区域 实现响应式图像映射 主要实现了图片的分区域点击&#xff0c;可以自定义点击的区域&#xff0c;根据点击的位置不同&#xff0c;执行不同的方法或者跳转不同的网页 介绍 引用w3school的Demohttps://www.w3school.com.cn/tags/tag_area.asp#googl…

Python爬虫知识体系-----requests-----持续更新

数据科学、数据分析、人工智能必备知识汇总-----Python爬虫-----持续更新&#xff1a;https://blog.csdn.net/grd_java/article/details/140574349 文章目录 一、安装和基本使用二、get请求三、post请求四、代理 一、安装和基本使用 和解析库urllib几乎一摸一样&#xff0c;但是…

操作系统OS--进程

目录 操作系统是什么 进程 进程的状态 1.并行和并发 2.时间片 进程优先级 进程切换 task_struct内容分类&#xff1a; 操作系统是什么 操作系统本质上是一款纯正的“搞管理”的软件 你的程序不能直接写入硬件&#xff0c;都必须通过操作系统 对软硬件之间进行交互&…

C语言 strlen 函数 - C语言零基础入门教程

目录 一.strlen 函数简介二.strlen 函数实战三.猜你喜欢 零基础 C/C 学习路线推荐 : C/C 学习目录 >> C 语言基础入门 一.strlen 函数简介 在C 语言中&#xff0c;char 字符串也是一种非常重要的数据类型&#xff0c;我们可以使用 strlen 函数获取字符串长度&#xff1b;…

地面沉降数值模拟的最新进展与研究动态

地面沉降&#xff0c;由自然或人为因素引起的地表垂直位移现象&#xff0c;对城市规划、交通基础设施、建筑工程和环境地质学等多个领域产生深远影响。它不仅威胁着城市建筑安全和交通运行&#xff0c;还对环境和经济发展构成挑战。掌握地面沉降的理论知识和实践技能至关重要。…

如何选择适合的谷歌SEO服务避免踩坑?

在选择SEO服务时&#xff0c;很多企业担心花了钱却看不到效果。市面上确实有一些不靠谱的服务商&#xff0c;他们承诺短时间内实现排名飙升&#xff0c;但最终结果往往不尽如人意。那么&#xff0c;如何判断SEO服务的真假呢 首先&#xff0c;靠谱的SEO公司一定能提供真实的案例…

【OpenGL】OpenGL简介

文章目录 OpenGL概述OpenGL的本质OpenGL相关库核心库窗口管理glutfreeglutglfw 函数加载glewGLAD OpenGL概述 OpenGL(Open Graphics Library) 严格来说&#xff0c;本身并不是一个API&#xff0c;它是一个由Khronos组织制定并维护的规范(Specification)。OpenGL规范严格规定了…

算法闭关修炼百题计划(六)

塔塔开(滑稽 1.删除排序链表中的重复元素2.删除排序链表中的重复元素II3.字典序的第k小数字4.下一个排列5.排序链表6.随机链表的复制7.数据流的中位数 1.删除排序链表中的重复元素 使每个元素就出现一次 class Solution { public:ListNode* deleteDuplicates(ListNode* head)…

实习冲刺第二十天

543.二叉树的直径 给你一棵二叉树的根节点&#xff0c;返回该树的 直径 。 二叉树的 直径 是指树中任意两个节点之间最长路径的 长度 。这条路径可能经过也可能不经过根节点 root 。 两节点之间路径的 长度 由它们之间边数表示。 示例 1&#xff1a; 输入&#xff1a;root …

搜索引擎算法解析提升搜索效率的关键要素

内容概要 搜索引擎算法是指一系列计算机程序和规则&#xff0c;用于决定如何抓取、索引和提供网页信息。了解这些算法的核心概念对于提高我们的搜索效率至关重要。本文将详细分析搜索引擎的工作原理和主要算法类型&#xff0c;以及它们如何影响搜索结果的准确性和用户体验。 …

Brave127编译指南 Windows篇:配置Git(四)

1. 概述 在Brave浏览器的开发过程中&#xff0c;Git作为核心版本控制工具扮演着不可或缺的角色。作为当今最广泛使用的分布式版本控制系统&#xff0c;Git为开发者提供了强大的源码管理能力。通过Git&#xff0c;您可以轻松追踪代码变更、管理不同版本&#xff0c;并与其他开发…

使用React和Vite构建一个AirBnb Experiences克隆网站

这一篇文章中&#xff0c;我会教你如何做一个AirBnb Experiences的克隆网站。主要涵盖React中Props的使用。 克隆网站最终呈现的效果&#xff1a; 1. 使用vite构建基础框架 npm create vitelatestcd airbnb-project npm install npm run dev2. 构建网站的3个部分 网站从上…

LC68----222. 完全二叉树的节点个数(java版)---树

1. 题目描述 完全二叉树的节点个数 给你一棵 完全二叉树 的根节点 root &#xff0c;求出该树的节点个数。 完全二叉树 的定义如下&#xff1a;在完全二叉树中&#xff0c;除了最底层节点可能没填满外&#xff0c;其余每层节点数都达到最大值&#xff0c;并且最下面一层的节点…

206面试题(71~80)

208道Java面试题 文章目录 **208道Java面试题** **71. 如何避免 SQL 注入&#xff1f;****72. 什么是 XSS 攻击&#xff0c;如何避免&#xff1f;****73. 什么是 CSRF 攻击&#xff0c;如何避免&#xff1f;****74. throw 和 throws 的区别&#xff1f;****75. final、finally、…

快速安装mysql5.7.44

参考文档&#xff1a; Windows系统上安装MySQL 5.7步骤&#xff08;实测可行&#xff09;_mysql5.7 windows-CSDN博客 MySQL 5.7压缩包安装图文教程(超详细)_Mysql_脚本之家 关键点&#xff1a; 参数文件内容参考&#xff1a; ALTER USER rootlocalhost IDENTIFIED WITH mys…

大数据新视界 -- 大数据大厂之 Impala 性能提升:高级执行计划优化实战案例(下)(18/30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

数字经济新时代,高校数字经济专业人才培养如何与产业对接?

一、数字经济发展及人才需求 &#xff08;一&#xff09;数字经济蓬勃发展 数字经济已成为驱动中国经济实现发展的新引擎&#xff0c;据中国信通院数据&#xff0c;2023年&#xff0c;我国数字经济规模达到53.9万亿元&#xff0c;数字经济占GDP比重达到42.8&#xff05;&#x…

【SpringBoot】21 @Async异步任务线程池的隔离

Git仓库 https://gitee.com/Lin_DH/system 介绍 线程池隔离&#xff1a;指一种通过为每个服务提供独立的线程池来隔离服务之间的资源和执行环境的做法。 为什么需要线程池隔离&#xff1f; 资源隔离&#xff0c;每个服务都有独立的线程池&#xff0c;可以避免由于某个服务的…

Python进行GRPC和Dubbo协议的高级测试

在微服务架构日益流行的今天&#xff0c;分布式系统的复杂性不断增加。GRPC 和 Dubbo 协议作为当今互联网行业中常见的高性能通信协议&#xff0c;已经成为服务之间交互的核心。然而&#xff0c;随着服务调用层次的不断增加&#xff0c;如何有效地测试这两种协议&#xff0c;确…