2. Um exemplo introdutório
O meu primeiro contacto com o RxJava foi através de cursos e tutoriais que encontrei online. Além do facto de a teoria utilizar conceitos com os quais não estava familiarizado e que me custava compreender, não conseguia perceber como é que isso poderia ser útil na vida real. Começaremos, portanto, por apresentar um exemplo (espero que seja simples) em que a utilização do RxJava leva a uma verdadeira simplificação do código e, a partir daí, tentaremos identificar os elementos importantes desta biblioteca.
A biblioteca RxJava baseia-se no seguinte conceito: um fluxo de elementos do tipo T Observable<T> é observado por um ou mais assinantes (assinantes, observadores, consumidores) Subscriber<T>. A biblioteca RxJava permite que o fluxo Observable<T> seja executado numa thread T1 e o seu observador Subscriber<T> numa thread T2, sem que o programador tenha de se preocupar com a gestão do ciclo de vida dessas threads ou com questões naturalmente complexas, como a partilha de dados entre threads e a sua sincronização para executar uma tarefa global. Facilita, portanto, a programação assíncrona.
Um fluxo Observable<T> produz elementos do tipo T, que podem ser observados à medida que são produzidos. Se o observador e o observável (um termo usado de forma genérica para se referir ao tipo Observable<T>) estiverem na mesma thread, então o observável só pode produzir o elemento (i+1) depois de o observador ter consumido o elemento i. Existem poucos casos em que esta arquitetura é útil. Se o observador e o observável não estiverem na mesma thread, então o observável e o seu observador comportam-se de forma autónoma: o observável emite ao seu próprio ritmo e o observador consome ao seu próprio ritmo. É aqui que reside o valor da biblioteca. Até agora, discutimos apenas um único observador. Na realidade, um observável pode ter qualquer número de observadores.
2.1. A arquitetura da aplicação de exemplo
A aplicação de exemplo tem a seguinte arquitetura:

- em [1], uma camada de serviço gera listas de números aleatórios. Esta camada é executada na mesma thread que o método [swing] que a utiliza. Em seguida, gera os seus números de forma síncrona;
- em [2], uma camada de adaptação fina implementada com RxJava permite que uma implementação assíncrona do mesmo serviço seja apresentada à camada [swing]: este serviço pode ser executado numa thread diferente do método [swing] que o utiliza;
- A chamada [4] é síncrona, enquanto as chamadas [5-6] são assíncronas;
O que queremos demonstrar aqui é que a biblioteca Rx facilita a transformação de uma interface síncrona numa assíncrona. Por que é que isto é útil? Os eventos numa interface Swing são processados numa thread comumente referida como o ciclo de eventos. Os eventos são enfileirados e processados um após o outro. O evento Ei+1 só pode ser processado depois de o evento anterior Ei ter sido totalmente processado. Por isso, é importante que o tratamento de eventos seja o mais breve possível, para que a GUI permaneça responsiva. Por vezes, o tratamento de um evento pode demorar muito tempo. É o que acontece se o tratamento envolver acesso à rede. Se não quisermos congelar a GUI de uma forma inaceitável para o utilizador, estas operações de rede devem ser realizadas em threads separadas do ciclo de eventos, para o libertar. Isto leva-nos ao domínio da programação concorrente (onde múltiplas threads são executadas em paralelo), que é, com razão, considerada difícil. A biblioteca Rx fornece uma solução simples e elegante para este problema.
Para simular processos de longa duração, o serviço no exemplo fornece os seus números aleatórios após um certo atraso, para que possamos observar o comportamento da interface gráfica do utilizador.
2.2. O executável
O executável da aplicação de exemplo encontra-se na pasta [dvp/executables] dos exemplos:
![]() | ![]() |
Existem várias formas de executar o arquivo [swing-01], dependendo da configuração da máquina utilizada para o executar. Por exemplo, pode seguir o processo [1-3]. Isto irá apresentar a seguinte interface gráfica de utilizador:
![]() |
- A interface possui dois separadores [1-2]: um [Request] para enviar um pedido ao serviço de geração de números aleatórios e outro [Response] para exibir os números recebidos;
- Em [3], especifica quantas solicitações pretende enviar ao serviço;
- Em [4], especifica-se o intervalo de geração de números desejado [a,b];
- Em [5], o número de valores devolvidos pelo serviço será um número aleatório dentro do intervalo [minCount, maxCount] definido pelo utilizador;
- em [6], antes de devolver a sua resposta, o serviço irá aguardar delay milissegundos, sendo que delay é um número aleatório dentro do intervalo definido pelo utilizador [minDelay, maxDelay];
- Por predefinição, a camada [swing] utilizará a interface síncrona do serviço. Para utilizar a camada assíncrona, o utilizador deverá marcar [7]. Neste caso, o serviço de geração será executado em threads separadas do ciclo de eventos da GUI. A biblioteca Rx oferece várias estratégias para gerar estas threads. O utilizador pode selecionar a sua estratégia em [8];
- a geração de números é realizada utilizando o botão [9];
![]() |
- [10] apresenta os resultados. Iremos explicar a sua estrutura;
- em [11], o número de resultados obtidos;
- em [12], o tempo de execução em milissegundos;
- em [13], o utilizador tem a opção de cancelar a execução;
Cada resultado tem o seguinte formato:
{"idClient":0,"serviceResponse":{"delay":412,"aleas":[146,115,128,174,159,112,162,127],"executedOn":"RxComputationThreadPool-6"},"observedOn":"AWT-EventQueue-0","requestAt":"02:42:47:708","responseAt":"02:42:52:931"}
- [idClient]: o ID da solicitação. Note-se que várias solicitações são enviadas ao serviço de geração;
- [delay]: o tempo de espera em milissegundos que o serviço observou antes de enviar o seu resultado;
- [aleas]: os números aleatórios devolvidos pelo serviço;
- [executedOn]: o nome da thread na qual o serviço foi executado;
- [observedOn]: o nome da thread que exibiu o resultado. Com uma interface Swing, esta só pode ser a thread do ciclo de eventos, aqui [AWT-EventQueue-0];
- [requestAt]: a hora da solicitação no formato [horas:minutos:segundos:milissegundos];
- [responseAt]: a hora em que os resultados foram recebidos no mesmo formato;
Apresentaremos agora os trechos de código necessários para compreender o exemplo.
2.3. A interface síncrona

A camada de serviço [1] apresenta a seguinte interface:
public interface IService {
// random numbers in [a,b]
// n numbers are generated with random n in the interval [minCount, maxCount]
// numbers are generated after a delay of milliseconds,
// where [delay] is a random number in the range [minDelay, maxDelay]
public ServiceResponse getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay);
}
O [ServiceResponse] é o seguinte:
public class ServiceResponse {
// service waiting time
private int delay;
// random numbers
private List<Integer> aleas;
// execution thread
private String executedOn;
// manufacturers
public ServiceResponse(int delay, List<Integer> aleas) {
executedOn = Thread.currentThread().getName();
this.delay = delay;
this.aleas = aleas;
}
// getters and setters
...
}
A resposta tem três partes:
- linha 6: os números aleatórios gerados;
- linha 4: o tempo de espera observado pelo serviço antes de devolver o seu resultado;
- linha 8: o segmento de execução do serviço;
2.4. A chamada síncrona

Vamos agora detalhar a chamada síncrona [4] efetuada pela camada [swing] ao serviço [1]:
private void doGenerateWithService() {
// start waiting
beginWaiting();
try {
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
uiResponse.setResponseAt();
model.add(0, jsonMapper.writeValueAsString(uiResponse));
jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
}
} catch (JsonProcessingException | RuntimeException e) {
System.out.println(e);
}
// end waiting
endWaiting();
}
- linhas 5–12: o ciclo que processa as [nbRequests] solicitações feitas pelo utilizador;
- linha 8: [service] é a implementação da interface síncrona [IService] apresentada na Secção 2.3;
- linha 10: [model] é o modelo exibido pelo componente JList do separador [Response]. Os elementos deste modelo são cadeias JSON de elementos do tipo [UiResponse], como se segue:
public class UiResponse {
// customer id
private int idClient;
// service response
private ServiceResponse serviceResponse;
// observation thread name
private String observedOn;
// query time
private String requestAt;
// response time
private String responseAt;
// manufacturers
public UiResponse() {
observedOn = Thread.currentThread().getName();
requestAt = getTimeStamp();
}
// private methods
private String getTimeStamp() {
return new SimpleDateFormat("hh:mm:ss:SSS").format(Calendar.getInstance().getTime());
}
// getters and setters
...
}
- linha 6: a resposta do serviço de geração de números;
- linha 4: o número da solicitação que está a ser respondida;
- linha 8: o segmento de execução que exibe esta resposta. Conforme mencionado, este será sempre o segmento de execução do ciclo de eventos;
- linhas 10 e 12: a hora do pedido e a hora da resposta;
2.5. Testar chamadas síncronas
Executamos a seguinte configuração:
![]() |
Obtemos os seguintes resultados no separador [Resposta]:
![]() |
- Em [1-2], recebemos efetivamente 10 respostas, tal como solicitado. Estas foram inseridas na primeira posição, pela ordem em que chegaram. Podemos ver que foram recebidas na ordem dos pedidos;
- Todas foram executadas e exibidas na thread do loop de eventos [AWT-EventQueue-0]. As solicitações foram, portanto, executadas uma após a outra nesta thread. Não houve solicitações simultâneas;
- o que não é visível aqui é que, durante a execução, a GUI fica congelada. Por exemplo, não há forma de aceder ao separador [Resposta] para visualizar as respostas recebidas ou para interromper a execução utilizando o botão [Cancelar]. Mesmo que este botão estivesse presente no separador [Pedido], teria sido inutilizável. Na verdade, haveria então dois eventos:
- clicar no botão [Gerar];
- clicar no botão [Cancelar];
O clique no botão [Cancel] só é tratado após a operação desencadeada pelo clique no botão [Generate] ter terminado. Acabámos de ver que esta operação ocupou o thread do loop de eventos durante toda a duração da execução, impedindo assim o tratamento do clique no botão [Cancel]. Este é tipicamente o tipo de situação em que o Rx pode proporcionar uma melhoria significativa;
2.6. A interface assíncrona e a sua implementação
Vamos agora analisar a interface da camada [2] e a sua implementação com o Rx. Isto não será imediatamente claro. Pretendemos simplesmente destacar a simplicidade do código nesta implementação.

A interface assíncrona é a seguinte:
public interface IRxService {
// random numbers in [a,b]
// n numbers are generated with random n in the interval [minCount, maxCount]
// numbers are generated after a delay of milliseconds,
// where [delay] is a random number in the range [minDelay, maxDelay]
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse);
}
As diferenças em relação à interface síncrona apresentada na Secção 2.3 são as seguintes:
- a classe [UiResponse] apresentada na Secção 2.3 faz agora parte dos parâmetros do método [getAleas] (linha 6). A razão para isto é que, como os pedidos agora são executados em paralelo e o serviço espera um período de tempo aleatório antes de devolver o seu resultado, as respostas não nos serão devolvidas na ordem dos pedidos. Por isso, passamos o objeto [UiResponse], que contém, entre outras informações, o ID da solicitação:
// id du client (requête)
private int idClient;
// réponse du service
private ServiceResponse serviceResponse;
// nom du thread d'observation
private String observedOn;
// heure de la requête
private String requestAt;
// heure de la réponse
private String responseAt;
- O tipo de resposta do serviço assíncrono é do tipo [Observable<UiResponse>]. O tipo [Observable<>] é fornecido pela biblioteca Rx. O resultado [Observable<UiResponse>] indica que o método [getAleas] fornece um fluxo de valores do tipo [UiResponse], que são enviados um a um para o seu observador;
Agora, vamos ver a implementação desta interface:
public class RxService implements IRxService {
// service
private IService service;
// manufacturer
public RxService(IService service) {
this.service = service;
}
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
}
- linhas 7–9: fornecemos ao construtor uma referência à interface síncrona [IService]. Esta interface irá tratar da geração de números aleatórios;
- o observável devolvido pelo método [getAleas] é construído pelo método estático [Observable.create]. Este método permite-nos construir uma implementação assíncrona a partir de uma síncrona;
- linha 13: o parâmetro do método estático [Observable.create] é aqui uma função lambda que recebe um tipo [Subscriber] como parâmetro, novamente um tipo Rx. Um [Subscriber] é um objeto que subscreve um fluxo de observáveis, ou seja, um fluxo de dados entregues de forma assíncrona. Aqui, usamos três métodos deste subscritor:
- [Subscriber.onNext] para lhe passar dados (linha 16);
- [Subscriber.onError] para lhe enviar uma exceção (linha 18);
- [Subscriber.onCompleted] para indicar ao subscritor que o fluxo de dados terminou (linha 20);
Pode haver vários assinantes para o mesmo observável. Aqui, teremos apenas um assinante a subscrever um fluxo de um único dado, aquele produzido nas linhas 15–16. Os dados são produzidos pela implementação síncrona do serviço (linha 15) e devolvidos ao assinante (linha 16).
Mesmo que tudo isto permaneça um pouco obscuro, não se pode deixar de ficar impressionado com a extrema concisão desta implementação assíncrona do serviço.
2.7. A Chamada Assíncrona

Vamos agora examinar a chamada síncrona [5] feita pela camada [swing] ao serviço [2]:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
// scheduler
int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
switch (schedulerIndex) {
case 0:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
break;
...
}
}
...
}
- linhas 6–10: execução das [nbRequests] solicitações solicitadas pelo utilizador;
- linhas 7-8: preparação do objeto [UiResponse] exigido pelo método [getAleas] do serviço assíncrono (linha 13). Isto envolve principalmente o registo do [idClient] da solicitação;
- linha 13: o método [getAleas] do serviço assíncrono é chamado. Ele retorna um objeto [Observable<UiResponse>]. Esta chamada ainda não invoca o serviço síncrono. Voltemos ao código do [getAleas] assíncrono:
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
O código nas linhas 4–11, que chama o serviço síncrono, é executado apenas quando um assinante se regista. Enquanto não houver assinantes, este código não é executado.
Voltemos ao código do método [doGenerateWithRxService]:
- linha 5: criamos um observável vazio (nada é observado);
- linha 13: criamos um observável cujo fluxo será a fusão dos fluxos assíncronos [nbRequests] associados às solicitações [nbRequests]. Isto é conseguido utilizando o método [Observable.mergeWith], que permite a fusão de dois fluxos assíncronos. Na terminologia Rx, [mergeWith] é chamado de operador de fluxo. Estes operadores têm a característica de que o resultado da operação é, na maioria dos casos, outro [Observable]. Por fim, após a linha 17, a variável [observables] refere-se a um único fluxo composto pelas [nbRequests] respostas assíncronas feitas pelo serviço assíncrono;
- linha 13: a operação de fusão poderia ter sido escrita como:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse));
mas escrevemos:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
Aqui, utilizámos o operador [subscribeOn] no observável [rxService.getAleas]. Como é frequente, o resultado é novamente um observável. O operador [subscribeOn] especifica que o observável deve ser executado numa thread fornecida por um [Scheduler]. Existem vários [Schedulers] possíveis, adequados a diferentes situações. Na GUI, disponibilizámos várias opções para ver como diferem:
![]() |
Isto resulta no seguinte código:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
// scheduler
int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
switch (schedulerIndex) {
case 0:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
break;
case 1:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.computation()));
break;
case 2:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.newThread()));
break;
case 3:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.trampoline()));
break;
case 4:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.immediate()));
break;
}
}
...
}
Vamos rever o código nas linhas 12–14. O agendador [Schedulers.io()] atribui um novo segmento de execução a cada observável. Se seguirmos o código:
- linha 5: temos um observável vazio;
- linha 13, iteração 1: observáveis é a lista [observável0/thread0] (Observável observável0 a ser executado na thread thread0);
- linha 13, iteração 2: observáveis é a lista [observável0/thread0, observável1/thread1];
- etc...
Por fim, após a linha 28, temos um observável resultante da fusão de [nbRequests] observáveis em execução em [nbRequests] threads diferentes. Nem todos os agendadores funcionam desta forma, como veremos durante os testes.
Vamos continuar a examinar o código para chamar o serviço assíncrono:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
...
}
// observer
observables = observables.observeOn(SwingScheduler.getInstance());
// these observables are executed
subscriptions.add(observables.subscribe(uiResponse -> {
updateUi(uiResponse);
} , th -> {
System.out.println(th);
doCancel();
} , this::doCancel));
}
- Vimos que, quando chegamos à linha 10, temos um único observável, uma fusão de [nbRequests] observáveis que podem ou não ser executados em [nbRequests] threads diferentes, dependendo do agendador escolhido pelo utilizador;
- Linha 10: O operador [observeOn] permite-nos especificar em que thread queremos recuperar os dados do observável, neste caso os objetos [nbRequests] do tipo [UiResponse]. Numa interface Swing, não temos escolha. Qualquer atualização da interface deve ser realizada na thread do ciclo de eventos. Aqui, os dados do observável serão exibidos num componente JList do Swing. A thread [SwingScheduler.getInstance()] representa a thread do ciclo de eventos. A classe [SwingScheduler] não provém da biblioteca RxJava, mas sim da biblioteca RxSwing;
- quando chegamos à linha 12, o serviço síncrono ainda não foi chamado porque o observável na linha 10 ainda não tem um assinante. As linhas 12–17 fornecem um, utilizando o operador [subscribe]. Os parâmetros deste operador são três funções lambda:
- a primeira [uiResponse -> {updateUi(uiResponse);}] recebe como parâmetro um dos objetos [UiResponse] produzidos pelo observável. Recorde-se que, neste caso, teremos [nbRequests] objetos deste tipo. O método associado, updateUi neste caso, deve processar este resultado;
- a segunda [th -> {System.out.println(th);doCancel();}] recebe um tipo [Throwable] como parâmetro, neste caso uma exceção que ocorreu durante a execução do observável. O método associado deve processar esta informação. Aqui, exibimo-la na consola (linha 15) e cancelamos a execução, o que atualizará certos elementos da GUI;
- o terceiro [this::doCancel] é chamado quando o observável sinaliza que não tem mais dados para transmitir. Aqui, o observável é a união de [nbRequests] observáveis. O observável resultante indicará que terminou quando todos os observáveis que o compõem tiverem, por sua vez, sinalizado que terminaram o seu trabalho. Assim, quando esta terceira função lambda é executada, já recebemos todos os dados. O método local [doCancel] atualiza a GUI para refletir que a execução está concluída;
A variável [subscriptions] é definida da seguinte forma:
// les souscriptions aux observables
protected List<Subscription> subscriptions = new ArrayList<Subscription>();
O tipo [Subscription] representa uma subscrição, ou seja, a ligação entre um subscritor [Subscriber] e aquilo que este está a observar [Observable]. Utilizámos aqui uma lista de subscrições, embora neste exemplo exista apenas uma. O método local [doCancel], executado quando o observável sinaliza que já não tem mais dados para transmitir, é o seguinte:
@Override
protected void doCancel() {
// fin attente
endWaiting();
// dans le cas de souscriptions
if (jCheckBoxRxSwing.isSelected() && subscriptions != null) {
subscriptions.forEach(Subscription::unsubscribe);
}
}
- A linha 7 cancela a subscrição de todos os subscritores do observável;
A partir desta breve explicação, podemos retirar os seguintes pontos-chave:
- o tipo [Observable] denota um fluxo de valores, que são enviados um a um para assinantes ou observadores;
- o tipo [Subscriber] denota um subscritor do tipo [Observable];
- o tipo [Subscription] denota uma subscrição, ou seja, a ligação entre um [Subscriber] e um [Observable];
- o tipo [Observable] suporta operadores [mergeWith, empty, subscribeOn, observeOn, ...], a maioria dos quais produz observáveis. Estes operadores são utilizados para configurar o observável antes de este ser executado:
- o que observar;
- o thread no qual o observável é executado;
- o thread no qual o assinante recebe dados do observável;
- Existem dois tipos de observáveis: [cold] e [hot]. Um observável cold é executado na íntegra para cada novo subscritor. Se cada execução produzir os mesmos dados, cada novo subscritor recebe os mesmos dados que o anterior. Um observável hot geralmente produz dados de forma contínua. Quando um subscritor se inscreve, recebe os dados emitidos a partir do momento da sua subscrição. Não recebe dados que possam ter sido emitidos anteriormente. No nosso exemplo, o observável é cold: é totalmente reexecutado para cada novo subscritor. O que é que é realmente executado no nosso exemplo? Para descobrir, precisamos de voltar à definição do observável observado:
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
Para cada novo assinante, a função lambda, que é um parâmetro do método [Observable.create] (linha 3), é reexecutada. Portanto, as linhas 4–11 são executadas para cada novo assinante [subscriber];
2.8. Testar chamadas assíncronas
Começamos por demonstrar o efeito dos diferentes agendadores disponíveis. Para tal, utilizamos os seguintes parâmetros:
![]() |
Definimos [1-2] com valores pequenos para que, mesmo que as solicitações sejam executadas na mesma thread, não tenhamos de esperar muito tempo.
2.8.1. com o agendador [Schedulers.io]
![]() |
Podem ser observados os seguintes pontos:
- as respostas são recebidas numa ordem que não corresponde à ordem dos pedidos (ver idClient);
- cada pedido foi executado num segmento de execução diferente;
- a GUI já não fica bloqueada desta vez:
- é possível alternar entre os separadores;
- vemos os dados a chegar;
- não há tempo para ver o botão [Cancelar] porque a execução é demasiado rápida. Iremos destacar isto noutro teste;
2.8.2. com o agendador [Schedulers.computation]
![]() |
Podem ser observados os seguintes pontos:
- as respostas são recebidas numa ordem que não corresponde à ordem dos pedidos (ver idClient);
- os pedidos foram executados em 8 threads;
- a thread n.º 3 foi utilizada para os pedidos 8 e 0;
- a thread n.º 4 foi utilizada para os pedidos 9 e 1;
- cada uma das outras solicitações tinha um tópico diferente;
O agendador [Schedulers.computation] utiliza tantos threads quantos os núcleos existentes na máquina em uso. Esta informação é obtida através da expressão [Runtime.getRuntime().availableProcessors()].
2.8.3. com o agendador [Schedulers.newThread]
![]() |
O comportamento é semelhante ao do agendador [Schedulers.io].
2.8.4. com os agendadores [Schedulers.trampoline, Schedulers.immediate]
![]() |
O comportamento é síncrono. Todos os pedidos são executados na thread do evento loop. Este resultado não deve ser generalizado; significa apenas que, neste exemplo específico, ambos os agendadores funcionaram de forma síncrona.
2.9. Casos extremos
Neste exemplo, vamos trabalhar com agendadores que suportam operação assíncrona. Primeiro, aumentamos o número de pedidos para 100 utilizando o agendador [Schedulers.computation], que aqui é executado em 8 threads. Obtemos o seguinte resultado:
![]() |
- em [1], o botão [Cancel] está presente e pode ser utilizado (operação assíncrona);
Agora, vamos deixar a execução chegar ao fim:
![]() |
Vemos em [2] que a execução das 100 solicitações demorou cerca de 4 segundos (em 8 threads).
Agora, vamos executar estas mesmas 100 solicitações utilizando o agendador [Schedulers.newThread], que executa cada solicitação numa thread separada:
![]() |
Em [1], vemos que a execução das 100 solicitações (em 100 threads) demorou meio segundo. Isto é, portanto, significativamente mais rápido do que com o agendador [Schedulers.computation].
Agora, vamos fazer 800 pedidos nas mesmas condições, continuando a utilizar o agendador [Schedulers.newThread]. Obtemos os seguintes resultados:
![]() |
As 800 solicitações são executadas em cerca de 1 segundo.
Quando aumentamos este número (para mais de 2.500 pedidos na minha máquina — executados em 1,5 segundos — este número depende, evidentemente, em grande medida do ambiente de execução), acabamos por obter a seguinte exceção:
![]() |
Temos, portanto, um estouro de pilha. Os testes mostram que o comportamento do agendador [Schedulers.newThread] não é determinístico. Pode ocorrer a exceção anterior, executar novos testes e, em seguida, voltar à configuração que causou a exceção e já não a encontrar.
2.10. Conclusão
Demonstrámos um exemplo de utilização da biblioteca Rx. Vamos resumir o que aprendemos:
Começámos com a seguinte arquitetura:

- em [4], a camada [swing] efetuava chamadas síncronas à camada [service];
- em [5], a camada [swing] efetuava chamadas assíncronas à camada [rxService], que, por sua vez, efetuava uma chamada síncrona [6] à camada [service];
A primeira coisa que observámos foi que a biblioteca Rx facilitava a criação da interface assíncrona [rxService] a partir da interface síncrona [service] (ver Secção 2.4). Esta é uma lição importante, pois significa que podemos facilmente transformar uma aplicação síncrona numa assíncrona.
Na camada [swing], foram escritos dois métodos distintos:
- um para efetuar chamadas síncronas ao serviço (ver secção 2.4);
- outro para efetuar chamadas assíncronas ao mesmo (ver secção 2.7);
A escrita de chamadas assíncronas revelou-se significativamente mais complexa do que a escrita de chamadas síncronas. No entanto, quem já trabalhou com programação concorrente envolvendo múltiplas threads que precisam de ser sincronizadas irá constatar que a solução Rx é mais simples de escrever e evita todos os problemas difíceis de sincronização e comunicação entre threads. Neste artigo, destacámos os seguintes pontos-chave:
- o tipo [Observable] denota um fluxo de eventos (valores) que podem (mas não precisam) ser assíncronos e que podem ser observados;
- o tipo [Subscriber] denota um subscritor de um tipo [Observable];
- o tipo [Subscription] denota uma subscrição, ou seja, a ligação entre um [Subscriber] e um [Observable];
- O tipo [Observable] suporta operadores [mergeWith, empty, subscribeOn, observeOn, ...] que, na sua maioria, produzem observáveis. Estes operadores são utilizados para configurar o observável antes da sua execução:
- o que observar;
- a thread na qual o observável é executado;
- o thread no qual o assinante recebe dados do observável;
- existem dois tipos de observáveis: [cold] e [hot]. Um observável cold é executado na íntegra para cada novo subscritor. Se cada execução produzir os mesmos dados, cada novo subscritor recebe os mesmos dados que o anterior. Um observável hot geralmente produz dados de forma contínua. Quando um subscritor se inscreve, recebe os dados emitidos a partir do momento da sua inscrição. Não recebe quaisquer dados que possam ter sido emitidos anteriormente. No nosso exemplo, o observável é cold: é totalmente reexecutado para cada novo subscritor.
Agora que vimos um exemplo que demonstra o valor da biblioteca Rx, vamos explorá-la com mais detalhe.
A biblioteca Rx possui muitos métodos com parâmetros genéricos nas suas assinaturas. Iremos rever brevemente estas assinaturas (secção 3). Os parâmetros destes métodos são, na sua maioria, interfaces funcionais (Java 8), ou seja, interfaces com apenas um único método. Os parâmetros reais devem, portanto, ser instâncias destas interfaces. Antes do Java 8, era prática comum implementar uma interface utilizando uma classe anónima. Com o Java 8, se a interface for uma interface funcional, é mais conciso implementá-la utilizando uma função lambda. Iremos, portanto, introduzir estas (Secção 4). Uma vez feito isto, iremos introduzir a classe [Stream] (Secção 5), que permite processar coleções Java utilizando funções lambda. Esta classe é interessante porque a classe [Observable] do RxJava utiliza:
- certos métodos;
- a mesma forma de encadear métodos para processar o mesmo observável;
A seguir, apresentaremos as interfaces funcionais específicas da biblioteca RxJava (secção 6). Continuaremos com os principais elementos da biblioteca Rx [Observable, Subscriber, Subscription, operadores] (secção 7). A classe [Observable] possui dezenas de operadores que, por sua vez, são sobrecarregados várias vezes. Isto cria inicialmente uma complexidade significativa, porque estes operadores e as suas sobrecargas diferem, por vezes, apenas num único detalhe, e é difícil, sem experiência, saber qual o operador a utilizar. Apresentaremos apenas um número limitado de operadores e, na maioria das vezes, ignoraremos as suas sobrecargas.
Toda a secção anterior será abordada utilizando a biblioteca RxJava em aplicações de consola simples. Assim que tivermos dominado a biblioteca RxJava, iremos utilizá-la em dois tipos de aplicações gráficas:
- na Secção 8, revisitaremos o exemplo de aplicação Swing para explorá-lo com mais detalhe. Utilizaremos então a biblioteca RxSwing;
- na Secção 9, iremos criar uma aplicação Android utilizando a biblioteca RxAndroid;
Quando tudo isto estiver concluído, o leitor terá as ferramentas necessárias para se virar sozinho. Provavelmente levará algum tempo até que consiga utilizar a biblioteca Rx de forma intuitiva. Achei esta biblioteca particularmente interessante. No entanto, achei-a complexa de compreender e a curva de aprendizagem foi íngreme. Espero que este documento encurte essa curva de aprendizagem para o leitor. Parece-me que vale bem a pena o esforço.
















