FutureTask源码分析

        Thread类的run方法返回值类型是void,因此我们无法直接通过Thread类获取线程执行结果。如果要获取线程执行结果就需要使用FutureTask。用法如下:

class CallableImpl implements Callable{@Overridepublic Object call() throws Exception {//do somethings//return result;}
}
FutureTask futureTask = new FutureTask(new CallableImpl());
new Thread(futureTask).start();
try {//获取线程执行结果Object result = futureTask.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}

        在实例化FutureTask时构造函数传入了实现Callable接口的实例。而在实例化Thread类时,构造函数传入FutureTask实例。因此,我们可以猜测线程在执行run方法时必定会调用call方法,并且保存call方法返回的结果。

总览

    通过类图,可以看到FutureTask主要实现了Runnable和Future。实现Runnable的run方法作为线程的执行体。正因为实现了Runnable,我们才可以使用FutureTask来创建线程。Future接口定义了如下几个方法
public interface Future<V> {
/***取消线程执行的任务*/
boolean cancel(boolean mayInterruptIfRunning);
/***判断任务是否被取消*如果任务在正常完成前因调用cancel方法而被取消,返回true*/
boolean isCancelled();
/*** 判断任务是否完成,如果任务已经完成,返回true* 完成可能是由于正常终止、异常或取消——在这些情况下,此方法都将返回true。*/
boolean isDone();
/*** 获取任务执行的结果,调用该方法时,如果任务还没有执行完成,将会阻塞当前线程* 直到任务完成或者被中断*/
V get() throws InterruptedException, ExecutionException;
/*** 获取任务执行的结果,调用该方法时,如果任务还没有执行完成,将会阻塞当前线程* 直到任务完成或者被中断或者超时*/
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

      可以看出FutureTask具备获取线程任务执行结果、取消线程任务的能力。

成员变量

public class FutureTask<V> implements RunnableFuture<V> {//state标识任务的运行状态private volatile int state;//新建状态,这是任务的最初状态private static final int NEW          = 0;//正在完成,任务执行体已经完成,正在保存执行结果private static final int COMPLETING   = 1;//任务正常完成private static final int NORMAL       = 2;//任务执行过程中发生异常private static final int EXCEPTIONAL  = 3;//任务被取消private static final int CANCELLED    = 4;//正在中断运行任务的线程private static final int INTERRUPTING = 5;//任务被中断private static final int INTERRUPTED  = 6;//任务的执行体,任务完成后,将会设置成nullprivate Callable<V> callable;//任务执行体的返回结果private Object outcome; //运行callable的线程private volatile Thread runner;//等待任务执行结果的线程队列private volatile WaitNode waiters;static final class WaitNode {//当前节点代表的线程volatile Thread thread;//下一个节点volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}}

         成员变量的含义只有在分析具体的方法代码和作者的注释时才能知晓。接下来具体分析FutureTask是如何实现保存任务执行结果和获取结果的。

源码分析

        在分析FutureTask源码前,需要对其中使用到jdk的方法做个简单的介绍。其中Unsafe类提供的cas操作的相关方法。

public final native boolean compareAndSwapObject(Object obj, 
long offset, Object expect, Object update);
  • obj :要修改字段的对象;
  • offset :要修改的字段在对象内的偏移量;
  • expect : 字段的期望值;
  • update :如果该字段的值等于字段的期望值,用于更新字段的新值;

   LockSupport的park和unpark提供了阻塞和解除阻塞线程的有效方法,park会使当前线程阻塞,unpark可以唤醒指定的线程。

public static void park() {UNSAFE.park(false, 0L);
}
public static void unpark(Thread thread) {if (thread != null)UNSAFE.unpark(thread);
}

构造函数

public FutureTask(Callable<V> callable) {if (callable == null)       throw new NullPointerException();    this.callable = callable;    this.state = NEW;       
}

接收callable实例并赋值给成员变量callable,将任务状态初始化为NEW。

run方法

public void run() {//先检查任务的状态,如果任务状态是NEW。利用cas操作设置runner为当前执行任务的线程//这里是为了确保在多线程的情况下任务执行和结果设置的安全性及一致性//比如下面的代码,会导致一个任务在多个线程中运行。//  FutureTask futureTask = new FutureTask(task);//  futureTask.run();//  Thread thread = new Thread(futureTask);//  Thread thread1 = new Thread(futureTask);//  thread.start();//  thread1.start();if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//调用callable方法,执行真正的任务逻辑result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;//执行异常处理,将任务状态修改为EXCEPTIONALsetException(ex);}if (ran)//任务执行体运行成功,保存任务结果set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;//handlePossibleCancellationInterrupt需要结合cancel方法分析if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}protected void set(V v) {//在outcome还没有保存返回结果前,先将任务状态设置为COMPLETING(正在完成)if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//保存任务运行结果outcome = v;//将任务状态设置为正常完成UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state//结束任务:唤醒所有因调用get方法而阻塞的线程,并清空等待队列finishCompletion();}
}protected void setException(Throwable t) {//在outcome还没有保存返回结果前,先将任务状态设置为COMPLETING(正在完成)if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//将任务的结果设置为异常信息outcome = t;//将任务状态设置为EXCEPTIONAL(异常中断)UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); //结束任务:唤醒所有因调用get方法而阻塞的线程,并清空等待队列finishCompletion();}
}/***唤醒所有因调用get方法而阻塞的线程,并清空等待队列*/
private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {//先将成员变量waiters设置为nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//从头开始遍历等待队列,唤醒其每个节点对应的线程for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}//获取下一个节点WaitNode next = q.next;if (next == null)break;//当前节点next指向null,当前节点从等待队列中断开,之后被GC回收q.next = null; // unlink to help gcq = next;}break;}}done();callable = null;        // to reduce footprint
}

从run方法中,FutureTask的生命周期和线程的生命周期有一定的关联:

        当FutureTask的state为NEW时,执行任务的线程可能处于New状态、Runable状态(线程在操作系统中被创建,处于等待CPU时间或运行中)、Blocked状态(线程在等待锁)。

        当程序调用call方法后,在将call的执行结果保存到FutureTask的成员变量outcome前,会将FutureTask设置为COMPLETING。此时FutureTask的COMPLETING 对应线程的Runable状态。

        如果程序调用call发生异常,FutureTask最终被设置为EXCEPTIONAL,正常执行则被设置为NORMAL,此时线程即将进入Terminated状态。

get方法

/***无限时长的等待获取执行结果*/
public V get() throws InterruptedException, ExecutionException {int s = state;//COMPLETING代表任务正在保存执行结果。<=这个状态,说明任务执行还没有保存执行结果//则会调用awaitDone方法等待执行结果。if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}
/***有限时长的等待获取执行结果*/
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);
}/***等待任务完成,在被中断或超时时终止*/
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;//当前节点是否在等待队列中boolean queued = false;for (;;) {//检查当前获取结果的线程是否被中断,如果被中断,从等待队列中移除,并抛出中断异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)//1、创建一个新的节点q = new WaitNode();else if (!queued)//如果新的节点没有在排队,将这个节点加入到队列的头部queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {//判断是否超时nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}//阻塞当前线程LockSupport.parkNanos(this, nanos);}else阻塞当前线程LockSupport.park(this);}
}
/**
*遍历等待队列移除节点
*/
private void removeWaiter(WaitNode node) {if (node != null) {//首先将节点thread设置为null,防止节点被意外地再次使用或唤醒//同时thread =null的节点是作为需要被移除节点的标记node.thread = null;retry:for (;;) {          // restart on removeWaiter race//声明pre q s 三个WaitNode变量for (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;//如果节点的thread !=null说明当前节点不需要被移除,遍历下一个if (q.thread != null)pred = q;else if (pred != null) {//上一个节点pred != null,说明当前节点不是队列的第一个节点//则将pred.next指向当前节点的下一个节点s,即跳过了当前节点pred.next = s;if (pred.thread == null) // check for race//队列在遍历过程中发生了变化,从头开始遍历continue retry;}//如果当前节点是头节点,将头节点设置为当前节点的下一个节点else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))//队列在遍历过程中发生了变化,从头开始遍历continue retry;}//完成对等待队列的遍历,成功移除了节点(无论是通过更新队列头部还是通过跳过内部节点)//退出break;}}
}

cancel方法

    public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {    // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {finishCompletion();}return true;}

cancel返回值是boolean类型,任务取消成功,返回true,任务取消失败返回false;参数mayInterruptIfRunning表示是否尝试取消正在执行中的任务。

1、如果任务状态不是NEW状态,直接返回false,通过对run方法的分析,可以知道当FutureTask状态不为NEW时,任务已经被取消或者已经执行了call方法,无法取消任务。

2、任务状态是NEW

        2.1 参数mayInterruptIfRunning为false,设置任务状态为CANCELLED,从run方法中可以得到,正在执行的任务不会被取消,还未开始的任务会被取消。

        2.2 参数mayInterruptIfRunning为true,尝试调用正在执行任务的线程的interrupt()方法(在一个线程内部存在着名为interrupt flag的标识,如果一个线程被interrupt,那么它的flag将被设置,但是如果当前线程正在执行可中断方法被阻塞时,如Object的wait方法,Thread的sleep、join方法等,调用interrupt方法将其中断,反而会导致flag被清除)

再回过头看看run方法最后的handlePossibleCancellationInterrupt

public void run() {......finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;//handlePossibleCancellationInterrupt需要结合cancel方法分析if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}private void handlePossibleCancellationInterrupt(int s) {// It is possible for our interrupter to stall before getting a// chance to interrupt us.  Let's spin-wait patiently.if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield(); // wait out pending interrupt// assert state == INTERRUPTED;// We want to clear any interrupt we may have received from// cancel(true).  However, it is permissible to use interrupts// as an independent mechanism for a task to communicate with// its caller, and there is no way to clear only the// cancellation interrupt.//// Thread.interrupted();}

        这个方法到底有什么作用呢,作者通过源代码注释告诉我们这里的自旋目的是如果其他线程调用了cancel(true)方法,确保中断只能传递给当前执行任务的线程runner,并且state在runner线程执行期间最终能够被设置为INTERRUPTED。当线程调用cancel(true)方法方法时,先将任务状态设置为INTERRUPTING,再执行运行任务线程的中断方法,最后将任务状态设置为INTERRUPTED。执行任务的线程检测到有其他线程正在中断任务时,会等待完成中断操作后再退出。

        通过对源码的阅读,我们大致了解到了:任务是如何执行并且保存执行结果,完成任务后,如何唤醒等待获取执行结果的线程。在获取执行结果时,如果任务还未完成,如何进入等待队列,如果等待超时或者被中断,如何从等待队列中移除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/144260.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

实时加密机的工作原理

实时加密机&#xff0c;作为数据加密领域的一种高级硬件设备&#xff0c;主要承担着对敏感数据进行实时加密和解密的任务&#xff0c;以确保数据在传输和存储过程中的安全性和完整性。以下是对实时加密机的详细阐述&#xff1a; 一、定义与功能 实时加密机是一种采用先进加密算…

前端项目代码开发规范及工具配置

在项目开发中&#xff0c;良好的代码编写规范是项目组成的重要元素。本文将详细介绍在项目开发中如何集成相应的代码规范插件及使用方法。 项目规范及工具 集成 EditorConfig集成 Prettier1. 安装 Prettier2. 创建 Prettier 配置文件3. 配置 .prettierrc4. 使用 Prettier 集成 …

任务调度控制台定制规格要求

在当今高度自动化与数字化的企业环境中&#xff0c;任务调度系统作为业务运营的核心支撑之一&#xff0c;其性能与灵活性直接影响到整体工作效率与服务质量。因此&#xff0c;定制一套符合企业特定需求的任务调度控制台显得尤为重要。接下来就给大家具体的阐述一下任务调度控制…

2024 vue3入门教程:02 我的第一个vue页面

1.打开src下的App.vue&#xff0c;删除所有的默认代码 2.更换为自己写的代码&#xff0c; 变量msg&#xff1a;可以自定义为其他&#xff08;建议不要使用vue的关键字&#xff09; 我的的第一个vue&#xff1a;可以更换为其他自定义文字 3.运行命令两步走 下载依赖 cnpm i…

ModbusTCP报文详解

Modbus TCP与Modbus Rtu(ASCI)数据帧的区别 总结&#xff1a;Modbus TCP就是在Modbus Rtu(ASCI)基础上去掉CRC&#xff0c;再加上六个0一个6 Modbus TCP MBAP报文头 域长度描述客户机服务器事务处理标识符2字节Modbus请求/响应事务处理的识别客户机启动服务器从接收的请求中重…

开放式耳机好不好用?六条绝妙选购要点避免踩坑

​开放式耳机目前非常流行&#xff0c;它们以时尚、美观和舒适著称&#xff0c;迅速赢得了众多用户的喜爱&#xff0c;成为了耳机市场的新宠。与传统的入耳式耳机相比&#xff0c;开放式耳机佩戴更稳固&#xff0c;对耳朵也更为温和。尽管有些人认为它们价格不菲&#xff0c;甚…

机器人的动力学——牛顿欧拉,拉格朗日,凯恩

机器人的动力学推导方法有很多&#xff0c;常用得有牛顿&#xff0c;拉格朗日&#xff0c;凯恩等方法&#xff0c;接下来&#xff0c;简单说说他们之间的使用。注&#xff1a;这里不考虑怎么来的&#xff0c;只说怎么应用。 参考1&#xff1a;4-14动力学分析方法-牛顿—欧拉方…

第J8周:Inception v1算法实战与解析

本文为365天深度学习训练营 中的学习记录博客原作者&#xff1a;K同学啊 任务&#xff1a; 1.了解并学习下图 a《卷积计算过程中》的卷积层运算量的计算过程&#xff08;储备知识->卷积层运算量的计算&#xff09; 2.了解并学习卷积层的并行结构与1x1卷积核部分内容&#xf…

基于STM32的光敏电阻检测及OLED显示仿真(库函数)

本专栏所有源资料都免费获取,无任何隐形消费。 注意事项:STM32仿真会存在各种各样BUG,且尽量按照同样仿真版本使用。本专栏所有的仿真都采用PROTEUS8.15。 本文已经配置好STM32F103C8T6系列,在PROTUES仿真里,32单片机一般只用一种型号,如需其他型号,可改名。 本次功能…

【C盘清理】Pycharm远程调试重度使用者C盘清理

文章目录 1 remote source 1 remote source 找到本地的这个路径C:\Users\verse\AppData\Local\JetBrains\PyCharm2022.3\remote_sources 这个文件夹是 PyCharm 在进行远程调试时使用的&#xff0c;它包含了远程服务器上的源代码副本。当你在 PyCharm 中设置远程调试并启动调试会…

如何测试和验证API的性能和稳定性?

在开发过程中&#xff0c;测试和验证API的性能和稳定性是确保软件质量的关键步骤。以下是一些有效的方法和最佳实践&#xff1a; 功能测试&#xff1a;首先&#xff0c;确保API的所有功能按预期工作。这包括对请求参数、方法、路径和预期响应的理解&#xff0c;以及对正常流程和…

飞机表面缺陷检测系统源码分享

飞机表面缺陷检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer…

R高级绘图 | 不用 Cytoscape 也能绘制精美的蛋白互作(PPI)网络图啦!| STRING 网站 + R 代码

在进行 PPI 网络图绘制时&#xff0c;我们通常会将 STRING 网站与 Cytoscape 软件结合使用。但是经常有小伙伴们苦恼&#xff0c;不太会用 Cytoscape 软件进行处理可咋整呐&#xff01;之前我也吭哧吭哧学了点 Cytoscape&#xff0c;后来偶然的一个机会&#xff0c;我发现可以直…

python 识别省市、区县并组建三级信息数据库

一、网址&#xff1a; 全国行政区划信息查询平台 二、分析并搭建框架 检查网页源码&#xff1a; 检查网页源码可以发现&#xff1a; 所有省级信息全部在javaScript下的json中&#xff0c;会在页面加载时加载json数据&#xff0c;填充到页面的option中。 1、第一步&#xff1a…

MATLAB系列08:输入/输入函数

MATLAB系列08&#xff1a;输入/输入函数 8. 输入/输入函数8.1 函数textread8.2 关于load和save命令的进一步说明8.3 MATLAB文件过程简介8.4 文件的打开和关闭8.4.1 fopen函数8.4.2 fclose函数 8.5 二进制 I/O 函数8.5.1 fwrite 函数8.5.2 fread函数 8.6 格式化 I/O 函数8.6.1 f…

C语言 | Leetcode C语言题解之第414题第三大的数

题目&#xff1a; 题解&#xff1a; int cmp(const void *a, const void *b) {return *(int*)a < *(int*)b; }int thirdMax(int* nums, int numsSize){qsort(nums, numsSize, sizeof(nums[0]), cmp);int diff 0;for (int i 1; i < numsSize; i) {if (nums[i] ! nums[i…

(黑马点评) 五、探店达人系列功能实现

5.1 发布和查看探店笔记 5.1.1 发布探店笔记 这块代码黑马已经完成了&#xff0c;在发布探店笔记界面&#xff0c;有两块内容是需要上传的。一是笔记内容&#xff0c;二是笔记配图。其中笔记配图部分黑马使用的是上传到本地前端服务器上面的。我我觉得可以将图片文件发布在阿里…

【靶点Talk】免疫检查点争夺战:TIGIT能否超越PD-1?

曾经的TIGIT靶点顶着“下一个PD-1”的名号横空出世&#xff0c;三年的“征程”中TIGIT走过一次又一次的失败&#xff0c;然而面对质疑和压力仍有一批公司选择前行。今天给大家分享TIGIT靶点的相关内容&#xff0c;更多靶点科普视频请关注义翘神州B站和知乎官方账号。 TIGIT的“…

如何使用Java代码实现日期的比较以及如何在列表中按照日期进行排序

哈喽&#xff0c;大家好&#xff0c;我是木头左&#xff01; 在Java编程中&#xff0c;经常需要处理日期和时间相关的操作。本文将向您展示如何使用Java代码实现日期的比较以及如何在列表中按照日期进行排序。将通过以下几个步骤来实现这个目标&#xff1a; 理解日期比较&…

【2025】基于微信小程序的网上点餐系统设计与实现、基于微信小程序的智能网上点餐系统、微信小程序点餐系统设计、智能点餐系统开发、微信小程序网上点餐平台设计

博主介绍&#xff1a; ✌我是阿龙&#xff0c;一名专注于Java技术领域的程序员&#xff0c;全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师&#xff0c;我在计算机毕业设计开发方面积累了丰富的经验。同时&#xff0c;我也是掘金、华为云、阿里云、InfoQ等平台…