亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java8通過CompletableFuture怎么實現異步回調

發布時間:2022-04-28 10:23:58 來源:億速云 閱讀:156 作者:iii 欄目:開發技術

本篇內容介紹了“Java8通過CompletableFuture怎么實現異步回調”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

1 什么是CompletableFuture?

CompletableFuture是Java 8 中新增的一個類,它是對Future接口的擴展。從下方的類繼承關系圖中我們看到其不僅實現了Future接口,還有CompletionStage接口,當Future需要顯示地完成時,可以使用CompletionStage接口去支持完成時觸發的函數和操作,當2個以上線程同時嘗試完成、異常完成、取消一個CompletableFuture時,只有一個能成功。

CompletableFuture主要作用就是簡化我們異步編程的復雜性,支持函數式編程,可以通過回調的方式處理計算結果。

Java8通過CompletableFuture怎么實現異步回調

2 為什么會有CompletableFuture ?

在java5中,JDK為我們提供了Callable和Future,使我們可以很容易的完成異步任務結果的獲取,但是通過Future的get獲取異步任務結果會導致主線程的阻塞,這樣在某些場景下是非常消耗CPU資源的,進而Java8為我們提供了CompletableFuture,使我們無需阻塞等待,而是通過回調的方式去處理結果,并且還支持流式處理、組合異步任務等操作。

如果不熟悉CallableFuture的,可以看小編之前更新的這篇文章Java從源碼看異步任務計算FutureTask

3 CompletableFuture 簡單使用

下面我們就CompletableFuture 的使用進行簡單分類:

創建任務:

  • supplyAsync/runAsync

異步回調:

  • thenApply/thenAccept/thenRun

  • thenApplyAsync/thenAcceptAsync/thenRunAsync

  • exceptionally

  • handle/whenComplete

組合處理:

  • thenCombine / thenAcceptBoth / runAfterBoth

  • applyToEither / acceptEither / runAfterEither

  • thenCompose

  • allOf / anyOf

具體內容請參照以下案例:

    public static void main(String[] args) throws Exception {
        // 1.帶返回值的異步任務(不指定線程池,默認ForkJoinPool.commonPool(),單核ThreadPerTaskExecutor)
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            return 1 + 1;
        });
        System.out.println("cf1 result: " + cf1.get());
        // 2.無返回值的異步任務(不指定線程池,默認ForkJoinPool.commonPool(),單核ThreadPerTaskExecutor)
        CompletableFuture cf2 = CompletableFuture.runAsync(() -> {
            int a = 1 + 1;
        });
        System.out.println("cf2 result: " + cf2.get());
        // 3.指定線程池的帶返回值的異步任務,runAsync同理
        CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
            return 1 + 1;
        }, Executors.newCachedThreadPool());
        System.out.println("cf3 result: " + cf3.get());
        // 4.回調,任務執行完成后執行的動作
        CompletableFuture<Integer> cf4 = cf1.thenApply((result) -> {
            System.out.println("cf4回調拿到cf1的結果 result : " + result);
            return result + 1;
        });
        System.out.println("cf4 result: " + cf4.get());
        // 5.異步回調(將回調任務提交到線程池),任務執行完成后執行的動作后異步執行
        CompletableFuture<Integer> cf5 = cf1.thenApplyAsync((result) -> {
            System.out.println("cf5回調拿到cf1的結果 result : " + result);
            return result + 1;
        });
        System.out.println("cf5 result: " + cf5.get());
        // 6.回調(同thenApply但無返回結果),任務執行完成后執行的動作
        CompletableFuture cf6 = cf1.thenAccept((result) -> {
            System.out.println("cf6回調拿到cf1的結果 result : " + result);
        });
        System.out.println("cf6 result: " + cf6.get());
        // 7.回調(同thenAccept但無入參),任務執行完成后執行的動作
        CompletableFuture cf7 = cf1.thenRun(() -> {
        });
        System.out.println("cf7 result: " + cf7.get());
        // 8.異常回調,任務執行出現異常后執行的動作
        CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("出現異常");
        });
        CompletableFuture<Integer> cf8 = cf.exceptionally((result) -> {
            return -1;
        });
        System.out.println("cf8 result: " + cf8.get());
        // 9.當某個任務執行完成后執行的回調方法,會將執行結果或者執行期間拋出的異常傳遞給回調方法
        //   如果是正常執行則異常為null,回調方法對應的CompletableFuture的result和該任務一致;
        //   如果該任務正常執行,則get方法返回執行結果,如果是執行異常,則get方法拋出異常。
        CompletableFuture<Integer> cf9 = cf1.handle((a, b) -> {
            if (b != null) {
                b.printStackTrace();
            }
            return a;
        });
        System.out.println("cf9 result: " + cf9.get());
        // 10 與handle類似,無返回值
        try {
            CompletableFuture<Integer> cf10 = cf.whenComplete((a, b) -> {
                if (b != null) {
                    b.printStackTrace();
                }
            });
            System.out.println("cf10 result: " + cf10.get());
        } catch (Exception e) {
            System.out.println("cf10 出現異常!!!");
        }
        // 11 組合處理(兩個都完成,然后執行)有入參,有返回值
        CompletableFuture<Integer> cf11 = cf1.thenCombine(cf3, (r1, r2) -> {
            return r1 + r2;
        });
        System.out.println("cf11 result: " + cf11.get());
        // 12 組合處理(兩個都完成,然后執行)有入參,無返回值
        CompletableFuture cf12 = cf1.thenAcceptBoth(cf3, (r1, r2) -> {
        });
        System.out.println("cf12 result: " + cf12.get());
        // 13 組合處理(兩個都完成,然后執行)無入參,無返回值
        CompletableFuture cf13 = cf1.runAfterBoth(cf3, () -> {
        });
        System.out.println("cf13 result: " + cf13.get());
        // 14 組合處理(有一個完成,然后執行)有入參,有返回值
        CompletableFuture<Integer> cf14 = cf1.applyToEither(cf3, (r) -> {
            return r;
        });
        System.out.println("cf14 result: " + cf14.get());
        // 15 組合處理(有一個完成,然后執行)有入參,無返回值
        CompletableFuture cf15 = cf1.acceptEither(cf3, (r) -> {
        });
        System.out.println("cf15 result: " + cf15.get());
        // 16 組合處理(有一個完成,然后執行)無入參,無返回值
        CompletableFuture cf16 = cf1.runAfterEither(cf3, () -> {
        });
        System.out.println("cf16 result: " + cf16.get());
        // 17 方法執行后返回一個新的CompletableFuture
        CompletableFuture<Integer> cf17 = cf1.thenCompose((r) -> {
            return CompletableFuture.supplyAsync(() -> {
                return 1 + 1;
            });
        });
        System.out.println("cf17 result: " + cf17.get());
        // 18 多個任務都執行成功才會繼續執行
        CompletableFuture.allOf(cf1,cf2,cf3).whenComplete((r, t) -> {
            System.out.println(r);
        });
        // 18 多個任務任意一個執行成功就會繼續執行
        CompletableFuture.anyOf(cf1,cf2,cf3).whenComplete((r, t) -> {
            System.out.println(r);
        });
    }

4 CompletableFuture 源碼分析

首先我們可以從注釋中看到,它對CompletionStageFuture接口擴展的一些描述,這些也是它的一些重點。

除了直接操作狀態和結果的相關方法外,CompletableFuture還實現了CompletionStage接口的如下策略:

  • (1)為非異步方法的依賴完成提供的操作,可以由完成當前CompletableFuture的線程執行,也可以由完成方法的任何其他調用方執行。

  • (2)所有沒有顯式Executor參數的異步方法都使用ForkJoinPool.commonPool()執行(除非它不支持至少兩個并行級別,在這種情況下,將創建一個新線程來運行每個任務)。為了簡化監視、調試和跟蹤,所有生成的異步任務都是CompletableFuture的實例,異步完成任務。

不了解ForkJoinPool的可以閱讀小編之前更新的這篇文章一文帶你了解Java中的ForkJoin。

  • (3)所有CompletionStage方法都是獨立于其他公共方法實現的,因此一個方法的行為不會受到子類中其他方法重寫的影響。

CompletableFuture實現了Future接口的如下策略:

  • 因為(與FutureTask不同)這個類對導致它完成的計算沒有直接控制權,所以取消被視為另一種形式的異常完成,所以cancel操作被視為是另一種異常完成形式(new CancellationException()具有相同的效果。)。方法isCompletedExceptionally()可以用來確定一個CompletableFuture是否以任何異常的方式完成。

  • 如果異常完成時出現CompletionException,方法get()和get(long,TimeUnit)會拋出一個ExecutionException,其原因與相應CompletionException中的原因相同。為了簡化在大多數上下文中的使用,該類還定義了join()和getNow()方法,在這些情況下直接拋出CompletionException。

4.1 創建異步任務

我們先看一下CompletableFuture是如何創建異步任務的,我們可以看到起創建異步任務的核心實現是兩個入參,一個入參是Executor,另一個入參是Supplier(函數式編程接口)。其中也提供了一個入參的重載,一個入參的重載方法會獲取默認的Executor,當系統是單核的會使用ThreadPerTaskExecutor,多核時使用ForkJoinPool.commonPool()

注意:這里默認ForkJoinPool.commonPool()線程池,如果所有異步任務都使用該線程池話,出現問題不容易定位,如果長時間占用該線程池可能影響其他業務的正常操作,stream的并行流也是使用的該線程池。

其中還封裝了靜態內部類AsyncSupply,該類代表這個異步任務,實現了Runnable,重寫了run方法。

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

	public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }

	/**
	 * 靜態內部類,繼承了ForkJoinTask<Void>、實現了Runnable、AsynchronousCompletionTask
	 */
    static final class AsyncSupply<T> extends ForkJoinTask<Void>
            implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<T> dep; Supplier<T> fn;
        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
            this.dep = dep; this.fn = fn;
        }

        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { run(); return true; }

        public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
    }

Supplier類是一個函數式的接口,@FunctionalInterface注解就是函數式編程的標記。

package java.util.function;

@FunctionalInterface
public interface Supplier<T> {

    T get();
}

4.2 異步任務回調

異步任務回調,我們以thenApply/thenApplyAsync為例來看一下其實現原理,方法名含有Async的會傳入asyncPool。uniApplyStage方法通過判斷e是否有值,來區分是從哪個方法進來的。thenApply不會傳入 Executor,它優先讓當前線程來執行后續 stage 的任務。

  • 當發現前一個 stage 已經執行完畢時,直接讓當前線程來執行后續 stage 的 task。

  • 當發現前一個 stage 還沒執行完畢時,則把當前 stage 包裝成一個 UniApply 對象,放到前一個 stage 的棧中。執行前一個 stage 的線程,執行完畢后,接著執行后續 stage 的 task。

thenApplyAsync會傳入一個 Executor,它總是讓 Executor 線程池里面的線程來執行后續 stage 的任務。

  • 把當前 stage 包裝成一個 UniApply 對象,放到前一個 stage 的棧中,直接讓 Executor 來執行。

    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    private <V> CompletableFuture<V> uniApplyStage(
        Executor e, Function<? super T,? extends V> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<V> d =  new CompletableFuture<V>();
        // Async直接進入,不是Async執行uniApply嘗試獲取結果
        if (e != null || !d.uniApply(this, f, null)) {
            UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }
 
    final <S> boolean uniApply(CompletableFuture<S> a,
                               Function<? super S,? extends T> f,
                               UniApply<S,T> c) {
        Object r; Throwable x;
        // 判斷當前CompletableFuture是否已完成,如果沒完成則返回false;如果完成了則執行下面的邏輯。
        if (a == null || (r = a.result) == null || f == null)
            return false;
        tryComplete: if (result == null) {
            // 判斷任務結果是否是AltResult類型
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;
            }
            try {
                // 判斷當前任務是否可以執行
                if (c != null && !c.claim())
                    return false;
                // 獲取任務結果
                @SuppressWarnings("unchecked") S s = (S) r;
                // 執行
                completeValue(f.apply(s));
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

    static final class UniApply<T,V> extends UniCompletion<T,V> {
        Function<? super T,? extends V> fn;
        UniApply(Executor executor, CompletableFuture<V> dep,
                 CompletableFuture<T> src,
                 Function<? super T,? extends V> fn) {
            super(executor, dep, src); this.fn = fn;
        }
        final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d; CompletableFuture<T> a;
            if ((d = dep) == null ||
                !d.uniApply(a = src, fn, mode > 0 ? null : this))
                return null;
            dep = null; src = null; fn = null;
            return d.postFire(a, mode);
        }
    }

    final void push(UniCompletion<?,?> c) {
        if (c != null) {
            while (result == null && !tryPushStack(c))
                lazySetNext(c, null); // clear on failure
        }
    }

    final boolean completeValue(T t) {
        return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                           (t == null) ? NIL : t);
    }

4.3 異步任務組合

我們再thenCombine方法為例看一下CompletableFuture是如何處理組合任務的,我們可以看到thenCombine的源碼與thenApply的源碼基本上是一直的,只不過組合的時候不僅僅是判斷一個,需要集合具體場景,判斷多個CompletableFuture

    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }

    private <U,V> CompletableFuture<V> biApplyStage(
        Executor e, CompletionStage<U> o,
        BiFunction<? super T,? super U,? extends V> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.biApply(this, b, f, null)) {
            BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

    final <R,S> boolean biApply(CompletableFuture<R> a,
                                CompletableFuture<S> b,
                                BiFunction<? super R,? super S,? extends T> f,
                                BiApply<R,S,T> c) {
        Object r, s; Throwable x;
        // 此處不止要判斷a還得判斷b
        if (a == null || (r = a.result) == null ||
            b == null || (s = b.result) == null || f == null)
            return false;
        tryComplete: if (result == null) {
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;
            }
            // 這里不止判斷a的結果r還要判斷b的結果s
            if (s instanceof AltResult) {
                if ((x = ((AltResult)s).ex) != null) {
                    completeThrowable(x, s);
                    break tryComplete;
                }
                s = null;
            }
            // 最后將rr, ss傳入
            try {
                if (c != null && !c.claim())
                    return false;
                @SuppressWarnings("unchecked") R rr = (R) r;
                @SuppressWarnings("unchecked") S ss = (S) s;
                completeValue(f.apply(rr, ss));
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }

    static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {
        BiFunction<? super T,? super U,? extends V> fn;
        BiApply(Executor executor, CompletableFuture<V> dep,
                CompletableFuture<T> src, CompletableFuture<U> snd,
                BiFunction<? super T,? super U,? extends V> fn) {
            super(executor, dep, src, snd); this.fn = fn;
        }
        // tryFire方法也同樣的多可個b
        final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d;
            CompletableFuture<T> a;
            CompletableFuture<U> b;
            if ((d = dep) == null ||
                !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this))
                return null;
            dep = null; src = null; snd = null; fn = null;
            return d.postFire(a, b, mode);
        }
    }

“Java8通過CompletableFuture怎么實現異步回調”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

长垣县| 梅州市| 公主岭市| 龙岩市| 拉萨市| 德钦县| 乐陵市| 南川市| 南靖县| 兰西县| 石家庄市| 普兰店市| 南平市| 纳雍县| 诏安县| 仁布县| 永丰县| 汉阴县| 常德市| 巴里| 宁明县| 东海县| 怀柔区| 当涂县| 印江| 大石桥市| 临湘市| 峨边| 三原县| 铁岭市| 通化市| 阜城县| 饶阳县| 甘德县| 甘泉县| 登封市| 千阳县| 泌阳县| 黄骅市| 平武县| 岱山县|