使用RxJava和RxKotlin进行Kotlin反应式编程
Chinese (Simplified) (中文(简体)) translation by Soleil (you can also view the original English article)
自从成为Android开发的官方支持语言以来,Kotlin在Android开发人员中迅速普及,谷歌报告使用Kotlin创建的应用程序增加了6倍。
如果您以前使用过RxJava或RxAndroid并且想要切换到Kotlin,或者想要使用Kotlin开始反应式编程,那么本教程适合您。 我们将介绍如何在Kotlin中创建RxJava 2.0 Observers,Observables和数据流的基本要素,然后再研究如何通过将RxJava与Kotlin扩展函数相结合来修剪项目中的大量样板代码。
将RxJava与Kotlin一起使用可以帮助您在更少的代码中创建高度反应性的应用程序,但是没有编程语言是完美的,所以我还将分享许多开发人员在他们第一次开始在Kotlin上使用RxJava 2.0时遇到的SAM转换问题的解决方法。
为了总结,我们将创建一个应用程序来演示如何使用RxJava来解决在现实生活中的Android项目中遇到的一些问题。
如果这是您第一次尝试RxJava,那么我将继续提供您理解核心RxJava概念所需的所有背景信息。 即使您以前从未尝试过RxJava,在本文结束时您将对如何在项目中使用此库有充分的了解,并且您将使用RxJava,RxKotlin,RxAndroid创建多个工作应用程序 和RxBinding。
无论如何,RxJava是什么?
RxJava是ReactiveX库的开源实现,可帮助您以反应式编程风格创建应用程序。 虽然RxJava旨在处理同步和异步数据流,但它并不局限于“传统”数据类型。 RxJava对“数据”的定义非常广泛,包括缓存,变量,属性甚至用户输入事件(如点击和滑动)。 仅仅因为您的应用程序不处理大量数据或执行复杂的数据转换,并不意味着它无法从RxJava中受益!
有关使用RxJava for Android应用程序的一些小背景,您可以在Envato Tuts +上查看我的其他一些帖子。


Android SDK开始使用RxJava 2 for AndroidJessica Thornsby

Android SDKRxJava 2 for Android Apps:RxBinding和RxLifecycleJessica Thornsby
那么RxJava如何运作?
RxJava扩展了Observer软件设计模式,该模式基于Observers和Observables的概念。 要创建基本的RxJava数据管道,您需要:
- 创建一个Observable。
- 给Observable一些要发出的数据。
- 创建一个观察者。
- 将观察者订阅到Observable。
一旦Observable至少有一个Observer,它就会开始发送数据。 每次Observable发出一段数据时,它都会通过调用onNext()方法通知其指定的Observer,然后Observer通常会执行一些操作来响应此数据发送。 一旦Observable完成发送数据,它将通过调用onComplete()通知Observer。 然后Observable将终止,数据流将结束。
如果发生异常,则将调用onError(),并且Observable将立即终止,而不会发出任何更多数据或调用onComplete()。
但是RxJava不只是将数据从Observable传递给Observer! RxJava有大量的运算符,可用于过滤,合并和转换此数据。 例如,假设您的应用有一个“立即付款”按钮,可以检测onClick事件,并且您担心不耐烦的用户可能会多次点按该按钮,从而导致您的应用处理多笔付款。
RxJava允许您将这些onClick事件转换为数据流,然后您可以使用RxJava的各种运算符进行操作。 在此特定示例中,您可以使用debounce()运算符来过滤快速连续发生的数据排放,因此即使用户在“立即付款”按钮上消失,您的应用也只会注册一次付款。
使用RxJava有什么好处?
我们已经看到RxJava如何在特定的应用程序中帮助您解决特定问题,但一般来说它有什么能提供Android项目?
RxJava可以通过为您提供编写您想要实现的内容的方法来帮助简化代码,而不是编写应用程序必须完成的指令列表。 例如,如果您想忽略在相同的500毫秒内发生的所有数据发射,那么您可以写:
1 |
.debounce(500, TimeUnit.MILLISECONDS) |
此外,由于RxJava几乎将所有内容都视为数据,因此它提供了一个模板,您可以将其应用于各种事件:创建一个Observable,创建一个Observer,将Observer订阅到Observable,冲洗并重复。 这种公式化方法产生了更直接,人类可读的代码。
Android开发人员的另一个主要好处是RxJava可以消除Android上多线程的痛苦。 今天的移动用户希望他们的应用程序能够执行多任务,即使它像在后台下载数据一样简单,同时保持对用户输入的响应。
Android有几个用于创建和管理多个线程的内置解决方案,但这些解决方案都不是特别容易实现,并且它们很快就会导致复杂,冗长的代码难以阅读并且容易出错。
在RxJava中,您可以使用运算符和调度程序的组合来创建和管理其他线程。 您可以使用subscribeOn运算符和调度程序轻松更改执行工作的线程。 例如,这里我们要安排在新线程上执行的工作:
1 |
.subscribeOn(Schedulers.newThread()) |
您可以使用observeOn运算符指定应该发布此工作结果的位置。 在这里,我们使用AndroidSchedulers.mainThread调度程序将结果发布到Android的所有重要主UI线程,该调度程序作为RxAndroid库的一部分提供:
1 |
.observeOn(AndroidSchedulers.mainThread()) |
与Android的内置多线程解决方案相比,RxJava的方法更简洁,更易于理解。
同样,您可以在我的RxJava 2入门Android文章中了解有关RxJava如何工作的更多信息,以及将此库添加到项目中的好处。
我应该使用RxJava还是RxKotlin?
由于Kotlin与Java 100%可互操作,因此您可以毫无困难地在Kotlin项目中使用大多数Java库 - 而RxJava库也不例外。
有一个专用的RxKotlin库,它是常规RxJava库周围的Kotlin包装器。 该包装器提供了针对Kotlin环境优化RxJava的扩展,并且可以进一步减少需要编写的样板代码量。
由于您可以在Kotlin中使用RxJava而不需要RxKotlin,因此除非另有说明,否则我们将在本文中使用RxJava。
在Kotlin中创建简单的观察者和观察者
观察者和Observable是RxJava的构建块,所以让我们从创建:
- 一个简单的Observable,它响应按钮单击事件发出短数据流。
- 一个Observable,通过向Android Studio的Logcat打印不同的消息来对此数据做出反应。
使用您选择的设置创建一个新项目,但请确保在出现提示时选中Include Kotlin support复选框。 接下来,打开项目的build.gradle文件,并将RxJava库添加为项目依赖项:
1 |
dependencies {
|
2 |
implementation fileTree(dir: 'libs', include: ['*.jar']) |
3 |
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version" |
4 |
implementation 'androidx.appcompat:appcompat:1.0.0-alpha1' |
5 |
implementation 'androidx.constraintlayout:constraintlayout:1.1.0' |
6 |
implementation 'io.reactivex.rxjava2:rxjava:2.1.9' |
7 |
|
8 |
} |
然后,打开项目的activity_main.xml文件并添加将启动数据流的按钮:
1 |
<?xml version="1.0" encoding="utf-8"?>
|
2 |
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android" |
3 |
xmlns:tools="http://schemas.android.com/tools" |
4 |
android:layout_width="match_parent" |
5 |
android:layout_height="match_parent" |
6 |
android:orientation="vertical" |
7 |
tools:context=".MainActivity" > |
8 |
|
9 |
<Button
|
10 |
android:id="@+id/button" |
11 |
android:layout_width="wrap_content" |
12 |
android:layout_height="wrap_content" |
13 |
android:text="Start RxJava stream" /> |
14 |
|
15 |
</LinearLayout>
|
创建Observable有几种不同的方法,但最简单的方法之一是使用just()运算符将对象或对象列表转换为Observable。
在下面的代码中,我们创建了一个Observable(myObservable)并为其提供了要发出的项目1,2,3,4和5。 我们还创建了一个Observer(myObserver),将它订阅到myObservable,然后告诉它每次收到新的发射时都会向Logcat打印一条消息。
1 |
import androidx.appcompat.app.AppCompatActivity |
2 |
import android.os.Bundle |
3 |
import android.util.Log |
4 |
import io.reactivex.Observable |
5 |
import io.reactivex.Observer |
6 |
import io.reactivex.disposables.Disposable |
7 |
import kotlinx.android.synthetic.main.activity_main.* |
8 |
|
9 |
class MainActivity : AppCompatActivity() { |
10 |
|
11 |
private var TAG = "MainActivity" |
12 |
|
13 |
override fun onCreate(savedInstanceState: Bundle?) { |
14 |
super.onCreate(savedInstanceState) |
15 |
setContentView(R.layout.activity_main) |
16 |
|
17 |
//Start the stream when the button is clicked//
|
18 |
|
19 |
button.setOnClickListener { startRStream() } |
20 |
|
21 |
}
|
22 |
|
23 |
private fun startRStream() { |
24 |
|
25 |
//Create an Observable//
|
26 |
|
27 |
val myObservable = getObservable() |
28 |
|
29 |
//Create an Observer//
|
30 |
|
31 |
val myObserver = getObserver() |
32 |
|
33 |
//Subscribe myObserver to myObservable//
|
34 |
|
35 |
myObservable
|
36 |
.subscribe(myObserver) |
37 |
}
|
38 |
|
39 |
private fun getObserver(): Observer<String> { |
40 |
return object : Observer<String> { |
41 |
override fun onSubscribe(d: Disposable) { |
42 |
}
|
43 |
|
44 |
//Every time onNext is called, print the value to Android Studio’s Logcat//
|
45 |
|
46 |
override fun onNext(s: String) { |
47 |
Log.d(TAG, "onNext: $s") |
48 |
}
|
49 |
|
50 |
//Called if an exception is thrown//
|
51 |
|
52 |
override fun onError(e: Throwable) { |
53 |
Log.e(TAG, "onError: " + e.message) |
54 |
}
|
55 |
|
56 |
//When onComplete is called, print the following to Logcat//
|
57 |
|
58 |
override fun onComplete() { |
59 |
Log.d(TAG, "onComplete") |
60 |
}
|
61 |
}
|
62 |
}
|
63 |
|
64 |
//Give myObservable some data to emit//
|
65 |
|
66 |
private fun getObservable(): Observable<String> { |
67 |
return Observable.just("1", "2", "3", "4", "5") |
68 |
}
|
69 |
}
|
您现在可以将此应用程序用于测试:
- 在物理Android智能手机或平板电脑或Android虚拟设备(AVD)上安装项目。
- 单击“启动RxJava流”按钮。
- 通过选择Android Monitor选项卡(光标位于以下屏幕截图中),然后选择Logcat选项卡,打开Android Studio的Logcat Monitor。
此时,Observable将开始发送其数据,Observer将其消息打印到Logcat。 您的Logcat输出应如下所示:



如果您想亲自尝试,可以从GitHub下载此项目。
RxJava的Kotlin扩展
现在我们已经了解了如何在Kotlin中设置一个简单的RxJava管道,让我们看看如何使用RxKotlin的扩展函数以更少的代码实现这一点。
要使用RxKotlin库,您需要将其添加为项目依赖项:
1 |
dependencies {
|
2 |
implementation fileTree(dir: 'libs', include: ['*.jar']) |
3 |
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version" |
4 |
implementation 'androidx.appcompat:appcompat:1.0.0-alpha1' |
5 |
implementation 'androidx.constraintlayout:constraintlayout:1.1.0' |
6 |
implementation 'io.reactivex.rxjava2:rxjava:2.1.9' |
7 |
|
8 |
//Add the following// |
9 |
|
10 |
implementation 'io.reactivex.rxjava2:rxkotlin:2.2.0' |
11 |
|
12 |
} |
在下面的示例中,我们使用RxKotlin的toObservable()扩展函数将List转换为Observable。 我们还使用了subscribeBy()扩展函数,因为它允许我们使用命名参数构造一个Observer,从而产生更清晰的代码。
1 |
import android.os.Bundle |
2 |
import androidx.appcompat.app.AppCompatActivity |
3 |
import io.reactivex.rxkotlin.subscribeBy |
4 |
import io.reactivex.rxkotlin.toObservable |
5 |
import kotlinx.android.synthetic.main.activity_main.* |
6 |
|
7 |
class MainActivity : AppCompatActivity() { |
8 |
|
9 |
override fun onCreate(savedInstanceState: Bundle?) { |
10 |
super.onCreate(savedInstanceState) |
11 |
setContentView(R.layout.activity_main) |
12 |
|
13 |
//Start the stream when the button is clicked//
|
14 |
|
15 |
button.setOnClickListener { startRStream() } |
16 |
|
17 |
}
|
18 |
|
19 |
private fun startRStream() { |
20 |
|
21 |
val list = listOf("1", "2", "3", "4", "5") |
22 |
|
23 |
//Apply the toObservable() extension function//
|
24 |
|
25 |
list.toObservable() |
26 |
|
27 |
//Construct your Observer using the subscribeBy() extension function//
|
28 |
|
29 |
.subscribeBy( |
30 |
|
31 |
onNext = { println(it) }, |
32 |
onError = { it.printStackTrace() }, |
33 |
onComplete = { println("onComplete!") } |
34 |
|
35 |
)
|
36 |
}
|
37 |
}
|
以下是您应该看到的输出:



解决RxJava的SAM模糊问题
当给定Java方法上存在多个SAM参数重载时,RxKotlin还为SAM转换问题提供了一个重要的解决方法。 这种SAM歧义混淆了Kotlin编译器,因为它无法确定它应该转换哪个接口,因此您的项目将无法编译。
当使用RxJava 2.0与Kotlin时,这种SAM模糊性是一个特殊问题,因为许多RxJava运算符采用多种SAM兼容类型。
让我们来看看运行中的SAM转换问题。 在下面的代码中,我们使用zip()运算符来组合两个Observable的输出:
1 |
import androidx.appcompat.app.AppCompatActivity |
2 |
import android.os.Bundle |
3 |
import io.reactivex.Observable |
4 |
import kotlinx.android.synthetic.main.activity_main.* |
5 |
|
6 |
class MainActivity : AppCompatActivity() { |
7 |
|
8 |
override fun onCreate(savedInstanceState: Bundle?) { |
9 |
super.onCreate(savedInstanceState) |
10 |
setContentView(R.layout.activity_main) |
11 |
|
12 |
//Start the stream when the button is clicked//
|
13 |
|
14 |
button.setOnClickListener { startRStream() } |
15 |
|
16 |
}
|
17 |
|
18 |
private fun startRStream() { |
19 |
|
20 |
val numbers = Observable.range(1, 6) |
21 |
|
22 |
val strings = Observable.just("One", "Two", "Three", |
23 |
|
24 |
"Four", "Five", "Six" ) |
25 |
|
26 |
val zipped = Observable.zip(strings, numbers) { s, n -> "$s $n" } |
27 |
zipped.subscribe(::println) |
28 |
}
|
29 |
}
|
这将导致Kotlin编译器抛出类型推断错误。 但是,RxKotlin为受影响的运算符提供了辅助方法和扩展函数,包括Observables.zip(),我们在以下代码中使用它:
1 |
import android.os.Bundle |
2 |
import androidx.appcompat.app.AppCompatActivity |
3 |
import io.reactivex.Observable |
4 |
import io.reactivex.rxkotlin.Observables |
5 |
import kotlinx.android.synthetic.main.activity_main.* |
6 |
|
7 |
class MainActivity : AppCompatActivity() { |
8 |
|
9 |
override fun onCreate(savedInstanceState: Bundle?) { |
10 |
super.onCreate(savedInstanceState) |
11 |
setContentView(R.layout.activity_main) |
12 |
|
13 |
//Start the stream when the button is clicked//
|
14 |
|
15 |
button.setOnClickListener { startRStream() } |
16 |
|
17 |
}
|
18 |
|
19 |
private fun startRStream() { |
20 |
|
21 |
val numbers = Observable.range(1, 6) |
22 |
|
23 |
val strings = Observable.just("One", "Two", "Three", |
24 |
|
25 |
"Four", "Five", "Six" ) |
26 |
|
27 |
val zipped = Observables.zip(strings, numbers) { s, n -> "$s $n" } |
28 |
zipped.subscribe(::println) |
29 |
}
|
30 |
|
31 |
|
32 |
}
|
这是这段代码的输出:



结论
在本教程中,我向您展示了如何在Kotlin项目中开始使用RxJava库,包括使用许多其他支持库,例如RxKotlin和RxBinding。 我们研究了如何使用扩展函数在Kotlin中创建简单的Observers和Observable,直到为Kotlin平台优化RxJava。
到目前为止,我们已经使用RxJava创建了发出数据的简单Observable,以及将这些数据打印到Android Studio的Logcat的观察者 - 但这不是你在现实世界中使用RxJava的方式!
在下一篇文章中,我们将了解RxJava如何帮助解决开发Android应用程序时遇到的实际问题。 我们将使用RxJava和Kotlin创建一个经典的注册屏幕。



