1. Code
  2. Mobile Development
  3. Android Development

使用RxJava和RxKotlin进行Kotlin反应式编程

自从成为Android开发的官方支持语言以来,Kotlin在Android开发人员中迅速普及,谷歌报告使用Kotlin创建的应用程序增加了6倍。
Scroll to top

Chinese (Simplified) (中文(简体)) translation by Soleil (you can also view the original English article)

自从成为Android开发的官方支持语言以来,Kotlin在Android开发人员中迅速普及,谷歌报告使用Kotlin创建的应用程序增加了6倍

如果您以前使用过RxJava或RxAndroid并且想要切换到Kotlin,或者想要使用Kotlin开始反应式编程,那么本教程适合您。 我们将介绍如何在Kotlin中创建RxJava 2.0 ObserversObservables和数据流的基本要素,然后再研究如何通过将RxJava与Kotlin扩展函数相结合来修剪项目中的大量样板代码。

将RxJava与Kotlin一起使用可以帮助您在更少的代码中创建高度反应性的应用程序,但是没有编程语言是完美的,所以我还将分享许多开发人员在他们第一次开始在Kotlin上使用RxJava 2.0时遇到的SAM转换问题的解决方法。

为了总结,我们将创建一个应用程序来演示如何使用RxJava来解决在现实生活中的Android项目中遇到的一些问题。

如果这是您第一次尝试RxJava,那么我将继续提供您理解核心RxJava概念所需的所有背景信息。 即使您以前从未尝试过RxJava,在本文结束时您将对如何在项目中使用此库有充分的了解,并且您将使用RxJava,RxKotlin,RxAndroid创建多个工作应用程序 和RxBinding。

无论如何,RxJava是什么?

RxJava是ReactiveX库的开源实现,可帮助您以反应式编程风格创建应用程序。 虽然RxJava旨在处理同步和异步数据流,但它并不局限于“传统”数据类型。 RxJava对“数据”的定义非常广泛,包括缓存,变量,属性甚至用户输入事件(如点击和滑动)。 仅仅因为您的应用程序不处理大量数据或执行复杂的数据转换,并不意味着它无法从RxJava中受益!

有关使用RxJava for Android应用程序的一些小背景,您可以在Envato Tuts +上查看我的其他一些帖子。

那么RxJava如何运作?

RxJava扩展了Observer软件设计模式,该模式基于Observers和Observables的概念。 要创建基本的RxJava数据管道,您需要:

  • 创建一个Observable。
  • 给Observable一些要发出的数据。
  • 创建一个观察者。
  • 将观察者订阅到Observable。

一旦Observable至少有一个Observer,它就会开始发送数据。 每次Observable发出一段数据时,它都会通过调用onNext()方法通知其指定的Observer,然后Observer通常会执行一些操作来响应此数据发送。 一旦Observable完成发送数据,它将通过调用onComplete()通知Observer。 然后Observable将终止,数据流将结束。

如果发生异常,则将调用onError(),并且Observable将立即终止,而不会发出任何更多数据或调用onComplete()

但是RxJava不只是将数据从Observable传递给Observer! RxJava有大量的运算符,可用于过滤,合并和转换此数据。 例如,假设您的应用有一个“立即付款”按钮,可以检测onClick事件,并且您担心不耐烦的用户可能会多次点按该按钮,从而导致您的应用处理多笔付款。

RxJava允许您将这些onClick事件转换为数据流,然后您可以使用RxJava的各种运算符进行操作。 在此特定示例中,您可以使用debounce()运算符来过滤快速连续发生的数据排放,因此即使用户在“立即付款”按钮上消失,您的应用也只会注册一次付款。

使用RxJava有什么好处?

我们已经看到RxJava如何在特定的应用程序中帮助您解决特定问题,但一般来说它有什么能提供Android项目?

RxJava可以通过为您提供编写您想要实现的内容的方法来帮助简化代码,而不是编写应用程序必须完成的指令列表。 例如,如果您想忽略在相同的500毫秒内发生的所有数据发射,那么您可以写:

1
.debounce(500, TimeUnit.MILLISECONDS)

此外,由于RxJava几乎将所有内容都视为数据,因此它提供了一个模板,您可以将其应用于各种事件:创建一个Observable,创建一个Observer,将Observer订阅到Observable,冲洗并重复。 这种公式化方法产生了更直接,人类可读的代码。

Android开发人员的另一个主要好处是RxJava可以消除Android上多线程的痛苦。 今天的移动用户希望他们的应用程序能够执行多任务,即使它像在后台下载数据一样简单,同时保持对用户输入的响应。

Android有几个用于创建和管理多个线程的内置解决方案,但这些解决方案都不是特别容易实现,并且它们很快就会导致复杂,冗长的代码难以阅读并且容易出错。

在RxJava中,您可以使用运算符和调度程序的组合来创建和管理其他线程。 您可以使用subscribeOn运算符和调度程序轻松更改执行工作的线程。 例如,这里我们要安排在新线程上执行的工作:

1
.subscribeOn(Schedulers.newThread())

您可以使用observeOn运算符指定应该发布此工作结果的位置。 在这里,我们使用AndroidSchedulers.mainThread调度程序将结果发布到Android的所有重要主UI线程,该调度程序作为RxAndroid库的一部分提供:

1
.observeOn(AndroidSchedulers.mainThread())

与Android的内置多线程解决方案相比,RxJava的方法简洁,更易于理解。

同样,您可以在我的RxJava 2入门Android文章中了解有关RxJava如何工作的更多信息,以及将此库添加到项目中的好处。

我应该使用RxJava还是RxKotlin?

由于Kotlin与Java 100%可互操作,因此您可以毫无困难地在Kotlin项目中使用大多数Java库 - 而RxJava库也不例外。

一个专用的RxKotlin库,它是常规RxJava库周围的Kotlin包装器。 该包装器提供了针对Kotlin环境优化RxJava的扩展,并且可以进一步减少需要编写的样板代码量。

由于您可以在Kotlin中使用RxJava而不需要RxKotlin,因此除非另有说明,否则我们将在本文中使用RxJava。

在Kotlin中创建简单的观察者和观察者

观察者和Observable是RxJava的构建块,所以让我们从创建:

  • 一个简单的Observable,它响应按钮单击事件发出短数据流。
  • 一个Observable,通过向Android Studio的Logcat打印不同的消息来对此数据做出反应。

使用您选择的设置创建一个新项目,但请确保在出现提示时选中Include Kotlin support复选框。 接下来,打开项目的build.gradle文件,并将RxJava库添加为项目依赖项:

1
dependencies {
2
  implementation fileTree(dir: 'libs', include: ['*.jar'])
3
  implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
4
  implementation 'androidx.appcompat:appcompat:1.0.0-alpha1'
5
  implementation 'androidx.constraintlayout:constraintlayout:1.1.0'
6
  implementation 'io.reactivex.rxjava2:rxjava:2.1.9'
7
8
}

然后,打开项目的activity_main.xml文件并添加将启动数据流的按钮:

1
<?xml version="1.0" encoding="utf-8"?>
2
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
3
  xmlns:tools="http://schemas.android.com/tools"
4
  android:layout_width="match_parent"
5
  android:layout_height="match_parent"
6
  android:orientation="vertical"
7
  tools:context=".MainActivity" >
8
9
  <Button
10
      android:id="@+id/button"
11
      android:layout_width="wrap_content"
12
      android:layout_height="wrap_content"
13
      android:text="Start RxJava stream" />
14
15
</LinearLayout>

创建Observable有几种不同的方法,但最简单的方法之一是使用just()运算符将对象或对象列表转换为Observable。

在下面的代码中,我们创建了一个Observable(myObservable)并为其提供了要发出的项目1,2,3,4和5。 我们还创建了一个Observer(myObserver),将它订阅到myObservable,然后告诉它每次收到新的发射时都会向Logcat打印一条消息。

1
import androidx.appcompat.app.AppCompatActivity
2
import android.os.Bundle
3
import android.util.Log
4
import io.reactivex.Observable
5
import io.reactivex.Observer
6
import io.reactivex.disposables.Disposable
7
import kotlinx.android.synthetic.main.activity_main.*
8
9
class MainActivity : AppCompatActivity() {
10
11
  private var TAG = "MainActivity"
12
13
  override fun onCreate(savedInstanceState: Bundle?) {
14
      super.onCreate(savedInstanceState)
15
      setContentView(R.layout.activity_main)
16
17
//Start the stream when the button is clicked//

18
19
      button.setOnClickListener { startRStream() }
20
21
  }
22
23
  private fun startRStream() {
24
25
//Create an Observable//

26
27
      val myObservable = getObservable()
28
29
//Create an Observer//

30
31
      val myObserver = getObserver()
32
33
//Subscribe myObserver to myObservable//

34
35
      myObservable
36
              .subscribe(myObserver)
37
  }
38
39
  private fun getObserver(): Observer<String> {
40
      return object : Observer<String> {
41
          override fun onSubscribe(d: Disposable) {
42
          }
43
44
//Every time onNext is called, print the value to Android Studio’s Logcat//

45
46
          override fun onNext(s: String) {
47
              Log.d(TAG, "onNext: $s")
48
          }
49
50
//Called if an exception is thrown//

51
52
          override fun onError(e: Throwable) {
53
              Log.e(TAG, "onError: " + e.message)
54
          }
55
56
//When onComplete is called, print the following to Logcat//

57
58
          override fun onComplete() {
59
              Log.d(TAG, "onComplete")
60
          }
61
      }
62
  }
63
64
//Give myObservable some data to emit//

65
66
  private fun getObservable(): Observable<String> {
67
      return Observable.just("1", "2", "3", "4", "5")
68
  }
69
}

您现在可以将此应用程序用于测试:

  • 在物理Android智能手机或平板电脑或Android虚拟设备(AVD)上安装项目。
  • 单击“启动RxJava流”按钮。
  • 通过选择Android Monitor选项卡(光标位于以下屏幕截图中),然后选择Logcat选项卡,打开Android Studio的Logcat Monitor。

此时,Observable将开始发送其数据,Observer将其消息打印到Logcat。 您的Logcat输出应如下所示:

Check Android Studios Logcat MonitorCheck Android Studios Logcat MonitorCheck Android Studios Logcat Monitor

如果您想亲自尝试,可以从GitHub下载此项目

RxJava的Kotlin扩展

现在我们已经了解了如何在Kotlin中设置一个简单的RxJava管道,让我们看看如何使用RxKotlin的扩展函数以更少的代码实现这一点。

要使用RxKotlin库,您需要将其添加为项目依赖项:

1
dependencies {
2
  implementation fileTree(dir: 'libs', include: ['*.jar'])
3
  implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
4
  implementation 'androidx.appcompat:appcompat:1.0.0-alpha1'
5
  implementation 'androidx.constraintlayout:constraintlayout:1.1.0'
6
  implementation 'io.reactivex.rxjava2:rxjava:2.1.9'
7
8
//Add the following//
9
10
  implementation 'io.reactivex.rxjava2:rxkotlin:2.2.0'
11
12
}

在下面的示例中,我们使用RxKotlin的toObservable()扩展函数将List转换为Observable。 我们还使用了subscribeBy()扩展函数,因为它允许我们使用命名参数构造一个Observer,从而产生更清晰的代码。

1
import android.os.Bundle
2
import androidx.appcompat.app.AppCompatActivity
3
import io.reactivex.rxkotlin.subscribeBy
4
import io.reactivex.rxkotlin.toObservable
5
import kotlinx.android.synthetic.main.activity_main.*
6
7
class MainActivity : AppCompatActivity() {
8
9
  override fun onCreate(savedInstanceState: Bundle?) {
10
      super.onCreate(savedInstanceState)
11
      setContentView(R.layout.activity_main)
12
13
//Start the stream when the button is clicked//

14
15
      button.setOnClickListener { startRStream() }
16
17
  }
18
19
  private fun startRStream() {
20
21
      val list = listOf("1", "2", "3", "4", "5")
22
23
//Apply the toObservable() extension function//

24
25
      list.toObservable()
26
27
//Construct your Observer using the subscribeBy() extension function//

28
29
              .subscribeBy(
30
31
                      onNext = { println(it) },
32
                      onError = { it.printStackTrace() },
33
                      onComplete = { println("onComplete!") }
34
35
              )
36
  }
37
}

以下是您应该看到的输出:

Every time onNext is called the data emission is printed to Android Studios LogcatEvery time onNext is called the data emission is printed to Android Studios LogcatEvery time onNext is called the data emission is printed to Android Studios Logcat

解决RxJava的SAM模糊问题

当给定Java方法上存在多个SAM参数重载时,RxKotlin还为SAM转换问题提供了一个重要的解决方法。 这种SAM歧义混淆了Kotlin编译器,因为它无法确定它应该转换哪个接口,因此您的项目将无法编译。

当使用RxJava 2.0与Kotlin时,这种SAM模糊性是一个特殊问题,因为许多RxJava运算符采用多种SAM兼容类型。

让我们来看看运行中的SAM转换问题。 在下面的代码中,我们使用zip()运算符来组合两个Observable的输出:

1
import androidx.appcompat.app.AppCompatActivity
2
import android.os.Bundle
3
import io.reactivex.Observable
4
import kotlinx.android.synthetic.main.activity_main.*
5
6
class MainActivity : AppCompatActivity() {
7
8
  override fun onCreate(savedInstanceState: Bundle?) {
9
      super.onCreate(savedInstanceState)
10
      setContentView(R.layout.activity_main)
11
12
//Start the stream when the button is clicked//

13
14
      button.setOnClickListener { startRStream() }
15
      
16
  }
17
18
  private fun startRStream() {
19
20
      val numbers = Observable.range(1, 6)
21
22
      val strings = Observable.just("One", "Two", "Three",
23
24
              "Four", "Five", "Six" )
25
26
      val zipped = Observable.zip(strings, numbers) { s, n -> "$s $n" }
27
      zipped.subscribe(::println)
28
  }
29
}

这将导致Kotlin编译器抛出类型推断错误。 但是,RxKotlin为受影响的运算符提供了辅助方法和扩展函数,包括Observables.zip(),我们在以下代码中使用它:

1
import android.os.Bundle
2
import androidx.appcompat.app.AppCompatActivity
3
import io.reactivex.Observable
4
import io.reactivex.rxkotlin.Observables
5
import kotlinx.android.synthetic.main.activity_main.*
6
7
class MainActivity : AppCompatActivity() {
8
9
  override fun onCreate(savedInstanceState: Bundle?) {
10
      super.onCreate(savedInstanceState)
11
      setContentView(R.layout.activity_main)
12
13
//Start the stream when the button is clicked//

14
15
      button.setOnClickListener { startRStream() }
16
17
  }
18
19
  private fun startRStream() {
20
21
      val numbers = Observable.range(1, 6)
22
23
      val strings = Observable.just("One", "Two", "Three",
24
25
              "Four", "Five", "Six" )
26
27
      val zipped = Observables.zip(strings, numbers) { s, n -> "$s $n" }
28
      zipped.subscribe(::println)
29
  }
30
31
32
}

这是这段代码的输出:

Switch to the Observableszip operator and the Kotlin compiler will no longer throw a type inference errorSwitch to the Observableszip operator and the Kotlin compiler will no longer throw a type inference errorSwitch to the Observableszip operator and the Kotlin compiler will no longer throw a type inference error

结论

在本教程中,我向您展示了如何在Kotlin项目中开始使用RxJava库,包括使用许多其他支持库,例如RxKotlin和RxBinding。 我们研究了如何使用扩展函数在Kotlin中创建简单的Observers和Observable,直到为Kotlin平台优化RxJava。

到目前为止,我们已经使用RxJava创建了发出数据的简单Observable,以及将这些数据打印到Android Studio的Logcat的观察者 - 但这不是你在现实世界中使用RxJava的方式!

在下一篇文章中,我们将了解RxJava如何帮助解决开发Android应用程序时遇到的实际问题。 我们将使用RxJava和Kotlin创建一个经典的注册屏幕。