亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RxJava簡單源碼的示例分析

發布時間:2021-09-10 17:58:04 來源:億速云 閱讀:124 作者:柒染 欄目:大數據

今天就跟大家聊聊有關RxJava簡單源碼的示例分析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

demo代碼如下

public class ObservableTest {

    public static void main(String[] args) {
        Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> observer) throws Exception {
                observer.onNext("處理的數字是:" + Math.random() * 100);
                observer.onComplete();
            }
        });
        observable.subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object consumer) throws Exception {
                System.out.println("我處理的元素是:" + consumer);
            }
        });
    }
}

先看第一行代碼

Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(ObservableEmitter<Object> observer) throws Exception {
        observer.onNext("處理的數字是:" + Math.random() * 100);
        observer.onComplete();
    }
});

//Observable.java
//第1560行
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    //RxJavaPlugins里有很多方法可以設置,
    //有點類似于Spring的ApplicationListener,在對應的生命周期中會被調用
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

//RxJavaPlugins.java
//第1031行
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    //如果設置了對應的方法,就執行,否則原樣返回
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

可以看到RxJavaPlugins中的方法如果不配置的方法,參數就會原樣返回,所以Observable.create最終得到的就是ObservableCreate這個類。


再來看第二行代碼

observable.subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object consumer) throws Exception {
        System.out.println("我處理的元素是:" + consumer);
    }
});

//Observable.java
//第10869行
public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

//Observable.java
//第10958行
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    //這里的onNext就是我們自己寫的Consumer類
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;
}

//Observable.java
//第10974行
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        //還記得我們的observable變量是什么類型么?ObservableCreate!
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

//ObservableCreate.java
//第35行
protected void subscribeActual(Observer<? super T> observer) {
    //這里的observer是LambdaObserver
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    //省略部分代碼
}

//LambdaObserver.java
//第47行
public void onSubscribe(Disposable s) {
    //設置AtomicReference的值(LambdaObserver繼承了AtomicReference)
    //如果之前已經設置過了(AtomicReference的值不為空),則直接返回false
    if (DisposableHelper.setOnce(this, s)) {
        try {
            //在new LambdaObserver()的時候我們設置了onSubscribe = Functions.emptyConsumer()
            //所以這里什么都不做
            onSubscribe.accept(this);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            s.dispose();
            onError(ex);
        }
    }
}

//ObservableCreate.java
//第35行
protected void subscribeActual(Observer<? super T> observer) {
    //省略部分代碼

    try {
        //還記得source是啥么,就是你在創建Observable的時候new的ObservableOnSubscribe
        //于是終于執行到了我們編寫的代碼中
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

//ObservableOnSubscribe.java
//第6行
public static void main(String[] args) {
    Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
        //開始執行這個方法
        //observer是new CreateEmitter<T>(new LambdaObserver());
        @Override
        public void subscribe(ObservableEmitter<Object> observer) throws Exception {
            observer.onNext("處理的數字是:" + Math.random() * 100);
            observer.onComplete();
        }
    });
}

//ObservableCreate$CreateEmitter
//第61行
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        //這里的observer就是LambdaObserver
        //t就是《"處理的數字是:" + Math.random() * 100》這段字符串
        observer.onNext(t);
    }
}

//LambdaObserver.java
//第60行
public void onNext(T t) {
    if (!isDisposed()) {
        try {
            onNext.accept(t);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            get().dispose();
            onError(e);
        }
    }
}

//ObservableOnSubscribe.java
//第13行
public static void main(String[] args) {
    //省略部分代碼
    
    observable.subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object consumer) throws Exception {
            System.out.println("我處理的元素是:" + consumer);
        }
    });
}

//ObservableOnSubscribe.java
//第8行
public static void main(String[] args) {
    Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
        @Override
        public void subscribe(ObservableEmitter<Object> observer) throws Exception {
            //省略部分代碼
            observer.onComplete();
        }
    });
    //省略部分代碼
}

//ObservableCreate.java
//第95行
public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            //取消訂閱
            dispose();
        }
    }
}

//LambdaObserver.java
//第86行
public void onComplete() {
    if (!isDisposed()) {
        lazySet(DisposableHelper.DISPOSED);
        try {
            //new LambdaObserver的時候設置了為空,所以不執行操作
            onComplete.run();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
        }
    }
}

至此,調用流程分析完成,可以看到雖然在main方法里我們只寫了幾行代碼,但是內部調用的流程還是很繁雜的

看完上述內容,你們對RxJava簡單源碼的示例分析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

西城区| 石台县| 吉安县| 泸定县| 柳江县| 望谟县| 邛崃市| 黔西县| 莆田市| 广平县| 多伦县| 广元市| 望江县| 延长县| 三门峡市| 东阳市| 陆良县| 彭泽县| 岳池县| 慈溪市| 东乡县| 正阳县| 美姑县| 平南县| 旬阳县| 汉中市| 皮山县| 三原县| 红河县| 若尔盖县| 双桥区| 湘潭县| 霞浦县| 香河县| 榆树市| 阿尔山市| 万荣县| 眉山市| 于都县| 比如县| 长丰县|