您好,登錄后才能下訂單哦!
前言
大家越用RxJava,越覺得它好用,所以不知不覺地發現代碼里到處都是RxJava的身影。然而,RxJava也不是銀彈,其中仍然有很多問題需要解決。這里,我簡單地總結一下自己遇到的一些“坑”,內容上可能會比較松散。
一、考慮主線程的切換
RxJava中一個常用的使用方法是——在其他線程中做處理,然后切換到UI線程中去更新頁面。其中,線程切換就是使用了observeOn()。后臺下載文件,前臺顯示下載進度就可以使用這種方式完成。然而,實踐發現這其中有坑。如果文件比較大,而下載包的粒度又比較小,這將導致很多通知積壓下來,最終導致錯誤。
這種錯誤其實也是可以理解的,畢竟MainLooper是根據Message來工作的,Message過多必然會導致一些問題。當然,這還是比較想當然的想法,最終還是需要到源碼中一探究竟。ObserveOn的原理在前面關于RxJava的文章已經有過分析,這里還是簡單列一下代碼。其中的重點還是OperatorObserveOn的內部類——ObserveOnSubscriber。其重要代碼片段如下:
/** Observe through individual queue per observer. */ static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { final Subscriber<? super T> child; final Scheduler.Worker recursiveScheduler; final NotificationLite<T> on; final boolean delayError; final Queue<Object> queue; /** The emission threshold that should trigger a replenishing request. */ final int limit; // the status of the current stream volatile boolean finished; final AtomicLong requested = new AtomicLong(); final AtomicLong counter = new AtomicLong(); /** * The single exception if not null, should be written before setting finished (release) and read after * reading finished (acquire). */ Throwable error; /** Remembers how many elements have been emitted before the requests run out. */ long emitted; // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should // not prevent anything downstream from consuming, which will happen if the Subscription is chained public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); this.delayError = delayError; this.on = NotificationLite.instance(); int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; // this formula calculates the 75% of the bufferSize, rounded up to the next integer this.limit = calculatedSize - (calculatedSize >> 2); if (UnsafeAccess.isUnsafeAvailable()) { queue = new SpscArrayQueue<Object>(calculatedSize); } else { queue = new SpscAtomicArrayQueue<Object>(calculatedSize); } // signal that this is an async operator capable of receiving this many request(calculatedSize); } void init() { // don't want this code in the constructor because `this` can escape through the // setProducer call Subscriber<? super T> localChild = child; localChild.setProducer(new Producer() { @Override public void request(long n) { if (n > 0L) { BackpressureUtils.getAndAddRequest(requested, n); schedule(); } } }); localChild.add(recursiveScheduler); localChild.add(this); } @Override public void onNext(final T t) { if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); } @Override public void onCompleted() { if (isUnsubscribed() || finished) { return; } finished = true; schedule(); } @Override public void onError(final Throwable e) { if (isUnsubscribed() || finished) { RxJavaHooks.onError(e); return; } error = e; finished = true; schedule(); } protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } }
關鍵點就在于這個queue成員,這個隊列存放了需要進行發送給下行線程的消息。對于主線程來說,符合其實是比較重的,從消息的生產者和消費者的模式講,過多過快的消息會導致消息阻塞。甚至,都到不了阻塞的情況,因為queue的大小會有上限,在onNext()
方法中的queue.offer()
可能會產生異常,這取決于queue的實現方式。但無論如何都不可能無限大,所以無法保證絕對不出異常。
解決這個問題的方法其實也很簡單,可以在生產者降低消息的產生頻率。也可以在消息處理的時候先不進行線程切換,而是通過判斷,在必要的時候進行線程切換,比如使用runOnUIThread()
。
二、RxJava避免內存泄漏
RxJava的響應式機制本質上還是回調實現的,因此內存泄漏也是會出現的。倘若不對Subscription進行管理,內存泄漏會非常嚴重。對于Subscription,其實有幾個比較廣泛使用的方法,比如RxLifecycle,以及簡單的CompositeSubscription。至于它們的使用方法,其實都非常簡單,這里就不贅述了。
說到內存泄漏,就談點題外話,動畫也可能導致內存泄漏。其原因仍然是一些回調函數,這些回調函數實現的View變化的功能,但是在被撤銷以后,回調函數沒有取消掉,同時View可能持有Context信息,從而導致內存泄漏。最近才發現,LoadToastView這個開源庫一直存在內存泄漏,其原因正如上文所說。
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。