Skip to content

2. Un ejemplo de introducción

Mis primeros contactos con RxJava se produjeron a través de cursos y tutoriales que encontré en Internet. Además de que la teoría utilizaba conceptos a los que no estaba acostumbrado y que me costaba entender, sobre todo no veía para qué podía servir en la vida real. Por lo tanto, vamos a empezar por presentar un ejemplo (espero que sencillo) en el que el uso de RxJava supone una simplificación real a la hora de escribir el código y, a partir de ahí, intentaremos identificar los elementos importantes de esta biblioteca.

La biblioteca RxJava se basa en el siguiente concepto: un flujo de elementos de tipo T Observable<T> es observado por uno o varios suscriptores (suscriptores, observadores, consumidores) Subscriber<T>. La biblioteca RxJava permite que el flujo Observable<T> se ejecute en un hilo T1 y su observador Subscriber<T> en un hilo T2 sin que el desarrolladortenga que preocuparse por gestionar el ciclo de vida de estos subprocesos ni por problemas naturalmente complejos, como el intercambio de datos entre subprocesos y su sincronización para ejecutar una tarea global. Por lo tanto, facilita la programación asíncrona.

Un flujo Observable<T> produce elementos de tipo T, observables a medida que se producen. Si el observador y el observable (término que designa al tipo Observable<T> por uso coloquial) se encuentran en el mismo hilo, entonces el observable solo puede producir el elemento (i+1) cuando el observador haya consumido el elemento i. Hay pocos casos en los que esta arquitectura resulte interesante. Si el observador y el observable no se encuentran en el mismo hilo, entonces el observable y su observador tienen comportamientos autónomos: el observable produce a su ritmo y el observador consume a su ritmo. Ahí reside el interés de la biblioteca. Hasta ahora siempre hemos hablado de un observador. En realidad, un observable puede tener cualquier número de observadores.

2.1. La arquitectura de la aplicación de ejemplo

La aplicación de ejemplo tiene la siguiente arquitectura:

Image

  • en [1], una capa de servicio genera listas de números aleatorios. Esta capa se ejecuta en el mismo hilo que el método [swing] que la utiliza. A continuación, genera sus números de forma sincrónica;
  • en [2], una delgada capa de adaptación implementada con RxJava permite presentar a la capa [swing] una implementación asíncrona del mismo servicio: esta puede ejecutarse en un hilo diferente al del método [swing] que la utiliza;
  • la llamada [4] es síncrona, mientras que la llamada [5-6] es asíncrona;

Lo que queremos mostrar aquí es que la biblioteca Rx permite transformar fácilmente una interfaz síncrona en una asíncrona. ¿Por qué es útil? Los eventos de una interfaz Swing se procesan en un hilo denominado comúnmente «event loop». Los eventos se ponen en espera en una cola y se procesan uno tras otro. El evento Ei+1 solo puede procesarse cuando el evento anterior Ei se ha procesado por completo. Por lo tanto, es importante que la gestión de un evento sea lo más breve posible para que la interfaz gráfica siga siendo reactiva. A veces, la gestión de un evento puede llevar mucho tiempo. Este es el caso si dicha gestión implica accesos a la red. Si no se quiere bloquear la interfaz gráfica de una forma inaceptable para el usuario, es necesario que estos accesos a la red se realicen en subprocesos separados del evento loop para liberar este último. Entramos entonces en el ámbito de la programación concurrente (varios subprocesos se ejecutan en paralelo), considerada con razón como difícil. La biblioteca Rx ofrece una solución sencilla y elegante a este problema.

Para simular procesos largos, el servicio del ejemplo genera sus números aleatorios tras un cierto tiempo de espera, de modo que se pueda observar el comportamiento de la interfaz gráfica.

2.2. L'exécutable

El ejecutable de la aplicación de ejemplo se encuentra en la carpeta [dvp/executables] de los ejemplos:

Hay varias formas de ejecutar el archivo [swing-01], dependiendo de la configuración del equipo utilizado para ejecutarlo. Por ejemplo, se puede seguir el proceso [1-3]. Se obtiene entonces la siguiente interfaz gráfica:

 
  • La interfaz presenta dos pestañas [1-2]: una, [Request], para la solicitud al servicio de generación de números aleatorios, y la otra, [Response], para la visualización de los números recibidos;
  • en [3], se indica cuántas solicitudes se desean realizar al servicio;
  • en [4], se indica el intervalo [a,b] de generación de los números deseados;
  • en [5], el número de valores devueltos por el servicio será un número aleatorio dentro del intervalo [minCount, maxCount] fijado por el usuario;
  • en [6], antes de devolver su respuesta, el servicio esperará delay milisegundos, donde delay es un número aleatorio dentro del intervalo [minDelay, maxDelay] fijado por el usuario;
  • por defecto, la capa [swing] se dirigirá a la interfaz síncrona del servicio. Para dirigirse a la capa asíncrona, el usuario marcará [7]. En este caso, el servicio de generación se ejecutará en subprocesos separados del evento loop de la interfaz gráfica. La biblioteca Rx dispone de diversas estrategias para la generación de estos subprocesos. El usuario podrá elegir su estrategia en [8];
  • la generación de números se realiza con el botón [9];
 
  • en [10], visualización de los resultados. A continuación explicaremos la estructura de estos;
  • en [11], el número de resultados obtenidos;
  • en [12], el tiempo de ejecución en milisegundos;
  • en [13], el usuario tiene la posibilidad de cancelar la ejecución;

Cada resultado tiene el siguiente formato:

{"idClient":0,"serviceResponse":{"delay":412,"aleas":[146,115,128,174,159,112,162,127],"executedOn":"RxComputationThreadPool-6"},"observedOn":"AWT-EventQueue-0","requestAt":"02:42:47:708","responseAt":"02:42:52:931"}
  • [idClient]: el número de la solicitud. Cabe recordar que se envían varias solicitudes al servicio de generación;
  • [delay]: el tiempo de espera en milisegundos que el servicio ha observado antes de enviar su resultado;
  • [aleas]: los números aleatorios devueltos por el servicio;
  • [executedOn]: el nombre del hilo en el que se ejecutó el servicio;
  • [observedOn]: el nombre del hilo que mostró el resultado. Con una interfaz Swing, solo puede ser el hilo del evento loop, en este caso [AWT-EventQueue-0];
  • [requestAt]: la hora de la solicitud en el formato [heures:minutes:secondes:millisecondes];
  • [responseAt]: la hora de recepción de los resultados en el mismo formato;

A continuación, presentaremos las partes del código útiles para comprender el ejemplo.

2.3. La interfaz síncrona

Image

La capa de servicio [1] presenta la siguiente interfaz:


public interface IService {
  // números aleatorios en [a,b]
  // se generan n números con n aleatorio en el intervalo [minCount, maxCount]
  // los números se generan tras una espera de delay milisegundos,
  // donde [delay] es un número aleatorio en el intervalo [minDelay, maxDelay]
  public ServiceResponse getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay);
}

La respuesta [ServiceResponse] es la siguiente:


public class ServiceResponse {

  // tiempo de espera del servicio
  private int delay;
  // números aleatorios
  private List<Integer> aleas;
  // hilo de ejecución
  private String executedOn;

  // constructores

  public ServiceResponse(int delay, List<Integer> aleas) {
    executedOn = Thread.currentThread().getName();
    this.delay = delay;
    this.aleas = aleas;
  }

  // getters y setters
...
}

La respuesta tiene tres elementos:

  • línea 6: los números aleatorios generados;
  • línea 4: el tiempo de espera observado por el servicio antes de devolver su resultado;
  • línea 8: el hilo de ejecución del servicio;

2.4. La llamada síncrona

Image

A continuación detallamos la llamada síncrona [4] que realiza la capa [swing] al servicio [1]:


  private void doGenerateWithService() {
    // inicio de la espera
    beginWaiting();
    try {
      for (int i = 0; i < nbRequests; i++) {
        UiResponse uiResponse = new UiResponse();
        uiResponse.setIdClient(i);
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        uiResponse.setResponseAt();
        model.add(0, jsonMapper.writeValueAsString(uiResponse));
        jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
      }
    } catch (JsonProcessingException | RuntimeException e) {
      System.out.println(e);
    }
    // fin de espera
    endWaiting();
}
  • líneas 5-12: el bucle de ejecución de las solicitudes [nbRequests] solicitadas por el usuario;
  • línea 8: [service] es la implementación de la interfaz síncrona [IService] presentada en el apartado 2.3;
  • línea 10: [model] es la plantilla mostrada por el componente JList de la pestaña [Response]. Los elementos de esta plantilla son las cadenas jSON de los siguientes elementos de tipo [UiResponse]:

public class UiResponse {

  // id del cliente
  private int idClient;
  // respuesta del servicio
  private ServiceResponse serviceResponse;
  // nombre del hilo de observación
  private String observedOn;
  // hora de la solicitud
  private String requestAt;
  // hora de la respuesta
  private String responseAt;

  // constructores

  public UiResponse() {
    observedOn = Thread.currentThread().getName();
    requestAt = getTimeStamp();
  }
  // métodos privados

  private String getTimeStamp() {
    return new SimpleDateFormat("hh:mm:ss:SSS").format(Calendar.getInstance().getTime());
  }

  // getters y setters
...
}
  • línea 6: la respuesta del servicio de generación de números;
  • línea 4: el n.º de la solicitud a la que se responde;
  • línea 8: el hilo de visualización de esta respuesta. Como se ha dicho, siempre será el hilo del evento loop;
  • líneas 10 y 12: la hora de la solicitud y la de la respuesta;

2.5. Pruebas de llamadas síncronas

Ejecutamos la siguiente configuración:

 

Obtenemos los siguientes resultados en la pestaña [Response]:

 
  • En [1-2], se han obtenido efectivamente 10 respuestas tal y como se había solicitado. Se han insertado en primera posición según su orden de llegada. Se observa que se han obtenido en el orden de las solicitudes;
  • todas se han ejecutado y mostrado en el hilo del evento loop [AWT-EventQueue-0]. Por lo tanto, las consultas se han ejecutado una tras otra en este hilo. No ha habido consultas simultáneas;
  • lo que no se ve aquí es que, durante la ejecución, la interfaz gráfica se bloquea. Por ejemplo, no hay forma de acceder a la pestaña [Response] para ver las respuestas que llegan o interrumpir la ejecución con el botón [Annuler]. Aunque este botón hubiera estado presente en la pestaña [Request], habría sido inutilizable. De hecho, habría entonces dos eventos:
    • el clic en el botón [Générer];
    • el clic en el botón [Annuler];

El clic en el botón [Annuler] solo se gestiona una vez finalizada la operación iniciada por el clic en el botón [Générer]. Acabamos de ver que esta ocupaba el hilo del evento loop durante toda la ejecución, impidiendo así la gestión del clic en el botón [Annuler]. Este es precisamente el tipo de situaciones en las que Rx puede aportar una mejora notable;

2.6. La interfaz asíncrona y su implementación

Ahora nos centramos en la interfaz de la capa [2], así como en su implementación con Rx. Esta no resultará comprensible de inmediato. Simplemente queremos destacar la simplicidad del código de esta implementación.

La interfaz asíncrona es la siguiente:


public interface IRxService {
  // números aleatorios en [a,b]
  // se generan n números con n aleatorio en el intervalo [minCount, maxCount]
  // los números se generan tras una espera de delay milisegundos,
  // donde [delay] es un número aleatorio en el intervalo [minDelay, maxDelay]
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse);
}

Las diferencias con respecto a la interfaz síncrona presentada en el apartado 2.3 son las siguientes:

  • la clase [UiResponse] presentada en el apartado 2.3 forma ahora parte de los parámetros del método [getAleas] (línea 6). El motivo es que, dado que las solicitudes se ejecutan ahora en paralelo y que el servicio espera un tiempo aleatorio antes de devolver su resultado, las respuestas no nos llegarán en el orden de las solicitudes. Por lo tanto, pasamos el objeto [UiResponse] que contiene, entre otra información, el número de la solicitud:

  // id del cliente (solicitud)
  private int idClient;
  // respuesta del servicio
  private ServiceResponse serviceResponse;
  // nombre del hilo de observación
  private String observedOn;
  // hora de la solicitud
  private String requestAt;
  // hora de la respuesta
  private String responseAt;
  • El tipo de la respuesta del servicio asíncrono es [Observable<UiResponse>]. El tipo [Observable<>] lo proporciona la biblioteca Rx. El resultado de tipo [Observable<UiResponse>] indica que el método [getAleas] proporciona un flujo de valores de tipo [UiResponse], valores que se envían (pushed) uno a uno a su observador;

Veamos ahora la implementación de esta interfaz:


public class RxService implements IRxService {

  // servicio
  private IService service;

  // fabricante
  public RxService(IService service) {
    this.service = service;
  }

  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
  }
}
  • líneas 7-9: se proporciona al constructor una referencia a la interfaz síncrona [IService]. Es esta la que se encargará de generar los números aleatorios;
  • el observable devuelto por el método [getAleas] se construye mediante el método estático [Observable.create]. Este método permite construir una implementación asíncrona a partir de una implementación síncrona;
  • línea 13: el parámetro del método estático [Observable.create] es aquí una función lambda que recibe como parámetro un tipo [Subscriber], que es, de nuevo, un tipo Rx. Un [Subscriber] es un objeto que se suscribe a un flujo de observables, es decir, un flujo de datos entregados de forma asíncrona. Aquí se utilizan tres métodos de este suscriptor:
    • [Subscriber.onNext] para transmitirle un dato (línea 16);
    • [Subscriber.onError] para transmitirle una excepción (línea 18);
    • [Subscriber.onCompleted] para indicar al suscriptor que el flujo de datos ha finalizado (línea 20);

Puede haber varios suscriptores para un mismo observable. En este caso, solo tendremos un suscriptor que se suscribe a un flujo de un único dato, el generado en las líneas 15-16. El dato es generado por la implementación síncrona del servicio (línea 15) y entregado al suscriptor (línea 16).

Aunque todo esto probablemente resulte confuso, no podemos sino quedarnos impresionados por la extrema concisión de esta implementación asíncrona del servicio.

2.7. La llamada asíncrona

Image

A continuación detallamos la llamada síncrona [5] que realiza la capa [swing] al servicio [2]:


private void doGenerateWithRxService() {
        // inicio de la espera
        beginWaiting();
        // se solicitan los números aleatorios
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
            UiResponse uiResponse = new UiResponse();
            uiResponse.setIdClient(i);
            // programador
            int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
            switch (schedulerIndex) {
            case 0:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
                break;
...
            }
        }
...
    }
  • líneas 6-10: ejecución de las solicitudes [nbRequests] solicitadas por el usuario;
  • líneas 7-8: preparación del objeto [UiResponse] que necesita el método [getAleas] del servicio asíncrono (línea 13). Se trata principalmente de registrar el n.º [idClient] de la solicitud;
  • línea 13: se llama al método [getAleas] del servicio asíncrono. Este devuelve un objeto [Observable<UiResponse>]. Esta llamada aún no invoca el servicio síncrono. Volvamos al código del método asíncrono [getAleas]:

  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
}

El código de las líneas 4-11, que llamará al servicio síncrono, solo se ejecuta cuando se registra un suscriptor. Mientras no haya suscriptores, este código no se ejecuta.

Volvamos al código del método [doGenerateWithRxService]:

  • línea 5: se crea un observable vacío (no se observa nada);
  • línea 13: se crea un observable cuyo flujo será la fusión de los flujos asíncronos [nbRequests] asociados a las solicitudes [nbRequests]. Esto se consigue con el método [Observable.mergeWith], que permite fusionar dos flujos asíncronos. En la terminología Rx, [mergeWith] se denomina operador de flujo. Estos operadores tienen la particularidad de que el resultado de la operación suele ser, a su vez, un [Observable]. Al final, tras la línea 17, la variable [observables] designa un único flujo constituido por las respuestas asíncronas [nbRequests] generadas por el servicio asíncrono;
  • línea 13: la operación de fusión podría haberse escrito así:

observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse));

pero hemos escrito:


observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));

Aquí hemos utilizado el operador [subscribeOn] sobre el observable [rxService.getAleas]. Como suele ocurrir, el resultado es de nuevo un observable. El operador [subscribeOn] permite especificar que el observable debe ejecutarse en un hilo proporcionado por un [Scheduler]. Existen varios [Scheduler] posibles adaptados a diferentes situaciones. En la interfaz gráfica, hemos propuesto varios para ver los efectos de cada uno:

  

Esto da como resultado el siguiente código:


    private void doGenerateWithRxService() {
        // inicio de espera
        beginWaiting();
        // se solicitan los números aleatorios
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
            UiResponse uiResponse = new UiResponse();
            uiResponse.setIdClient(i);
            // programador
            int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
            switch (schedulerIndex) {
            case 0:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
                break;
            case 1:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.computation()));
                break;
            case 2:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.newThread()));
                break;
            case 3:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.trampoline()));
                break;
            case 4:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.immediate()));
                break;
            }
        }
...
}

Volvamos al código de las líneas 12-14. El programador [Schedulers.io()] asigna un nuevo subproceso a cada observable. Si seguimos el código:

  • línea 5: tenemos un observable vacío;
  • línea 13, iteración 1: observables es la lista [observable0/thread0] (observable observable0 ejecutado en el hilo thread0);
  • línea 13, iteración 2: observables es la lista [observable0/thread0, observable1/thread1];
  • etc...

Al final, tras la línea 28, tenemos un observable resultado de la fusión de los observables [nbRequests] que se ejecutan en diferentes subprocesos [nbRequests]. No todos los programadores funcionan así, como veremos en las pruebas.

Continuemos con el análisis del código de llamada del servicio asíncrono:


private void doGenerateWithRxService() {
        // Inicio de espera
        beginWaiting();
        // se solicitan los números aleatorios
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
        ...
        }
        // observador
        observables = observables.observeOn(SwingScheduler.getInstance());
        // se ejecutan estos observables
        subscriptions.add(observables.subscribe(uiResponse -> {
            updateUi(uiResponse);
        } , th -> {
            System.out.println(th);
            doCancel();
        } , this::doCancel));
    }
  • hemos visto que, al llegar a la línea 10, tenemos un único observable, fusión de [nbRequests] observables que pueden ejecutarse en [nbRequests] hilos diferentes o no, según el programador elegido por el usuario;
  • línea 10: el operador [observeOn] permite especificar en qué hilo se quieren recuperar los datos procedentes del observable, en este caso los objetos de tipo [UiResponse]. En una interfaz Swing, no hay otra opción. Cualquier actualización de la interfaz debe realizarse en el hilo del evento loop. En este caso, los datos del observable se mostrarán en un componente Swing JList. El hilo [SwingScheduler.getInstance()] representa el hilo del evento loop. La clase [SwingScheduler] no proviene de la biblioteca RxJava, sino de la derivada RxSwing;
  • al llegar a la línea 12, el servicio síncrono aún no se ha llamado porque el observable de la línea 10 todavía no tiene ningún suscriptor. Las líneas 12-17 le proporcionan uno, gracias al operador [subscribe]. Los parámetros de este operador son aquí tres funciones lambda:
    • la primera, [uiResponse -> {updateUi(uiResponse);}], admite como parámetro uno de los objetos [UiResponse] generados por el observable. Recordemos que aquí tendremos [nbRequests] objetos de este tipo. El método asociado, updateUi en este caso, debe aprovechar este resultado;
    • La segunda [th -> {System.out.println(th);doCancel();}] admite como parámetro un tipo [Throwable], en este caso una excepción que se produjo durante la ejecución del observable. El método asociado debe aprovechar esta información. Aquí, se muestra en la consola (línea 15) y se cancela la ejecución, lo que tendrá como efecto actualizar ciertos elementos de la interfaz gráfica;
    • el tercer [this::doCancel] se invoca cuando el observable indica que ya no tiene datos que transmitir. Aquí, el observable es la unión de los observables [nbRequests]. El observable resultante indicará que ha terminado cuando todos los observables que lo componen hayan señalado a su vez que han finalizado su trabajo. Por lo tanto, cuando se ejecuta esta tercera función lambda, se han recibido todos los datos. El método local [doCancel] actualiza la interfaz gráfica para reflejar que la ejecución ha finalizado;

La variable [subscriptions] se define de la siguiente manera:


    // suscripciones a los observables
protected List<Subscription> subscriptions = new ArrayList<Subscription>();

El tipo [Subscription] representa una suscripción, es decir, el vínculo entre un suscriptor [Subscriber] y lo que este observa [Observable]. Aquí hemos utilizado una lista de suscripciones, aunque en este ejemplo solo hay una. El método local [doCancel], que se ejecuta cuando el observable indica que ya no tiene datos que transmitir, es el siguiente:


    @Override
    protected void doCancel() {
        // fin de espera
        endWaiting();
        // en el caso de suscripciones
        if (jCheckBoxRxSwing.isSelected() && subscriptions != null) {
            subscriptions.forEach(Subscription::unsubscribe);
        }
}
  • la línea 7 da de baja a todos los suscriptores del observable;

De esta explicación resumida, podemos destacar los siguientes puntos clave:

  • el tipo [Observable] designa un flujo de valores, valores que se envían uno a uno a los suscriptores u observadores;
  • el tipo [Subscriber] designa a un suscriptor del tipo [Observable];
  • el tipo [Subscription] designa una suscripción, es decir, el vínculo entre un [Subscriber] y un [Observable];
  • el tipo [Observable] admite operadores [mergeWith, empty, subscribeOn, observeOn, ...] que, en su mayoría, producen observables. Estos operadores sirven para configurar el observable antes de su ejecución:
    • lo que se quiere observar;
    • el hilo en el que se ejecuta el observable;
    • el hilo en el que el suscriptor recibe los datos del observable;
  • se distinguen dos tipos de observables: los [froid / cold] y los [chaud / hot]. Un observable frío se ejecuta íntegramente cada vez que se suscribe un nuevo suscriptor. Si cada ejecución produce los mismos datos, cada nuevo suscriptor recibe los mismos datos que el anterior. Un observable caliente suele producir datos de forma continua. Cuando un suscriptor se suscribe, recibe los datos emitidos a partir del momento de su suscripción. No recibe los datos que se hayan podido emitir anteriormente. En nuestro ejemplo, el observable es frío: se vuelve a ejecutar por completo cada vez que se suscribe un nuevo suscriptor. ¿Qué es lo que realmente se ejecuta en nuestro ejemplo? Para saberlo, hay que volver a la definición del observable observado:

  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
}

Con cada nuevo suscriptor, se vuelve a ejecutar la función lambda, parámetro del método [Observable.create] (línea 3). Por lo tanto, son las líneas 4-11 las que se ejecutan para cada nuevo suscriptor [subscriber];

2.8. Pruebas de llamadas asíncronas

Comenzamos mostrando el efecto de los diferentes programadores propuestos. Para ello, utilizamos los siguientes parámetros:

 

En [1-2] establecemos valores pequeños para que, si las solicitudes se ejecutan en un mismo hilo, no haya que esperar demasiado tiempo.

2.8.1. con el programador [Schedulers.io]

 

Se pueden observar los siguientes puntos:

  • las respuestas se obtienen en un orden diferente al de las consultas (véase idClient);
  • cada solicitud se ha ejecutado en un hilo diferente;
  • la interfaz gráfica ya no está bloqueada:
    • se puede pasar de una pestaña a otra;
    • se ven llegar los datos;
    • no da tiempo a ver el botón [Annuler] porque la ejecución es demasiado rápida. Lo pondremos de relieve en otra prueba;

2.8.2. con el programador [Schedulers.computation]

 

Se pueden observar los siguientes puntos:

  • se obtienen las respuestas en un orden que no es el de las consultas (véase idClient);
  • las consultas se ejecutaron en 8 subprocesos;
  • el subproceso n.º 3 se utilizó para las consultas 8 y 0;
  • el subproceso n.º 4 se utilizó para las consultas 9 y 1;
  • las demás consultas han tenido cada una un subproceso diferente;

El programador [Schedulers.computation] utiliza tantos subprocesos como núcleos tiene la máquina utilizada. Esta información se obtiene mediante la expresión [Runtime.getRuntime().availableProcessors()].

2.8.3. con el programador [Schedulers.newThread]

 

El funcionamiento es similar al del programador [Schedulers.io].

2.8.4. con los programadores [Schedulers.trampoline, Schedulers.immediate]

 

El funcionamiento es síncrono. Todas las consultas se ejecutan en el hilo del evento loop. No hay que generalizar este resultado, sino simplemente señalar que, en este ejemplo concreto, los dos programadores han funcionado de forma síncrona.

2.9. Casos límite

En este ejemplo vamos a trabajar con los programadores que permiten un funcionamiento asíncrono. En primer lugar, aumentamos el número de solicitudes a 100 con el programador [Schedulers.computation], que aquí trabaja con 8 subprocesos. Obtenemos el siguiente resultado:

 
  • en [1], el botón [Annuler] está presente y se puede utilizar (funcionamiento asíncrono);

Ahora, dejemos que la ejecución llegue hasta el final:

 

En [2] vemos que la ejecución de las 100 consultas tardó unos 4 segundos (en 8 subprocesos).

Ahora, realicemos esas mismas 100 consultas con el programador [Schedulers.newThread], que ejecuta cada consulta en un subproceso independiente:

 

En [1], vemos que la ejecución de las 100 consultas (en 100 subprocesos) tardó medio segundo. Por lo tanto, es claramente más rápido que con el programador [Schedulers.computation].

Ahora, realicemos 800 solicitudes en las mismas condiciones, siempre con el programador [Schedulers.newThread]. Obtenemos los siguientes resultados:

 

Las 800 consultas se ejecutan en aproximadamente 1 segundo.

Cuando aumentamos este número (más allá de 2500 consultas en mi máquina —ejecutadas en 1,5 s—, este número depende, por supuesto, en gran medida del entorno de trabajo en el momento de la ejecución), acabamos obteniendo la siguiente excepción:

  

Por lo tanto, se produce un desbordamiento de pila. Las pruebas muestran que el funcionamiento del programador [Schedulers.newThread] no es determinista. Se puede obtener la excepción anterior, realizar nuevas pruebas, volver luego a la configuración que provocó la excepción y ya no obtenerla.

2.10. Conclusion

Hemos mostrado un ejemplo de uso de la biblioteca Rx. Resumamos lo que hemos aprendido:

Partimos de la siguiente arquitectura:

Image

  • en [4], la capa [swing] realizaba llamadas síncronas a la capa [service];
  • en [5], la capa [swing] realizaba llamadas asíncronas a la capa [rxService], que a su vez llamaba de forma síncrona a la capa [service] desde [6];

Lo primero que observamos es que la biblioteca Rx permitía crear fácilmente la interfaz asíncrona [rxService] a partir de la interfaz síncrona [service] (véase el apartado 2.4). Esta es una lección importante, ya que significa que se puede hacer evolucionar fácilmente una aplicación síncrona hacia una asíncrona.

En la capa [swing], se han escrito dos métodos separados:

  • uno para realizar llamadas síncronas al servicio (véase el apartado 2.4);
  • el otro para realizar llamadas asíncronas al servicio (véase el apartado 2.7);

La escritura de las llamadas asíncronas resultó ser mucho más compleja que la de las llamadas síncronas. No obstante, quienes hayan realizado programación concurrente con varios subprocesos que sincronizar encontrarán que la solución Rx resulta más sencilla de escribir y evita todos los problemas de sincronización y comunicación entre subprocesos, que son problemas difíciles. Durante esta escritura, hemos distinguido los siguientes puntos importantes:

  • el tipo [Observable] designa un flujo de eventos (valores) que pueden ser (aunque no necesariamente) asíncronos y que pueden observarse;
  • el tipo [Subscriber] designa a un suscriptor de un tipo [Observable];
  • el tipo [Subscription] designa una suscripción, es decir, el vínculo entre un [Subscriber] y un [Observable];
  • el tipo [Observable] admite operadores [mergeWith, empty, subscribeOn, observeOn, ...] que, en su mayoría, producen observables. Estos operadores sirven para configurar el observable antes de su ejecución:
    • lo que se quiere observar;
    • el hilo en el que se ejecuta el observable;
    • el hilo en el que el suscriptor recibe los datos del observable;
  • se distinguen dos tipos de observables: los [froid / cold] y los [chaud / hot]. Un observable frío se ejecuta por completo cada vez que se suscribe un nuevo suscriptor. Si cada ejecución genera los mismos datos, cada nuevo suscriptor recibe los mismos datos que el anterior. Un observable caliente suele generar datos de forma continua. Cuando un suscriptor se suscribe, recibe los datos emitidos a partir del momento de su suscripción. No recibe los datos que se hayan podido emitir anteriormente. En nuestro ejemplo, el observable es frío: se vuelve a ejecutar por completo con cada nuevo suscriptor.

Ahora que hemos visto un ejemplo que nos ha mostrado el interés de la biblioteca Rx, vamos a presentarla con más detalle.

La biblioteca Rx cuenta con numerosos métodos que tienen parámetros genéricos en su firma. Haremos un breve repaso de estas firmas (apartado 3). Los parámetros de estos métodos son, en su mayoría, interfaces funcionales (Java 8), es decir, interfaces que solo tienen un único método. Los parámetros efectivos deben ser, por tanto, instancias de estas interfaces. Antes de Java 8, se solía implementar una interfaz mediante una clase anónima. Con Java 8, y si la interfaz es una interfaz funcional, resulta más conciso implementarla con una función lambda. Por lo tanto, presentaremos estas últimas (apartado 4). Una vez hecho esto, presentaremos la clase [Stream] (apartado 5), que permite procesar colecciones Java con funciones lambda. Esta clase es interesante porque la clase [Observable] de RxJava toma prestados:

  • algunos métodos;
  • la misma forma de encadenar métodos entre sí para procesar un mismo observable;

A continuación, presentaremos las interfaces funcionales específicas de la biblioteca RxJava (apartado 6). Continuaremos con los principales elementos de la biblioteca Rx [Observable, Subscriber, Subscription, opérateurs] (apartado 7). La clase [Observable] cuenta con varias decenas de operadores que, a su vez, están sobrecargados varias veces. Esto genera inicialmente una gran complejidad, ya que estos operadores y sus sobrecargas a veces solo difieren en un detalle y, sin experiencia, resulta difícil saber qué operador utilizar. Solo presentaremos un número limitado de operadores y, en la mayoría de los casos, ignoraremos sus sobrecargas.

Toda la parte anterior se realizará con la biblioteca RxJava en aplicaciones de consola sencillas. Una vez adquirida la biblioteca RxJava, la utilizaremos en dos tipos de aplicaciones gráficas:

  • en el apartado 8 volveremos sobre la aplicación Swing de ejemplo para detallarla más. Utilizaremos entonces la biblioteca RxSwing;
  • en el apartado 9 crearemos una aplicación Android con la biblioteca RxAndroid;

Cuando haya terminado todo esto, el lector dispondrá de las herramientas necesarias para valerse por sí mismo. Probablemente le llevará algún tiempo llegar a utilizar la biblioteca Rx de forma intuitiva. Esta biblioteca me ha parecido especialmente interesante. Sin embargo, me ha resultado compleja de entender y el tiempo de aprendizaje ha sido long. Espero que este documento acorte ese tiempo para el lector. Me parece que el esfuerzo merece la pena.