Skip to content

2. Un ejemplo introductorio

Mi primer contacto con RxJava fue a través de cursos y tutoriales que encontré en Internet. Aparte del hecho de que la teoría utilizaba conceptos con los que no estaba familiarizado y que me costaba entender, no acababa de ver cómo podía ser útil en la vida real. Por lo tanto, comenzaremos presentando un ejemplo (espero que sencillo) en el que el uso de RxJava conduce a una simplificación real del código y, a partir de ahí, intentaremos identificar los elementos importantes de esta biblioteca.

La biblioteca RxJava se basa en el siguiente concepto: un flujo de elementos de tipo T Observable<T> es observado por uno o más suscriptores (suscriptores, observadores, consumidores) Subscriber<T>. La biblioteca RxJava permite que el flujo Observable<T> se ejecute en un hilo T1 y su observador Subscriber<T> en un hilo T2 sin que el desarrollador tenga que preocuparse por gestionar el ciclo de vida de estos hilos ni por cuestiones naturalmente complejas, como compartir datos entre hilos y sincronizarlos para ejecutar una tarea global. Por lo tanto, facilita la programación asíncrona.

Un flujo Observable<T> genera elementos de tipo T, que se pueden observar a medida que se producen. Si el observador y el observable (término utilizado de manera general para referirse al tipo Observable<T>) se encuentran en el mismo hilo, el observable solo puede generar el elemento (i+1) una vez que el observador haya consumido el elemento i. Son pocos los casos en los que esta arquitectura resulta útil. Si el observador y el observable no se encuentran en el mismo hilo, entonces el observable y su observador se comportan de forma autónoma: el observable emite a su propio ritmo y el observador consume a su propio ritmo. Aquí es donde reside el valor de la biblioteca. Hasta ahora, solo hemos hablado de un único observador. En realidad, un observable puede tener cualquier número de observadores.

2.1. La arquitectura de la aplicación de ejemplo

La aplicación de ejemplo tiene la siguiente arquitectura:

Image

  • en [1], una capa de servicio genera listas de números aleatorios. Esta capa se ejecuta en el mismo hilo que el método [swing] que la utiliza. A continuación, genera sus números de forma sincrónica;
  • en [2], una capa de adaptación ligera implementada con RxJava permite presentar una implementación asíncrona del mismo servicio a la capa [swing]: este servicio puede ejecutarse en un hilo diferente al del método [swing] que lo utiliza;
  • La llamada [4] es sincrónica, mientras que las llamadas [5-6] son asincrónicas;

Lo que queremos demostrar aquí es que la biblioteca Rx facilita la transformación de una interfaz síncrona en una asíncrona. ¿Por qué es útil esto? Los eventos en una interfaz Swing se procesan en un hilo comúnmente denominado «bucle de eventos». Los eventos se ponen en cola y se procesan uno tras otro. El evento Ei+1 solo puede procesarse una vez que el evento anterior Ei se haya procesado por completo. Por lo tanto, es importante que el manejo de eventos sea lo más breve posible para que la GUI siga respondiendo. A veces, el manejo de un evento puede llevar mucho tiempo. Este es el caso si el manejo implica acceso a la red. Si no queremos que la GUI se congele de una forma inaceptable para el usuario, estas operaciones de red deben realizarse en subprocesos separados del bucle de eventos para liberarlo. Esto nos lleva al ámbito de la programación concurrente (donde múltiples subprocesos se ejecutan en paralelo), que con razón se considera difícil. La biblioteca Rx ofrece una solución sencilla y elegante a este problema.

Para simular procesos de larga duración, el servicio del ejemplo entrega sus números aleatorios tras un cierto retraso, de modo que podamos observar el comportamiento de la interfaz gráfica de usuario.

2.2. El ejecutable

El ejecutable de la aplicación de ejemplo se encuentra en la carpeta [dvp/executables] de los ejemplos:

Hay varias formas de ejecutar el archivo [swing-01] dependiendo de la configuración del equipo utilizado para ejecutarlo. Por ejemplo, puede seguir el proceso [1-3]. Esto mostrará la siguiente interfaz gráfica de usuario:

 
  • La interfaz tiene dos pestañas [1-2]: una [Solicitud] para enviar una solicitud al servicio del generador de números aleatorios, y la otra [Respuesta] para mostrar los números recibidos;
  • En [3], se especifica el número de solicitudes que se desean realizar al servicio;
  • En [4], se especifica el rango de generación de números deseado [a,b];
  • En [5], el número de valores devueltos por el servicio será un número aleatorio dentro del intervalo [minCount, maxCount] establecido por el usuario;
  • en [6], antes de devolver su respuesta, el servicio esperará delay milisegundos, donde delay es un número aleatorio dentro del intervalo definido por el usuario [minDelay, maxDelay];
  • Por defecto, la capa [swing] utilizará la interfaz síncrona del servicio. Para utilizar la capa asíncrona, el usuario marcará [7]. En este caso, el servicio de generación se ejecutará en subprocesos separados del bucle de eventos de la GUI. La biblioteca Rx ofrece varias estrategias para generar estos subprocesos. El usuario puede seleccionar su estrategia en [8];
  • La generación de números se realiza mediante el botón [9];
 
  • [10] muestra los resultados. Explicaremos su estructura;
  • en [11], el número de resultados obtenidos;
  • en [12], el tiempo de ejecución en milisegundos;
  • en [13], el usuario tiene la opción de cancelar la ejecución;

Cada resultado tiene el siguiente formato:

{"idClient":0,"serviceResponse":{"delay":412,"aleas":[146,115,128,174,159,112,162,127],"executedOn":"RxComputationThreadPool-6"},"observedOn":"AWT-EventQueue-0","requestAt":"02:42:47:708","responseAt":"02:42:52:931"}
  • [idClient]: el ID de la solicitud. Tenga en cuenta que se envían varias solicitudes al servicio de generación;
  • [delay]: el tiempo de espera en milisegundos que el servicio observó antes de enviar su resultado;
  • [aleas]: los números aleatorios devueltos por el servicio;
  • [executedOn]: el nombre del hilo en el que se ejecutó el servicio;
  • [observedOn]: el nombre del hilo que mostró el resultado. Con una interfaz Swing, este solo puede ser el hilo del bucle de eventos, en este caso [AWT-EventQueue-0];
  • [requestAt]: la hora de la solicitud en el formato [horas:minutos:segundos:milisegundos];
  • [responseAt]: la hora en que se recibieron los resultados en el mismo formato;

A continuación, presentaremos los fragmentos de código necesarios para comprender el ejemplo.

2.3. La interfaz síncrona

Image

La capa de servicio [1] presenta la siguiente interfaz:


public interface IService {
  // random numbers in [a,b]
  // n numbers are generated with random n in the interval [minCount, maxCount]
  // numbers are generated after a delay of milliseconds,
  // where [delay] is a random number in the range [minDelay, maxDelay]
  public ServiceResponse getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay);
}

El [ServiceResponse] es el siguiente:


public class ServiceResponse {
 
  // service waiting time
  private int delay;
  // random numbers
  private List<Integer> aleas;
  // execution thread
  private String executedOn;
 
  // manufacturers
 
  public ServiceResponse(int delay, List<Integer> aleas) {
    executedOn = Thread.currentThread().getName();
    this.delay = delay;
    this.aleas = aleas;
  }
 
  // getters and setters
...
}

La respuesta tiene tres partes:

  • línea 6: los números aleatorios generados;
  • línea 4: el tiempo de espera observado por el servicio antes de devolver su resultado;
  • línea 8: el hilo de ejecución del servicio;

2.4. La llamada síncrona

Image

A continuación detallaremos la llamada síncrona [4] realizada por la capa [swing] al servicio [1]:


  private void doGenerateWithService() {
    // start waiting
    beginWaiting();
    try {
      for (int i = 0; i < nbRequests; i++) {
        UiResponse uiResponse = new UiResponse();
        uiResponse.setIdClient(i);
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        uiResponse.setResponseAt();
        model.add(0, jsonMapper.writeValueAsString(uiResponse));
        jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
      }
    } catch (JsonProcessingException | RuntimeException e) {
      System.out.println(e);
    }
    // end waiting
    endWaiting();
}
  • líneas 5–12: el bucle que procesa las [nbRequests] solicitudes realizadas por el usuario;
  • línea 8: [service] es la implementación de la interfaz síncrona [IService] presentada en la sección 2.3;
  • línea 10: [model] es el modelo mostrado por el componente JList de la pestaña [Response]. Los elementos de este modelo son cadenas JSON de elementos de tipo [UiResponse] como se indica a continuación:

public class UiResponse {
 
  // customer id
  private int idClient;
  // service response
  private ServiceResponse serviceResponse;
  // observation thread name
  private String observedOn;
  // query time
  private String requestAt;
  // response time
  private String responseAt;
 
  // manufacturers
 
  public UiResponse() {
    observedOn = Thread.currentThread().getName();
    requestAt = getTimeStamp();
  }
  // private methods
 
  private String getTimeStamp() {
    return new SimpleDateFormat("hh:mm:ss:SSS").format(Calendar.getInstance().getTime());
  }
 
  // getters and setters
...
}
  • línea 6: la respuesta del servicio de generación de números;
  • línea 4: el número de la solicitud a la que se responde;
  • línea 8: el hilo que muestra esta respuesta. Como se ha mencionado, este será siempre el hilo del bucle de eventos;
  • líneas 10 y 12: la hora de la solicitud y la hora de la respuesta;

2.5. Prueba de llamadas sincrónicas

Ejecutamos la siguiente configuración:

 

En la pestaña [Respuesta] obtenemos los siguientes resultados:

 
  • En [1-2], efectivamente recibimos 10 respuestas tal y como se solicitó. Se insertaron en la primera posición en el orden en que llegaron. Podemos ver que se recibieron en el orden de las solicitudes;
  • Todas se ejecutaron y se mostraron en el hilo del bucle de eventos [AWT-EventQueue-0]. Por lo tanto, las solicitudes se ejecutaron una tras otra en este hilo. No hubo solicitudes simultáneas;
  • lo que no se ve aquí es que, durante la ejecución, la interfaz gráfica de usuario (GUI) se bloquea. Por ejemplo, no hay forma de acceder a la pestaña [Respuesta] para ver las respuestas entrantes ni de detener la ejecución mediante el botón [Cancelar]. Incluso si este botón hubiera estado presente en la pestaña [Solicitud], habría sido inutilizable. De hecho, habría entonces dos eventos:
    • hacer clic en el botón [Generar];
    • hacer clic en el botón [Cancelar];

El clic en el botón [Cancel] solo se gestiona una vez que ha finalizado la operación desencadenada por el clic en el botón [Generate]. Acabamos de ver que esta operación ocupó el hilo del bucle de eventos durante toda la ejecución, impidiendo así la gestión del clic en el botón [Cancel]. Este es típicamente el tipo de situación en la que Rx puede aportar una mejora significativa;

2.6. La interfaz asíncrona y su implementación

Ahora veremos la interfaz de la capa [2] y su implementación con Rx. Esto no quedará claro de inmediato. Simplemente queremos destacar la simplicidad del código en esta implementación.

Image

La interfaz asíncrona es la siguiente:


public interface IRxService {
  // random numbers in [a,b]
  // n numbers are generated with random n in the interval [minCount, maxCount]
  // numbers are generated after a delay of milliseconds,
  // where [delay] is a random number in the range [minDelay, maxDelay]
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse);
}

Las diferencias con respecto a la interfaz síncrona presentada en la sección 2.3 son las siguientes:

  • la clase [UiResponse] presentada en la sección 2.3 forma ahora parte de los parámetros del método [getAleas] (línea 6). El motivo es que, dado que las solicitudes se ejecutan ahora en paralelo y el servicio espera un tiempo aleatorio antes de devolver su resultado, las respuestas no nos llegarán en el orden de las solicitudes. Por lo tanto, pasamos el objeto [UiResponse], que contiene, entre otra información, el ID de la solicitud:

  // id du client (requête)
  private int idClient;
  // réponse du service
  private ServiceResponse serviceResponse;
  // nom du thread d'observation
  private String observedOn;
  // heure de la requête
  private String requestAt;
  // heure de la réponse
  private String responseAt;
  • El tipo de respuesta del servicio asíncrono es de tipo [Observable<UiResponse>]. El tipo [Observable<>] lo proporciona la biblioteca Rx. El resultado [Observable<UiResponse>] indica que el método [getAleas] proporciona un flujo de valores de tipo [UiResponse], que se envían uno a uno a su observador;

Ahora veamos la implementación de esta interfaz:


public class RxService implements IRxService {
 
  // service
  private IService service;
 
  // manufacturer
  public RxService(IService service) {
    this.service = service;
  }
 
  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
  }
}
  • líneas 7–9: proporcionamos al constructor una referencia a la interfaz síncrona [IService]. Esta interfaz se encargará de la generación de números aleatorios;
  • el observable devuelto por el método [getAleas] se construye mediante el método estático [Observable.create]. Este método nos permite crear una implementación asíncrona a partir de una síncrona;
  • línea 13: el parámetro del método estático [Observable.create] es aquí una función lambda que toma un tipo [Subscriber] como parámetro, de nuevo un tipo Rx. Un [Subscriber] es un objeto que se suscribe a un flujo de observables, es decir, un flujo de datos entregados de forma asíncrona. Aquí utilizamos tres métodos de este suscriptor:
    • [Subscriber.onNext] para pasarle datos (línea 16);
    • [Subscriber.onError] para enviarle una excepción (línea 18);
    • [Subscriber.onCompleted] para indicar al suscriptor que el flujo de datos ha finalizado (línea 20);

Puede haber varios suscriptores para el mismo observable. Aquí, tendremos un solo suscriptor suscrito a un flujo de un único dato, el que se genera en las líneas 15-16. El dato es generado por la implementación síncrona del servicio (línea 15) y devuelto al suscriptor (línea 16).

Aunque todo esto resulte algo confuso, no se puede dejar de admirar la extrema concisión de esta implementación asíncrona del servicio.

2.7. La llamada asíncrona

Image

Ahora examinaremos la llamada síncrona [5] realizada por la capa [swing] al servicio [2]:


private void doGenerateWithRxService() {
        // start waiting
        beginWaiting();
        // we ask for the random numbers
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
            UiResponse uiResponse = new UiResponse();
            uiResponse.setIdClient(i);
            // scheduler
            int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
            switch (schedulerIndex) {
            case 0:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
                break;
...
            }
        }
...
    }
  • líneas 6–10: ejecución de las [nbRequests] solicitudes solicitadas por el usuario;
  • líneas 7-8: preparación del objeto [UiResponse] requerido por el método [getAleas] del servicio asíncrono (línea 13). Esto implica principalmente registrar el [idClient] de la solicitud;
  • línea 13: se invoca el método [getAleas] del servicio asíncrono. Devuelve un objeto [Observable<UiResponse>]. Esta llamada aún no invoca el servicio síncrono. Volvamos al código del [getAleas] asíncrono:

  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
}

El código de las líneas 4 a 11, que llama al servicio síncrono, solo se ejecuta cuando un suscriptor se registra. Mientras no haya suscriptores, este código no se ejecuta.

Volvamos al código del método [doGenerateWithRxService]:

  • línea 5: creamos un observable vacío (no se observa nada);
  • línea 13: creamos un observable cuyo flujo será la fusión de los flujos asíncronos [nbRequests] asociados a las solicitudes [nbRequests]. Esto se consigue utilizando el método [Observable.mergeWith], que permite fusionar dos flujos asíncronos. En la terminología de Rx, [mergeWith] se denomina operador de flujo. Estos operadores tienen la característica de que el resultado de la operación es, en la mayoría de los casos, otro [Observable]. En última instancia, después de la línea 17, la variable [observables] hace referencia a un único flujo compuesto por las [nbRequests] respuestas asíncronas realizadas por el servicio asíncrono;
  • línea 13: la operación de fusión se podría haber escrito como:

observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse));

pero escribimos:


observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));

Aquí hemos utilizado el operador [subscribeOn] en el observable [rxService.getAleas]. Como suele ocurrir, el resultado es de nuevo un observable. El operador [subscribeOn] especifica que el observable debe ejecutarse en un hilo proporcionado por un [Scheduler]. Existen varios [Schedulers] posibles adecuados para diferentes situaciones. En la interfaz gráfica de usuario, hemos proporcionado varias opciones para ver en qué se diferencian:

  

El resultado es el siguiente código:


    private void doGenerateWithRxService() {
        // start waiting
        beginWaiting();
        // we ask for the random numbers
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
            UiResponse uiResponse = new UiResponse();
            uiResponse.setIdClient(i);
            // scheduler
            int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
            switch (schedulerIndex) {
            case 0:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
                break;
            case 1:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.computation()));
                break;
            case 2:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.newThread()));
                break;
            case 3:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.trampoline()));
                break;
            case 4:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.immediate()));
                break;
            }
        }
...
}

Repasemos el código de las líneas 12–14. El programador [Schedulers.io()] asigna un nuevo subproceso a cada observable. Si seguimos el código:

  • línea 5: tenemos un observable vacío;
  • línea 13, iteración 1: observables es la lista [observable0/thread0] (Observable observable0 ejecutándose en el hilo thread0);
  • línea 13, iteración 2: observables es la lista [observable0/thread0, observable1/thread1];
  • etc...

En definitiva, tras la línea 28, tenemos un observable resultante de la fusión de [nbRequests] observables que se ejecutan en [nbRequests] subprocesos diferentes. No todos los programadores funcionan de esta manera, como veremos durante las pruebas.

Sigamos examinando el código para llamar al servicio asíncrono:


private void doGenerateWithRxService() {
        // start waiting
        beginWaiting();
        // we ask for the random numbers
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
        ...
        }
        // observer
        observables = observables.observeOn(SwingScheduler.getInstance());
        // these observables are executed
        subscriptions.add(observables.subscribe(uiResponse -> {
            updateUi(uiResponse);
        } , th -> {
            System.out.println(th);
            doCancel();
        } , this::doCancel));
    }
  • Hemos visto que, al llegar a la línea 10, tenemos un único observable, una fusión de [nbRequests] observables que pueden ejecutarse o no en [nbRequests] subprocesos diferentes, dependiendo del programador elegido por el usuario;
  • Línea 10: El operador [observeOn] nos permite especificar en qué hilo queremos recuperar los datos del observable, en este caso los objetos [nbRequests] de tipo [UiResponse]. En una interfaz Swing, no tenemos otra opción. Cualquier actualización de la interfaz debe realizarse en el hilo del bucle de eventos. Aquí, los datos del observable se mostrarán en un componente JList de Swing. El hilo [SwingScheduler.getInstance()] representa el hilo del bucle de eventos. La clase [SwingScheduler] no procede de la biblioteca RxJava, sino de la biblioteca RxSwing;
  • cuando llegamos a la línea 12, el servicio síncrono aún no se ha llamado porque el observable de la línea 10 todavía no tiene un suscriptor. Las líneas 12–17 proporcionan uno, utilizando el operador [subscribe]. Los parámetros de este operador son tres funciones lambda:
    • la primera [uiResponse -> {updateUi(uiResponse);}] toma como parámetro uno de los objetos [UiResponse] producidos por el observable. Recordemos que aquí tendremos [nbRequests] objetos de este tipo. El método asociado, updateUi en este caso, debe procesar este resultado;
    • la segunda [th -> {System.out.println(th);doCancel();}] toma como parámetro un tipo [Throwable], en este caso una excepción que se produjo durante la ejecución del observable. El método asociado debe procesar esta información. Aquí, la mostramos en la consola (línea 15) y cancelamos la ejecución, lo que actualizará ciertos elementos de la interfaz gráfica de usuario;
    • el tercero [this::doCancel] se invoca cuando el observable indica que ya no tiene más datos que transmitir. Aquí, el observable es la unión de [nbRequests] observables. El observable resultante indicará que ha finalizado cuando todos los observables que lo componen hayan indicado a su vez que han terminado su trabajo. Así que, cuando se ejecuta esta tercera función lambda, hemos recibido todos los datos. El método local [doCancel] actualiza la interfaz gráfica de usuario para reflejar que la ejecución ha finalizado;

La variable [subscriptions] se define de la siguiente manera:


    // les souscriptions aux observables
protected List<Subscription> subscriptions = new ArrayList<Subscription>();

El tipo [Subscription] representa una suscripción, es decir, el vínculo entre un suscriptor [Subscriber] y lo que está observando [Observable]. Aquí hemos utilizado una lista de suscripciones, aunque en este ejemplo solo hay una. El método local [doCancel], que se ejecuta cuando el observable indica que ya no tiene más datos que transmitir, es el siguiente:


    @Override
    protected void doCancel() {
        // fin attente
        endWaiting();
        // dans le cas de souscriptions
        if (jCheckBoxRxSwing.isSelected() && subscriptions != null) {
            subscriptions.forEach(Subscription::unsubscribe);
        }
}
  • La línea 7 da de baja a todos los suscriptores del observable;

De esta breve explicación, podemos extraer los siguientes puntos clave:

  • el tipo [Observable] denota un flujo de valores, que se envían uno a uno a los suscriptores u observadores;
  • el tipo [Subscriber] denota un suscriptor del tipo [Observable];
  • el tipo [Subscription] denota una suscripción, es decir, el vínculo entre un [Subscriber] y un [Observable];
  • el tipo [Observable] admite operadores [mergeWith, empty, subscribeOn, observeOn, ...], la mayoría de los cuales producen observables. Estos operadores se utilizan para configurar el observable antes de que se ejecute:
    • qué observar;
    • el hilo en el que se ejecuta el observable;
    • el hilo en el que el suscriptor recibe datos del observable;
  • Hay dos tipos de observables: [cold] y [hot]. Un observable cold se ejecuta por completo para cada nuevo suscriptor. Si cada ejecución produce los mismos datos, cada nuevo suscriptor recibe los mismos datos que el anterior. Un observable hot generalmente produce datos de forma continua. Cuando un suscriptor se suscribe, recibe los datos emitidos desde el momento de su suscripción. No recibe datos que puedan haber sido emitidos anteriormente. En nuestro ejemplo, el observable es cold: se vuelve a ejecutar por completo para cada nuevo suscriptor. ¿Qué es lo que se ejecuta realmente en nuestro ejemplo? Para averiguarlo, tenemos que volver a la definición del observable observado:

  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
}

Por cada nuevo suscriptor, se vuelve a ejecutar la función lambda, que es un parámetro del método [Observable.create] (línea 3). Por lo tanto, las líneas 4–11 se ejecutan por cada nuevo suscriptor [subscriber];

2.8. Prueba de llamadas asíncronas

Comenzamos demostrando el efecto de los diferentes programadores disponibles. Para ello, utilizamos los siguientes parámetros:

 

Establecemos [1-2] en valores pequeños para que, aunque las solicitudes se ejecuten en el mismo subproceso, no tengamos que esperar demasiado.

2.8.1. con el programador [Schedulers.io]

 

Cabe señalar lo siguiente:

  • las respuestas se reciben en un orden que no coincide con el orden de las solicitudes (véase idClient);
  • cada solicitud se ejecutó en un hilo diferente;
  • esta vez la interfaz gráfica ya no se ha bloqueado:
    • se puede cambiar de pestaña;
    • vemos cómo llegan los datos;
    • no hay tiempo para ver el botón [Cancelar] porque la ejecución es demasiado rápida. Destacaremos esto en otra prueba;

2.8.2. con el programador [Schedulers.computation]

 

Cabe señalar lo siguiente:

  • las respuestas se reciben en un orden que no coincide con el orden de las solicitudes (véase idClient);
  • las solicitudes se ejecutaron en 8 subprocesos;
  • el subproceso n.º 3 se utilizó para las solicitudes 8 y 0;
  • el subproceso n.º 4 se utilizó para las solicitudes 9 y 1;
  • cada una de las demás solicitudes tenía un hilo diferente;

El programador [Schedulers.computation] utiliza tantos subprocesos como núcleos haya en la máquina que se esté utilizando. Esta información se obtiene mediante la expresión [Runtime.getRuntime().availableProcessors()].

2.8.3. con el programador [Schedulers.newThread]

 

El comportamiento es similar al del programador [Schedulers.io].

2.8.4. con los programadores [Schedulers.trampoline, Schedulers.immediate]

 

El comportamiento es síncrono. Todas las solicitudes se ejecutan en el hilo del bucle de eventos. Este resultado no debe generalizarse; más bien, simplemente significa que, en este ejemplo concreto, ambos programadores funcionaron de forma síncrona.

2.9. Casos extremos

En este ejemplo, trabajaremos con programadores que admiten el funcionamiento asíncrono. En primer lugar, aumentamos el número de solicitudes a 100 utilizando el programador [Schedulers.computation], que aquí se ejecuta en 8 subprocesos. Obtenemos el siguiente resultado:

 
  • en [1], el botón [Cancel] está presente y se puede utilizar (operación asíncrona);

Ahora, dejemos que la ejecución se complete:

 

Vemos en [2] que la ejecución de las 100 solicitudes tardó unos 4 segundos (en 8 subprocesos).

Ahora, ejecutemos estas mismas 100 solicitudes utilizando el programador [Schedulers.newThread], que ejecuta cada solicitud en un subproceso independiente:

 

En [1], vemos que la ejecución de las 100 solicitudes (en 100 subprocesos) tardó medio segundo. Por lo tanto, esto es significativamente más rápido que con el programador [Schedulers.computation].

Ahora, realicemos 800 solicitudes en las mismas condiciones, utilizando de nuevo el programador [Schedulers.newThread]. Obtenemos los siguientes resultados:

 

Las 800 solicitudes se ejecutan en aproximadamente 1 segundo.

Cuando aumentamos este número (más allá de las 2500 solicitudes en mi máquina —ejecutadas en 1,5 segundos—; este número depende, por supuesto, en gran medida del entorno de ejecución), acabamos obteniendo la siguiente excepción:

  

Por lo tanto, tenemos un desbordamiento de pila. Las pruebas muestran que el comportamiento del programador [Schedulers.newThread] no es determinista. Es posible que se produzca la excepción anterior, que al ejecutar nuevas pruebas, al volver a la configuración que provocó la excepción, esta ya no se produzca.

2.10. Conclusión

Hemos mostrado un ejemplo de uso de la biblioteca Rx. Resumamos lo que hemos aprendido:

Empezamos con la siguiente arquitectura:

Image

  • en [4], la capa [swing] realizaba llamadas síncronas a la capa [service];
  • en [5], la capa [swing] realizaba llamadas asíncronas a la capa [rxService], que a su vez realizaba una llamada síncrona [6] a la capa [service];

Lo primero que observamos fue que la biblioteca Rx facilitaba la creación de la interfaz asíncrona [rxService] a partir de la interfaz síncrona [service] (véase la sección 2.4). Esta es una lección importante, ya que significa que podemos convertir fácilmente una aplicación síncrona en una asíncrona.

En la capa [swing], se escribieron dos métodos distintos:

  • uno para realizar llamadas síncronas al servicio (véase la sección 2.4);
  • otro para realizar llamadas asíncronas al mismo (véase la sección 2.7);

La escritura de llamadas asíncronas ha resultado ser significativamente más compleja que la de llamadas síncronas. No obstante, quienes hayan trabajado con programación concurrente que implique múltiples subprocesos que deban sincronizarse descubrirán que la solución Rx es más sencilla de escribir y evita todos los problemas difíciles de la sincronización y la comunicación entre subprocesos. En este artículo, hemos destacado los siguientes puntos clave:

  • el tipo [Observable] denota un flujo de eventos (valores) que pueden (aunque no necesariamente) ser asíncronos y que pueden observarse;
  • el tipo [Subscriber] denota un suscriptor de un tipo [Observable];
  • el tipo [Subscription] denota una suscripción, es decir, el vínculo entre un [Subscriber] y un [Observable];
  • El tipo [Observable] admite operadores [mergeWith, empty, subscribeOn, observeOn, ...] que, en su mayoría, devuelven observables. Estos operadores se utilizan para configurar el observable antes de que se ejecute:
    • qué observar;
    • el hilo en el que se ejecuta el observable;
    • el hilo en el que el suscriptor recibe datos del observable;
  • Hay dos tipos de observables: [cold] y [hot]. Un observable cold se ejecuta por completo para cada nuevo suscriptor. Si cada ejecución produce los mismos datos, cada nuevo suscriptor recibe los mismos datos que el anterior. Un observable hot suele producir datos de forma continua. Cuando un suscriptor se suscribe, recibe los datos emitidos desde el momento de su suscripción. No recibe ningún dato que se haya emitido anteriormente. En nuestro ejemplo, el observable es cold: se vuelve a ejecutar por completo para cada nuevo suscriptor.

Ahora que hemos visto un ejemplo que demuestra el valor de la biblioteca Rx, la exploraremos con más detalle.

La biblioteca Rx tiene muchos métodos con parámetros genéricos en sus firmas. Repasaremos brevemente estas firmas (sección 3). Los parámetros de estos métodos son en su mayoría interfaces funcionales (Java 8), es decir, interfaces con un único método. Por lo tanto, los parámetros reales deben ser instancias de estas interfaces. Antes de Java 8, era habitual implementar una interfaz utilizando una clase anónima. Con Java 8, si la interfaz es una interfaz funcional, resulta más conciso implementarla mediante una función lambda. Por lo tanto, las presentaremos (sección 4). Una vez hecho esto, presentaremos la clase [Stream] (sección 5), que permite procesar colecciones Java utilizando funciones lambda. Esta clase es interesante porque la clase [Observable] de RxJava toma prestados:

  • ciertos métodos;
  • la misma forma de encadenar métodos para procesar el mismo observable;

A continuación, presentaremos las interfaces funcionales específicas de la biblioteca RxJava (sección 6). Continuaremos con los elementos principales de la biblioteca Rx [Observable, Subscriber, Subscription, operadores] (sección 7). La clase [Observable] cuenta con docenas de operadores que, a su vez, están sobrecargados varias veces. Esto genera inicialmente una complejidad significativa, ya que estos operadores y sus sobrecargas a veces difieren solo en un único detalle, y resulta difícil, sin experiencia, saber qué operador utilizar. Presentaremos solo un número limitado de operadores y, en la mayoría de los casos, ignoraremos sus sobrecargas.

Toda la sección anterior se tratará utilizando la biblioteca RxJava en sencillas aplicaciones de consola. Una vez que hayamos dominado la biblioteca RxJava, la utilizaremos en dos tipos de aplicaciones gráficas:

  • en la sección 8, volveremos a examinar la aplicación Swing de ejemplo para explorarla con mayor detalle. A continuación, utilizaremos la biblioteca RxSwing;
  • en la sección 9, crearemos una aplicación para Android utilizando la biblioteca RxAndroid;

Una vez hecho todo esto, el lector dispondrá de las herramientas necesarias para valerse por sí mismo. Probablemente le llevará algún tiempo poder utilizar la biblioteca Rx de forma intuitiva. Esta biblioteca me ha parecido especialmente interesante. Sin embargo, me ha resultado compleja de entender y la curva de aprendizaje ha sido pronunciada. Espero que este documento acorte esa curva de aprendizaje para el lector. Me parece que el esfuerzo merece la pena.