JUC并发-共享模型-工具-线程池

1、自定义线程池

每创建一个线程资源,就要占用一定的内存;
①如果是高并发场景,一下子来了很多任务,如果为每个任务都创建一个线程,对内存占用较大,甚至可能出现OOM。
②大量任务来了,创建了很多线程。从cpu角度出发,核有限,只能让获取不到cpu时间片的线程陷入阻塞,这就会引起这个线程的一个上下文切换问题。它需要把这个当前线程运行的状态先保存下来,下次轮到这个线程运行时还要恢复它当初的那些状态。

1.1 阻塞队列

class BlockingQueue<T>{// 1.任务队列private Deque<T> queue = new ArrayDeque<>();// 2.锁private ReentrantLock lock = new ReentrantLock();// 3.生产者条件变量private Condition fullWaitSet = lock.newCondition();// 4.消费者条件变量private Condition emptyWaitSet = lock.newCondition();// 5.容量private int capcity;public BlockingQueue(int capcity) {this.capcity = capcity;}
//阻塞获取
public T take(){lock.lock();try{while(queue.isEmpty()){try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;}finally{lock.unlock();}
}
//阻塞添加
public void put(T element){lock.lock();try{while(queue.size()==capcity){try {fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}queue.addLast(element);emptyWaitSet.signal();}finally {lock.unlock();}
}
//获取大小
public int size(){lock.lock();try{return queue.size();}finally {lock.unlock();}
}}

1.2 带超时的阻塞获取-poll

//带超时的阻塞获取
public T poll(long timeout, TimeUnit unit){lock.lock();try{long nanos = unit.toNanos(timeout);//将timeout时间统一转化为纳秒while(queue.isEmpty()){try {//没等到直接返回if(nanos<=0){return null;}//返回的是剩余时间nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;}finally{lock.unlock();}
}

1.3 线程池的基本框架

class TreadPool{//任务队列private BlockingQueue<Runnable> taskQueue;//线程集合private HashSet<Worker> workers = new HashSet<>();//核心线程数private int coreSize;//获取任务的超时时间private long timeout;//时间单位private TimeUnit timeUnit;public void execute(Runnable task){}public TreadPool( int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity) {this.taskQueue = new BlockingQueue<>(queueCapcity);this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;}class Worker{}
}

 1.4 任务提交-Worker实现

//执行任务
public void execute(Runnable task){//当任务数没有超过coreSize时,直接交给worker对象执行//如果任务数超过coreSize时,加入任务队列暂存synchronized (workers){if(workers.size()<coreSize){Worker worker = new Worker(task);log.debug("新增 worker{},{}",worker,task);workers.add(worker);worker.start();}else{log.debug("加入任务队列 {}",task);taskQueue.put(task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}public void run(){//执行任务// 1)当task不为空,执行任务// 2)当task执行完毕,再接着从任务队列获取任务while(task !=null || (task = taskQueue.take() )!= null){try{log.debug("正在执行...{}",task);task.run();}catch(Exception e){e.printStackTrace();}finally {task=null;}}synchronized (workers){log.debug("worker被移除{}",this);workers.remove(this);}}
}

 1.5 take死等 poll超时

@Slf4j(topic="c.TestPool")
public class TestPool {public static void main(String[] args){ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MICROSECONDS,1);for (int i = 0; i < 5; i++) {int j = i;threadPool.execute(()->{log.debug("{}",j);});}}
}
  1. 因为线程池线程数为2,所以是创建了2个worker线程
  2. 然后把创建出来的2个线程加入到任务队列中等待执行
  3. Thread1和Thread2分别执行。输出结果1和0
  4. 执行完一个任务,新的任务会被继续加入任务队列
  5. 总共设置了5个任务,全部比线程执行完毕

 

1.6  任务队列已满-带超时的阻塞获取-offer

现在假如核心线程数为2,队列容量大小为10,假如处理一个任务的耗时很长,生成了15个任务,必然会有10个任务在任务队列中阻塞,而有3个任务等待加入任务队列。

应该添加一个拒绝策略

//带超时时间的阻塞添加
public boolean offer(T task,long timeout,TimeUnit timeUnit){lock.lock();try{long nanos = timeUnit.toNanos(timeout);while(queue.size()==capcity){try {log.debug("等待加入任务队列{}...",task);if(nanos<=0){return false;}nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}",task);queue.addLast(task);emptyWaitSet.signal();return true;}finally {lock.unlock();}
}

1.7 拒绝策略-Worker中的execute()实现

策略模式,把操作抽象为接口,具体的实现由调用者传递进来。

public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{// 1. 死等
//            queue.put(task);// 2) 带超时等待
//            queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3) 让调用者放弃任务执行
//            log.debug("放弃{}", task);// 4) 让调用者抛出异常
//            throw new RuntimeException("任务执行失败 " + task);// 5) 让调用者自己执行任务task.run();});for (int i = 0; i < 4; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}
}

ThreadPool类 

 

BlockingQueue类

public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try{//判断队列是否已满if(queue.size()==capcity){rejectPolicy.reject(this,task);}else{ //队列有空闲log.debug("加入任务队列 {}",task);queue.addLast(task);emptyWaitSet.signal();}}finally{lock.unlock();}
}

1.8 完整代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.core.log.LogDelegateFactory;import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;@Slf4j(topic = "c.TestPool")
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{// 1. 死等
//            queue.put(task);// 2) 带超时等待
//            queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3) 让调用者放弃任务执行
//            log.debug("放弃{}", task);// 4) 让调用者抛出异常
//            throw new RuntimeException("任务执行失败 " + task);// 5) 让调用者自己执行任务task.run();});for (int i = 0; i < 4; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {void reject(BlockingQueue<T> queue, T task);
}@Slf4j(topic = "c.ThreadPool")
class ThreadPool {// 任务队列private BlockingQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();// 核心线程数private int coreSize;// 获取任务时的超时时间private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;// 执行任务public void execute(Runnable task) {// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行// 如果任务数超过 coreSize 时,加入任务队列暂存synchronized (workers) {if(workers.size() < coreSize) {Worker worker = new Worker(task);log.debug("新增 worker{}, {}", worker, task);workers.add(worker);worker.start();} else {
//                taskQueue.put(task);// 1) 死等// 2) 带超时等待// 3) 让调用者放弃任务执行// 4) 让调用者抛出异常// 5) 让调用者自己执行任务taskQueue.tryPut(rejectPolicy, task);}}}public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);this.rejectPolicy = rejectPolicy;}class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 执行任务// 1) 当 task 不为空,执行任务// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
//            while(task != null || (task = taskQueue.take()) != null) {while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行...{}", task);task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers) {log.debug("worker 被移除{}", this);workers.remove(this);}}}
}
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {// 1. 任务队列private Deque<T> queue = new ArrayDeque<>();// 2. 锁private ReentrantLock lock = new ReentrantLock();// 3. 生产者条件变量private Condition fullWaitSet = lock.newCondition();// 4. 消费者条件变量private Condition emptyWaitSet = lock.newCondition();// 5. 容量private int capcity;public BlockingQueue(int capcity) {this.capcity = capcity;}// 带超时阻塞获取public T poll(long timeout, TimeUnit unit) {lock.lock();try {// 将 timeout 统一转换为 纳秒long nanos = unit.toNanos(timeout);while (queue.isEmpty()) {try {// 返回值是剩余时间if (nanos <= 0) {return null;}nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞获取public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞添加public void put(T task) {lock.lock();try {while (queue.size() == capcity) {try {log.debug("等待加入任务队列 {} ...", task);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();} finally {lock.unlock();}}// 带超时时间阻塞添加public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capcity) {try {if(nanos <= 0) {return false;}log.debug("等待加入任务队列 {} ...", task);nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();return true;} finally {lock.unlock();}}public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否满if(queue.size() == capcity) {rejectPolicy.reject(this, task);} else {  // 有空闲log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();}} finally {lock.unlock();}}
}

2、ThreadPoolExecutor

2.1 线程池状态

ThreadPoolExecutor使用int的高3位(最高位为符号位)来表示线程池状态低 29 位表示线程数量

状态名高3位

是否接接收

新任务

是否处理

阻塞队列任务

说明
running111YY
shutdown000NY不会接收新任务,但会处理阻塞队列中剩余的任务
stop001NN不会接收新任务,同时会中断正在执行的任务,并抛弃阻塞队列的任务
tidying010--任务全执行完毕,活动线程为 0, 即将进入终结
terminated011--终结状态

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态线程个数合二为一,这样就可以一次 cas 原子操作进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

 2.2 ThreadPoolExecutor构造方法-创建线程池

public ThreadPoolExecutor(int corePoolSize,    //核心线程数int maximumPoolSize, //最大线程数=核心线程数+救急线程数long keepAliveTime,  //救急线程生存时间TimeUnit unit,       //救急线程活动保持时间的单位BlockingQueue<Runnable> workQueue,    //阻塞队列ThreadFactory threadFactory,        //线程工厂,为创建线程时起名字RejectedExecutionHandler handler) //拒绝策略

2.3 其它工厂方法-创建线程池

newFixedThreadPool

适用于任务量已知,相对耗时的任务

  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列无界的,可以放任意数量的任务

newSingleThreadExecutor

希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

区别:newSingleThreadExecutor()与newFixedThreadPool(1)

  • newSingleThreadExecutor()自己创建一个单线程串行执行任务,如果任务执行失败而终止,那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作。
  • Executors.newSingleThreadExecutor()线程个数始终为1,不能修改
    • FinalizableDelegatedExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中特有的方法。
  • Executors.newFixedThreadPool(1)初始时为1,以后还可以修改
    • 对外暴露的是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改 。

newCachedThreadPool

整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况。

  • 核心线程数是0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
    • 全部都是救急线程(60s 后可以回收)
    • 救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的

2.4 提交任务

void execute(Runnable command);// 执行任务

submit

// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
@Slf4j(topic = "c.Test1")
public class Test1 {public static void main(String[] args) throws ExecutionException,InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(2);Future<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("running");Thread.sleep(1000);return "ok";}});log.debug("{}",future.get());}
}

invokeAll

// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;
@Slf4j(topic = "c.Test1")
public class Test1 {public static void main(String[] args) throws ExecutionException,InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(2);List<Future<String>> futures =  pool.invokeAll(Arrays.asList(()->{log.debug("begin");Thread.sleep(1000);return "1";},()->{log.debug("begin");Thread.sleep(500);return "2";},()->{log.debug("begin");Thread.sleep(2000);return "3";}));futures.forEach(f->{try {log.debug("{}",f.get());} catch (InterruptedException  | ExecutionException e) {e.printStackTrace();}});}
}

invokeAny

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
@Slf4j(topic = "c.Test1")
public class Test1 {public static void main(String[] args) throws ExecutionException,InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);String result =  pool.invokeAny(Arrays.asList(()->{log.debug("begin");Thread.sleep(1000);log.debug("end");return "1";},()->{log.debug("begin");Thread.sleep(500);log.debug("end");return "2";},()->{log.debug("begin");Thread.sleep(2000);log.debug("end");return "3";}));log.debug("{}",result);}
}

2.5 停止

shutdown

/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();

shutdownNow

/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();

其它方法

// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
@Slf4j(topic = "c.Test1")
public class Test1 {public static void main(String[] args) throws ExecutionException,InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(2);Future<Integer> result = pool.submit(()->{log.debug("task 1 running...");Thread.sleep(1000);log.debug("task 1 finish...");return 1;});Future<Integer> result2 = pool.submit(()->{log.debug("task 2 running...");Thread.sleep(1000);log.debug("task 2 finish...");return 2;});Future<Integer> result3 = pool.submit(()->{log.debug("task 3 running...");Thread.sleep(1000);log.debug("task 3 finish...");return 3;});log.debug("shutdown");pool.shutdown();Future<Integer> result4 = pool.submit(()->{log.debug("task 4 running...");Thread.sleep(1000);log.debug("task 4 finish...");return 4;});}
}

3、异步模式之工作线程

3.1 定义

有限的工作线程(WorkerThread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式

例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工。

3.2 饥饿 

 固定大小线程池有饥饿现象

场景:两个工人是同一个线程池中的两个线程
他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作

  • 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
  • 后厨做菜:没啥说的,做就是了

比如工人A处理了点餐任务,接下来它要等着工人B把菜做好,然后上菜,他俩也配合的蛮好
但现在同时来了两个客人,这个时候工人A和工人B都去处理点餐了,这时没人做饭了,饥饿

@Slf4j(topic = "c.Test1")
public class Test1 {static final List<String> MENU=Arrays.asList("地三鲜","宫保鸡丁","辣子鸡丁","烤鸡翅");static Random RANDOM = new Random();static String cooking(){return MENU.get(RANDOM.nextInt(MENU.size()));}public static void main(String[] args) throws ExecutionException,InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(2);pool.execute(()->{log.debug("处理点餐...");Future<String> f=pool.submit(()->{log.debug("做菜");return cooking();});try{log.debug("上菜:{}",f.get());}catch (InterruptedException|ExecutionException e){e.printStackTrace();}});pool.execute(()->{log.debug("处理点餐...");Future<String> f=pool.submit(()->{log.debug("做菜");return cooking();});try{log.debug("上菜:{}",f.get());}catch (InterruptedException|ExecutionException e){e.printStackTrace();}});}
}

 解决:设置2种类型线程池

3.3 创建多少线程池合适

过小会导致程序不能充分地利用系统资源、容易导致饥饿

过大会导致更多的线程上下文切换占用更多内存

CPU密集型运算

通常采用cpu核数+1能够实现最优的CPU利用率,+1是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费

I/O 密集型运算

CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行I/O操作时、远程RPC调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率。
经验公式如下:
线程数=核数*期望CPU利用率*总时间(CPU计算时间+等待时间)/CPU计算时间
例如4核CPU计算时间是50%,其它等待时间是50%,期望cpu被100%利用,套用公式
4*100%*100%/50%=8
例如4核CPU计算时间是10%,其它等待时间是90%,期望cpu被100%利用,套用公式
4*100%*100%/10%=40 

4、任务调度线程池

4.1 Timer

在『任务调度线程池』功能加入之前,可以使用java.util.Timer来实现定时功能,Timer的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

@Slf4j(topic = "c.TestTimer")
public class TestTimer {public static void main(String[] args){Timer timer = new Timer();TimerTask task1 = new TimerTask() {@Overridepublic void run() {log.debug("task 1");sleep(2);}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {log.debug("task 2");}};log.debug("start...");// 使用timer添加两个任务,希望它们都在1s后执行// 但由于timer内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行timer.schedule(task1, 1000);timer.schedule(task2, 1000);}
}

4.2 ScheduledExecutorService延时执行schedule

尽管任务1存在延时,但两个线程都是并行执行的。

@Slf4j(topic = "c.TestTimer")
public class TestTimer {public static void main(String[] args) throws ExecutionException, InterruptedException {ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);pool.schedule(() -> {log.debug("task1");sleep(2);//int i = 1/0;	//任务1中存在异常也不会影响任务执行,并且不会在控制台输出}, 1, TimeUnit.SECONDS);pool.schedule(() -> {log.debug("task2");}, 1, TimeUnit.SECONDS);}
}	

4.3 ScheduledExecutorService定时执行

scheduleAtFixedRate

@Slf4j(topic = "c.TestTimer")
public class TestTimer {public static void main(String[] args) throws ExecutionException, InterruptedException {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);log.debug("start...");pool.scheduleAtFixedRate(() -> {log.debug("running...");}, 1, 1, TimeUnit.SECONDS);}
}

输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s

scheduleWithFixedDelay 

@Slf4j(topic = "c.TestTimer")
public class TestTimer {public static void main(String[] args) throws ExecutionException, InterruptedException {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);log.debug("start...");pool.scheduleWithFixedDelay(() -> {log.debug("running...");sleep(2);}, 1, 1, TimeUnit.SECONDS);}
}

 

输出分析:一开始,延时1s,scheduleWithFixedDelay的间隔是 上一个任务结束-延时-下一个任务 开始所以间隔都是3s

4.4 正确处理执行任务异常

主动捕捉异常try-catch

ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {try {log.debug("task1");int i = 1 / 0;} catch (Exception e) {log.error("error:", e);}
});

 使用 Future

ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {log.debug("task1");int i = 1 / 0;return true;
});
log.debug("result:{}", f.get());

 4.5 案例-让每周四 18:00:00 定时执行任务

package cn.itcast.n8;import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class TestSchedule {// 如何让每周四 18:00:00 定时执行任务?public static void main(String[] args) {//  获取当前时间LocalDateTime now = LocalDateTime.now();System.out.println(now);// 获取周四时间LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);// 如果 当前时间 > 本周周四,必须找到下周周四if(now.compareTo(time) > 0) {time = time.plusWeeks(1);}System.out.println(time);// initailDelay 代表当前时间和周四的时间差// period 一周的间隔时间long initailDelay = Duration.between(now, time).toMillis();long period = 1000 * 60 * 60 * 24 * 7;ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);pool.scheduleAtFixedRate(() -> {System.out.println("running...");}, initailDelay, period, TimeUnit.MILLISECONDS);}
}

5、tomcat线程池

tomcat分为两大组件:连接器(Connector对外交流)和容器(负责实现server类的规范,去运行server组件)。
连接器部分便用到了线程池

 

  • LimitLatch用来限流,可以控制最大连接个数,类似J.U.C中的Semaphore
  • Acceptor只负责【接收新的socket连接
  • Poller只负责监听socket channel是否有【可读的I/O事件
  • 一旦可读,封装一个任务对象socketProcessor),提交给Executor线程池处理
  • Executor线程池中的工作线程最终负责【处理请求】 

Tomcat线程池扩展了ThreadPoolExecutor,行为稍有不同。如果总线程数达到maximumPoolSize不会立刻抛RejectedExecutionException异常,而是再次尝试将任务放入队列如果还失败才抛RejectedExecutionException异常。 

Connector 配置 

配置项默认值说明
acceptorThreadCount1acceptor线程数量
pollerThreadCount1poller线程数量(poller这个线程采用了多路复用的思想:一个线程就能监控多个个channel的读写事件。)
minSpareThreads10核心线程数,即corePoolSize
maxThreads200最大线程数,即maximumPoolSize
executor-Executor名称,用来引l用下面的Executor(如果定义了executor,就会覆盖掉核心线程数和最大线程数,以这个executor为准)

executor的配置就比Connector的优先级高

Executor 线程配置 

6、Fork/Join

6.1 概念

Fork/Join是JDK1.7加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的cpu密集型运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解。
Fork/Join在分治的基础上加入了多线程,可以把每个任务的分解合并交给不同的线程来完成,进一步提升了运算效率。
Fork/Join默认创建与cpu核心数大小相同线程池

6.2 使用

 提交给Fork/Join线程池的任务需要继承RecursiveTask(有返回值)或RecursiveAction(无返回值),例如下面定义了一个对1~n之间的整数求和的任务

@Slf4j(topic = "c.TestForkJoin2")
public class TestForkJoin2 {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(4);System.out.println(pool.invoke(new MyTask(5)));// new MyTask(5)  5+ new MyTask(4)  4 + new MyTask(3)  3 + new MyTask(2)  2 + new MyTask(1)}
}// 1~n 之间整数的和
@Slf4j(topic = "c.MyTask")
class MyTask extends RecursiveTask<Integer> {private int n;public MyTask(int n) {this.n = n;}@Overridepublic String toString() {return "{" + n + '}';}@Overrideprotected Integer compute() {// 如果 n 已经为 1,可以求得结果了if (n == 1) {log.debug("join() {}", n);return n;}// 将任务进行拆分(fork)AddTask1 t1 = new AddTask1(n - 1);t1.fork();log.debug("fork() {} + {}", n, t1);// 合并(join)结果int result = n + t1.join();log.debug("join() {} + {} = {}", n, t1, result);return result;}
}

改进: 

public class TestForkJoin {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(4);
//        System.out.println(pool.invoke(new AddTask1(5)));System.out.println(pool.invoke(new AddTask3(1, 5)));}
}@Slf4j(topic = "c.AddTask")
class AddTask1 extends RecursiveTask<Integer> {int n;public AddTask1(int n) {this.n = n;}@Overridepublic String toString() {return "{" + n + '}';}@Overrideprotected Integer compute() {if (n == 1) {log.debug("join() {}", n);return n;}AddTask1 t1 = new AddTask1(n - 1);t1.fork();log.debug("fork() {} + {}", n, t1);int result = n + t1.join();log.debug("join() {} + {} = {}", n, t1, result);return result;}
}@Slf4j(topic = "c.AddTask")
class AddTask2 extends RecursiveTask<Integer> {int begin;int end;public AddTask2(int begin, int end) {this.begin = begin;this.end = end;}@Overridepublic String toString() {return "{" + begin + "," + end + '}';}@Overrideprotected Integer compute() {if (begin == end) {log.debug("join() {}", begin);return begin;}if (end - begin == 1) {log.debug("join() {} + {} = {}", begin, end, end + begin);return end + begin;}int mid = (end + begin) / 2;AddTask2 t1 = new AddTask2(begin, mid - 1);t1.fork();AddTask2 t2 = new AddTask2(mid + 1, end);t2.fork();log.debug("fork() {} + {} + {} = ?", mid, t1, t2);int result = mid + t1.join() + t2.join();log.debug("join() {} + {} + {} = {}", mid, t1, t2, result);return result;}
}@Slf4j(topic = "c.AddTask")
class AddTask3 extends RecursiveTask<Integer> {int begin;int end;public AddTask3(int begin, int end) {this.begin = begin;this.end = end;}@Overridepublic String toString() {return "{" + begin + "," + end + '}';}@Overrideprotected Integer compute() {if (begin == end) {log.debug("join() {}", begin);return begin;}if (end - begin == 1) {log.debug("join() {} + {} = {}", begin, end, end + begin);return end + begin;}int mid = (end + begin) / 2;AddTask3 t1 = new AddTask3(begin, mid);t1.fork();AddTask3 t2 = new AddTask3(mid + 1, end);t2.fork();log.debug("fork() {} + {} = ?", t1, t2);int result = t1.join() + t2.join();log.debug("join() {} + {} = {}", t1, t2, result);return result;}
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1410677.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

中国人自己编的百科全书,百科知识全书10册(外国卷)

一、电子书描述 中国人自己编的与国际接轨的百科全书&#xff0c;也是真正意义上的现代百科全书&#xff0c;在坚持全面反映了人类知识的同时&#xff0c;也突出了中国特色&#xff0c;充分显示世界科学文化的新成就和新发展。本套电子书&#xff0c;大小367.39M&#xff0c;共…

每日一题:两地调度

公司计划面试 2n 人。给你一个数组 costs &#xff0c;其中 costs[i] [, ] 。第 i 人飞往 a 市的费用为 &#xff0c;飞往 b 市的费用为 。 返回将每个人都飞到 a 、b 中某座城市的最低费用&#xff0c;要求每个城市都有 n 人抵达。 示例 1&#xff1a; 输入&#xff1a;c…

【深耕 Python】Data Science with Python 数据科学(19)书402页练习题:模型准确率对比研究、KMeans算法的一点探讨

写在前面 关于数据科学环境的建立&#xff0c;可以参考我的博客&#xff1a; 【深耕 Python】Data Science with Python 数据科学&#xff08;1&#xff09;环境搭建 往期数据科学博文一览&#xff1a; 【深耕 Python】Data Science with Python 数据科学&#xff08;2&…

Redis 实战1

SDS Redis 只会使用 C 字符串作为字面量&#xff0c; 在大多数情况下&#xff0c; Redis 使用 SDS &#xff08;Simple Dynamic String&#xff0c;简单动态字符串&#xff09;作为字符串表示。 比起 C 字符串&#xff0c; SDS 具有以下优点&#xff1a; 常数复杂度获取字符串…

【高质量精品】2024五一数学建模C题成品论文22页matlab和13页python完整建模代码、可视图表+分解结果等(后续会更新)

您的点赞收藏是我继续更新的最大动力&#xff01; 一定要点击如下卡片&#xff0c;那是获取资料的入口&#xff01; 【高质量精品】2024五一数学建模C题成品论文22页matlab和13页python完整建模代码、可视图表分解结果等「首先来看看目前已有的资料&#xff0c;还会不断更新哦…

『MySQL 实战 45 讲』19 - 为什么我只查一行的语句,也执行这么慢?

为什么我只查一行的语句&#xff0c;也执行这么慢&#xff1f; 需求&#xff1a;创建一个表&#xff0c;有两个字段 id 和 c&#xff0c;并且在里面插入了 10 万行记录 CREATE TABLE t (id int(11) NOT NULL,c int(11) DEFAULT NULL,PRIMARY KEY (id) ) ENGINEInnoDB;delimit…

硬件知识积累 DP 接口简单介绍以及 DP信号飞线到显示屏的问题

1. DP 接口的介绍 定义与起源&#xff1a; DP接口是由PC及芯片制造商联盟开发&#xff0c;并由视频电子标准协会&#xff08;VESA&#xff09;标准化的数字式视频接口标准。它的设计初衷是为了取代传统的VGA、DVI和FPD-Link&#xff08;LVDS&#xff09;接口&#xff0c;以满足…

Qt QImageReader类介绍

1.简介 QImageReader 是用于读取图像文件的类。它提供了读取不同图像格式的功能&#xff0c;包括但不限于 PNG、JPEG、BMP 等。QImageReader 可以用于文件&#xff0c;也可以用于任何 QIODevice&#xff0c;如 QByteArray &#xff0c;这使得它非常灵活。 QImageReader 是一个…

323_C++_QT_QProcess执行cmd解压、压缩、删除tar.gz等等其他压缩包文件到指定目录,不需要外部库,QT自带API的就行

// decompressPath : 解压到此目录 // fileName : 解压的tar.gz文件名executeCommand(decompressPath , QString::fromStdString(fileName));// 开始解压 void executeCommand

uni-app scroll-view隐藏滚动条的小细节 兼容主流浏览器

开端 想写个横向滚动的列表适配浏览器&#xff0c;主要就是隐藏一下滚动条在手机上美观一点。 但是使用uni-app官方文档建议的::-webkit-scrollbar在目标标签时发现没生效。 .scroll-view_H::-webkit-scrollbar{display: none; }解决 F12看了一下&#xff0c;原来编译到浏览…

漏洞扫描神器:Nessus 保姆级教程(附破解步骤)

一、介绍 Nessus是一款广泛使用的网络漏洞扫描工具&#xff0c;用于发现和评估计算机系统和网络中的安全漏洞。它是一款功能强大的商业工具&#xff0c;由Tenable Network Security开发和维护。 以下是Nessus的一些主要特点和功能&#xff1a; 1. 漏洞扫描&#xff1a;Nessu…

转义字符解释

也许在一些代码中你看到 \n, \0 很纳闷是啥。其实在字符中有一组特殊的字符是转义字符&#xff0c;转义字符顾名思义&#xff1a;转变原来的意思的字符。 比如&#xff1a;我们有字符n&#xff0c;在字符串中打印的时候自然能打印出这个字符&#xff0c;如下&#xff1a; #in…

通过 API 接口,实现增值税发票智能识别

增值税发票智能识别是一项应用于财务管理和数据分析的技术&#xff0c;通过使用API接口&#xff0c;我们可以轻松地将增值税发票的各项信息进行结构化识别。本文将详细介绍如何通过API接口实现增值税发票的智能识别&#xff0c;并给出相应的代码说明。 首先&#xff0c;我们需…

自动安装环境shell脚本使用和运维基础使用讲解

title: 自动安装环境shell脚本使用和运维基础使用讲解 tags: [shell,linux,运维] categories: [开发记录,系统运维] date: 2024-3-27 14:10:15 description: 准备和说明 确认有网。 依赖程序集&#xff0c;官网只提供32位压缩包&#xff0c;手动编译安装后&#xff0c;在64位机…

Java 新手上路常见的5个经典问题,你遇到过吗?

当我们开始学习一门新的编程语言或者开发平台时&#xff0c;经常会遇到一些常见的问题。这些问题不仅是学习过程中的一部分&#xff0c;也是成长和提高的机会。 1. 空指针异常&#xff08;NullPointerException&#xff09; 空指针异常是 Java 开发中最常见的问题之一。它的产…

docker学习笔记3:VmWare CentOS7安装与静态ip配置

文章目录 一、安装CentOS71、下载centos镜像2、安装二、设置静态ip三、xshell连接centos本专栏的docker环境是在centos7里安装,因此首先需要会安装centos虚拟机。 本篇博客介绍如何在vm虚拟机里安装centos7。 一、安装CentOS7 1、下载centos镜像 推荐清华源,下载如下版本 …

OpenCV4.9去运动模糊滤镜(68)

返回:OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇:OpenCV4.9失焦去模糊滤镜(67) 下一篇 :OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 目标 在本教程中&#xff0c;您将学习&#xff1a; 运动模糊图像的 PSF 是多少如何恢复运动模…

2024-5-3学习笔记 继承关系拓展

一.继承与友元 友元类不能继承&#xff0c;也就是说基类友元不能访问子类私有和保护成员。简单的理解就是&#xff0c;爸爸的朋友不是儿子的朋友。 二.继承与静态成员 基类定义了static静态成员&#xff0c;则整个继承体系里面只有一个这样的成员。无论派生出多少个子类&…

Mac 更新 Homebrew软件包时提示 zsh: command not found: brew 错误

问题 通过Mac电脑更新Homebrew软件包时出现如下错误&#xff1a; xxxxxxxpiaodeMacBook-Pro ~ % brew update zsh: command not found: brew解决方案 在命令行输入如下指令&#xff1a; /bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/H…

string(上)

目录 一、string类的简单介绍 二、string类中成员函数介绍 1.构造函数 1&#xff09;string&#xff08;&#xff09; 2&#xff09;string&#xff08;const string& str&#xff09; 3&#xff09;string&#xff08;const string& str&#xff0c;size_t pos&…