Advertisement
  1. Code
  2. Android SDK

Operadores de Programación Reactiva en RxJava 2

Scroll to top
Read Time: 15 min

() translation by (you can also view the original English article)

Si deseas que tu app Android acumule montones de calificaciones de cinco estrellas en Google Play, entonces debe ser capaz de realizar múltiples tareas a la vez.

Como mínimo, los usuarios móviles de hoy esperan poder interactuar con tu app aún cuando está haciendo algún trabajo en segundo plano. Esto puede sonar muy sencillo, pero Android ejecuta un único subproceso (single-threaded) por defecto, así que, si quieres cumplir con las expectativas de tu público, tarde o temprano deberás crear hilos adicionales.

En el artículo anterior de esta serie, hicimos una introducción a RxJava, una librería reactiva para la JVM que puede ayudarte a crear aplicaciones Android que reaccionen a los diferentes datos y eventos a medida que ocurren. Pero también puedes usar esta librería para reaccionar a datos y eventos simultáneamente.

En este post, te mostraré cómo puedes utilizar los operadores de RxJava para finalmente lograr concurrencia en Android, mediante una experiencia indolora. Cuando llegues al final de este artículo, sabrás cómo emplear los operadores de RxJava para crear hilos adicionales, especificar el trabajo que debería realizar cada uno, y luego devolver los resultados al tan importante subproceso principal, el de la UI—todo esto con sólo unas pocas líneas de código.

Además, como ninguna tecnología es perfecta, te contaré también sobre un potencial riesgo de agregar la librería RxJava a tus proyectos—antes de mostrarte cómo utilizar operadores para asegurarte de que este problema nunca ocurra en tus proyectos Android.

Introducción A Los Operadores

RxJava posee una colección enorme de operadores pensados principalmente para ayudarte a modificar, filtrar, combinar y transformar los datos emitidos por tus Observables. Puedes ver la lista completa de operadores RxJava en la documentación oficial, y aunque nadie espera que memorices cada uno de los operadores, vale la pena que inviertas algo de tiempo en leer esta lista, sólo para que tengas una idea aproximada de los diferentes tipos de transformaciones que puedes aplicar a tus datos.

La lista de operadores RxJava ya es de por sí bastante exhaustiva, pero si no puedes hallar el operador perfecto para la transformación de datos que tienes en mente, siempre puedes encadenar múltiples operadores. La aplicación de un operador a un Observable típicamente retorna otro Observable, así que puedes seguir aplicando operadores hasta que obtengas los resultados que deseas.

Existen demasiados operadores RxJava para cubrirlos en un sólo artículo, y la documentación oficial de RxJava ya hace el buen trabajo de presentar todos los operadores que puedes usar para transformación de datos, así que me enfocaré en dos operadores que poseen el mayor potencial para facilitar tu vida como desarrollador Android: subscribeOn() y observeOn().

Multithilos Con Operadores RxJava

Si quieres que tu app brinde la mejor experiencia de usuario posible, necesita ser capaz de ejecutar tareas intensivas o de larga ejecución y ejecutar múltiples tareas simultáneamente, sin bloquear el importante hilo de la UI.

Por ejemplo, imagina que tu app necesita obtener cierta información de dos bases de datos diferentes. Si ejecutas ambas tareas una detrás de la otra en el hilo principal de Android, no sólo tomará una cantidad significativa de tiempo, sino que además la UI no responderá hasta que tu app haya terminado de obtener toda la información de ambas bases de datos. ¡No es exactamente una gran experiencia de usuario!

Una solución mucho mejor es crear dos hilos adicionales donde puedas ejecutar ambas tareas simultáneamente, sin que bloqueen el subproceso principal de la UI. Este enfoque hace que el trabajo se complete al doble de velocidad, y el usuario podrá continuar interactuando con la interfaz de usuario de tu app mientras tanto. Potencialmente, tu usuarios ni siquera noten que tu app está ejecutando trabajo intensivo y de larga duración en segundo plano—toda la información de la base de datos simplemente aparecerá en la UI de tu aplicación, ¡como por arte de magia!

Por defecto, Android provee algunas herramientas que puedes utilizar para crear subprocesos adicionales, incluyendo Services e IntentServices, pero estas soluciones requieren cuidado al momento de ser implementadas, y pueden resultar rápidamente en código complejo y verborrágico. Además, si no implementas multihilos correctamente, puedes ocasionar pérdidas de memoria y todo tipo de errores.

Como si la implementación de multihilos en Android no fuera suficientemente dolorosa, el hilo principal de la UI es el único subproceso capaz de actualizar la interfaz de usuario de tu app. Si deseas actualizar la UI de tu app con el resultado de una tarea ejecutada en cualquier otro subproceso, generalmente deberás crear un Handler en el hilo principal de la UI, y luego utilizar este Handler para transferir datos desde tu hilo en segundo plano hacia el hilo principal. Esto implica más código, mayor complejidad, y más oportunidades para que surjan errores en tu proyecto.

Pero RxJava contiene dos operadores que pueden ayudarte a evitar gran parte de esta complejidad y probabilidad de errores.

Ten en cuenta que estos operadores se utilizan en conjunto con Schedulers, que esencialmente son componentes que te permiten especificar hilos. Por ahora, piensa a un scheduler como sinónimo de hilo.

  • subscribeOn(Scheduler): Por defecto, un Observable emite sus datos en el subproceso donde fue declarada la suscripción, es decir, donde has invocado el método .susbscribe. En Android, generalmente es el hilo principal de la UI. Puedes utilizar el operador subscribeOn() para definir un Scheduler diferente donde el Observable debe ejecutarse y emitir sus datos.
  • observeOn(Scheduler): Puedes usar este operador para redirigir las emisiones de tu Observable a un Scheduler diferente, cambiando efectivamente el subproceso adónde se envían las notificaciones del Observable, y por extensión, el subproceso donde se consumen sus datos.

RxJava trae varios schedulers que puedes usar para crear diferentes hilos, incluyendo:

  • Schedulers.io(): Diseñado para utilizarse en tareas relacionadas con E/S.
  • Schedulers.computation(): Diseñado para ser usado en tareas computacionales. Por defecto, el número de hilos en el scheduler está limitado al número de CPUs disponibles en tu dispositivo.
  • Schedulers.newThread(): Crea un nuevo subproceso.

Ahora que tienes una idea general de todas las partes móviles, echemos un vistazo a algunos ejemplos de cómo usar subscribeOn() y observeOn(), y algunos schedulers en acción.

subscribeOn()

En Android, típicamente utilizarás subscribeOn() y un Scheduler acompañante para cambiar el hilo donde se ejecuta alguna tarea intensiva o de larga duración, así que no hay riesgo de bloquear el hilo de la UI. Por ejemplo, tal vez decidas importar una gran cantidad de datos en el scheduler io() o ejecutar algunos cálculos en el scheduler computation().

En el siguiente código, creamos un nuevo subproceso donde el Observable llevará a cabo sus operaciones y emitirá los valores 1, 2, y 3.

1
Observable.just(1, 2, 3)
2
          .subscribeOn(Schedulers.newThread())
3
          .subscribe(Observer);

Si bien esto es todo lo que necesitas para crear un hilo y comenzar a emitir datos en el mismo, puede que desees algún tipo de confirmación de que este observable realmente está operando en un nuevo hilo. Un método es imprimir el nombre del hilo que tu aplicación está utilizando en el momento, en el Monitor Logcat de Android Studio.

Convenientemente, en el post anterior, Comenzando Con RxJava, creamos una aplicación que envía mensajes al Monitor Logcat en varias etapas durante el ciclo de vida del Observable, así que podemos reutilizar gran parte de este código.

Abre el proyecto que creaste en ese post, y modifica tu código para que utilice el Observable de arriba como su Observable fuente. Luego, añade el operador subscribeOn() y especifica que los mensajes enviados al Logcat deben incluir el nombre del hilo corriente.

Tu proyecto terminado debería verse parecido a esto:

1
import android.support.v7.app.AppCompatActivity;
2
import android.os.Bundle;
3
import android.util.Log;
4
import io.reactivex.Observable;
5
import io.reactivex.Observer;
6
import io.reactivex.disposables.Disposable;
7
import io.reactivex.schedulers.Schedulers;
8
9
public class MainActivity extends AppCompatActivity {
10
  public static final String TAG = "MainActivity";
11
12
  @Override
13
  protected void onCreate(Bundle savedInstanceState) {
14
      super.onCreate(savedInstanceState);
15
      setContentView(R.layout.activity_main);
16
      
17
      Observable.just(1, 2, 3)
18
              .subscribeOn(Schedulers.newThread())
19
              .subscribe(Observer);
20
  }
21
22
  Observer<Integer> Observer = new Observer<Integer>() {
23
24
      @Override
25
      public void onSubscribe(Disposable d) {
26
          Log.e(TAG, "onSubscribe" + Thread.currentThread().getName());
27
      }
28
29
      @Override
30
      public void onNext(Integer value) {
31
          Log.e(TAG, "onNext: " + value + Thread.currentThread().getName());
32
      }
33
34
35
      @Override
36
      public void onError(Throwable e) {
37
          Log.e(TAG, "onError: ");
38
      }
39
      
40
      @Override
41
      public void onComplete() {
42
          Log.e(TAG, "onComplete: All Done!" + Thread.currentThread().getName());
43
      }
44
45
  };
46
47
}

Asegúrate de que el Monitor Logcat de Android Studio esté abierto (seleccionando la pestaña Android Monitor, seguido de Logcat) y luego corre tu proyecto en un dispositivo Android físico o un AVD. Deberías ver lo siguiente en el Monitor Logcat:

Check the thread where your application is currently running in Android Studios Logcat MonitorCheck the thread where your application is currently running in Android Studios Logcat MonitorCheck the thread where your application is currently running in Android Studios Logcat Monitor

Aquí, puedes ver que .subscribe está siendo invocado en el subproceso principal de la UI, pero el observable está operando en un hilo totalmente diferente.

El operador subscribeOn() tendrá el mismo efecto sin importar dónde lo coloques en la cadena de observables; sin embargo, no puedes utilizar múltiples operadores subscribeOn() en la misma cadena. Si incluyes más de un subscribeOn(), tu cadena sólo utilizará el subscribeOn() más cercano al observable fuente.

observeOn()

Al contrario que subscribeOn(), sí importa dónde coloques observeOn() en tu cadena, ya que este operador sólo cambia el hilo que utilizan los observables que aparecen debajo.

Por ejemplo, si insertas lo siguiente en tu cadena, cada observable que aparezca en la misma a partir de este punto utilizará el nuevo hilo.

1
.observeOn(Schedulers.newThread())

Esta cadena continuará ejecutándose en el nuevo subproceso, hasta que encuentre otro operador observeOn(), y en ese punto cambiará al hilo especificado por ese operador. Puedes controlar el hilo donde ciertos observables específicos envían sus notificaciones insertando múltiples operadores observeOn() en tu cadena.

Al desarrollar apps Android, generalmente utilizarás observeOn() para enviar el resultado del trabajo realizado en subprocesos en segundo plano al hilo de la UI de Android. La manera más fácil de redirigir emisiones al hilo de la UI es utilizar el Scheduler AndroidSchedulers.mainThread, que está incluido como parte de la librería RxAndroid, no en la librería RxJava.

La librería RxAndroid incluye bindings específicos de Android para RxJava 2, lo que la convierte en un valorable recurso adicional para los desarrolladores Android (y algo que veremos con mucho más detalle en el próximo post de esta serie).

Para añadir RxAndroid a tu proyecto, abre el archivo build.gradle que está a nivel de módulo y agrega la última versión de la librería a la sección de dependencias. Mientras se escribe este artículo, la versión más reciente de RxAndroid es la 2.0.1, así que yo agregaré lo siguiente:

1
dependencies {
2
  ...
3
  ...
4
  ...
5
  compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
6
}

Luego de integrar esta librería en tu proyecto, puedes especificar que los resultados de un observable se envíen al hilo de la UI de tu app, utilizando una sola línea de código:

1
.observeOn(AndroidSchedulers.mainThread())

Considerando que la comunicación con el subproceso principal de la UI de tu app abarca una página entera de la documentación oficial de Android, esta es una gran mejor que potencialmente puede ahorrarte mucho tiempo al crear aplicaciones Android multihilo.

El Mayor Inconveniente de RxJava

Si bien RxJava tiene mucho que ofrecerles a los desarrolladores Android, ninguna tecnología es perfecta, y RxJava posee un problema importante que potencialmente podría causar crashes en tu app.

Por defecto, RxJava opera con un flujo de trabajo basado en push: los datos son producidos primero por un Observable, y luego se dirigen hacia el Observer asignado. El mayor problema con este flujo de trabajo basado en push es cuán fácil es para el productor (en esta instancia, el Observable) emitir ítems más rápidamente de lo que el consumidor (Observer) puede procesar.

Un Observable muy verborrágico en combinación con un Observer lento puede resultar rápidamente en acumulación de ítems sin consumir, que podrían devorar los recursos del sistema y resultar en una OutOfMemoryException. Este problema se conoce como backpressure o contrapresión.

Si sospechas que tienes un caso de contrapresión en tu app, existen algunas soluciones posibles, incluyendo la utilización de un operador para reducir el número de ítems que se producen.

Creando Períodos de Muestreo Con sample() y throttleFirst()

Si un Observable está emitiendo un gran número de ítems, puede que no sea necesario que el Observer asignado reciba cada uno de ellos.

Si en tu app puedes ignorar algunas de las emisiones de un Observable de manera segura, entonces puedes valerte de algunos operadores que sirven para crear períodos de muestreo, y luego seleccionar a mano valores específicos emitidos durante esos períodos:

  • El Operador sample() chequea la salida del Observable en los intervalos que especifiques, y luego toma el ítem más reciente emitido durante dicho período de muestreo. Por ejemplo, si incluyes .sample(5, SECONDS) en tu proyecto, el Observer recibirá el último valor emitido durante cada intervalo de cinco segundos.
  • El Operador throttleFirst() toma el primer valor emitido durante el período de muestreo. Por ejemplo, si incluyes .throttleFirst(5, SECONDS), el Observer recibirá el primer valor emitido durante cada intervalo de cinco segundos.
Sample OperatorSample OperatorSample Operator

Agrupando Emisiones Con buffer()

Si no puedes saltear emisiones de forma segura, todavía existe la posibilidad de quitar un poco de presión de un Observer  en apuros, agrupando emisiones en paquetes y enviándolos luego en masse. El procesamiento de emisiones en lote es típicamente más eficiente que el procesamiento de múltiples emisiones separadas, así que este enfoque debería mejorar la tasa de consumo.

Puedes crear emisiones agrupadas usando el operador buffer(). Aquí, utilizamos buffer() para agrupar todos los ítems en un período de tres segundos:

1
Observable.range(0, 10)
2
.buffer(3, SECONDS)
3
   .subscribe(System.out::println);
Buffer operatorBuffer operatorBuffer operator

Como alternativa, puedes emplear buffer() para crear un paquete constituido por una cantidad específica de emisiones. Por ejemplo, aquí le pedimos a buffer() que empaquete emisiones en grupos de cuatro:

1
Observable.range(0, 10)
2
   .buffer(4)
3
   .subscribe(System.out::println);

Reemplazando Observables Con Flowables

Un método alternativo de reducir el número de emisiones es reemplazar el Observable causante de problemas por un Flowable.

En RxJava2, el equipo de RxJava decidió separar el Observable estándar en dos tipos: el tipo regular que hemos visto a lo largo de esta serie, y los Flowables.

El funcionamiento de los Flowables es muy similar al de los Observables, pero con una diferencia importante: los Flowables sólo envían tantos ítems como solicite el Observer. Si tienes un Observable que emite más ítems de los que su Observer asignado puede consumir, quizás quieras considerar cambiarlo por un Flowable.

Antes de que puedas empezar a usar Flowables en tus proyectos, necesitas agregar la siguiente sentencia import:

1
import io.reactivex.Flowable;

Luego, puedes crear Flowables usando exactamente las mismas técnicas utilizadas para crear Observables. Por ejemplo, cada una de las siguientes porciones de código creará un Flowable capaz de emitir datos:

1
Flowable<String> flowable = Flowable.fromArray(new String[] {"south", "north", "west", east});
2
...
3
flowable.subscribe()
1
Flowable<Integer> flowable = Flowable.range(0, 20);
2
...
3
flowable.subscribe()

En este punto, te estarás preguntando: ¿por qué usaría Observables si puedo usar directamente Flowables y no preocuparme por la contrapresión? La respuesta es que un Flowable provoca más sobrecarga en el sistema que un Observable regular, así que, en con vistas a crear una app de alto rendimiento, deberías apegarte a los Observables a menos que sospeches que tu app está luchando contra la contrapresión.

Singles

El Flowable no es la única variación de Observable que hallarás en RxJava, ya que la librería también incluye la clase Single.

Los Singles son útiles cuando necesitas simplemente emitir un único valor. En estos escenarios, la creación de un Observable puede parecer una exageración, pero el Single está diseñado para simplemente emitir un único valor y luego completarse, invocando:

  • onSuccess(): El Single emite su único valor.
  • onError(): Si el Single es incapaz de emitir su ítem, devolverá a este método el Throwable resultante.

Un Single invocará sólo uno de estos métodos, e inmediatamente terminará su ejecución.

Veamos un ejemplo de un Single en acción—nuevamente, para ahorrar tiempo reutilizaremos código:

1
import android.os.Bundle;
2
import android.support.v7.app.AppCompatActivity;
3
import android.util.Log;
4
import io.reactivex.Single;
5
import io.reactivex.SingleObserver;
6
import io.reactivex.disposables.Disposable;
7
8
public class MainActivity extends AppCompatActivity {
9
  public static final String TAG = "MainActivity";
10
11
  @Override
12
  protected void onCreate(Bundle savedInstanceState) {
13
      super.onCreate(savedInstanceState);
14
      setContentView(R.layout.activity_main);
15
16
      Single.just("Hello World")
17
              .subscribe(getSingleObserver());
18
  }
19
20
  private SingleObserver<String> getSingleObserver() {
21
      return new SingleObserver<String>() {
22
          @Override
23
          public void onSubscribe(Disposable d) {
24
              Log.e(TAG, "onSubscribe");
25
          }
26
27
          @Override
28
          public void onSuccess(String value) {
29
              Log.e(TAG, " onSuccess : " + value);
30
          }
31
32
          @Override
33
          public void onError(Throwable e) {
34
              Log.e(TAG, "onError: ");
35
          }
36
37
      };
38
39
  }
40
41
}

Ejecuta tu proyecto en un AVD o dispositivo Android físico, y verás la siguiente salida en el Monitor Logcat de Android Studio:

Check the Singles output in Android Studios Logcat MonitorCheck the Singles output in Android Studios Logcat MonitorCheck the Singles output in Android Studios Logcat Monitor

Si cambias de parecer y deseas convertir un Single en un Observable en algún punto, de nuevo RxJava posee todos los operadores que necesitas, incluyendo:

  • mergeWith(): Combina múltiples Singles en un solo Observable.
  • concatWith(): Encadena los ítems emitidos por múltiples Singles, para conformar una emisión de Observable.
  • toObservable(): Convierte un Single en un Observable que emite el ítem originalmente emitido por el Single, y luego se completa.

Resumen

En este post exploramos algunos operadores de RxJava que puedes usar para crear y administrar múltiples subprocesos, sin la complejidad y potencial de errores que tradicionalmente ha acompañado al multithreading en Android. También vimos cómo puedes emplear la librería RxAndroid para comunicarte con el importantísimo hilo de la UI de Android utilizando una sola línea de código, y cómo asegurarte de que la contrapresión no represente un problema en tu aplicación.

Hemos abordado la librería RxAndroid pocas veces a lo largo de esta serie, pero esta librería provee bindings de RxJava específicos para Android, que pueden resultar invaluables al trabajar con RxJava en la plataforma Android, por ende el post final de esta serie analizará la librería RxAndroid con mucho más detalle.

¡Hasta entonces, revisa algunos de nuestros otros posts sobre desarrollo Android!

Advertisement
Did you find this post useful?
Want a weekly email summary?
Subscribe below and we’ll send you a weekly email summary of all new Code tutorials. Never miss out on learning about the next big thing.
Advertisement
Looking for something to help kick start your next project?
Envato Market has a range of items for sale to help get you started.