@EnableAsync + @Async Source Code Study Notes, Part 5
Continuing from the previous post, this one traces AnnotationAsyncExecutionInterceptor. Its source is as follows:
/**
 * Specialization of {@link AsyncExecutionInterceptor} that delegates method execution to
 * an {@code Executor} based on the {@link Async} annotation. Specifically designed to
 * support use of {@link Async#value()} executor qualification mechanism introduced in
 * Spring 3.1.2. Supports detecting qualifier metadata via {@code @Async} at the method or
 * declaring class level. See {@link #getExecutorQualifier(Method)} for details.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1.2
 * @see org.springframework.scheduling.annotation.Async
 * @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor
 */
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {

    /**
     * Create a new {@code AnnotationAsyncExecutionInterceptor} with the given executor
     * and a simple {@link AsyncUncaughtExceptionHandler}.
     * @param defaultExecutor the executor to be used by default if no more specific
     * executor has been qualified at the method level using {@link Async#value()};
     * as of 4.2.6, a local executor for this interceptor will be built otherwise
     */
    public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
        super(defaultExecutor);
    }

    /**
     * Create a new {@code AnnotationAsyncExecutionInterceptor} with the given executor.
     * @param defaultExecutor the executor to be used by default if no more specific
     * executor has been qualified at the method level using {@link Async#value()};
     * as of 4.2.6, a local executor for this interceptor will be built otherwise
     * @param exceptionHandler the {@link AsyncUncaughtExceptionHandler} to use to
     * handle exceptions thrown by asynchronous method executions with {@code void}
     * return type
     */
    public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
        super(defaultExecutor, exceptionHandler);
    }

    /**
     * Return the qualifier or bean name of the executor to be used when executing the
     * given method, specified via {@link Async#value} at the method or declaring
     * class level. If {@code @Async} is specified at both the method and class level, the
     * method's {@code #value} takes precedence (even if empty string, indicating that
     * the default executor should be used preferentially).
     * @param method the method to inspect for executor qualifier metadata
     * @return the qualifier if specified, otherwise empty string indicating that the
     * {@linkplain #setExecutor(Executor) default executor} should be used
     * @see #determineAsyncExecutor(Method) ------ note this method
     */
    @Override
    @Nullable
    protected String getExecutorQualifier(Method method) {
        // Maintainer's note: changes made here should also be made in
        // AnnotationAsyncExecutionAspect#getExecutorQualifier
        Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
        if (async == null) {
            async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
        }
        System.out.println("Look up the @Async annotation: first on the method, then, if absent, on the class. In other words, a method-level @Async takes precedence over a class-level one.");
        return (async != null ? async.value() : null);
    }

}
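The interceptor itself has few methods. Two deserve attention. The first is the two-argument constructor, which is the one invoked in the trace from the previous post, so we will keep following it. The second is the getExecutorQualifier method.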
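Start with getExecutorQualifier. Its argument is the method you marked with @Async, and its return value is the value attribute of @Async, that is, the name of the thread pool you specified.
Step by step: the method first looks for @Async on the method itself. If none is found there, it looks on the declaring class. If an annotation is found, it returns that annotation's value attribute, which is the name of the thread pool; an empty value means no pool was named and the default executor should be used. If neither the method nor the class carries the annotation, it returns null.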
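The lookup order described above can be tried out without Spring on the classpath. The following is only a minimal sketch: DemoAsync, ReportService, PlainService and qualifierOf are hypothetical names invented for the illustration, and plain reflection stands in for AnnotatedElementUtils.findMergedAnnotation, which in Spring additionally handles meta-annotations and attribute aliasing.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class QualifierLookupDemo {

    // Hypothetical stand-in for Spring's @Async; only the value attribute is modeled.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    @interface DemoAsync {
        String value() default "";
    }

    @DemoAsync("classPool")
    static class ReportService {
        @DemoAsync("methodPool")
        public void export() { }

        public void archive() { }
    }

    static class PlainService {
        public void run() { }
    }

    // Method-level annotation first, then the declaring class; null when neither is present.
    static String executorQualifier(Method method) {
        DemoAsync async = method.getAnnotation(DemoAsync.class);
        if (async == null) {
            async = method.getDeclaringClass().getAnnotation(DemoAsync.class);
        }
        return (async != null ? async.value() : null);
    }

    // Convenience wrapper (hypothetical) that hides the checked reflection exception.
    static String qualifierOf(Class<?> type, String methodName) {
        try {
            return executorQualifier(type.getMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(qualifierOf(ReportService.class, "export"));  // methodPool
        System.out.println(qualifierOf(ReportService.class, "archive")); // classPool
        System.out.println(qualifierOf(PlainService.class, "run"));      // null
    }
}
```

The export method carries its own annotation, so the class-level value is never consulted; archive has none and falls back to the class; PlainService has neither, so the result is null.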
Next, the two-argument constructor:
public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
    super(defaultExecutor, exceptionHandler);
}
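The only thing this constructor does is call the parent class's constructor, so the focus shifts to the parent class, AsyncExecutionInterceptor.
We now look closely at AsyncExecutionInterceptor. Its source is as follows: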
package org.springframework.aop.interceptor;

import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;

import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.core.BridgeMethodResolver;
import org.springframework.core.Ordered;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;

/**
 * AOP Alliance {@code MethodInterceptor} that processes method invocations
 * asynchronously, using a given {@link org.springframework.core.task.AsyncTaskExecutor}.
 * Typically used with the {@link org.springframework.scheduling.annotation.Async} annotation.
 *
 * <p>In terms of target method signatures, any parameter types are supported.
 * However, the return type is constrained to either {@code void} or
 * {@code java.util.concurrent.Future}. In the latter case, the Future handle
 * returned from the proxy will be an actual asynchronous Future that can be used
 * to track the result of the asynchronous method execution. However, since the
 * target method needs to implement the same signature, it will have to return
 * a temporary Future handle that just passes the return value through
 * (like Spring's {@link org.springframework.scheduling.annotation.AsyncResult}
 * or EJB 3.1's {@code javax.ejb.AsyncResult}).
 *
 * <p>When the return type is {@code java.util.concurrent.Future}, any exception thrown
 * during the execution can be accessed and managed by the caller. With {@code void}
 * return type however, such exceptions cannot be transmitted back. In that case an
 * {@link AsyncUncaughtExceptionHandler} can be registered to process such exceptions.
 *
 * <p>As of Spring 3.1.2 the {@code AnnotationAsyncExecutionInterceptor} subclass is
 * preferred for use due to its support for executor qualification in conjunction with
 * Spring's {@code @Async} annotation.
 *
 * @author Juergen Hoeller
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.0
 * @see org.springframework.scheduling.annotation.Async
 * @see org.springframework.scheduling.annotation.AsyncAnnotationAdvisor
 * @see org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor
 */
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

    /**
     * Create a new instance with a default {@link AsyncUncaughtExceptionHandler}.
     * @param defaultExecutor the {@link Executor} (typically a Spring {@link AsyncTaskExecutor}
     * or {@link java.util.concurrent.ExecutorService}) to delegate to;
     * as of 4.2.6, a local executor for this interceptor will be built otherwise
     */
    public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
        super(defaultExecutor);
    }

    /**
     * Create a new {@code AsyncExecutionInterceptor}.
     * @param defaultExecutor the {@link Executor} (typically a Spring {@link AsyncTaskExecutor}
     * or {@link java.util.concurrent.ExecutorService}) to delegate to;
     * as of 4.2.6, a local executor for this interceptor will be built otherwise
     * @param exceptionHandler the {@link AsyncUncaughtExceptionHandler} to use
     */
    public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
        super(defaultExecutor, exceptionHandler);
    }

    /**
     * Intercept the given method invocation, submit the actual calling of the method to
     * the correct task executor and return immediately to the caller.
     * @param invocation the method to intercept and make asynchronous
     * @return {@link Future} if the original method returns {@code Future}; {@code null}
     * otherwise.
     */
    @Override
    @Nullable
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };

        System.out.println("Submitting task");
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

    /**
     * This implementation is a no-op for compatibility in Spring 3.1.2.
     * Subclasses may override to provide support for extracting qualifier information,
     * e.g. via an annotation on the given method.
     * @return always {@code null}
     * @since 3.1.2
     * @see #determineAsyncExecutor(Method)
     */
    @Override
    @Nullable
    protected String getExecutorQualifier(Method method) {
        return null;
    }

    /**
     * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor}
     * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
     * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all),
     * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance
     * for local use if no default could be found.
     * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
     */
    @Override
    @Nullable
    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
        return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }

}
The key method here is clearly invoke, but we will leave it for last.
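First look at getDefaultExecutor, which obtains the default executor. If the superclass lookup finds nothing, it falls back to a new SimpleAsyncTaskExecutor. That class is worth reading on your own: it is not a real thread pool, because it starts a new thread for each task instead of reusing threads.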
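To make the difference between a thread-per-task executor and a pool concrete, here is a small sketch that uses only JDK classes. THREAD_PER_TASK is a simplified stand-in written for this illustration, not the SimpleAsyncTaskExecutor implementation itself, and distinctThreads is a hypothetical helper that counts how many different Thread objects end up running the submitted tasks.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadPerTaskDemo {

    // Simplified stand-in for thread-per-task behavior: no queue, no thread reuse.
    static final Executor THREAD_PER_TASK = task -> new Thread(task).start();

    // Runs `tasks` no-op tasks and returns how many distinct Thread objects executed them.
    static int distinctThreads(Executor executor, int tasks) {
        Set<Thread> identitySet = Collections.newSetFromMap(new IdentityHashMap<Thread, Boolean>());
        Set<Thread> seen = Collections.synchronizedSet(identitySet);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // Every task gets a brand-new thread.
            System.out.println(distinctThreads(THREAD_PER_TASK, 3));
            // The single pooled thread is reused for all tasks.
            System.out.println(distinctThreads(pool, 3));
        } finally {
            pool.shutdown();
        }
    }
}
```

With an unbounded thread-per-task executor, a burst of asynchronous calls translates directly into a burst of threads, which is the usual reason for configuring a dedicated pooled executor rather than relying on the fallback.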
Next, the two-argument constructor:
public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
    super(defaultExecutor, exceptionHandler);
}
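It simply calls the constructor of its parent class, AsyncExecutionAspectSupport. That parent class is also important and will be analyzed in the next post.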
Finally, the invoke method. As the name suggests, this is where the asynchronous method you wrote actually gets called. Here is the code again:
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    // Important: find the executor.
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    // Wrap the task and handle exceptions.
    Callable<Object> task = () -> {
        try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
        }
        catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        }
        catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };

    System.out.println("Submitting task");
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
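- Find the target method, that is, the asynchronous method you wrote.
- Find the async task executor via determineAsyncExecutor. This step matters a lot; it is implemented in the parent class AsyncExecutionAspectSupport and will be analyzed in the next post.
- Wrap your asynchronous method in a Callable.
- Call the parent class's doSubmit method. This one is also important and likewise lives in AsyncExecutionAspectSupport; it will be analyzed in the next post.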
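The steps listed above can be imitated with a JDK dynamic proxy and a plain ExecutorService. This is only a rough sketch under stated assumptions, not Spring's code: Greeter, GreeterImpl, asyncProxy and greetViaProxy are hypothetical names, the executor is passed in directly rather than resolved by determineAsyncExecutor, and the final submit is a simplified stand-in for doSubmit that only follows the rule stated in the invoke Javadoc (a Future for Future-returning methods, null otherwise). Exception handling is left out here.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncInvokeSketch {

    // Hypothetical service interface used only for this illustration.
    interface Greeter {
        Future<String> greet(String name); // result travels back through the Future
        void log(String message);          // fire-and-forget
    }

    static class GreeterImpl implements Greeter {
        @Override
        public Future<String> greet(String name) {
            // An already-completed Future that merely carries the value through.
            return CompletableFuture.completedFuture(
                    "hello, " + name + " from " + Thread.currentThread().getName());
        }

        @Override
        public void log(String message) {
            System.out.println(Thread.currentThread().getName() + " logged: " + message);
        }
    }

    static <T> T asyncProxy(Class<T> type, T target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Wrap the real call in a Callable; unwrap a returned Future so that
            // the Future handed to the caller delivers the plain value.
            Callable<Object> task = () -> {
                Object result = method.invoke(target, args);
                return (result instanceof Future) ? ((Future<?>) result).get() : null;
            };
            // Submit and return immediately, without waiting for the task.
            Future<Object> submitted = executor.submit(task);
            return Future.class.isAssignableFrom(method.getReturnType()) ? submitted : null;
        };
        return type.cast(Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    // Hypothetical helper: builds the proxy, calls both methods, waits for the greeting.
    static String greetViaProxy(String name) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        try {
            Greeter greeter = asyncProxy(Greeter.class, new GreeterImpl(), executor);
            greeter.log("about to greet " + name); // void method: the proxy returns null at once
            return greeter.greet(name).get();      // blocks only because we ask for the result
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(greetViaProxy("Ann"));
    }
}
```

The greeting text includes the name of the thread that ran the target method, which shows that GreeterImpl executed on the worker thread rather than on the caller's thread.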
One more note: the code that wraps your asynchronous method in a Callable also deals with exceptions:
Callable<Object> task = () -> {
    try {
        Object result = invocation.proceed();
        if (result instanceof Future) {
            return ((Future<?>) result).get();
        }
    }
    catch (ExecutionException ex) {
        handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
    }
    catch (Throwable ex) {
        handleError(ex, userDeclaredMethod, invocation.getArguments());
    }
    return null;
};
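The separate catch for ExecutionException exists because Future.get() does not throw the original failure directly: it wraps it, and the real exception is available through getCause(). A minimal JDK-only sketch of that behavior follows; ExecutionExceptionDemo and describe are hypothetical names used just for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class ExecutionExceptionDemo {

    // Completes a Future exceptionally, then reports what get() throws and what it wraps.
    static String describe() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("bad input"));
        try {
            failed.get();
            return "no failure";
        } catch (ExecutionException ex) {
            Throwable cause = ex.getCause();
            return ex.getClass().getSimpleName() + " -> "
                    + cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // ExecutionException -> IllegalArgumentException: bad input
    }
}
```

Passing ex.getCause() on to handleError means the handler sees the exception your method actually raised, not the wrapper.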
Pay particular attention to the handleError method (also defined in the parent class AsyncExecutionAspectSupport):
protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
    if (Future.class.isAssignableFrom(method.getReturnType())) {
        ReflectionUtils.rethrowException(ex);
    }
    else {
        // Could not transmit the exception to the caller with default executor
        try {
            this.exceptionHandler.handleUncaughtException(ex, method, params);
        }
        catch (Throwable ex2) {
            logger.error("Exception handler for async method '" + method.toGenericString() +
                    "' threw unexpected exception itself", ex2);
        }
    }
}
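This is where the exception handler comes into play. Whether it is the default SimpleAsyncUncaughtExceptionHandler or a custom one you supply through the AsyncConfigurer interface, this is the place where it takes effect. Note that it is only consulted when the method's return type is not a Future; for Future-returning methods the exception is rethrown so that it reaches the caller through the Future.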
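The routing rule in handleError can be exercised in isolation. The sketch below re-implements it with JDK classes only, under stated assumptions: UncaughtHandler is a hypothetical stand-in for AsyncUncaughtExceptionHandler, Jobs and route are invented for the demonstration, and the rethrow branch is a simplified imitation of ReflectionUtils.rethrowException.

```java
import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

class HandleErrorSketch {

    // Hypothetical stand-in for AsyncUncaughtExceptionHandler.
    interface UncaughtHandler {
        void handle(Throwable ex, Method method, Object... params);
    }

    // Hypothetical async methods: one returns a Future, one returns void.
    interface Jobs {
        Future<String> compute();
        void fireAndForget();
    }

    static void handleError(Throwable ex, Method method, UncaughtHandler handler, Object... params)
            throws Exception {
        if (Future.class.isAssignableFrom(method.getReturnType())) {
            // Future return type: rethrow so the failure surfaces through the Future.
            if (ex instanceof Exception) {
                throw (Exception) ex;
            }
            if (ex instanceof Error) {
                throw (Error) ex;
            }
            throw new UndeclaredThrowableException(ex);
        }
        else {
            // Other return types: nobody can receive the exception, so hand it to the handler.
            try {
                handler.handle(ex, method, params);
            }
            catch (Throwable ex2) {
                System.err.println("Exception handler itself failed: " + ex2);
            }
        }
    }

    // Reports which path a failure takes for the named method of Jobs.
    static String route(String methodName) {
        List<String> handled = new ArrayList<>();
        UncaughtHandler handler = (ex, method, params) -> handled.add("handled: " + ex.getMessage());
        try {
            Method method = Jobs.class.getMethod(methodName);
            handleError(new IllegalStateException("boom"), method, handler);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        } catch (Exception e) {
            return "rethrown: " + e.getMessage();
        }
        return handled.isEmpty() ? "nothing" : handled.get(0);
    }

    public static void main(String[] args) {
        System.out.println(route("compute"));       // rethrown: boom
        System.out.println(route("fireAndForget")); // handled: boom
    }
}
```

In practice this means a custom handler never sees failures from Future-returning async methods; those have to be dealt with by whoever calls get() on the returned Future.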