Java JUC(四) 自定义线程池实现与原理分析

目录

一. 阻塞队列 BlockingQue

二. 拒绝策略 RejectPolicy

三. 线程池 ThreadPool 

四. 模拟运行


在 Java基础(二) 多线程编程 中,我们简单介绍了线程池 ThreadPoolExecutor 的核心概念与基本使用。在本文中,我们将基于前面学习的各种锁与同步工具来实现自定义的线程池,同时来探究和分析 Java 线程池的基本原理。

一. 阻塞队列 BlockingQue

在线程池的生态中,阻塞队列是至关重要的一环,其用于实现任务与工作线程之间的平衡(类似于生产者/消费者模式)。 在此处,我们实现了一个自定义的阻塞队列 BlockingQue,其代码如下:

// 阻塞队列实现
public class BlockingQue<T> {// 1. 任务队列private Deque<T> queue;// 2. 锁private ReentrantLock lock;// 3. 生产者条件变量private Condition fullWaitSet;// 4. 消费者条件变量private Condition emptyWaitSet;// 5. 容量private int capacity;public BlockingQue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.capacity = capacity;// ArrayDeque: 基于 Object[] 实现,可以自动扩容this.queue = new ArrayDeque<>();this.lock = new ReentrantLock(fair);// 读写共用一把锁this.fullWaitSet = lock.newCondition();this.emptyWaitSet = lock.newCondition();}public BlockingQue(int capacity) {this(capacity, false);}// 阻塞添加public void put(T element) throws InterruptedException {lock.lock();try {while (queue.size() == capacity) {fullWaitSet.await();}queue.addLast(element);// 唤醒消费线程emptyWaitSet.signal();} finally {lock.unlock();}}// 非阻塞添加public boolean offer(T element) {lock.lock();try {if (queue.size() == capacity)return false;queue.addLast(element);// 唤醒消费线程emptyWaitSet.signal();return  true;} finally {lock.unlock();}}// 超时阻塞添加public boolean offer(T element, long timeout, TimeUnit unit) throws InterruptedException {lock.lock();try {long nanos = unit.toNanos(timeout);while (queue.size() == capacity) {// 已经超时则返回 falseif (nanos <= 0)return false;nanos = fullWaitSet.awaitNanos(nanos); // awaitNanos 返回剩余等待时间(处理虚假唤醒)}queue.addLast(element);// 唤醒消费线程emptyWaitSet.signal();return true;}finally {lock.unlock();}}// 阻塞获取public T take() throws InterruptedException {lock.lock();try {while (queue.isEmpty()) {emptyWaitSet.await();}T element = queue.removeFirst();// 唤醒生产线程fullWaitSet.signal();return element;} finally {lock.unlock();}}// 超时阻塞获取public T poll(long timeout, TimeUnit unit) throws InterruptedException {lock.lock();try {// 将 timeout 统一转换为纳秒long nanos = unit.toNanos(timeout);while (queue.isEmpty()) {// 已经超时则返回 nullif(nanos <= 0){return null;}nanos = emptyWaitSet.awaitNanos(nanos); // awaitNanos 返回剩余等待时间(处理虚假唤醒)}T element = queue.removeFirst();// 唤醒生产线程fullWaitSet.signal();return element;}finally {lock.unlock();}}//获取大小public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}
}

可以看出,上述代码使用了 Deque 作为元素存储容器,但若将 Deque 换成 Object[] 数组,则其基本就是 ArrayBlockingQueue 的实现源码。在实际工作中,若要实现自定义阻塞队列,我们只需要实现 BlockingQueue<E> 接口及其抽象方法即可。

package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;public interface BlockingQueue<E> extends Queue<E> {boolean add(E e);boolean offer(E e);void put(E e) throws InterruptedException;boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;E take() throws InterruptedException;E poll(long timeout, TimeUnit unit) throws InterruptedException;int remainingCapacity();boolean remove(Object o);public boolean contains(Object o);int drainTo(Collection<? super E> c);int drainTo(Collection<? super E> c, int maxElements);
}

二. 拒绝策略 RejectPolicy

在线程数量已满且阻塞队列已满的情况下,主线程则会因为无法放置任务而一直阻塞等待,因此我们需要拒绝策略来处理这种溢出情况。拒绝策略一般定义为接口,并允许我们自定义策略,其代码如下:

// 拒绝策略
@FunctionalInterface
public interface RejectPolicy<T> {void reject(BlockingQue<T> queue, T task);
}

一般接口方法需要提供阻塞队列以及当前任务两个参数,并支持函数式编程;常见的拒绝策略包括:阻塞等待、放弃执行、抛出异常、由调用线程执行等(后续会实现)。在实际工作中,Java已经为我们提供了拒绝策略的顶层设计,若想自定义拒绝策略,我们只需实现 RejectedExecutionHandler 接口并实现其 rejectedExecution 抽象方法即可。

package java.util.concurrent;public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

三. 线程池 ThreadPool 

在本节,我们将实现一个简单的自定义线程池,其只包含核心线程数,并且规定线程池的运行规则如下:

1.若当前线程数 < corePoolSize,则新建线程处理任务;

2.若当前线程数 >= corePoolSize && 任务队列未满,则将任务放入任务队列等待;

3.若当前线程数 >= corePoolSize && 任务队列已满,则执行拒绝策略;

/*** 自定义线程池实现:*  1. 若当前线程数 < corePoolSize,则新建线程处理任务*  2. 若当前线程数 >= corePoolSize && 任务队列未满,则将任务放入任务队列等待*  3. 若当前线程数 >= corePoolSize && 任务队列已满,则执行拒绝策略*/
public class ThreadPool {// 任务队列private BlockingQue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();// 锁private ReentrantLock mainLock = new ReentrantLock();// 核心线程数private int coreSize;// 获取任务的超时时间(allowThreadTimeOut=true时有效)private long timeOut;// 时间单位(allowThreadTimeOut=true时有效)private TimeUnit timeUnit;// 是否允许线程超时等待(默认允许)private boolean allowThreadTimeOut = true;// 拒绝策略private RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeOut = timeOut;this.timeUnit = timeUnit;this.taskQueue = new BlockingQue<>(queueCapacity);this.rejectPolicy = rejectPolicy;}// 设置 allowThreadTimeOut 参数public void setAllowThreadTimeOut(boolean allowThreadTimeOut) {this.allowThreadTimeOut = allowThreadTimeOut;}// 执行任务 taskpublic void execute(Runnable task){mainLock.lock();try{if(workers.size() < coreSize){// 添加核心线程Worker worker = new Worker(task);workers.add(worker);worker.start();}else if(!taskQueue.offer(task)){// 执行拒绝策略rejectPolicy.reject(taskQueue, task);}} finally {mainLock.unlock();}}// 工作线程类private class Worker extends Thread{// 执行任务private Runnable task;public Worker(Runnable task){this.task = task;}@Overridepublic void run() {// 获取任务while(task != null || (task = getTask()) != null){try{task.run();}catch (Exception e){e.printStackTrace();}finally {task = null;}}// worker 线程终止synchronized (workers){// 移除 workerworkers.remove(this);}}}// 从阻塞队列中获取等待任务(提供给Worker的钩子方法)private Runnable getTask(){for(;;){try {Runnable r = allowThreadTimeOut ? taskQueue.poll(timeOut, timeUnit) : taskQueue.take();return r;} catch (InterruptedException e) {// 若被中断则重新等待e.printStackTrace();}}}
}

Java ThreadPoolExecutor 的实现相比我们自定义的线程池更加复杂和安全(增加了线程池状态的维护、最大线程数的逻辑、线程池终止方法等),但在核心思想的实现上基本一致,因此这段自定义代码的实现可以帮助我们更加方便的理解 ThreadPoolExecutor 的源码。

四. 模拟运行

public class Main {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(3,10000, TimeUnit.MILLISECONDS, 5,(queue, task) -> {// 1. 死等//try {//    queue.put(task);//} catch (InterruptedException e) {//    e.printStackTrace();//}// 2. 放弃任务执行// do nothing...System.out.println("do discard policy...");// 3. 抛出异常//throw new RuntimeException("task run fail" + task);// 4. 调用线程执行任务//task.run();});for (int i = 0; i < 10; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(j + "is running...");});}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/937.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

go-logger v0.27.0 - 并发性能为官方库 10 倍

go-logger是一个高性能的 golang 日志库&#xff0c;旨在提供快速、轻量级的日志记录功能 Github 使用文档 v0.27.0 更新内容 优化内存分配优化写数据性能增加日志属性自定义函数增加各个日志级别格式化打印函数 说明 性能优化是该版本最重要的更新内容。性能优化的结果&…

【华为HCIP实战课程31(完整版)】中间到中间系统协议IS-IS路由汇总详解,网络工程师

一、IS-IS的汇总 1、可以有效减少在LSP中发布的路由条目,减小对系统资源的占用。 2、会减少LSP报文的扩散,接收到该LSP报文的其他设备路由表中只会出现一条聚合路由。 3、可以避免网络中的路由震荡,提高了网络的稳定性。 4、被聚合的路由可以是IS-IS路由,也可以是被引入…

邮件发送excel带预览excel功能

excel 打开后的内容: 思路&#xff1a; 1、邮件发送excel 是作为附件发送出去的&#xff1b; 2、excel 预览是&#xff0c;必须另外点击预览按钮&#xff0c;并不能直接预览邮件内容然后在邮件主体内容展示出来 根据以上两点基本没法实现 邮件发送后邮件自带 预览功能。 伪方法…

HCIA(DHCP服务)

第三节 开启DHCP服务 创建地址池 调用全局服务 [R1]dhcp enable 开启DHCP服务 [R1]ip pool AA 创建地址池 [R1-ip-pool-AA]network 192.168.1.0 mask 24 写入网段 [R1-ip-pool-AA]gateway-list 192.168.1.1 写入网关 [R1-ip-pool-AA]dns-list 8.8.8.8 114.1…

java项目之文理医院预约挂号系统源码(springboot)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的文理医院预约挂号系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息。 项目简介&#xff1a; 本系统的使用角色可…

A4-C四驱高防变电站巡检机器人

在电力行业数字化、智能化转型进程中&#xff0c;搭载多模态成像传感器的变电站巡检机器人、视频监控设备逐渐取代传统人工&#xff0c;成为变电设备状态监测的主要工具。变电站巡检机器人具有全天候、非接触式、多参量测量等特点&#xff0c;结合内置人工智能算法完成仪表识别…

【华为HCIP实战课程三十】中间到中间系统协议IS-IS路由渗透及TAG标识详解,网络工程师

一、路由泄露 1、默认情况Level 1不会学到Level2的明细路由&#xff0c;L2可以学到L1的明细路由 2、FIB数据转发&#xff0c;路由负载&#xff0c;通过随机数据中的五元组hash,hash值决定数据走哪条链路 R1设备ping和telnet通过抓包查看走的都是S1/0/0接口 抓包进行过滤;ip.a…

面向对象三大特征之一:封 装

1、特点 封装是面向对象的核心思想&#xff0c;两层含义&#xff1a;一是一个整体&#xff08;把对象的属性和行为看成一个整体&#xff0c;即封装在一个对象种&#xff09;&#xff0c;二是信息隐藏&#xff0c;对外隐藏&#xff0c;但可以通过某种方式进行调用。 2、访问权…

引领汽车行业未来,3D数字化技术如何改变汽车行业?

新能源汽车行业加速发展&#xff0c;新车型密集发布&#xff0c;汽车保有量和车龄的增加&#xff0c;也同时点燃了汽车后市场的增长引擎。对于车企而言&#xff0c;如何全方面优化汽车从研发、生产、售后到营销的各个环节&#xff0c;以便适应快速变化的市场需求&#xff1f; 1…

使用MongoDB Atlas构建无服务器数据库

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 使用MongoDB Atlas构建无服务器数据库 MongoDB Atlas 简介 注册账户 创建集群 配置网络 设置数据库用户 连接数据库 设计文档模式…

萌熊数据科技:剑指脑机转入,开启科技新篇章

近日&#xff0c;科技圈传来一则令人瞩目的消息&#xff0c;天津萌熊数据科技有限公司和天津一万年科技发展有限公司在全国范围内大力开展AI加生命科学的主体业务&#xff0c;并明确将朝着脑机转入方向深入发展&#xff0c;引发了行业内外的广泛关注。 天津萌熊数据科技有限公司…

【云备份项目】json以及jsoncpp库的使用

目录 1.JSON 2.什么是 JSON&#xff1f; 3.JSON 发展史 4.为什么要使用 JSON&#xff1f; 5.JSON 的不足 6.JSON 应该如何存储&#xff1f; 7.什么时候会使用 JSON 7.1.定义接口 7.2.序列化 7.3.生成 Token 7.4.配置文件 8.JSON的语法规则 8.1.对象和数组 8.2.JS…

动态规划-两个数组的dp问题——1035.不相交的线

1.题目解析 题目来源&#xff1a;1035.不相交的线 测试用例 2.算法原理 本题实质上就是寻找两个数组的最长公共子序列&#xff0c;所以可以直接移步到 最长公共子序列 的解析博客&#xff0c;那里更加详细的介绍了如何处理最长公共子序列的判断 1.状态表示 由题目可知是两个…

Android13开发恢复出厂设置默认打开WiFi

案例&#xff1a; 测试要求系统恢复出场设置的时候&#xff0c;默认WiFi打开 代码 找到def_wifi_on,将它修改为true即可 修改路径&#xff1a;QSSI.13/packages/services/Car/car_product/overlay/frameworks/base/packages/SettingsProvider/res/values/defaults.xml

Spring Boot框架下校园社团信息管理的优化策略

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理校园社团信息管理系统的相关信息成为必然。…

MySql基础:数据类型

目录 1. 数据类型的整体分类 2. 整数数据类型 2.1 TINYINT 类型 2.2 bit 类型 2.3 小数类型 2.3.1 float 2.3.2 decimal 2.4 字符串类型 2.4.1 char 2.4.3 varchar 2. 5 日期和时间类型 2.6 enum和set类型 补充&#xff1a; 1. 数据类型的整体分类 说明&#xff1a;在…

【测试工具】通过Jmeter压测存储过程

目录 一、存储过程准备1.1、 建立空表1.2、 建立存储过程1.3、调试 二、测试工具准备三、工具配置及执行3.1、配置JDBC Connection Configuration&#xff1a;3.2、配置吞吐量控制器3.3、配置JDBC Request 一、存储过程准备 1.1、 建立空表 CREATE TABLE test_data ( id NUMB…

探讨Facebook的AI研究:未来社交平台的技术前瞻

在数字时代&#xff0c;社交媒体已成为人们日常生活的重要组成部分。作为全球最大的社交网络之一&#xff0c;Facebook不断致力于人工智能&#xff08;AI&#xff09;的研究与应用&#xff0c;以提升用户体验、增强平台功能并推动技术创新。本文将探讨Facebook在AI领域的研究方…

二百七十五、Kettle——ClickHouse增量导入数据补全以及数据修复记录表数据(实时)

一、目的 在完成数据修复后&#xff0c;需要生成修复记录 二、Hive中原有代码 2.1 表结构 --52、数据补全以及数据修复记录表 dwd_data_correction_record create table if not exists hurys_db.dwd_data_correction_record(data_type int comment 数据类型…

无人机之集群控制方法篇

无人机的集群控制方法涉及多个技术和策略&#xff0c;以确保多架无人机能够协同、高效地执行任务。以下是一些主要的无人机集群控制方法&#xff1a; 一、编队控制方法 领航-跟随法&#xff08;Leader-Follower&#xff09; 通过设定一架无人机作为领航者&#xff08;长机&am…