
Concurrency in RxJava 2

Difficulty: Intermediate · Length: Medium

A multithreaded app has two or more parts that can run in parallel. This lets the app make better use of the cores in the device's CPU, so tasks finish faster and the user gets a smoother, more responsive experience.

Coding for concurrency in Java can be painful, but RxJava makes it much easier. With RxJava, you declare the thread on which a task should run (a declarative approach) instead of creating and managing threads yourself (an imperative approach).

RxJava makes use of Schedulers along with the subscribeOn() and observeOn() concurrency operators to achieve this. In this tutorial, you'll learn about Schedulers, the subscribeOn() operator, the observeOn() operator, and also how to leverage the flatMap() operator to achieve concurrency. But first, let's start with Schedulers in RxJava.

Prerequisites 

To follow along with this tutorial, you should be familiar with the basics of RxJava and with lambda expressions. Check out our other posts to get up to speed on both.

Schedulers in RxJava 2

Schedulers in RxJava are used to execute a unit of work on a thread. A Scheduler provides an abstraction over the Android and Java threading mechanisms. When you hand a task to a Scheduler, the Scheduler goes to its thread pool (a collection of threads that are ready for use) and runs the task on an available thread.
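To make the thread-pool idea concrete, here is a minimal sketch that uses plain java.util.concurrent rather than RxJava. It only illustrates the concept a Scheduler is built on: tasks are handed to a pool and run on whichever pooled thread is free. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolSketch {

    // Submits taskCount tasks to a pool of poolSize threads and returns
    // the name of the thread that ran each task.
    static List<String> runTasks(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(pool.submit(task));
            }
            List<String> names = new ArrayList<>();
            for (Future<String> future : futures) {
                names.add(future.get()); // wait for this task to finish
            }
            return names;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // release the pooled threads
        }
    }

    public static void main(String[] args) {
        // Five tasks share two threads, so thread names repeat.
        for (String name : runTasks(2, 5)) {
            System.out.println("task ran on " + name);
        }
    }
}
```

Because the pool holds only two threads, the five tasks are spread over at most two thread names. A Scheduler hides this bookkeeping behind a single object.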

You can also control where a task runs. Two operators, subscribeOn() and observeOn(), specify which Scheduler (and therefore which thread from its pool) executes each part of the chain.

As you know, in Android, long-running or CPU-intensive tasks should not run on the main thread. If an Observer subscribes to an Observable on the main thread, the source and every operator in the chain will also run on the main thread by default. For a long-running task (e.g. a network request) or a CPU-intensive task (e.g. image transformation), this blocks the UI until the task finishes, which can lead to the dreaded ANR (Application Not Responding) dialog. The work can instead be moved to another thread: subscribeOn() moves the source, and observeOn() moves the operators that come after it.

In the next section, we are going to explore the different kinds of Schedulers and their uses.

Types of Schedulers

Here are some of the types of Schedulers available in RxJava and RxAndroid to indicate the kind of thread to execute tasks on. 

  • Schedulers.immediate(): returns a Scheduler which executes the work instantly in the current thread. Be aware that this blocks the current thread, so it should be used with caution. Note that this Scheduler belongs to RxJava 1; it was removed in RxJava 2, where Schedulers.trampoline() is the closest replacement. 
  • Schedulers.trampoline(): schedules tasks in the current thread. These tasks are not executed immediately; they are queued and executed after the thread has finished its current task. 
  • Schedulers.newThread(): returns a Scheduler that starts a new thread for each Observer and executes the task on it. Be careful with this one, because the new thread is not reused afterwards but is destroyed instead. 
  • Schedulers.computation(): gives us a Scheduler intended for computationally intensive work such as image transformation or complex calculations. It uses a fixed-size thread pool whose size depends on the number of CPU cores, because running more CPU-bound threads than there are cores can reduce performance. 
  • Schedulers.io(): returns a Scheduler designated for I/O-bound work such as network calls or reading from and writing to a database. These tasks spend most of their time waiting rather than computing; for CPU-intensive work, use Schedulers.computation() instead.
  • Schedulers.single(): returns a Scheduler that executes tasks sequentially on a single thread. 
  • Schedulers.from(Executor executor): creates a Scheduler that executes a task or unit of work on the given Executor.
  • AndroidSchedulers.mainThread(): this will create a Scheduler that executes the task on the Android application main thread. This scheduler type is provided by the RxAndroid library. 
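One way to see the difference between these Schedulers is to ask each one which thread it runs a task on. The following is a minimal sketch that assumes RxJava 2 (the io.reactivex package) is on the classpath and runs on a plain JVM, so AndroidSchedulers.mainThread() is left out. The class and method names are hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SchedulerThreadsSketch {

    // Runs a one-item source on the given Scheduler and returns the name
    // of the thread that produced the item.
    static String threadNameOn(Scheduler scheduler) {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("trampoline:  " + threadNameOn(Schedulers.trampoline()));
        System.out.println("newThread:   " + threadNameOn(Schedulers.newThread()));
        System.out.println("computation: " + threadNameOn(Schedulers.computation()));
        System.out.println("io:          " + threadNameOn(Schedulers.io()));
        System.out.println("single:      " + threadNameOn(Schedulers.single()));

        // Schedulers.from() wraps an Executor that we own and must shut down.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println("from:        " + threadNameOn(Schedulers.from(executor)));
        } finally {
            executor.shutdown();
        }
    }
}
```

The trampoline Scheduler reports the calling thread, while the others report a thread from their own pool.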

The subscribeOn() Operator

The subscribeOn() concurrency operator specifies the Scheduler on which the source Observable does its work. Unless the chain switches threads later, the values are then pushed to the Observers on that same thread. Now let's see a practical example:
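The following is a minimal sketch of such an example, assuming RxJava 2 (io.reactivex) on the classpath. It runs on a plain JVM, so it prints with System.out rather than Android's Log. The class name, the particular state names, the 100 ms delay, and the latch used to wait for completion are assumptions made for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class SubscribeOnSketch {

    // Sample data (assumed): a few states in Nigeria.
    static final ArrayList<String> STATES =
            new ArrayList<>(Arrays.asList("Lagos", "Ogun", "Abia", "Kano"));

    // Kept so that the subscription can be disposed of later.
    static Disposable disposable;

    // Emits each state, pausing after each one to simulate slow work.
    static ObservableOnSubscribe<String> dataSource() {
        return emitter -> {
            for (String state : STATES) {
                emitter.onNext(state);
                Thread.sleep(100); // assumed delay
            }
            emitter.onComplete();
        };
    }

    // Runs the source on a new thread and returns what the Observer saw,
    // each entry prefixed with the name of the receiving thread.
    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        disposable = Observable.create(dataSource())
                .subscribeOn(Schedulers.newThread())
                .subscribe(
                        state -> received.add(Thread.currentThread().getName() + ": " + state),
                        error -> done.countDown(),
                        done::countDown);
        try {
            done.await(); // block the caller until the stream terminates
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        disposable.dispose(); // release resources held by the subscription
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```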

In the code above, we have a static ArrayList which contains some states in Nigeria. We also have a field of type Disposable. We get the Disposable instance by calling Observable.subscribe(), and we'll use it later when we call the dispose() method to release any resources that were used. This helps to prevent memory leaks. Our dataSource() method (which could return data from a remote or local database source) returns an ObservableOnSubscribe<T>, which we need in order to create our own Observable later using the method Observable.create().

Inside the dataSource() method, we loop through the array, emitting each element to the Observers by calling emitter.onNext(). After each value is emitted, we sleep the thread so as to simulate intensive work being performed. Finally, we call the onComplete() method to signal to the Observers that we are done passing values and they shouldn't expect any more. 

Now, our dataSource() method should not be executed on the main UI thread. But how is this specified? In the example above, we provided Schedulers.newThread() as an argument to subscribeOn(). This means that the dataSource() operation will be run in a new thread. Note also that in the example above, we have just one Observer. If we had multiple Observers, each of them would get its own thread. 

So that we can see this working, our Observer prints out the values it gets from the Observable in its onNext() method.

When we run this and view the logcat in Android Studio, we can see that the emissions from the dataSource() method happened on the same thread, RxNewThreadScheduler-1, on which the Observer received them. 

Android Studio logcat result showing execution logs on a single thread

If you don't call .subscribeOn() after Observable.create(), the source will be executed on the current thread, which in our case is the main thread, thereby blocking the app UI. 

Android Studio Logcat showing execution logs on main thread

There are some important details you should be aware of concerning the subscribeOn() operator. You should have only one subscribeOn() in the Observable chain: the call closest to the source decides which thread the source runs on, and any later subscribeOn() will not change that. For the sake of clarity, the recommended place for this operator is as close to the source as possible. In other words, place it first in the operator chain. 
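The rule about multiple subscribeOn() calls can be checked with a small sketch, again assuming RxJava 2 on the classpath and a plain JVM. The class and method names are hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class DoubleSubscribeOnSketch {

    // Returns the name of the thread on which the source actually ran.
    static String emittingThread() {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.io())          // closest to the source: decides the thread
                .subscribeOn(Schedulers.computation()) // does not change where the source runs
                .blockingFirst();
    }

    public static void main(String[] args) {
        // The printed name comes from the io() pool, not the computation() pool.
        System.out.println("source ran on " + emittingThread());
    }
}
```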

The observeOn() Operator

As we saw, the subscribeOn() concurrency operator tells the Observable which Scheduler to use to push emissions forward along the Observable chain to the Observers.

The job of the observeOn() concurrency operator, on the other hand, is to switch the subsequent emissions over to another thread or Scheduler. We use this operator to control on what thread downstream consumers will receive the emissions. Let's see a practical example. 

In the code above, we used the observeOn() operator and passed AndroidSchedulers.mainThread() to it. This switches the thread from Schedulers.newThread() to the Android main thread. The switch is necessary because we want to update the TextView widget, and that can only be done from the main UI thread. Note that if you try to update the TextView widget without switching over to the main thread, the app will crash with a CalledFromWrongThreadException.

Unlike the subscribeOn() operator, the observeOn() operator can be applied multiple times in the operator chain, thereby changing the Scheduler more than once. 

This code has two observeOn() operators. The first one uses Schedulers.io(), which means that the saveToCache() method will be executed on a Schedulers.io() thread. After that, the chain switches over to AndroidSchedulers.mainThread(), where Observers receive the emissions from upstream. 
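A chain with two thread switches can be sketched on a plain JVM as follows, assuming RxJava 2 on the classpath. Because AndroidSchedulers.mainThread() is only available on Android, Schedulers.single() stands in for the main thread here. The class name, the sample states, and the logging done in place of a real saveToCache() body are also assumptions.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class ObserveOnSketch {

    // Returns log lines that record which thread handled each step.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Disposable disposable = Observable.just("Lagos", "Ogun", "Abia")
                .subscribeOn(Schedulers.newThread())
                // First switch: the caching step runs on an io() thread.
                .observeOn(Schedulers.io())
                .doOnNext(state -> log.add(
                        "saveToCache " + state + " on " + Thread.currentThread().getName()))
                // Second switch: stand-in for AndroidSchedulers.mainThread().
                .observeOn(Schedulers.single())
                .subscribe(
                        state -> log.add(
                                "received " + state + " on " + Thread.currentThread().getName()),
                        error -> done.countDown(),
                        done::countDown);
        try {
            done.await(); // wait until the stream has terminated
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        disposable.dispose();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The two kinds of log lines may interleave, since the caching step and the Observer run on different threads.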

Android Studio logcat result showing logs

Concurrency With the flatMap() Operator

The flatMap() operator is another very powerful and important operator that can be used to achieve concurrency. The definition according to the official documentation is as follows:

Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable.

FlatMap operator diagram

Let's take a look at a practical example that uses this operator: 
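A minimal sketch of such an example follows, assuming RxJava 2 on the classpath and a plain JVM. The getPopulation() method here is a hypothetical stand-in: it sleeps to simulate slow work and returns the state together with the name of the thread that handled it, not real population figures. No Scheduler is specified, so everything runs on the calling thread.

```java
import io.reactivex.Observable;
import java.util.List;

public class FlatMapSequentialSketch {

    // Hypothetical slow lookup; the 100 ms delay is an assumption.
    static String getPopulation(String state) throws InterruptedException {
        Thread.sleep(100);
        return state + " processed on " + Thread.currentThread().getName();
    }

    // Maps each state to an inner Observable and flattens the results.
    static List<String> run() {
        return Observable.just("Lagos", "Ogun", "Abia")
                .flatMap(state -> Observable.fromCallable(() -> getPopulation(state)))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```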

This will print the following on Android Studio logcat:

From the result above, you can see that the results arrived in the same order as in the array. Also, the getPopulation() method for each state was processed on the same thread: the main thread. This makes the whole operation slow, because the states were processed one after another on the main thread. 

Now, in order for us to achieve concurrency with this operator, we want the getPopulation() method for each state (emissions from the statesObservable) to be processed on different threads. Doing this will lead to faster processing. We'll use the flatMap() operator to do this because it creates a new Observable for each emission. We then apply the subscribeOn() concurrency operator to each one, passing a Scheduler to it. 

As each emission produces an Observable, the flatMap() operator's job is to merge them together and then send them out as a single stream. 
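The concurrent variant can be sketched as follows, assuming RxJava 2 on the classpath and a plain JVM. As before, getPopulation() is a hypothetical stand-in that sleeps and reports its thread, and the choice of Schedulers.io() for the inner Observables is an assumption.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

public class FlatMapConcurrentSketch {

    // Hypothetical slow lookup; the 200 ms delay is an assumption.
    static String getPopulation(String state) throws InterruptedException {
        Thread.sleep(200);
        return state + " processed on " + Thread.currentThread().getName();
    }

    // Each inner Observable gets its own subscribeOn(), so the lookups
    // overlap instead of running one after another.
    static List<String> run() {
        return Observable.just("Lagos", "Ogun", "Abia")
                .flatMap(state -> Observable
                        .fromCallable(() -> getPopulation(state))
                        .subscribeOn(Schedulers.io()))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // The order of the printed lines may differ from the input order.
        run().forEach(System.out::println);
    }
}
```

If the original order matters, concatMap() preserves it, but it subscribes to one inner Observable at a time and so gives up the concurrency.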

In the result above, we can observe that each state's getPopulation() method was processed on a different thread. This makes the processing much faster. Note, however, that the emissions the Observer received from the flatMap() operator are not in the same order as the original emissions upstream, because flatMap() forwards each inner result as soon as it is ready. 

Conclusion

In this tutorial, you learned about handling concurrency using RxJava 2: what it is, the different Schedulers available, and how to use the subscribeOn() and observeOn() concurrency operators. I also showed you how to use the flatMap() operator to achieve concurrency. 

To learn more, check out some of our other courses and tutorials on the Java language and Android app development!
