Skip to content

2. Um exemplo de introdução

Os meus primeiros contactos com o RxJava ocorreram através de cursos e tutoriais encontrados na Internet. Além de a teoria utilizar conceitos aos quais eu não estava habituado e que me custava compreender, não conseguia perceber, sobretudo, para que servia isso na vida real. Vamos, portanto, começar por apresentar um exemplo (simples, espero) em que a utilização do RxJava traz uma verdadeira simplificação na escrita do código e, a partir daí, tentaremos identificar os elementos importantes desta biblioteca.

A biblioteca RxJava baseia-se no seguinte conceito: um fluxo de elementos do tipo T Observable<T> é observado por um ou mais subscritores (assinantes, observadores, consumidores) Subscriber<T>. A biblioteca RxJava permite que o fluxo Observable<T> seja executado num thread T1 e o seu observador Subscriber<T> num thread T2, sem que o programadortenha de se preocupar com a gestão do ciclo de vida dessas threads nem com problemas naturalmente complexos, tais como a partilha de dados entre threads e a sincronização das mesmas para executar uma tarefa global. Facilita, assim, a programação assíncrona.

Um fluxo Observable<T> produz elementos do tipo T, observáveis à medida que são produzidos. Se o observador e o observável (termo que designa o tipo Observable<T>, por conveniência) se encontrarem na mesma thread, então o observável só pode produzir o elemento (i+1) quando o observador tiver consumido o elemento i. São poucos os casos em que esta arquitetura se revela vantajosa. Se o observador e o observável não se encontrarem no mesmo thread, então o observável e o seu observador têm comportamentos autónomos: o observável produz ao seu próprio ritmo e o observador consome ao seu próprio ritmo. É aí que reside o interesse da biblioteca. Até agora, temos falado sempre de um único observador. Na realidade, um observável pode ter um número qualquer de observadores.

2.1. A arquitetura da aplicação de exemplo

A aplicação de exemplo tem a seguinte arquitetura:

Image

  • em [1], uma camada de serviço fornece listas de números aleatórios. Esta camada é executada no mesmo thread que o método [swing] que a utiliza. Assim, fornece os seus números de forma síncrona;
  • em [2], uma fina camada de adaptação implementada com RxJava permite apresentar à camada [swing] uma implementação assíncrona do mesmo serviço: este pode ser executado num thread diferente do do método [swing] que o utiliza;
  • a chamada [4] é síncrona, enquanto a chamada [5-6] é assíncrona;

O que pretendemos demonstrar aqui é que a biblioteca Rx permite transformar facilmente uma interface síncrona numa interface assíncrona. Por que razão isto é útil? Os eventos de uma interface Swing são processados numa thread comummente designada por «event loop». Os eventos são colocados em fila e processados um após o outro. O evento Ei+1 só pode ser processado quando o evento anterior, Ei, tiver sido totalmente processado. Por isso, é importante que o processamento de um evento seja o mais rápido possível, para que a interface gráfica se mantenha responsiva. Por vezes, o processamento de um evento pode demorar bastante tempo. É o que acontece quando esse processamento envolve acessos à rede. Se não se quiser que a interface gráfica fique bloqueada de uma forma inaceitável para o utilizador, é necessário que esses acessos à rede sejam efetuados em threads separadas do evento loop, de modo a libertar este último. Entramos então no domínio da programação concorrente (várias threads executam-se em paralelo), considerada, com razão, difícil. A biblioteca Rx oferece uma solução simples e elegante para este problema.

Para simular processamentos demorados, o serviço do exemplo fornece os seus números aleatórios após um certo tempo de espera, para que se possa observar o comportamento da interface gráfica.

2.2. L'exécutable

O executável da aplicação de exemplo encontra-se na pasta [dvp/executables] dos exemplos:

Existem várias formas de executar o ficheiro [swing-01], dependendo da configuração do computador utilizado para o executar. Por exemplo, pode seguir-se o processo [1-3]. Obtém-se então a seguinte interface gráfica:

 
  • A interface apresenta duas guias [1-2]: uma, [Request], para a solicitação ao serviço de geração de números aleatórios, e a outra, [Response], para a visualização dos números recebidos;
  • em [3], indica-se o número de pedidos que se pretende fazer ao serviço;
  • em [4], indica-se o intervalo [a,b] de geração dos números pretendidos;
  • em [5], o número de valores devolvidos pelo serviço será um número aleatório no intervalo [minCount, maxCount] definido pelo utilizador;
  • em [6], antes de devolver a sua resposta, o serviço aguardará delay milissegundos, sendo que delay é um número aleatório no intervalo [minDelay, maxDelay] definido pelo utilizador;
  • por predefinição, a camada [swing] dirigirá à interface síncrona do serviço. Para aceder à camada assíncrona, o utilizador deverá selecionar [7]. Neste caso, o serviço de geração será executado em threads separadas do evento loop da interface gráfica. A biblioteca Rx dispõe de várias estratégias para a geração destas threads. O utilizador poderá escolher a sua estratégia em [8];
  • a geração de números é feita através do botão [9];
 
  • em [10], exibição dos resultados. Vamos explicar a estrutura destes;
  • em [11], o número de resultados obtidos;
  • em [12], o tempo de execução em milissegundos;
  • No [13], o utilizador tem a possibilidade de cancelar a execução;

Cada resultado tem o seguinte formato:

{"idClient":0,"serviceResponse":{"delay":412,"aleas":[146,115,128,174,159,112,162,127],"executedOn":"RxComputationThreadPool-6"},"observedOn":"AWT-EventQueue-0","requestAt":"02:42:47:708","responseAt":"02:42:52:931"}
  • [idClient]: o número da solicitação. Recorde-se que são efetuadas várias solicitações ao serviço de geração;
  • [delay]: o tempo de espera, em milissegundos, que o serviço registou antes de enviar o seu resultado;
  • [aleas]: os números aleatórios devolvidos pelo serviço;
  • [executedOn]: o nome do thread no qual o serviço foi executado;
  • [observedOn]: o nome do thread que apresentou o resultado. Com uma interface Swing, este só pode ser o thread do event loop, neste caso [AWT-EventQueue-0];
  • [requestAt]: a hora do pedido na forma [heures:minutes:secondes:millisecondes];
  • [responseAt]: a hora de receção dos resultados no mesmo formato;

Vamos agora apresentar as partes do código úteis para a compreensão do exemplo.

2.3. A interface síncrona

Image

A camada de serviço [1] apresenta a seguinte interface:


public interface IService {
  // números aleatórios em [a,b]
  // são gerados n números aleatórios no intervalo [minCount, maxCount]
  // os números são gerados após um atraso de delay milissegundos,
  // onde [delay] é um número aleatório no intervalo [minDelay, maxDelay]
  public ServiceResponse getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay);
}

A resposta [ServiceResponse] é a seguinte:


public class ServiceResponse {

  // tempo de espera do serviço
  private int delay;
  // números aleatórios
  private List<Integer> aleas;
  // thread de execução
  private String executedOn;

  // construtores

  public ServiceResponse(int delay, List<Integer> aleas) {
    executedOn = Thread.currentThread().getName();
    this.delay = delay;
    this.aleas = aleas;
  }

  // getters e setters
...
}

A resposta tem três elementos:

  • linha 6: os números aleatórios gerados;
  • linha 4: o tempo de espera observado pelo serviço antes de apresentar o seu resultado;
  • linha 8: o thread de execução do serviço;

2.4. A chamada síncrona

Image

Passamos agora a detalhar a chamada síncrona [4] que a camada [swing] efetua ao serviço [1]:


  private void doGenerateWithService() {
    // início da espera
    beginWaiting();
    try {
      for (int i = 0; i < nbRequests; i++) {
        UiResponse uiResponse = new UiResponse();
        uiResponse.setIdClient(i);
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        uiResponse.setResponseAt();
        model.add(0, jsonMapper.writeValueAsString(uiResponse));
        jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
      }
    } catch (JsonProcessingException | RuntimeException e) {
      System.out.println(e);
    }
    // fim da espera
    endWaiting();
}
  • linhas 5-12: o ciclo de execução das requisições [nbRequests] solicitadas pelo utilizador;
  • linha 8: [service] é a implementação da interface síncrona [IService] apresentada no parágrafo 2.3;
  • linha 10: [model] é o modelo apresentado pelo componente JList do separador [Response]. Os elementos deste modelo são as cadeias jSON dos seguintes elementos do tipo [UiResponse]:

public class UiResponse {

  // ID do cliente
  private int idClient;
  // resposta do serviço
  private ServiceResponse serviceResponse;
  // nome do thread de observação
  private String observedOn;
  // hora da solicitação
  private String requestAt;
  // hora da resposta
  private String responseAt;

  // construtores

  public UiResponse() {
    observedOn = Thread.currentThread().getName();
    requestAt = getTimeStamp();
  }
  // métodos privados

  private String getTimeStamp() {
    return new SimpleDateFormat("hh:mm:ss:SSS").format(Calendar.getInstance().getTime());
  }

  // getters e setters
...
}
  • linha 6: a resposta do serviço de geração de números;
  • linha 4: o número da solicitação à qual está a ser respondido;
  • linha 8: o thread de exibição desta resposta. Como já foi referido, será sempre o thread do event loop;
  • linhas 10 e 12: a hora da solicitação e a hora da resposta;

2.5. Testes de chamadas síncronas

Executamos a seguinte configuração:

 

Obtenemos os seguintes resultados no separador [Response]:

 
  • em [1-2], obtivemos efetivamente 10 respostas, tal como solicitado. Estas foram inseridas na primeira posição, pela ordem em que chegaram. Verifica-se que foram obtidas na ordem das solicitações;
  • todas foram executadas e apresentadas no thread do event loop [AWT-EventQueue-0]. As consultas foram, portanto, executadas uma a seguir à outra neste thread. Não houve consultas simultâneas;
  • o que não é visível aqui é que, durante a execução, a interface gráfica fica congelada. Não há, por exemplo, forma de aceder ao separador [Response] para ver as respostas a chegar ou interromper a execução com o botão [Annuler]. Mesmo que esse botão estivesse presente no separador [Request], teria ficado inutilizável. Com efeito, haveria então dois eventos:
    • o clique no botão [Générer];
    • o clique no botão [Annuler];

O clique no botão [Annuler] só é processado após o término da operação desencadeada pelo clique no botão [Générer]. Acabámos de ver que esta ocupava o thread do event loop durante todo o tempo de execução, impedindo assim o processamento do clique no botão [Annuler]. Este é tipicamente o tipo de situação em que o Rx pode trazer uma melhoria significativa;

2.6. A interface assíncrona e a sua implementação

Vamos agora debruçar-nos sobre a interface da camada [2], bem como sobre a sua implementação com o Rx. Esta não será imediatamente compreensível. Pretendemos simplesmente destacar a simplicidade do código desta implementação.

A interface assíncrona é a seguinte:


public interface IRxService {
  // números aleatórios em [a,b]
  // são gerados n números aleatórios no intervalo [minCount, maxCount]
  // os números são gerados após um atraso de delay milissegundos,
  // onde [delay] é um número aleatório no intervalo [minDelay, maxDelay]
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse);
}

As diferenças em relação à interface síncrona apresentada no parágrafo 2.3 são as seguintes:

  • a classe [UiResponse] apresentada no parágrafo 2.3 faz agora parte dos parâmetros do método [getAleas] (linha 6). A razão para tal é que, uma vez que as solicitações são agora executadas em paralelo e o serviço aguarda um tempo aleatório antes de devolver o seu resultado, as respostas não nos serão devolvidas na ordem das solicitações. Por isso, passamos o objeto [UiResponse], que contém, entre outras informações, o número da solicitação:

  // ID do cliente (pedido)
  private int idClient;
  // resposta do serviço
  private ServiceResponse serviceResponse;
  // nome do thread de observação
  private String observedOn;
  // hora da solicitação
  private String requestAt;
  // hora da resposta
  private String responseAt;
  • O tipo da resposta do serviço assíncrono é o tipo [Observable<UiResponse>]. O tipo [Observable<>] é fornecido pela biblioteca Rx. O resultado do tipo [Observable<UiResponse>] indica que o método [getAleas] fornece um fluxo de valores do tipo [UiResponse], valores esses que são enviados (pushed) um a um para o seu observador;

Vejamos agora a implementação desta interface:


public class RxService implements IRxService {

  // serviço
  private IService service;

  // fabricante
  public RxService(IService service) {
    this.service = service;
  }

  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
  }
}
  • linhas 7-9: é fornecida ao construtor uma referência à interface síncrona [IService]. É esta que se encarregará de gerar os números aleatórios;
  • o observável devolvido pelo método [getAleas] é construído pelo método estático [Observable.create]. É este método que permite construir uma implementação assíncrona a partir de uma implementação síncrona;
  • linha 13: o parâmetro do método estático [Observable.create] é, neste caso, uma função lambda que recebe como parâmetro um tipo [Subscriber], que, mais uma vez, é um tipo Rx. Um [Subscriber] é um objeto que subscreve um fluxo de observáveis, ou seja, um fluxo de dados fornecidos de forma assíncrona. Aqui, utilizam-se três métodos deste subscritor:
    • [Subscriber.onNext] para lhe transmitir um dado (linha 16);
    • [Subscriber.onError] para lhe transmitir uma exceção (linha 18);
    • [Subscriber.onCompleted] para indicar ao subscritor que o fluxo de dados terminou (linha 20);

Pode haver vários assinantes para um mesmo observável. Aqui, teremos apenas um assinante que subscreve um fluxo de um único dado, o produzido nas linhas 15-16. O dado é produzido pela implementação síncrona do serviço (linha 15) e entregue ao assinante (linha 16).

Embora tudo isto possa parecer um pouco obscuro, não podemos deixar de ficar impressionados com a extrema concisão desta implementação assíncrona do serviço.

2.7. A chamada assíncrona

Image

Vamos agora detalhar a chamada síncrona [5] que a camada [swing] efetua ao serviço [2]:


private void doGenerateWithRxService() {
        // início da espera
        beginWaiting();
        // solicitação de números aleatórios
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
            UiResponse uiResponse = new UiResponse();
            uiResponse.setIdClient(i);
            // agendador
            int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
            switch (schedulerIndex) {
            case 0:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
                break;
...
            }
        }
...
    }
  • linhas 6-10: execução das consultas [nbRequests] solicitadas pelo utilizador;
  • linhas 7-8: preparação do objeto [UiResponse] necessário ao método [getAleas] do serviço assíncrono (linha 13). Trata-se principalmente de registar o n.º [idClient] da solicitação;
  • linha 13: o método [getAleas] do serviço assíncrono é chamado. Este devolve um objeto [Observable<UiResponse>]. Esta chamada ainda não invoca o serviço síncrono. Voltemos ao código do [getAleas] assíncrono:

  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
}

O código das linhas 4 a 11, que irá chamar o serviço síncrono, só é executado quando um subscritor se regista. Enquanto não houver subscritores, este código não é executado.

Voltemos ao código do método [doGenerateWithRxService]:

  • linha 5: cria-se um observável vazio (nada é observado);
  • linha 13: cria-se um observável cujo fluxo será a fusão dos fluxos assíncronos [nbRequests] associados às requisições [nbRequests]. Isto é obtido com o método [Observable.mergeWith], que permite fundir dois fluxos assíncronos. Na terminologia Rx, [mergeWith] é designado por operador de fluxo. Estes operadores têm a particularidade de que o resultado da operação é, na maioria das vezes, novamente um [Observable]. No final, após a linha 17, a variável [observables] designa um único fluxo constituído pelas respostas assíncronas [nbRequests] fornecidas pelo serviço assíncrono;
  • linha 13: a operação de fusão poderia ter sido escrita da seguinte forma:

observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse));

mas escrevemos:


observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));

Utilizámos aqui o operador [subscribeOn] sobre o observável [rxService.getAleas]. Como é frequente, o resultado é, mais uma vez, um observável. O operador [subscribeOn] permite especificar que o observável deve ser executado num thread fornecido por um [Scheduler]. Existem vários [Scheduler] possíveis, adaptados a diferentes situações. Na interface gráfica, propusemos vários para ver os efeitos de cada um:

  

Isto resulta no seguinte código:


    private void doGenerateWithRxService() {
        // início da espera
        beginWaiting();
        // solicitação de números aleatórios
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
            UiResponse uiResponse = new UiResponse();
            uiResponse.setIdClient(i);
            // agendador
            int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
            switch (schedulerIndex) {
            case 0:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
                break;
            case 1:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.computation()));
                break;
            case 2:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.newThread()));
                break;
            case 3:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.trampoline()));
                break;
            case 4:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.immediate()));
                break;
            }
        }
...
}

Voltemos ao código das linhas 12-14. O agendador [Schedulers.io()] atribui um novo thread a cada observável. Se seguirmos o código:

  • linha 5: temos um observável vazio;
  • linha 13, iteração 1: observables é a lista [observable0/thread0] (observável observable0 executado na thread thread0);
  • linha 13, iteração 2: observables é a lista [observable0/thread0, observable1/thread1];
  • etc...

No final, após a linha 28, temos um observável resultante da fusão dos observáveis [nbRequests] que se executam em threads diferentes [nbRequests]. Nem todos os agendadores funcionam assim, como veremos durante os testes.

Continuemos a análise do código de chamada do serviço assíncrono:


private void doGenerateWithRxService() {
        // início da espera
        beginWaiting();
        // solicitação de números aleatórios
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
        ...
        }
        // observador
        observables = observables.observeOn(SwingScheduler.getInstance());
        // executam-se estes observáveis
        subscriptions.add(observables.subscribe(uiResponse -> {
            updateUi(uiResponse);
        } , th -> {
            System.out.println(th);
            doCancel();
        } , this::doCancel));
    }
  • vimos que, ao chegarmos à linha 10, temos um único observável, resultado da fusão de [nbRequests] observáveis que podem ou não ser executados em [nbRequests] threads diferentes, dependendo do agendador escolhido pelo utilizador;
  • linha 10: o operador [observeOn] permite especificar em que thread se pretende recuperar os dados provenientes do observável, neste caso os objetos de tipo [UiResponse]. Numa interface Swing, não há escolha. Qualquer atualização da interface deve ser feita no thread do event loop. Aqui, os dados do observável serão apresentados num componente Swing JList. A thread [SwingScheduler.getInstance()] representa a thread do ciclo de eventos. A classe [SwingScheduler] não provém da biblioteca RxJava, mas sim da biblioteca derivada RxSwing;
  • quando se chega à linha 12, o serviço síncrono ainda não foi chamado, pois o observável da linha 10 ainda não tem nenhum subscritor. As linhas 12-17 atribuem-lhe um, graças ao operador [subscribe]. Os parâmetros deste operador são, neste caso, três funções lambda:
    • a primeira, [uiResponse -> {updateUi(uiResponse);}], aceita como parâmetro um dos objetos [UiResponse] produzidos pelo observável. Recorde-se que, neste caso, teremos [nbRequests] objetos deste tipo. O método associado, updateUi neste caso, deve explorar este resultado;
    • A segunda [th -> {System.out.println(th);doCancel();}] aceita como parâmetro um tipo [Throwable], neste caso uma exceção que ocorreu durante a execução do observável. O método associado deve utilizar esta informação. Aqui, exibimo-la na consola (linha 15) e cancelamos a execução, o que terá como efeito a atualização de alguns elementos da interface gráfica;
    • o terceiro [this::doCancel] é chamado quando o observável sinaliza que já não tem dados para transmitir. Aqui, o observável é a união dos observáveis [nbRequests]. O observável resultante indicará que terminou quando todos os observáveis que o compõem tiverem, por sua vez, sinalizado que concluíram o seu trabalho. Assim, quando esta terceira função lambda é executada, já foram recebidos todos os dados. O método local [doCancel] atualiza a interface gráfica para refletir o facto de a execução ter terminado;

A variável [subscriptions] é definida da seguinte forma:


    // as subscrições dos observáveis
protected List<Subscription> subscriptions = new ArrayList<Subscription>();

O tipo [Subscription] representa uma subscrição, ou seja, a ligação entre um subscritor [Subscriber] e aquilo que este observa [Observable]. Utilizámos aqui uma lista de subscrições, embora, neste exemplo, exista apenas uma. O método local [doCancel], executado quando o observável sinaliza que já não tem dados para transmitir, é o seguinte:


    @Override
    protected void doCancel() {
        // fim da espera
        endWaiting();
        // no caso de subscrições
        if (jCheckBoxRxSwing.isSelected() && subscriptions != null) {
            subscriptions.forEach(Subscription::unsubscribe);
        }
}
  • a linha 7 cancela a subscrição de todos os subscritores do observável;

Desta explicação resumida, podemos reter os seguintes pontos-chave:

  • o tipo [Observable] designa um fluxo de valores, valores esses que são enviados um a um para subscritores ou observadores;
  • o tipo [Subscriber] designa um subscritor do tipo [Observable];
  • o tipo [Subscription] designa uma subscrição, ou seja, a ligação entre um [Subscriber] e um [Observable];
  • o tipo [Observable] admite operadores [mergeWith, empty, subscribeOn, observeOn, ...] que, na sua maioria, produzem observáveis. Estes operadores servem para configurar o observável antes da sua execução:
    • o que se pretende observar;
    • o thread no qual o observável é executado;
    • o thread no qual o subscritor recebe os dados do observável;
  • distinguem-se dois tipos de observáveis: os [froid / cold] e os [chaud / hot]. Um observável «frio» é executado na íntegra sempre que surge um novo subscritor. Se cada execução produzir os mesmos dados, cada novo subscritor recebe os mesmos dados que o anterior. Um observável «quente» produz geralmente dados de forma contínua. Quando um subscritor se subscreve, recebe os dados emitidos a partir da hora da sua subscrição. Não recebe os dados que possam ter sido emitidos anteriormente. No nosso exemplo, o observável é «frio»: é totalmente reexecutado sempre que surge um novo subscritor. O que é que é realmente executado no nosso exemplo? Para o saber, é necessário voltar à definição do observável em questão:

  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
}

A cada novo subscritor, a função lambda, parâmetro do método [Observable.create] (linha 3), é reexecutada. São, portanto, as linhas 4 a 11 que são executadas para cada novo subscritor [subscriber];

2.8. Testes de chamadas assíncronas

Começamos por mostrar o efeito dos diferentes agendadores propostos. Para tal, utilizamos os seguintes parâmetros:

 

Atribuímos valores pequenos a [1-2] para que, caso as consultas sejam executadas na mesma thread, a espera não seja demasiado longa.

2.8.1. com o agendador [Schedulers.io]

 

É possível observar os seguintes pontos:

  • as respostas são recebidas numa ordem diferente da das consultas (ver idClient);
  • cada pedido foi executado num thread diferente;
  • a interface gráfica já não está bloqueada desta vez:
    • é possível alternar entre os separadores;
    • vê-se os dados a chegar;
    • não dá tempo de ver o botão [Annuler] porque a execução é demasiado rápida. Iremos destacá-lo noutro teste;

2.8.2. com o agendador [Schedulers.computation]

 

Podemos observar os seguintes pontos:

  • obtêm-se as respostas numa ordem diferente da das consultas (ver idClient);
  • as consultas foram executadas em 8 threads;
  • o thread n.º 3 foi utilizado para as consultas 8 e 0;
  • o thread n.º 4 foi utilizado para as consultas 9 e 1;
  • as restantes consultas utilizaram, cada uma, um thread diferente;

O agendador [Schedulers.computation] utiliza tantos threads quantos os núcleos existentes na máquina utilizada. Esta informação é obtida através da expressão [Runtime.getRuntime().availableProcessors()].

2.8.3. com o agendador [Schedulers.newThread]

 

Temos um funcionamento análogo ao do agendador [Schedulers.io].

2.8.4. com os agendadores [Schedulers.trampoline, Schedulers.immediate]

 

Temos um funcionamento síncrono. Todas as consultas são executadas no thread do event loop. Não se deve generalizar este resultado, mas sim referir que, simplesmente neste exemplo específico, os dois agendadores funcionaram de forma síncrona.

2.9. Casos-limite

Neste exemplo, vamos trabalhar com os agendadores que permitem um funcionamento assíncrono. Em primeiro lugar, aumentamos o número de pedidos para 100 com o agendador [Schedulers.computation], que aqui funciona com 8 threads. Obtemos o seguinte resultado:

 
  • no [1], o botão [Annuler] está presente e pode ser utilizado (funcionamento assíncrono);

Agora, deixemos a execução chegar ao fim:

 

No [2], verifica-se que a execução das 100 consultas demorou cerca de 4 segundos (em 8 threads).

Agora, vamos efetuar essas mesmas 100 consultas com o agendador [Schedulers.newThread], que executa cada consulta num thread separado:

 

No [1], verifica-se que a execução das 100 consultas (em 100 threads) demorou meio segundo. É, portanto, significativamente mais rápido do que com o agendador [Schedulers.computation].

Agora, vamos efetuar 800 pedidos nas mesmas condições, continuando a utilizar o agendador [Schedulers.newThread]. Obtemos os seguintes resultados:

 

As 800 consultas são executadas em cerca de 1 segundo.

Quando aumentamos este número (para mais de 2500 pedidos na minha máquina — executados em 1,5 s — este número depende, naturalmente, muito do ambiente de trabalho no momento da execução), acabamos por obter a seguinte exceção:

  

Temos, portanto, um estouro de pilha. Os testes mostram que o funcionamento do agendador [Schedulers.newThread] não é determinístico. É possível obter a exceção anterior, realizar novos testes, voltar depois à configuração que provocou a exceção e já não a obter.

2.10. Conclusion

Mostrámos um exemplo de utilização da biblioteca Rx. Resumamos o que aprendemos:

Partimos da seguinte arquitetura:

Image

  • em [4], a camada [swing] fazia chamadas síncronas à camada [service];
  • na camada [5], a camada [swing] fazia chamadas assíncronas à camada [rxService], que, por sua vez, chamava de forma síncrona a camada [service] através da camada [6];

A primeira coisa que observámos foi que a biblioteca Rx permitia criar facilmente a interface assíncrona [rxService] a partir da interface síncrona [service] (ver parágrafo 2.4). Esta é uma lição importante, pois significa que é possível fazer evoluir facilmente uma aplicação síncrona para uma aplicação assíncrona.

Na camada [swing], foram escritos dois métodos distintos:

  • um para efetuar chamadas síncronas ao serviço (ver parágrafo 2.4);
  • o outro para efetuar chamadas assíncronas ao serviço (ver parágrafo 2.7);

A implementação das chamadas assíncronas revelou-se significativamente mais complexa do que a das chamadas síncronas. No entanto, quem já tenha trabalhado com programação concorrente envolvendo várias threads a sincronizar irá constatar que a solução Rx é mais simples de implementar e evita todos os problemas de sincronização e comunicação entre threads, que são questões difíceis. Durante esta implementação, destacámos os seguintes pontos importantes:

  • o tipo [Observable] designa um fluxo de eventos (valores) que podem ser (mas não necessariamente) assíncronos e que podem ser observados;
  • o tipo [Subscriber] designa um subscritor de um tipo [Observable];
  • o tipo [Subscription] designa uma subscrição, ou seja, a ligação entre um [Subscriber] e um [Observable];
  • o tipo [Observable] admite operadores [mergeWith, empty, subscribeOn, observeOn, ...] que, na sua maioria, produzem observáveis. Estes operadores servem para configurar o observável antes da sua execução:
    • o que se pretende observar;
    • o thread no qual o observável é executado;
    • o thread no qual o subscritor recebe os dados do observável;
  • distinguem-se dois tipos de observáveis: os [froid / cold] e os [chaud / hot]. Um observável «frio» é executado na íntegra sempre que surge um novo subscritor. Se cada execução produzir os mesmos dados, cada novo subscritor recebe os mesmos dados que o anterior. Um observável «quente» produz geralmente dados de forma contínua. Quando um subscritor se inscreve, recebe os dados emitidos a partir da hora da sua inscrição. Não recebe os dados que possam ter sido emitidos anteriormente. No nosso exemplo, o observável é «frio»: é totalmente reexecutado a cada novo subscritor.

Agora que vimos um exemplo que nos mostrou a utilidade da biblioteca Rx, vamos apresentá-la com mais pormenor.

A biblioteca Rx possui numerosos métodos com parâmetros genéricos na sua assinatura. Faremos uma breve revisão destas assinaturas (parágrafo 3). Os parâmetros destes métodos são, na maioria das vezes, interfaces funcionais (Java 8), ou seja, interfaces que possuem apenas um único método. Os parâmetros efetivos devem, portanto, ser instâncias dessas interfaces. Antes do Java 8, era comum implementar uma interface através de uma classe anónima. Com o Java 8, e se a interface for uma interface funcional, é mais conciso implementá-la com uma função lambda. Apresentaremos, portanto, estas últimas (parágrafo 4). Quando isto estiver feito, apresentaremos a classe [Stream] (parágrafo 5), que permite processar coleções Java com funções lambda. Esta classe é interessante porque a classe [Observable], da RxJava, utiliza:

  • certos métodos;
  • a mesma forma de encadear métodos entre si para processar um mesmo observável;

Apresentaremos, em seguida, as interfaces funcionais específicas da biblioteca RxJava (parágrafo 6). Continuaremos com os principais elementos da biblioteca Rx [Observable, Subscriber, Subscription, opérateurs] (parágrafo 7). A classe [Observable] possui várias dezenas de operadores que, por sua vez, são sobrecarregados várias vezes. Isto cria, inicialmente, uma grande complexidade, pois estes operadores e as suas sobrecargas diferem, por vezes, apenas num pormenor e é difícil, sem experiência, saber qual o operador a utilizar. Apresentaremos apenas um número limitado de operadores e, na maioria das vezes, ignoraremos as suas sobrecargas.

Toda a parte anterior será realizada com a biblioteca RxJava em aplicações de consola simples. Assim que a biblioteca RxJava for adquirida, iremos utilizá-la em dois tipos de aplicações gráficas:

  • no parágrafo 8, voltaremos à aplicação Swing de exemplo para a detalhar mais. Utilizaremos então a biblioteca RxSwing;
  • no parágrafo 9, criaremos uma aplicação Android com a biblioteca RxAndroid;

Quando tudo isto estiver concluído, o leitor terá as ferramentas necessárias para seguir o seu próprio caminho. Provavelmente, levará algum tempo até que consiga utilizar a biblioteca Rx de forma intuitiva. Achei esta biblioteca particularmente interessante. No entanto, achei-a complexa de compreender e o tempo de aprendizagem foi longo. Espero que este documento ajude a encurtar esse tempo para o leitor. Parece-me que vale bem a pena.