Unlimited Plugins, WordPress themes, videos & courses! Unlimited asset downloads! From \$16.50/m

# Reactive Programming Operators in RxJava 2

Difficulty:IntermediateLength:LongLanguages:

If your Android app is going to rack up those five-star reviews on Google Play, then it needs to be able to multi-task.

As a bare minimum, today’s mobile users expect to still be able to interact with your app while it's doing work in the background. This may sound straightforward, but Android is single-threaded by default, so if you’re going to meet your audience's expectations, then sooner or later you’re going to have to create some additional threads.

In the previous article in this series we got an introduction to RxJava, a reactive library for the JVM that can help you create Android applications that react to data and events as they occur. But you can also use this library to react to data and events simultaneously.

In this post, I’m going to show you how you can use RxJava’s operators to finally make concurrency on Android a pain-free experience. By the end of this article, you’ll know how to use RxJava operators to create additional threads, specify the work that should occur on these threads, and then post the results back to Android’s all-important main UI thread—all with just a few lines of code.

And, since no technology is perfect, I’ll also tell you about a major potential pitfall of adding the RxJava library to your projects—before showing you how to use operators to ensure this problem never occurs in your own Android projects.

## Introducing Operators

RxJava has an enormous collection of operators that are mainly intended to help you modify, filter, merge and transform the data that’s being emitted by your Observables. You’ll find the complete list of RxJava operators over at the official docs, and although no-one expects you to memorise every single operator, it’s worth spending some time reading through this list, just so you have a rough idea of the different data transformations you can perform.

RxJava’s list of operators is already pretty exhaustive, but if you can’t find the perfect operator for the data transformation you had in mind, then you can always chain multiple operators together. Applying an operator to an Observable typically returns another Observable, so you can just keep applying operators until you get the results you want.

There are far too many RxJava operators to cover in a single article, and the official RxJava docs already do a good job of introducing all the operators you can use for data transformations, so I’m going to be focusing on two operators that have the most potential to make your life as an Android developer easier: subscribeOn() and observeOn()

If your app is going to provide the best possible user experience, then it needs to be able to perform intensive or long-running tasks and perform multiple tasks simultaneously, without blocking Android’s all-important main UI thread.

For example, imagine your app needs to fetch some information from two different databases. If you perform both of these tasks one after the other on Android’s main thread, then not only is this going to take a significant amount of time, but the UI will be unresponsive until your app has finished retrieving every single piece of information from both databases. Not exactly a great user experience!

A far better solution is to create two additional threads where you can perform both of these tasks simultaneously without them blocking the main UI thread. This approach means that the work will be completed twice as fast, and the user will be able to continue interacting with your app’s user interface throughout. Potentially, your users may not even be aware that your app is performing some intensive and long-running work in the background—all the database information will simply appear in your application’s UI, as if by magic!

Out of the box, Android does provide a few tools that you can use to create additional threads, including Services and IntentServices, but these solutions are tricky to implement and can quickly result in complex, verbose code. Plus, if you don’t implement multithreading correctly, you may find yourself with an application that’s leaking memory and throwing all kinds of errors.

To make multithreading on Android even more headache-inducing, Android’s main UI thread is the only thread that can update your app’s user interface. If you want to update your app’s UI with the result of work that’s performed on any other thread, then you’ll typically need to create a Handler on the main UI thread, and then use this Handler to transfer data from your background thread to the main thread. This means more code, more complexity, and more opportunities for errors to creep into your project.

But RxJava features two operators that can help you avoid much of this complexity and potential for error.

Note that you use these operators in conjunction with Schedulers, which are essentially components that allow you to specify threads. For now, just think of scheduler as being synonymous with the word thread.

• subscribeOn(Scheduler): By default, an Observable emits its data on the thread where the subscription was declared, i.e. where you called the .subscribe method. In Android, this is generally the main UI thread. You can use the subscribeOn() operator to define a different Scheduler where the Observable should execute and emit its data.
• observeOn(Scheduler): You can use this operator to redirect your Observable’s emissions to a different Scheduler, effectively changing the thread where the Observable’s notifications are sent, and by extension the thread where its data is consumed.

RxJava comes with a number of schedulers that you can use to create different threads, including:

• Schedulers.io(): Designed to be used for IO-related tasks.
• Schedulers.computation(): Designed to be used for computational tasks. By default, the number of threads in the computation scheduler is limited to the number of CPUs available on your device.
• Schedulers.newThread(): Creates a new thread.

Now you have an overview of all the moving parts, let’s look at some examples of how subscribeOn() and observeOn() are used, and see some schedulers in action.

### subscribeOn()

In Android, you’ll typically use subscribeOn() and an accompanying Scheduler to change the thread where some long-running or intensive work is performed, so there’s no risk of blocking the main UI thread. For example, you might decide to import a large amount of data on the io() scheduler or perform some computations on the computation() scheduler.

In the following code, we’re creating a new thread where the Observable will execute its operations and emit the values 1, 2, and 3.

While this is all you need to create a thread and start emitting data on that thread, you may want some confirmation that this observable really is operating on a new thread. One method is to print the name of the thread that your application is currently using, in Android Studio’s Logcat Monitor.

Conveniently, in the previous post, Get Started With RxJava, we created an application that sends messages to Logcat Monitor at various stages during the Observable’s lifecycle, so we can reuse a lot of this code.

Open the project you created in that post, and tweak your code so it’s using the above Observable as its source Observable. Then add the subscribeOn() Operator and specify that the messages being sent to Logcat should include the name of the current thread.

Your finished project should look something like this:

Make sure Android Studio’s Logcat Monitor is open (by selecting the Android Monitor tab, followed by Logcat) and then run your project on either a physical Android device or an AVD. You should see the following output in the Logcat Monitor:

Here, you can see that .subscribe is being called on the main UI thread, but the observable is operating on a completely different thread.

The subscribeOn() operator will have the same effect no matter where you place it in the observable chain; however, you can't use multiple subscribeOn() operators in the same chain. If you do include more than one subscribeOn(), then your chain will only use the subscribeOn() that’s the closest to the source observable.

### observeOn()

Unlike subscribeOn(), where you place observeOn() in your chain does matter, as this operator only changes the thread that’s used by the observables that appear downstream

For example, if you inserted the following into your chain then every observable that appears in the chain from this point onwards will use the new thread.

This chain will continue to run on the new thread until it encounters another observeOn() operator, at which point it’ll switch to the thread specified by that operator. You can control the thread where specific observables send their notifications by inserting multiple observeOn() operators into your chain.

When developing Android apps, you’ll generally use observeOn() to send the result of work performed on background threads to Android’s main UI thread. The easiest way to redirect emissions to Android’s main UI thread is to use the AndroidSchedulers.mainThread Scheduler, which is included as part of the RxAndroid library, rather than the RxJava library.

The RxAndroid library includes Android-specific bindings for RxJava 2, making it a valuable additional resource for Android developers (and something we’ll be looking at in much more detail in the next post of this series).

After adding this library to your project, you can specify that the results of an observable should be sent to your app’s main UI thread, using a single line of code:

Considering that communicating with your app’s main UI thread takes up a full page of the official Android docs, this is a huge improvement that could potentially save you a lot of time when creating multithreaded Android applications.

## RxJava’s Major Drawback

While RxJava has plenty to offer Android developers, no technology is perfect, and RxJava does have one major pitfall that has the potential to crash your app.

By default, RxJava operates a push-based workflow: data is produced upstream by an Observable, and is then pushed downstream to the assigned Observer. The major issue with a push-based workflow is how easy it is for the producer (in this instance, the Observable) to emit items too quickly for the consumer (Observer) to process.

A chatty Observable and a slow Observer can quickly result in a backlog of unconsumed items, which is going to gobble up system resources and may even result in an OutOfMemoryException. This problem is known as backpressure.

If you suspect that backpressure is occurring in your app, then there are a few possible solutions, including using an operator to reduce the number of items being produced.

### Creating Sampling Periods With sample() and throttlefirst()

If an Observable is emitting a large number of items, then it may not be necessary for the assigned Observer to receive every single one of those items.

If you can safely ignore some of an Observable’s emissions, then there are a few operators you can use to create sampling periods, and then cherry-pick specific values that are emitted during these periods:

• The sample() Operator checks the Observable’s output at intervals specified by you, and then takes the most recent item that was emitted during that sampling period. For example, if you include .sample(5, SECONDS) in your project then the Observer will receive the last value that was emitted during each five-second interval.
• The throttleFirst() Operator takes the first value that was emitted during the sampling period. For example, if you include .throttlefirst(5, SECONDS) then the Observer will receive the first value that’s emitted during each five-second interval.

### Batching Emissions With buffer()

If you can’t safely skip any emissions, then you may still be able to take some pressure off a struggling Observer by grouping emissions into batches and then sending onward en masse. Processing batched emissions is typically more efficient than processing multiple emissions separately, so this approach should improve the rate of consumption.

You can create batched emissions using the buffer() operator. Here, we’re using buffer() to batch all the items emitted over a three-second period:

Alternatively, you can use buffer() to create a batch consisting of a specific number of emissions. For example, here we’re telling buffer() to bundle emissions into groups of four:

### Replacing Observables With Flowables

An alternative method of reducing the number of emissions is to replace the Observable that’s causing you problems with a Flowable.

In RxJava 2, the RxJava team decided to split the standard Observable into two types: the regular kind we’ve been looking at throughout this series, and Flowables.

Flowables function in much the same way as Observables, but with one major difference: Flowables only send as many items as the observer requests. If you have an Observable that’s emitting more items than its assigned observer can consume, then you may want to consider switching to a Flowable instead.

Before you can start using Flowables in your projects, you need to add the following import statement:

You can then create Flowables using exactly the same techniques used to create Observables. For example, each of the following code snippets will create a Flowable that’s capable of emitting data:

At this point, you may be wondering: why would I ever use Observables when I can just use Flowables and not have to worry about backpressure? The answer is that a Flowable incurs more of an overhead than a regular Observable, so in the interests of creating a high-performing app, you should stick with Observables unless you suspect that your application is struggling with backpressure.

### Singles

A Flowable isn’t the only variation on Observable that you’ll find in RxJava, as the library also includes the Single class.

Singles are useful when you just need to emit one value. In these scenarios, creating an Observable can feel like overkill, but a Single is designed to simply emit a single value and then complete, either by calling:

• onSuccess(): The Single emits its sole value.
• onError(): If the Single is unable to emit its item, then it’ll pass this method the resulting Throwable.

A Single will call one of these methods only, and then immediately terminate.

Let’s look at an example of a Single in action—again, to save time we’re reusing code:

Run your project on an AVD or physical Android device, and you’ll see the following output in Android Studio’s Logcat Monitor:

If you change your mind and want to convert a Single into an Observable at any point, then once again RxJava has all the operators you need, including:

• mergeWith(): Merges multiple Singles into a single Observable
• concatWith(): Chains the items emitted by multiple Singles together, to form an Observable emission.
• toObservable(): Converts a Single into an Observable that emits the item that was originally emitted by the Single, and then completes.

## Summary

In this post we explored some RxJava operators that you can use to create and manage multiple threads, without the complexity and potential for error that’s traditionally accompanied multithreading on Android. We also saw how you can use the RxAndroid library to communicate with Android’s all-important main UI thread using a single line of code, and how to ensure backpressure doesn’t become a problem in your application.

We’ve touched on the RxAndroid library a few times throughout this series, but this library is packed with Android-specific RxJava bindings that can be invaluable when working with RxJava on the Android platform, so in the final post in this series we’ll be looking at the RxAndroid library in much more detail.

Until then, check out some of our other posts on coding for Android!