Students Save 30%! Learn & create with unlimited courses & creative assets Students Save 30%! Save Now
Advertisement
  1. Code
  2. Android SDK
Code

RxJava 2中響應式編程的操作符

by
Difficulty:IntermediateLength:LongLanguages:

Chinese (Traditional) (中文(繁體)) translation by zheban (you can also view the original English article)

如果你的Android APP想在Google Play上架並獲得五星級評論,那麼你的APP要能夠進行多工操作。

如今的用戶既希望與APP進行交互,又希望APP在後臺工作,而且這是最起碼的。 這聽起來可能很簡單,但是預設情況下Android是單執行緒的,所以如果你要達到用戶的期望,那麼你遲早不得不創建多執行緒。

本系列的前一篇文章中,我們介紹了RxJava,一個用於JVM的響應式庫,可以説明你創建回應資料和事件變化的Android APP。 你也可以使用此庫來同時回應資料和事件。

在這篇文章中,我將展示如何使用RxJava的運算子來最終在Android上實現流暢體驗。 在本文結尾,你將瞭解如何使用RxJava操作符創建其他執行緒,指定執行緒任務,然後將結果發回到Android的主UI執行緒,所有這些都只需幾行代碼。

而且,沒有完美的技術,我還會告訴你RxJava庫的一個重大缺陷,然後再展示如何使用運算子確保你的Android專案永遠不會出現此問題。

操作符介紹

RxJava擁有龐大的運算子集合,主要用於説明你修改,過濾,合併和轉換由你的Observable發出的資料資料。 你將在官方文檔中找到RxJava運算子的完整清單,儘管不需要你記住每一個操作符,但是值得花一些時間閱讀此列表,這樣你就知道你可以執行哪些不同的資料轉換。

RxJava的運算子列表已經非常詳盡,但是如果你不能在資料轉換中找到完美的運算子,那麼你可以隨時連接多個運算子。 將運算子應用於Observable通常返回另一個Observable,因此你可以繼續應用運算子,直到獲得所需的結果。

有太多的RxJava操作符以至於一篇文章寫不完,而官方的RxJava文檔已經很好地介紹了用於資料轉換的所有操作符,所以我將專注於兩個讓你的Android開發更容易的操作符,他們是:subscribeOn()observeOn()。 

使用RxJava操作符進行多執行緒

如果想你APP提供最佳的使用者體驗,那麼需要APP能夠執行密集型或長時間運行的任務,還能同時執行多個任務而不會阻塞主UI執行緒。

例如,假設你的應用需要從兩個不同的資料庫中獲取一些資訊 如果你在Android主執行緒上一個接一個地執行這兩個任務,那麼這不僅需要花費大量的時間,而且在你的APP從兩個資料庫中檢索出資訊之前,UI將無回應。 多麼糟糕的用戶體驗!

一個更好的解決方案是創建兩個執行緒,你可以同時執行這兩個任務而不會阻塞主UI執行緒。 這種方法意味著工作會快兩倍完成,並且用戶還能繼續與APP的UI進行交互。 而且你的用戶可能甚至不知道你的APP在後臺執行一些密集且長期運行的工作 -所有的資料庫資訊都將簡單地顯示在APP的UI中,就像魔術一樣!

開箱即用,Android確實提供了一些可用於創建執行緒(包括ServicesIntentServices)的工具,但是這些解決方案難以實現,並且代碼複雜。 另外,如果實現多執行緒時稍不注意,APP就會漏洞百出,拋出各種錯誤。

為了讓Android上的多線程更令人頭痛,Android的主UI線程是唯壹可以更新應用程序用護界面的線程。 如果你想把其他執行緒執行的結果在UI上更新,那麼你通常需在主UI執行緒上創建一個Handler,然後通過Handler把資料從後臺執行緒傳到主執行緒。 這意味著更多,更複雜的代碼和更多的錯誤機會。

RxJava有兩個運算子可以幫你避免這種複雜性和潛在的錯誤。

注意,你可以把這些操作符與Schedulers結合,它們本質上是允許你指定執行緒。 現在,只要把scheduler看作是執行緒的代名詞。

  • subscribeOn(Scheduler):預設情況下,Observable會在已聲明訂閱的執行緒上發射資料,即調用.subscribe方法的地方。 在Android中,這通常是主UI執行緒。 你可以使用subscribeOn() 運算子來定義不同的Scheduler,在Scheduler裡面Observable執行和發射資料。
  • observeOn(Scheduler):你可以使用此運算子重定向Observable發射資料到不同的Scheduler,有效地更改Observable發送通知的執行緒,並擴展資料消耗的執行緒。

RxJava有許多schedulers,你可以使用它們創建不同的執行緒,包括:

  • Schedulers.io():用於與IO相關的任務。 
  • Schedulers.computation():用於計算相關的任務。 預設情況下,計算調度程式中的執行緒數被設備上可用的CPU數限制。
  • Schedulers.newThread():創建一個新執行緒。

讓我們來看看如何使用subscribeOn()observeOn()的例子,並在action中看到一些schedulers。

subscribeOn()

在Android中,你通常會使用subscribeOn()Scheduler來切換執行緒,該執行緒執行長時間運行或密集型工作,因此不會阻塞主UI執行緒。 例如,您可能會決定在io() scheduler上導入大量資料,或者在computation() scheduler上執行一些計算。

在下面的代碼,我們正在創造一個新的執行緒,其中Observable將執行操作並發射值123

上面代碼創建了一個執行緒並開始在該執行緒上發射資料,你可能懷疑這個observable 真的在一個新的執行緒上運行嗎。 一種驗證方法是在Android Studio的Logcat Monitor中列印APP當前正在使用執行緒的名稱。

還有一種方便的方法,在上一篇文章 Get Started With RxJava中, 我們創建了一個APP,它在Observable生命週期中的各個階段向Logcat Monitor發送消息,所以我們可以重用代碼。

打開上文中創建的專案,調整代碼以便使用上述Observable作為源Observable。 然後添加subscribeOn()運算子並指定發送到Logcat的消息包含當前執行緒名稱。

你完成的項目應該是這樣的:

確保Android Studio的Logcat監視器已打開(通過選擇Android Monitor選項卡,其次是Logcat),然後在物理Android設備或AVD上運行APP。 你應該在Logcat Monitor中看到以下輸出:

Check the thread where your application is currently running in Android Studios Logcat Monitor

在這裡,可以看到.subscribe在主UI執行緒上被調用,但是observable在一個完全不同的執行緒上運行。

不管你把它放在observable鏈的哪裡,subscribeOn()操作符作用都相同; 但是,你不能在同一個鏈中使用多個subscribeOn()操作符。 如果你使用了多個subscribeOn(),那麼你的鏈會使用最接近源observable的那個subscribeOn()

observeOn()

不像subscribeOn(),你把observeOn()放在你鏈條中的位置重要的,因為這個操作符只改變了出現在下游的observables使用的執行緒。

例如,如果將以下內容插入到鏈中,則從該點開始出現在鏈中的每個observable將使用新執行緒。

該鏈將繼續在新執行緒上運行,直到遇到另一個observeOn()運算子,此時它將切換到該運算子指定的執行緒。 你可以通過在鏈中插入多個observeOn()運算子來控制執行緒,該執行緒是特定observables發送通知的執行緒。

在開發Android APP時,通常使用observeOn()將後臺執行緒執行的結果發送到Android的主UI執行緒。 重定向發射到Android主UI執行緒的最簡單的方法是使用AndroidSchedulers.mainThread Scheduler,它是RxAndroid庫的一部分而不是RxJava庫的一部分。 

RxAndroid庫包含針對RxJava 2的Android特定綁定,這使RxAndroid成為Android開發人員的寶貴資源(我們將在本系列的下一篇文章中更詳細地介紹)。

要將RxAndroid添加到項目中,打開module級的build.gradle文件,將最新版本庫的依賴添加進來。 在撰寫本文時,RxAndroid的最新版本為2.0.1,因此我添加了以下內容:

將這個庫添加到你的項目之後,你可以使用一行代碼指定observable的結果,該結果應該被發送到你APP的主UI執行緒:

考慮到 communicating with your app’s main UI thread這篇文章會佔用官方Android文檔的全頁,這是一個巨大的改進,因為在創建多執行緒Android APP時節省了大量時間。

RxJava的主要缺點

雖然RxJava對Android開發人員很有幫助,但沒有技術是完美的,RxJava確實有一個很大的陷阱,有可能導致你APP崩潰。

預設情況下,RxJava運行基於推送的工作流程:資料由上游Observable產生,然後被推送到下游Observer。 基於推送的工作流程的主要問題是生產者(在這種情況下是Observable)發射資料太快,而消費者(Observer)來不及處理。

快速的 Observable 和慢速的Observer 可能會迅速導致積壓的太多未消耗的資料,這將消耗系統資源,甚至可能導致OutOfMemoryException。 這個問題被稱為背壓

如果懷疑你APP中發生背壓,有一些可能的解決方案,包括使用操作符減少正在生產的產品數量。

使用sample()throttlefirst()創建採樣週期

如果Observable正在發射大量的資料,不需要分配Observer來接收一項資料。

如果可以安全地忽略某些Observable發射的資料,那麼可以使用幾個運算子來創建採樣週期,然後選擇在這些時間段內發射特定值:

  • sample()操作符在指定時間間隔內檢查Observable的輸出,然後取得在採樣過程中被發射最頻繁的資料。 例如,如果項目中有.sample(5, SECONDS),則Observer將收到每五秒間隔內發出的最後一個值。 
  • throttleFirst()操作符獲取採樣時段期間發射的第一值。 例如,如果項目中有.throttlefirst(5, SECONDS),則Observer將會收到在每五秒間隔內發出的第一個值。
Sample Operator

buffer()批量發射

如果你不能安全地跳過任何發射的資料,那麼你仍然可以通過將發射的資料批量化來減輕Observer的壓力 處理批量資料通常比單獨處理多個資料更有效,因此這種方法應該能提高消費率。

你可以使用buffer()操作符創建批量資料。 在這裡,我們正在使用buffer()將三秒鐘內發射的所有資料批量化:

Buffer operator

或者,可以使用buffer() 創建由特定數量的資料組成的批次。 例如,在這裡我們告訴buffer()將資料分成四組:

用Flowables替換Observables

減少發射數量的另一種方法就是用Flowable替換Observable

在RxJava 2中,RxJava團隊決定將標準Observable分為兩種類型:我們在本系列中一直在查看的常規類型和Flowable

Flowables的功能與Observable大致相同,但有一個主要區別:observer請求多少資料, Flowables就發射多少。 如果你有一個Observable比它指定的observer發射更多的資料,那麼你可能需要考慮切換到Flowable

在開始Flowable之前,需要添加以下import語句:

然後,與創建Observables一樣的方式創建Flowable。 例如,以下每個代碼片段都將創建一個能夠發出資料Flowable

現在,你可能會想:既然使用Flowables不用擔心背壓,為什麼我不直接使用Flowables而要使用Observable呢? 答案是,Flowable比普通Observable需要更多的開銷,所以為了創造一個高性能的APP,你應該堅持使用Observable,除非你懷疑你的APP出現背壓。

Singles

Flowable不是Observable裡面的唯一變數,因為庫還包含Single類。

當只需要發出一個值時,Singles很有用。 在這些情況下,創建一個Observable可以感覺有點過分,而一個Single簡單地發出一個值,然後就完成了,調用如下:

  • onSuccess()Single發出唯一的值。
  • onError():如果Single不能發出資料,那麼它將通過這個方法得到Throwable

Single只要調用這些方法之一,就立即終止。

讓我們來看一個Single的例子- 為了節省時間,我們重用代碼:

在AVD或物理Android設備上運行,將在Android Studio的Logcat Monitor中看到以下輸出:

Check the Singles output in Android Studios Logcat Monitor

如果你改變主意並想任何時候將Single轉換成Observable,那麼RxJava有你需要的所有操作符,其中包括:

  • mergeWith():將多個Singles合併成一個單一的Observable
  • concatWith():將多個Singles連在一起,形成Observable發射資料
  • toObservable():將一個Single轉換為一個Observable,發出最初由Single發出的資料,然後完成。

總結

在這篇文章中,我們探索了創建和管理多執行緒的RxJava操作符,使用這些操作符不需要考慮傳統多執行緒的複雜性和潛在的錯誤。 我們還看到了如何使用RxAndroid庫與Android主UI執行緒進行通信,而且只需一行代碼,還看到了如何解決背壓問題。

在本系列中,我們已經對RxAndroid庫進行了多次研究,但是這個庫中包含了Android特定的RxJava綁定,在Android平臺上使用RxJava可能是非常寶貴的,所以在本系列的最後一篇文章中更詳細地看RxAndroid庫。

看看關於安卓開發的其他文章吧。

Envato qr branded
关注我们的公众号
Advertisement
Advertisement
Looking for something to help kick start your next project?
Envato Market has a range of items for sale to help get you started.