一、线程池的基本概念
1.1 线程池的定义
线程池是一组预先创建的线程,这些线程可以重复使用来执行多个任务,避免了频繁创建和销毁线程的开销。线程池的核心思想是通过复用一组工作线程,来处理大量的并发任务,减少系统资源消耗,提高应用程序的响应速度和吞吐量。
1.2 线程池解决的问题
线程池解决的核心问题是资源管理问题。在并发环境中,系统无法预知任意时刻有多少任务需要执行或投入多少资源,从而导致以下问题:
- 频繁的资源申请和销毁:频繁地申请和销毁资源以及调度资源会带来较大的额外开销。
- 缺乏资源控制手段:对于无限制的资源申请缺乏有效的抑制机制,容易导致系统资源耗尽的风险。
- 资源分配不合理:系统可能无法合理地管理内部资源的分配,导致整体稳定性降低。
为解决资源分配这个问题,线程池采用了“池化”思想,将资源统一在一起管理。
二、ThreadPoolExecutor概述
2.1 ThreadPoolExecutor的构造方法和参数介绍
- corePoolSize:核心池的大小。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
- maximumPoolSize:线程池最大线程数。表示在线程池中最多能创建多少个线程;
- keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
- unit:参数keepAliveTime的时间单位;
- workQueue:阻塞队列。用来存储等待执行的任务,一般来说,这里的阻塞队列有以下几种选择:
- threadFactory:线程工厂,主要用来创建线程,给线程命名等。
- handler:拒绝策略。有以下几种选择:
三、线程池的工作流程
3.1 任务调度
所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
3.2 源码解析
3.2.1 execute()入口
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}
3.2.2 线程池的状态
线程池运行的状态是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。
在具体实现中,线程池将runState、workerCount两个关键参数的维护放在了一起。如上代码ctl这个AtomicInteger变量,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态和线程池内有效线程的数量,高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数,这里都使用的是位运算的方式。
3.2.3 源码解读
public void execute(Runnable command) {if (command == null) // 检查传入的任务是否为空,防止空任务被加入任务队列throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) { // 如果线程数量小于核心线程数if (addWorker(command, true)) // 创建新的线程执行任务,addWorker(command, true)返回true说明成功创建并启动一个新的核心线程来执行任务return;c = ctl.get(); }if (isRunning(c) && workQueue.offer(command)) { // 检查线程池是否处于运行状态并尝试将任务加入到工作队列中int recheck = ctl.get();if (! isRunning(recheck) && remove(command)) // 再次检查线程池状态是否是运行状态。如果线程池不是运行状态,则尝试移除任务并调用reject(command)拒绝任务reject(command);else if (workerCountOf(recheck) == 0) // 如果线程数量为0,添加一个非核心线程处理队列中的任务addWorker(null, false);}else if (!addWorker(command, false)) // 如果队列已满或线程池不在运行状态,则尝试增加非核心线程来直接处理任务。如果不能创建新的线程来执行任务,执行拒绝策略reject(command);
}private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && // 线程池处于SHUTDOWN、STOP、TIDYING或TERMINATED状态之一时,不再接收新任务或可能不再处理队列中的任务! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())) // 只有在rs == SHUTDOWN 且 firstTask == null 且 workQueue不为空时,才继续执行,不返回falsereturn false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize)) // 线程数wc是否超过上限(CAPACITY)或者超过设定的核心线程数或最大线程数。如果是,返回falsereturn false;if (compareAndIncrementWorkerCount(c)) // 以原子操作方式增加工作线程计数,成功则跳出外层循环break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs) // 如果增加工作线程计数失败,再次检查运行状态是否改变,如果改变则继续外层循环重新检查continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock; // 锁定主锁,以确保对workers集合的操作是线程安全的mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN || // 线程池运行状态是RUNNING(rs == SHUTDOWN && firstTask == null)) { // 或者运行状态是SHUTDOWN并且firstTask == nullif (t.isAlive()) // 检查线程t是否已经启动,如果是,抛出IllegalThreadStateException异常throw new IllegalThreadStateException();workers.add(w); // 将新工作线程添加到workers集合int s = workers.size();if (s > largestPoolSize) // 更新记录中最大的线程池大小largestPoolSizelargestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) { // 启动线程并设置workerStarted为truet.start();workerStarted = true;}}} finally {if (! workerStarted) // 如果在前面的操作中未成功启动线程,调用addWorkerFailed(w)进行清理工作。addWorkerFailed(w);}return workerStarted;
}private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}
}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) { // 当任务非空或从任务队列中获取的新任务非空时循环执行w.lock(); // 锁定当前Worker,防止任务执行过程中被中断// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted()) // 确保在线程池STOP、TIDYING或TERMINATED的情况下,当前工作线程及时响应这些状态变化,避免不必要的任务执行wt.interrupt();try {beforeExecute(wt, task); // 允许用户在任务执行前进行一些处理工作,空方法Throwable thrown = null;try {task.run(); // 执行任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown); // 允许用户在任务执行后进行一些处理工作,空方法}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false; // 如果正常退出循环(无异常),设置completedAbruptly为false} finally {processWorkerExit(w, completedAbruptly); // 无论如何结束,调用processWorkerExit方法处理工作线程的退出逻辑,包括清理和统计。}
}
3.2.4 Worker简介
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
}
Worker继承自AQS,重写了AQS锁竞争释放相关的代码。在执行run()方法时,如果Worker取到任务,会通过lock()方法进行同步处理。
AQS通过模板方法,最终调用子类实现的tryAcquire()方法尝试获取锁。这里不对AQS做过多介绍,感兴趣的伙伴可以查看“参考资料”相关的介绍。
我们来看一个问题,Worker的锁是否可重入,为什么?对比可重入锁ReentrantLock的锁获取方法:
/*** Worker的*/
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;
}/*** ReentrantLock的*/
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) { // 关键int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
ReentrantLock获取锁的方法中,有一个else if (current == getExclusiveOwnerThread())判断,而Worker没有。由此可知Worker不是可重入的,Worker在ThreadPoolExecutor中设计的目标是让每个任务独立执行完毕,如果允许重入,一个任务还没执行完,当前线程又接收到新的任务,导致任务的状态和数据相互干扰。例如,变量、线程本地数据可能在任务间被覆盖或意外共享。
四、线程池在实际应用中的踩坑案例
4.1 嵌套使用线程池,可能造成死锁
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;public class ThreadPoolDeadlockExample {private static ThreadPoolExecutor executor =new ThreadPoolExecutor(5, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));public static void main(String[] args) throws ExecutionException, InterruptedException {int loop = 0;while (true) {System.out.println("loop start. loop = " + (loop));innerFutureAndOutFuture();System.out.println("loop end. loop = " + (loop++));Thread.sleep(10);}}public static void innerFutureAndOutFuture() throws ExecutionException, InterruptedException {Callable<String> innerCallable = new Callable<String>() {@Overridepublic String call() throws Exception {Thread.sleep(100);return "inner callable";}};Callable<String> outerCallable = new Callable<String>() {@Overridepublic String call() throws Exception {Thread.sleep(10);Future<String> innerFuture = executor.submit(innerCallable);String innerResult = innerFuture.get();Thread.sleep(10);return "outer callable. inner result = " + innerResult;}};List<Future<String>> futures = new ArrayList<>();for (int i = 0; i < 10; i++) {System.out.println("submit : " + i);Future<String> outerFuture = executor.submit(outerCallable);futures.add(outerFuture);}for (int i = 0; i < 10; i++) {String outerResult = futures.get(i).get();System.out.println(outerResult + ":" + i);}}
}输出结果:
loop start. loop = 0
submit : 0
submit : 1
submit : 2
submit : 3
submit : 4
submit : 5
submit : 6
submit : 7
submit : 8
submit : 9
死锁分析:for循环中提交10次outerCallable,占据5个核心线程,阻塞队列5个,outerCallable提交5个innerCallable。核心线程执行的outerCallable等待innerCallable执行完成,innerCallable等待outerCallable占据的核心线程释放,形成死锁。
总结:不要在线程池里等待另一个在池里执行的任务。
4.2 线程池任务执行异常无感知
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolExceptionExample {private static ThreadPoolExecutor executor =new ThreadPoolExecutor(5, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));public static void main(String[] args) {test1();test2();test3();test4();}private static void test1() {executor.submit(() -> {int i = 1 / 0; // 发生异常});System.out.println("执行完成");}/*** 捕获异常*/private static void test2() {executor.submit(() -> {try {int i = 1 / 0; // 发生异常} catch (Exception e) {System.out.println("发生异常: " + e.getMessage());}});System.out.println("执行完成");}/*** 设置全局异常处理器*/private static void test3() {Thread.setDefaultUncaughtExceptionHandler((t, e) -> {System.out.println("发生异常: " + e.getMessage());});executor.execute(() -> {int i = 1 / 0; // 发生异常});System.out.println("执行完成");}/*** 通过Future.get捕获异常*/private static void test4() {Future<Integer> future = executor.submit(() -> {int i = 1 / 0; // 发生异常return i;});try {future.get(); // 捕获异常} catch (Exception e) {System.out.println("发生异常: " + e.getMessage());}System.out.println("执行完成");}
}输出:
执行完成
执行完成
发生异常: / by zero
执行完成
发生异常: / by zero
发生异常: java.lang.ArithmeticException: / by zero
执行完成
总结:避免直接submit任务,不捕获异常,可能执行失败却无法感知到。
4.3 使用无界队列造成OOM
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolExecutorOOMExample {private static ThreadPoolExecutor executor =new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());public static void main(String[] args) {while (true) {executor.execute(() -> {try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}});}}
}输出:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"
总结:根据实际任务负载设置队列长度,不要使用无界队列。
4.4 拒绝策略设置错误导致接口超时
在 Java 中,大家知道线程池有四种决绝策略,在实际代码编写中,大多数伙伴可能会用 CallerRunsPolicy 策略(由调用线程处理任务)。
异常分析:在核心线程数、最大线程数、阻塞队列都被占满的情况下,就会执行拒绝策略,但是由于使用的是 CallerRunsPolicy 策略,导致线程任务直接由我们的业务线程来执行。如果出现异常情况比如第三方接口超时,那么业务线程执行也会超时,线上服务采用的 Tomcat 容器,最终也就导致 Tomcat 的最大线程数也被占满,进而无法继续向外提供服务。
总结:考虑到线程池任务的重要性,不是很重要的话,可以使用 DiscardPolicy 策略直接丢弃,要是很重要,可以考虑使用消息队列来替换线程池。
五、总结
本文介绍了线程池的基本概念、主要参数、工作流程,以及 execute() 方法的源码分析,此外,还讨论了在实际应用中可能遇到的陷阱和问题。
在实际生产环境中,任务的数量常常是动态变化的,可能导致负载高或任务积压的现象。静态线程池的线程数和队列大小一旦设置,就无法根据实时负载进行动态调整,可能会导致资源浪费或任务执行效率低。美团提出了一种解决方案,通过实时监控线程池的负载和任务积压情况,支持线程池参数的动态调整,并能立即生效。感兴趣的伙伴可以参考相关资料以获取更多信息。
六、参考资料
1、Java线程池实现原理及其在美团业务中的实践
2、Java并发编程:线程池的使用
3、10问10答:你真的了解线程池吗?
4、谈谈JVM内部锁升级过程
5、打通JAVA与内核系列之一ReentrantLock锁的实现原理
6、深度剖析 AQS 设计原理