Advertisement
  1. Code
  2. Android SDK

RxJava 2中響應式編程的操作符

by
Read Time:14 minsLanguages:

Chinese (Traditional) (中文(繁體)) translation by zheban (you can also view the original English article)

如果你的Android APP想在Google Play上架並獲得五星級評論,那麼你的APP要能夠進行多工操作。

如今的用戶既希望與APP進行交互,又希望APP在後臺工作,而且這是最起碼的。 這聽起來可能很簡單,但是預設情況下Android是單執行緒的,所以如果你要達到用戶的期望,那麼你遲早不得不創建多執行緒。

本系列的前一篇文章中,我們介紹了RxJava,一個用於JVM的響應式庫,可以説明你創建回應資料和事件變化的Android APP。 你也可以使用此庫來同時回應資料和事件。

在這篇文章中,我將展示如何使用RxJava的運算子來最終在Android上實現流暢體驗。 在本文結尾,你將瞭解如何使用RxJava操作符創建其他執行緒,指定執行緒任務,然後將結果發回到Android的主UI執行緒,所有這些都只需幾行代碼。

而且,沒有完美的技術,我還會告訴你RxJava庫的一個重大缺陷,然後再展示如何使用運算子確保你的Android專案永遠不會出現此問題。

操作符介紹

RxJava擁有龐大的運算子集合,主要用於説明你修改,過濾,合併和轉換由你的Observable發出的資料資料。 你將在官方文檔中找到RxJava運算子的完整清單,儘管不需要你記住每一個操作符,但是值得花一些時間閱讀此列表,這樣你就知道你可以執行哪些不同的資料轉換。

RxJava的運算子列表已經非常詳盡,但是如果你不能在資料轉換中找到完美的運算子,那麼你可以隨時連接多個運算子。 將運算子應用於Observable通常返回另一個Observable,因此你可以繼續應用運算子,直到獲得所需的結果。

有太多的RxJava操作符以至於一篇文章寫不完,而官方的RxJava文檔已經很好地介紹了用於資料轉換的所有操作符,所以我將專注於兩個讓你的Android開發更容易的操作符,他們是:subscribeOn()observeOn()。 

使用RxJava操作符進行多執行緒

如果想你APP提供最佳的使用者體驗,那麼需要APP能夠執行密集型或長時間運行的任務,還能同時執行多個任務而不會阻塞主UI執行緒。

例如,假設你的應用需要從兩個不同的資料庫中獲取一些資訊 如果你在Android主執行緒上一個接一個地執行這兩個任務,那麼這不僅需要花費大量的時間,而且在你的APP從兩個資料庫中檢索出資訊之前,UI將無回應。 多麼糟糕的用戶體驗!

一個更好的解決方案是創建兩個執行緒,你可以同時執行這兩個任務而不會阻塞主UI執行緒。 這種方法意味著工作會快兩倍完成,並且用戶還能繼續與APP的UI進行交互。 而且你的用戶可能甚至不知道你的APP在後臺執行一些密集且長期運行的工作 -所有的資料庫資訊都將簡單地顯示在APP的UI中,就像魔術一樣!

開箱即用,Android確實提供了一些可用於創建執行緒(包括ServicesIntentServices)的工具,但是這些解決方案難以實現,並且代碼複雜。 另外,如果實現多執行緒時稍不注意,APP就會漏洞百出,拋出各種錯誤。

為了讓Android上的多線程更令人頭痛,Android的主UI線程是唯壹可以更新應用程序用護界面的線程。 如果你想把其他執行緒執行的結果在UI上更新,那麼你通常需在主UI執行緒上創建一個Handler,然後通過Handler把資料從後臺執行緒傳到主執行緒。 這意味著更多,更複雜的代碼和更多的錯誤機會。

RxJava有兩個運算子可以幫你避免這種複雜性和潛在的錯誤。

注意,你可以把這些操作符與Schedulers結合,它們本質上是允許你指定執行緒。 現在,只要把scheduler看作是執行緒的代名詞。

  • subscribeOn(Scheduler):預設情況下,Observable會在已聲明訂閱的執行緒上發射資料,即調用.subscribe方法的地方。 在Android中,這通常是主UI執行緒。 你可以使用subscribeOn() 運算子來定義不同的Scheduler,在Scheduler裡面Observable執行和發射資料。
  • observeOn(Scheduler):你可以使用此運算子重定向Observable發射資料到不同的Scheduler,有效地更改Observable發送通知的執行緒,並擴展資料消耗的執行緒。

RxJava有許多schedulers,你可以使用它們創建不同的執行緒,包括:

  • Schedulers.io():用於與IO相關的任務。 
  • Schedulers.computation():用於計算相關的任務。 預設情況下,計算調度程式中的執行緒數被設備上可用的CPU數限制。
  • Schedulers.newThread():創建一個新執行緒。

讓我們來看看如何使用subscribeOn()observeOn()的例子,並在action中看到一些schedulers。

subscribeOn()

在Android中,你通常會使用subscribeOn()Scheduler來切換執行緒,該執行緒執行長時間運行或密集型工作,因此不會阻塞主UI執行緒。 例如,您可能會決定在io() scheduler上導入大量資料,或者在computation() scheduler上執行一些計算。

在下面的代碼,我們正在創造一個新的執行緒,其中Observable將執行操作並發射值123

上面代碼創建了一個執行緒並開始在該執行緒上發射資料,你可能懷疑這個observable 真的在一個新的執行緒上運行嗎。 一種驗證方法是在Android Studio的Logcat Monitor中列印APP當前正在使用執行緒的名稱。

還有一種方便的方法,在上一篇文章 Get Started With RxJava中, 我們創建了一個APP,它在Observable生命週期中的各個階段向Logcat Monitor發送消息,所以我們可以重用代碼。

打開上文中創建的專案,調整代碼以便使用上述Observable作為源Observable。 然後添加subscribeOn()運算子並指定發送到Logcat的消息包含當前執行緒名稱。

你完成的項目應該是這樣的:

確保Android Studio的Logcat監視器已打開(通過選擇Android Monitor選項卡,其次是Logcat),然後在物理Android設備或AVD上運行APP。 你應該在Logcat Monitor中看到以下輸出:

Check the thread where your application is currently running in Android Studios Logcat MonitorCheck the thread where your application is currently running in Android Studios Logcat MonitorCheck the thread where your application is currently running in Android Studios Logcat Monitor

在這裡,可以看到.subscribe在主UI執行緒上被調用,但是observable在一個完全不同的執行緒上運行。

不管你把它放在observable鏈的哪裡,subscribeOn()操作符作用都相同; 但是,你不能在同一個鏈中使用多個subscribeOn()操作符。 如果你使用了多個subscribeOn(),那麼你的鏈會使用最接近源observable的那個subscribeOn()

observeOn()

不像subscribeOn(),你把observeOn()放在你鏈條中的位置重要的,因為這個操作符只改變了出現在下游的observables使用的執行緒。

例如,如果將以下內容插入到鏈中,則從該點開始出現在鏈中的每個observable將使用新執行緒。

該鏈將繼續在新執行緒上運行,直到遇到另一個observeOn()運算子,此時它將切換到該運算子指定的執行緒。 你可以通過在鏈中插入多個observeOn()運算子來控制執行緒,該執行緒是特定observables發送通知的執行緒。

在開發Android APP時,通常使用observeOn()將後臺執行緒執行的結果發送到Android的主UI執行緒。 重定向發射到Android主UI執行緒的最簡單的方法是使用AndroidSchedulers.mainThread Scheduler,它是RxAndroid庫的一部分而不是RxJava庫的一部分。 

RxAndroid庫包含針對RxJava 2的Android特定綁定,這使RxAndroid成為Android開發人員的寶貴資源(我們將在本系列的下一篇文章中更詳細地介紹)。

要將RxAndroid添加到項目中,打開module級的build.gradle文件,將最新版本庫的依賴添加進來。 在撰寫本文時,RxAndroid的最新版本為2.0.1,因此我添加了以下內容:

將這個庫添加到你的項目之後,你可以使用一行代碼指定observable的結果,該結果應該被發送到你APP的主UI執行緒:

考慮到 communicating with your app’s main UI thread這篇文章會佔用官方Android文檔的全頁,這是一個巨大的改進,因為在創建多執行緒Android APP時節省了大量時間。

RxJava的主要缺點

雖然RxJava對Android開發人員很有幫助,但沒有技術是完美的,RxJava確實有一個很大的陷阱,有可能導致你APP崩潰。

預設情況下,RxJava運行基於推送的工作流程:資料由上游Observable產生,然後被推送到下游Observer。 基於推送的工作流程的主要問題是生產者(在這種情況下是Observable)發射資料太快,而消費者(Observer)來不及處理。

快速的 Observable 和慢速的Observer 可能會迅速導致積壓的太多未消耗的資料,這將消耗系統資源,甚至可能導致OutOfMemoryException。 這個問題被稱為背壓

如果懷疑你APP中發生背壓,有一些可能的解決方案,包括使用操作符減少正在生產的產品數量。

使用sample()throttlefirst()創建採樣週期

如果Observable正在發射大量的資料,不需要分配Observer來接收一項資料。

如果可以安全地忽略某些Observable發射的資料,那麼可以使用幾個運算子來創建採樣週期,然後選擇在這些時間段內發射特定值:

  • sample()操作符在指定時間間隔內檢查Observable的輸出,然後取得在採樣過程中被發射最頻繁的資料。 例如,如果項目中有.sample(5, SECONDS),則Observer將收到每五秒間隔內發出的最後一個值。 
  • throttleFirst()操作符獲取採樣時段期間發射的第一值。 例如,如果項目中有.throttlefirst(5, SECONDS),則Observer將會收到在每五秒間隔內發出的第一個值。
Sample OperatorSample OperatorSample Operator

buffer()批量發射

如果你不能安全地跳過任何發射的資料,那麼你仍然可以通過將發射的資料批量化來減輕Observer的壓力 處理批量資料通常比單獨處理多個資料更有效,因此這種方法應該能提高消費率。

你可以使用buffer()操作符創建批量資料。 在這裡,我們正在使用buffer()將三秒鐘內發射的所有資料批量化:

Buffer operatorBuffer operatorBuffer operator

或者,可以使用buffer() 創建由特定數量的資料組成的批次。 例如,在這裡我們告訴buffer()將資料分成四組:

用Flowables替換Observables

減少發射數量的另一種方法就是用Flowable替換Observable

在RxJava 2中,RxJava團隊決定將標準Observable分為兩種類型:我們在本系列中一直在查看的常規類型和Flowable

Flowables的功能與Observable大致相同,但有一個主要區別:observer請求多少資料, Flowables就發射多少。 如果你有一個Observable比它指定的observer發射更多的資料,那麼你可能需要考慮切換到Flowable

在開始Flowable之前,需要添加以下import語句:

然後,與創建Observables一樣的方式創建Flowable。 例如,以下每個代碼片段都將創建一個能夠發出資料Flowable

現在,你可能會想:既然使用Flowables不用擔心背壓,為什麼我不直接使用Flowables而要使用Observable呢? 答案是,Flowable比普通Observable需要更多的開銷,所以為了創造一個高性能的APP,你應該堅持使用Observable,除非你懷疑你的APP出現背壓。

Singles

Flowable不是Observable裡面的唯一變數,因為庫還包含Single類。

當只需要發出一個值時,Singles很有用。 在這些情況下,創建一個Observable可以感覺有點過分,而一個Single簡單地發出一個值,然後就完成了,調用如下:

  • onSuccess()Single發出唯一的值。
  • onError():如果Single不能發出資料,那麼它將通過這個方法得到Throwable

Single只要調用這些方法之一,就立即終止。

讓我們來看一個Single的例子- 為了節省時間,我們重用代碼:

在AVD或物理Android設備上運行,將在Android Studio的Logcat Monitor中看到以下輸出:

Check the Singles output in Android Studios Logcat MonitorCheck the Singles output in Android Studios Logcat MonitorCheck the Singles output in Android Studios Logcat Monitor

如果你改變主意並想任何時候將Single轉換成Observable,那麼RxJava有你需要的所有操作符,其中包括:

  • mergeWith():將多個Singles合併成一個單一的Observable
  • concatWith():將多個Singles連在一起,形成Observable發射資料
  • toObservable():將一個Single轉換為一個Observable,發出最初由Single發出的資料,然後完成。

總結

在這篇文章中,我們探索了創建和管理多執行緒的RxJava操作符,使用這些操作符不需要考慮傳統多執行緒的複雜性和潛在的錯誤。 我們還看到了如何使用RxAndroid庫與Android主UI執行緒進行通信,而且只需一行代碼,還看到了如何解決背壓問題。

在本系列中,我們已經對RxAndroid庫進行了多次研究,但是這個庫中包含了Android特定的RxJava綁定,在Android平臺上使用RxJava可能是非常寶貴的,所以在本系列的最後一篇文章中更詳細地看RxAndroid庫。

看看關於安卓開發的其他文章吧。

关注我们的公众号
Advertisement
Advertisement
Looking for something to help kick start your next project?
Envato Market has a range of items for sale to help get you started.