【并发编程】ThreadPoolExecutor任务提交与停止流程及底层实现【新手探索版】

文章目录

  • 1. ThreadPoolExecutor任务提交
  • 2. 线程池状态[这部分是难点呀]
    • 2.1. addWorker添加worker线程
    • 2.2. 内部类Worker
    • 2.3. runWorker():执行任务
    • 2.4. getTask():获取任务
    • 2.5. processWorkerExit():worker线程退出
  • 3.3. 关闭线程池
    • 3.3.1. shutdown方法
    • 3.3.2. shutdownNow方法
  • 4. 其他方法
    • 4.1. reject方法
  • 5. 总结
  • 6. 参考

测试代码:

// 创建只含有单个线程的线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交任务
executor.submit(()->{System.out.println("C");
});
// 优雅停机
executor.shutdown();

1. ThreadPoolExecutor任务提交

我们看submit方法

// AbstractExecutorService
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;
}
  • newTaskFor:把Runnable构建为FutureTask任务
  • execute:执行任务
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 1、线程数量小于核心线程,则添加Workerif (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 2、插入任务到队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3、队列也满了,则添加非核心workelse if (!addWorker(command, false))// 4、添加非核心Worker失败则"拒绝"reject(command);
}

从以上可以看到线程池的处理原则是:核心线程数 > 队列 > 非核心线程 > 拒绝策略

1、先添加核心线程来处理

2、核心线程不够,则添加到队列中

3、队列也不够,则添加“非核心线程”

4、非核心线程也不够,则执行拒绝策略

2. 线程池状态[这部分是难点呀]

public class ThreadPoolExecutor extends AbstractExecutorService {// 核心变量:该变量前3位表示"状态",后29为表示线程数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 该变量控制ctl变量用多少bit位表示状态【bit位的分界线】private static final int COUNT_BITS = Integer.SIZE - 3;// 表示线程的最大容量【即0001 1111 1111...】private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 以下变量用前3位控制线程池的状态private static final int RUNNING    = -1 << COUNT_BITS; // [即111]private static final int SHUTDOWN   =  0 << COUNT_BITS; // [即000]private static final int STOP       =  1 << COUNT_BITS; // [即001]private static final int TIDYING    =  2 << COUNT_BITS; // [即010]private static final int TERMINATED =  3 << COUNT_BITS; // [即011]// 计算ctl变量的前3位,表示“状态”private static int runStateOf(int c)     { return c & ~CAPACITY; }// 计算ctl变量的后29位,表示"工作线程数量"private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }
}
  • ctl:是control(控制)的缩写。是线程池的最核心变量,该int变量即包含状态也包含线程的数量
  • COUNT_BITS:该变量ctl变量bit位的分界线
  • CAPACITY:表示线程池最大线程数量
  • 各种状态:用ctl的前3个bit位表示状态
  • runStateOf、workerCountOf方法:计算状态、线程数量

2.1. addWorker添加worker线程

private boolean addWorker(Runnable firstTask, boolean core) {// <1>retry:for (;;) {int c = ctl.get();// 获取状态int rs = runStateOf(c);// <1.1> 状态判断if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//增加c变量的线程数if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctl// 一旦状态改变,则需要返回外层循环重新判断状态if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// <2>boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask); // 创建Workerfinal Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();// 添加"工作线程需要上锁"try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();//启动工作线程workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

addWorker代码分2个部分

  • 维护ctl变量(在<1>部分)

用cas给ctl变量表示线程数量的位置值加一。

<1.1>处,状态判断我是感觉写法挺烧脑子的。

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))return false;

要使整体条件成立则rs >= SHUTDOWN成立且后面括号的判断为false。后面括号分3种情况:

1、如果rs == SHUTDOWN为false(隐含条件有:rs >= SHUTDOWN成立。

rs == SHUTDOWN,则rs可能得值有stop、tidying、terminated其中一个,即线程池已经终止,则表示终止状态下不必添加Worker线程。

2、如果firstTask == null为false(隐含的条件有:rs >= SHUTDOWN,rs == SHUTDOWN成立。

线程池是shutdown状态,且firstTask == null为false,则表示shutdown状态下不必添加Worker线程。

3、如果workQueue.isEmpty()为true(隐含条件:rs=SHUTDOWN,firstTask == null成立。

线程池是shutdown状态,且提交的任务也是null,且队列是空的,则不必添加Worker线程。【如果队列不是空的还是有必要添加Worker?】

备注:第3个条件真的有必要吗???因为workQueue某些任务挺耗时,添加添加Worker是为了加快workQueue中任务的处理?

如此复杂的条件判断的目的:特定条件下即使shutdown还是会添加Worker线程,并不是只要shutdown就不必添加Worker线程。

批判:完全没有必要把条件搞这么复杂,只能说作者太追求极致了。完全可以只要状态>=shutdown就返回false。而且firstTask=null怎么会发生呢,这个方法是private啊。

极致的条件是什么:①rs>=shutdown且②rs=shutdown且③firstTask=null且④workQueue非空,即这些条件都满足时还是可以添加Worker线程的。

  • 添加启动线程(在<2>部分)

1、创建Worker

firstTask变量表示Worker中的第一个任务

每一个Worker表示一个工作线程。

2、添加Worker到workers

由线程池的workers变量统一维护所有线程

3、添加线程的过程需要“上锁”,添加完线程之后调用t.start方法启动线程

2.2. 内部类Worker

/*** Worker类大体上管理着运行线程的中断状态 和 一些指标* Worker类投机取巧的继承了AbstractQueuedSynchronizer来简化在执行任务时的获取、释放锁* 这样防止了中断在运行中的任务,只会唤醒(中断)在等待从workQueue中获取任务的线程* 解释:*   为什么不直接执行execute(command)提交的command,而要在外面包一层Worker呢??*   主要是为了控制中断....重点主要原因啦*   用什么控制??*   用AQS锁,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁*   只有在等待从workQueue中获取任务getTask()时才能中断** worker实现了一个简单的不可重入的互斥锁,而不是用ReentrantLock可重入锁* 因为我们不想让在调用比如setCorePoolSize()这种线程池控制方法时可以再次获取锁(重入)* 解释:*   setCorePoolSize()时可能会interruptIdleWorkers(),在对一个线程interrupt时会要w.tryLock()*   如果可重入,就可能会在对线程池操作的方法中中断线程,类似方法还有:*   setMaximumPoolSize()*   setKeppAliveTime()*   allowCoreThreadTimeOut()*   shutdown()* 此外,为了让线程真正开始后才可以中断,初始化lock状态为负值(-1),在开始runWorker()时将state置为0,而state>=0才可以中断* * Worker继承了AQS,实现了Runnable,说明其既是一个可运行的任务,也是一把锁(不可重入)*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{final Thread thread; //利用ThreadFactory和 Worker这个Runnable创建的线程对象Runnable firstTask;volatile long completedTasks;Worker(Runnable firstTask) {//设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取setState(-1);// 在调用runWorker()前,禁止interrupt中断,//在interruptIfStarted()方法中会判断 getState()>=0this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); //根据当前worker创建一个线程对象//当前worker本身就是一个runnable任务,也就是不会用参数的firstTask创建线程,//而是调用当前worker.run()时调用firstTask.run()}public void run() {runWorker(this); //runWorker()是ThreadPoolExecutor的方法}// Lock methods// The value 0 represents the unlocked state. 0代表“没被锁定”状态// The value 1 represents the locked state. 1代表“锁定”状态protected boolean isHeldExclusively() {return getState() != 0;}/*** 尝试获取锁* 重写AQS的tryAcquire(),AQS本来就是让子类来实现的*/protected boolean tryAcquire(int unused) {//尝试一次将state从0设置为1,即“锁定”状态,//但由于每次都是state 0->1,而不是+1,那么说明不可重入//且state==-1时也不会获取到锁if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread()); //设置exclusiveOwnerThread=当前线程return true;}return false;}/*** 尝试释放锁* 不是state-1,而是置为0*/protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null); setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }/*** 中断(如果运行)* shutdownNow时会循环对worker线程执行* 且不需要获取worker锁,即使在worker运行时也可以中断*/void interruptIfStarted() {Thread t;//如果state>=0、t!=null、且t没有被中断//new Worker()时state==-1,说明不能中断if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
}

Worker类本身既实现了Runnable,又继承了AbstractQueuedSynchronizer,所以其既是一个可执行的任务,又可以达到锁的效果

主要是几个问题:

  • 为什么会有一把锁

肯定不是为了控制并发的,因为只有一个线程。主要是为了控制中断。用AQS锁,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁,只有在等待从workQueue中获取任务getTask()时才能中断

  • 控制中断体现在哪里

1、初始AQS状态为-1,此时不允许中断interrupt(),只有在worker线程启动了,执行了runWoker(),将state置为0,才能中断

不允许中断体现在:

  1. shutdown()线程池时,会对每个worker tryLock()上锁,而Worker类这个AQS的tryAcquire()方法是固定将state从0->1,故初始状态state=-1时tryLock()失败,没办法中断interrupt()
  2. shutdownNow()线程池时,不用tryLock()上锁,但调用worker.interruptIfStarted()终止worker,interruptIfStarted()也有state>=0才能interrupt的逻辑,而初始state=-1

2、为了防止某种情况下,在运行中的worker被中断,runWorker()每次运行任务时都会lock()上锁,而shutdown()这类可能会终止worker的操作需要先获取worker的锁,这样就防止了中断正在运行的线程。Worker实现的AQS为不可重入锁,为了是在获得worker锁的情况下再进入其它一些需要加锁的方法

  • 有没有响应中断

很明显ThreadPoolExecutor没有响应中断(即没有检测和处理中断)。没有响应中断与控制中断是两回事。至于为什么要多搞一层Work来控制中断作者也不清楚。

  • 没有中断、 没有锁行不行

个人猜测:如果只是我们自己写代码完全可以,但这是大神写的代码,不可以

  • 为什么要自己继承AQS,而不是使用ReentrantLock

ReentrantLock是可重入的,而ThreadPoolExecute要求不可重入

2.3. runWorker():执行任务

看一下Worker如何执行任务的

// java.util.concurrent.ThreadPoolExecutor.Worker
public void run() {runWorker(this);
}
final void runWorker(Worker w) {Thread wt = Thread.currentThread();// <1>Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// <2> 获取任务【重点】while (task != null || (task = getTask()) != null) {// <3> 上锁,不是为了防止并发执行任务,为了在shutdown()时不终止正在运行的workerw.lock();// <4> 极端判断条件是:由Running或shutdown状态,突然变成stop状态if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))&& !wt.isInterrupted())wt.interrupt();// <5>try {// <5.1>beforeExecute(wt, task);Throwable thrown = null;try {// <5.2>task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// <5.3>afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}// <6> completedAbruptly变量表示是否是"暴力结束线程",执行到此处说明线程是因为taskTask返回为nullcompletedAbruptly = false;} finally {// <7> worker线程的退出【重点】processWorkerExit(w, completedAbruptly);}
}
  • <1>

在创建Worker时添加的任务为第一个任务(firstTask)

  • <2>

调用getTaskworkQueue取出任务。详情请看getTask方法详解。

  • <3>处,为什么要上锁

是为了控制中断,是控制,控制,不是响应中断,线程池没有响应中断。

  • <4>

此时的极端条件是什么:由Running或shutdown状态,突然变成stop状态

1、如果状态达到stop级别,必须要对worker中断。

2、可能暂时没有达到stop级别,但是别处立马调用了shutdownNow则立马达到stop级别,也要立刻中断

注意:线程池本身不响应中断,是否中断用户自行在task.run方法决定

  • <5>

1、<5.1>是任务执行开始之前的动作,默认空实现

2、<5.2>是执行任务,即调用run方法

3、<5.3>是任务结束之后的动作,默认空实现

  • <6>

completedAbruptly变量表示是否是暴力完成任务的,执行到<4>处说明是正常完成,不是暴力完成。

代码执行到<4>处位置,说明getTask方法取出任务为null,也就是任务不足了,就需要考虑销毁不必要的线程

  • <7>

此时需要终止工作线程。这一段后面有详解。

2.4. getTask():获取任务

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// <1> 极端条件是:rs == SHUTDOWN,且 workQueue不为空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// <2> 是否允许取出任务超时boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// <3> 极端条件:wc <= maximumPoolSize 且 wc > corePoolSize 且 超时 且 队列为空,此时需要销毁当前线程if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// <2> 取出任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;// <3> 到这里说明当前线程出现了等待超时,则可能要销毁非核心线程timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
  • <1>

此时条件复杂,我们看极端情况,极端条件是:【rs == SHUTDOWN且 workQueue不为空】,说明了即使是SHUTDOWN状态只要队列workQueue不为空,还是需要等待workQueue执行。

SHUTDOWN是优雅停机,会先等workQueue执行完毕

  • <2>处,

timed变量主要是控制当线程空闲时是否需要销毁线程。默认如果线程数量>corePoolSize且空闲,则销毁当前线程;如果线程<=corePoolSize但空闲是不会销毁线程的。(是否销毁默认取决于线程数量)

  • <3>

极端条件:wc <= maximumPoolSize 且 wc > corePoolSize 且 超时 且 队列为空,此时线程数量太多又没有任务所以需要销毁当前线程。

  • <4>处,取出任务。

1、如果允许取出任务超时,则调用限时的poll方法。如果超时返回null

2、如果不允许取出任务超时,则调用take方法阻塞获取task。

也就是说如果返回null则一定就是**当前线程出现了等待的情况,则考虑是否需要销毁不必要的线程了

2.5. processWorkerExit():worker线程退出

注意:这才是worker工作线程真正退出的地方,而不是调用线程池的shutdown方法工作线程就会停止,工作线程会等待所有任务都执行完成后才会停止。

private void processWorkerExit(Worker w, boolean completedAbruptly) {/*** 1、worker数量-1* 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,*那么正在工作的worker线程数量需要-1* 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了*/if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 代码和注释正好相反啊decrementWorkerCount();/*** 2、从Workers Set中移除worker*/final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks; //把worker的完成任务数加到线程池的完成任务数workers.remove(w); //从HashSet<Worker>中移除} finally {mainLock.unlock();}/*** 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池* 主要是判断线程池是否满足终止的状态* 如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程* 没有线程了,更新状态为tidying->terminated*/tryTerminate();/*** 4、是否需要增加worker线程* 线程池状态是running 或 shutdown* 如果当前线程是突然终止的,addWorker()* 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()* 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程*/int c = ctl.get();//如果状态是running、shutdown,即tryTerminate()没有成功终止线程池,尝试再添加一个workerif (runStateLessThan(c, STOP)) {//不是突然完成的,即没有task任务可以获取而完成的,计算min,并根据当前worker数量判断是否需要addWorker()if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默认为false,即min默认为corePoolSize//如果min为0,即不需要维持核心线程数量,且workQueue不为空,至少保持一个线程if (min == 0 && ! workQueue.isEmpty())min = 1;//如果线程数量大于最少数量,直接返回,否则下面至少要addWorker一个if (workerCountOf(c) >= min)return; // replacement not needed}//添加一个没有firstTask的worker//只要worker是completedAbruptly突然终止的,或者线程数量小于要维护的数量,就新添一个worker线程,即使是shutdown状态addWorker(null, false);}
}

processWorkerExit(Worker w, boolean completedAbruptly)

worker:要结束的worker

completedAbruptly:是否突然完成(是否因为异常退出)

执行流程:

1、worker数量-1

A、如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1B、如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了

2、从Workers Set中移除worker,删除时需要上锁mainlock
3、tryTerminate()。大概逻辑:判断线程池是否满足终止的状态

tryTerminate虽然很长,但是只有一个作用,通过设置线程池状态为terminal来真正终止线程,终止的前提条件是:线程池状态为shutdown + 队列是空的 + 工作线程为0

操作:更新状态为tidying->terminated

4、是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程

如:我们自定义的任务task.run方法发生了异常导致线程退出,此时线程池需要补充新的线程进来

故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程

3.3. 关闭线程池

注意:关闭线程池并不能让线程池中的线程停止,只是对其发送了中断信号,仅仅是发送了信号而已。而ThreadPoolExecute没有响应中断信号,所以最终只有在所有的task都被执行后才会真正的结束worker线程。

3.3.1. shutdown方法

shutdown()后线程池将变成shutdown状态,此时不接收新任务**,但会处理完正在运行的和在阻塞队列中等待处理的任务**。

/*** 中断在等待任务的线程(没有上锁的),中断唤醒后,可以判断线程池状态是否变化来决定是否继续*/
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); //上锁 try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne) break;}} finally {mainLock.unlock(); //解锁 } }}
}

shutdown()执行流程:

1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock

2、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务

3、中断所有空闲线程 interruptIdleWorkers()

正在运行中的线程不会被中断

4、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理

5、解锁

6、尝试终止线程池 tryTerminate()

可以看到shutdown()方法最重要的几个步骤是:更新线程池状态为shutdown中断所有空闲线程tryTerminated()尝试终止线程池

interruptIdleWorkers() 中断空闲线程过程如下:

    /*** 中断在等待任务的线程(没有上锁的),中断唤醒后,可以判断线程池状态是否变化来决定是否继续*/private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); //上锁 try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne) break;}} finally {mainLock.unlock(); //解锁 } }}}

关键是:worker.tryLock() 是否成功

如果worker是空闲状态(即tryLock成功),AQS的state从0–>1,既然是空闲线程那么中断空闲线程。

3.3.2. shutdownNow方法

shutdownNow()后线程池将变成stop状态,此时不接收新任务**,不再处理在阻塞队列中等待的任务,还会尝试中断正在处理中的工作线程**。

无论如何线程池ThreadPoolExecute本身没有响应中断,至于用户是否响应中断时用户自己的事情。

/*** 尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表* 这个任务列表是从任务队列中排出(删除)的* <p>* 这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination()* <p>* 除了尽力尝试停止运行中的任务,没有任何保证* 取消任务是通过Thread.interrupt()实现的,所以任何响应中断失败的任务可能永远不会结束*/
public List <Runnable> shutdownNow() {List <Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock(); //上锁try {//判断调用者是否有权限shutdown线程池checkShutdownAccess();//CAS+循环设置线程池状态为stopadvanceRunState(STOP);//中断所有线程,包括正在运行任务的interruptWorkers();tasks = drainQueue(); //将workQueue中的元素放入一个List并返回} finally {mainLock.unlock(); //解锁}//尝试终止线程池tryTerminate();return tasks; //返回workQueue中未执行的任务
}

该方法会中断所有的线程,包括正在运行的线程,用户是否响应是用户自己的事情

线程池中队列中等待执行的task会被移除,并返回给用户。

shutdownNow() 和 shutdown()的大体流程相似,差别是:

1、将线程池更新为stop状态

2、调用**interruptWorkers()**中断所有线程,包括正在运行的线程

interruptWorkers先对mainLock加锁,然后循环调用interruptIfStarted方法中断Worker的所有线程,其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用t.interrupt()。

3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务

4. 其他方法

4.1. reject方法

private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();final void reject(Runnable command) {handler.rejectedExecution(command, this);
}public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());
}

默认的handler是AbortPolicy,它的处理方式是“抛出异常”

其它的handler:

  • DiscardPolicy

直接丢弃

  • DiscardOldestPolicy

丢弃”最老的“

  • CallerRunsPolicy

调用者线程来执行。但是如果线程池被shutdown,则任务会被丢弃。

5. 总结

1、把握int型ctl变量的高3位表示线程池的状态,低29为表示线程池中线程数量是前提

2、能了解线程池执行任务的总体流程(先core,在队列,在max,在拒绝)

3、能了解为什么要有Work类(主要是为了中断)

4、worker线程满足什么条件才会退出

task发生异常会退出但是又会补充新的线程进来

只有任务都执行完了且调用了shutdown方法,才会真正退出

5、shutdown方法对worker线程的影响

shutdown会对空闲线程interrupt

shutdown会改变线程池状态为shutdown,worker线程会被状态作出反应

  • shutdown之后不能添加新的task(体现在addWorker中)

  • worker线程执行完任务之后就退出了,而不是在workQueue队列上等待新任务

6、温故而知新,还是要经常看看经常想想

6. 参考

https://blog.csdn.net/qq_41573860/article/details/123291943

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/146002.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习之单层神经网络的训练:增量规则(Delta Rule)

文章目录 权重的调整单层神经网络使用delta规则的训练过程 神经网络以权值的形式存储信息,根据给定的信息来修改权值的系统方法称为学习规则。由于训练是神经网络系统地存储信息的唯一途径&#xff0c;因此学习规则是神经网络研究中的一个重要组成部分 权重的调整 &#xff08…

【中秋国庆不断更】OpenHarmony多态样式stateStyles使用场景

Styles和Extend仅仅应用于静态页面的样式复用&#xff0c;stateStyles可以依据组件的内部状态的不同&#xff0c;快速设置不同样式。这就是我们本章要介绍的内容stateStyles&#xff08;又称为&#xff1a;多态样式&#xff09;。 概述 stateStyles是属性方法&#xff0c;可以根…

蓝桥等考Python组别十级003

第一部分&#xff1a;选择题 1、Python L10 &#xff08;15分&#xff09; 已知s Pencil&#xff0c;下列说法正确的是&#xff08; &#xff09;。 s[0]对应的字符是Ps[1]对应的字符是ns[-1]对应的字符是is[3]对应的字符是e 正确答案&#xff1a;A 2、Python L10 &am…

NLP 03(LSTM)

一、LSTM LSTM (Long Short-Term Memory) 也称长短时记忆结构,它是传统RNN的变体,与经典RNN相比&#xff1a; 能够有效捕捉长序列之间的语义关联缓解梯度消失或爆炸现象 LSTM的结构更复杂,它的核心结构可以分为四个部分去解析: 遗忘门、输入门、细胞状态、输出门 LSTM内部结构…

解决 react 项目启动端口冲突

报错信息&#xff1a; Emitted error event on Server instance at:at emitErrorNT (net.js:1358:8)at processTicksAndRejections (internal/process/task_queues.js:82:21) {code: EADDRINUSE,errno: -4091,syscall: listen,address: 0.0.0.0,port: 8070 }解决方法&#xff…

叶工好容6-自定义与扩展

本篇主要介绍扩展的本质以及CRD与Operator之间的区别&#xff0c;帮助大家理解相关的概念以及知道要进行扩展需要做哪些工作。 CRD&#xff08;CustomerResourceDefinition&#xff09; 自定义资源定义,代表某种自定义的配置或者独立运行的服务。 用户只定义了CRD没有任何意…

最新AI智能创作系统ChatGPT商业源码+详细图文搭建部署教程+AI绘画系统

一、AI系统介绍 SparkAi创作系统是基于国外很火的ChatGPT进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作ChatGPT&#xff1f;小编这里写一个详细图文教程吧&am…

liunx的攻击

1.场景和分析 2.病毒分析 3.解决步骤

MySQL在线修改表结构-PerconaTookit工具

在线修改表结构必须慎重 在业务系统 运行 过程中随意删改字段&#xff0c;会 造成重大事故。 常规的做法是&#xff1a;业务停机&#xff0c;再 维护表结构 比如&#xff1a;12306 凌晨 0 点到早上 7 点是停机维护 如果是不影响正常业务的表结构是允许在线修改的。 比如&…

Ubuntu部署运行ORB-SLAM2

ORB-SLAM2是特征点法的视觉SLAM集大成者&#xff0c;不夸张地说是必学代码。博主已经多次部署运行与ORB-SLAM2相关的代码&#xff0c;所以对环境和依赖很熟悉&#xff0c;对整个系统也是学习了几个月&#xff0c;一行行代码理解。本次在工控机上部署记录下完整的流程。 ORB-SLA…

Vuex状态管理

一、Vuex简介&安装 简介 vuex是使用vue中必不可少的一部分&#xff0c;基于父子、兄弟组件&#xff0c;我们传值可能会很方便&#xff0c;但是如果是没有关联的组件之间要使用同一组数据&#xff0c;就显得很无能为力&#xff0c;那么vuex就很好的解决了我们这种问题&…

【数据结构】外部排序、多路平衡归并与败者树、置换-选择排序(生成初始归并段)、最佳归并树算法

目录 1、外部排序 1.1 基本概念 1.2 方法 2、多路平衡归并与败者树 2.1 K路平衡归并 2.2 败者树 3、置换-选择排序&#xff08;生成初始归并段&#xff09;​编辑 4、最佳归并树 4.1 理论基础​编辑 4.2 构造方法 ​编辑 5、各种排序算法的性质 1、外部排序 1.1 基本概…

简易磁盘自动监控服务

本文旨在利用crontab定时任务(脚本请参考附件)来监控单个服务节点上所有磁盘使用情况&#xff0c;一旦超过既定阈值则会通过邮件形式告警相关利益人及时介入处理。 1. 开启SMTP服务 为了能够成功接收告警信息&#xff0c;需要邮件接收客户都安开启SMTP服务。简要流程请参考下…

数字孪生智慧能源:风光储一体化能源中心

自“双碳”目标提出以来&#xff0c;我国能源产业不断朝着清洁低碳化、绿色化的方向发展。其中&#xff0c;风能、太阳能等可再生能源在促进全球能源可持续发展、共建清洁美丽世界中被寄予厚望。风能、太阳能具有波动性、间歇性、随机性等特点&#xff0c;主要通过转化为电能再…

中国逐年干燥度指数数据集

简介&#xff1a; 中国逐年干燥度指数&#xff0c;空间分辨率为1km&#xff0c;时间为1901-2022&#xff0c;为比值&#xff0c;没有单位。该数据集是基于中国1km逐月潜在蒸散发&#xff08;PET&#xff09;和降水量&#xff08;PRE&#xff09;采用比值法计算式得到&#xff…

Go_原子操作和锁

原子操作和锁 本文先探究并发问题&#xff0c;再探究锁和原子操作解决问题的方式&#xff0c;最后进行对比。 并发问题 首先&#xff0c;我们看一下程序 num该程序表面看上去一步就可以运行完成&#xff0c;但是实际上&#xff0c;在计算机中是分三步运行的&#xff0c;如下…

PHP 二手物品交易网站系统mysql数据库web结构apache计算机软件工程网页wamp

一、源码特点 PHP 二手物品交易网站系统是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 代码下载 https://download.csdn.net/download/qq_41221322/88385559 二、功能介…

分析各种表达式求值过程

目录 算术运算与赋值 编译器常用的两种优化方案 常量传播 常量折叠 加法 Debug编译选项组下编译后的汇编代码分析 Release开启02执行效率优先 减法 Release版下优化和加法一致&#xff0c;不再赘述 乘法 除法 算术结果溢出 自增和自减 关系运算与逻辑运算 JCC指…

What is an HTTP Flood DDoS attack?

HTTP 洪水攻击是一种针对 Web 和应用程序服务器的第 7 层分布式拒绝服务 &#xff08;DDoS&#xff09; 攻击。HTTP 洪水攻击通过使用 HTTP GET 或 HTTP POST 请求执行 DDoS 攻击。这些请求是有效的&#xff0c;并且针对可用资源&#xff0c;因此很难防范 HTTP 洪水攻击。 匿名…

你熟悉Docker吗?

你熟悉Docker吗&#xff1f; 文章目录 你熟悉Docker吗&#xff1f;快速入门Docker安装1.卸载旧版2.配置Docker的yum库3.安装Docker4.启动和校验5.配置镜像加速5.1.注册阿里云账号5.2.开通镜像服务5.3.配置镜像加速 部署MySQL镜像和容器命令解读 Docker基础常用命令数据卷数据卷…