您好,登錄后才能下訂單哦!
本篇文章為大家展示了RxJava的響應流程分析基本調用流程是怎樣的,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
首先來看一個簡單的例子:
運行結果為:
從結果中我們不難看出整體的調用流程:
首先通過調用Observable.create()
方法生成一個被觀察者,緊接著在這里我們又調用了map()
方法對原被觀察者進行數據流的變換操作,生成一個新的被觀察者(為何是新的被觀察者后文會講),最后調用subscribe()
方法,傳入我們的觀察者,這里觀察者訂閱的則是調用map()之后生成的新被觀察者。
在整個過程中我們會注意到三個主角:Observable、OnSubscribe、Subscriber,所有的操作都是圍繞它們進行的。不難看出這里三個角色的分工:
Observable:被觀察者的來源,亦或說是被觀察者本身
OnSubscribe:用來通知觀察者的不同行為
Subscriber:觀察者,通過實現對應方法來產生具體的處理。
所以接下來我們以這三個角色為中心來分析具體的流程。
首先我們進入Observable.create()
看看:
這里調用構造函數生成了一個Observable對象并將傳入的OnSubscribe賦給自己的成員變量onsubscribe
,等等,這個hook是從哪里冒出來的?我們向上找:
RxJavaObservableExecutionHook
這個抽象Proxy類默認對OnSubscribe對象不做任何處理,不過通過繼承該類并重寫onCreate()
等方法我們可以對這些方法對應的時機做一些額外處理比如打Log或者一些數據收集方面的工作。
到目前最初始的被觀察者已經生成了,我們再來看看觀察者這邊。我們知道通過調用observable.subscribe()
方法傳入一個觀察者即構成了觀察者與被觀察者之間的訂閱關系,那么這內部又是如何實現的呢?看代碼:
這里我們略去部分無關代碼看主要部分,subscribe.onStart()
默認空實現我們暫且不用管它,對于傳進來的subscriber
要包裝成SafeSubscriber
,這個SafeSubscriber
對原來的subscriber
的一系列方法做了更完善的處理,包括:onError()
與onCompleted()
只會有一個被執行;保證一旦onError()
或者onCompleted()
被執行,將不再能再執onNext()
等情況。這里封裝為SafeSubscriber
之后,調用onSubscribe.call()
,并將subscriber傳入,這樣就完成了一次訂閱。
顯而易見,Subscriber作為觀察者,在訂閱行為完成后,其具體行為在整個鏈式調用中起著至關重要的作用,我們來看看它內部的構成的主要部分:
每個Subscriber都持有一個SubscriptionList
,這個list保存的是所有該觀察者的訂閱事件,同時Subscriber也對應實現了Subscription
接口,當這個Subscriber取消訂閱的時候會將持有事件列表中的所有Subscription
取消訂閱,并且從此不再接受任何訂閱事件。同時,通過Producer
可以去限定該Subscriber所接收的數據流的總量,這個限制量其實是加在Subscriber.onNext()
方法上的,onComplete()
、onError()
則不會受到其影響。因為是底層抽象類,onNext()
、onComplete()
、onError()
統一不在這里處理。
在收到Observable的消息之前我們有可能會對數據流進行處理,例如map()、flatMap()、deBounce()、buffer()等方法,本例中我們用了map()方法,它接收了原被觀察者發射的數據并將通過該方法返回的結果作為新的數據發射出去,相當于做了一層中間轉化:
我們接著看這個轉化過程:
這里是通過一個lift()
方法實現的,再查看其他的轉化方法發現內部也都使用lift()實現的,看來這個lift()
就是關鍵所在了,不過不急,我們先來看看這個OperationMap
是什么:
OperationMap實現了Operator接口的call()
方法,該方法接受外部傳入的觀察者,并將其作為參數構造出了一個新的觀察者,我們不難發現o.onNext(transformer.call(t))
;這一句起了至關重要的作用,這里的接口transformer
將泛型T轉化為泛型R:
這樣之后,再將轉換后的數據傳回至原觀察者的onNext()方法,就完成了觀察數據流的轉化,但是你應該也注意到了,我們用來做轉換的這個新的觀察者并沒有實現訂閱被觀察者的操作,這個訂閱操作又是在哪里實現的呢?答案就是接下來的lift()
:
在這里我們新生成了一個Observable
對象,在這個新對象的onSubscribe
成員的call()方法中我們通過operator.call()
拿到之前生成的未產生訂閱的觀察者st,之后將它作為參數傳入一開始的onSubscribe.call()
中,即完成了這個中間訂閱的過程。
現在我們將整個流程梳理一下:
一次map()變換
根據Operator實例生成新的Subscriber
通過lift()生成新的Observable
原Subscriber訂閱新的Observavble
新的Observable中onSubscribe通知新Subscriber訂閱原Observable
新Subscriber將消息傳給原Subscriber。
為了便于理解,這里借用一下扔物線的圖:
以上就是一次map()
變換的流程,事實上多次map()
也是同樣道理:最外層的目標Subscriber發生訂閱行為后,onSubscribe.onNext()
會逐層嵌套調用,直至初始Observable被最底層的Subscriber訂閱,通過Operator的一層層變化將消息傳到目標Subscriber。再次祭出扔物線的圖:
至于其他的多種變化的實現流程也都很類似,借助于Operator的不同實現來達到變換數據流的目的。例如其中的flatMap()
,它需要進行兩次lift()
,其中第二次是OperationMerge,將轉換成的每一個Observable數據流通過InnerSubscriber
這個紐帶訂閱后,在InnerSubscriber的onNext()
中拿到R,再通過傳入的parent(也就是原MergeSubscriber
)將它們全部發射(emit)出去,由最外層我們傳入的Subscriber
統一接收,這樣就完成了 T => Observable<R> => R
的轉化。
除此之外,還有許多各式各樣的操作符,如果它們還不能滿足你的需要,你也可以通過實現Operator接口定制新的操作符。靈活運用它們往往能達到事半功倍的效果,比如通過使用sample()
、debounce()
等操作符有效避免backpressure
的需要等等,這里就不一一介紹了。
上述內容就是RxJava的響應流程分析基本調用流程是怎樣的,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。