Skip to content

7. La biblioteca RxJava

La biblioteca RxJava se basa en el siguiente concepto: un flujo de elementos de tipo T Observable<T> es observado por uno o varios suscriptores (suscriptores, observadores, consumidores) Subscriber<T>. La biblioteca RxJava permite que el flujo Observable<T> se ejecute en un hilo T1 y su observador Subscriber<T> en un hilo T2 sin que el desarrolladortenga que preocuparse por gestionar el ciclo de vida de estos subprocesos ni por problemas naturalmente complejos, como el intercambio de datos entre subprocesos y su sincronización para ejecutar una tarea global. Por lo tanto, facilita la programación asíncrona.

Un flujo Observable<T> produce elementos de tipo T, observables a medida que se producen. Si el observador y el observable (término que designa al tipo Observable<T> por uso coloquial) se encuentran en el mismo hilo, entonces el observable solo puede producir el elemento (i+1) cuando el observador haya consumido el elemento i. Hay pocos casos en los que esta arquitectura resulte interesante. Si el observador y el observable no se encuentran en el mismo hilo, entonces el observable y su observador tienen comportamientos autónomos: el observable produce a su ritmo y el observador consume a su ritmo. Ahí reside el interés de la biblioteca. Hasta ahora siempre hemos hablado de un observador. En realidad, un observable puede tener cualquier número de observadores.

La biblioteca RxJava se adapta especialmente bien a la arquitectura vista en el apartado 2 de la introducción y que recordamos aquí:

Image

  • en [1], una capa de servicio presta servicios, algunos de los cuales son long de obtener (por ejemplo, solicitudes de red);
  • esta capa de servicios es invocada por una interfaz gráfica [1] (Swing, Android, JavaFx). Si la capa de servicios se ejecuta en el mismo hilo que el método [swing] que la utiliza, la interfaz gráfica se bloquea (no responde) mientras espera el resultado del servicio;
  • en [2], una delgada capa de adaptación implementada con RxJava permite presentar a la capa gráfica una implementación asíncrona del mismo servicio: este puede ejecutarse en un hilo diferente al del método de la capa gráfica que lo invoca. En este caso, la interfaz gráfica [3] sigue siendo reactiva: el usuario puede seguir interactuando con ella, por ejemplo, lanzando una nueva solicitud de red en paralelo a la primera y, sobre todo, se le puede ofrecer la posibilidad de cancelar los procesos que duran demasiado, algo imposible si la interfaz gráfica está congelada;
  • la llamada [4] es síncrona, mientras que la llamada [5-6] es asíncrona;

En esta arquitectura, la capa [2] ofrece servicios que devuelven tipos Observable<T> a los que pueden suscribirse los métodos de la capa gráfica [3]. Un servicio de la capa [2] entrega entonces sus resultados uno a uno y la capa [3] puede reaccionar a cada uno de ellos, por ejemplo, actualizando uno o varios componentes de la interfaz gráfica.

La clase Observable<T> cuenta con varias decenas de métodos. Esta es una de las dificultades de la biblioteca: es muy completa y resulta difícil abarcar todas sus posibilidades. Vamos a presentar algunas de ellas. El dominio de los demás métodos vendrá con el tiempo.

7.1. Crear observables y suscribirse a ellos

7.1.1. Ejemplo-01: el método [Observable.from]

  

Consideremos el siguiente código:


package dvp.rxjava.observables;

import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;

import java.util.Arrays;

public class Exemple01 {
  public static void main(String[] args) {
    // observables de enteros
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
    });
  }
}
  • línea 12: se crea un tipo Observable<Integer> a partir de una lista de enteros.

La clase Observable<T> es un flujo de elementos de tipo T que se pueden observar, preferiblemente de forma asíncrona, aunque no necesariamente, a medida que se producen. Su definición es la siguiente:

 

Como ya se ha dicho, la clase Observable<T> tiene varias decenas de métodos. Algunos son similares a los de la clase Stream<T> estudiada en el apartado 5. La documentación de RxJava incluye «diagramas de mármol» [2] que ilustran el funcionamiento de estos métodos:

  • la línea 3 ilustra las emisiones del observable a lo largo del tiempo;
  • el método [4] se aplica a los elementos emitidos por el observable. Por lo general, produce un nuevo observable;
  • la línea 5 muestra el nuevo observable obtenido;

El método [Observable.from] tiene la siguiente firma:

 

El método estático [Observable.from] permite crear un Observable<T> a partir de una colección de elementos de tipo T. Es una forma muy sencilla de empezar con los observables. La línea:


    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));

emitirá, por tanto, tres elementos. No los emite inmediatamente. Los emitirá en su totalidad cada vez que se declare un observador. Esto es lo que se denomina un observable frío. El observable vuelve a emitir sus elementos por cada nuevo suscriptor.

Podemos considerar la instrucción anterior como una acción de configuración del observable. Este se configura una vez y se ejecuta n veces si se presentan n suscriptores.

¿Cómo se suscribe uno?

Una forma de hacerlo es utilizar el método [Observable.subscribe], cuya definición utilizada aquí es la siguiente:

 
  • el primer parámetro [Action1<T> onNext] (véase el apartado 6.2) del método es el método que se ejecutará cuando el observable emita un nuevo elemento T;
  • el segundo parámetro [Action1<Throwable> onError] del método es el método que se debe ejecutar cuando el observable lanza una excepción;
  • el tercer parámetro [Action0 onComplete] (véase el apartado 6.1) del método es el método que se debe ejecutar cuando el observable lanza una excepción;
  • el método devuelve un tipo [Subscription];

El tipo [Subscription] representa una suscripción al observable. Su definición es la siguiente:

 

El interés de esta interfaz [1] reside en su método [2], que permite cancelar una suscripción.

En nuestro ejemplo, el código de la suscripción al observable es el siguiente:


    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
});
  • línea 1: se ignora el resultado de tipo [Subscription];
  • líneas 1-15: los tres parámetros son instancias de clases anónimas. También utilizaremos lambdas. La ventaja de las clases anónimas es que se ven claramente los tipos de datos esperados por el único método de estas clases;
  • líneas 2-5: implementación del primer parámetro de tipo [Action1<Integer>];
  • líneas 6-10: implementación del segundo parámetro de tipo [Action1<Throwable>];
  • líneas 11-15: implementación del tercer parámetro de tipo [Action0];

El código completo es el siguiente:


package dvp.rxjava.observables;

import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;

import java.util.Arrays;

public class Exemple01 {
  public static void main(String[] args) {
    // observables de enteros
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    // suscripción
    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
    });
  }
}

El observable de la línea 12 comienza a emitir sus 3 elementos tan pronto como se llama al método [subscribe] en la línea 14. A partir de ese momento:

  • con cada elemento emitido, se ejecutan las líneas 15-18.
  • al final de los 3 elementos, se ejecutan las líneas 24-29;
  • las líneas 19-24 nunca se ejecutarán porque el observable no emite aquí ninguna excepción;

Por defecto, el observable y el observador se ejecutan en el mismo hilo. Existen algunos observables predefinidos que se ejecutan en un hilo diferente al hilo principal (en este caso, el hilo del método main), pero para la mayoría de ellos no es así. Por lo tanto, aquí todo ocurre en el hilo del método [main]:

  • el observable emite el elemento 1;
  • las líneas 15-18 se ejecutan y muestran este elemento;
  • el observable emite el elemento 2;
  • se ejecutan las líneas 15-18 y muestran este elemento;
  • el observable emite el elemento 3;
  • se ejecutan las líneas 15-18 y muestran este elemento;
  • el observable emite la notificación [completed];
  • se ejecutan las líneas 24-29;

Esto es lo que muestran los resultados obtenidos:

1
2
3
4
next : 1
next : 2
next : 3
completed

La clase [Exemple02] retoma [Exemple01] utilizando esta vez funciones lambda como parámetros del método [Observable.subscribe]:


package dvp.rxjava.observables;

import java.util.Arrays;

import rx.Observable;

public class Exemple02 {
  public static void main(String[] args) {
    // observables de enteros
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    // suscripción
    obs1.subscribe(
      (integer) -> System.out.printf("next : %s%n", integer),
      (th) -> System.out.println(th),
      () -> System.out.println("completed"));
  }
}

7.1.2. Ejemplo-03: la clase Observer

  

El método [Observable.subscribe], que permite suscribirse a un observable, tiene varias versiones, entre las que se encuentra la siguiente:


package dvp.rxjava.observables;

import java.util.Arrays;

import rx.Observable;
import rx.Observer;

public class Exemple03 {
    public static void main(String[] args) {
        // observables de enteros
        Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
        // suscripción
        obs1.subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("completed");
            }

            @Override
            public void onError(Throwable th) {
                System.out.printf("throwable %s", th);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.printf("next : %s%n", integer);
            }
        });
    };
}

Línea 13: en lugar de pasar tres parámetros al método [subscribe], se le pasa un tipo [Observer] como sigue:

 

El tipo [Observer] es una interfaz con tres métodos:

  • [onNext(T t)], que se invoca cada vez que el observable emite un elemento t;
  • [onError(Throwable th)], que se invoca cuando el observable lanza una excepción th;
  • [onCompleted], que se invoca cuando el observable indica que ha terminado de emitir;

El funcionamiento del código es similar al explicado anteriormente. Se obtienen los siguientes resultados:

1
2
3
4
next : 1
next : 2
next : 3
completed

7.1.3. Ejemplo-04: el método [Observable.create]

  

El método estático Observable.create se define de la siguiente manera:

 
  • el método [create] devuelve un tipo Observable<T>;
  • el parámetro del método [create] es una función de tipo [Observable.OnSubscribe<T>] definida de la siguiente manera:
 

El tipo [Observable.OnSubscribe<T>] es una interfaz funcional que a su vez extiende la interfaz funcional [Action1<Subscriber<? super T>>]. El método [call] de esta interfaz espera un tipo [Subscriber] (suscriptor, observador) definido de la siguiente manera:

 

En [1] se observa que la clase [Subscriber<T>] implementa la interfaz [Observer<T>] presentada en el apartado 7.1.2.

Finalmente, el método [<T> Observable.create]:

  • espera como parámetro una instancia de tipo [Observable.OnSubscribe<T>] que tenga el único método de firma: void call(Subscriber<T> s). El tipo [Subscriber<T>] extiende el tipo [Observer<T>] y, por lo tanto, dispone de los métodos onNext, onError, onCompleted;
  • devuelve un tipo Observable<T>;

El método [<T> Observable.create] devuelve un observable configurado. Aún no se ha producido ninguna emisión de elementos. Cuando un suscriptor [Subscriber<T> s] se suscribe a este observable, se invoca el método [void call(s)] de la función pasada como parámetro del método [<T> Observable.create]. Su función es emitir elementos t de tipo T y llamar al método [s.onNext(t)] del observador en cada emisión. Cuando este haya finalizado, se debe llamar al método [s.onCompleted(t)] del observador y el método [call] debe finalizar. Si el método [call] encuentra una excepción th, se debe llamar al método [s.onError(th)] del observador y el método [call] debe finalizar;

Para ilustrar este complejo funcionamiento, utilizaremos el siguiente código [Exemple04]:


package dvp.rxjava.observables;

import rx.Observable;
import rx.Subscriber;

import java.util.Random;

public class Exemple04 {
    public static void main(String[] args) {
        // configuración observable de números reales
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                for (int i = 0; i < 3; i++) {
                    // emisión del elemento i
                    subscriber.onNext(new Random((i + 1)).nextDouble());
                }
                // fin de emisión
                subscriber.onCompleted();
            }
        });
        // suscripción y, por tanto, emisión
        obs1.subscribe((d) -> System.out.printf("onNext %s%n", d), (th) -> System.out.printf("onError %s%n", th),
                () -> System.out.println("onCompleted"));
    }
}
  • línea 11: se crea un observable que emite tipos Double;
  • líneas 11-21: el parámetro del método [create] se instancia con una clase anónima que tiene el único método [call] de las líneas 12-20. El observable creado en la línea 11 está listo para emitir, pero solo emitirá cuando llegue un observador;
  • líneas 13-21: el método [call] recibe la referencia de un observador;
  • líneas 14-17: emisión de 3 elementos al observador;
  • líneas 19: notificación de fin de emisión al observador;
  • líneas 23-24: suscripción al observable de la línea 11. Se implementan los tres parámetros [onNext, onError, onCompleted] del método [subscribe] mediante tres lambdas. Esta suscripción creará el suscriptor [Subscriber<Double>], que se pasará al método [call] de la línea 13. A continuación, comenzará la emisión de elementos;
  • todo ocurre en el mismo hilo: observable y observador;

Se obtienen los siguientes resultados:

1
2
3
4
onNext 0.7308781907032909
onNext 0.7311469360199058
onNext 0.731057369148862
onCompleted

El método [Observable.create] permite crear un observable a partir de cualquier fenómeno. Este es el método que hemos utilizado en el apartado 2 de la introducción, para transformar una interfaz síncrona en una asíncrona.

7.1.4. Ejemplo-05: refactorización de [Exemple-04]

  

El siguiente ejemplo presenta una nueva version del método estático [Observable.subscribe]:


package dvp.rxjava.observables;

import rx.Observable;
import rx.Subscriber;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

public class Exemple05 {
    public static void main(String[] args) {
        // configuración de un observable de números reales
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // espera
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // error
                        subscriber.onError(e);
                    }
                    // acción
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // fin
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });

        // un suscriptor
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }

            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };

        // suscripción
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        showInfos("après souscription");

    }

    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}
  • línea 56: la nueva version del método estático [Observable.subscribe] admite como parámetro el tipo [Subscriber] que hemos presentado en el párrafo anterior;
  • líneas 37-52: el suscriptor (suscriptor, observador). Implementa la interfaz Observer con sus tres métodos onNext, onError y onCompleted;
  • líneas 61-64: a partir de ahora nos centraremos en los subprocesos en los que se ejecutan el observable y su observador;
  • línea 62: el nombre del hilo;
  • línea 63: la hora actual expresada en segundos y milisegundos. Esto nos permitirá ver en el tiempo la emisión de elementos por parte del observable y su procesamiento por parte del observador;
  • Este código tiene la misma funcionalidad que el anterior. Simplemente lo hemos refactorizado;

Los resultados obtenidos son los siguientes:

avant souscription ------Thread[main] ---- Time[31:685]
Observable.call start ------Thread[main] ---- Time[31:691]
Observable.call onNext(80.39999999999999) ------Thread[main] ---- Time[32:194]
Subscriber.onNext (80.39999999999999) ------Thread[main] ---- Time[32:195]
Observable.call onNext(73.2) ------Thread[main] ---- Time[32:595]
Subscriber.onNext (73.2) ------Thread[main] ---- Time[32:595]
Observable.call onNext(106.8) ------Thread[main] ---- Time[32:897]
Subscriber.onNext (106.8) ------Thread[main] ---- Time[32:897]
Observable.call onCompleted ------Thread[main] ---- Time[32:898]
Subscriber.onCompleted ------Thread[main] ---- Time[32:898]
après souscription ------Thread[main] ---- Time[32:899]
  • Línea 1 de los resultados: antes de la línea 56 del código, aún no ha ocurrido nada. El observable simplemente se ha configurado;
  • línea 2 de los resultados: la línea 56 del código provoca la llamada al método [call] de la línea 15. Línea 3, se envía el número real 80,39 al observador;
  • línea 4: el observador recibe el número enviado;
  • líneas 5-8: el proceso anterior se repite dos veces;
  • línea 9: el observable envía la notificación de fin de emisión;
  • línea 10: el observador la recibe;
  • línea 11: mostrada por la línea 57 del código;

Vemos, pues, que la única línea 56 de suscripción ha provocado la visualización de las líneas 2-10 de los resultados. Cuando se empieza con la biblioteca RxJava, uno se pregunta cómo se encadenan las cosas entre sí y, en particular, los vínculos que unen al observador y al observable. Aquí vemos que la línea 56, la suscripción al observable,

  • ha provocado la emisión de todos los elementos del observable;
  • que el observable y el observador se ejecutan en el mismo hilo;
  • que, debido a ello, se observa la secuencia: emisión del elemento i, observación del elemento i, emisión del elemento (i+1), observación del elemento (i+1), ...

Recordemos que el emisor esperaba antes de emitir sus elementos:


                    // en espera
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // error
                        subscriber.onError(e);
}

donde i en la línea 3 representa el número de emisión (0<=i<3). Si observamos las horas de emisión de los elementos del observable:

  • líneas 2, 3: el elemento 0 se emitió aproximadamente 500 ms después del inicio de la suscripción;
  • líneas 3 y 5: el elemento 1 se emitió aproximadamente 400 ms después del elemento 0;
  • líneas 5, 7: el elemento 2 se emitió aproximadamente 300 ms después del elemento 1;

7.2. Hilo de ejecución, hilo de observación

7.2.1. Ejemplo-06: observable y observador en un hilo distinto de [main]

  

Reestructuramos el ejemplo anterior de la siguiente manera [Exemple06]:


package dvp.rxjava.observables;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class Exemple06 {
    public static void main(String[] args) {

        // guardia de barrera
        CountDownLatch latch = new CountDownLatch(1);

        // configuración de un observable de valores reales
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // en espera
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // error
                        subscriber.onError(e);
                    }
                    // acción
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // fin
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });

        // un suscriptor
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
                // se baja la barrera
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }

            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };

        // continuación de la configuración observable
        obs1 = obs1.subscribeOn(Schedulers.computation());
        // suscripción
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        // espera delante de la barrera
        try {
            showInfos("début attente barrière");
            latch.await();
            showInfos("fin attente barrière");
        } catch (InterruptedException e1) {
            System.out.println(e1);
        }
        showInfos("après souscription");

    }

    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}
  • línea 16: creamos una barrera (semáforo) con un objeto de tipo [CountDownLatch]. Este objeto sirve para sincronizar subprocesos entre sí. Aquí se inicializa con el valor 1, que llamaremos valor de la barrera (o del semáforo). Un subproceso se pone en espera de la barrera mediante una operación:

latch.await();

El hilo queda bloqueado si el valor del guardabarros es >0. Un hilo puede aumentar o disminuir el valor interno del guardabarros. En la línea 48, el valor del guardabarros se decrementa en 1.

  • Línea 63: el observable se configura para ejecutarse en un hilo proporcionado por el programador [Schedulers.computation()]. Este programador puede proporcionar tantos hilos como núcleos haya en la máquina de ejecución. El apartado de la aplicación de ejemplo ha mostrado el uso de otros programadores (véase el apartado 2.8);

El principio del código es el siguiente:

  • el método [main] se ejecuta en el hilo principal (main);
  • línea 66: inicia la emisión de elementos del observable. Estos se emitirán en un subproceso distinto del subproceso principal;
  • línea 70: el hilo principal se bloquea porque el guardabarros tiene el valor 1 (véase la línea 16). Solo podrá continuar cuando este valor pase a 0. Esto ocurre en la línea 48. Es el observador quien baja la barrera cuando recibe la notificación de que el observable ha terminado sus emisiones;

La ejecución da los siguientes resultados:

avant souscription ------Thread[main] ---- Time[09:268]
Observable.call start ------Thread[RxComputationThreadPool-1] ---- Time[09:278]
début attente barrière ------Thread[main] ---- Time[09:278]
Observable.call onNext(44.4) ------Thread[RxComputationThreadPool-1] ---- Time[09:783]
Subscriber.onNext (44.4) ------Thread[RxComputationThreadPool-1] ---- Time[09:783]
Observable.call onNext(18.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:183]
Subscriber.onNext (18.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:184]
Observable.call onNext(54.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:486]
Subscriber.onNext (54.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:488]
Observable.call onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[10:489]
Subscriber.onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[10:490]
fin attente barrière ------Thread[main] ---- Time[10:491]
après souscription ------Thread[main] ---- Time[10:493]
  • línea 1: se va a realizar la suscripción;
  • línea 2: esto desencadena la ejecución del método [call] en el hilo [RxComputationThreadPool-1]. Ahora tenemos una ejecución paralela con dos hilos;
  • línea 3: por una razón no aclarada, el hilo [RxComputationThreadPool-1] ha cedido el control. El subproceso [main] toma entonces el control y queda bloqueado por el guardabarros (línea 70 del código). A partir de este momento, solo el subproceso [RxComputationThreadPool-1] puede operar;
  • líneas 4-11: se observa el comportamiento observado anteriormente entre el observable y su observador, pero ahora todo ocurre en el hilo [RxComputationThreadPool-1];
  • líneas 12-13: el observador ha bajado la barrera (línea 48 del código) y el hilo [RxComputationThreadPool-1] ha finalizado. El hilo [main] toma el control y muestra dos mensajes;

7.2.2. Ejemplo-07: observable y observador en dos subprocesos diferentes

  

Modificamos el ejemplo anterior de la siguiente manera:


package dvp.rxjava.observables;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class Exemple07 {
    public static void main(String[] args) {

        // guardia de barrera
        CountDownLatch latch = new CountDownLatch(1);

        // configuración de un observable de enteros
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // espera
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // error
                        subscriber.onError(e);
                    }
                    // acción
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // fin
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });

        // un suscriptor
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
                // se baja la barrera
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }

            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };

        // continuación de la configuración observable
        obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
        // suscripción
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        // esperando que se supere la barrera
        try {
            showInfos("début attente barrière");
            latch.await();
            showInfos("fin attente barrière");
        } catch (InterruptedException e1) {
            System.out.println(e1);
        }
        showInfos("après souscription");

    }

    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}

El código es idéntico al del ejemplo anterior, salvo por la línea 63:


obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());

que configura el observable (subscribeOn) y el observador (observeOn) para que se ejecuten en uno de los subprocesos proporcionados por el programador [Schedulers.computation()].

Los resultados obtenidos son los siguientes:

avant souscription ------Thread[main] ---- Time[09:643]
début attente barrière ------Thread[main] ---- Time[09:656]
Observable.call start ------Thread[RxComputationThreadPool-4] ---- Time[09:656]
Observable.call onNext(39.6) ------Thread[RxComputationThreadPool-4] ---- Time[10:162]
Subscriber.onNext (39.6) ------Thread[RxComputationThreadPool-3] ---- Time[10:163]
Observable.call onNext(98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[10:562]
Subscriber.onNext (98.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[10:564]
Observable.call onNext(46.8) ------Thread[RxComputationThreadPool-4] ---- Time[10:864]
Observable.call onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[10:866]
Subscriber.onNext (46.8) ------Thread[RxComputationThreadPool-3] ---- Time[10:866]
Subscriber.onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[10:868]
fin attente barrière ------Thread[main] ---- Time[10:869]
après souscription ------Thread[main] ---- Time[10:870]

Cabe destacar lo siguiente:

  • el observable se ejecuta en el hilo [RxComputationThreadPool-4] (líneas 3-4, 6, 8-9);
  • el observador se ejecuta en el hilo [RxComputationThreadPool-3] (líneas 5, 7, 10-11);
  • que se ejecutan de forma autónoma. Así, en las líneas 8-9, el observable emite 2 notificaciones (onNext, onCompleted) antes de que el observador recupere la notificación [onNext] (línea 10);

La biblioteca RxJava se encarga del paso de datos (las emisiones) del hilo del observable al hilo del observador. El desarrollador no tiene que preocuparse por ello.

Hemos visto cómo crear observables (Observable.from, Observable.create). Veamos ahora los observables predefinidos de la biblioteca RxJava.

7.3. Observables predefinidos

7.3.1. Ejemplo-08: el método [Observable.range]

 

A partir de ahora, utilizaremos clases específicas para los procesos observados y sus observadores. La idea es poder registrar su nombre, su hilo de ejecución y las horas de ejecución para poder realizar un seguimiento de estas a lo largo del tiempo.

La clase [Process] será simplemente un Observable al que se le puede asignar un nombre. Implementará la siguiente interfaz [IProcess]:


package dvp.rxjava.observables.utils;

import rx.Observable;

public interface IProcess<T> {

    // nombre de la observable
    public String getName();

    // observable
    public Observable<T> getObservable();

}

Esta interfaz podrá ser implementada por la siguiente clase [Process<T>]:


package dvp.rxjava.observables.utils;

import rx.Observable;
import rx.Scheduler;

public class Process<T> implements IProcess<T>{

    // nombre de la variable observable
    protected String name;
    // proceso observado
    protected Observable<T> observable;

    // constructores
    public Process(String name, Observable<T> observable) {
        // inicializaciones locales
        this.name = name;
        this.observable = observable;
    }

    // getters y setters
    public String getName() {
        return name;
    }

    public Observable<T> getObservable() {
        return observable;
    }

}
  • línea 9: el nombre del proceso;
  • línea 11: la variable observada;
  • líneas 14-18: el constructor;

El observador se describirá mediante la siguiente clase [Observateur]:


package dvp.rxjava.observables.utils;

import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import rx.Subscriber;

public class Observateur<T> extends Subscriber<T> {

...
}
  • línea 11, la clase Observateur<T> extiende la clase Subscriber<T> que hemos presentado brevemente en el apartado 7.1.3. La utilizaremos como argumento del método [Observable.subscribe]:

// ejecución observable (observación)
obs1.subscribe(observateur);

El método [Observable.subscribe] utilizado en la línea 2 anterior tiene la siguiente definición:

 

La función del [Subscriber] es principalmente gestionar los elementos emitidos por el observable al que se ha suscrito mediante los métodos de la interfaz [Observer]: onNext, onError, onCompleted. La clase [Subscriber] tiene los siguientes métodos:

 

En el código de la clase [Observateur], utilizaremos el método [1] isUnsubscribed para saber si la suscripción del suscriptor ha sido cancelada o no. La clase [Observateur<T>] completa es la siguiente:


package dvp.rxjava.observables.utils;

import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import rx.Subscriber;

public class Observateur<T> extends Subscriber<T> {

    // un guardabarros (semáforo)
    private CountDownLatch latch;
    // un método de visualización
    private Consumer<String> showInfos;
    // el nombre del observador
    private String observerName;
    // el nombre del proceso observado
    private String processName;

    // constructores
    public Observateur() {

    }

    public Observateur(String name, CountDownLatch latch, Consumer<String> showInfos, String observedName) {
        this.observerName = name;
        this.latch = latch;
        this.showInfos = showInfos;
        this.processName = observedName;
    }

    // --------------------------- implementación de la interfaz Observer<T>
    @Override
    public void onCompleted() {
        // fin de las emisiones
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber [%s,%s].onCompleted", observerName, processName));
        }
        // fin bloqueo del hilo principal
        latch.countDown();
    }

    @Override
    public void onError(Throwable e) {
        // error de emisión
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber [%s, %s].onError (%s)", observerName, processName, e));
        }
    }

    @Override
    public void onNext(T value) {
        // una emisión adicional
        if (!isUnsubscribed()) {
            try {
                showInfos.accept(String.format("Subscriber [%s,%s] : onNext (%s)", observerName, processName,
                        new ObjectMapper().writeValueAsString(value)));
            } catch (JsonProcessingException e) {
                showInfos.accept(String.format("Subscriber [%s,%s].onNext (%s)", observerName, processName, e));
            }
        }
    }
}
  • Además de las características de un Subscriber, el observador Observateur llevará consigo la siguiente información:
    • línea 14: un guardabarrieras o semáforo que servirá para bloquear el hilo principal hasta que el observador haya recibido todos los elementos emitidos por el observable. Esto se hará en la línea 36 del código cuando el observador reciba del observable la notificación de fin de emisión;
    • línea 16: una instancia Consumer<String> que servirá para mostrar un mensaje en la consola;
    • línea 18: el nombre del observador para distinguirlos entre sí cuando hay varios;
    • línea 20: el nombre del proceso observado;
  • líneas 36, 46, 54: los métodos [onCompleted, onError, onNext] de la interfaz [Observer<T>] implementada por la clase abstracta [Subscriber<T>]. Esta clase no los implementa. Por lo tanto, hay que hacerlo en sus clases hijas. Antes de hacer nada en estos métodos, se comprueba si el observador no se ha dado de baja del observable que observa;
  • línea 59: el método [onNext] del observador escribe la cadena jSON del elemento recibido. Esto nos permitirá mostrar diversos tipos de elementos;

Dicho esto, estudiemos un nuevo método de la clase Observable, el método [range]:

 

El observable Observable.range(n,m) emite (m) enteros que van de n a n+m-1. Lo estudiamos con el siguiente código [Exemple08]:


package dvp.rxjava.observables.exemples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple08 {
    public static void main(String[] args) throws InterruptedException {

        // número de observadores
        final int nbObservateurs = 2;

        // semáforo
        CountDownLatch latch = new CountDownLatch(nbObservateurs);

        // configuración observable
        Observable<Integer> obs1 = Observable.range(15, 3).subscribeOn(Schedulers.computation());
        // ejecución observable (observación)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos,"obs1"));
        }
        // espera
        showInfos.accept("main : attente fin observation");
        latch.await();
        // fin
        showInfos.accept("main : fin observation");
    }

    // visualizaciones
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • línea 16: vamos a utilizar dos observadores;
  • línea 19: el guardabarros (semáforo) se inicializa en dos porque vamos a colocar cada observador en un hilo diferente. Por lo tanto, el hilo principal deberá esperar a que finalicen los dos hilos de observación;
  • línea 22: configuramos el observable de tal manera que se ejecute en un hilo del programador [Schedulers.computation()]. El observador estará en el mismo hilo que el observable;
  • líneas 25-27: se suscriben dos observadores al observable. Esto provocará la ejecución completa de este para cada uno de los observadores: se emitirán los enteros 15, 16 y 17;
  • línea 30: el hilo principal espera a que terminen los observadores;

Los resultados obtenidos son los siguientes:

main : début observation ------Thread[main] ---- Time[27:875]
main : attente fin observation ------Thread[main] ---- Time[27:893]
Subscriber[observateur[1],obs1] : onNext (15) ------Thread[RxComputationThreadPool-2] ---- Time[28:245]
Subscriber[observateur[0],obs1] : onNext (15) ------Thread[RxComputationThreadPool-1] ---- Time[28:245]
Subscriber[observateur[1],obs1] : onNext (16) ------Thread[RxComputationThreadPool-2] ---- Time[28:247]
Subscriber[observateur[0],obs1] : onNext (16) ------Thread[RxComputationThreadPool-1] ---- Time[28:248]
Subscriber[observateur[1],obs1] : onNext (17) ------Thread[RxComputationThreadPool-2] ---- Time[28:249]
Subscriber[observateur[1],obs1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[28:250]
Subscriber[observateur[0],obs1] : onNext (17) ------Thread[RxComputationThreadPool-1] ---- Time[28:251]
Subscriber[observateur[0],obs1].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[28:252]
main : fin observation ------Thread[main] ---- Time[28:252]
  • línea 2: el hilo principal está bloqueado a la espera de que finalicen los dos observadores;
  • líneas 3-4: se observa que el observador 0 está en el hilo [RxComputationThreadPool-1] y el observador 1 en el hilo [RxComputationThreadPool-2];
  • líneas 3-10: vemos que los dos observadores reciben exactamente los mismos elementos;

Vamos a utilizar la clase Observateur así definida para ilustrar el comportamiento de otros tipos de observables.

7.3.2. Ejemplo-09: los métodos Observable.[interval, take, doNext]

  
 

Este ejemplo ilustra el uso del observable Observable.interval (long interval, TimeUnit unit), que emite enteros largos a intervalos de tiempo regulares. Cabe destacar el punto [1]: por defecto, el observable [Observable.interval] se ejecuta en uno de los subprocesos del programador [Schedulers.computation].

El código será el siguiente:


package dvp.rxjava.observables.exemples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;

public class Exemple09 {
    public static void main(String[] args) throws InterruptedException {

        // número de observadores
        final int nbObservateurs = 2;

        // semáforo
        CountDownLatch latch = new CountDownLatch(nbObservateurs);

        // configuración observable
        Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
                .doOnNext(l -> showInfos.accept(l.toString()));
        // ejecución observable (observación)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
                    "obs1"));
        }
        // espera
        showInfos.accept("main : attente fin observation");
        latch.await();
        // fin
        showInfos.accept("main : fin observation");
    }

    // visualizaciones
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • línea 22: el observable emite enteros largos cada 500 milisegundos. La serie comienza con el número 0;
  • línea 22: este observable emite un número infinito de valores. El método [Observable.take(n)] crea un nuevo observable que solo conserva los primeros n elementos emitidos;
 

Volvamos al código del observable:


Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));

Línea 2: el método [Observable.doOnNext] se ejecuta cada vez que el observable emite un nuevo elemento. Se utiliza a menudo para registrar información. En este caso, queremos registrar la fecha de emisión de los elementos para comprobar si se cumple el intervalo de 500 milisegundos. El método [Observable.doOnNext] no modifica el observable al que se aplica. Su definición es la siguiente:

 

La ejecución da los siguientes resultados:

main : début observation ------Thread[main] ---- Time[55:892]
main : attente fin observation ------Thread[main] ---- Time[55:911]
0 ------Thread[RxComputationThreadPool-1] ---- Time[56:412]
0 ------Thread[RxComputationThreadPool-2] ---- Time[56:413]
Subscriber[observateur [1],obs1] : onNext (0) ------Thread[RxComputationThreadPool-2] ---- Time[56:723]
Subscriber[observateur [0],obs1] : onNext (0) ------Thread[RxComputationThreadPool-1] ---- Time[56:723]
1 ------Thread[RxComputationThreadPool-1] ---- Time[56:906]
Subscriber[observateur [0],obs1] : onNext (1) ------Thread[RxComputationThreadPool-1] ---- Time[56:908]
1 ------Thread[RxComputationThreadPool-2] ---- Time[56:912]
Subscriber[observateur [1],obs1] : onNext (1) ------Thread[RxComputationThreadPool-2] ---- Time[56:914]
2 ------Thread[RxComputationThreadPool-1] ---- Time[57:405]
Subscriber[observateur [0],obs1] : onNext (2) ------Thread[RxComputationThreadPool-1] ---- Time[57:407]
Subscriber[observateur [0],obs1].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[57:408]
2 ------Thread[RxComputationThreadPool-2] ---- Time[57:412]
Subscriber[observateur [1],obs1] : onNext (2) ------Thread[RxComputationThreadPool-2] ---- Time[57:414]
Subscriber[observateur [1],obs1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[57:415]
main : fin observation ------Thread[main] ---- Time[57:416]
  • líneas 3, 7 y 11: se observa que, aproximadamente, el intervalo de emisión es cercano a los 500 ms;
  • Los dos observadores se ejecutan, por supuesto, en dos subprocesos diferentes, a pesar de que el observable no se había configurado para ejecutarse con un programador concreto. Lo que vemos aquí es el funcionamiento por defecto del observable [Observable.interval];

7.3.3. Ejemplos-10/12: los métodos Observable.[error, empty, never]

 

A partir de ahora seremos más concisos en nuestras ilustraciones de los métodos de la clase [Observable]. El código anterior era el siguiente:


package dvp.rxjava.observables;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import rx.Observable;

public class Exemple09 {
    public static void main(String[] args) throws InterruptedException {

        // número de observadores
        final int nbObservateurs = 2;

        // semáforo
        CountDownLatch latch = new CountDownLatch(nbObservateurs);

        // configuración observable
        Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
                .doOnNext(l -> showInfos.accept(l.toString()));
        // ejecución observable (observación)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
                    "obs1"));
        }
        // espera
        showInfos.accept("main : attente fin observation");
        latch.await();
        // fin
        showInfos.accept("main : fin observation");
    }

    // visualizaciones
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}

Este código ya se había utilizado en el ejemplo anterior. Solo cambiaban las líneas 21-22. Por lo tanto, vamos a factorizar la mayor parte de este código en la siguiente clase [ProcessUtils]:


package dvp.rxjava.observables.utils;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import rx.Observable;

public class ProcessUtils {

    @SafeVarargs
    public static void subscribe(int nbObservateurs, IProcess<?>... processes) throws InterruptedException {

        // semáforo
        CountDownLatch latch = new CountDownLatch(nbObservateurs * processes.length);

        // ejecución observable (observación)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            for (IProcess<?> process : processes) {
                Observable<?> obs = process.getObservable();
                obs.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos, process.getName()));
            }
        }
        // espera
        showInfos.accept("main : attente fin observation");
        latch.await();
        // fin
        showInfos.accept("main : fin observation");
    }

    // visualizaciones
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • línea 13: el método admite dos parámetros:
    • nbObservateurs: el número de observadores de los procesos pasados como segundo parámetro;
    • processes: los procesos (observables con nombre) que se van a observar. Gracias a la notación [IProcess<?>], los procesos podrán emitir elementos de diferentes tipos;
  • línea 16: el semáforo debe pasar a verde cuando todos los observadores hayan completado todas sus observaciones. El valor inicial del semáforo es, por tanto, el número de observadores multiplicado por el número de observaciones;
  • líneas 20-25: se suscribe a cada observador a todos los procesos que debe observar;
  • línea 23: se recupera el observable del proceso (véase el apartado 7.3.1);
  • línea 23: se suscribe un observador a ella. Se le pasan cuatro datos:
    • su nombre;
    • el semáforo que debe decrementar cuando reciba la notificación de fin de emisión de la observable que observa;
    • el método que debe utilizar cuando quiera registrar información en la consola;
    • el nombre del proceso que va a observar;

Una vez definidas estas clases, el ejemplo 10 será el siguiente:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple10 {
    public static void main(String[] args) throws InterruptedException {
        // configuración observable
        Observable<?> obs = Observable.error(new RuntimeException("Erreur !!!")).subscribeOn(Schedulers.computation());
        // ejecución (observación) observable
        ProcessUtils.subscribe(2,new Process<>("process1", obs));
    }
}

En la línea 11, el método estático [Observable.error] se define de la siguiente manera:

 

La línea 8 configura, por tanto, un observable que se limita a lanzar una excepción dirigida al método [onError] de sus suscriptores. La ejecución da los siguientes resultados:


main : début observation ------Thread[main] ---- Time[22:618]
main : attente fin observation ------Thread[main] ---- Time[22:636]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[22:638]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[22:638]

Líneas 3 y 4: el método [onError] de ambos suscriptores ha recibido la excepción lanzada por el observable.

Esta ejecución tiene una particularidad: los métodos [onCompleted] de los dos observadores no se han llamado. Por lo tanto, la barrera no se ha bajado y el hilo principal permanece bloqueado en el método estático [ProcessUtils.subscribe] en la siguiente línea 3:


// espera
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");

Aquí vemos que, en caso de error del observable, no se invoca el método [onCompleted] de los suscriptores. Por lo tanto, modificamos el método [Observateur.onError] de la siguiente manera:


    @Override
    public void onError(Throwable e) {
        // error de emisión
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber[%s, %s].onError (%s)", observerName, processName, e));
        }
        // fin bloqueo del hilo principal
        latch.countDown();
}

Añadimos las líneas 7-8 para eliminar la barrera en caso de error del observable. Con este nuevo código, la ejecución da los siguientes resultados:


main : début observation ------Thread[main] ---- Time[40:750]
main : attente fin observation ------Thread[main] ---- Time[40:764]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[40:766]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[40:766]
main : fin observation ------Thread[main] ---- Time[40:767]

Obtenemos la línea 5, que no teníamos anteriormente.

El ejemplo 11 será el siguiente:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple11 {
    public static void main(String[] args) throws InterruptedException {
        // configuración observable
        Observable<?> obs1 = Observable.empty();
        // ejecución (observación) observable
        ProcessUtils.subscribe(2,new Process<>("process1",obs1));
    }
}

Línea 10, el método estático [Observable.empty] crea un observable que no emite ningún elemento. Solo emite la notificación de fin de emisión;

 

La ejecución del código del ejemplo anterior da los siguientes resultados:

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[37:073]
Subscriber[observateur[0],process1].onCompleted ------Thread[main] ---- Time[37:086]
Subscriber[observateur[1],process1].onCompleted ------Thread[main] ---- Time[37:086]
main : attente fin observation ------Thread[main] ---- Time[37:087]
main : fin observation ------Thread[main] ---- Time[37:087]
  • líneas 2 y 3: se observa que los dos observadores reciben la notificación de fin de emisión sin haber recibido elementos anteriormente.

Cabe preguntarse para qué puede servir este método. Se puede utilizar de forma análoga a una colección, vacía al principio, en la que luego se acumulan elementos:

1
2
3
4
Observable obs=Observable.empty() ;
for(Observable o : observables){
    obs=obs.mergeWith(o) ;
}

En la línea 3, se fusiona el observable inicial obs (línea 1) con otros observables.

El ejemplo 12 ilustra el método estático [Observable.never]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple12 {
    public static void main(String[] args) throws InterruptedException {
        // configuración observable
        Observable<?> obs1 = Observable.never();
        // ejecución (observación) observable
        ProcessUtils.subscribe(2,new Process<>("process1",obs1));
    }
}

El método estático [Observable.never] crea un observable que nunca emite:

 

La ejecución del ejemplo da los siguientes resultados:

main : début observation ------Thread[main] ---- Time[27:018]
main : attente fin observation ------Thread[main] ---- Time[27:030]

Línea 2: el hilo principal espera indefinidamente. De hecho, ningún observable emite la notificación [onCompleted] que permite que el semáforo (barrera) pase a verde (bajar la barrera).

7.4. Multi-threading

7.4.1. Ejemplo 13: hilo de acción, hilo de observación

En el apartado 7.1.3 creamos un observable con el método estático [Observable.create]:

 
  • el método [create] devuelve un tipo Observable<T>;
  • el parámetro del método [create] es una función de tipo [Observable.OnSubscribe<T>] definida de la siguiente manera:
 

El tipo [Observable.OnSubscribe<T>] es una interfaz funcional que a su vez extiende la interfaz funcional [Action1<Subscriber<? super T>>]. El método [call] de esta interfaz espera un tipo [Subscriber] (suscriptor, observador). En el resto de este documento, a veces nos referiremos al tipo [Observable.OnSubscribe<T>] como una acción. Vamos a crear acciones personalizadas que tendrán un nombre. Serán instancias de la siguiente interfaz [IProcessAction]:

  

package dvp.rxjava.observables.utils;

import rx.Observable;

public interface IProcessAction<T> extends Observable.OnSubscribe<T> {

    // la acción tiene un nombre
    public String getName();
}
  • línea 5: la interfaz [IProcessAction<T>] tiene todas las características de la interfaz [Observable.OnSubscribe<T>];
  • línea 8: además, tiene un método [getName] que devuelve el nombre de la instancia que implementa la interfaz;

Vamos a utilizar la siguiente acción denominada [ProcessAction01]:


package dvp.rxjava.observables.utils;

import java.util.Random;

import rx.Subscriber;
import rx.functions.Func1;

public class ProcessAction01<T> implements IProcessAction<T> {

    // data
    private String name;
    private int nbValues;
    private Func1<Integer, T> func1;

    // fabricantes
    public ProcessAction01(String name, int nbValues, Func1<Integer, T> func1) {
        this.name = name;
        this.nbValues = nbValues;
        this.func1 = func1;
    }

    @Override
    public void call(Subscriber<? super T> subscriber) {
        ProcessUtils.showInfos.accept(String.format("Observable (%s) call start", getName()));
        for (int i = 0; i < nbValues; i++) {
            // espera
            try {
                Thread.sleep(new Random().nextInt(500));
            } catch (InterruptedException e) {
                // error
                ProcessUtils.showInfos.accept(String.format("Observable (%s) onError", getName()));
                subscriber.onError(e);
            }
            // emisión de un elemento
            T value = func1.call(i);
            ProcessUtils.showInfos.accept(String.format("Observable (%s,%s) onNext (%s)", getName(), i, value));
            subscriber.onNext(value);
        }
        // finalizado
        ProcessUtils.showInfos.accept(String.format("Observable (%s) onCompleted", getName()));
        subscriber.onCompleted();
    }

    @Override
    public String getName() {
        return name;
    }

}
  • línea 8: la clase [ProcessAction01<T>] implementa la interfaz [IProcessAction<T>] y, por lo tanto, la interfaz [Observable.OnSubscribe<T>];
  • línea 11: el nombre de la acción;
  • línea 12: el número de valores que se van a emitir;
  • línea 13: una instancia de tipo [Func1<Integer, T>] que, a partir de un entero, crea un tipo T que será emitido por el observable (líneas 35 y 37);
  • líneas 16-20: se pasan al constructor el nombre de la acción, el número de valores que se van a emitir y la función de emisión;
  • líneas 23-42: el código del proceso;
  • línea 23: el método [call] recibe como parámetro el suscriptor del observable asociado al proceso;
  • línea 28: el proceso emite sus elementos tras una espera de duración aleatoria;
  • línea 32: emisión de un error;
  • línea 37: una emisión normal;
  • línea 41: emisión de la notificación de fin de emisión;
  • líneas 25-38: la acción emite nbValues reales tras un tiempo de espera aleatorio (línea 30);
  • línea 35: el valor que se va a emitir lo proporciona la función [func1] pasada como parámetro al constructor (línea 16);

Reestructuramos la clase [Process] (véase el apartado 7.3.1) para que también pueda construirse con una acción con nombre. Le añadimos el siguiente constructor:


public Process(IProcessAction<T> na, Scheduler schedulerObserved, Scheduler schedulerObserver) {
        // nom process=nom action
        name = na.getName();
        // action --> observable
        observable = Observable.create(na);
        // thread d'exécution du processus observé
        if (schedulerObserved != null) {
            observable = observable.subscribeOn(schedulerObserved);
        }
        // thread d'observation de l'observateur
        if (schedulerObserver != null) {
            observable = observable.observeOn(schedulerObserver);
        }
    }
  • línea 1, el constructor admite 3 parámetros:
    1. la acción con nombre que se utilizará para construir el observable (línea 5);
    2. el programador del proceso observado (puede ser null);
    3. el programador del observador (puede ser null);
  • línea 5: el observable se crea a partir de la acción pasada como parámetro;

El siguiente código [Exemple13] observa diferentes observables:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple13 {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 1, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
        // proceso 3
        Process<Integer> process3 = new Process<>(new ProcessAction01<Integer>("process3", 3, i -> i * 2), null,
                Schedulers.computation());
        // proceso 4
        Process<Boolean> process4 = new Process<>(new ProcessAction01<Boolean>("process4", 4, i -> i % 2 == 0), null, null);
        // suscripciones
        ProcessUtils.subscribe(1, process1);
        ProcessUtils.subscribe(1, process2);
        ProcessUtils.subscribe(1, process3);
        ProcessUtils.subscribe(1, process4);
    }
}
  • líneas 13-15: el proceso process1 genera un número real en un hilo de cálculo que se observará en otro hilo de cálculo;
  • líneas 17-18: el proceso process2 genera 2 cadenas de caracteres en un hilo de cálculo y no se proporciona ninguna indicación sobre el hilo del observador. Los resultados muestran que la observación se realiza por defecto en el mismo hilo que el de la ejecución del proceso;
  • líneas 20-21: el proceso process3 genera 3 números enteros en un hilo no impuesto que serán observados en un hilo de cálculo. Los resultados muestran que la ejecución del proceso se realiza por defecto en el hilo principal;
  • línea 23: el proceso process4 genera 4 valores booleanos en un hilo no impuesto que se observarán en un hilo no impuesto. Los resultados muestran que la ejecución del proceso y su observación se realizan por defecto en el hilo principal;

El resultado de la ejecución de este código es el siguiente:

main : début observation ------Thread[main] ---- Time[18:642]
main : attente fin observation ------Thread[main] ---- Time[18:660]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[18:660]
Observable (process1,0) onNext (68.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[19:093]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[19:094]
Subscriber[observateur[0],process1] : onNext (68.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[19:396]
Subscriber[observateur[0],process1].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[19:397]
main : fin observation ------Thread[main] ---- Time[19:397]
main : début observation ------Thread[main] ---- Time[19:398]
main : attente fin observation ------Thread[main] ---- Time[19:399]
Observable (process2) call start ------Thread[RxComputationThreadPool-5] ---- Time[19:399]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-5] ---- Time[19:630]
Subscriber[observateur[0],process2] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-5] ---- Time[19:631]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-5] ---- Time[20:094]
Subscriber[observateur[0],process2] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-5] ---- Time[20:095]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[20:096]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[20:096]
main : fin observation ------Thread[main] ---- Time[20:097]
main : début observation ------Thread[main] ---- Time[20:097]
Observable (process3) call start ------Thread[main] ---- Time[20:098]
Observable (process3,0) onNext (0) ------Thread[main] ---- Time[20:188]
Subscriber[observateur[0],process3] : onNext (0) ------Thread[RxComputationThreadPool-6] ---- Time[20:213]
Observable (process3,1) onNext (2) ------Thread[main] ---- Time[20:336]
Subscriber[observateur[0],process3] : onNext (2) ------Thread[RxComputationThreadPool-6] ---- Time[20:338]
Observable (process3,2) onNext (4) ------Thread[main] ---- Time[20:676]
Observable (process3) onCompleted ------Thread[main] ---- Time[20:677]
main : attente fin observation ------Thread[main] ---- Time[20:677]
Subscriber[observateur[0],process3] : onNext (4) ------Thread[RxComputationThreadPool-6] ---- Time[20:678]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[20:679]
main : fin observation ------Thread[main] ---- Time[20:679]
main : début observation ------Thread[main] ---- Time[20:680]
Observable (process4) call start ------Thread[main] ---- Time[20:680]
Observable (process4,0) onNext (true) ------Thread[main] ---- Time[21:065]
Subscriber[observateur[0],process4] : onNext (true) ------Thread[main] ---- Time[21:067]
Observable (process4,1) onNext (false) ------Thread[main] ---- Time[21:187]
Subscriber[observateur[0],process4] : onNext (false) ------Thread[main] ---- Time[21:188]
Observable (process4,2) onNext (true) ------Thread[main] ---- Time[21:624]
Subscriber[observateur[0],process4] : onNext (true) ------Thread[main] ---- Time[21:625]
Observable (process4,3) onNext (false) ------Thread[main] ---- Time[21:765]
Subscriber[observateur[0],process4] : onNext (false) ------Thread[main] ---- Time[21:766]
Observable (process4) onCompleted ------Thread[main] ---- Time[21:767]
Subscriber[observateur[0],process4].onCompleted ------Thread[main] ---- Time[21:767]
main : attente fin observation ------Thread[main] ---- Time[21:767]
main : fin observation ------Thread[main] ---- Time[21:768]
  • el proceso process1 genera 1 número real (línea 4) en el hilo de cálculo [RxComputationThreadPool-4], que se observa en el hilo de cálculo [RxComputationThreadPool-3] (línea 6);
  • el proceso process2 genera 2 cadenas de caracteres (líneas 12, 14) en el hilo de cálculo [RxComputationThreadPool-5], que se observan en ese mismo hilo (líneas 13, 15);
  • el proceso process3 genera 3 números enteros (líneas 21, 23, 25) en el hilo principal, que se observan en el hilo de cálculo [RxComputationThreadPool-6] (líneas 22, 24, 28);
  • el proceso process4 genera 4 valores booleanos (líneas 34, 36, 38, 40) en el hilo principal, que se observan en ese mismo hilo principal (líneas 33, 35, 37, 39);

Se invita al lector a seguir lo anterior:

  • el ciclo de vida del proceso observado y su hilo;
  • el ciclo de vida de su observador y su hilo;

Gran parte del interés de las bibliotecas Rx reside en este multihilo que el desarrollador no tiene que gestionar por sí mismo.

7.5. Combinaciones de varios observables

7.5.1. Ejemplo 14: fusionar dos observables con [Observable.merge]

A continuación, presentamos métodos estáticos de la clase [Observable] que permiten combinar varios observables en un observable resultante.

El primer ejemplo de este tipo será el siguiente:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.ProcessAction01;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple14 {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
        // fusión
        Process<?> process12 = new Process<>("process12",
                Observable.merge(process1.getObservable(), process2.getObservable()));
        // suscripciones
        ProcessUtils.subscribe(1, process12);
    }
}
  • líneas 15-17: un proceso llamado [process1] emitirá 3 números reales en un hilo de cálculo. También se observará en un hilo de cálculo;
  • líneas 19-20: un proceso llamado [process2] emitirá 2 cadenas de caracteres en un hilo de cálculo. El hilo de observación no está impuesto. Hemos visto anteriormente que, en este caso, el hilo de observación es el hilo de cálculo;
  • línea 23: los dos procesos se fusionan, es decir, se crea un observable cuyos elementos proceden simultáneamente de ambos procesos. Para ello se utiliza el método estático [Observable.merge]:
 

Contrariamente a lo que podría sugerir el esquema anterior, durante la fusión, los elementos de un flujo 1 pueden intercalarse entre los elementos de un flujo 2. Esto es lo que muestran los resultados de la ejecución:

main : début observation ------Thread[main] ---- Time[56:053]
main : attente fin observation ------Thread[main] ---- Time[56:073]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[56:073]
Observable (process2) call start ------Thread[RxComputationThreadPool-5] ---- Time[56:074]
Observable (process1,0) onNext (64.8) ------Thread[RxComputationThreadPool-4] ---- Time[56:263]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-5] ---- Time[56:403]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-5] ---- Time[56:515]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[56:516]
Subscriber[observateur[0],process12] : onNext (64.8) ------Thread[RxComputationThreadPool-3] ---- Time[56:552]
Subscriber[observateur[0],process12] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-3] ---- Time[56:553]
Subscriber[observateur[0],process12] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-3] ---- Time[56:553]
Observable (process1,1) onNext (56.4) ------Thread[RxComputationThreadPool-4] ---- Time[56:716]
Subscriber[observateur[0],process12] : onNext (56.4) ------Thread[RxComputationThreadPool-3] ---- Time[56:718]
Observable (process1,2) onNext (22.8) ------Thread[RxComputationThreadPool-4] ---- Time[57:082]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[57:083]
Subscriber[observateur[0],process12] : onNext (22.8) ------Thread[RxComputationThreadPool-3] ---- Time[57:084]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[57:085]
main : fin observation ------Thread[main] ---- Time[57:085]
  • línea 3: el proceso [process1] se ejecuta en el hilo de cálculo [RxComputationThreadPool-4];
  • línea 4: el proceso [process2] se ejecuta en el hilo de cálculo [RxComputationThreadPool-5];
  • línea 9: el proceso [process12] se observa en el hilo de cálculo [RxComputationThreadPool-3]. No conozco la regla que ha llevado a esta elección;
  • líneas 9-11: se observa que el observador observa elementos de los dos procesos [process1] (línea 5) y [process2] (líneas 6, 7), aunque ninguno de los dos ha finalizado (hay mezcla);
  • El proceso [process12] finaliza (línea 17) cuando los dos procesos process1 y process2 han finalizado;

7.5.2. Ejemplo 15: concatenar dos observables con [Observable.concat]

Ahora examinamos el siguiente código:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.ProcessAction01;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple15 {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, Schedulers.computation());
        // concat
        Process<?> process12 = new Process<>("process12",
                Observable.concat(process1.getObservable(), process2.getObservable()));
        // suscripciones
        ProcessUtils.subscribe(1, process12);
    }
}
  • líneas 15-17: un proceso llamado [process1] emitirá 3 números reales en un hilo de cálculo. También se observará en un hilo de cálculo;
  • líneas 19-20: un proceso llamado [process2] emitirá 2 cadenas de caracteres en un hilo no impuesto, en este caso el hilo principal por defecto. Se observará en un hilo de cálculo;
  • línea 23: los dos procesos se concatenan, es decir, se crea un observable cuyos elementos proceden de ambos procesos. No se mezclan los valores emitidos. El proceso [process12] emitirá primero todos los valores del proceso [process1] y luego los del proceso [process2]. Para ello se utiliza el método estático [Observable.concat]:
 

Los resultados de la ejecución son los siguientes:

main : début observation ------Thread[main] ---- Time[30:162]
main : attente fin observation ------Thread[main] ---- Time[30:189]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[30:190]
Observable (process1,0) onNext (79.2) ------Thread[RxComputationThreadPool-4] ---- Time[30:681]
Observable (process1,1) onNext (98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[30:792]
Subscriber[observateur[0],process12] : onNext (79.2) ------Thread[RxComputationThreadPool-3] ---- Time[30:975]
Subscriber[observateur[0],process12] : onNext (98.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[30:976]
Observable (process1,2) onNext (84.0) ------Thread[RxComputationThreadPool-4] ---- Time[31:084]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[31:085]
Subscriber[observateur[0],process12] : onNext (84.0) ------Thread[RxComputationThreadPool-3] ---- Time[31:086]
Observable (process2) call start ------Thread[RxComputationThreadPool-3] ---- Time[31:087]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-3] ---- Time[31:556]
Subscriber[observateur[0],process12] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-5] ---- Time[31:557]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-3] ---- Time[31:608]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[31:609]
Subscriber[observateur[0],process12] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-5] ---- Time[31:609]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[31:610]
main : fin observation ------Thread[main] ---- Time[31:611]
  • líneas 3-10: se ejecuta el proceso [process1] y el proceso [process12] emite los valores emitidos por [process1];
  • línea 9: el proceso [process1] ha finalizado;
  • líneas 11-17: se ejecuta el proceso [process2] y el proceso [process12] emite los valores emitidos por [process2];

Hay una anomalía en el proceso process2: no se había impuesto ningún hilo de ejecución. Por lo tanto, cabría esperar que, por defecto, este fuera el hilo principal. Sin embargo, no es así. El hilo de ejecución fue el hilo de cálculo [RxComputationThreadPool-3] (línea 11). Por lo tanto, cuando no se impone un hilo de ejecución u observación, no se puede hacer ninguna suposición sobre el hilo que se elegirá.

7.5.3. Ejemplo 16: combinar dos observables con [Observable.zip]

Ahora examinamos el siguiente código:


package dvp.rxjava.observables.exemples;

import java.util.Arrays;
import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;

public class Exemple16 {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, null);
        // función de combinación de los 2 procesos
        FuncN<String> funcn = new FuncN<String>() {
            @Override
            public String call(Object... args) {
                if (args.length == 2) {
                    return String.format("double=%s, string=%s", args[0], args[1]);
                } else {
                    throw new RuntimeException("la fonction attend 2 paramètres exactement");
                }
            }
        };
        // compresión de los 2 procesos
        Process<String> process12 = new Process<>("process12",
                Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
        // suscripciones
        ProcessUtils.subscribe(1, process12);
    }
}
  • líneas 16-18: un proceso llamado [process1] emitirá 3 números reales en un hilo de cálculo. También será observado en un hilo de cálculo;
  • líneas 20-21: un proceso llamado [process2] emitirá 2 cadenas de caracteres en un hilo no impuesto. El hilo de observación tampoco está impuesto;
  • líneas 23-32: instanciación de un tipo [FuncN<String>] con una clase anónima. FuncN es una interfaz funcional:
 

El método [FuncN.call] espera una matriz de objetos y devuelve un tipo R. La función [funcn] se utilizará para combinar los procesos process1 y process2 en este orden. En el método [FuncN.call]:

  • args[0] será un Double;
  • args[1] será un String;

Aquí, el resultado de [funcn.call] será la cadena de caracteres de la línea 27. La construcción de este resultado no requiere conocer los tipos de los argumentos del método call.

Los dos procesos se combinan de la siguiente manera:


// archivo zip de los 2 procesos
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));

El método [Observable.zip] funciona de la siguiente manera:

 

Se observa que:

  • el primer argumento de zip es un Iterable<Observable>. En nuestro ejemplo, tenemos un parámetro efectivo de tipo List<Observable> formado por nuestros dos observables;
  • el segundo argumento de zip es de tipo FuncN. En nuestro ejemplo, el parámetro efectivo es [funcn];

La ejecución da los siguientes resultados:

main : début observation ------Thread[main] ---- Time[55:636]
Observable (process2) call start ------Thread[main] ---- Time[55:666]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[55:666]
Observable (process1,0) onNext (69.6) ------Thread[RxComputationThreadPool-4] ---- Time[55:902]
Observable (process2,0) onNext (valeur-0) ------Thread[main] ---- Time[56:076]
Observable (process1,1) onNext (82.8) ------Thread[RxComputationThreadPool-4] ---- Time[56:271]
Subscriber[observateur[0],process12] : onNext ("double=69.6, string=valeur-0") ------Thread[main] ---- Time[56:352]
Observable (process1,2) onNext (14.399999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[56:641]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[56:642]
Observable (process2,1) onNext (valeur-1) ------Thread[main] ---- Time[56:778]
Subscriber[observateur[0],process12] : onNext ("double=82.8, string=valeur-1") ------Thread[main] ---- Time[56:779]
Observable (process2) onCompleted ------Thread[main] ---- Time[56:779]
Subscriber[observateur[0],process12].onCompleted ------Thread[main] ---- Time[56:780]
main : attente fin observation ------Thread[main] ---- Time[56:781]
main : fin observation ------Thread[main] ---- Time[56:781]
  • líneas 7, 11: el proceso process12 emite dos elementos;
  • línea 8: el elemento adicional emitido por el proceso process1, que no tiene contraparte en el proceso process2, no es emitido por el proceso de resultado process12;

Se observa que el proceso process2, al que no se le había asignado ni un hilo de ejecución ni un hilo de observación, utilizó el hilo principal para ambos.

7.5.4. Ejemplo 17: combinar dos observables con [Observable.combineLatest]

Examinemos ahora el siguiente código:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple17 {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<Double> process2 = new Process<>(
                new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null,
                Schedulers.computation());
        // combinación de los 2 procesos
        Process<Double> process12 = new Process<>("process12",
                Observable.combineLatest(process1.getObservable(), process2.getObservable(), (d1, d2) -> d1 + d2));
        // suscripciones
        ProcessUtils.subscribe(1, process12);
    }
}
  • líneas 14-16: un proceso llamado [process1] emitirá 3 números reales en un hilo de cálculo. También será observado en un hilo de cálculo;
  • líneas 18-20: un proceso llamado [process2] emitirá 2 números reales en un subproceso no impuesto. Se observarán en un subproceso de cálculo;
  • línea 23: los dos observables se combinan con el siguiente método estático [Observable.combineLatest]:
 

El observable [combineLatest] funciona de la siguiente manera: cuando uno de los dos observables emite un elemento E1, este elemento es combinado por [combineFunction] con el último elemento emitido por el otro observable.

La ejecución de este código da el siguiente resultado:

main : début observation ------Thread[main] ---- Time[01:768]
Observable (process2) call start ------Thread[main] ---- Time[01:791]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[01:791]
Observable (process1,0) onNext (54.0) ------Thread[RxComputationThreadPool-4] ---- Time[01:991]
Observable (process2,0) onNext (56.0) ------Thread[main] ---- Time[02:245]
Observable (process1,1) onNext (51.6) ------Thread[RxComputationThreadPool-4] ---- Time[02:358]
Subscriber[observateur[0],process12] : onNext (110.0) ------Thread[RxComputationThreadPool-5] ---- Time[02:521]
Subscriber[observateur[0],process12] : onNext (107.6) ------Thread[RxComputationThreadPool-5] ---- Time[02:522]
Observable (process2,1) onNext (261.8) ------Thread[main] ---- Time[02:595]
Observable (process2) onCompleted ------Thread[main] ---- Time[02:596]
main : attente fin observation ------Thread[main] ---- Time[02:596]
Subscriber[observateur[0],process12] : onNext (313.40000000000003) ------Thread[RxComputationThreadPool-5] ---- Time[02:597]
Observable (process1,2) onNext (80.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[02:790]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[02:791]
Subscriber[observateur[0],process12] : onNext (342.2) ------Thread[RxComputationThreadPool-3] ---- Time[02:792]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[02:792]
main : fin observation ------Thread[main] ---- Time[02:793]
  • línea 5: la emisión de process2 (56) se combina con el último elemento emitido por process1 (54, línea 4) y produce el resultado de la línea 7;
  • línea 6: la emisión de process1 (51,6) se combina con el último elemento emitido por process2 (56, línea 5) y produce el resultado de la línea 8;
  • línea 9: la emisión de process2 (261,8) se combina con el último elemento emitido por process1 (51,6, línea 6) y produce el resultado de la línea 12;
  • línea 13: la emisión de process1 (80,39) se combina con el último elemento emitido por process2 (261,8, línea 9) y produce el resultado de la línea 15;

Nos encontramos aquí ante una variante del observable [zip] en la que, en esta ocasión, los elementos combinados no son necesariamente los elementos de la misma posición en los flujos. Cabe destacar aquí que el proceso process2, al que no se le había asignado ningún hilo de ejecución, se ha ejecutado aquí en el hilo principal (línea 2).

7.5.5. Ejemplo 18: combinar dos observables con [Observable.amb]

Examinemos ahora el siguiente código:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple18 {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<Double> process2 = new Process<>(
                new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null, null);
        // combinación de los 2 procesos
        Process<Double> process12 = new Process<>("process12",
                Observable.amb(process1.getObservable(), process2.getObservable()));
        // suscripciones
        ProcessUtils.subscribe(1, process12);
    }
}
  • líneas 14-16: un proceso llamado [process1] emitirá 3 números reales en un hilo de cálculo. También se observará en un hilo de cálculo;
  • líneas 18-20: un proceso llamado [process2] emitirá 2 números reales en un subproceso no impuesto. Se observarán en un subproceso no impuesto;
  • línea 22: los dos observables se combinan con el siguiente método estático [Observable.amb]:
 

Como muestra el esquema anterior, el observable [Observable.amb(Observable o1, Observable o2)] emite los elementos del observable que emite el primero. Esto lo confirman los resultados del ejemplo presentado:

main : début observation ------Thread[main] ---- Time[21:594]
Observable (process2) call start ------Thread[main] ---- Time[21:612]
Observable (process1) call start ------Thread[RxComputationThreadPool-3] ---- Time[21:612]
Observable (process2,0) onNext (155.39999999999998) ------Thread[main] ---- Time[21:817]
Observable (process1) onError ------Thread[RxComputationThreadPool-3] ---- Time[21:820]
Observable (process1,0) onNext (90.0) ------Thread[RxComputationThreadPool-3] ---- Time[21:820]
Observable (process1,1) onNext (104.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[21:877]
Subscriber[observateur[0],process12] : onNext (155.39999999999998) ------Thread[main] ---- Time[22:105]
Observable (process1,2) onNext (44.4) ------Thread[RxComputationThreadPool-3] ---- Time[22:122]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[22:123]
Observable (process2,1) onNext (201.6) ------Thread[main] ---- Time[22:581]
Subscriber[observateur[0],process12] : onNext (201.6) ------Thread[main] ---- Time[22:583]
Observable (process2) onCompleted ------Thread[main] ---- Time[22:583]
Subscriber[observateur[0],process12].onCompleted ------Thread[main] ---- Time[22:584]
main : attente fin observation ------Thread[main] ---- Time[22:585]
main : fin observation ------Thread[main] ---- Time[22:586]
  • línea 4, es el proceso process2 el que emite primero;
  • líneas 8 y 12: el proceso process12 emite todos los elementos emitidos por el proceso process2 (líneas 4 y 11);

7.6. Cadena de procesamiento de un observable

7.6.1. Ejemplo 19: transformar un observable con [Observable.map]

En los ejemplos anteriores, hemos examinado diversas combinaciones de dos observables en un tercer observable. Ahora presentamos métodos estáticos de la clase [Observable] que permiten operaciones de transformación, filtrado y agregación sobre un observable. Aquí encontraremos métodos análogos a los de la clase [Stream] estudiados en el apartado 5.

Nuestro primer ejemplo será el siguiente:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple19 {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<String> process2 = new Process<>("process2",
                process1.getObservable().map(d -> String.format("valeur-%s", d)));
        // suscripciones
        ProcessUtils.subscribe(1, process2);
    }
}
  • líneas 14-16: un proceso denominado process1 emitirá 3 números reales en un hilo de cálculo. También se observará en un hilo de cálculo;
  • líneas 17-18: los números emitidos por process1 se transformarán en cadenas de caracteres en un proceso process2;
  • línea 20: se observa process2;

El método [Observable.map] de la línea 18 es análogo al método [Stream.map] estudiado en el apartado 5.5:

 

Los resultados del ejemplo son los siguientes:

main : début observation ------Thread[main] ---- Time[55:328]
main : attente fin observation ------Thread[main] ---- Time[55:346]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[55:347]
Observable (process1,0) onNext (21.599999999999998) ------Thread[RxComputationThreadPool-4] ---- Time[55:354]
Observable (process1,1) onNext (97.2) ------Thread[RxComputationThreadPool-4] ---- Time[55:512]
Subscriber[observateur[0],process2] : onNext ("valeur-21.599999999999998") ------Thread[RxComputationThreadPool-3] ---- Time[55:615]
Subscriber[observateur[0],process2] : onNext ("valeur-97.2") ------Thread[RxComputationThreadPool-3] ---- Time[55:616]
Observable (process1,2) onNext (98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[55:803]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[55:804]
Subscriber[observateur[0],process2] : onNext ("valeur-98.39999999999999") ------Thread[RxComputationThreadPool-3] ---- Time[55:804]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[55:805]
main : fin observation ------Thread[main] ---- Time[55:805]
  • líneas 4, 5 y 8: las emisiones de process1. Son números reales;
  • líneas 6, 7 y 10: las emisiones de process2 observadas. Son cadenas de caracteres;

7.6.2. Ejemplo-20: filtrar una variable observable con [Observable.filter]

El ejemplo será el siguiente:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple20 {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().filter(i -> i % 2 == 0));
        // suscripciones
        ProcessUtils.subscribe(1, process2);
    }
}
  • líneas 11-12: un proceso llamado process1 emitirá los números enteros del 0 al 2 en un hilo de cálculo. También se observará en un hilo de cálculo;
  • línea 14: los números emitidos por process1 se filtrarán para conservar en process2 solo los números pares;
  • línea 20: se observa process2;

El método [Observable.filter] de la línea 18 es análogo al método [Stream.filter] estudiado en el apartado 5.4:

 

Los resultados del ejemplo son los siguientes:

main : début observation ------Thread[main] ---- Time[30:319]
main : attente fin observation ------Thread[main] ---- Time[30:335]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[30:336]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[30:388]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[30:625]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[30:703]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[30:704]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[30:705]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[30:706]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[30:707]
main : fin observation ------Thread[main] ---- Time[30:707]
  • líneas 4, 5 y 7: las emisiones de process1;
  • líneas 6 y 9: las emisiones de process2 observadas. Son los elementos de process1 los que son pares;

7.6.3. Ejemplo-21: transformar una variable observable con [Observable.flatMap]

El ejemplo será el siguiente:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple21 {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
            int value = i * 10;
            return Observable.just(value, value + 1, value + 2);
        }));
        // suscripciones
        ProcessUtils.subscribe(1, process2);
    }
}
  • líneas 12-13: un proceso llamado process1 emitirá los números enteros del 0 al 2 en un hilo de cálculo. También se observará en un hilo de cálculo;
  • líneas 15-18: cada número n emitido por process1 se transforma en un observable que emite los 3 números (10*n, 10*n+1, 10*n+2). Si en la línea 15 se utilizara el método [map], process2 emitiría un tipo Observable<Integer> y no un tipo Integer. El método [flatMap] utilizado permite aplanar (flatten) esta secuencia de elementos de tipo Observable<Integer> en una secuencia de elementos de tipo Integer constituida por cada uno de los elementos de cada uno de los Observable<Integer>;
  • línea 20: se observa process2;

El método [Observable.flatMap] de la línea 15 es análogo al método [Stream.flatMap] estudiado en el apartado 5.6.12:

 

Los resultados del ejemplo son los siguientes:

main : début observation ------Thread[main] ---- Time[31:466]
main : attente fin observation ------Thread[main] ---- Time[31:486]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[31:486]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[31:777]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[32:082]
Subscriber[observateur[0],process2] : onNext (1) ------Thread[RxComputationThreadPool-3] ---- Time[32:085]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[32:087]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[32:192]
Subscriber[observateur[0],process2] : onNext (10) ------Thread[RxComputationThreadPool-3] ---- Time[32:194]
Subscriber[observateur[0],process2] : onNext (11) ------Thread[RxComputationThreadPool-3] ---- Time[32:196]
Subscriber[observateur[0],process2] : onNext (12) ------Thread[RxComputationThreadPool-3] ---- Time[32:197]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[32:686]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[32:687]
Subscriber[observateur[0],process2] : onNext (20) ------Thread[RxComputationThreadPool-3] ---- Time[32:688]
Subscriber[observateur[0],process2] : onNext (21) ------Thread[RxComputationThreadPool-3] ---- Time[32:690]
Subscriber[observateur[0],process2] : onNext (22) ------Thread[RxComputationThreadPool-3] ---- Time[32:692]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[32:693]
main : fin observation ------Thread[main] ---- Time[32:693]
  • líneas 5-7: las tres emisiones de process2 tras la emisión de la línea 4 de process1;
  • líneas 9-11: las tres emisiones de process2 tras la emisión de la línea 8 de process1;
  • líneas 14-16: las tres emisiones de process2 tras la emisión de la línea 12 de process1;

El siguiente código muestra cómo crear un tipo Observable<Integer[]> a partir de process1 y [Exemple21b]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple21b {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<Integer[]> process2 = new Process<>("process2", process1.getObservable().map(i -> {
            int value = i * 10;
            return new Integer[] { value, value + 1, value + 2 };
        }));
        // suscripciones
        ProcessUtils.subscribe(1, process2);
    }
}
  • línea 14: se utiliza el método [Observable.map];
  • línea 16: que devuelve un tipo Integer[];

Los resultados son los siguientes:

main : début observation ------Thread[main] ---- Time[58:089]
main : attente fin observation ------Thread[main] ---- Time[58:107]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[58:108]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[58:503]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[58:762]
Subscriber[observateur[0],process2] : onNext ([0,1,2]) ------Thread[RxComputationThreadPool-3] ---- Time[58:792]
Subscriber[observateur[0],process2] : onNext ([10,11,12]) ------Thread[RxComputationThreadPool-3] ---- Time[58:795]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[58:851]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[58:852]
Subscriber[observateur[0],process2] : onNext ([20,21,22]) ------Thread[RxComputationThreadPool-3] ---- Time[58:853]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[58:854]
main : fin observation ------Thread[main] ---- Time[58:854]
  • líneas 6, 7, 10: se ven los resultados de map;

Todas estas transformaciones de observables se pueden encadenar, ya que cada transformación produce un nuevo observable. Esto es lo que muestra el siguiente ejemplo [Exemple21c]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple21c {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
            int value = i * 10;
            return Observable.just(value, value + 1, value + 2);
        }).filter(i -> i % 2 == 0));
        // suscripciones
        ProcessUtils.subscribe(1, process2);
    }
}
  • líneas 15-18: el flatMap va seguido de un filter;

Los resultados de la ejecución son los siguientes:

main : début observation ------Thread[main] ---- Time[37:993]
main : attente fin observation ------Thread[main] ---- Time[38:016]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[38:017]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[38:124]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[38:366]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[38:380]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[38:381]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[38:436]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[38:439]
Subscriber[observateur[0],process2] : onNext (10) ------Thread[RxComputationThreadPool-3] ---- Time[38:441]
Subscriber[observateur[0],process2] : onNext (12) ------Thread[RxComputationThreadPool-3] ---- Time[38:443]
Subscriber[observateur[0],process2] : onNext (20) ------Thread[RxComputationThreadPool-3] ---- Time[38:445]
Subscriber[observateur[0],process2] : onNext (22) ------Thread[RxComputationThreadPool-3] ---- Time[38:446]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[38:447]
main : fin observation ------Thread[main] ---- Time[38:447]
  • líneas 8-13: process2 solo ha emitido los elementos pares procedentes de flatMap;

Un método similar a [flatMap] es el método [flatMapIterable], ilustrado por el siguiente ejemplo [Exemple21d]:


package dvp.rxjava.observables.exemples;

import java.util.Arrays;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple21d {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMapIterable(i -> {
            int value = i * 10;
            return Arrays.asList(value, value + 1, value + 2);
        }).filter(i -> i % 2 == 0));
        // suscripciones
        ProcessUtils.subscribe(1, process2);
    }
}

Línea 16: en lugar de utilizar el método [flatMap], se utiliza el método [flatMapIterable]. En este caso, la función de transformación debe generar un tipo Iterable<T> (línea 18) en lugar de un tipo Observable<T>.

Se obtienen los mismos resultados que anteriormente.

Volvamos a la definición del método [flatMap]:

 

Como se ve arriba, se ha insertado un elemento azul [3] entre los dos elementos verdes [1-2]. Esto significa que, en su operación de aplanamiento de los Observable<T>, el método [flatMap] respeta el orden de emisión de estos diferentes observables internos. Esto se muestra en el siguiente ejemplo [Exemple21e]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple21e {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // proceso 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().flatMap(i -> process2.getObservable()));
        // suscripciones
        ProcessUtils.subscribe(1, process3);
    }
}
  • líneas 11-12: el proceso process1 genera los números enteros [0,1];
  • líneas 14-15: el proceso process2 emite los números enteros [10,11,12];
  • líneas 17-18: a cada elemento emitido por process1 se le asocia el observable del proceso process2. Esto significa que:
    • al elemento [0] de process1 se le asociará una observable que emite los [10,11,12];
    • lo mismo ocurre con el elemento 1;

Al final, se emitirán los 6 números [10, 11, 12, 10, 11, 12]. Queremos ver en qué orden.

Los resultados de la ejecución son los siguientes:

main : début observation ------Thread[main] ---- Time[22:540]
main : attente fin observation ------Thread[main] ---- Time[22:566]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[22:566]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[22:949]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[22:951]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[23:159]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[23:160]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[23:160]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[23:286]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[23:513]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[23:597]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[23:599]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[23:645]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-5] ---- Time[23:647]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[23:789]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[23:790]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-5] ---- Time[23:791]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[23:976]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[23:978]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[24:184]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[24:184]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[24:186]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[24:187]
main : fin observation ------Thread[main] ---- Time[24:187]

Se observa que el orden de emisión del proceso process3 ha sido: [10, 10, 11, 12, 11, 12] (líneas 11, 12, 14, 17, 19, 22). Por lo tanto, se ha producido una mezcla de los elementos emitidos por el proceso process2. Esto se puede evitar utilizando el método [concatMap] en lugar del método [flatMap]. Así lo muestra el siguiente código [Exemple21ef]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple21ef {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // proceso 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().concatMap(i -> process2.getObservable()));
        // suscripciones
        ProcessUtils.subscribe(1, process3);
    }
}

En la línea 18, se ha sustituido [flatMap] por [concatMap]. Los resultados de la ejecución son los siguientes:

main : début observation ------Thread[main] ---- Time[45:507]
main : attente fin observation ------Thread[main] ---- Time[45:530]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[45:530]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[45:775]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[45:778]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[45:846]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[45:890]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[45:947]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[45:948]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[46:096]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[46:097]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[46:144]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-5] ---- Time[46:147]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-5] ---- Time[46:148]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[46:149]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[46:364]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-7] ---- Time[46:366]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[46:529]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[46:531]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[46:558]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[46:559]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[46:560]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[46:562]
main : fin observation ------Thread[main] ---- Time[46:562]

Se observa que el orden de emisión del proceso process3 ha sido: [10, 11, 12, 10, 11, 12] (líneas 12-14, 17, 19, 22). Los elementos emitidos por el proceso process2 no se han mezclado.

Otra variante del método [map] es el método [switchMap]:

 

En el ejemplo anterior, del observable [1] surgen otros 3 observables [2] de 2 elementos que luego se aplanan como en [flatMap] [3]. Cabe señalar que el resultado tiene 5 elementos y no 6. Esto se debe a que, antes de que el segundo observable emita su elemento n.º 2 [6], el tercer observable emite su primer elemento [5], lo que hace que el segundo observable sea descartado. Por lo tanto, no encontramos el elemento [6] en el observable resultante [3].

Para ilustrar [switchMap], utilizaremos el siguiente ejemplo [Exemple21eg]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple21eg {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // proceso 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().switchMap(i -> process2.getObservable()));
        // suscripciones
        ProcessUtils.subscribe(1, process3);
    }
}

La ejecución del ejemplo da los siguientes resultados:

main : début observation ------Thread[main] ---- Time[02:388]
main : attente fin observation ------Thread[main] ---- Time[02:419]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[02:419]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[02:641]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[02:643]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[02:802]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[02:888]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[02:957]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[02:958]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[03:005]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[03:007]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[03:007]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[03:106]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[03:106]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[03:108]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[03:236]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[03:238]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[03:716]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[03:717]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[03:718]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[03:718]
main : fin observation ------Thread[main] ---- Time[03:719]
  • process1 emite 2 elementos que dan lugar a 2 observables process2 de 3 elementos;
  • línea 14: el observador recibe el elemento n.º 0 emitido por el primer observable process2 de la línea 6;
  • línea 15: el observador recibe el elemento n.º 0 emitido por el segundo observable process2, línea 13. La historia no explica por qué no recibió antes los elementos 1 y 2 emitidos por el primer observable process2 en las líneas 7 y 8. Sea como fuere, el primer observable process2 se descarta;
  • al final, el observador solo ve 4 elementos (líneas 14, 15, 17, 20) en lugar de los 6 que se emitieron;

7.6.4. Ejemplos-22: otros métodos de la clase [Observable]

La clase [Observable] retoma numerosos métodos de la clase [Stream] con un funcionamiento análogo. A continuación se muestran algunos de ellos. Nos limitamos a proporcionar el código y sus resultados.

[Exemple22a - take=limit]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22a {
    public static void main(String[] args) throws InterruptedException {
        // proceso
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).take(3));
        // suscripciones
        ProcessUtils.subscribe(1, process);
    }
}

Resultados

1
2
3
4
5
6
7
main : début observation ------Thread[main] ---- Time[25:071]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[25:399]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[25:402]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[25:404]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[25:404]
main : attente fin observation ------Thread[main] ---- Time[25:406]
main : fin observation ------Thread[main] ---- Time[25:406]

[Exemple22b - takeLast]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22b {
    public static void main(String[] args) throws InterruptedException {
        // proceso
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).takeLast(2));
        // suscripciones
        ProcessUtils.subscribe(1, process);
    }
}

resultados

1
2
3
4
5
6
main : début observation ------Thread[main] ---- Time[19:440]
Subscriber[observateur[0],process] : onNext (9) ------Thread[main] ---- Time[19:726]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[19:728]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[19:728]
main : attente fin observation ------Thread[main] ---- Time[19:729]
main : fin observation ------Thread[main] ---- Time[19:730]

[Exemple22c - skip]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22c {
    public static void main(String[] args) throws InterruptedException {
        // procesos
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).skip(5).take(2));
        // suscripciones
        ProcessUtils.subscribe(1, process);
    }
}

resultados

1
2
3
4
5
6
main : début observation ------Thread[main] ---- Time[16:685]
Subscriber[observateur[0],process] : onNext (6) ------Thread[main] ---- Time[17:002]
Subscriber[observateur[0],process] : onNext (7) ------Thread[main] ---- Time[17:004]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[17:005]
main : attente fin observation ------Thread[main] ---- Time[17:006]
main : fin observation ------Thread[main] ---- Time[17:006]

[Exemple22d - reduce]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22d {
    public static void main(String[] args) throws InterruptedException {
        // procesos
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).reduce(0, (i, a) -> i + a));
        // suscripciones
        ProcessUtils.subscribe(1, process);
    }
}
  • línea 10: calcula la suma de los elementos del observable. El resultado es un observable que emite dicha suma;

resultados

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[52:412]
Subscriber[observateur[0],process] : onNext (55) ------Thread[main] ---- Time[52:640]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[52:640]
main : attente fin observation ------Thread[main] ---- Time[52:642]
main : fin observation ------Thread[main] ---- Time[52:642]

[Exemple22e - all]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22e {
    public static void main(String[] args) throws InterruptedException {
        // procesos
        Process<Boolean> process = new Process<>("process", Observable.range(1, 10).all(i -> i > 10));
        // suscripciones
        ProcessUtils.subscribe(1, process);
    }
}
  • línea 10: devuelve un Observable<Boolean> que emite el elemento true, si el predicado del método [all] es verdadero para todos los elementos; false en caso contrario;

resultados

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[59:866]
Subscriber[observateur[0],process] : onNext (false) ------Thread[main] ---- Time[00:069]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[00:070]
main : attente fin observation ------Thread[main] ---- Time[00:071]
main : fin observation ------Thread[main] ---- Time[00:071]

[Exemple22f - count]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22f {
    public static void main(String[] args) throws InterruptedException {
        // procesos
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).count());
        // suscripciones
        ProcessUtils.subscribe(1, process);
    }
}
  • línea 10: [Observable.count] crea un observable de 1 elemento que es la suma de los elementos observados;

resultados

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[16:409]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[16:634]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[16:634]
main : attente fin observation ------Thread[main] ---- Time[16:635]
main : fin observation ------Thread[main] ---- Time[16:635]

[Exemple22g - distinct]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22g {
    public static void main(String[] args) throws InterruptedException {
        // procesos
        Process<Integer> process = new Process<>("process", Observable.just(1, 2, 1, 3).distinct());
        // suscripciones
        ProcessUtils.subscribe(1, process);
    }
}

resultados

1
2
3
4
5
6
7
main : début observation ------Thread[main] ---- Time[05:373]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[05:594]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[05:595]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[05:596]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[05:597]
main : attente fin observation ------Thread[main] ---- Time[05:597]
main : fin observation ------Thread[main] ---- Time[05:597]

[Exemple22h - groupBy, asObservable]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.observables.GroupedObservable;

public class Exemple22h {
    public static void main(String[] args) throws InterruptedException {
        // procesos
        Observable<GroupedObservable<Boolean, Integer>> obs = Observable.range(1, 10).groupBy(i -> i % 2 == 0);
        Process<Integer> process = new Process<>("process", obs.concatMap(g -> g.asObservable()));
        // suscripciones
        ProcessUtils.subscribe(1, process);
    }
}
  • línea 11: el método [groupBy] agrupa los 10 elementos emitidos en 2 grupos, los números pares y los números impares. El resultado es un tipo Observable<GroupedObservable<Boolean, Integer>>, es decir, un observable cuyos elementos son de tipo GroupedObservable<Boolean, Integer>, donde Boolean es el tipo de la clave del grupo (false, true en este caso) y que es también el tipo del resultado de la lambda pasada como parámetro al método [groupBy], y Integer el tipo de los elementos del grupo;
  • línea 12: el tipo GroupedObservable tiene un método [asObservable] que permite crear un observable a partir de este tipo. Por lo tanto, tendremos dos tipos Observable<Integer>, uno para los números pares y otro para los impares. A partir de estos dos observables, el método [concatMap] creará uno solo;

resultados

main : début observation ------Thread[main] ---- Time[23:809]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[24:034]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[24:036]
Subscriber[observateur[0],process] : onNext (5) ------Thread[main] ---- Time[24:037]
Subscriber[observateur[0],process] : onNext (7) ------Thread[main] ---- Time[24:038]
Subscriber[observateur[0],process] : onNext (9) ------Thread[main] ---- Time[24:039]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[24:041]
Subscriber[observateur[0],process] : onNext (4) ------Thread[main] ---- Time[24:043]
Subscriber[observateur[0],process] : onNext (6) ------Thread[main] ---- Time[24:044]
Subscriber[observateur[0],process] : onNext (8) ------Thread[main] ---- Time[24:045]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[24:046]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[24:047]
main : attente fin observation ------Thread[main] ---- Time[24:047]
main : fin observation ------Thread[main] ---- Time[24:048]

[Exemple22i - timestamp]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
import rx.schedulers.Timestamped;

public class Exemple22i {
    public static void main(String[] args) throws InterruptedException {
        // proceso 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // proceso 2
        Process<Timestamped<Integer>> process2 = new Process<>("process2", process1.getObservable().timestamp());
        // suscripciones
        ProcessUtils.subscribe(1, process2);
    }
}
  • línea 15, el método [timestamp] asocia una hora a cada elemento de la variable observable procesada;

resultados

main : début observation ------Thread[main] ---- Time[59:362]
main : attente fin observation ------Thread[main] ---- Time[59:377]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[59:378]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[59:553]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[59:692]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975259555,"value":0}) ------Thread[RxComputationThreadPool-3] ---- Time[59:789]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975259789,"value":1}) ------Thread[RxComputationThreadPool-3] ---- Time[59:791]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[00:025]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[00:027]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975260026,"value":2}) ------Thread[RxComputationThreadPool-3] ---- Time[00:031]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[00:033]
main : fin observation ------Thread[main] ---- Time[00:034]

En este ejemplo, es difícil determinar qué representa la información timestamp:

  • líneas 4-5: se observa que el elemento 1 de process1 se emitió 139 ms después del elemento 0;
  • líneas 6 y 7: se observa que el elemento 1 de process2 se detectó 234 ms después del elemento 0;
  • líneas 5 y 8: se observa que el elemento 2 de process1 se emitió 33 ms después del elemento 1;
  • líneas 7 y 10: se observa que el elemento 2 de process2 se observó 37 ms después del elemento 1;

Estos desfases se deben a que los subprocesos de observación y de ejecución de los observables no son los mismos. Si sustituimos las líneas 12-13 por las siguientes (Ejemplo22j):


// proceso 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
null);
  • líneas 2-3: no se impone el hilo de observación. Sabemos que, en este caso, el observable se observa donde se ejecuta;

Esto da los siguientes resultados:

main : début observation ------Thread[main] ---- Time[43:834]
main : attente fin observation ------Thread[main] ---- Time[43:845]
Observable (process1) call start ------Thread[RxComputationThreadPool-1] ---- Time[43:846]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-1] ---- Time[44:291]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976384293,"value":0}) ------Thread[RxComputationThreadPool-1] ---- Time[44:552]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-1] ---- Time[44:878]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976384879,"value":1}) ------Thread[RxComputationThreadPool-1] ---- Time[44:884]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-1] ---- Time[45:274]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976385275,"value":2}) ------Thread[RxComputationThreadPool-1] ---- Time[45:280]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[45:281]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[45:283]
main : fin observation ------Thread[main] ---- Time[45:284]
  • líneas 4 y 6: el proceso process1 emite su elemento n.º 1 587 ms después de su elemento n.º 0;
  • líneas 5 y 7: el observador observa estos dos elementos con un intervalo de 586 ms;
  • líneas 6 y 8: el proceso process1 emite su elemento n.º 2 396 ms después de su elemento n.º 1;
  • líneas 7 y 9: el observador observa estos dos elementos con un intervalo de 396 ms;

En este caso, los valores de timestamp son coherentes: representan correctamente la fecha de emisión del elemento.

7.7. Los programadores

7.7.1. Ejemplo 23: el programador [Schedulers.computation]

Ahora examinamos los programadores de ejecución. La observación se realizará en el hilo de ejecución.

El tema de los programadores es un poco oscuro. Los diferentes programadores se presentan en esta pregunta en el sitio web de StackOverflow [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:

 

Intentaremos ilustrar el uso de estos diferentes programadores con ejemplos. El primero ilustra el programador [Schedulers.computation]:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple23 {
    public static void main(String[] args) throws InterruptedException {
        // procesos
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.computation(), null);
        }
        // suscripciones
        ProcessUtils.subscribe(1, processes);
    }
}
  • líneas 14-19: se crea una matriz de 10 procesos que se ejecutan en un hilo de cálculo;
  • línea 17: cada proceso genera un número real aleatorio;
  • línea 21: se suscribe a todos estos procesos;

Los resultados son los siguientes:

main : début observation ------Thread[main] ---- Time[01:034]
Observable (process0) call start ------Thread[RxComputationThreadPool-1] ---- Time[01:042]
Observable (process2) call start ------Thread[RxComputationThreadPool-3] ---- Time[01:042]
Observable (process1) call start ------Thread[RxComputationThreadPool-2] ---- Time[01:042]
Observable (process5) call start ------Thread[RxComputationThreadPool-6] ---- Time[01:043]
Observable (process7) call start ------Thread[RxComputationThreadPool-8] ---- Time[01:043]
Observable (process4) call start ------Thread[RxComputationThreadPool-5] ---- Time[01:042]
Observable (process3) call start ------Thread[RxComputationThreadPool-4] ---- Time[01:042]
main : attente fin observation ------Thread[main] ---- Time[01:043]
Observable (process6) call start ------Thread[RxComputationThreadPool-7] ---- Time[01:043]
Observable (process3,0) onNext (70.8) ------Thread[RxComputationThreadPool-4] ---- Time[01:115]
Observable (process1,0) onNext (13.2) ------Thread[RxComputationThreadPool-2] ---- Time[01:153]
Observable (process0,0) onNext (63.599999999999994) ------Thread[RxComputationThreadPool-1] ---- Time[01:215]
Subscriber[observateur[0],process0] : onNext (63.599999999999994) ------Thread[RxComputationThreadPool-1] ---- Time[01:326]
Subscriber[observateur[0],process3] : onNext (70.8) ------Thread[RxComputationThreadPool-4] ---- Time[01:326]
Subscriber[observateur[0],process1] : onNext (13.2) ------Thread[RxComputationThreadPool-2] ---- Time[01:326]
Observable (process3) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[01:326]
Observable (process0) onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[01:326]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[01:327]
Subscriber[observateur[0],process0].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[01:327]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[01:327]
Subscriber[observateur[0],process1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[01:327]
Observable (process8) call start ------Thread[RxComputationThreadPool-1] ---- Time[01:329]
Observable (process9) call start ------Thread[RxComputationThreadPool-2] ---- Time[01:329]
...
main : fin observation ------Thread[main] ---- Time[01:610]
  • líneas 2-10: los 8 primeros procesos se inician en 8 subprocesos diferentes (la máquina utilizada tiene 8 núcleos). Se puede observar que todos se inician aproximadamente al mismo tiempo;
  • líneas 17-19: 3 procesos finalizan y liberan así 3 subprocesos;
  • líneas 23-24: los dos últimos procesos pueden entonces iniciarse utilizando 2 de los subprocesos así liberados;

Por lo tanto, cabe destacar que el programador [Schedulers.computation] proporciona un conjunto de n subprocesos, donde n es el número de núcleos de la máquina. Los subprocesos se ejecutan en paralelo en estos núcleos.

7.7.2. Ejemplo 24: el programador [Schedulers.io]

Ejecutamos el código anterior con el programador [Schedulers.io]:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple24 {
    public static void main(String[] args) throws InterruptedException {
        // procesos
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.io(), null);
        }
        // suscripciones
        ProcessUtils.subscribe(1, processes);
    }
}
  • línea 18: los procesos se ejecutan con los subprocesos del programador [Schedulers.io];

Esto da los siguientes resultados:

main : début observation ------Thread[main] ---- Time[03:451]
Observable (process0) call start ------Thread[RxCachedThreadScheduler-1] ---- Time[03:459]
Observable (process1) call start ------Thread[RxCachedThreadScheduler-2] ---- Time[03:459]
Observable (process2) call start ------Thread[RxCachedThreadScheduler-3] ---- Time[03:460]
Observable (process3) call start ------Thread[RxCachedThreadScheduler-4] ---- Time[03:460]
Observable (process4) call start ------Thread[RxCachedThreadScheduler-5] ---- Time[03:464]
Observable (process5) call start ------Thread[RxCachedThreadScheduler-6] ---- Time[03:464]
Observable (process6) call start ------Thread[RxCachedThreadScheduler-7] ---- Time[03:465]
Observable (process8) call start ------Thread[RxCachedThreadScheduler-9] ---- Time[03:465]
Observable (process9) call start ------Thread[RxCachedThreadScheduler-10] ---- Time[03:465]
main : attente fin observation ------Thread[main] ---- Time[03:465]
Observable (process7) call start ------Thread[RxCachedThreadScheduler-8] ---- Time[03:465]
Observable (process7,0) onNext (54.0) ------Thread[RxCachedThreadScheduler-8] ---- Time[03:473]
Observable (process8,0) onNext (116.39999999999999) ------Thread[RxCachedThreadScheduler-9] ---- Time[03:500]
Observable (process6,0) onNext (105.6) ------Thread[RxCachedThreadScheduler-7] ---- Time[03:506]
Observable (process0,0) onNext (96.0) ------Thread[RxCachedThreadScheduler-1] ---- Time[03:509]
Observable (process5,0) onNext (25.2) ------Thread[RxCachedThreadScheduler-6] ---- Time[03:583]
Observable (process3,0) onNext (97.2) ------Thread[RxCachedThreadScheduler-4] ---- Time[03:684]
Subscriber[observateur[0],process7] : onNext (54.0) ------Thread[RxCachedThreadScheduler-8] ---- Time[03:685]
Subscriber[observateur[0],process6] : onNext (105.6) ------Thread[RxCachedThreadScheduler-7] ---- Time[03:685]
Subscriber[observateur[0],process0] : onNext (96.0) ------Thread[RxCachedThreadScheduler-1] ---- Time[03:685]
Subscriber[observateur[0],process8] : onNext (116.39999999999999) ------Thread[RxCachedThreadScheduler-9] ---- Time[03:685]
Observable (process0) onCompleted ------Thread[RxCachedThreadScheduler-1] ---- Time[03:686]
Observable (process6) onCompleted ------Thread[RxCachedThreadScheduler-7] ---- Time[03:686]
Observable (process7) onCompleted ------Thread[RxCachedThreadScheduler-8] ---- Time[03:685]
...
main : fin observation ------Thread[main] ---- Time[03:933]
  • líneas 2-10: los 10 procesos se inician cada uno en un hilo diferente. A diferencia del caso anterior, se han podido iniciar todos los procesos. Se observa que estos inicios se realizan en un tiempo de 6 ms, mientras que anteriormente había sido de 1 ms;
  • líneas 13-18: los observables emiten uno tras otro y no de forma casi paralela, como había sido el caso anteriormente;

¿Cuál es la diferencia entre los programadores [Schedulers.io] y [Schedulers.computation]? Se puede encontrar una respuesta en URL [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:

 

7.7.3. Ejemplo 25: el programador [Schedulers.newThread]

Ejecutamos el código anterior con el programador [Schedulers.newThread]:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple25 {
    public static void main(String[] args) throws InterruptedException {
        // procesos
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.newThread(), null);
        }
        // suscripciones
        ProcessUtils.subscribe(1, processes);
    }
}

Los resultados obtenidos son los mismos que con el programador [Schedulers.io]:

main : début observation ------Thread[main] ---- Time[17:058]
Observable (process0) call start ------Thread[RxNewThreadScheduler-1] ---- Time[17:065]
Observable (process1) call start ------Thread[RxNewThreadScheduler-2] ---- Time[17:065]
Observable (process2) call start ------Thread[RxNewThreadScheduler-3] ---- Time[17:066]
Observable (process3) call start ------Thread[RxNewThreadScheduler-4] ---- Time[17:066]
Observable (process4) call start ------Thread[RxNewThreadScheduler-5] ---- Time[17:068]
Observable (process5) call start ------Thread[RxNewThreadScheduler-6] ---- Time[17:069]
Observable (process6) call start ------Thread[RxNewThreadScheduler-7] ---- Time[17:069]
Observable (process8) call start ------Thread[RxNewThreadScheduler-9] ---- Time[17:069]
Observable (process7) call start ------Thread[RxNewThreadScheduler-8] ---- Time[17:069]
Observable (process9) call start ------Thread[RxNewThreadScheduler-10] ---- Time[17:069]
main : attente fin observation ------Thread[main] ---- Time[17:069]
Observable (process6,0) onNext (25.2) ------Thread[RxNewThreadScheduler-7] ---- Time[17:120]
Observable (process3,0) onNext (39.6) ------Thread[RxNewThreadScheduler-4] ---- Time[17:193]
Observable (process5,0) onNext (21.599999999999998) ------Thread[RxNewThreadScheduler-6] ---- Time[17:212]
Observable (process0,0) onNext (19.2) ------Thread[RxNewThreadScheduler-1] ---- Time[17:273]
Observable (process8,0) onNext (81.6) ------Thread[RxNewThreadScheduler-9] ---- Time[17:308]
Subscriber[observateur[0],process3] : onNext (39.6) ------Thread[RxNewThreadScheduler-4] ---- Time[17:331]
Subscriber[observateur[0],process0] : onNext (19.2) ------Thread[RxNewThreadScheduler-1] ---- Time[17:331]
Subscriber[observateur[0],process6] : onNext (25.2) ------Thread[RxNewThreadScheduler-7] ---- Time[17:331]
Subscriber[observateur[0],process8] : onNext (81.6) ------Thread[RxNewThreadScheduler-9] ---- Time[17:331]
Subscriber[observateur[0],process5] : onNext (21.599999999999998) ------Thread[RxNewThreadScheduler-6] ---- Time[17:331]
Observable (process8) onCompleted ------Thread[RxNewThreadScheduler-9] ---- Time[17:333]
Observable (process5) onCompleted ------Thread[RxNewThreadScheduler-6] ---- Time[17:333]
Observable (process6) onCompleted ------Thread[RxNewThreadScheduler-7] ---- Time[17:332]
Observable (process0) onCompleted ------Thread[RxNewThreadScheduler-1] ---- Time[17:332]
Observable (process3) onCompleted ------Thread[RxNewThreadScheduler-4] ---- Time[17:332]
...
main : fin observation ------Thread[main] ---- Time[17:571]

En URL [http://stackoverflow.com/questions/33415881/retrofit-with-rxjava-schedulers-newthread-vs-schedulers-io], se explica que el programador [Schedulers.io] proporciona un grupo de subprocesos, algo que no hace el programador [Schedulers.newThread]. Un grupo de subprocesos creará automáticamente un número n de subprocesos. Los asignará a los procesos que los necesiten. Cuando estos procesos finalizan, sus subprocesos no se eliminan, sino que vuelven al grupo y pueden ser reutilizados por otro proceso. Esto resulta más eficiente que crear y eliminar subprocesos continuamente. Por lo tanto, se puede considerar que es preferible utilizar el programador [Schedulers.io].

7.7.4. Ejemplo 26: los programadores [Schedulers.immediate, Schedulers.trampoline]

Volvamos a la explicación dada para estos dos programadores:

 

La explicación es bastante fácil de entender, pero cuando queremos ilustrarla nos damos cuenta de que no la hemos entendido. El libro [Learning Reactive Programming With Java 8] me permitió crear un ejemplo que retoma uno encontrado en este libro, pero lo simplifica. Es el siguiente:


package dvp.rxjava.observables.exemples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Consumer;

import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.Schedulers;

public class Exemple26 {
    public static void main(String[] args) throws InterruptedException {

        // un programador
        Scheduler scheduler = Schedulers.immediate();
        // un trabajador de este programador
        Worker worker = scheduler.createWorker();
        // un tipo Action0 que se ejecutará en el trabajador
        Action0 action02 = new Action0() {
            @Override
            public void call() {
                // registro de acción02
                ProcessUtils.showInfos.accept("action02");
            }
        };

        // un tipo Action0 que se ejecutará en el worker
        Action0 action01 = new Action0() {
            @Override
            public void call() {
                // se programa una nueva acción en el mismo worker
                worker.schedule(action02);
                // registro de la acción01
                ProcessUtils.showInfos.accept("action01");
            }
        };
        // la acción01 está programada en el worker
        worker.schedule(action01);
    }

    // visualizaciones
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));

}
  • línea 17: un programador. Será [Schedulers.immediate], como aquí, o bien [Schedulers.trampoline] más adelante;
  • línea 19: se pueden ejecutar acciones del tipo Action0 (líneas 21, 20) en los trabajadores del programador. El método [Scheduler.createWorker] permite crear un trabajador. El método [Worker.schedule(Action0)] permite que un trabajador ejecute un tipo Action0;
  • líneas 21-27: una primera acción denominada [action02] que será ejecutada (línea 40) por el worker de la línea 19;
  • líneas 30-38: una segunda acción denominada [action01]. Tiene la particularidad de hacer que se ejecute la acción action02 en el mismo worker que ella (línea 34). Ahí radica la diferencia entre [Schedulers.immediate] y [Schedulers.trampoline]:
    • si el programador es [Schedulers.immediate], entonces, en la línea 34, la acción action02 se ejecutará inmediatamente (de ahí el nombre del programador) y la acción action01 en curso se interrumpirá. A continuación, aparecerá el mensaje de la línea 25. Una vez finalizada la acción action02, se reanudará la acción action01 y veremos el mensaje de la línea 36;
    • si el programador es [Schedulers.trampoline], entonces, en la línea 34, la acción action02 se pone en espera. No se ejecutará hasta que la tarea en curso action01 haya finalizado. Entonces aparecerá el mensaje de la línea 36. Una vez finalizada la acción action01, se ejecutará la acción action02 y veremos el mensaje de la línea 25;

La ejecución del código anterior da los siguientes resultados:

action02 ------Thread[main] ---- Time[38:480]
action01 ------Thread[main] ---- Time[38:485]

Si en la línea 17 se utiliza el programador [Schedulers.trampoline], se obtienen los resultados inversos:

action01 ------Thread[main] ---- Time[42:972]
action02 ------Thread[main] ---- Time[42:976]

Dicho esto, es difícil establecer una relación con los observables. No he encontrado ningún ejemplo convincente que pudiera demostrar el interés de ejecutar un observable en uno de estos dos subprocesos. Sin embargo, aquí hay uno, aunque no me parece nada natural:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

public class Exemple27 {
    public static void main(String[] args) throws InterruptedException {

        // Worker
        Worker worker = Schedulers.immediate().createWorker();
        // Trabajador trabajador = Schedulers.trampoline().createWorker();
        // observable 1 en el worker
        worker.schedule(() -> Observable.range(1, 2).subscribe(new Action1<Integer>() {

            @Override
            public void call(Integer i) {
                ProcessUtils.showInfos.accept(String.valueOf(i));
                // observable 2 en el mismo worker
                worker.schedule(() -> Observable.range(100, 2).subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer i) {
                        ProcessUtils.showInfos.accept(String.valueOf(i));
                    }
                }));
            }
        }));
    }
}
  • líneas 13-14: se crea un worker a partir de uno de los dos programadores [Schedulers.immediate] y [Schedulers.trampoline];
  • línea 16: se programa un primer observable obs1 en este worker para emitir los números [1,2]
  • línea 22: cada vez que se observa un elemento de este observable obs1, se inicia la observación de un segundo observable obs2 en el mismo worker para emitir los números [100,101];

Con el programador [Schedulers.immediate], se obtienen los siguientes resultados:

1
2
3
4
5
6
1 ------Thread[main] ---- Time[44:604]
100 ------Thread[main] ---- Time[44:610]
101 ------Thread[main] ---- Time[44:610]
2 ------Thread[main] ---- Time[44:612]
100 ------Thread[main] ---- Time[44:612]
101 ------Thread[main] ---- Time[44:612]

Mientras que con el programador [Schedulers.trampoline] se obtienen los siguientes resultados:

1
2
3
4
5
6
1 ------Thread[main] ---- Time[14:107]
2 ------Thread[main] ---- Time[14:114]
100 ------Thread[main] ---- Time[14:115]
101 ------Thread[main] ---- Time[14:115]
100 ------Thread[main] ---- Time[14:115]
101 ------Thread[main] ---- Time[14:116]

7.8. Conclusion

Queda mucho por hacer. Para profundizar en la biblioteca RxJava, se invita al lector a continuar su formación con las referencias indicadas al principio de este documento. A pesar de todo, contamos con las bases para utilizar RxJava en los entornos Swing y Android. Esto es lo que vamos a mostrar ahora.