您好,登錄后才能下訂單哦!
這篇文章主要介紹“java異步編程的實現方式有哪些”,在日常操作中,相信很多人在java異步編程的實現方式有哪些問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”java異步編程的實現方式有哪些”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
當用戶創建一筆電商交易訂單時,要經歷的業務邏輯流程還是很長的,每一步都要耗費一定的時間,那么整體的RT就會比較長。
于是,聰明的人們開始思考能不能將一些非核心業務從主流程中剝離出來,于是有了異步編程
雛形。
異步編程是讓程序并發運行的一種手段。它允許多個事件同時發生,當程序調用需要長時間運行的方法時,它不會阻塞當前的執行流程,程序可以繼續運行。
核心思路:采用多線程優化性能,將串行操作變成并行操作。異步模式設計的程序可以顯著減少線程等待,從而在高吞吐量場景中,極大提升系統的整體性能,顯著降低時延。
接下來,我們來講下異步有哪些編程實現方式
直接繼承 Thread類
是創建異步線程最簡單的方式。
首先,創建Thread子類,普通類或匿名內部類方式;然后創建子類實例;最后通過start()方法啟動線程。
public class AsyncThread extends Thread{ @Override public void run() { System.out.println("當前線程名稱:" + this.getName() + ", 執行線程名稱:" + Thread.currentThread().getName() + "-hello"); } }
public static void main(String[] args) { // 模擬業務流程 // ....... // 創建異步線程 AsyncThread asyncThread = new AsyncThread(); // 啟動異步線程 asyncThread.start(); }
當然如果每次都創建一個 Thread線程
,頻繁的創建、銷毀,浪費系統資源。我們可以采用線程池
@Bean(name = "executorService") public ExecutorService downloadExecutorService() { return new ThreadPoolExecutor(20, 40, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactoryBuilder().setNameFormat("defaultExecutorService-%d").build(), (r, executor) -> log.error("defaultExecutor pool is full! ")); }
將業務邏輯封裝到 Runnable
或 Callable
中,交由 線程池
來執行
上述方式雖然達到了多線程并行處理,但有些業務不僅僅要執行過程,還要獲取執行結果。
Java 從1.5版本開始,提供了 Callable
和 Future
,可以在任務執行完畢之后得到任務執行結果。
當然也提供了其他功能,如:取消任務、查詢任務是否完成等
Future類位于java.util.concurrent包下,接口定義:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
方法描述:
cancel():取消任務,如果取消任務成功返回true,如果取消任務失敗則返回false
isCancelled():表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true
isDone():表示任務是否已經完成,如果完成,返回true
get():獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回
get(long timeout, TimeUnit unit):用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null
代碼示例:
public class CallableAndFuture { public static ExecutorService executorService = new ThreadPoolExecutor(4, 40, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); static class MyCallable implements Callable<String> { @Override public String call() throws Exception { return "異步處理,Callable 返回結果"; } } public static void main(String[] args) { Future<String> future = executorService.submit(new MyCallable()); try { System.out.println(future.get()); } catch (Exception e) { // nodo } finally { executorService.shutdown(); } } }
Future 表示一個可能還沒有完成的異步任務的結果,通過 get
方法獲取執行結果,該方法會阻塞直到任務返回結果。
FutureTask
實現了 RunnableFuture
接口,則 RunnableFuture
接口繼承了 Runnable
接口和 Future
接口,所以可以將 FutureTask
對象作為任務提交給 ThreadPoolExecutor
去執行,也可以直接被 Thread
執行;又因為實現了 Future
接口,所以也能用來獲得任務的執行結果。
FutureTask 構造函數:
public FutureTask(Callable<V> callable) public FutureTask(Runnable runnable, V result)
FutureTask 常用來封裝 Callable
和 Runnable
,可以作為一個任務提交到線程池中執行。除了作為一個獨立的類之外,也提供了一些功能性函數供我們創建自定義 task 類使用。
FutureTask 線程安全由CAS來保證。
ExecutorService executor = Executors.newCachedThreadPool(); // FutureTask包裝callbale任務,再交給線程池執行 FutureTask<Integer> futureTask = new FutureTask<>(() -> { System.out.println("子線程開始計算:"); Integer sum = 0; for (int i = 1; i <= 100; i++) sum += i; return sum; }); // 線程池執行任務, 運行結果在 futureTask 對象里面 executor.submit(futureTask); try { System.out.println("task運行結果計算的總和為:" + futureTask.get()); } catch (Exception e) { e.printStackTrace(); } executor.shutdown();
Callable 和 Future 的區別:Callable 用于產生結果,Future 用于獲取結果
如果是對多個任務多次自由串行、或并行組合,涉及多個線程之間同步阻塞獲取結果,Future 代碼實現會比較繁瑣,需要我們手動處理各個交叉點,很容易出錯。
Future 類通過 get()
方法阻塞等待獲取異步執行的運行結果,性能比較差。
JDK1.8 中,Java 提供了 CompletableFuture
類,它是基于異步函數式編程。相對阻塞式等待返回結果,CompletableFuture
可以通過回調的方式來處理計算結果,實現了異步非阻塞,性能更優。
優點:
異步任務結束時,會自動回調某個對象的方法
異步任務出錯時,會自動回調某個對象的方法
主線程設置好回調后,不再關心異步任務的執行
泡茶示例:
(內容摘自:極客時間的《Java 并發編程實戰》)
//任務1:洗水壺->燒開水 CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> { System.out.println("T1:洗水壺..."); sleep(1, TimeUnit.SECONDS); System.out.println("T1:燒開水..."); sleep(15, TimeUnit.SECONDS); }); //任務2:洗茶壺->洗茶杯->拿茶葉 CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("T2:洗茶壺..."); sleep(1, TimeUnit.SECONDS); System.out.println("T2:洗茶杯..."); sleep(2, TimeUnit.SECONDS); System.out.println("T2:拿茶葉..."); sleep(1, TimeUnit.SECONDS); return "龍井"; }); //任務3:任務1和任務2完成后執行:泡茶 CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> { System.out.println("T1:拿到茶葉:" + tf); System.out.println("T1:泡茶..."); return "上茶:" + tf; }); //等待任務3執行結果 System.out.println(f3.join()); }
CompletableFuture 提供了非常豐富的API,大約有50種處理串行,并行,組合以及處理錯誤的方法。
除了硬編碼的異步編程處理方式,SpringBoot 框架還提供了 注解式
解決方案,以 方法體
為邊界,方法體內部的代碼邏輯全部按異步方式執行。
首先,使用 @EnableAsync
啟用異步注解
@SpringBootApplication @EnableAsync public class StartApplication { public static void main(String[] args) { SpringApplication.run(StartApplication.class, args); } }
自定義線程池:
@Configuration @Slf4j public class ThreadPoolConfiguration { @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown") public ThreadPoolExecutor systemCheckPoolExecutorService() { return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000), new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(), (r, executor) -> log.error("system pool is full! ")); } }
在異步處理的方法上添加注解 @Async
,當對 execute 方法
調用時,通過自定義的線程池 defaultThreadPoolExecutor
異步化執行 execute 方法
@Service public class AsyncServiceImpl implements AsyncService { @Async("defaultThreadPoolExecutor") public Boolean execute(Integer num) { System.out.println("線程:" + Thread.currentThread().getName() + " , 任務:" + num); return true; } }
用 @Async 注解標記的方法,稱為異步方法。在spring boot應用中使用 @Async 很簡單:
調用異步方法類上或者啟動類加上注解 @EnableAsync
在需要被異步調用的方法外加上 @Async
所使用的 @Async 注解方法的類對象應該是Spring容器管理的bean對象;
事件機制在一些大型項目中被經常使用,Spring 專門提供了一套事件機制的接口,滿足了架構原則上的解耦。
ApplicationContext
通過 ApplicationEvent
類和 ApplicationListener
接口進行事件處理。如果將實現 ApplicationListener
接口的 bean 注入到上下文中,則每次使用 ApplicationContext
發布 ApplicationEvent
時,都會通知該 bean。本質上,這是標準的觀察者設計模式
。
ApplicationEvent 是由 Spring 提供的所有 Event 類的基類
首先,自定義業務事件子類,繼承自 ApplicationEvent
,通過泛型注入業務模型參數類。相當于 MQ 的消息體。
public class OrderEvent extends AbstractGenericEvent<OrderModel> { public OrderEvent(OrderModel source) { super(source); } }
然后,編寫事件監聽器。ApplicationListener
接口是由 Spring 提供的事件訂閱者必須實現的接口,我們需要定義一個子類,繼承 ApplicationListener
。相當于 MQ 的消費端
@Component public class OrderEventListener implements ApplicationListener<OrderEvent> { @Override public void onApplicationEvent(OrderEvent event) { System.out.println("【OrderEventListener】監聽器處理!" + JSON.toJSONString(event.getSource())); } }
最后,發布事件,把某個事件告訴所有與這個事件相關的監聽器。相當于 MQ 的生產端。
OrderModel orderModel = new OrderModel(); orderModel.setOrderId((long) i); orderModel.setBuyerName("Tom-" + i); orderModel.setSellerName("judy-" + i); orderModel.setAmount(100L); // 發布Spring事件通知 SpringUtils.getApplicationContext().publishEvent(new OrderEvent(orderModel));
加個餐:
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[生產端]線程:http-nio-8090-exec-1,發布事件 1
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[生產端]線程:http-nio-8090-exec-1,發布事件 2
[消費端]線程:http-nio-8090-exec-1,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
[生產端]線程:http-nio-8090-exec-1,發布事件 3
上面是跑了個demo的運行結果,我們發現無論生產端還是消費端,使用了同一個線程 http-nio-8090-exec-1
,Spring 框架的事件機制默認是同步阻塞的。只是在代碼規范方面做了解耦,有較好的擴展性,但底層還是采用同步調用方式。
那么問題來了,如果想實現異步調用,如何處理?
我們需要手動創建一個 SimpleApplicationEventMulticaster
,并設置 TaskExecutor
,此時所有的消費事件采用異步線程執行。
@Component public class SpringConfiguration { @Bean public SimpleApplicationEventMulticaster applicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor") ThreadPoolExecutor defaultThreadPoolExecutor) { SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster(); simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor); return simpleApplicationEventMulticaster; } }
我們看下改造后的運行結果:
[生產端]線程:http-nio-8090-exec-1,發布事件 1
[生產端]線程:http-nio-8090-exec-1,發布事件 2
[生產端]線程:http-nio-8090-exec-1,發布事件 3
[消費端]線程:default-executor-1,消費事件 {"amount":100.0,"buyerName":"Tom-2","orderId":2,"sellerName":"judy-2"}
[消費端]線程:default-executor-2,消費事件 {"amount":100.0,"buyerName":"Tom-1","orderId":1,"sellerName":"judy-1"}
[消費端]線程:default-executor-0,消費事件 {"amount":100.0,"buyerName":"Tom-3","orderId":3,"sellerName":"judy-3"}
SimpleApplicationEventMulticaster
這個我們自己實例化的 Bean 與系統默認的加載順序如何?會不會有沖突?
查了下 Spring 源碼,處理邏輯在 AbstractApplicationContext#initApplicationEventMulticaster
方法中,通過 beanFactory 查找是否有自定義的 Bean,如果沒有,容器會自己 new 一個 SimpleApplicationEventMulticaster
對象注入到容器中。
異步架構是互聯網系統中一種典型架構模式,與同步架構相對應。而消息隊列天生就是這種異步架構,具有超高吞吐量和超低時延。
消息隊列異步架構的主要角色包括消息生產者、消息隊列和消息消費者。
消息生產者就是主應用程序,生產者將調用請求封裝成消息發送給消息隊列。
消息隊列的職責就是緩沖消息,等待消費者消費。根據消費方式又分為點對點模式
和發布訂閱模式
兩種。
消息消費者,用來從消息隊列中拉取、消費消息,完成業務邏輯處理。
當然市面上消息隊列框架非常多,常見的有RabbitMQ、Kafka、RocketMQ、ActiveMQ 和 Pulsar 等
不同的消息隊列的功能特性會略有不同,但整體架構類似,這里就不展開了。
我們只需要記住一個關鍵點,借助消息隊列這個中間件可以高效的實現異步編程。
到此,關于“java異步編程的實現方式有哪些”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。