Chinese (Traditional) (中文(繁體)) translation by zheban (you can also view the original English article)
如果你的Android APP想在Google Play上架並獲得五星級評論,那麼你的APP要能夠進行多工操作。
如今的用戶既希望與APP進行交互,又希望APP在後臺工作,而且這是最起碼的。 這聽起來可能很簡單,但是預設情況下Android是單執行緒的,所以如果你要達到用戶的期望,那麼你遲早不得不創建多執行緒。
在本系列的前一篇文章中,我們介紹了RxJava,一個用於JVM的響應式庫,可以説明你創建回應資料和事件變化的Android APP。 你也可以使用此庫來同時回應資料和事件。
在這篇文章中,我將展示如何使用RxJava的運算子來最終在Android上實現流暢體驗。 在本文結尾,你將瞭解如何使用RxJava操作符創建其他執行緒,指定執行緒任務,然後將結果發回到Android的主UI執行緒,所有這些都只需幾行代碼。
而且,沒有完美的技術,我還會告訴你RxJava庫的一個重大缺陷,然後再展示如何使用運算子確保你的Android專案永遠不會出現此問題。
操作符介紹
RxJava擁有龐大的運算子集合,主要用於説明你修改,過濾,合併和轉換由你的Observable
發出的資料資料。 你將在官方文檔中找到RxJava運算子的完整清單,儘管不需要你記住每一個操作符,但是值得花一些時間閱讀此列表,這樣你就知道你可以執行哪些不同的資料轉換。
RxJava的運算子列表已經非常詳盡,但是如果你不能在資料轉換中找到完美的運算子,那麼你可以隨時連接多個運算子。 將運算子應用於Observable
通常返回另一個Observable
,因此你可以繼續應用運算子,直到獲得所需的結果。
有太多的RxJava操作符以至於一篇文章寫不完,而官方的RxJava文檔已經很好地介紹了用於資料轉換的所有操作符,所以我將專注於兩個讓你的Android開發更容易的操作符,他們是:subscribeOn()
和observeOn()
。
使用RxJava操作符進行多執行緒
如果想你APP提供最佳的使用者體驗,那麼需要APP能夠執行密集型或長時間運行的任務,還能同時執行多個任務而不會阻塞主UI執行緒。
例如,假設你的應用需要從兩個不同的資料庫中獲取一些資訊 如果你在Android主執行緒上一個接一個地執行這兩個任務,那麼這不僅需要花費大量的時間,而且在你的APP從兩個資料庫中檢索出資訊之前,UI將無回應。 多麼糟糕的用戶體驗!
一個更好的解決方案是創建兩個執行緒,你可以同時執行這兩個任務而不會阻塞主UI執行緒。 這種方法意味著工作會快兩倍完成,並且用戶還能繼續與APP的UI進行交互。 而且你的用戶可能甚至不知道你的APP在後臺執行一些密集且長期運行的工作 -所有的資料庫資訊都將簡單地顯示在APP的UI中,就像魔術一樣!
開箱即用,Android確實提供了一些可用於創建執行緒(包括Services
和IntentServices
)的工具,但是這些解決方案難以實現,並且代碼複雜。 另外,如果實現多執行緒時稍不注意,APP就會漏洞百出,拋出各種錯誤。
為了讓Android上的多線程更令人頭痛,Android的主UI線程是唯壹可以更新應用程序用護界面的線程。 如果你想把其他執行緒執行的結果在UI上更新,那麼你通常需在主UI執行緒上創建一個Handler
,然後通過Handler
把資料從後臺執行緒傳到主執行緒。 這意味著更多,更複雜的代碼和更多的錯誤機會。
RxJava有兩個運算子可以幫你避免這種複雜性和潛在的錯誤。
注意,你可以把這些操作符與Schedulers
結合,它們本質上是允許你指定執行緒。 現在,只要把scheduler看作是執行緒的代名詞。
-
subscribeOn(Scheduler)
:預設情況下,Observable
會在已聲明訂閱的執行緒上發射資料,即調用.subscribe
方法的地方。 在Android中,這通常是主UI執行緒。 你可以使用subscribeOn()
運算子來定義不同的Scheduler
,在Scheduler裡面Observable
執行和發射資料。 -
observeOn(Scheduler)
:你可以使用此運算子重定向Observable
發射資料到不同的Scheduler
,有效地更改Observable
發送通知的執行緒,並擴展資料消耗的執行緒。
RxJava有許多schedulers,你可以使用它們創建不同的執行緒,包括:
-
Schedulers.io()
:用於與IO相關的任務。 -
Schedulers.computation()
:用於計算相關的任務。 預設情況下,計算調度程式中的執行緒數被設備上可用的CPU數限制。 -
Schedulers.newThread()
:創建一個新執行緒。
讓我們來看看如何使用subscribeOn()
和observeOn()
的例子,並在action中看到一些schedulers。
subscribeOn()
在Android中,你通常會使用subscribeOn()
和Scheduler
來切換執行緒,該執行緒執行長時間運行或密集型工作,因此不會阻塞主UI執行緒。 例如,您可能會決定在io()
scheduler上導入大量資料,或者在computation()
scheduler上執行一些計算。
在下面的代碼,我們正在創造一個新的執行緒,其中Observable
將執行操作並發射值1
,2
和3
。
Observable.just(1, 2, 3) .subscribeOn(Schedulers.newThread()) .subscribe(Observer);
上面代碼創建了一個執行緒並開始在該執行緒上發射資料,你可能懷疑這個observable 真的在一個新的執行緒上運行嗎。 一種驗證方法是在Android Studio的Logcat Monitor中列印APP當前正在使用執行緒的名稱。
還有一種方便的方法,在上一篇文章 Get Started With RxJava中, 我們創建了一個APP,它在Observable生命週期中的各個階段向Logcat Monitor發送消息,所以我們可以重用代碼。
打開上文中創建的專案,調整代碼以便使用上述Observable
作為源Observable
。 然後添加subscribeOn()
運算子並指定發送到Logcat的消息包含當前執行緒名稱。
你完成的項目應該是這樣的:
import android.support.v7.app.AppCompatActivity; import android.os.Bundle; import android.util.Log; import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; public class MainActivity extends AppCompatActivity { public static final String TAG = "MainActivity"; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); Observable.just(1, 2, 3) .subscribeOn(Schedulers.newThread()) .subscribe(Observer); } Observer<Integer> Observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG, "onSubscribe" + Thread.currentThread().getName()); } @Override public void onNext(Integer value) { Log.e(TAG, "onNext: " + value + Thread.currentThread().getName()); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: "); } @Override public void onComplete() { Log.e(TAG, "onComplete: All Done!" + Thread.currentThread().getName()); } }; }
確保Android Studio的Logcat監視器已打開(通過選擇Android Monitor選項卡,其次是Logcat),然後在物理Android設備或AVD上運行APP。 你應該在Logcat Monitor中看到以下輸出:

在這裡,可以看到.subscribe
在主UI執行緒上被調用,但是observable在一個完全不同的執行緒上運行。
不管你把它放在observable鏈的哪裡,subscribeOn()
操作符作用都相同; 但是,你不能在同一個鏈中使用多個subscribeOn()
操作符。 如果你使用了多個subscribeOn()
,那麼你的鏈只會使用最接近源observable的那個subscribeOn()
。
observeOn()
不像subscribeOn()
,你把observeOn()
放在你鏈條中的位置很重要的,因為這個操作符只改變了出現在下游的observables使用的執行緒。
例如,如果將以下內容插入到鏈中,則從該點開始出現在鏈中的每個observable將使用新執行緒。
.observeOn(Schedulers.newThread())
該鏈將繼續在新執行緒上運行,直到遇到另一個observeOn()
運算子,此時它將切換到該運算子指定的執行緒。 你可以通過在鏈中插入多個observeOn()
運算子來控制執行緒,該執行緒是特定observables發送通知的執行緒。
在開發Android APP時,通常使用observeOn()
將後臺執行緒執行的結果發送到Android的主UI執行緒。 重定向發射到Android主UI執行緒的最簡單的方法是使用AndroidSchedulers.mainThread Scheduler
,它是RxAndroid庫的一部分而不是RxJava庫的一部分。
RxAndroid庫包含針對RxJava 2的Android特定綁定,這使RxAndroid成為Android開發人員的寶貴資源(我們將在本系列的下一篇文章中更詳細地介紹)。
要將RxAndroid添加到項目中,打開module級的build.gradle文件,將最新版本庫的依賴添加進來。 在撰寫本文時,RxAndroid的最新版本為2.0.1,因此我添加了以下內容:
dependencies { ... ... ... compile 'io.reactivex.rxjava2:rxandroid:2.0.1' }
將這個庫添加到你的項目之後,你可以使用一行代碼指定observable的結果,該結果應該被發送到你APP的主UI執行緒:
.observeOn(AndroidSchedulers.mainThread())
考慮到 communicating with your app’s main UI thread這篇文章會佔用官方Android文檔的全頁,這是一個巨大的改進,因為在創建多執行緒Android APP時節省了大量時間。
RxJava的主要缺點
雖然RxJava對Android開發人員很有幫助,但沒有技術是完美的,RxJava確實有一個很大的陷阱,有可能導致你APP崩潰。
預設情況下,RxJava運行基於推送的工作流程:資料由上游Observable
產生,然後被推送到下游Observer
。 基於推送的工作流程的主要問題是生產者(在這種情況下是Observable
)發射資料太快,而消費者(Observer
)來不及處理。
快速的 Observable
和慢速的Observer
可能會迅速導致積壓的太多未消耗的資料,這將消耗系統資源,甚至可能導致OutOfMemoryException
。 這個問題被稱為背壓。
如果懷疑你APP中發生背壓,有一些可能的解決方案,包括使用操作符減少正在生產的產品數量。
使用sample()
和throttlefirst()
創建採樣週期
如果Observable
正在發射大量的資料,不需要分配Observer
來接收每一項資料。
如果可以安全地忽略某些Observable
發射的資料,那麼可以使用幾個運算子來創建採樣週期,然後選擇在這些時間段內發射特定值:
-
sample()
操作符在指定時間間隔內檢查Observable的輸出,然後取得在採樣過程中被發射最頻繁的資料。 例如,如果項目中有.sample(5, SECONDS)
,則Observer將收到每五秒間隔內發出的最後一個值。 -
throttleFirst()
操作符獲取採樣時段期間發射的第一值。 例如,如果項目中有.throttlefirst(5, SECONDS)
,則Observer將會收到在每五秒間隔內發出的第一個值。

buffer()
批量發射
如果你不能安全地跳過任何發射的資料,那麼你仍然可以通過將發射的資料批量化來減輕Observer
的壓力 處理批量資料通常比單獨處理多個資料更有效,因此這種方法應該能提高消費率。
你可以使用buffer()
操作符創建批量資料。 在這裡,我們正在使用buffer()
將三秒鐘內發射的所有資料批量化:
Observable.range(0, 10) .buffer(3, SECONDS) .subscribe(System.out::println);

或者,可以使用buffer()
創建由特定數量的資料組成的批次。 例如,在這裡我們告訴buffer()
將資料分成四組:
Observable.range(0, 10) .buffer(4) .subscribe(System.out::println);
用Flowables替換Observables
減少發射數量的另一種方法就是用Flowable
替換Observable
。
在RxJava 2中,RxJava團隊決定將標準Observable
分為兩種類型:我們在本系列中一直在查看的常規類型和Flowable
。
Flowables
的功能與Observable
大致相同,但有一個主要區別:observer請求多少資料, Flowables
就發射多少。 如果你有一個Observable
比它指定的observer發射更多的資料,那麼你可能需要考慮切換到Flowable
。
在開始Flowable
之前,需要添加以下import語句:
import io.reactivex.Flowable;
然後,與創建Observables
一樣的方式創建Flowable
。 例如,以下每個代碼片段都將創建一個能夠發出資料Flowable
:
Flowable<String> flowable = Flowable.fromArray(new String[] {"south", "north", "west", “east”}); ... flowable.subscribe()
Flowable<Integer> flowable = Flowable.range(0, 20); ... flowable.subscribe()
現在,你可能會想:既然使用Flowables
不用擔心背壓,為什麼我不直接使用Flowables而要使用Observable
呢? 答案是,Flowable
比普通Observable
需要更多的開銷,所以為了創造一個高性能的APP,你應該堅持使用Observable
,除非你懷疑你的APP出現背壓。
Singles
Flowable
不是Observable
裡面的唯一變數,因為庫還包含Single
類。
當只需要發出一個值時,Singles
很有用。 在這些情況下,創建一個Observable
可以感覺有點過分,而一個Single
簡單地發出一個值,然後就完成了,調用如下:
-
onSuccess()
:Single
發出唯一的值。 -
onError()
:如果Single
不能發出資料,那麼它將通過這個方法得到Throwable
。
Single
只要調用這些方法之一,就立即終止。
讓我們來看一個Single
的例子- 為了節省時間,我們重用代碼:
import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import io.reactivex.Single; import io.reactivex.SingleObserver; import io.reactivex.disposables.Disposable; public class MainActivity extends AppCompatActivity { public static final String TAG = "MainActivity"; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); Single.just("Hello World") .subscribe(getSingleObserver()); } private SingleObserver<String> getSingleObserver() { return new SingleObserver<String>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG, "onSubscribe"); } @Override public void onSuccess(String value) { Log.e(TAG, " onSuccess : " + value); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: "); } }; } }
在AVD或物理Android設備上運行,將在Android Studio的Logcat Monitor中看到以下輸出:

如果你改變主意並想任何時候將Single
轉換成Observable
,那麼RxJava有你需要的所有操作符,其中包括:
-
mergeWith()
:將多個Singles
合併成一個單一的Observable
。 -
concatWith()
:將多個Singles
連在一起,形成Observable
發射資料 -
toObservable()
:將一個Single
轉換為一個Observable
,發出最初由Single發出的資料,然後完成。
總結
在這篇文章中,我們探索了創建和管理多執行緒的RxJava操作符,使用這些操作符不需要考慮傳統多執行緒的複雜性和潛在的錯誤。 我們還看到了如何使用RxAndroid庫與Android主UI執行緒進行通信,而且只需一行代碼,還看到了如何解決背壓問題。
在本系列中,我們已經對RxAndroid庫進行了多次研究,但是這個庫中包含了Android特定的RxJava綁定,在Android平臺上使用RxJava可能是非常寶貴的,所以在本系列的最後一篇文章中更詳細地看RxAndroid庫。
看看關於安卓開發的其他文章吧。
Envato Tuts+ tutorials are translated into other languages by our community members—you can be involved too!
Translate this post