Git仓库
https://gitee.com/Lin_DH/system
介绍
线程池隔离:指一种通过为每个服务提供独立的线程池来隔离服务之间的资源和执行环境的做法。
为什么需要线程池隔离?
- 资源隔离,每个服务都有独立的线程池,可以避免由于某个服务的异常或高负载导致整个系统的线程资源耗尽。
- 性能隔离,通过线程池隔离,可以更好地控制每个服务的并发度和资源利用率,提高系统的性能和稳定性。
- 故障隔离,当一个服务发生故障时,独立的线程池可以使故障不会传播到其他服务,从而增强系统的容错性。
实现线程池隔离的方式 - 使用线程池,为每个服务或功能模块创建独立的线程池。
- 使用线程池隔离框架,如 Hystrix、Resilence4j 等容错框架,可以轻松实现线程池隔离以及其他容错机制。
实现代码
第一步:初始化多个线程池
TaskPoolConfig.java
package com.lm.system.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;/*** @author DUHAOLIN* @date 2024/10/17*/
@EnableAsync
@Configuration
public class TaskPoolConfig {@Beanpublic Executor taskExecutor1() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(2);executor.setMaxPoolSize(2);executor.setQueueCapacity(10);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("executor-1-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}@Beanpublic Executor taskExecutor2() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(2);executor.setMaxPoolSize(2);executor.setQueueCapacity(10);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("executor-2-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}}
第二步:创建异步回调任务,并指定要使用的线程池
AsyncCallBackTask_1.java
package com.lm.system.task;import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import java.util.Random;
import java.util.concurrent.CompletableFuture;/*** 异步回调* @author DUHAOLIN* @date 2024/10/17*/
@Slf4j
@Component
public class AsyncCallBackTask_1 {public static Random random = new Random();@Async("taskExecutor1")public CompletableFuture<String> one(String taskNo) throws InterruptedException {log.info("开始执行任务:{}", taskNo);long startTime = System.currentTimeMillis();Thread.sleep(random.nextInt(10000));long endTime = System.currentTimeMillis();log.info("完成任务:{},耗时:{} 毫秒", taskNo, endTime - startTime);return CompletableFuture.completedFuture("任务执行完成");}@Async("taskExecutor2")public CompletableFuture<String> two(String taskNo) throws InterruptedException {log.info("开始执行任务:{}", taskNo);long startTime = System.currentTimeMillis();Thread.sleep(random.nextInt(10000));long endTime = System.currentTimeMillis();log.info("完成任务:{},耗时:{} 毫秒", taskNo, endTime - startTime);return CompletableFuture.completedFuture("任务执行完成");}}
第三步:测试类添加测试方法
SystemApplicationTests.java
@Slf4j
@SpringBootTest(classes = SystemApplication.class)
class SystemApplicationTests {@Resourceprivate AsyncCallBackTask_1 asyncCallBackTask_1;@Testpublic void asyncTaskPoolIsolation() throws InterruptedException { //测试线程池隔离long startTime = System.currentTimeMillis();//线程池1CompletableFuture<String> task_1 = asyncCallBackTask_1.one("task_1");CompletableFuture<String> task_2 = asyncCallBackTask_1.one("task_2");CompletableFuture<String> task_3 = asyncCallBackTask_1.one("task_3");//线程池2CompletableFuture<String> task_4 = asyncCallBackTask_1.two("task_4");CompletableFuture<String> task_5 = asyncCallBackTask_1.two("task_5");CompletableFuture<String> task_6 = asyncCallBackTask_1.two("task_6");CompletableFuture.allOf(task_1, task_2, task_3, task_4, task_5, task_6).join();long endTime = System.currentTimeMillis();log.info("任务执行完成,总耗时:" + (endTime - startTime) + "毫秒");}}
效果图
执行测试类 asyncTaskPoolIsolation 方法,得到如下结果:因为配置的初始化线程为2,taskExecutor1 会先执行 one 方法的前两个(图中的 task_2、task_1),task_3 进入缓存队列,等待前两个任务中的一个执行完就可以执行;taskExecutor2 也会先执行 two 方法的前两个(图中的 task_5、task_4),task_6 进入缓存队列,等待前两个任务中的一个执行完就可以执行。