线程池
为什么要创建线程池
因为CPU核心数量有限,如果每来一个任务就创建一个线程,就会使线程数远远多于CPU核心数,使线程上下文切换过于频繁,会导致系统性能降低。而且每创建一个线程都会占用一定的内存,如果每来一个任务就创建一个线程,内存消耗太大了。
ThreadPoolExecutor
1.线程池状态
ThreadPoolExecutor使用int的高3位来表示线程状态,低29位表示线程数量
从数字上比较,TERMINATED>TIDYING>STOP>SHUTDOWN>RUNNING(因为高3位所以111最小)
这些信息存储在一个原子变量ctl中,目的是将线程状态与线程个数合二为一,这样就可以用一次cas原子操作进行赋值
// c为旧值,ctlOf返回结果为新值
ctl.compareAndSet(c,ctlOf(targetState,workerCountOf(c)));
// rs 为高 3 位代表线程状态,wc为低29位代表线程个数,ctl是合并它们
private static int ctlOf(int res,int wc){return res | wc;
}
2.构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
· corePoolSize核心线程数目(最多保留的线程数)
· maximumPoolSize 最大线程数目(核心线程数加救急线程数)
· keepAliveTime 生存时间 - 针对救急线程
· unit 时间单位 - 针对救急线程
· workQueue 阻塞队列
· threadFactory 线程工厂 - 可以为线程创建时起个好名字
· handler 拒绝策略
工作方式
当核心线程数不够用,新来的任务就会进入阻塞队列中等待执行。
当阻塞队列达到上限时,还新来任务,这时救急线程就会将新来的任务执行
当任务5执行完后(过完生存时间)救急线程就会被回收,等到下回需要时再重新创建。核心线程被创建后就不会被回收了,一直存在于线程池中。
如果拒绝队列选择了有界队列,那么任务超过了队列大小时,会创建救急线程
当急救线程也被占满时,这时再来的任务就会采取拒绝策略了,jdk提供了四种拒绝策略(前四种)
· AbortPolicy 让调用者抛出 RejecedtExecutionException异常,这是默认策略
· CallerRunsPolicy 让调用者运行任务
· DiscardPolicy 放弃本次任务
· DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
· Dubbo 的实现,在抛出RejectedExecutionException 异常之前会记录日志,并dump线程栈信息,方便定位问题
· Netty 的实现,是创建一个新线程来执行任务
· AcitveMQ的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略(自定义线程池的文章有)
· PinPoint的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
当高峰过去后,超过corePoolSize的急救线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime 和 unit来控制。
根据这个构造方法,JDK Executors类中提供了众多工厂方法来创建各种用途的线程池
3.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
特点
· 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
· 阻塞队列是无界的,可以放任意数量的任务
测试代码
public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {AtomicInteger arr = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {// 通过线程工厂对线程进行改名return new Thread(r,"mypool_t "+arr.getAndIncrement());}});pool.execute(()->{log.debug("1");});pool.execute(()->{log.debug("2");});pool.execute(()->{log.debug("3");});}
评价
适用于任务量已知,相对耗时的任务
4.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
特点
核心线程数是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s。(意味着全部都是急救线程,60后可以回收。急救线程可以无限创建)
队列采用了SynchronousQueue实现特点,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
SynchronousQueue代码实现
public static void main(String[] args) throws InterruptedException {SynchronousQueue<Integer> integers = new SynchronousQueue<>();new Thread(()->{try {log.debug("putting {}",1);integers.put(1);log.debug("{} putted...",1);log.debug("putting {}",2);integers.put(2);log.debug("{} putted...",2);} catch (InterruptedException e) {e.printStackTrace();}},"t1").start();Thread.sleep(1000);new Thread(()->{try {log.debug("take {}",1);integers.take();} catch (InterruptedException e) {e.printStackTrace();}},"t2").start();Thread.sleep(1000);new Thread(()->{try {log.debug("take {}",2);integers.take();} catch (InterruptedException e) {e.printStackTrace();}},"t3").start();
}
输出结果
评价
整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程。
适合任务数比较密集,但每个任务执行时间较短的情况
5.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
使用场景
希望多个任务排队执行。线程数固定为1,任务数多于1时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
与自己创建一个线程的区别
自己创建一个单线程穿行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
Executors.newSingleThreadExecutor()线程个数始终为1,不能修改
· FinalizeableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor 中特有的方法
Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
· 对外暴露的是ThreadPoolExecutor对象,可以强制后调用setCorePoolSize 等方法进行修改
6.提交任务
// 执行任务
void execute(Runnable command);
// 提交任务task,用返回值Future获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;
// 提交tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;
//提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException,ExecutionException;
// 提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit)throws InterruptedException,ExecutionException,TimeoutException;
7.关闭线程池
shutdown
让线程执行完shutdown之前的任务再停止。
/*线程池状态变为SHUTDOWN- 不会接收新方法- 但已经提交任务会执行完- 此方法不会阻塞调用线程的执行
*/
void shutdown();
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 修改线程状态advanceRunState(SHUTDOWN);// 仅会打断空空闲线程interruptIdleWorkers();onShutdown(); // 扩展点 ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)tryTerminate();
}
shutdownNow
让所有任务都终止,就连当前执行的线程也终止,阻塞队列中的任务会当作返回值返回。
/*线程池状态变为STOP- 不会接收新任务- 会将队列中的任务返回- 并用interrupt的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 修改线程状态advanceRunState(STOP);// 打断所有线程interruptWorkers();// 获取队列中剩余任务tasks = drainQueue();} finally {mainLock.unlock();}// 尝试终结tryTerminate();return tasks;}
其它方法
// 不在 RUNNING 状态的线程池,此方法就返回true
public boolean isShutdown();
// 线程池状态是否是TERMINATED
public boolean isTerminated();
//调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMINATED后做些事情,可以利用此方法等待
public boolean awaitTermination(long timeout, TimeUnit unit);