您好,登錄后才能下訂單哦!
Spring 中異步的實現原理是什么,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
開啟 Spring 異步編程之需要一個注解即可:
@EnableAsync
Springboot 中有非常多 @Enable* 的注解,其目的是顯式開啟某一個功能特性,這也是一個非常典型的編程模型。
@EnableAsync 注解注入了一個 AsyncConfigurationSelector 類,這個類目的就是為了注入 ProxyAsyncConfiguration 自動配置類,它的父類 AbstractAsyncConfiguration 做了件事情:
org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers
我們可以實現 AsyncConfigurer 接口的方式去自定義一個線程池 Bean,這個后面會會講到,源碼所示,這里目的是為了這個 bean,并將其定義的線程池對象和異常處理對象保存到 AsyncConfiguration 中,用于創建 AsyncAnnotationBeanPostProcessor 。
這兩個對象后面源碼分析會再次遇上。
而這個配置類就是為了注冊一個名為 AsyncAnnotationBeanPostProcessor 的 bean,如其名,它是一個 BeanPostProcessor 處理器,它的類繼承結構如下所示:
從類繼承結構可以看出,AsyncAnnotationBeanPostProcessor 實現了 BeanPostProcessor 和 BeanFactoryAware,因此 AsyncAnnotationBeanPostProcessor 會在 setBeanFactory 方法中做了 Spring 異步編程中最為重要的一步,創建一個針對 @Async 注解的通知器 AsyncAnnotationAdvisor(叫做切面貌似也可以),這個通知器主要用于攔截被 @Async 注解的方法。同時,bean 實例初始化過程會被 AsyncAnnotationBeanPostProcessor 攔截處理,處理過程會將符合條件的 bean 注冊 AsyncAnnotationAdvisor :
org.springframework.aop.framework.AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization
接下來我們就分析 AsyncAnnotationAdvisor 是如何創建的。
AsyncAnnotationAdvisor 實現了 PointcutAdvisor 接口,因此需要同時實現 getPointcut 和 getAdvice 方法,而這兩個方法的實際內容有以上紅框創建實現。
到這里我們已經知道,Spring 的異步實現原理,是利用 Spring AOP 切面編程實現的,通過 BeanPostProcessor 攔截處理符合條件的 bean,并將切面織入,實現切面增強處理。
Spring AOP 的編程核心概念:
Advice:通知,切面的一種實現,可以完成簡單的織入功能。通知定義了增強代碼切入到目標代碼的時間點,是目標方法執行之前執行,還是執行之后執行等。切入點定義切入的位置,通知定義切入的時間;
Pointcut:切點,切入點指切面具體織入的方法;
Advisor:切面的另一種實現,能夠將通知以更為復雜的方式織入到目標對象中,是將通知包裝為更復雜切面的裝配器。
因此我們需要創建一個切面和切入點:
buildAdvice:
buildAdvice 方法可知,切面是一個 AnnotationAsyncExecutionInterceptor 類,該類實現了 MethodInterceptor 接口,其 invoke 方法即為攔截處理的核心源碼,后面會進行詳細分析。
buildPointcut:
從 AsyncAnnotationAdvisor 構造器中可以看出,buildPointcut 方法目的就是為了創建 @Async 注解的切入點。
前面我們已經知道,攔截切面是一個 AnnotationAsyncExecutionInterceptor 類,我們直接定位到 invoke 方法一探究竟:
org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
攔截處理的核心邏輯就是這么簡單,也沒啥好分析的,無非就是匹配方法指定的線程池,接著構建執行單元 Callable,最后調用 doSubmit 方法執行。
重點在于如何匹配線程池,這也是后面實戰分析的重點內容,因此我們需要在這里詳細分析匹配線程池的一些策略細節。
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
getExecutorQualifier 方法目的是獲取 @Async 注解上的 value 值,value 值即線程池 Bean 的名稱,如果獲取到的 targetExecutor 不是 Spring 類型的線程池,則使用 TaskExecutorAdapter 進行適配,這也是為什么我們直接創建 Executor 類型的線程池 Spring 也是支持的原因。
從以上源碼邏輯可看出如果我們使用 @Async 注解時 value 值為空,Spring 就會使用 defaultExecutor ,defaultExecutor 是什么時候賦值的呢?上面內容已經有提及,在 buildAdvice 方法創建 AnnotationAsyncExecutionInterceptor 時 調用了其 configure 方法,如下:
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#configure
原來當 defaultExecutor 和 exceptionHandler 是當初從 ProxyAsyncConfiguration 中獲取用戶自定義的 AsyncConfigurer 實現類而來的,那么如果 defaultExecutor 不存在怎么辦?從源碼可看出,defaultExecutor 其實是一個 SingletonSupplier 類型,如果調用 get 方法不存在,則使用默認值,默認值為:
() -> getDefaultExecutor(this.beanFactory);
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor
注意第一個紅框的注釋,此時 Spring 尋找默認的線程池 Bean 為指定 Spring 的 TaskExecutor 類型,并非 Executor 類型,如果 Bean 容器中沒有找到 TaskExecutor 類型的 Bean,則繼續尋找默認為以下名稱的 Bean:
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
那么如果都沒有找到怎么辦呢?在這個方法直接返回 null 了,AsyncExecutionInterceptor 類覆寫了 這個方法:
org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
如果沒有找到,則直接創建一個 SimpleAsyncTaskExecutor 類作為 @Async 注解底層使用的線程池。
從匹配線程池源碼得知,如果你創建的線程池 Bean 非TaskExecutor 類型并且沒有使用實現 AsyncConfigurer 接口方式創建線程池,就需要主動指定線程池 Bean 名稱,否則 Spring 會使用默認策略。
利用 BeanPostProcessor 機制在 Bean 初始化過程中創建一個 AsyncAnnotationAdvisor 切面,并且符合條件的 Bean 生成代理對象并將 AsyncAnnotationAdvisor 切面添加到代理中。
可以看出 Spring 的很多功能都是圍繞著 Spring IOC 和 AOP 實現的。
有時候為了方便,我們不自定義創建線程池 bean 時,Spring 默認會為我們提供什么樣的線程池呢?
我們先來看下結果:
很奇怪,明明我們都沒有在項目中自定義線程池 Bean,按照以上源碼的分析結果來看,此時 Spring 選擇的是 SimpleAsyncTaskExecutor 才對,莫非是 super#getDefaultExecutor 方法找到了線程池 Bean?
從以上截圖確實是找到了,而且類型還是 ThreadPoolTaskExecutor 類型的,那可以推斷出 Spring 一定是在某個地方創建了一個 ThreadPoolTaskExecutor 類型的 Bean。
果然,在 spring-boot-autoconfigure 2.1.3.RELEASE 中,會在 org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration 中自動創建一個默認的 ThreadPoolTaskExecutor bean,getDefaultExecutor 方法會在容器中找到這個bean,并將其作為默認的 @Async 注解的執行線程池。
這里我為什么要標注版本呢?因為某些低版本的 spring-boot-autoconfigure,是沒有 TaskExecutionAutoConfiguration 的,此時 Spring 就會選擇 SimpleAsyncTaskExecutor。
org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration
從以上源碼可以看出,默認的線程池的參數還可以手動在 properties 中配置,這意味著不需要主動創建線程池的情況下,也可以通過 properties 配置文件更改線程池相關參數。
1、直接創建一個 Bean 的方式,這貌似是最多人使用的方式,可以創建多個線程池 Bean,使用時指定線程池 Bean 名稱:
@Bean("myTaskExecutor_1") public Executor getThreadPoolTaskExecutor1() { final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // set ... return executor; } @Bean("myTaskExecutor_2") public Executor getThreadPoolTaskExecutor2() { final ThreadPoolExecutor executor = new ThreadPoolExecutor(); // set ... return executor; }
2、實現 AsyncConfigurer 接口方式:
@Component public class AsyncConfigurerTest implements AsyncConfigurer { private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfigurerTest.class); @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // set ... return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> { LOGGER.info("Exception message:{}", ex.getMessage(), ex); LOGGER.info("Method name:{}", method.getName()); for (Object param : params) { LOGGER.info("Parameter value:{}", param); } }; } }
這種方式可以方便定義異常處理的邏輯,不過從源碼分析可以看出,項目中只能存在一個 AsyncConfigurer 的配置,意味著我們只能通過 AsyncConfigurer 配置一個自定義的線程池 Bean。
3、利用 spring-boot-autoconfigure 在 properties 配置線程池參數:
前面講到了 Spring 默認線程池策略,這里利用 spring-boot-autoconfigure 默認創建一個 ThreadPoolTaskExecutor,通過 properties 自定義線程池相關參數。
這個方式的缺點就是類型固定為 ThreadPoolTaskExecutor,且只能有一個線程池。
注:以上所有原理分析與實戰結果都是基于 Spring 5.1.5.RELEASE 版本。
看完上述內容,你們掌握Spring 中異步的實現原理是什么的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。