7. La biblioteca RxJava
La biblioteca RxJava se basa en el siguiente concepto: un flujo de elementos de tipo T Observable<T> es observado por uno o más suscriptores (suscriptores, observadores, consumidores) Subscriber<T>. La biblioteca RxJava permite que el flujo Observable<T> se ejecute en el hilo T1 y su observador Subscriber<T> en el hilo T2 sin que el desarrollador tenga que preocuparse por gestionar el ciclo de vida de estos hilos ni por problemas naturalmente difíciles, como compartir datos entre hilos y sincronizarlos para ejecutar una tarea global. Por lo tanto, facilita la programación asíncrona.
Un flujo Observable<T> produce elementos de tipo T, que pueden observarse a medida que se producen. Si el observador y el observable (término utilizado de manera general para referirse al tipo Observable<T>) se encuentran en el mismo hilo, entonces el observable solo puede producir el elemento (i+1) una vez que el observador haya consumido el elemento i. Hay pocos casos en los que esta arquitectura resulte útil. Si el observador y el observable no se encuentran en el mismo hilo, entonces el observable y su observador se comportan de forma autónoma: el observable emite a su propio ritmo y el observador consume a su propio ritmo. Aquí es donde reside el valor de la biblioteca. Hasta ahora, solo hemos hablado de un único observador. En realidad, un observable puede tener cualquier número de observadores.
La biblioteca RxJava se adapta especialmente bien a la arquitectura descrita en la sección 2 de la introducción y resumida aquí:

- en [1], una capa de servicios proporciona servicios, algunos de los cuales tardan mucho tiempo en obtenerse (solicitudes de red, por ejemplo);
- esta capa de servicio es invocada por una interfaz gráfica de usuario [1] (Swing, Android, JavaFX). Si la capa de servicio se ejecuta en el mismo hilo que el método [Swing] que la utiliza, la interfaz gráfica de usuario se bloquea (deja de responder) mientras espera el resultado del servicio;
- En [2], una capa de adaptación ligera implementada con RxJava permite que la capa GUI reciba una implementación asíncrona del mismo servicio: este servicio puede ejecutarse en un hilo diferente al del método de la capa GUI que lo invoca. En este caso, la GUI [3] sigue respondiendo: el usuario puede seguir interactuando con ella, por ejemplo, activando una nueva solicitud de red en paralelo con la primera y, lo que es más importante, se le puede dar al usuario la opción de cancelar los procesos que tardan demasiado, algo imposible si la GUI está bloqueada;
- La llamada [4] es sincrónica, mientras que las llamadas [5-6] son asincrónicas;
En esta arquitectura, la capa [2] proporciona servicios que devuelven tipos Observable<T> a los que pueden suscribirse los métodos de la capa gráfica [3]. A continuación, un servicio de la capa [2] entrega sus resultados uno por uno, y la capa [3] puede reaccionar a cada uno de ellos, por ejemplo, actualizando uno o más componentes de la interfaz gráfica de usuario.
La clase Observable<T> tiene docenas de métodos. Este es uno de los retos de la biblioteca: es muy completa y resulta difícil abarcar todas sus posibilidades. Presentaremos algunos de ellos. El dominio de los demás métodos vendrá con el tiempo.
7.1. Creación de observables y suscripción a ellos
7.1.1. Ejemplo-01: el método [Observable.from]
![]() |
Considera el siguiente código:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
- Línea 12: Creamos un tipo Observable<Integer> a partir de una lista de enteros.
La clase Observable<T> es un flujo de elementos de tipo T que pueden observarse —preferiblemente de forma asíncrona, aunque no necesariamente— a medida que se producen. Su definición es la siguiente:
![]() |
Como se ha mencionado anteriormente, la clase Observable<T> cuenta con docenas de métodos. Algunos son similares a los de la clase Stream<T> que se ha tratado en la sección 5. La documentación de RxJava incluye «diagramas de canicas» [2] que ilustran cómo funcionan estos métodos:
- La línea 3 ilustra las emisiones del observable a lo largo del tiempo;
- el método [4] se aplica a los elementos emitidos por el observable. Por lo general, produce un nuevo observable;
- la línea 5 muestra el nuevo observable obtenido;
El método [Observable.from] tiene la siguiente firma:
![]() |
El método estático [Observable.from] permite crear un Observable<T> a partir de una colección de elementos de tipo T. Se trata de una forma muy sencilla de iniciarse en el uso de observables. La línea:
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
emitirá, por lo tanto, tres elementos. No los emite inmediatamente. Los emitirá en su totalidad cada vez que se registre un suscriptor. Esto se denomina observable en frío. El observable vuelve a emitir sus elementos para cada nuevo suscriptor.
Podemos considerar la instrucción anterior como una acción de configuración para el observable. Se configura una vez y se ejecuta n veces si aparecen n suscriptores.
¿Cómo se suscribe?
Una forma de hacerlo es utilizar el método [Observable.subscribe], cuya definición aquí es la siguiente:
![]() |
- el primer parámetro [Action1<T> onNext] (véase la sección 6.2) del método es el método que se ejecutará cuando el observable emita un nuevo elemento T;
- el segundo parámetro [Action1<Throwable> onError] del método es el método que se ejecutará cuando el observable lance una excepción;
- el tercer parámetro [Action0 onComplete] (véase la sección 6.1) del método es el método que se ejecutará cuando el observable emita una excepción;
- el método devuelve un tipo [Subscription];
El tipo [Subscription] representa una suscripción al observable. Su definición es la siguiente:
![]() |
El valor de esta interfaz [1] reside en su método [2], que permite cancelar una suscripción.
En nuestro ejemplo, el código para suscribirse al observable es el siguiente:
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
- línea 1: se ignora el resultado de tipo [Subscription];
- líneas 1–15: los tres parámetros son instancias de clases anónimas. También utilizaremos lambdas. La ventaja de las clases anónimas es que los tipos de datos que espera el único método de estas clases son claramente visibles;
- líneas 2-5: implementación del primer parámetro de tipo [Action1<Integer>];
- líneas 6–10: implementación del segundo parámetro de tipo [Action1<Throwable>];
- líneas 11–15: implementación del tercer parámetro de tipo [Action0];
El código completo es el siguiente:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
El observable de la línea 12 comienza a emitir sus tres elementos tan pronto como se invoca el método [subscribe] en la línea 14. A partir de ese momento:
- por cada elemento emitido, se ejecutan las líneas 15–18.
- cuando se han emitido los 3 elementos, se ejecutan las líneas 24–29;
- las líneas 19–24 nunca se ejecutarán porque el observable no emite una excepción aquí;
Por defecto, el observable y el observador se ejecutan en el mismo hilo. Hay algunos observables predefinidos que se ejecutan en un hilo distinto del hilo principal (en este caso, el hilo del método main), pero para la mayoría de ellos no es así. Por lo tanto, aquí todo ocurre en el hilo del método main:
- el observable emite el elemento 1;
- las líneas 15–18 se ejecutan y muestran este elemento;
- el observable emite el elemento 2;
- Las líneas 15-18 ejecutan y muestran este elemento;
- el observable emite el elemento 3;
- las líneas 15–18 ejecutan y muestran este elemento;
- el observable emite la notificación [completed];
- se ejecutan las líneas 24–29;
Esto es lo que muestran los resultados:
La clase [Example02] reutiliza [Example01], esta vez utilizando funciones lambda como parámetros para el método [Observable.subscribe]:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
public class Exemple02 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(
(integer) -> System.out.printf("next : %s%n", integer),
(th) -> System.out.println(th),
() -> System.out.println("completed"));
}
}
7.1.2. Ejemplo-03: La clase Observer
![]() |
El método [Observable.subscribe], que permite suscribirse a un observable, tiene varias versiones, entre las que se incluyen las siguientes:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
import rx.Observer;
public class Exemple03 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable th) {
System.out.printf("throwable %s", th);
}
@Override
public void onNext(Integer integer) {
System.out.printf("next : %s%n", integer);
}
});
};
}
Línea 13: En lugar de pasar tres parámetros al método [subscribe], le pasamos un tipo [Observer] de la siguiente manera:
![]() |
El tipo [Observer] es una interfaz con tres métodos:
- [onNext(T t)], que se invoca cada vez que el observable emite un elemento t;
- [onError(Throwable th)], que se invoca cuando el observable lanza una excepción th;
- [onCompleted], que se invoca cuando el observable indica que ha terminado de emitir;
El código funciona de manera similar a lo explicado anteriormente. Obtenemos los siguientes resultados:
7.1.3. Ejemplo-04: El método [Observable.create]
![]() |
El método estático *Observable.create* se define de la siguiente manera:
![]() |
- El método [create] devuelve un tipo Observable<T>;
- el parámetro del método [create] es una función de tipo [Observable.OnSubscribe<T>] definida de la siguiente manera:
![]() |
El tipo [Observable.OnSubscribe<T>] es una interfaz funcional que, a su vez, extiende la interfaz funcional [Action1<Subscriber<? super T>>]. El método [call] de esta interfaz espera un tipo [Subscriber] (suscriptor, observador) definido de la siguiente manera:
![]() |
Vemos en [1] que la clase [Subscriber<T>] implementa la interfaz [Observer<T>] presentada en la sección 7.1.2.
En definitiva, el método [<T> Observable.create]:
- toma como parámetro una instancia de tipo [Observable.OnSubscribe<T>] con un único método: void call(Subscriber<T> s). El tipo [Subscriber<T>] extiende el tipo [Observer<T>] y, por lo tanto, tiene los métodos onNext, onError y onCompleted;
- devuelve un tipo Observable<T>;
El método [<T> Observable.create] devuelve un observable configurado. Aún no se ha emitido ningún elemento. Cuando un suscriptor [Subscriber<T> s] se suscribe a este observable, se invoca el método [void call(s)] de la función pasada como parámetro al método [<T> Observable.create]. Su función es emitir elementos t de tipo T y llamar al método del observador [s.onNext(t)] en cada emisión. Cuando esto se completa, se debe llamar al método [s.onCompleted(t)] del observador y el método [call] debe finalizar. Si el método [call] encuentra una excepción th, se debe llamar al método [s.onError(th)] del observador y el método [call] debe finalizar;
Para ilustrar este complejo comportamiento, utilizaremos el siguiente código [Ejemplo04]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.util.Random;
public class Exemple04 {
public static void main(String[] args) {
// observable configuration of reals
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
for (int i = 0; i < 3; i++) {
// emission element i
subscriber.onNext(new Random((i + 1)).nextDouble());
}
// end of issue
subscriber.onCompleted();
}
});
// subscription and therefore emission
obs1.subscribe((d) -> System.out.printf("onNext %s%n", d), (th) -> System.out.printf("onError %s%n", th),
() -> System.out.println("onCompleted"));
}
}
- línea 11: se crea un observable que emite tipos Double;
- líneas 11–21: el parámetro del método [create] se instancia con una clase anónima que contiene el único método [call] de las líneas 12–20. El observable creado en la línea 11 está listo para emitir, pero solo lo hará cuando llegue un observador;
- líneas 13–21: el método [call] recibe una referencia a un observador;
- líneas 14–17: se emiten tres elementos al observador;
- línea 19: notifica al observador que la emisión ha finalizado;
- líneas 23–24: suscripción al observable de la línea 11. Implementamos los tres parámetros [onNext, onError, onCompleted] del método [subscribe] utilizando tres lambdas. Esta suscripción creará el suscriptor [Subscriber<Double>], que se pasará al método [call] en la línea 13. A continuación, comenzará la emisión de elementos;
- todo ocurre en el mismo hilo: observable y observador;
Obtenemos los siguientes resultados:
El método [Observable.create] permite crear un observable a partir de cualquier evento. Este es el método que utilizamos en la sección 2 de la introducción para transformar una interfaz síncrona en una asíncrona.
7.1.4. Ejemplo-05: Refactorización del [Ejemplo-04]
![]() |
El siguiente ejemplo presenta una nueva versión del método estático [Observable.subscribe]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class Exemple05 {
public static void main(String[] args) {
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- línea 56: la nueva versión del método estático [Observable.subscribe] acepta el tipo [Subscriber] como parámetro, que hemos presentado en el párrafo anterior;
- líneas 37–52: el suscriptor (observador). Implementa la interfaz Observer con sus tres métodos onNext, onError y onCompleted;
- Líneas 61–64: A partir de aquí, nos centraremos en los subprocesos en los que se ejecutan el observable y su observador;
- línea 62: el nombre del hilo;
- línea 63: la hora actual expresada en segundos y milisegundos. Esto nos permitirá realizar un seguimiento a lo largo del tiempo de la emisión de elementos por parte del observable y su procesamiento por parte del observador;
- Este código tiene la misma funcionalidad que el código anterior. Simplemente hemos refactorizado este último;
Los resultados obtenidos son los siguientes:
- Línea 1 de los resultados: antes de la línea 56 del código, aún no ha ocurrido nada. El observable simplemente se ha configurado;
- Línea 2 de los resultados: la línea 56 del código activa una llamada al método [call] de la línea 15. Línea 3: se emite el número real 80,39 al observador;
- Línea 4: El observador recibe el número emitido;
- Líneas 5-8: el proceso anterior se repite dos veces;
- línea 9: el observable envía la notificación de fin de transmisión;
- línea 10: el observador la recibe;
- línea 11: se muestra mediante la línea 57 del código;
Podemos ver, por lo tanto, que la única línea de suscripción (línea 56) provocó que se mostraran las líneas 2–10 de los resultados. Al empezar a trabajar con la biblioteca RxJava, uno se pregunta cómo se relacionan las cosas entre sí, en particular las conexiones entre el observador y el observable. Aquí vemos que la línea 56, la suscripción al observable,
- desencadenó la emisión de todos los elementos del observable;
- que el observable y el observador se ejecutan en el mismo hilo;
- y que, debido a esto, observamos la secuencia: emitir elemento i, observar elemento i, emitir elemento (i+1), observar elemento (i+1), ...
Recordemos que el emisor estaba esperando antes de emitir sus elementos:
// attente
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erreur
subscriber.onError(e);
}
donde i en la línea 3 representa el número de emisión (0 <= i < 3). Si observamos los tiempos de emisión de los elementos del observable:
- líneas 2 y 3: el elemento 0 se emitió aproximadamente 500 ms después de que comenzara la suscripción;
- líneas 3 y 5: el elemento 1 se emitió aproximadamente 400 ms después del elemento 0;
- líneas 5 y 7: el elemento 2 se emitió aproximadamente 300 ms después del elemento 1;
7.2. Hilo de ejecución, hilo de observación
7.2.1. Ejemplo-06: Observable y observador en un hilo distinto de [main]
![]() |
Reestructuramos el ejemplo anterior de la siguiente manera [Ejemplo 06]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple06 {
public static void main(String[] args) {
// gatekeeper
CountDownLatch latch = new CountDownLatch(1);
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// we lower the barrier
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite observable configuration
obs1 = obs1.subscribeOn(Schedulers.computation());
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// waiting at the gate
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- Línea 16: Creamos una barrera de seguridad (semáforo) utilizando un objeto [CountDownLatch]. Este objeto se utiliza para sincronizar subprocesos entre sí. Aquí se inicializa con el valor 1, al que nos referiremos como el valor de la barrera de seguridad (o semáforo). Un subproceso espera a la barrera de seguridad mediante la siguiente operación:
latch.await();
El subproceso queda bloqueado si el valor del latch es >0. Un subproceso puede incrementar o decrementar el valor interno del latch. Línea 48: el valor del latch se decrementa en 1.
- Línea 63: el observable se configura para ejecutarse en un hilo proporcionado por el programador [Schedulers.computation()]. Este programador puede proporcionar tantos hilos como núcleos haya en la máquina de ejecución. La sección sobre la aplicación de ejemplo mostró el uso de otros programadores (véase la sección 2.8);
El principio del código es el siguiente:
- el método [main] se ejecuta en el hilo principal;
- línea 66: comienza a emitir elementos desde el observable. Estos se emitirán en un hilo distinto del hilo principal;
- línea 70: el hilo principal se bloquea porque la barrera tiene el valor 1 (véase la línea 16). Solo puede continuar cuando este valor cambie a 0. Esto ocurre en la línea 48. Es el observador el que baja la barrera cuando recibe la notificación de que el observable ha terminado de emitir;
La ejecución arroja los siguientes resultados:
- línea 1: la suscripción está a punto de realizarse;
- línea 2: esto desencadena la ejecución del método [call] en el hilo [RxComputationThreadPool-1]. Ahora tenemos una ejecución paralela con dos hilos;
- línea 3: por una razón desconocida, el hilo [RxComputationThreadPool-1] ha cedido el control. El hilo [main] toma entonces el control y queda bloqueado por la barrera (línea 70 del código). A partir de este momento, solo el hilo [RxComputationThreadPool-1] puede operar;
- líneas 4–11: observamos el comportamiento visto anteriormente entre el observable y su observador, pero ahora todo tiene lugar en el hilo [RxComputationThreadPool-1];
- líneas 12-13: el observador ha bajado la barrera (línea 48 del código) y el hilo [RxComputationThreadPool-1] ha finalizado. El hilo [main] toma el control y muestra dos mensajes;
7.2.2. Ejemplo-07: Observable y observador en dos subprocesos diferentes
![]() |
Modificamos el ejemplo anterior de la siguiente manera:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple07 {
public static void main(String[] args) {
// gatekeeper
CountDownLatch latch = new CountDownLatch(1);
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// we lower the barrier
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite observable configuration
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// waiting in front of the barrier
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
El código es idéntico al del ejemplo anterior, salvo por la línea 63:
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
que configura el observable (subscribeOn) y el observador (observeOn) para que se ejecuten en uno de los subprocesos proporcionados por el programador [Schedulers.computation()].
Los resultados obtenidos son los siguientes:
Cabe destacar lo siguiente:
- el observable se ejecuta en el hilo [RxComputationThreadPool-4] (líneas 3–4, 6, 8–9);
- el observador se ejecuta en el hilo [RxComputationThreadPool-3] (líneas 5, 7, 10-11);
- se ejecutan de forma independiente. Así, en las líneas 8–9, el observable emite dos notificaciones (onNext, onCompleted) antes de que el observador recupere la notificación [onNext] (línea 10);
La biblioteca RxJava se encarga de la transferencia de datos (emisiones) desde el hilo del observable al hilo del observador. El desarrollador no tiene que preocuparse por esto.
Hemos visto cómo crear observables (Observable.from, Observable.create). Ahora veamos los observables predefinidos en la biblioteca RxJava.
7.3. Observables predefinidos
7.3.1. Ejemplo-08: el método [Observable.range]
![]() | ![]() |
A partir de ahora, utilizaremos clases específicas para los procesos observados y sus observadores. La idea es poder registrar sus nombres, sus subprocesos de ejecución y sus tiempos de ejecución para poder realizar un seguimiento a lo largo del tiempo.
La clase [Process] será simplemente un Observable al que podamos asignar un nombre. Implementará la siguiente interfaz [IProcess]:
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcess<T> {
// name of observable
public String getName();
// observable
public Observable<T> getObservable();
}
Esta interfaz puede implementarse mediante la siguiente clase [Process<T>]:
package dvp.rxjava.observables.utils;
import rx.Observable;
import rx.Scheduler;
public class Process<T> implements IProcess<T>{
// observable name
protected String name;
// observed process
protected Observable<T> observable;
// manufacturers
public Process(String name, Observable<T> observable) {
// local initializations
this.name = name;
this.observable = observable;
}
// getters and setters
public String getName() {
return name;
}
public Observable<T> getObservable() {
return observable;
}
}
- línea 9: el nombre del proceso;
- línea 11: la observable observada;
- líneas 14–18: el constructor;
El observador se describirá mediante la siguiente clase [Observer]:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
...
}
- Línea 11: La clase `Observateur<T>` hereda de la clase `Subscriber<T>`, que presentamos brevemente en la sección 7.1.3. La utilizaremos como argumento del método `[Observable.subscribe]`:
// exécution observable (observation)
obs1.subscribe(observateur);
El método [Observable.subscribe] utilizado en la línea 2 anterior tiene la siguiente definición:
![]() |
La función principal del [Subscriber] es gestionar los elementos emitidos por el observable al que se ha suscrito utilizando los métodos de la interfaz [Observer]: onNext, onError, onCompleted. La clase [Subscriber] tiene los siguientes métodos:
![]() |
En el código de la clase [Observer], utilizaremos el método [1] isUnsubscribed para determinar si la suscripción del suscriptor se ha cancelado o no. La clase completa [Observer<T>] es la siguiente:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
// a gatekeeper (semaphore)
private CountDownLatch latch;
// a display method
private Consumer<String> showInfos;
// observer's name
private String observerName;
// the name of the observed process
private String processName;
// manufacturers
public Observateur() {
}
public Observateur(String name, CountDownLatch latch, Consumer<String> showInfos, String observedName) {
this.observerName = name;
this.latch = latch;
this.showInfos = showInfos;
this.processName = observedName;
}
// --------------------------- implementation interface Observer<T>
@Override
public void onCompleted() {
// end of issues
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s,%s].onCompleted", observerName, processName));
}
// end of main thread lock
latch.countDown();
}
@Override
public void onError(Throwable e) {
// emission error
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s, %s].onError (%s)", observerName, processName, e));
}
}
@Override
public void onNext(T value) {
// an additional show
if (!isUnsubscribed()) {
try {
showInfos.accept(String.format("Subscriber [%s,%s] : onNext (%s)", observerName, processName,
new ObjectMapper().writeValueAsString(value)));
} catch (JsonProcessingException e) {
showInfos.accept(String.format("Subscriber [%s,%s].onNext (%s)", observerName, processName, e));
}
}
}
}
- Además de las características de un suscriptor, el observador llevará la siguiente información:
- línea 14: una barrera o semáforo que se utilizará para bloquear el hilo principal hasta que el observador haya recibido todos los elementos emitidos por el observable. Esto ocurrirá en la línea 36 del código cuando el observador reciba la notificación de fin de emisión del observable;
- línea 16: una instancia de Consumer<String> que se utilizará para mostrar un mensaje en la consola;
- línea 18: el nombre del observador, utilizado para distinguir entre observadores cuando hay varios;
- línea 20: el nombre del proceso observado;
- líneas 36, 46, 54: los métodos [onCompleted, onError, onNext] de la interfaz [Observer<T>] implementados por la clase abstracta [Subscriber<T>]. Esta clase no los implementa. Por lo tanto, esto debe hacerse en sus clases hijas. Antes de hacer nada en estos métodos, comprobamos si el observador se ha dado de baja del observable que está observando;
- línea 59: el método [onNext] del observador escribe la cadena JSON del elemento recibido. Esto nos permitirá mostrar varios tipos de elementos;
Dicho esto, examinemos un nuevo método de la clase Observable, el método [range]:
![]() |
El observable Observable.range(n, m) emite (m) números enteros que van desde n hasta n+m-1. Lo exploraremos con el siguiente código [Ejemplo08]:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple08 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Integer> obs1 = Observable.range(15, 3).subscribeOn(Schedulers.computation());
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos,"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- Línea 16: Usaremos dos observadores;
- línea 19: el guardrail (semáforo) se inicializa en dos porque colocaremos cada observador en un hilo diferente. Por lo tanto, el hilo principal tendrá que esperar a que ambos hilos de observador terminen;
- línea 22: configuramos el observable para que se ejecute en un hilo del programador [Schedulers.computation()]. El observador estará en el mismo hilo que el observable;
- líneas 25–27: suscribimos dos observadores al observable. Esto activará la ejecución completa del observable para cada observador: se emitirán los enteros 15, 16 y 17;
- línea 30: el hilo principal espera a que los observadores terminen;
Los resultados obtenidos son los siguientes:
- línea 2: el hilo principal está bloqueado, esperando a que los dos observadores terminen;
- líneas 3-4: vemos que el observador 0 está en el hilo [RxComputationThreadPool-1] y el observador 1 en el hilo [RxComputationThreadPool-2];
- líneas 3-10: vemos que ambos observadores reciben exactamente los mismos elementos;
Utilizaremos la clase Observer definida aquí para ilustrar el comportamiento de otros tipos de observables.
7.3.2. Ejemplo-09: los métodos Observable.[interval, take, doNext]
![]() |
![]() |
Este ejemplo ilustra el uso del observable Observable.interval(long interval, TimeUnit unit), que emite enteros largos a intervalos regulares. Nota [1]: por defecto, el observable [Observable.interval] se ejecuta en uno de los subprocesos del programador [Schedulers.computation].
El código será el siguiente:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- línea 22: el observable emite enteros largos cada 500 milisegundos. La secuencia comienza con el número 0;
- línea 22: este observable emite un número infinito de valores. El método [Observable.take(n)] crea un nuevo observable que conserva solo los primeros n elementos emitidos;
![]() |
Repasemos el código del observable:
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
Línea 2: El método [Observable.doOnNext] se ejecuta cada vez que el observable emite un nuevo elemento. Se utiliza a menudo para registrar información. En este caso, queremos registrar la fecha de emisión de los elementos para verificar que se mantiene el intervalo de 500 milisegundos. El método [Observable.doOnNext] no modifica el observable al que se aplica. Su definición es la siguiente:
![]() |
La ejecución arroja los siguientes resultados:
- líneas 3, 7 y 11: vemos que el intervalo de emisión es de aproximadamente 500 ms;
- los dos observadores se encuentran efectivamente en dos subprocesos diferentes, aunque el observable no se hubiera configurado para ejecutarse con un programador específico. Este es el comportamiento predeterminado del observable [Observable.interval] que vemos aquí;
7.3.3. Ejemplos-10/12: los métodos Observable.[error, empty, never]
![]() | ![]() |
A partir de ahora, seremos más concisos en nuestras ilustraciones de los métodos de la clase [Observable]. El código anterior era el siguiente:
package dvp.rxjava.observables;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
Este código ya se utilizó en el ejemplo anterior. Solo han cambiado las líneas 21 y 22. Por lo tanto, extraeremos la mayor parte de este código a la siguiente clase [ProcessUtils]:
package dvp.rxjava.observables.utils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import rx.Observable;
public class ProcessUtils {
@SafeVarargs
public static void subscribe(int nbObservateurs, IProcess<?>... processes) throws InterruptedException {
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs * processes.length);
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
for (IProcess<?> process : processes) {
Observable<?> obs = process.getObservable();
obs.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos, process.getName()));
}
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- línea 13: el método toma dos parámetros:
- nbObservers: el número de observadores de los procesos, pasado como segundo parámetro;
- procesos: los procesos (denominados «observables») que se van a observar. Gracias a la notación [IProcess<?>], los procesos pueden emitir elementos de diferentes tipos;
- línea 16: el semáforo debe ponerse en verde cuando todos los observadores hayan completado todas sus observaciones. El valor inicial del semáforo es, por lo tanto, el número de observadores multiplicado por el número de observaciones;
- Líneas 20-25: cada observador está suscrito a todos los procesos que necesita observar;
- línea 23: recuperar el observable del proceso (véase la sección 7.3.1);
- línea 23: un observador está suscrito a él. Se pasan cuatro datos al observador:
- su nombre;
- el semáforo que debe decrementar cuando reciba la notificación de fin de transmisión del observable que está observando;
- el método que debe utilizar cuando desee registrar información en la consola;
- el nombre del proceso que observará;
Una vez definidas estas clases, el Ejemplo 10 quedará así:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple10 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs = Observable.error(new RuntimeException("Erreur !!!")).subscribeOn(Schedulers.computation());
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1", obs));
}
}
En la línea 11, el método estático [Observable.error] se define de la siguiente manera:
![]() |
Por lo tanto, la línea 8 configura un observable que simplemente lanza una excepción al método [onError] de sus suscriptores. La ejecución arroja los siguientes resultados:
main : début observation ------Thread[main] ---- Time[22:618]
main : attente fin observation ------Thread[main] ---- Time[22:636]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[22:638]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[22:638]
Líneas 3 y 4: el método [onError] de ambos suscriptores recibió la excepción lanzada por el observable.
Esta ejecución presenta una peculiaridad: no se llamaron los métodos [onCompleted] de ambos observadores. Como resultado, la barrera no se bajó y el hilo principal permanece bloqueado en el método estático [ProcessUtils.subscribe] en la siguiente línea 3:
// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");
Aquí vemos que, si se produce un error en el observable, no se invoca el método [onCompleted] de los suscriptores. Por lo tanto, modificamos el método [Observer.onError] de la siguiente manera:
@Override
public void onError(Throwable e) {
// erreur d'émission
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber[%s, %s].onError (%s)", observerName, processName, e));
}
// fin blocage thread principal
latch.countDown();
}
Añadimos las líneas 7–8 para liberar el bloqueo en caso de un error observable. Con este nuevo código, la ejecución arroja los siguientes resultados:
main : début observation ------Thread[main] ---- Time[40:750]
main : attente fin observation ------Thread[main] ---- Time[40:764]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[40:766]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[40:766]
main : fin observation ------Thread[main] ---- Time[40:767]
Aparece la línea 5, que antes no teníamos.
El ejemplo 11 será el siguiente:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple11 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs1 = Observable.empty();
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
Línea 10: El método estático [Observable.empty] crea un observable que no emite ningún elemento. Solo emite la notificación de fin de emisión;
![]() |
Al ejecutar el código del ejemplo anterior se obtienen los siguientes resultados:
- Líneas 2 y 3: Vemos que ambos observadores reciben la notificación de fin de emisión sin haber recibido ningún elemento previamente.
Uno podría preguntarse para qué se utiliza realmente este método. Se puede utilizar de forma análoga a una colección, inicialmente vacía, a la que luego se añaden elementos:
En la línea 3, fusionamos el observable inicial obs (línea 1) con otros observables.
El ejemplo 12 ilustra el método estático [Observable.never]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple12 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs1 = Observable.never();
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
El método estático [Observable.never] crea un observable que nunca emite:
![]() |
Al ejecutar el ejemplo se obtienen los siguientes resultados:
Línea 2: el hilo principal espera indefinidamente. Esto se debe a que ningún observable emite la notificación [onCompleted], lo que permite que el semáforo (barrera) se ponga en verde (bajar la barrera).
7.4. Multiprocesamiento
7.4.1. Ejemplo 13: hilo de acción, hilo de observador
En la sección 7.1.3, creamos un observable utilizando el método estático [Observable.create]:
![]() |
- el método [create] devuelve un tipo Observable<T>;
- el parámetro del método [create] es una función de tipo [Observable.OnSubscribe<T>] definida de la siguiente manera:
![]() |
El tipo [Observable.OnSubscribe<T>] es una interfaz funcional que, a su vez, extiende la interfaz funcional [Action1<Subscriber<? super T>>]. El método [call] de esta interfaz espera un tipo [Subscriber] (suscriptor, observador). En el resto de este documento, en ocasiones nos referiremos al tipo [Observable.OnSubscribe<T>] como una acción. Crearemos acciones personalizadas que tendrán un nombre. Estas serán instancias de la siguiente interfaz [IProcessAction]:
![]() |
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcessAction<T> extends Observable.OnSubscribe<T> {
// action has a name
public String getName();
}
- línea 5: la interfaz [IProcessAction<T>] tiene todas las características de la interfaz [Observable.OnSubscribe<T>];
- línea 8: también tiene un método [getName] que devuelve el nombre de la instancia que implementa la interfaz;
Utilizaremos la siguiente acción denominada [ProcessAction01]:
package dvp.rxjava.observables.utils;
import java.util.Random;
import rx.Subscriber;
import rx.functions.Func1;
public class ProcessAction01<T> implements IProcessAction<T> {
// data
private String name;
private int nbValues;
private Func1<Integer, T> func1;
// manufacturers
public ProcessAction01(String name, int nbValues, Func1<Integer, T> func1) {
this.name = name;
this.nbValues = nbValues;
this.func1 = func1;
}
@Override
public void call(Subscriber<? super T> subscriber) {
ProcessUtils.showInfos.accept(String.format("Observable (%s) call start", getName()));
for (int i = 0; i < nbValues; i++) {
// waiting
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
// error
ProcessUtils.showInfos.accept(String.format("Observable (%s) onError", getName()));
subscriber.onError(e);
}
// element emission
T value = func1.call(i);
ProcessUtils.showInfos.accept(String.format("Observable (%s,%s) onNext (%s)", getName(), i, value));
subscriber.onNext(value);
}
// finish
ProcessUtils.showInfos.accept(String.format("Observable (%s) onCompleted", getName()));
subscriber.onCompleted();
}
@Override
public String getName() {
return name;
}
}
- línea 8: la clase [ProcessAction01<T>] implementa la interfaz [IProcessAction<T>] y, por lo tanto, la interfaz [Observable.OnSubscribe<T>];
- línea 11: el nombre de la acción;
- línea 12: el número de valores que se van a emitir;
- línea 13: una instancia de tipo [Func1<Integer, T>] que toma un entero y produce un tipo T que será emitido por el observable (líneas 35 y 37);
- líneas 16–20: pasamos el nombre de la acción, el número de valores que se van a emitir y la función de emisión al constructor;
- líneas 23–42: el código del proceso;
- línea 23: el método [call] toma como parámetro al suscriptor del observable asociado al proceso;
- línea 28: el proceso emite sus elementos tras una espera de duración aleatoria;
- línea 32: emisión de un error;
- línea 37: una emisión normal;
- línea 41: emite la notificación de fin de emisión;
- líneas 25–38: la acción emite nbValues números reales tras un tiempo de espera aleatorio (línea 30);
- línea 35: el valor que se va a emitir lo proporciona la función [func1] pasada como parámetro al constructor (línea 16);
Reestructuramos la clase [Process] (véase la sección 7.3.1) para que también pueda construirse con una acción con nombre. Añadimos el siguiente constructor:
public Process(IProcessAction<T> na, Scheduler schedulerObserved, Scheduler schedulerObserver) {
// nom process=nom action
name = na.getName();
// action --> observable
observable = Observable.create(na);
// thread d'exécution du processus observé
if (schedulerObserved != null) {
observable = observable.subscribeOn(schedulerObserved);
}
// thread d'observation de l'observateur
if (schedulerObserver != null) {
observable = observable.observeOn(schedulerObserver);
}
}
- Línea 1: El constructor toma 3 parámetros:
- la acción con nombre que se utilizará para construir el observable (línea 5);
- el programador del proceso observado (puede ser nulo);
- el programador del observador (puede ser nulo);
- línea 5: el observable se crea a partir de la acción pasada como parámetro;
El siguiente código [Ejemplo 13] observa diferentes observables:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple13 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 1, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// process 3
Process<Integer> process3 = new Process<>(new ProcessAction01<Integer>("process3", 3, i -> i * 2), null,
Schedulers.computation());
// process 4
Process<Boolean> process4 = new Process<>(new ProcessAction01<Boolean>("process4", 4, i -> i % 2 == 0), null, null);
// subscriptions
ProcessUtils.subscribe(1, process1);
ProcessUtils.subscribe(1, process2);
ProcessUtils.subscribe(1, process3);
ProcessUtils.subscribe(1, process4);
}
}
- líneas 13–15: process1 genera 1 número real en un hilo de cálculo que será observado en otro hilo de cálculo;
- líneas 17-18: el proceso 2 genera 2 cadenas en un hilo de cálculo, y no se da ninguna indicación sobre el hilo del observador. Los resultados muestran que la observación se produce por defecto en el mismo hilo que la ejecución del proceso;
- líneas 20-21: el proceso3 genera 3 enteros en un hilo no especificado, que serán observados en un hilo de cálculo. Los resultados muestran que el proceso se ejecuta por defecto en el hilo principal;
- línea 23: el proceso process4 genera 4 valores booleanos en un hilo no especificado, que se observarán en un hilo no especificado. Los resultados muestran que la ejecución del proceso y su observación se producen por defecto en el hilo principal;
El resultado de ejecutar este código es el siguiente:
- El proceso «process1» genera un número real (línea 4) en el hilo de cálculo [RxComputationThreadPool-4], que se observa en el hilo de cálculo [RxComputationThreadPool-3] (línea 6);
- El proceso process2 genera 2 cadenas (líneas 12, 14) en el hilo de cálculo [RxComputationThreadPool-5], que se observan en ese mismo hilo (líneas 13, 15);
- El proceso process3 genera 3 enteros (líneas 21, 23, 25) en el hilo principal, que se observan en el hilo de cálculo [RxComputationThreadPool-6] (líneas 22, 24, 28);
- el proceso process4 produce 4 valores booleanos (líneas 34, 36, 38, 40) en el hilo principal, que se observan en ese mismo hilo principal (líneas 33, 35, 37, 39);
Se invita al lector a seguir lo anterior:
- el ciclo de vida del proceso observado y su hilo;
- el ciclo de vida de su observador y su hilo;
Gran parte del atractivo de las bibliotecas Rx reside en este multihilo, que el desarrollador no tiene que gestionar por sí mismo.
7.5. Combinaciones de múltiples observables
7.5.1. Ejemplo 14: Fusión de dos observables con [Observable.merge]
A continuación, presentamos los métodos estáticos de la clase [Observable] que permiten combinar varios observables en un único observable de resultado.
El primer ejemplo de este tipo es el siguiente:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple14 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// merge
Process<?> process12 = new Process<>("process12",
Observable.merge(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- líneas 15–17: un proceso denominado [process1] emitirá 3 números reales en un hilo de cálculo. También será observado en un hilo de cálculo;
- líneas 19–20: un proceso denominado [process2] emitirá 2 cadenas en un hilo de cálculo. No se especifica el hilo de observación. Ya hemos visto anteriormente que, en este caso, el hilo de observación es el hilo de cálculo;
- línea 23: los dos procesos se fusionan, es decir, se crea un observable cuyos elementos provienen simultáneamente de ambos procesos. Para ello se utiliza el método estático [Observable.merge]:
![]() |
Contrariamente a lo que podría sugerir el diagrama anterior, durante la fusión, los elementos del flujo 1 pueden intercalarse entre los elementos del flujo 2. Así lo muestran los resultados de la ejecución:
- línea 3: el proceso [process1] se ejecuta en el hilo de cálculo [RxComputationThreadPool-4];
- línea 4: el proceso [process2] se está ejecutando en el hilo de cálculo [RxComputationThreadPool-5];
- línea 9: el proceso [process12] se observa en el hilo de cálculo [RxComputationThreadPool-3]. No conozco la regla que ha llevado a esta elección;
- líneas 9–11: vemos que el observador observa elementos de ambos procesos [process1] (línea 5) y [process2] (líneas 6, 7) aunque ninguno de los dos haya terminado (hay mezcla);
- el proceso [process12] finaliza (línea 17) cuando ambos procesos, process1 y process2, han terminado;
7.5.2. Ejemplo-15: Concatenación de dos observables con [Observable.concat]
Ahora examinaremos el siguiente código:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple15 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, Schedulers.computation());
// concat
Process<?> process12 = new Process<>("process12",
Observable.concat(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- líneas 15–17: un proceso denominado [process1] emitirá 3 números reales en un hilo de cálculo. También se observará en un hilo de cálculo;
- líneas 19–20: un proceso denominado [process2] emitirá 2 cadenas en un hilo no especificado, en este caso el hilo principal por defecto. Se observará en un hilo de cálculo;
- línea 23: los dos procesos se concatenan, es decir, se crea un observable cuyos elementos provienen de ambos procesos. Los valores emitidos no se mezclan. El proceso [process12] emitirá primero todos los valores del proceso [process1] y luego los del proceso [process2]. Para ello se utiliza el método estático [Observable.concat]:
![]() |
Los resultados de la ejecución son los siguientes:
- líneas 3-10: el proceso [process1] se ejecuta y el proceso [process12] emite los valores emitidos por [process1];
- línea 9: el proceso [process1] ha finalizado;
- líneas 11-17: el proceso [process2] se ejecuta y el proceso [process12] emite los valores emitidos por [process2];
Hay una peculiaridad con respecto a [process2]: no especificamos un hilo de ejecución. Por lo tanto, cabría esperar que se utilizara el hilo principal por defecto. Sin embargo, no es así. El hilo de ejecución fue el hilo de cálculo [RxComputationThreadPool-3] (línea 11). Por lo tanto, cuando no se especifica ningún hilo de ejecución u observación, no podemos hacer ninguna suposición sobre qué hilo se elegirá.
7.5.3. Ejemplo 16: Combinación de dos observables con [Observable.zip]
Ahora examinaremos el siguiente código:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;
public class Exemple16 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, null);
// 2-process combination function
FuncN<String> funcn = new FuncN<String>() {
@Override
public String call(Object... args) {
if (args.length == 2) {
return String.format("double=%s, string=%s", args[0], args[1]);
} else {
throw new RuntimeException("la fonction attend 2 paramètres exactement");
}
}
};
// zip of the 2 processes
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- líneas 16–18: un proceso denominado [process1] emitirá 3 números reales en un hilo de cálculo. También será observado en un hilo de cálculo;
- líneas 20-21: un proceso denominado [process2] emitirá 2 cadenas en un hilo no especificado. El hilo de observación tampoco está especificado;
- líneas 23–32: instanciación de un tipo [FuncN<String>] con una clase anónima. FuncN es una interfaz funcional:
![]() |
El método [FuncN.call] espera una matriz de objetos y devuelve un tipo R. La función [funcn] se utilizará para combinar los procesos process1 y process2 en ese orden. En el método [FuncN.call]:
- args[0] será un Double;
- args[1] será un String;
Aquí, el resultado de [funcn.call] será la cadena de la línea 27. Para construir este resultado no es necesario conocer los tipos de los argumentos del método de llamada.
Los dos procesos se combinan de la siguiente manera:
// zip des 2 processus
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
El método [Observable.zip] funciona de la siguiente manera:
![]() |
Vemos que:
- el primer argumento de zip es un Iterable<Observable>. En nuestro ejemplo, tenemos un parámetro real de tipo List<Observable> que contiene nuestros dos observables;
- el segundo argumento de zip es de tipo FuncN. En nuestro ejemplo, el parámetro real es [funcn];
La ejecución arroja los siguientes resultados:
- líneas 7, 11: process12 emite dos elementos;
- línea 8: el elemento adicional emitido por el proceso 1, que no tiene pareja en el proceso 2, no es emitido por el proceso resultante proceso 12;
Vemos que el proceso2, al que no se le había asignado ni un hilo de ejecución ni un hilo de observación, utilizó el hilo principal para ambos.
7.5.4. Ejemplo 17: Combinación de dos observables con [Observable.combineLatest]
Ahora examinaremos el siguiente código:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple17 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null,
Schedulers.computation());
// combining the 2 processes
Process<Double> process12 = new Process<>("process12",
Observable.combineLatest(process1.getObservable(), process2.getObservable(), (d1, d2) -> d1 + d2));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- líneas 14–16: un proceso denominado [process1] emitirá 3 números reales en un hilo de cálculo. También se observará en un hilo de cálculo;
- líneas 18–20: un proceso denominado [process2] emitirá 2 números reales en un hilo no vinculado. Se observarán en un hilo de cálculo;
- línea 23: los dos observables se combinan utilizando el siguiente método estático [Observable.combineLatest]:
![]() |
El observable [combineLatest] funciona de la siguiente manera: cuando uno de los dos observables emite un elemento E1, ese elemento se combina mediante [combineFunction] con el último elemento emitido por el otro observable.
Al ejecutar este código se obtiene el siguiente resultado:
- Línea 5: La salida de process2 (56) se combina con el último elemento generado por process1 (54, línea 4) y produce el resultado que se muestra en la línea 7;
- línea 6: la salida de process1 (51,6) se combina con el último elemento generado por process2 (56, línea 5) y produce el resultado de la línea 8;
- línea 9: la salida del proceso 2 (261,8) se combina con el último elemento emitido por el proceso 1 (51,6, línea 6) y produce el resultado de la línea 12;
- línea 13: la emisión del proceso 1 (80,39) se combina con el último elemento emitido por el proceso 2 (261,8, línea 9) y produce el resultado de la línea 15;
Esta es una variante del observable [zip] en la que, en esta ocasión, los elementos combinados no son necesariamente los elementos que se encuentran en la misma posición en las secuencias. Obsérvese aquí que el proceso2, al que no se le había asignado ningún hilo de ejecución, se ejecutó en el hilo principal (línea 2).
7.5.5. Ejemplo-18: Combinación de dos observables con [Observable.amb]
Ahora examinaremos el siguiente código:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple18 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null, null);
// combining the 2 processes
Process<Double> process12 = new Process<>("process12",
Observable.amb(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- líneas 14–16: un proceso denominado [process1] emitirá 3 números reales en un hilo de cálculo. También se observará en un hilo de cálculo;
- líneas 18–20: un proceso denominado [process2] emitirá 2 números reales en un hilo no vinculado. Se observarán en un hilo no vinculado;
- línea 22: los dos observables se combinan utilizando el siguiente método estático [Observable.amb]:
![]() |
Como se muestra en el diagrama anterior, el observable [Observable.amb(Observable o1, Observable o2)] emite los elementos del observable que emite primero. Esto queda confirmado por los resultados del ejemplo presentado:
- línea 4: process2 es el primero en emitir;
- Líneas 8, 12: process12 emite todos los elementos emitidos por process2 (líneas 4, 11);
7.6. Cadena de procesamiento para un observable
7.6.1. Ejemplo 19: transformación de un observable con [Observable.map]
En los ejemplos anteriores, examinamos diversas combinaciones de dos observables en un tercer observable. Ahora presentamos métodos estáticos de la clase [Observable] que permiten realizar operaciones de transformación, filtrado y agregación sobre un observable. Aquí encontraremos métodos análogos a los de la clase [Stream] estudiada en la sección 5.
Nuestro primer ejemplo será el siguiente:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple19 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>("process2",
process1.getObservable().map(d -> String.format("valeur-%s", d)));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- líneas 14–16: un proceso denominado process1 emitirá 3 números reales en un hilo de cálculo. También se observará en un hilo de cálculo;
- líneas 17–18: los números emitidos por process1 se convertirán en cadenas en un process2;
- línea 20: observamos el proceso2;
El método [Observable.map] de la línea 18 es análogo al método [Stream.map] descrito en la sección 5.5:
![]() |
Los resultados del ejemplo son los siguientes:
- líneas 4, 5 y 8: las emisiones de process1. Son números reales;
- líneas 6, 7, 10: las emisiones observadas de process2. Son cadenas de caracteres;
7.6.2. Ejemplo-20: filtrar un observable con [Observable.filter]
El ejemplo será el siguiente:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple20 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- líneas 11-12: un proceso llamado process1 emitirá números enteros del 0 al 2 en un subproceso de trabajo. También se observará en un subproceso de trabajo;
- línea 14: los números emitidos por process1 se filtrarán de modo que solo se conserven los números pares en process2;
- línea 20: observamos el proceso2;
El método [Observable.filter] de la línea 18 es análogo al método [Stream.filter] descrito en la sección 5.4:
![]() |
Los resultados del ejemplo son los siguientes:
- líneas 4, 5 y 7: emisiones de process1;
- líneas 6 y 9: las emisiones observadas de process2. Estos son los elementos pares de process1;
7.6.3. Ejemplo 21: transformar una observable con [Observable.flatMap]
El ejemplo será el siguiente:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- líneas 12-13: un proceso llamado process1 emitirá números enteros del 0 al 2 en un hilo de cálculo. También se observará en un hilo de cálculo;
- líneas 15–18: cada número n emitido por process1 se transforma en un observable que emite los 3 números (10*n, 10*n+1, 10*n+2). Si hubiéramos utilizado el método [map] en la línea 15, el proceso2 emitiría un tipo Observable<Integer> en lugar de un tipo Integer. El método [flatMap] utilizado nos permite aplanar esta secuencia de elementos de tipo Observable<Integer> en una secuencia de elementos de tipo Integer que consta de cada elemento de cada Observable<Integer>;
- línea 20: observamos process2;
El método [Observable.flatMap] de la línea 15 es análogo al método [Stream.flatMap] tratado en la sección 5.6.12:
![]() |
Los resultados del ejemplo son los siguientes:
- líneas 5-7: las tres emisiones de process2 tras la emisión de la línea 4 de process1;
- líneas 9-11: las tres emisiones de process2 tras la emisión de la línea 8 de process1;
- líneas 14-16: las tres emisiones del proceso 2 tras la emisión de la línea 12 del proceso 1;
El siguiente código muestra cómo crear un tipo Observable<Integer[]> a partir del proceso1 [Ejemplo 21b]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21b {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer[]> process2 = new Process<>("process2", process1.getObservable().map(i -> {
int value = i * 10;
return new Integer[] { value, value + 1, value + 2 };
}));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- línea 14: se utiliza el método [Observable.map];
- línea 16: que devuelve un tipo Integer[];
Los resultados son los siguientes:
- líneas 6, 7, 10: vemos los resultados del mapa;
Todas estas transformaciones observables se pueden encadenar, ya que cada transformación produce una nueva observable. Esto se demuestra en el siguiente ejemplo [Ejemplo 21c]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21c {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- líneas 15–18: el flatMap va seguido de un filter;
Los resultados de la ejecución son los siguientes:
- líneas 8-13: process2 emitió solo los elementos pares de flatMap;
Un método similar a [flatMap] es el método [flatMapIterable], ilustrado en el siguiente ejemplo [Ejemplo21d]:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21d {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMapIterable(i -> {
int value = i * 10;
return Arrays.asList(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
Línea 16: En lugar de utilizar el método [flatMap], utilizamos el método [flatMapIterable]. En este caso, la función de transformación debe producir un tipo Iterable<T> (línea 18) en lugar de un tipo Observable<T>.
Obtenemos los mismos resultados que antes.
Volvamos a la definición del método [flatMap]:
![]() |
Como se muestra arriba, se ha insertado un elemento azul [3] entre los dos elementos verdes [1-2]. Esto significa que, al aplanar Observable<T>s, el método [flatMap] conserva el orden de emisión de estos diversos observables internos. Esto queda demostrado en el siguiente ejemplo [Ejemplo21e]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21e {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().flatMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
- líneas 11-12: el proceso process1 emite los enteros [0,1];
- líneas 14-15: el proceso2 emite los números enteros [10,11,12];
- líneas 17-18: cada elemento emitido por el proceso1 se asocia con la observable del proceso2. Esto significa que:
- el elemento [0] del proceso1 se asociará a una observable que emite [10,11,12];
- lo mismo se aplica al elemento 1;
Al final, se emitirán los 6 números [10, 11, 12, 10, 11, 12]. Queremos ver en qué orden.
Los resultados de la ejecución son los siguientes:
Podemos ver que el orden de emisión de process3 fue: [10, 10, 11, 12, 11, 12] (líneas 11, 12, 14, 17, 19, 22). Por lo tanto, los elementos emitidos por process2 se mezclaron efectivamente. Podemos evitar esto utilizando el método [concatMap] en lugar del método [flatMap]. Esto se demuestra con el siguiente código [Ejemplo21ef]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21ef {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().concatMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
En la línea 18, hemos sustituido [flatMap] por [concatMap]. Los resultados de la ejecución son los siguientes:
Vemos que el orden de emisión del proceso 3 fue: [10, 11, 12, 10, 11, 12] (líneas 12-14, 17, 19, 22). Los elementos emitidos por el proceso 2 no se mezclaron.
Otra variante del método [map] es el método [switchMap]:
![]() |
Arriba, a partir del observable [1], se crean otros tres observables [2] con dos elementos, que luego se aplanan como en [flatMap] [3]. Obsérvese que el resultado tiene 5 elementos, no 6. Esto se debe a que, antes de que el segundo observable emita su segundo elemento [6], el tercer observable emite su primer elemento [5], lo que provoca que el segundo observable sea descartado. Por lo tanto, el elemento [6] no se encuentra en el observable resultante [3].
Para ilustrar [switchMap], utilizaremos el siguiente ejemplo [Ejemplo21eg]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21eg {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().switchMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
Al ejecutar el ejemplo se obtienen los siguientes resultados:
- process1 emite 2 elementos que dan lugar a 2 observables de process2 de 3 elementos;
- línea 14: el observador recibe el elemento n.º 0 emitido por el primer observable de process2 en la línea 6;
- línea 15: el observador recibe el elemento n.º 0 emitido por la segunda observable del proceso 2 en la línea 13. La historia no explica por qué no recibió previamente los elementos 1 y 2 emitidos por la primera observable del proceso 2 en las líneas 7 y 8. En cualquier caso, la primera observable del proceso 2 se abandona;
- al final, el observador solo ve 4 elementos (líneas 14, 15, 17, 20) en lugar de los 6 que se emitieron;
7.6.4. Ejemplos-22: Otros métodos de la clase [Observable]
La clase [Observable] incluye muchos métodos de la clase [Stream] que funcionan de manera similar. A continuación se muestran algunos de ellos. Nos limitaremos a proporcionar el código y sus resultados.
[Ejemplo 22a - take=limit]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22a {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).take(3));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
resultados
[Ejemplo 22b - takeLast]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22b {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).takeLast(2));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
resultados
[Ejemplo 22c - omitir]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22c {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).skip(5).take(2));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
resultados
[Ejemplo 22d - reduce]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22d {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).reduce(0, (i, a) -> i + a));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- línea 10: calcula la suma de los elementos del observable. El resultado es un observable que emite esta suma;
resultados
[Ejemplo 22e - todo]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22e {
public static void main(String[] args) throws InterruptedException {
// process
Process<Boolean> process = new Process<>("process", Observable.range(1, 10).all(i -> i > 10));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- línea 10: devuelve un Observable<Boolean> que emite el elemento true si el predicado del método [all] es verdadero para todos los elementos; en caso contrario, devuelve false;
resultados
[Ejemplo 22f - recuento]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22f {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).count());
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- línea 10: [Observable.count] crea un observable de un elemento que es la suma de los elementos observados;
resultados
[Ejemplo 22g - distinto]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22g {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.just(1, 2, 1, 3).distinct());
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
resultados
[ Ejemplo 22h - groupBy, asObservable]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.observables.GroupedObservable;
public class Exemple22h {
public static void main(String[] args) throws InterruptedException {
// process
Observable<GroupedObservable<Boolean, Integer>> obs = Observable.range(1, 10).groupBy(i -> i % 2 == 0);
Process<Integer> process = new Process<>("process", obs.concatMap(g -> g.asObservable()));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- Línea 11: El método [groupBy] agrupa los 10 elementos emitidos en dos grupos: números pares y números impares. El resultado es un Observable<GroupedObservable<Boolean, Integer>>, es decir, un observable cuyos elementos son de tipo GroupedObservable<Boolean, Integer>, donde Boolean es el tipo de la clave del grupo (false, true en este caso) y también es el tipo del resultado de la lambda pasada como parámetro al método [groupBy], e Integer es el tipo de los elementos del grupo;
- línea 12: el tipo GroupedObservable tiene un método [asObservable] que nos permite crear un observable a partir de este tipo. Por lo tanto, tendremos dos tipos Observable<Integer>, uno para los números pares y otro para los impares. A partir de estos dos observables, el método [concatMap] creará uno único;
resultados
[Ejemplo 22i - marca de tiempo]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
import rx.schedulers.Timestamped;
public class Exemple22i {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Timestamped<Integer>> process2 = new Process<>("process2", process1.getObservable().timestamp());
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- línea 15, el método [timestamp] asocia una marca de tiempo a cada elemento procesado del observable;
resultados
En este ejemplo, es difícil saber qué representa la información de la marca de tiempo:
- líneas 4-5: vemos que el elemento 1 del proceso 1 se emitió 139 ms después del elemento 0;
- líneas 6 y 7: vemos que el elemento 1 del proceso 2 se observó 234 ms después del elemento 0;
- líneas 5 y 8: vemos que el elemento 2 del proceso 1 se emitió 33 ms después del elemento 1;
- líneas 7 y 10: vemos que el elemento 2 del proceso 2 se observó 37 ms después del elemento 1;
Estos retrasos se deben a que los subprocesos para observar y ejecutar los observables no son los mismos. Si sustituimos las líneas 12-13 por las siguientes líneas (Ejemplo 22j):
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
null);
- Líneas 2–3: No especificamos el hilo de observación. Sabemos que, en este caso, el observable se observa donde se ejecuta;
Esto da los siguientes resultados:
- líneas 4 y 6: process1 emite su elemento n.º 1 587 ms después de su elemento n.º 0;
- líneas 5 y 7: el observador observa estos dos elementos con un intervalo de 586 ms;
- líneas 6 y 8: el proceso 1 emite su elemento n.º 2 396 ms después de su elemento n.º 1;
- líneas 7 y 9: el observador observa estos dos elementos con una diferencia de tiempo de 396 ms;
En este caso, los valores de las marcas de tiempo son coherentes: representan con precisión el tiempo de transmisión del elemento.
7.7. Programadores
7.7.1. Ejemplo 23: el programador [Schedulers.computation]
Ahora examinaremos los programadores de ejecución. La observación se realizará en el hilo de ejecución.
El tema de los programadores es algo complejo. Los distintos programadores se presentan en esta pregunta del sitio web StackOverflow [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
Intentaremos ilustrar el uso de estos diferentes programadores con ejemplos. El primero ilustra el programador [Schedulers.computation]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple23 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.computation(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
- líneas 14–19: creamos una matriz de 10 procesos que se ejecutan en un hilo de cálculo;
- línea 17: cada proceso genera un número real aleatorio;
- línea 21: nos suscribimos a todos estos procesos;
Los resultados son los siguientes:
- líneas 2-10: los primeros 8 procesos se inician en 8 subprocesos diferentes (la máquina utilizada tiene 8 núcleos). Obsérvese que todos se inician aproximadamente al mismo tiempo;
- líneas 17-19: se cierran 3 procesos, lo que libera 3 subprocesos;
- líneas 23-24: los dos últimos procesos pueden entonces iniciarse utilizando 2 de los subprocesos así liberados;
Por lo tanto, podemos concluir que el programador [Schedulers.computation] proporciona un conjunto de n subprocesos, donde n es el número de núcleos de la máquina. Los subprocesos se ejecutan en paralelo en estos núcleos.
7.7.2. Ejemplo 24: el programador [Schedulers.io]
Ejecutamos el código anterior con el programador [Schedulers.io]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple24 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.io(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
- línea 18: los procesos se ejecutan utilizando los subprocesos del programador [Schedulers.io];
Esto da los siguientes resultados:
- líneas 2-10: los 10 procesos se inician cada uno en un hilo diferente. A diferencia del caso anterior, todos los procesos pudieron iniciarse. Tenga en cuenta que estos inicios tardan 6 ms, mientras que antes tardaban 1 ms;
- líneas 13-18: los observables se emiten uno tras otro y no de forma casi paralela como ocurría anteriormente;
¿Cuál es la diferencia entre los programadores [Schedulers.io] y [Schedulers.computation]? Se puede encontrar una respuesta en la URL [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
7.7.3. Ejemplo 25: el programador [Schedulers.newThread]
Ejecutamos el código anterior utilizando el programador [Schedulers.newThread]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple25 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.newThread(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
Los resultados obtenidos son los mismos que con el programador [Schedulers.io]:
En la URL [http://stackoverflow.com/questions/33415881/retrofit-with-rxjava-schedulers-newthread-vs-schedulers-io], se explica que el programador [Schedulers.io] proporciona un grupo de subprocesos, algo que el programador [Schedulers.newThread] no hace. Un grupo de subprocesos crea automáticamente un conjunto de subprocesos. Los asigna a los procesos que los necesitan. Cuando estos procesos finalizan, sus subprocesos no se eliminan, sino que vuelven al grupo y pueden ser reutilizados por otro proceso. Esto es más eficiente que crear y eliminar subprocesos constantemente. Por lo tanto, es preferible utilizar el programador [Schedulers.io].
7.7.4. Ejemplo 26: Los programadores [Schedulers.immediate, Schedulers.trampoline]
Volvamos a la explicación proporcionada para estos dos programadores:
![]() |
La explicación es bastante fácil de entender, pero cuando intentas ilustrarla, te das cuenta de que en realidad no la has comprendido del todo. Fue el libro *Learning Reactive Programming With Java 8* el que me ayudó a crear un ejemplo basado en uno que aparecía en ese libro, pero simplificado. Aquí está:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
public class Exemple26 {
public static void main(String[] args) throws InterruptedException {
// a scheduler
Scheduler scheduler = Schedulers.immediate();
// a worker of this scheme
Worker worker = scheduler.createWorker();
// an Action0 type to be executed on the worker
Action0 action02 = new Action0() {
@Override
public void call() {
// log action02
ProcessUtils.showInfos.accept("action02");
}
};
// an Action0 type to be executed on the worker
Action0 action01 = new Action0() {
@Override
public void call() {
// program a new action on the same worker
worker.schedule(action02);
// log action01
ProcessUtils.showInfos.accept("action01");
}
};
// action01 is programmed on the worker
worker.schedule(action01);
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- línea 17: un programador. Este será [Schedulers.immediate], como se muestra aquí, o [Schedulers.trampoline] más adelante;
- línea 19: Las acciones de tipo Action0 (líneas 21, 20) se pueden ejecutar en los trabajadores del programador. El método [Scheduler.createWorker] crea un trabajador. El método [Worker.schedule(Action0)] ejecuta un tipo Action0 a través de un trabajador;
- líneas 21–27: una primera acción llamada [action02] que será ejecutada (línea 40) por el trabajador de la línea 19;
- líneas 30–38: una segunda acción llamada [action01]. Tiene la particularidad de hacer que action02 se ejecute en el mismo trabajador que ella misma (línea 34). Aquí radica la diferencia entre [Schedulers.immediate] y [Schedulers.trampoline]:
- si el programador es [Schedulers.immediate], entonces en la línea 34, la acción action02 se ejecutará inmediatamente (de ahí el nombre del programador) y la acción action01 que se está ejecutando actualmente se interrumpirá. A continuación, veremos aparecer el mensaje de la línea 25. Una vez finalizada la acción action02, la acción action01 se reanudará y veremos el mensaje de la línea 36;
- si el programador es [Schedulers.trampoline], entonces, en la línea 34, la acción action02 se pone en cola. No se ejecutará hasta que la tarea actual, action01, haya finalizado. A continuación, aparecerá el mensaje de la línea 36. Una vez que action01 haya finalizado, se ejecutará action02 y aparecerá el mensaje de la línea 25;
La ejecución del código anterior arroja los siguientes resultados:
Si, en la línea 17, utilizamos el programador [Schedulers.trampoline], obtenemos los resultados opuestos:
Dicho esto, resulta difícil establecer una conexión con los observables. No he encontrado ningún ejemplo convincente que demuestre la ventaja de ejecutar un observable en uno de estos dos subprocesos. Sin embargo, aquí hay uno, aunque no me parece nada natural:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
public class Exemple27 {
public static void main(String[] args) throws InterruptedException {
// Worker
Worker worker = Schedulers.immediate().createWorker();
// Worker worker = Schedulers.trampoline().createWorker();
// observable 1 sur worker
worker.schedule(() -> Observable.range(1, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
// observable 2 on same worker
worker.schedule(() -> Observable.range(100, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
}
}));
}
}));
}
}
- líneas 13–14: se crea un trabajador utilizando uno de los dos programadores [Schedulers.immediate] y [Schedulers.trampoline];
- línea 16: se programa una primera observable obs1 en este trabajador para que emita los números [1,2]
- línea 22: cada vez que se observa un elemento de este observable obs1, se inicia la observación de un segundo observable obs2 en el mismo trabajador para emitir los números [100,101];
Con el programador [Schedulers.immediate], obtenemos los siguientes resultados:
En cambio, con el programador [Schedulers.trampoline], obtenemos los siguientes resultados:
7.8. Conclusión
Aún queda mucho por hacer. Para profundizar en el conocimiento de la biblioteca RxJava, se recomienda a los lectores que continúen su aprendizaje utilizando las referencias proporcionadas al principio de este documento. No obstante, ya contamos con los fundamentos necesarios para utilizar RxJava en entornos Swing y Android. Eso es lo que demostraremos a continuación.








































