Operadores de Programación Reactiva en RxJava 2
() translation by (you can also view the original English article)
Si deseas que tu app Android acumule montones de calificaciones de cinco estrellas en Google Play, entonces debe ser capaz de realizar múltiples tareas a la vez.
Como mínimo, los usuarios móviles de hoy esperan poder interactuar con tu app aún cuando está haciendo algún trabajo en segundo plano. Esto puede sonar muy sencillo, pero Android ejecuta un único subproceso (single-threaded) por defecto, así que, si quieres cumplir con las expectativas de tu público, tarde o temprano deberás crear hilos adicionales.
En el artículo anterior de esta serie, hicimos una introducción a RxJava, una librería reactiva para la JVM que puede ayudarte a crear aplicaciones Android que reaccionen a los diferentes datos y eventos a medida que ocurren. Pero también puedes usar esta librería para reaccionar a datos y eventos simultáneamente.
En este post, te mostraré cómo puedes utilizar los operadores de RxJava para finalmente lograr concurrencia en Android, mediante una experiencia indolora. Cuando llegues al final de este artículo, sabrás cómo emplear los operadores de RxJava para crear hilos adicionales, especificar el trabajo que debería realizar cada uno, y luego devolver los resultados al tan importante subproceso principal, el de la UI—todo esto con sólo unas pocas líneas de código.
Además, como ninguna tecnología es perfecta, te contaré también sobre un potencial riesgo de agregar la librería RxJava a tus proyectos—antes de mostrarte cómo utilizar operadores para asegurarte de que este problema nunca ocurra en tus proyectos Android.
Introducción A Los Operadores
RxJava posee una colección enorme de operadores pensados principalmente para ayudarte a modificar, filtrar, combinar y transformar los datos emitidos por tus Observable
s. Puedes ver la lista completa de operadores RxJava en la documentación oficial, y aunque nadie espera que memorices cada uno de los operadores, vale la pena que inviertas algo de tiempo en leer esta lista, sólo para que tengas una idea aproximada de los diferentes tipos de transformaciones que puedes aplicar a tus datos.
La lista de operadores RxJava ya es de por sí bastante exhaustiva, pero si no puedes hallar el operador perfecto para la transformación de datos que tienes en mente, siempre puedes encadenar múltiples operadores. La aplicación de un operador a un Observable
típicamente retorna otro Observable
, así que puedes seguir aplicando operadores hasta que obtengas los resultados que deseas.
Existen demasiados operadores RxJava para cubrirlos en un sólo artículo, y la documentación oficial de RxJava ya hace el buen trabajo de presentar todos los operadores que puedes usar para transformación de datos, así que me enfocaré en dos operadores que poseen el mayor potencial para facilitar tu vida como desarrollador Android: subscribeOn()
y observeOn()
.
Multithilos Con Operadores RxJava
Si quieres que tu app brinde la mejor experiencia de usuario posible, necesita ser capaz de ejecutar tareas intensivas o de larga ejecución y ejecutar múltiples tareas simultáneamente, sin bloquear el importante hilo de la UI.
Por ejemplo, imagina que tu app necesita obtener cierta información de dos bases de datos diferentes. Si ejecutas ambas tareas una detrás de la otra en el hilo principal de Android, no sólo tomará una cantidad significativa de tiempo, sino que además la UI no responderá hasta que tu app haya terminado de obtener toda la información de ambas bases de datos. ¡No es exactamente una gran experiencia de usuario!
Una solución mucho mejor es crear dos hilos adicionales donde puedas ejecutar ambas tareas simultáneamente, sin que bloqueen el subproceso principal de la UI. Este enfoque hace que el trabajo se complete al doble de velocidad, y el usuario podrá continuar interactuando con la interfaz de usuario de tu app mientras tanto. Potencialmente, tu usuarios ni siquera noten que tu app está ejecutando trabajo intensivo y de larga duración en segundo plano—toda la información de la base de datos simplemente aparecerá en la UI de tu aplicación, ¡como por arte de magia!
Por defecto, Android provee algunas herramientas que puedes utilizar para crear subprocesos adicionales, incluyendo Service
s e IntentService
s, pero estas soluciones requieren cuidado al momento de ser implementadas, y pueden resultar rápidamente en código complejo y verborrágico. Además, si no implementas multihilos correctamente, puedes ocasionar pérdidas de memoria y todo tipo de errores.
Como si la implementación de multihilos en Android no fuera suficientemente dolorosa, el hilo principal de la UI es el único subproceso capaz de actualizar la interfaz de usuario de tu app. Si deseas actualizar la UI de tu app con el resultado de una tarea ejecutada en cualquier otro subproceso, generalmente deberás crear un Handler
en el hilo principal de la UI, y luego utilizar este Handler
para transferir datos desde tu hilo en segundo plano hacia el hilo principal. Esto implica más código, mayor complejidad, y más oportunidades para que surjan errores en tu proyecto.
Pero RxJava contiene dos operadores que pueden ayudarte a evitar gran parte de esta complejidad y probabilidad de errores.
Ten en cuenta que estos operadores se utilizan en conjunto con Schedulers
, que esencialmente son componentes que te permiten especificar hilos. Por ahora, piensa a un scheduler como sinónimo de hilo.
-
subscribeOn(Scheduler)
: Por defecto, unObservable
emite sus datos en el subproceso donde fue declarada la suscripción, es decir, donde has invocado el método.susbscribe
. En Android, generalmente es el hilo principal de la UI. Puedes utilizar el operadorsubscribeOn()
para definir unScheduler
diferente donde elObservable
debe ejecutarse y emitir sus datos. -
observeOn(Scheduler)
: Puedes usar este operador para redirigir las emisiones de tuObservable
a unScheduler
diferente, cambiando efectivamente el subproceso adónde se envían las notificaciones delObservable
, y por extensión, el subproceso donde se consumen sus datos.
RxJava trae varios schedulers que puedes usar para crear diferentes hilos, incluyendo:
-
Schedulers.io()
: Diseñado para utilizarse en tareas relacionadas con E/S. -
Schedulers.computation()
: Diseñado para ser usado en tareas computacionales. Por defecto, el número de hilos en el scheduler está limitado al número de CPUs disponibles en tu dispositivo. -
Schedulers.newThread()
: Crea un nuevo subproceso.
Ahora que tienes una idea general de todas las partes móviles, echemos un vistazo a algunos ejemplos de cómo usar subscribeOn()
y observeOn()
, y algunos schedulers en acción.
subscribeOn()
En Android, típicamente utilizarás subscribeOn()
y un Scheduler
acompañante para cambiar el hilo donde se ejecuta alguna tarea intensiva o de larga duración, así que no hay riesgo de bloquear el hilo de la UI. Por ejemplo, tal vez decidas importar una gran cantidad de datos en el scheduler io()
o ejecutar algunos cálculos en el scheduler computation()
.
En el siguiente código, creamos un nuevo subproceso donde el Observable
llevará a cabo sus operaciones y emitirá los valores 1
, 2
, y 3
.
1 |
Observable.just(1, 2, 3) |
2 |
.subscribeOn(Schedulers.newThread()) |
3 |
.subscribe(Observer); |
Si bien esto es todo lo que necesitas para crear un hilo y comenzar a emitir datos en el mismo, puede que desees algún tipo de confirmación de que este observable realmente está operando en un nuevo hilo. Un método es imprimir el nombre del hilo que tu aplicación está utilizando en el momento, en el Monitor Logcat de Android Studio.
Convenientemente, en el post anterior, Comenzando Con RxJava, creamos una aplicación que envía mensajes al Monitor Logcat en varias etapas durante el ciclo de vida del Observable, así que podemos reutilizar gran parte de este código.
Abre el proyecto que creaste en ese post, y modifica tu código para que utilice el Observable
de arriba como su Observable
fuente. Luego, añade el operador subscribeOn()
y especifica que los mensajes enviados al Logcat deben incluir el nombre del hilo corriente.
Tu proyecto terminado debería verse parecido a esto:
1 |
import android.support.v7.app.AppCompatActivity; |
2 |
import android.os.Bundle; |
3 |
import android.util.Log; |
4 |
import io.reactivex.Observable; |
5 |
import io.reactivex.Observer; |
6 |
import io.reactivex.disposables.Disposable; |
7 |
import io.reactivex.schedulers.Schedulers; |
8 |
|
9 |
public class MainActivity extends AppCompatActivity { |
10 |
public static final String TAG = "MainActivity"; |
11 |
|
12 |
@Override
|
13 |
protected void onCreate(Bundle savedInstanceState) { |
14 |
super.onCreate(savedInstanceState); |
15 |
setContentView(R.layout.activity_main); |
16 |
|
17 |
Observable.just(1, 2, 3) |
18 |
.subscribeOn(Schedulers.newThread()) |
19 |
.subscribe(Observer); |
20 |
}
|
21 |
|
22 |
Observer<Integer> Observer = new Observer<Integer>() { |
23 |
|
24 |
@Override
|
25 |
public void onSubscribe(Disposable d) { |
26 |
Log.e(TAG, "onSubscribe" + Thread.currentThread().getName()); |
27 |
}
|
28 |
|
29 |
@Override
|
30 |
public void onNext(Integer value) { |
31 |
Log.e(TAG, "onNext: " + value + Thread.currentThread().getName()); |
32 |
}
|
33 |
|
34 |
|
35 |
@Override
|
36 |
public void onError(Throwable e) { |
37 |
Log.e(TAG, "onError: "); |
38 |
}
|
39 |
|
40 |
@Override
|
41 |
public void onComplete() { |
42 |
Log.e(TAG, "onComplete: All Done!" + Thread.currentThread().getName()); |
43 |
}
|
44 |
|
45 |
};
|
46 |
|
47 |
}
|
Asegúrate de que el Monitor Logcat de Android Studio esté abierto (seleccionando la pestaña Android Monitor, seguido de Logcat) y luego corre tu proyecto en un dispositivo Android físico o un AVD. Deberías ver lo siguiente en el Monitor Logcat:



Aquí, puedes ver que .subscribe
está siendo invocado en el subproceso principal de la UI, pero el observable está operando en un hilo totalmente diferente.
El operador subscribeOn()
tendrá el mismo efecto sin importar dónde lo coloques en la cadena de observables; sin embargo, no puedes utilizar múltiples operadores subscribeOn()
en la misma cadena. Si incluyes más de un subscribeOn()
, tu cadena sólo utilizará el subscribeOn()
más cercano al observable fuente.
observeOn()
Al contrario que subscribeOn()
, sí importa dónde coloques observeOn()
en tu cadena, ya que este operador sólo cambia el hilo que utilizan los observables que aparecen debajo.
Por ejemplo, si insertas lo siguiente en tu cadena, cada observable que aparezca en la misma a partir de este punto utilizará el nuevo hilo.
1 |
.observeOn(Schedulers.newThread()) |
Esta cadena continuará ejecutándose en el nuevo subproceso, hasta que encuentre otro operador observeOn()
, y en ese punto cambiará al hilo especificado por ese operador. Puedes controlar el hilo donde ciertos observables específicos envían sus notificaciones insertando múltiples operadores observeOn()
en tu cadena.
Al desarrollar apps Android, generalmente utilizarás observeOn()
para enviar el resultado del trabajo realizado en subprocesos en segundo plano al hilo de la UI de Android. La manera más fácil de redirigir emisiones al hilo de la UI es utilizar el Scheduler AndroidSchedulers.mainThread
, que está incluido como parte de la librería RxAndroid, no en la librería RxJava.
La librería RxAndroid incluye bindings específicos de Android para RxJava 2, lo que la convierte en un valorable recurso adicional para los desarrolladores Android (y algo que veremos con mucho más detalle en el próximo post de esta serie).
Para añadir RxAndroid a tu proyecto, abre el archivo build.gradle que está a nivel de módulo y agrega la última versión de la librería a la sección de dependencias. Mientras se escribe este artículo, la versión más reciente de RxAndroid es la 2.0.1, así que yo agregaré lo siguiente:
1 |
dependencies { |
2 |
... |
3 |
... |
4 |
... |
5 |
compile 'io.reactivex.rxjava2:rxandroid:2.0.1' |
6 |
} |
Luego de integrar esta librería en tu proyecto, puedes especificar que los resultados de un observable se envíen al hilo de la UI de tu app, utilizando una sola línea de código:
1 |
.observeOn(AndroidSchedulers.mainThread()) |
Considerando que la comunicación con el subproceso principal de la UI de tu app abarca una página entera de la documentación oficial de Android, esta es una gran mejor que potencialmente puede ahorrarte mucho tiempo al crear aplicaciones Android multihilo.
El Mayor Inconveniente de RxJava
Si bien RxJava tiene mucho que ofrecerles a los desarrolladores Android, ninguna tecnología es perfecta, y RxJava posee un problema importante que potencialmente podría causar crashes en tu app.
Por defecto, RxJava opera con un flujo de trabajo basado en push: los datos son producidos primero por un Observable
, y luego se dirigen hacia el Observer
asignado. El mayor problema con este flujo de trabajo basado en push es cuán fácil es para el productor (en esta instancia, el Observable
) emitir ítems más rápidamente de lo que el consumidor (Observer
) puede procesar.
Un Observable
muy verborrágico en combinación con un Observer
lento puede resultar rápidamente en acumulación de ítems sin consumir, que podrían devorar los recursos del sistema y resultar en una OutOfMemoryException
. Este problema se conoce como backpressure o contrapresión.
Si sospechas que tienes un caso de contrapresión en tu app, existen algunas soluciones posibles, incluyendo la utilización de un operador para reducir el número de ítems que se producen.
Creando Períodos de Muestreo Con sample()
y throttleFirst()
Si un Observable
está emitiendo un gran número de ítems, puede que no sea necesario que el Observer
asignado reciba cada uno de ellos.
Si en tu app puedes ignorar algunas de las emisiones de un Observable
de manera segura, entonces puedes valerte de algunos operadores que sirven para crear períodos de muestreo, y luego seleccionar a mano valores específicos emitidos durante esos períodos:
- El Operador
sample()
chequea la salida del Observable en los intervalos que especifiques, y luego toma el ítem más reciente emitido durante dicho período de muestreo. Por ejemplo, si incluyes.sample(5, SECONDS)
en tu proyecto, el Observer recibirá el último valor emitido durante cada intervalo de cinco segundos. - El Operador
throttleFirst()
toma el primer valor emitido durante el período de muestreo. Por ejemplo, si incluyes.throttleFirst(5, SECONDS)
, el Observer recibirá el primer valor emitido durante cada intervalo de cinco segundos.



Agrupando Emisiones Con buffer()
Si no puedes saltear emisiones de forma segura, todavía existe la posibilidad de quitar un poco de presión de un Observer
en apuros, agrupando emisiones en paquetes y enviándolos luego en masse. El procesamiento de emisiones en lote es típicamente más eficiente que el procesamiento de múltiples emisiones separadas, así que este enfoque debería mejorar la tasa de consumo.
Puedes crear emisiones agrupadas usando el operador buffer()
. Aquí, utilizamos buffer()
para agrupar todos los ítems en un período de tres segundos:
1 |
Observable.range(0, 10) |
2 |
.buffer(3, SECONDS) |
3 |
.subscribe(System.out::println); |



Como alternativa, puedes emplear buffer()
para crear un paquete constituido por una cantidad específica de emisiones. Por ejemplo, aquí le pedimos a buffer()
que empaquete emisiones en grupos de cuatro:
1 |
Observable.range(0, 10) |
2 |
.buffer(4) |
3 |
.subscribe(System.out::println); |
Reemplazando Observables Con Flowables
Un método alternativo de reducir el número de emisiones es reemplazar el Observable
causante de problemas por un Flowable
.
En RxJava2, el equipo de RxJava decidió separar el Observable
estándar en dos tipos: el tipo regular que hemos visto a lo largo de esta serie, y los Flowable
s.
El funcionamiento de los Flowable
s es muy similar al de los Observable
s, pero con una diferencia importante: los Flowable
s sólo envían tantos ítems como solicite el Observer. Si tienes un Observable
que emite más ítems de los que su Observer asignado puede consumir, quizás quieras considerar cambiarlo por un Flowable
.
Antes de que puedas empezar a usar Flowable
s en tus proyectos, necesitas agregar la siguiente sentencia import:
1 |
import io.reactivex.Flowable; |
Luego, puedes crear Flowable
s usando exactamente las mismas técnicas utilizadas para crear Observable
s. Por ejemplo, cada una de las siguientes porciones de código creará un Flowable
capaz de emitir datos:
1 |
Flowable<String> flowable = Flowable.fromArray(new String[] {"south", "north", "west", “east”}); |
2 |
...
|
3 |
flowable.subscribe() |
1 |
Flowable<Integer> flowable = Flowable.range(0, 20); |
2 |
...
|
3 |
flowable.subscribe() |
En este punto, te estarás preguntando: ¿por qué usaría Observable
s si puedo usar directamente Flowable
s y no preocuparme por la contrapresión? La respuesta es que un Flowable
provoca más sobrecarga en el sistema que un Observable
regular, así que, en con vistas a crear una app de alto rendimiento, deberías apegarte a los Observable
s a menos que sospeches que tu app está luchando contra la contrapresión.
Singles
El Flowable
no es la única variación de Observable
que hallarás en RxJava, ya que la librería también incluye la clase Single
.
Los Single
s son útiles cuando necesitas simplemente emitir un único valor. En estos escenarios, la creación de un Observable
puede parecer una exageración, pero el Single
está diseñado para simplemente emitir un único valor y luego completarse, invocando:
-
onSuccess()
: ElSingle
emite su único valor. -
onError()
: Si elSingle
es incapaz de emitir su ítem, devolverá a este método elThrowable
resultante.
Un Single
invocará sólo uno de estos métodos, e inmediatamente terminará su ejecución.
Veamos un ejemplo de un Single
en acción—nuevamente, para ahorrar tiempo reutilizaremos código:
1 |
import android.os.Bundle; |
2 |
import android.support.v7.app.AppCompatActivity; |
3 |
import android.util.Log; |
4 |
import io.reactivex.Single; |
5 |
import io.reactivex.SingleObserver; |
6 |
import io.reactivex.disposables.Disposable; |
7 |
|
8 |
public class MainActivity extends AppCompatActivity { |
9 |
public static final String TAG = "MainActivity"; |
10 |
|
11 |
@Override
|
12 |
protected void onCreate(Bundle savedInstanceState) { |
13 |
super.onCreate(savedInstanceState); |
14 |
setContentView(R.layout.activity_main); |
15 |
|
16 |
Single.just("Hello World") |
17 |
.subscribe(getSingleObserver()); |
18 |
}
|
19 |
|
20 |
private SingleObserver<String> getSingleObserver() { |
21 |
return new SingleObserver<String>() { |
22 |
@Override
|
23 |
public void onSubscribe(Disposable d) { |
24 |
Log.e(TAG, "onSubscribe"); |
25 |
}
|
26 |
|
27 |
@Override
|
28 |
public void onSuccess(String value) { |
29 |
Log.e(TAG, " onSuccess : " + value); |
30 |
}
|
31 |
|
32 |
@Override
|
33 |
public void onError(Throwable e) { |
34 |
Log.e(TAG, "onError: "); |
35 |
}
|
36 |
|
37 |
};
|
38 |
|
39 |
}
|
40 |
|
41 |
}
|
Ejecuta tu proyecto en un AVD o dispositivo Android físico, y verás la siguiente salida en el Monitor Logcat de Android Studio:



Si cambias de parecer y deseas convertir un Single
en un Observable
en algún punto, de nuevo RxJava posee todos los operadores que necesitas, incluyendo:
-
mergeWith()
: Combina múltiplesSingle
s en un soloObservable
. -
concatWith()
: Encadena los ítems emitidos por múltiplesSingle
s, para conformar una emisión deObservable
. -
toObservable()
: Convierte unSingle
en unObservable
que emite el ítem originalmente emitido por el Single, y luego se completa.
Resumen
En este post exploramos algunos operadores de RxJava que puedes usar para crear y administrar múltiples subprocesos, sin la complejidad y potencial de errores que tradicionalmente ha acompañado al multithreading en Android. También vimos cómo puedes emplear la librería RxAndroid para comunicarte con el importantísimo hilo de la UI de Android utilizando una sola línea de código, y cómo asegurarte de que la contrapresión no represente un problema en tu aplicación.
Hemos abordado la librería RxAndroid pocas veces a lo largo de esta serie, pero esta librería provee bindings de RxJava específicos para Android, que pueden resultar invaluables al trabajar con RxJava en la plataforma Android, por ende el post final de esta serie analizará la librería RxAndroid con mucho más detalle.
¡Hasta entonces, revisa algunos de nuestros otros posts sobre desarrollo Android!
- Android SDK¿Qué Hay de Nuevo en Firebase? Actualizaciones Desde la Firebase Dev SummitChike Mgbemena
- Android SDKComo Hacer Llamadas y Usar SMS en Apps AndroidChike Mgbemena
- Android SDK6 Normas para una Grandiosa Experiencia de Usuario AndroidJessica Thornsby
- AndroidIntroducción a Android ThingsPaul Trebilcox-Ruiz