1. Code
  2. Mobile Development
  3. Android Development

使用RxJava和RxKotlin进行Kotlin反应式编程

自从成为Android开发的官方支持语言以来,Kotlin在Android开发人员中迅速普及,谷歌报告使用Kotlin创建的应用程序增加了6倍。
Scroll to top

Chinese (Simplified) (中文(简体)) translation by Soleil (you can also view the original English article)

自从成为Android开发的官方支持语言以来,Kotlin在Android开发人员中迅速普及,谷歌报告使用Kotlin创建的应用程序增加了6倍

如果您以前使用过RxJava或RxAndroid并且想要切换到Kotlin,或者想要使用Kotlin开始反应式编程,那么本教程适合您。 我们将介绍如何在Kotlin中创建RxJava 2.0 ObserversObservables和数据流的基本要素,然后再研究如何通过将RxJava与Kotlin扩展函数相结合来修剪项目中的大量样板代码。

将RxJava与Kotlin一起使用可以帮助您在更少的代码中创建高度反应性的应用程序,但是没有编程语言是完美的,所以我还将分享许多开发人员在他们第一次开始在Kotlin上使用RxJava 2.0时遇到的SAM转换问题的解决方法。

为了总结,我们将创建一个应用程序来演示如何使用RxJava来解决在现实生活中的Android项目中遇到的一些问题。

如果这是您第一次尝试RxJava,那么我将继续提供您理解核心RxJava概念所需的所有背景信息。 即使您以前从未尝试过RxJava,在本文结束时您将对如何在项目中使用此库有充分的了解,并且您将使用RxJava,RxKotlin,RxAndroid创建多个工作应用程序 和RxBinding。

无论如何,RxJava是什么?

RxJava是ReactiveX库的开源实现,可帮助您以反应式编程风格创建应用程序。 虽然RxJava旨在处理同步和异步数据流,但它并不局限于“传统”数据类型。 RxJava对“数据”的定义非常广泛,包括缓存,变量,属性甚至用户输入事件(如点击和滑动)。 仅仅因为您的应用程序不处理大量数据或执行复杂的数据转换,并不意味着它无法从RxJava中受益!

有关使用RxJava for Android应用程序的一些小背景,您可以在Envato Tuts +上查看我的其他一些帖子。

那么RxJava如何运作?

RxJava扩展了Observer软件设计模式,该模式基于Observers和Observables的概念。 要创建基本的RxJava数据管道,您需要:

  • 创建一个Observable。
  • 给Observable一些要发出的数据。
  • 创建一个观察者。
  • 将观察者订阅到Observable。

一旦Observable至少有一个Observer,它就会开始发送数据。 每次Observable发出一段数据时,它都会通过调用onNext()方法通知其指定的Observer,然后Observer通常会执行一些操作来响应此数据发送。 一旦Observable完成发送数据,它将通过调用onComplete()通知Observer。 然后Observable将终止,数据流将结束。

如果发生异常,则将调用onError(),并且Observable将立即终止,而不会发出任何更多数据或调用onComplete()

但是RxJava不只是将数据从Observable传递给Observer! RxJava有大量的运算符,可用于过滤,合并和转换此数据。 例如,假设您的应用有一个“立即付款”按钮,可以检测onClick事件,并且您担心不耐烦的用户可能会多次点按该按钮,从而导致您的应用处理多笔付款。

RxJava允许您将这些onClick事件转换为数据流,然后您可以使用RxJava的各种运算符进行操作。 在此特定示例中,您可以使用debounce()运算符来过滤快速连续发生的数据排放,因此即使用户在“立即付款”按钮上消失,您的应用也只会注册一次付款。

使用RxJava有什么好处?

我们已经看到RxJava如何在特定的应用程序中帮助您解决特定问题,但一般来说它有什么能提供Android项目?

RxJava可以通过为您提供编写您想要实现的内容的方法来帮助简化代码,而不是编写应用程序必须完成的指令列表。 例如,如果您想忽略在相同的500毫秒内发生的所有数据发射,那么您可以写:

1
.debounce(500, TimeUnit.MILLISECONDS)

此外,由于RxJava几乎将所有内容都视为数据,因此它提供了一个模板,您可以将其应用于各种事件:创建一个Observable,创建一个Observer,将Observer订阅到Observable,冲洗并重复。 这种公式化方法产生了更直接,人类可读的代码。

Android开发人员的另一个主要好处是RxJava可以消除Android上多线程的痛苦。 今天的移动用户希望他们的应用程序能够执行多任务,即使它像在后台下载数据一样简单,同时保持对用户输入的响应。

Android有几个用于创建和管理多个线程的内置解决方案,但这些解决方案都不是特别容易实现,并且它们很快就会导致复杂,冗长的代码难以阅读并且容易出错。

在RxJava中,您可以使用运算符和调度程序的组合来创建和管理其他线程。 您可以使用subscribeOn运算符和调度程序轻松更改执行工作的线程。 例如,这里我们要安排在新线程上执行的工作:

1
.subscribeOn(Schedulers.newThread())

您可以使用observeOn运算符指定应该发布此工作结果的位置。 在这里,我们使用AndroidSchedulers.mainThread调度程序将结果发布到Android的所有重要主UI线程,该调度程序作为RxAndroid库的一部分提供:

1
.observeOn(AndroidSchedulers.mainThread())

与Android的内置多线程解决方案相比,RxJava的方法简洁,更易于理解。

同样,您可以在我的RxJava 2入门Android文章中了解有关RxJava如何工作的更多信息,以及将此库添加到项目中的好处。

我应该使用RxJava还是RxKotlin?

由于Kotlin与Java 100%可互操作,因此您可以毫无困难地在Kotlin项目中使用大多数Java库 - 而RxJava库也不例外。

一个专用的RxKotlin库,它是常规RxJava库周围的Kotlin包装器。 该包装器提供了针对Kotlin环境优化RxJava的扩展,并且可以进一步减少需要编写的样板代码量。

由于您可以在Kotlin中使用RxJava而不需要RxKotlin,因此除非另有说明,否则我们将在本文中使用RxJava。

在Kotlin中创建简单的观察者和观察者

观察者和Observable是RxJava的构建块,所以让我们从创建:

  • 一个简单的Observable,它响应按钮单击事件发出短数据流。
  • 一个Observable,通过向Android Studio的Logcat打印不同的消息来对此数据做出反应。

使用您选择的设置创建一个新项目,但请确保在出现提示时选中Include Kotlin support复选框。 接下来,打开项目的build.gradle文件,并将RxJava库添加为项目依赖项:

1
dependencies {
2
  implementation fileTree(dir: 'libs', include: ['*.jar'])
3
  implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
4
  implementation 'androidx.appcompat:appcompat:1.0.0-alpha1'
5
  implementation 'androidx.constraintlayout:constraintlayout:1.1.0'
6
  implementation 'io.reactivex.rxjava2:rxjava:2.1.9'
7
8
}

然后,打开项目的activity_main.xml文件并添加将启动数据流的按钮:

1
<?xml version="1.0" encoding="utf-8"?>
2
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
3
  xmlns:tools="http://schemas.android.com/tools"
4
  android:layout_width="match_parent"
5
  android:layout_height="match_parent"
6
  android:orientation="vertical"
7
  tools:context=".MainActivity" >
8
9
  <Button
10
      android:id="@+id/button"
11
      android:layout_width="wrap_content"
12
      android:layout_height="wrap_content"
13
      android:text="Start RxJava stream" />
14
15
</LinearLayout>

创建Observable有几种不同的方法,但最简单的方法之一是使用just()运算符将对象或对象列表转换为Observable。

在下面的代码中,我们创建了一个Observable(myObservable)并为其提供了要发出的项目1,2,3,4和5。 我们还创建了一个Observer(myObserver),将它订阅到myObservable,然后告诉它每次收到新的发射时都会向Logcat打印一条消息。

1
import androidx.appcompat.app.AppCompatActivity
2
import android.os.Bundle
3
import android.util.Log
4
import io.reactivex.Observable
5
import io.reactivex.Observer
6
import io.reactivex.disposables.Disposable
7
import kotlinx.android.synthetic.main.activity_main.*
8
9
class MainActivity : AppCompatActivity() {
10
11
  private var TAG = "MainActivity"
12
13
  override fun onCreate(savedInstanceState: Bundle?) {
14
      super.onCreate(savedInstanceState)
15
      setContentView(R.layout.activity_main)
16
17
//Start the stream when the button is clicked//

18
19
      button.setOnClickListener { startRStream() }
20
21
  }
22
23
  private fun startRStream() {
24
25
//Create an Observable//

26
27
      val myObservable = getObservable()
28
29
//Create an Observer//

30
31
      val myObserver = getObserver()
32
33
//Subscribe myObserver to myObservable//

34
35
      myObservable
36
              .subscribe(myObserver)
37
  }
38
39
  private fun getObserver(): Observer<String> {
40
      return object : Observer<String> {
41
          override fun onSubscribe(d: Disposable) {
42
          }
43
44
//Every time onNext is called, print the value to Android Studio’s Logcat//

45
46
          override fun onNext(s: String) {
47
              Log.d(TAG, "onNext: $s")
48
          }
49
50
//Called if an exception is thrown//

51
52
          override fun onError(e: Throwable) {
53
              Log.e(TAG, "onError: " + e.message)
54
          }
55
56
//When onComplete is called, print the following to Logcat//

57
58
          override fun onComplete() {
59
              Log.d(TAG, "onComplete")
60
          }
61
      }
62
  }
63
64
//Give myObservable some data to emit//

65
66
  private fun getObservable(): Observable<String> {
67
      return Observable.just("1", "2", "3", "4", "5")
68
  }
69
}

您现在可以将此应用程序用于测试:

  • 在物理Android智能手机或平板电脑或Android虚拟设备(AVD)上安装项目。
  • 单击“启动RxJava流”按钮。
  • 通过选择Android Monitor选项卡(光标位于以下屏幕截图中),然后选择Logcat选项卡,打开Android Studio的Logcat Monitor。

此时,Observable将开始发送其数据,Observer将其消息打印到Logcat。 您的Logcat输出应如下所示:

Check Android Studios Logcat Monitor

如果您想亲自尝试,可以从GitHub下载此项目

RxJava的Kotlin扩展

现在我们已经了解了如何在Kotlin中设置一个简单的RxJava管道,让我们看看如何使用RxKotlin的扩展函数以更少的代码实现这一点。

要使用RxKotlin库,您需要将其添加为项目依赖项:

1
dependencies {
2
  implementation fileTree(dir: 'libs', include: ['*.jar'])
3
  implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
4
  implementation 'androidx.appcompat:appcompat:1.0.0-alpha1'
5
  implementation 'androidx.constraintlayout:constraintlayout:1.1.0'
6
  implementation 'io.reactivex.rxjava2:rxjava:2.1.9'
7
8
//Add the following//
9
10
  implementation 'io.reactivex.rxjava2:rxkotlin:2.2.0'
11
12
}

在下面的示例中,我们使用RxKotlin的toObservable()扩展函数将List转换为Observable。 我们还使用了subscribeBy()扩展函数,因为它允许我们使用命名参数构造一个Observer,从而产生更清晰的代码。

1
import android.os.Bundle
2
import androidx.appcompat.app.AppCompatActivity
3
import io.reactivex.rxkotlin.subscribeBy
4
import io.reactivex.rxkotlin.toObservable
5
import kotlinx.android.synthetic.main.activity_main.*
6
7
class MainActivity : AppCompatActivity() {
8
9
  override fun onCreate(savedInstanceState: Bundle?) {
10
      super.onCreate(savedInstanceState)
11
      setContentView(R.layout.activity_main)
12
13
//Start the stream when the button is clicked//

14
15
      button.setOnClickListener { startRStream() }
16
17
  }
18
19
  private fun startRStream() {
20
21
      val list = listOf("1", "2", "3", "4", "5")
22
23
//Apply the toObservable() extension function//

24
25
      list.toObservable()
26
27
//Construct your Observer using the subscribeBy() extension function//

28
29
              .subscribeBy(
30
31
                      onNext = { println(it) },
32
                      onError = { it.printStackTrace() },
33
                      onComplete = { println("onComplete!") }
34
35
              )
36
  }
37
}

以下是您应该看到的输出:

Every time onNext is called the data emission is printed to Android Studios Logcat

解决RxJava的SAM模糊问题

当给定Java方法上存在多个SAM参数重载时,RxKotlin还为SAM转换问题提供了一个重要的解决方法。 这种SAM歧义混淆了Kotlin编译器,因为它无法确定它应该转换哪个接口,因此您的项目将无法编译。

当使用RxJava 2.0与Kotlin时,这种SAM模糊性是一个特殊问题,因为许多RxJava运算符采用多种SAM兼容类型。

让我们来看看运行中的SAM转换问题。 在下面的代码中,我们使用zip()运算符来组合两个Observable的输出:

1
import androidx.appcompat.app.AppCompatActivity
2
import android.os.Bundle
3
import io.reactivex.Observable
4
import kotlinx.android.synthetic.main.activity_main.*
5
6
class MainActivity : AppCompatActivity() {
7
8
  override fun onCreate(savedInstanceState: Bundle?) {
9
      super.onCreate(savedInstanceState)
10
      setContentView(R.layout.activity_main)
11
12
//Start the stream when the button is clicked//

13
14
      button.setOnClickListener { startRStream() }
15
      
16
  }
17
18
  private fun startRStream() {
19
20
      val numbers = Observable.range(1, 6)
21
22
      val strings = Observable.just("One", "Two", "Three",
23
24
              "Four", "Five", "Six" )
25
26
      val zipped = Observable.zip(strings, numbers) { s, n -> "$s $n" }
27
      zipped.subscribe(::println)
28
  }
29
}

这将导致Kotlin编译器抛出类型推断错误。 但是,RxKotlin为受影响的运算符提供了辅助方法和扩展函数,包括Observables.zip(),我们在以下代码中使用它:

1
import android.os.Bundle
2
import androidx.appcompat.app.AppCompatActivity
3
import io.reactivex.Observable
4
import io.reactivex.rxkotlin.Observables
5
import kotlinx.android.synthetic.main.activity_main.*
6
7
class MainActivity : AppCompatActivity() {
8
9
  override fun onCreate(savedInstanceState: Bundle?) {
10
      super.onCreate(savedInstanceState)
11
      setContentView(R.layout.activity_main)
12
13
//Start the stream when the button is clicked//

14
15
      button.setOnClickListener { startRStream() }
16
17
  }
18
19
  private fun startRStream() {
20
21
      val numbers = Observable.range(1, 6)
22
23
      val strings = Observable.just("One", "Two", "Three",
24
25
              "Four", "Five", "Six" )
26
27
      val zipped = Observables.zip(strings, numbers) { s, n -> "$s $n" }
28
      zipped.subscribe(::println)
29
  }
30
31
32
}

这是这段代码的输出:

Switch to the Observableszip operator and the Kotlin compiler will no longer throw a type inference error

结论

在本教程中,我向您展示了如何在Kotlin项目中开始使用RxJava库,包括使用许多其他支持库,例如RxKotlin和RxBinding。 我们研究了如何使用扩展函数在Kotlin中创建简单的Observers和Observable,直到为Kotlin平台优化RxJava。

到目前为止,我们已经使用RxJava创建了发出数据的简单Observable,以及将这些数据打印到Android Studio的Logcat的观察者 - 但这不是你在现实世界中使用RxJava的方式!

在下一篇文章中,我们将了解RxJava如何帮助解决开发Android应用程序时遇到的实际问题。 我们将使用RxJava和Kotlin创建一个经典的注册屏幕。