Advertisement
  1. Code
  2. Android SDK

RxJava 2中响应式编程的操作符

by
Read Time:14 minsLanguages:

Chinese (Simplified) (中文(简体)) translation by Zhang Xiang Liang (you can also view the original English article)

如果你的Android APP想在Google Play上架并获得五星级评论,那么你的APP要能够进行多任务操作。

如今的用户既希望与APP进行交互,又希望APP在后台工作,而且这是最起码的。 这听起来可能很简单,但是默认情况下Android是单线程的,所以如果你要达到用户的期望,那么你迟早不得不创建多线程。

在本系列的前一篇文章中,我们介绍了RxJava,一个用于JVM的响应式库,可以帮助你创建响应数据和事件变化的Android APP。 你也可以使用此库来同时响应数据和事件。

在这篇文章中,我将展示如何使用RxJava的运算符来最终在Android上实现流畅体验。 在本文结尾,你将了解如何使用RxJava操作符创建其他线程,指定线程任务,然后将结果发回到Android的主UI线程,所有这些都只需几行代码。

而且,没有完美的技术,我还会告诉你RxJava库的一个重大缺陷,然后再展示如何使用运算符确保你的Android项目永远不会出现此问题。

操作符介绍

RxJava拥有庞大的运算符集合,主要用于帮助你修改,过滤,合并和转换由你的Observable发出的数据数据。 你将在官方文档中找到RxJava运算符的完整列表,尽管不需要你记住每一个操作符,但是值得花一些时间阅读此列表,这样你就知道你可以执行哪些不同的数据转换。

RxJava的运算符列表已经非常详尽,但是如果你不能在数据转换中找到完美的运算符,那么你可以随时连接多个运算符。 将运算符应用于Observable通常返回另一个Observable,因此你可以继续应用运算符,直到获得所需的结果。

有太多的RxJava操作符以至于一篇文章写不完,而官方的RxJava文档已经很好地介绍了用于数据转换的所有操作符,所以我将专注于两个让你的Android开发更容易的操作符,他们是:subscribeOn()和observeOn()。 

使用RxJava操作符进行多线程

如果想你APP提供最佳的用户体验,那么需要APP能够执行密集型或长时间运行的任务,还能同时执行多个任务而不会阻塞主UI线程。

例如,假设你的应用需要从两个不同的数据库中获取一些信息。 如果你在Android主线程上一个接一个地执行这两个任务,那么这不仅需要花费大量的时间,而且在你的APP从两个数据库中检索出信息之前,UI将无响应。 多么糟糕的用户体验!

一个更好的解决方案是创建两个线程,你可以同时执行这两个任务而不会阻塞主UI线程。 这种方法意味着工作会快两倍完成,并且用户还能继续与APP的UI进行交互。 而且你的用户可能甚至不知道你的APP在后台执行一些密集且长期运行的工作 - 所有的数据库信息都将简单地显示在APP的UI中,就像魔术一样!

开箱即用,Android确实提供了一些可用于创建线程(包括ServicesIntentServices)的工具,但是这些解决方案难以实现,并且代码复杂。 另外,如果实现多线程时稍不注意,APP就会漏洞百出,抛出各种错误。

要知道Android的主UI线程是唯一可以更新UI的线程。 如果你想把其他线程执行的结果在UI上更新,那么你通常需在主UI线程上创建一个Handler,然后通过Handler把数据从后台线程传到主线程。 这意味着更多,更复杂的代码和更多的错误机会。

RxJava有两个运算符可以帮你避免这种复杂性和潜在的错误。

注意,你可以把这些操作符与Schedulers结合,它们本质上是允许你指定线程。 现在,只要把scheduler看作是线程的代名词。

  • subscribeOn(Scheduler):默认情况下,Observable会在已声明订阅的线程上发射数据,即调用.subscribe方法的地方。 在Android中,这通常是主UI线程。 你可以使用subscribeOn() 运算符来定义不同的Scheduler,在Scheduler里面Observable执行和发射数据。
  • observeOn(Scheduler):你可以使用此运算符重定向Observable发射数据到不同的Scheduler,有效地更改Observable发送通知的线程,并扩展数据消耗的线程。

RxJava有许多schedulers,你可以使用它们创建不同的线程,包括:

  • Schedulers.io():用于与IO相关的任务。 
  • Schedulers.computation():用于计算相关的任务。 默认情况下,计算调度程序中的线程数被设备上可用的CPU数限制。
  • Schedulers.newThread():创建一个新线程。

让我们来看看如何使用subscribeOn()和observeOn()的例子,并在action中看到一些schedulers。

subscribeOn()

在Android中,你通常会使用subscribeOn()和Scheduler来切换线程,该线程执行长时间运行或密集型工作,因此不会阻塞主UI线程。 例如,您可能会决定在io() scheduler上导入大量数据,或者在computation() scheduler上执行一些计算。

在下面的代码,我们正在创造一个新的线程,其中Observable将执行操作并发射值123

上面代码创建了一个线程并开始在该线程上发射数据,你可能怀疑这个observable 真的在一个新的线程上运行吗。 一种验证方法是在Android Studio的Logcat Monitor中打印APP当前正在使用线程的名称。

还有一种方便的方法,在上一篇文章 Get Started With RxJava中, 我们创建了一个APP,它在Observable生命周期中的各个阶段向Logcat Monitor发送消息,所以我们可以重用代码。

打开上文中创建的项目,调整代码以便使用上述Observable作为源Observable。 然后添加subscribeOn()运算符并指定发送到Logcat的消息包含当前线程名称。

你完成的项目应该是这样的:

确保Android Studio的Logcat监视器已打开(通过选择Android Monitor选项卡,其次是Logcat),然后在物理Android设备或AVD上运行APP。 你应该在Logcat Monitor中看到以下输出:

Check the thread where your application is currently running in Android Studios Logcat MonitorCheck the thread where your application is currently running in Android Studios Logcat MonitorCheck the thread where your application is currently running in Android Studios Logcat Monitor

在这里,可以看到.subscribe在主UI线程上被调用,但是observable在一个完全不同的线程上运行。

不管你把它放在observable链的哪里,subscribeOn()操作符作用都相同; 但是,你不能在同一个链中使用多个subscribeOn()操作符。 如果你使用了多个subscribeOn(),那么你的链只会使用最接近源observable的那个。

observeOn()

不像subscribeOn(),你把observeOn()放在你链条中的位置很重要的,因为这个操作符只改变了出现在下游的observables使用的线程。 

例如,如果将以下内容插入到链中,则从该点开始出现在链中的每个observable将使用新线程。

该链将继续在新线程上运行,直到遇到另一个observeOn()运算符,此时它将切换到该运算符指定的线程。 你可以通过在链中插入多个observeOn()运算符来控制线程,该线程是特定observables发送通知的线程。

在开发Android APP时,通常使用observeOn()将后台线程执行的结果发送到Android的主UI线程。 重定向发射到Android主UI线程的最简单的方法是使用AndroidSchedulers.mainThread Scheduler,它是RxAndroid库的一部分而不是RxJava库的一部分。 

RxAndroid库包含针对RxJava 2的Android特定绑定,这使RxAndroid成为Android开发人员的宝贵资源(我们将在本系列的下一篇文章中更详细地介绍)。

要将RxAndroid添加到项目中,打开module级的build.gradle文件,将最新版本库的依赖添加进来。 在撰写本文时,RxAndroid的最新版本为2.0.1,因此我添加了以下内容:

将这个库添加到你的项目之后,你可以使用一行代码指定observable的结果,该结果应该被发送到你APP的主UI线程:

考虑到 communicating with your app’s main UI thread这篇文章会占用官方Android文档的全页,这是一个巨大的改进,因为在创建多线程Android APP时节省了大量时间。

RxJava的主要缺点

虽然RxJava对Android开发人员很有帮助,但没有技术是完美的,RxJava确实有一个很大的陷阱,有可能导致你APP崩溃。

默认情况下,RxJava运行基于推送的工作流程:数据由上游Observable产生,然后被推送到下游Observer。 基于推送的工作流程的主要问题是生产者(在这种情况下是Observable)发射数据太快,而消费者(Observer)来不及处理。

快速的 Observable 和慢速的Observer 可能会迅速导致积压的太多未消耗的数据,这将消耗系统资源,甚至可能导致OutOfMemoryException。 这个问题被称为背压。

如果怀疑你APP中发生背压,有一些可能的解决方案,包括使用操作符减少正在生产的产品数量。

使用sample()和throttlefirst()创建采样周期

如果Observable正在发射大量的数据,不需要分配Observer来接收每一项数据。

如果可以安全地忽略某些Observable发射的数据,那么可以使用几个运算符来创建采样周期,然后选择在这些时间段内发射特定值:

  • sample()操作符在指定时间间隔内检查Observable的输出,然后取得在采样过程中被发射最频繁的数据。 例如,如果项目中有.sample(5, SECONDS),则Observer将收到每五秒间隔内发出的最后一个值。 
  • throttleFirst()操作符获取采样时段期间发射的第一值。 例如,如果项目中有.throttlefirst(5, SECONDS),则Observer将会收到在每五秒间隔内发出的第一个值。
Sample OperatorSample OperatorSample Operator

buffer()批量发射

如果你不能安全地跳过任何发射的数据,那么你仍然可以通过将发射的数据批量化来减轻Observer的压力。 处理批量数据通常比单独处理多个数据更有效,因此这种方法应该能提高消费率。

你可以使用buffer()操作符创建批量数据。 在这里,我们正在使用buffer()将三秒钟内发射的所有数据批量化:

Buffer operatorBuffer operatorBuffer operator

或者,可以使用buffer() 创建由特定数量的数据组成的批次。 例如,在这里我们告诉buffer()将数据分成四组:

用Flowables替换Observables

减少发射数量的另一种方法就是用Flowable替换Observable

在RxJava 2中,RxJava团队决定将标准Observable分为两种类型:我们在本系列中一直在查看的常规类型和Flowable

Flowables的功能与Observable大致相同,但有一个主要区别:observer请求多少数据, Flowables就发射多少。 如果你有一个Observable比它指定的observer发射更多的数据,那么你可能需要考虑切换到Flowable

在开始Flowable之前,需要添加以下import语句:

然后,与创建Observables一样的方式创建Flowable。 例如,以下每个代码片段都将创建一个能够发出数据Flowable

现在,你可能会想:既然使用Flowables不用担心背压,为什么我不直接使用Flowables而要使用Observable呢? 答案是,Flowable比普通Observable需要更多的开销,所以为了创造一个高性能的APP,你应该坚持使用Observable,除非你怀疑你的APP出现背压。

Singles

Flowable不是Observable里面的唯一变量,因为库还包含Single类。

当只需要发出一个值时,Singles很有用。 在这些情况下,创建一个Observable可以感觉有点过分,而一个Single简单地发出一个值,然后就完成了,调用如下:

  • onSuccess():Single发出唯一的值。 
  • onError():如果Single不能发出数据,那么它将通过这个方法得到Throwable

Single只要调用这些方法之一,就立即终止。

让我们来看一个Single的例子- 为了节省时间,我们重用代码:

在AVD或物理Android设备上运行,将在Android Studio的Logcat Monitor中看到以下输出:

Check the Singles output in Android Studios Logcat MonitorCheck the Singles output in Android Studios Logcat MonitorCheck the Singles output in Android Studios Logcat Monitor

如果你改变主意并想任何时候将Single转换成Observable,那么RxJava有你需要的所有操作符,其中包括:

  • mergeWith():将多个Singles合并成一个单一的Observable
  • concatWith():将多个Singles连在一起,形成Observable发射数据。 
  • toObservable():将一个Single转换为一个Observable,发出最初由Single发出的数据,然后完成。

总结

在这篇文章中,我们探索了创建和管理多线程的RxJava操作符,使用这些操作符不需要考虑传统多线程的复杂性和潜在的错误。 我们还看到了如何使用RxAndroid库与Android主UI线程进行通信,而且只需一行代码,还看到了如何解决背压问题。

在本系列中,我们已经对RxAndroid库进行了多次研究,但是这个库中包含了Android特定的RxJava绑定,在Android平台上使用RxJava可能是非常宝贵的,所以在本系列的最后一篇文章中更详细地看RxAndroid库。

看看关于安卓开发的其他文章吧。

关注我们的公众号
Advertisement
Advertisement
Looking for something to help kick start your next project?
Envato Market has a range of items for sale to help get you started.