Advertisement
  1. Code
  2. Android SDK

Concurrencia en RxJava 2

Scroll to top
Read Time: 12 min

() translation by (you can also view the original English article)

Una app multi-hilo tiene dos o más partes que pueden ejecutarse en paralelo. Esto permite a la app hacer mejor uso de los núcleos dentro del CPU del dispositivo. Esto le permite realizar tareas más rápido y lleva a una experiencia más responsiva y suave para el usuario.

Codificar con concurrencia en Java puede ser doloroso, pero gracias a RxJava, es ahora mucho más sencillo de hacer. Con RxJava, solo necesitas declarar el hilo en el cuál quieres que la tarea se ejecute (de manera declarada) en vez de crear y administrar hilos (de manera imperativa).

RxJava hace uso de Schedulers junto con los operadores de concurrencia suscribeOn() y observeOn() para lograr esto. En este tutorial, aprenderás sobre Schedulers, el operador suscribeOn(), el operador observeOn(), y también como sacar ventaja del operador flatMap() para lograr concurrencia. Pero primero, comencemos con Schedulers en RxJava.

Pre-requisitos

Para seguir este tutorial, deberías estar familiarizado con:

Revisa nuestros otros artículos para acelerar los básicos de RxJava y expresiones lambda.

Schedulers en RxJava 2

Los Schedulers en RxJava son usados para ejecutar una unidad de trabajo sobre un hilo. Un Scheduler proporciona una abstracción al mecanismo de hilos de Android y Java. Cuando quieres ejecutar una tarea y haces uso de un Scheduler para ejecutar esa tarea, el Scheduler va a su piscina de hilo (una colección de hilos que están listos para usar) y entonces ejecuta la tarea en un hilo disponible.

También puedes especificar que una tarea debería ejecutarse en un hilo específico. (Hay dos operadores, subscribeOn() y observeOn(), los cuáles pueden ser usados para especificar sobre cuál hilo de la piscina de hilo Scheduler debería ser ejecutada la tarea.)

Como sabes, en Android, los procesos de larga ejecución o intensivos de CPU no deberían ejecutarse en el hilo principal. Si una suscripción por un Observer a un Observable es conducida en el hilo principal, cualquier operador asociado se ejecutará también en el hilo principal. En el caso de una tarea de larga ejecución (ej. realizar una petición de red) o una tarea intensiva de CPU (ej. transformación de imagen), esto bloqueará la UI hasta que la tarea sea terminada, llevando al horrible diálogo ANR (Aplicación No Respondiendo) y el cierre de la aplicación. Estos operadores pueden ser en su lugar cambiados a otro hilo con el operador observeOn().

En la siguiente sección, vamos a explorar los diferentes tipos de Schedulers y sus usos.

Tipos de Schedulers

Aquí están algunos de los tipos de Schedulers disponibles en RxJava y RxAndroid para indicar el tipo de hilo sobre el cuál ejecutar tareas.

  • Schedulers.immediate(): devuelve un Scheduler el cuál ejecuta el trabajo de manera instantánea en el hilo actual. Ten en cuenta que esto bloqueará el hilo actual, así que debería ser usado con precaución.
  • Schedulers.trampoline(): programa tareas en el hilo actual. Estas tareas no son ejecutadas de manera inmediata pero en su lugar son ejecutadas después de que el hilo termina sus tareas actuales. Esto es diferente de Schedulers.immediate() porque en lugar de ejecutar una tarea de forma inmediata, este espera a que se complete la tarea actual.
  • Schedulers.newThread(): dispara un nuevo hilo y devuelve un Scheduler para ejecutar la tarea en el nuevo hilo para cada Observer. Deberías tener cuidado usando esto porque el nuevo hilo no se vuelve a usar después sino que es destruido.
  • Schedulers.computation(): esto nos da un Scheduler que está pensado para trabajo computacional intensivo tal como transformación de imagen, cálculos complejos, etc. Esta operación emplea completamente los núcleos del CPU. Este Scheduler usa un tamaño fijo de piscina de hilo el cuál depende de los núcleos del CPU para uso óptimo. Deberías tener cuidado de no crear más hilos que los núcleos disponibles del CPU porque esto puede reducir el desempeño.
  • Schedulers.io(): crea y devuelve un Scheduler diseñado para trabajo límite-I/O tal como desempeñar llamadas asíncronas de red o leer y escribir a la base de datos. Estas tareas no son intensivas con el CPU o de otro modo hace uso de Schedulers.computation().
  • Schedulers.single(): crea y devuelve un Scheduler y ejecuta varias tareas de manera secuencial en un solo hilo.
  • Schedulers.from(Executor executor): esto creará un Scheduler que ejecutará una tarea o unidad de trabajo en el Executor dado.
  • AndroidSchedulers.mainThread(): esto creará un Scheduler que ejecuta la tarea sobre el hilo principal de aplicación Android. Este tipo de programador es proporcionado por la librería RxAndroid.

El Operador sbscribeOn()

Usando el operador de concurrencia subscribeOn(), especificas que el Scheduler debería realizar la operación en el flujo Observable. Este entonces empujará los valores a los Observers usando el mismo hilo. Ahora veamos un ejemplo práctico:

1
import android.os.Bundle;
2
import android.support.v7.app.AppCompatActivity;
3
import android.util.Log;
4
import io.reactivex.Observable;
5
import io.reactivex.ObservableOnSubscribe;
6
import io.reactivex.disposables.Disposable;
7
import io.reactivex.schedulers.Schedulers;
8
9
public class MainActivity extends AppCompatActivity {
10
11
    private static final String[] STATES = { "Lagos", "Abuja", "Abia",
12
            "Edo", "Enugu", "Niger", "Anambra"};
13
14
    private Disposable mDisposable = null;
15
16
    @Override
17
    protected void onCreate(Bundle savedInstanceState) {
18
        super.onCreate(savedInstanceState);
19
        setContentView(R.layout.activity_main);
20
21
        Observable<String> observable = Observable.create(dataSource())
22
                .subscribeOn(Schedulers.newThread())
23
                .doOnComplete(() -> Log.d("MainActivity", "Complete"));
24
25
        mDisposable = observable.subscribe(s -> {
26
            Log.d("MainActivity", "received " + s + " on thread " + Thread.currentThread().getName());
27
        });
28
    }
29
30
    private ObservableOnSubscribe<String> dataSource() {
31
       return(emitter -> {
32
            for(String state : STATES) {
33
                emitter.onNext(state);
34
                Log.d("MainActivity", "emitting " + state + " on thread " + Thread.currentThread().getName());
35
                Thread.sleep(600);
36
            }
37
            emitter.onComplete();
38
        });
39
    }
40
41
    @Override
42
    protected void onDestroy() {
43
        if (mDisposable != null && !mDisposable.isDisposed()) {
44
            mDisposable.dispose();
45
        }
46
        super.onDestroy();
47
    }
48
}

En el código de arriba, tenemos un ArrayList estático el cuál contiene algunos estados en Nigeria. También tenemos un campo el cuál es de tipo Disposable. Obtenemos la instancia Disposable llamando a Observable.subscribe(), y lo usaremos después cuando llamemos  al método dispose() para liberar cualquier recurso que sea usado. Esto ayuda a prevenir fugas de memoria. Nuestro método dataSource() (el cuál puede devolver datos de una fuente de base de datos remota o local) devolverá ObservableOnSubscribe<T>: este es requerido por nosotros para crear nuestro propio Observable después usando el método Observable.create().

Dentro del método dataSource(), ciclamos a través del arreglo, emitiendo cada elemento al Observers llamando emitter.onNext(). Después de que cada valor es emitido, dormidos el hilo para simular el trabajo intensivo siendo realizado. Finalmente, llamamos al método onComplete() para dar una señal a los Observers a los que terminamos de pasar valores y no deberían esperar ninguno más.

Ahora, nuestro método dataSource() no debería ser ejecutado sobre el hilo principal UI. ¿Pero cómo es especificado esto? En el ejemplo de arriba, proporcionamos Schedulers.newThread() como un argumento a subscribeOn(). Esto significa que la operación dataSource() se ejecutará en un nuevo hilo. Nota también en el ejemplo de arriba, que tenemos solo un Observer. Si tuviéramos  múltiples Observers, cada uno de ellos tendría su propio hilo.

Así que para que podamos ver esto funcionando, nuestro Observer imprime los valores que obtiene en su método onNext() desde el Observable.

Cuando ejecutamos esto y vemos nuestro logcat en Android Studio, puedes ver que las emisiones desde el método dataSource() al del Observer sucedió en el mismo hilo---RxNewThreadScheduler-1---en el cuál los recibió el Observer.

Android Studio logcat result showing execution logs on a single threadAndroid Studio logcat result showing execution logs on a single threadAndroid Studio logcat result showing execution logs on a single thread

Si no especificas el método .subscribeOn() después del método Observable.create(), será ejecutado sobre el hilo actual---que en nuestro caso es el hilo principal, por lo tanto bloqueando la UI de la aplicación.

Android Studio Logcat showing execution logs on main threadAndroid Studio Logcat showing execution logs on main threadAndroid Studio Logcat showing execution logs on main thread

Hay algunos detalles importantes de los que deberías estar consciente con respecto al operador subscribeOn(). Solo deberías tener un subscribeOn() en la cadena Observable; agregar otro en cualquier lugar en la cadena no tendrá efecto en absoluto. El lugar recomendado para poner este operador es tan cercano a la fuente como sea posible por el bien de la claridad. En otras palabras, colócalo primero en la cadena de operador.

1
Observable.create(dataSource())
2
                .subscribeOn(Schedulers.computation()) // this has effect

3
                .subscribeOn(Schedulers.io()) // has no effect 

4
                .doOnNext(s -> {
5
                    saveToCache(s); // executed on Schedulers.computation()

6
                })

El Operador observeOn()

Como vimos, el operador de concurrencia subscribeOn() instruirá al Observable cuál Scheduler usar para empujar emisiones hacia adelante junto con la cadena Observable al Observers.

El trabajo del operador de concurrencia observeOn(), por el otro lado, es cambiar las emisiones subsecuentes a otro hilo o Scheduler. Usamos este operador para controlar sobre qué hilo recibirán los consumidores las emisiones. Veamos un ejemplo práctico.

1
import android.os.Bundle;
2
import android.support.v7.app.AppCompatActivity;
3
import android.util.Log;
4
import android.widget.TextView;
5
import io.reactivex.Observable;
6
import io.reactivex.ObservableOnSubscribe;
7
import io.reactivex.android.schedulers.AndroidSchedulers;
8
import io.reactivex.disposables.Disposable;
9
import io.reactivex.schedulers.Schedulers;
10
11
public class ObserveOnActivity extends AppCompatActivity {
12
13
    private Disposable mDisposable = null;
14
15
    @Override
16
    protected void onCreate(Bundle savedInstanceState) {
17
        super.onCreate(savedInstanceState);
18
        setContentView(R.layout.activity_main);
19
20
        TextView textView = (TextView) findViewById(R.id.tv_main);
21
22
        Observable<String> observable = Observable.create(dataSource())
23
                .subscribeOn(Schedulers.newThread())
24
                .observeOn(AndroidSchedulers.mainThread())
25
                .doOnComplete(() -> Log.d("ObserveOnActivity", "Complete"));
26
27
        mDisposable = observable.subscribe(s -> {
28
           Log.d("ObserveOnActivity", "received " + s + " on thread " + Thread.currentThread().getName());
29
            textView.setText(s);
30
        });
31
    }
32
33
    private ObservableOnSubscribe<String> dataSource() {
34
        return(emitter -> {
35
            Thread.sleep(800);
36
            emitter.onNext("Value");
37
            Log.d("ObserveOnActivity", "dataSource() on thread " + Thread.currentThread().getName());
38
            emitter.onComplete();
39
        });
40
    }
41
// ... 

42
}

En el código de arriba, usamos el operador observeOn() y después pasamos el AndroidSchedulers,mainThread() a este. Lo que hemos hecho es cambiar el hilo desde Schedulers.newThread() al hilo principal Android. Esto es necesario porque queremos actualizar el widget TextView, y solo podemos hacer eso desde el hilo principal de UI. Nota que si no cambias al hilo principal cuando intentas actualizar el widget TextView, la aplicación se cerrará y arrojará un CalledFromWrongThreadException.

A diferencia del operador subscribeOn(), el operador observerOn() puede ser aplicado múltiples veces en la cadena de operador, de ahí cambiando el Scheduler más de una vez.

1
Observable<String> observable = Observable.create(dataSource())
2
                .subscribeOn(Schedulers.newThread())
3
                .observeOn(Schedulers.io())
4
                .doOnNext(s -> {
5
                    saveToCache(s);
6
                    Log.d("ObserveOnActivity", "doOnNext() on thread " + Thread.currentThread().getName());
7
                })
8
                .observeOn(AndroidSchedulers.mainThread())
9
                .doOnComplete(() -> Log.d("ObserveOnActivity", "Complete"));

Este código tiene dos operadores observeOn(). El primero usa el Schedulers.io(), lo cuál significa que el método saveToCache() será ejecutado sobre el hilo Schedulers.io(). Después de eso, este cambia entonces al AndroidSchedulers.mainThread() en donde Observers recibirá las emisiones desde el flujo.

Android Studio logcat result showing logsAndroid Studio logcat result showing logsAndroid Studio logcat result showing logs

Concurrencia Con el Operador flatMap()

El operador flatMap() es otro operador muy poderoso e importante que puede ser usado para lograr concurrencia. La definición de acuerdo a la documentación oficial es como sigue:

Transforma los elementos emitidos por un Observable en Observables, después aplana las emisiones de esos en un solo Observable.

FlatMap operator diagramFlatMap operator diagramFlatMap operator diagram

Echemos un vistazo a un ejemplo práctico que use este operador:

1
    //...

2
    @Override
3
    protected void onCreate(Bundle savedInstanceState) { 
4
        // ...

5
        final String[] states = {"Lagos", "Abuja", "Imo", "Enugu"};
6
        Observable<String> statesObservable = Observable.fromArray(states);
7
        
8
        statesObservable.flatMap(
9
                    s -> Observable.create(getPopulation(s))
10
          ).subscribe(pair -> Log.d("MainActivity", pair.first + " population is " + pair.second));
11
    }
12
13
    private ObservableOnSubscribe<Pair> getPopulation(String state) {
14
        return(emitter -> {
15
            Random r = new Random();
16
            Log.d("MainActivity", "getPopulation() for " + state + " called on " + Thread.currentThread().getName());
17
            emitter.onNext(new Pair(state, r.nextInt(300000 - 10000) + 10000));
18
            emitter.onComplete();
19
        });
20
    }
21
}

Esto imprimirá lo siguiente en el logcat de Android Studio:

1
getPopulation() for Lagos called on main
2
Lagos population is 80362
3
getPopulation() for Abuja called on main
4
Abuja population is 132559
5
getPopulation() for Imo called on main
6
Imo population is 34106
7
getPopulation() for Enugu called on main
8
Enugu population is 220301

Desde el resultado de arriba, puedes ver que los resultados que obtuvimos estaban en el mismo orden que en el arreglo. También, el método getPopulation() para cada estado fue procesado en el mismo hilo---el hilo principal. Esto hace el resultado de salida lento porque fueron procesados de manera secuencial sobre el hilo principal.

Ahora, para que podamos lograr concurrencia con este operador, queremos que el método getPopulation() para cada estado (emisiones desde el statesObservable) sea procesado sobre diferentes hilos. Hacer esto llevará a un procesamiento más rápido. Usaremos el operador flatMap() para hacer esto porque crea un nuevo Observable por cada emisión. Después aplicamos el operador de concurrencia subscribeOn() a cada uno, pasando un Scheduler a este.

1
 statesObservable.flatMap(
2
                s -> Observable.create(getPopulation(s))
3
                .subscribeOn(Schedulers.io())
4
        ).subscribe(pair -> Log.d("MainActivity", pair.first + " population is " + pair.second));

Mientras cada emisión produce un Observable, el trabajo del operador flatMap() es fusionarlos y después enviarlos como un solo flujo.

1
getPopulation() for Lagos called on RxCachedThreadScheduler-1
2
Lagos population is 143965
3
getPopulation() for Abuja called on RxCachedThreadScheduler-2
4
getPopulation() for Enugu called on RxCachedThreadScheduler-4
5
Abuja population is 158363
6
Enugu population is 271420
7
getPopulation() for Imo called on RxCachedThreadScheduler-3
8
Imo population is 81564

En el resultado de arriba, podemos observar que el método getPopulation() de cada método fue procesado en diferentes hilos. Esto hace el procesamiento mucho más rápido, pero también observa que las emisiones del operador flatMap() que fueron recibidas por el Observer no están en el mismo orden que el flujo original de emisiones.

Conclusión

En este tutorial, aprendiste sobre manejar concurrencia usando RxJava2: lo que es, los diferentes Schedulers disponibles, y cómo usar los operadores de concurrencia subscribeOn() y observeOn(). También te mostré cómo usar el operador flatMap() para lograr concurrencia.

Mientras tanto, ¡revisa algunos de nuestros otros cursos y tutoriales sobre el lenguaje Java y desarrollo de aplicaciones Android!

Advertisement
Did you find this post useful?
Want a weekly email summary?
Subscribe below and we’ll send you a weekly email summary of all new Code tutorials. Never miss out on learning about the next big thing.
Advertisement
Looking for something to help kick start your next project?
Envato Market has a range of items for sale to help get you started.