文章目录
- 支持异步线程自动传递上下文(例如当前请求)的工具类(支持自定义上下文传递逻辑,支持拦截所有异步操作)
- 使用示范
- ContextSupportedAsyncUtil .java
- 自动拦截所有异步线程池操作
- ContextSupportedExecutorAspect.java
- 自定义上下文注入逻辑示范
支持异步线程自动传递上下文(例如当前请求)的工具类(支持自定义上下文传递逻辑,支持拦截所有异步操作)
当我们使用异步线程去执行一些耗时操作的时候,这些异步操作中可能需要获取当前请求等上下文信息
若未做传递逻辑默认是获取不到的,因此写了一个自动传递的工具和切面,可使用工具手动调用时会自动拷贝上下文信息,加载切面后会拦截所有异步操作自动拷贝上下文信息。
同时,上下文信息的加载拷贝移除逻辑也可实现接口自定义,自行扩展。
和阿里巴巴TransmittableThreadLocal(TTL)类似,多支持了可以自定义上下文传递逻辑,你可以认为是阿里TTL手写版本知乎介绍阿里TTL原理
使用示范
异步执行lambda表达式中的代码,代码中获取当前请求的URL并打印
若未使用该工具,是无法获取到当前请求的。
ContextSupportedAsyncUtil.execute(()->{System.out.println("我在异步线程中获取到的当前请求URL是"+((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest().getRequestURI());});
ContextSupportedAsyncUtil .java
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;/*** @author humorchen* date: 2024/7/30* description: 支持上下文自动传递的异步工具* 默认支持自动传递 HttpServletRequest 到异步线程中* 其他上下文可自行创建类实现 AsyncContextInjector 接口,并调用ContextSupportedAsyncUtil.registerContextInjector 将其注册上去。* 可参考 SpringWebRequestContextInjector。class**/
@Slf4j
public class ContextSupportedAsyncUtil {private static final int CORE_SIZE = 8;private static final int MAX_SIZE = 32;private static final int QUEUE_SIZE = 1024;private static final int KEEP_ALIVE = 5;private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.MINUTES;private static final ArrayBlockingQueue<Runnable> QUEUE = new ArrayBlockingQueue<>(QUEUE_SIZE);private static final AtomicInteger THREAD_NUM = new AtomicInteger(0);private static final RejectedExecutionHandler REJECT_POLICY = new ThreadPoolExecutor.CallerRunsPolicy();private static final List<Class<? extends AsyncContextInjector>> ASYNC_CONTEXT_INJECTOR_CLS_LIST = new ArrayList<>();private static ContextSupportedThreadPoolExecutor EXECUTOR = new ContextSupportedThreadPoolExecutor(CORE_SIZE, MAX_SIZE, KEEP_ALIVE, KEEP_ALIVE_UNIT, QUEUE, (r) -> new Thread(r, "AsyncUtil-thread-" + THREAD_NUM.incrementAndGet()), REJECT_POLICY);static {// 默认支持spring mvc 的RequestHolder自动传递到异步线程中ASYNC_CONTEXT_INJECTOR_CLS_LIST.add(SpringWebRequestContextInjector.class);}/*** 支持自定义上下文传递的executor*/public static class ContextSupportedThreadPoolExecutor extends ThreadPoolExecutor {public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);}public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);}public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}/*** @param command the task to execute*/@Overridepublic void execute(@NonNull Runnable command) {super.execute(command instanceof AsyncRunnable ? command : getAsyncRunnable(command));}}/*** 异步上下文注入器接口* 将A线程的上下文注入到执行Runnable的B线程,并在执行后清除*/public interface AsyncContextInjector {/*** 初始化* 读取A线程的上下文并保存到当前对象*/void init();/*** 注入上下文信息* 注入init阶段存储的上下文到B线程的上下文中*/void inject();/*** 移除上下文信息* 清理B线程刚注入的上下文*/void remove();}/*** 支持传递ServletRequestAttributes对象用于获取当前请求HttpServletRequest*/public static class SpringWebRequestContextInjector implements AsyncContextInjector {private ServletRequestAttributes requestAttributes;/*** 初始化* 读取A线程的上下文并保存到当前对象*/@Overridepublic void init() {requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();}/*** 注入上下文信息* 注入init阶段存储的上下文到B线程的上下文中*/@Overridepublic void inject() {RequestContextHolder.setRequestAttributes(requestAttributes);}/*** 移除上下文信息* 清理B线程刚注入的上下文*/@Overridepublic void remove() {RequestContextHolder.resetRequestAttributes();}}/*** 抽象的注入器,将自定义的每个注入器执行*/public static class AbstractAsyncContextInjector {private final List<AsyncContextInjector> list = new ArrayList<>(ASYNC_CONTEXT_INJECTOR_CLS_LIST.size());public void init() {// 初始化每个异步上下文注入器for (Class<? extends AsyncContextInjector> aClass : ASYNC_CONTEXT_INJECTOR_CLS_LIST) {try {AsyncContextInjector asyncContextInjector = aClass.newInstance();asyncContextInjector.init();list.add(asyncContextInjector);} catch (Exception e) {log.error("AbstractAsyncContextInjector init error", e);}}}public void inject() {for (AsyncContextInjector asyncContextInjector : list) {try {asyncContextInjector.inject();} catch (Exception e) {log.error("AbstractAsyncContextInjector inject error", e);}}}public void remove() {for (AsyncContextInjector asyncContextInjector : list) {try {asyncContextInjector.remove();} catch (Exception e) {log.error("AbstractAsyncContextInjector remove error", e);}}}}/*** 支持异步线程传递自定义上下文时使用的Runnable包装类*/public static class AsyncRunnable extends AbstractAsyncContextInjector implements Runnable {private final Runnable runnable;public AsyncRunnable(Runnable runnable) {// 初始化保存A线程上下文信息init();this.runnable = runnable;}@Overridepublic void run() {if (runnable == null) {return;}try {// 将保存的A线程的上下文信息恢复到B线程inject();runnable.run();} catch (Exception e) {log.error("【ContextSupportedAsyncUtil】 run error {}", e.getMessage());throw e;} finally {// 清理B线程的上下文remove();}}}/*** 异步执行任务** @param runnable*/public static void execute(Runnable runnable) {EXECUTOR.execute(runnable);}/*** 异步执行任务** @param runnable*/public static void submit(Runnable runnable) {execute(runnable);}/*** 执行异步任务并获取返回值** @param supplier* @param <T>* @return*/public static <T> CompletableFuture<T> execute(Supplier<T> supplier) {return CompletableFuture.supplyAsync(supplier, EXECUTOR);}/*** 获取线程池** @return*/public static ThreadPoolExecutor getExecutor() {return EXECUTOR;}/*** 设置本工具使用的线程池** @param executor*/public static void setExecutor(ContextSupportedThreadPoolExecutor executor) {if (executor == null) {throw new NullPointerException("executor 不得为空");}if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {throw new IllegalStateException("executor 状态不得为shutdown");}ContextSupportedThreadPoolExecutor old = EXECUTOR;EXECUTOR = executor;if (old != null) {old.shutdown();}}/*** 获取支持上下文注入的Runnable** @param runnable* @return 包装后的runnable*/public static AsyncRunnable getAsyncRunnable(Runnable runnable) {return new AsyncRunnable(runnable);}/*** 注册注入器** @param cls*/public static void registerContextInjector(Class<? extends AsyncContextInjector> cls) {if (cls != null && !ASYNC_CONTEXT_INJECTOR_CLS_LIST.contains(cls)) {ASYNC_CONTEXT_INJECTOR_CLS_LIST.add(cls);}}
}
自动拦截所有异步线程池操作
ContextSupportedExecutorAspect.java
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;/*** @author humorchen* date: 2024/8/7* description: 支持上下文的executor**/
@Component
@Aspect
@Slf4j
public class ContextSupportedExecutorAspect {@Around("execution(* java.util.concurrent.Executor.execute(java.lang.Runnable))")public Object contextSupportedExecutor(ProceedingJoinPoint joinPoint) throws Throwable {Object[] args = joinPoint.getArgs();if (args != null && args.length > 0) {Object arg = args[0];if (arg instanceof Runnable && !(arg instanceof ContextSupportedAsyncUtil.AsyncRunnable)) {args[0] = ContextSupportedAsyncUtil.getAsyncRunnable((Runnable) arg);}}return joinPoint.proceed(args);}}
自定义上下文注入逻辑示范
SpringWebRequestContextInjector.java
异步线程上下文自动注入当前请求(默认提供)
/*** 支持传递ServletRequestAttributes对象用于获取当前请求HttpServletRequest*/public static class SpringWebRequestContextInjector implements AsyncContextInjector {private ServletRequestAttributes requestAttributes;/*** 初始化* 读取A线程的上下文并保存到当前对象*/@Overridepublic void init() {requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();}/*** 注入上下文信息* 注入init阶段存储的上下文到B线程的上下文中*/@Overridepublic void inject() {RequestContextHolder.setRequestAttributes(requestAttributes);}/*** 移除上下文信息* 清理B线程刚注入的上下文*/@Overridepublic void remove() {RequestContextHolder.resetRequestAttributes();}}