7. A biblioteca RxJava
A biblioteca RxJava baseia-se no seguinte conceito: um fluxo de elementos do tipo T Observable<T> é observado por um ou mais assinantes (assinantes, observadores, consumidores) Subscriber<T>. A biblioteca RxJava permite que o fluxo Observable<T> seja executado na thread T1 e o seu observador Subscriber<T> na thread T2, sem que o programador tenha de se preocupar com a gestão do ciclo de vida dessas threads ou com problemas naturalmente complexos, tais como a partilha de dados entre threads e a sua sincronização para executar uma tarefa global. Facilita, assim, a programação assíncrona.
Um fluxo Observable<T> produz elementos do tipo T, que podem ser observados à medida que são produzidos. Se o observador e o observável (um termo usado de forma genérica para se referir ao tipo Observable<T>) estiverem na mesma thread, então o observável só pode produzir o elemento (i+1) depois de o observador ter consumido o elemento i. Existem poucos casos em que esta arquitetura é útil. Se o observador e o observável não estiverem na mesma thread, então o observável e o seu observador comportam-se de forma autónoma: o observável emite ao seu próprio ritmo e o observador consome ao seu próprio ritmo. É aqui que reside o valor da biblioteca. Até agora, discutimos apenas um único observador. Na realidade, um observável pode ter qualquer número de observadores.
A biblioteca RxJava é particularmente adequada para a arquitetura descrita na Secção 2 da introdução e resumida aqui:

- em [1], uma camada de serviço fornece serviços, alguns dos quais demoram muito tempo a obter (pedidos de rede, por exemplo);
- esta camada de serviço é invocada por uma interface gráfica de utilizador [1] (Swing, Android, JavaFX). Se a camada de serviço for executada na mesma thread que o método [Swing] que a utiliza, a interface gráfica de utilizador congela (deixa de responder) enquanto aguarda o resultado do serviço;
- Em [2], uma camada de adaptação fina implementada com RxJava permite que a camada GUI receba uma implementação assíncrona do mesmo serviço: este serviço pode ser executado numa thread diferente do método da camada GUI que o invoca. Neste caso, a GUI [3] permanece responsiva: o utilizador pode continuar a interagir com ela, por exemplo, acionando um novo pedido de rede em paralelo com o primeiro e, mais importante ainda, pode ter a opção de cancelar processos que demorem demasiado tempo — algo impossível se a GUI estiver congelada;
- A chamada [4] é síncrona, enquanto as chamadas [5-6] são assíncronas;
Nesta arquitetura, a camada [2] fornece serviços que devolvem tipos Observable<T> aos quais os métodos da camada gráfica [3] podem subscrever. Um serviço na camada [2] entrega então os seus resultados um a um, e a camada [3] pode reagir a cada um deles, por exemplo, atualizando um ou mais componentes da interface gráfica do utilizador.
A classe Observable<T> possui dezenas de métodos. Este é um dos desafios da biblioteca: é muito rica e é difícil compreender todas as suas possibilidades. Apresentaremos alguns deles. O domínio dos outros métodos virá com o tempo.
7.1. Criar observáveis e subscrevê-los
7.1.1. Exemplo-01: o método [Observable.from]
![]() |
Considere o seguinte código:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
- Linha 12: Criamos um tipo Observable<Integer> a partir de uma lista de inteiros.
A classe Observable<T> é um fluxo de elementos do tipo T que podem ser observados — de preferência de forma assíncrona, mas não necessariamente — à medida que são produzidos. A sua definição é a seguinte:
![]() |
Como mencionado anteriormente, a classe Observable<T> possui dezenas de métodos. Alguns são semelhantes aos da classe Stream<T>, discutida na Secção 5. A documentação do RxJava inclui «diagramas de bolinhas» [2] que ilustram como estes métodos funcionam:
- A linha 3 ilustra as emissões do observável ao longo do tempo;
- o método [4] é aplicado aos elementos emitidos pelo observável. Geralmente, produz um novo observável;
- a linha 5 mostra o novo observável obtido;
O método [Observable.from] tem a seguinte assinatura:
![]() |
O método estático [Observable.from] permite criar um Observable<T> a partir de uma coleção de elementos do tipo T. Esta é uma forma muito simples de começar a trabalhar com observáveis. A linha:
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
irá, portanto, emitir três elementos. Não os emite imediatamente. Irá emiti-los na íntegra sempre que um subscritor se registar. Isto é designado por observável frio. O observável reemite os seus elementos para cada novo subscritor.
Podemos considerar a instrução anterior como uma ação de configuração para o observável. É configurado uma vez e executado n vezes se aparecerem n assinantes.
Como se inscreve?
Uma forma de o fazer é utilizar o método [Observable.subscribe], cuja definição aqui utilizada é a seguinte:
![]() |
- o primeiro parâmetro [Action1<T> onNext] (ver Secção 6.2) do método é o método a ser executado quando o observável emite um novo elemento T;
- o segundo parâmetro [Action1<Throwable> onError] do método é o método a ser executado quando o observável lança uma exceção;
- o terceiro parâmetro [Action0 onComplete] (ver Secção 6.1) do método é o método a ser executado quando o observável emite uma exceção;
- o método retorna um tipo [Subscription];
O tipo [Subscription] representa uma subscrição do observável. A sua definição é a seguinte:
![]() |
O valor desta interface [1] reside no seu método [2], que permite cancelar uma subscrição.
No nosso exemplo, o código para subscrever o observável é o seguinte:
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
- linha 1: o resultado do tipo [Subscription] é ignorado;
- linhas 1–15: os três parâmetros são instâncias de classes anónimas. Também utilizaremos lambdas. A vantagem das classes anónimas é que os tipos de dados esperados pelo único método destas classes são claramente visíveis;
- linhas 2–5: implementação do primeiro parâmetro do tipo [Action1<Integer>];
- linhas 6–10: implementação do segundo parâmetro do tipo [Action1<Throwable>];
- linhas 11–15: implementação do terceiro parâmetro do tipo [Action0];
O código completo é o seguinte:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
O observável na linha 12 começa a emitir os seus três elementos assim que o método [subscribe] é chamado na linha 14. A partir desse momento:
- para cada elemento emitido, as linhas 15–18 são executadas.
- quando os 3 elementos estiverem concluídos, as linhas 24–29 são executadas;
- as linhas 19–24 nunca serão executadas porque o observável não emite uma exceção aqui;
Por predefinição, o observável e o observador são executados na mesma thread. Existem alguns observáveis predefinidos que são executados numa thread diferente da thread principal (aqui, a thread do método main), mas para a maioria deles, não é esse o caso. Assim, aqui, tudo acontece na thread do método main:
- o observável emite o elemento 1;
- as linhas 15–18 são executadas e exibem este elemento;
- o observável emite o elemento 2;
- as linhas 15–18 executam e apresentam este elemento;
- o observável emite o elemento 3;
- as linhas 15–18 executam e exibem este elemento;
- o observável emite a notificação [completed];
- as linhas 24–29 são executadas;
Eis o que os resultados mostram:
A classe [Example02] reutiliza [Example01], desta vez utilizando funções lambda como parâmetros para o método [Observable.subscribe]:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
public class Exemple02 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(
(integer) -> System.out.printf("next : %s%n", integer),
(th) -> System.out.println(th),
() -> System.out.println("completed"));
}
}
7.1.2. Exemplo-03: A Classe Observador
![]() |
O método [Observable.subscribe], que permite subscrever um observável, tem várias versões, incluindo as seguintes:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
import rx.Observer;
public class Exemple03 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable th) {
System.out.printf("throwable %s", th);
}
@Override
public void onNext(Integer integer) {
System.out.printf("next : %s%n", integer);
}
});
};
}
Linha 13: Em vez de passar três parâmetros para o método [subscribe], passamos-lhe um tipo [Observer] da seguinte forma:
![]() |
O tipo [Observer] é uma interface com três métodos:
- [onNext(T t)], que é chamado sempre que o observável emite um elemento t;
- [onError(Throwable th)], que é chamado quando o observável lança uma exceção th;
- [onCompleted], que é chamado quando o observável indica que terminou de emitir;
O código funciona de forma semelhante ao que foi explicado anteriormente. Obtemos os seguintes resultados:
7.1.3. Exemplo-04: O método [Observable.create]
![]() |
O método estático Observable.create é definido da seguinte forma:
![]() |
- O método [create] retorna um tipo Observable<T>;
- o parâmetro do método [create] é uma função do tipo [Observable.OnSubscribe<T>] definida da seguinte forma:
![]() |
O tipo [Observable.OnSubscribe<T>] é uma interface funcional que, por sua vez, estende a interface funcional [Action1<Subscriber<? super T>>]. O método [call] desta interface espera um tipo [Subscriber] (subscriber, observer) definido da seguinte forma:
![]() |
Vemos em [1] que a classe [Subscriber<T>] implementa a interface [Observer<T>] apresentada na Secção 7.1.2.
Em última análise, o método [<T> Observable.create]:
- toma como parâmetro uma instância do tipo [Observable.OnSubscribe<T>] com um único método: void call(Subscriber<T> s). O tipo [Subscriber<T>] estende o tipo [Observer<T>] e, portanto, possui os métodos onNext, onError e onCompleted;
- retorna um tipo Observable<T>;
O método [<T> Observable.create] retorna um observável configurado. Ainda não foram emitidos elementos. Quando um subscritor [Subscriber<T> s] subscreve este observável, o método [void call(s)] da função passada como parâmetro ao método [<T> Observable.create] é então chamado. A sua função é emitir elementos t do tipo T e chamar o método do observador [s.onNext(t)] em cada emissão. Quando isto estiver concluído, o método [s.onCompleted(t)] do observador deve ser chamado e o método [call] deve terminar. Se o método [call] encontrar uma exceção th, o método [s.onError(th)] do observador deve ser chamado e o método [call] deve terminar;
Para ilustrar este comportamento complexo, utilizaremos o seguinte código [Exemplo04]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.util.Random;
public class Exemple04 {
public static void main(String[] args) {
// observable configuration of reals
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
for (int i = 0; i < 3; i++) {
// emission element i
subscriber.onNext(new Random((i + 1)).nextDouble());
}
// end of issue
subscriber.onCompleted();
}
});
// subscription and therefore emission
obs1.subscribe((d) -> System.out.printf("onNext %s%n", d), (th) -> System.out.printf("onError %s%n", th),
() -> System.out.println("onCompleted"));
}
}
- linha 11: é criado um observável que emite tipos Double;
- linhas 11–21: o parâmetro do método [create] é instanciado com uma classe anónima contendo o único método [call] das linhas 12–20. O observável criado na linha 11 está pronto para emitir, mas só o fará quando um observador chegar;
- linhas 13–21: o método [call] recebe uma referência a um observador;
- linhas 14–17: são emitidos três elementos para o observador;
- linha 19: notifica o observador de que a emissão terminou;
- linhas 23–24: Subscrição do observável da linha 11. Implementamos os três parâmetros [onNext, onError, onCompleted] do método [subscribe] utilizando três lambdas. Esta subscrição irá criar o subscritor [Subscriber<Double>], que será passado para o método [call] na linha 13. A emissão de elementos terá então início;
- tudo acontece na mesma thread: observável e observador;
Obtemos os seguintes resultados:
O método [Observable.create] permite criar um observável a partir de qualquer evento. Este é o método que utilizámos na Secção 2 da introdução para transformar uma interface síncrona numa assíncrona.
7.1.4. Exemplo-05: Refatoração do [Exemplo-04]
![]() |
O exemplo seguinte apresenta uma nova versão do método estático [Observable.subscribe]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class Exemple05 {
public static void main(String[] args) {
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- linha 56: a nova versão do método estático [Observable.subscribe] aceita o tipo [Subscriber] como parâmetro, que introduzimos no parágrafo anterior;
- linhas 37–52: o assinante (observador). Ele implementa a interface Observer com os seus três métodos onNext, onError e onCompleted;
- Linhas 61–64: A partir daqui, vamos concentrar-nos nos threads em que o observável e o seu observador são executados;
- linha 62: o nome da thread;
- linha 63: a hora atual expressa em segundos e milissegundos. Isto permitir-nos-á acompanhar ao longo do tempo a emissão de elementos pelo observável e o seu processamento pelo observador;
- Este código tem a mesma funcionalidade que o código anterior. Simplesmente refatorámos este último;
Os resultados obtidos são os seguintes:
- Linha 1 dos resultados: antes da linha 56 do código, ainda não aconteceu nada. O observável foi simplesmente configurado;
- Linha 2 dos resultados: a linha 56 do código aciona uma chamada ao método [call] na linha 15. Linha 3: o número real 80,39 é emitido para o observador;
- Linha 4: O observador recebe o número emitido;
- linhas 5–8: o processo anterior repete-se duas vezes;
- linha 9: o observável envia a notificação de fim de transmissão;
- linha 10: o observador recebe-a;
- linha 11: exibida pela linha 57 do código;
Podemos ver, portanto, que a única linha de subscrição 56 fez com que as linhas 2–10 dos resultados fossem exibidas. Ao começar a utilizar a biblioteca RxJava, questiona-se como as coisas estão interligadas, particularmente as ligações entre o observador e o observável. Aqui vemos que a linha 56, a subscrição do observável,
- desencadeou a emissão de todos os elementos do observável;
- que o observável e o observador são executados na mesma thread;
- e que, por causa disso, observamos a sequência: emitir elemento i, observar elemento i, emitir elemento (i+1), observar elemento (i+1), ...
Lembre-se de que o emissor estava à espera antes de emitir os seus elementos:
// attente
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erreur
subscriber.onError(e);
}
onde i na linha 3 representa o número de emissão (0 <= i < 3). Se analisarmos os tempos de emissão dos elementos do observável:
- linhas 2, 3: o elemento 0 foi emitido aproximadamente 500 ms após o início da subscrição;
- linhas 3, 5: o elemento 1 foi emitido aproximadamente 400 ms após o elemento 0;
- linhas 5, 7: o elemento 2 foi emitido aproximadamente 300 ms após o elemento 1;
7.2. Thread de execução, thread de observação
7.2.1. Exemplo-06: Observável e observador numa thread diferente de [main]
![]() |
Reestruturamos o exemplo anterior da seguinte forma [Exemplo 06]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple06 {
public static void main(String[] args) {
// gatekeeper
CountDownLatch latch = new CountDownLatch(1);
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// we lower the barrier
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite observable configuration
obs1 = obs1.subscribeOn(Schedulers.computation());
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// waiting at the gate
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- Linha 16: Criamos um guardrail (semáforo) utilizando um objeto [CountDownLatch]. Este objeto é utilizado para sincronizar threads entre si. Aqui, é inicializado com o valor 1, ao qual nos referiremos como o valor do guardrail (ou semáforo). Uma thread aguarda o guardrail utilizando a seguinte operação:
latch.await();
A thread fica bloqueada se o valor do latch for >0. Uma thread pode incrementar ou decrementar o valor interno do latch. Linha 48: o valor do latch é decrementado em 1.
- Linha 63: o observável é configurado para ser executado numa thread fornecida pelo agendador [Schedulers.computation()]. Este agendador pode fornecer tantas threads quantos forem os núcleos na máquina de execução. A secção sobre a aplicação de exemplo demonstrou a utilização de outros agendadores (ver Secção 2.8);
O princípio do código é o seguinte:
- o método [main] é executado na thread principal;
- linha 66: começa a emitir elementos a partir do observável. Estes serão emitidos numa thread diferente da thread principal;
- linha 70: a thread principal é bloqueada porque a barreira tem o valor 1 (ver linha 16). Só pode continuar quando este valor mudar para 0. Isto acontece na linha 48. É o observador que baixa a barreira quando recebe a notificação de que o observável terminou a emissão;
A execução produz os seguintes resultados:
- linha 1: a subscrição está prestes a ocorrer;
- linha 2: isto desencadeia a execução do método [call] na thread [RxComputationThreadPool-1]. Temos agora uma execução paralela com duas threads;
- linha 3: por uma razão desconhecida, a thread [RxComputationThreadPool-1] cedeu o controlo. A thread [main] assume então o controlo e é bloqueada pela barreira (linha 70 do código). A partir deste ponto, apenas a thread [RxComputationThreadPool-1] pode operar;
- linhas 4–11: observamos o comportamento visto anteriormente entre o observável e o seu observador, mas agora tudo ocorre na thread [RxComputationThreadPool-1];
- linhas 12-13: o observador baixou a barreira (linha 48 do código) e a thread [RxComputationThreadPool-1] foi encerrada. A thread [main] assume o controlo e exibe duas mensagens;
7.2.2. Exemplo-07: Observável e observador em duas threads diferentes
![]() |
Alteramos o exemplo anterior da seguinte forma:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple07 {
public static void main(String[] args) {
// gatekeeper
CountDownLatch latch = new CountDownLatch(1);
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// we lower the barrier
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite observable configuration
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// waiting in front of the barrier
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
O código é idêntico ao do exemplo anterior, exceto na linha 63:
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
que configura o observável (subscribeOn) e o observador (observeOn) para serem executados numa das threads fornecidas pelo agendador [Schedulers.computation()].
Os resultados obtidos são os seguintes:
Podem ser observados os seguintes pontos:
- o observável é executado na thread [RxComputationThreadPool-4] (linhas 3–4, 6, 8–9);
- o observador é executado na thread [RxComputationThreadPool-3] (linhas 5, 7, 10-11);
- eles executam-se de forma independente. Assim, nas linhas 8–9, o observável emite duas notificações (onNext, onCompleted) antes de o observador recuperar a notificação [onNext] (linha 10);
A biblioteca RxJava trata da transferência de dados (emissões) da thread do observável para a thread do observador. O programador não precisa de se preocupar com isto.
Já vimos como criar observáveis (Observable.from, Observable.create). Agora, vamos analisar os observáveis predefinidos na biblioteca RxJava.
7.3. Observáveis predefinidos
7.3.1. Exemplo-08: o método [Observable.range]
![]() | ![]() |
A partir de agora, utilizaremos classes dedicadas para os processos observados e os seus observadores. A ideia é poder registar os seus nomes, os seus threads de execução e os seus tempos de execução, para que possamos acompanhá-los ao longo do tempo.
A classe [Process] será simplesmente um Observable ao qual podemos atribuir um nome. Ela implementará a seguinte interface [IProcess]:
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcess<T> {
// name of observable
public String getName();
// observable
public Observable<T> getObservable();
}
Esta interface pode ser implementada pela seguinte classe [Process<T>]:
package dvp.rxjava.observables.utils;
import rx.Observable;
import rx.Scheduler;
public class Process<T> implements IProcess<T>{
// observable name
protected String name;
// observed process
protected Observable<T> observable;
// manufacturers
public Process(String name, Observable<T> observable) {
// local initializations
this.name = name;
this.observable = observable;
}
// getters and setters
public String getName() {
return name;
}
public Observable<T> getObservable() {
return observable;
}
}
- linha 9: o nome do processo;
- linha 11: o observável observado;
- linhas 14–18: o construtor;
O observador será descrito pela seguinte classe [Observer]:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
...
}
- Linha 11: A classe `Observateur<T>` estende a classe `Subscriber<T>`, que apresentámos brevemente na Secção 7.1.3. Iremos utilizá-la como argumento para o método [`Observable.subscribe`]:
// exécution observable (observation)
obs1.subscribe(observateur);
O método [Observable.subscribe] utilizado na linha 2 acima tem a seguinte definição:
![]() |
A função do [Subscriber] consiste principalmente em gerir os elementos emitidos pelo observável ao qual se subscreveu, utilizando os métodos da interface [Observer]: onNext, onError, onCompleted. A classe [Subscriber] possui os seguintes métodos:
![]() |
No código da classe [Observer], utilizaremos o método [1] isUnsubscribed para determinar se a subscrição do subscritor foi cancelada ou não. A classe [Observer<T>] completa é a seguinte:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
// a gatekeeper (semaphore)
private CountDownLatch latch;
// a display method
private Consumer<String> showInfos;
// observer's name
private String observerName;
// the name of the observed process
private String processName;
// manufacturers
public Observateur() {
}
public Observateur(String name, CountDownLatch latch, Consumer<String> showInfos, String observedName) {
this.observerName = name;
this.latch = latch;
this.showInfos = showInfos;
this.processName = observedName;
}
// --------------------------- implementation interface Observer<T>
@Override
public void onCompleted() {
// end of issues
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s,%s].onCompleted", observerName, processName));
}
// end of main thread lock
latch.countDown();
}
@Override
public void onError(Throwable e) {
// emission error
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s, %s].onError (%s)", observerName, processName, e));
}
}
@Override
public void onNext(T value) {
// an additional show
if (!isUnsubscribed()) {
try {
showInfos.accept(String.format("Subscriber [%s,%s] : onNext (%s)", observerName, processName,
new ObjectMapper().writeValueAsString(value)));
} catch (JsonProcessingException e) {
showInfos.accept(String.format("Subscriber [%s,%s].onNext (%s)", observerName, processName, e));
}
}
}
}
- Para além das características de um Assinante, o Observador conterá as seguintes informações:
- linha 14: uma barreira ou semáforo que será utilizado para bloquear o segmento principal até que o observador tenha recebido todos os elementos emitidos pelo observável. Isto ocorrerá na linha 36 do código quando o observador receber a notificação de fim de emissão do observável;
- linha 16: uma instância de Consumer<String> que será utilizada para exibir uma mensagem na consola;
- linha 18: o nome do observador, utilizado para distinguir entre observadores quando existem vários;
- linha 20: o nome do processo observado;
- linhas 36, 46, 54: os métodos [onCompleted, onError, onNext] da interface [Observer<T>] implementados pela classe abstrata [Subscriber<T>]. Esta classe não os implementa. Por conseguinte, isto deve ser feito nas suas classes filhas. Antes de fazer qualquer coisa nestes métodos, verificamos se o observador foi cancelado da assinatura do observável que está a observar;
- linha 59: o método [onNext] do observador escreve a cadeia JSON do elemento recebido. Isto permitir-nos-á apresentar vários tipos de elementos;
Dito isto, vamos examinar um novo método da classe Observable, o método [range]:
![]() |
O observável Observable.range(n,m) emite (m) inteiros que variam de n a n+m-1. Vamos explorá-lo com o seguinte código [Exemplo08]:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple08 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Integer> obs1 = Observable.range(15, 3).subscribeOn(Schedulers.computation());
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos,"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- Linha 16: Vamos utilizar dois observadores;
- linha 19: o guardrail (semáforo) é inicializado com o valor dois, porque colocaremos cada observador numa thread diferente. A thread principal terá, portanto, de esperar que ambas as threads dos observadores terminem;
- linha 22: configuramos o observável para que seja executado numa thread do agendador [Schedulers.computation()]. O observador estará na mesma thread que o observável;
- linhas 25–27: subscrevemos dois observadores ao observável. Isto irá desencadear a execução completa do observável para cada observador: os inteiros 15, 16 e 17 serão emitidos;
- linha 30: a thread principal aguarda que os observadores terminem;
Os resultados obtidos são os seguintes:
- linha 2: o thread principal está bloqueado, à espera que os dois observadores terminem;
- linhas 3-4: vemos que o observador 0 está na thread [RxComputationThreadPool-1] e o observador 1 na thread [RxComputationThreadPool-2];
- linhas 3-10: vemos que ambos os observadores recebem exatamente os mesmos elementos;
Vamos utilizar a classe Observer aqui definida para ilustrar o comportamento de outros tipos de observáveis.
7.3.2. Exemplo-09: os métodos Observable.[interval, take, doNext]
![]() |
![]() |
Este exemplo ilustra a utilização do observável Observable.interval(long interval, TimeUnit unit), que emite inteiros longos a intervalos regulares. Nota [1]: por predefinição, o observável [Observable.interval] é executado numa das threads do agendador [Schedulers.computation].
O código será o seguinte:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- linha 22: o observável emite inteiros longos a cada 500 milissegundos. A sequência começa com o número 0;
- linha 22: este observável emite um número infinito de valores. O método [Observable.take(n)] cria um novo observável que retém apenas os primeiros n elementos emitidos;
![]() |
Vamos rever o código do observável:
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
Linha 2: O método [Observable.doOnNext] é executado sempre que o observável emite um novo elemento. É frequentemente utilizado para registar informações. Aqui, pretendemos registar a data de emissão dos elementos para verificar se o intervalo de 500 milissegundos está a ser mantido. O método [Observable.doOnNext] não altera o observável ao qual é aplicado. A sua definição é a seguinte:
![]() |
A execução produz os seguintes resultados:
- linhas 3, 7 e 11: vemos que o intervalo de emissão é de aproximadamente 500 ms;
- os dois observadores estão, de facto, em duas threads diferentes, apesar de o observável não ter sido configurado para ser executado com um agendador específico. Este é o comportamento padrão do observável [Observable.interval] que vemos aqui;
7.3.3. Exemplos-10/12: os métodos Observable.[error, empty, never]
![]() | ![]() |
A partir de agora, seremos mais concisos nas nossas ilustrações dos métodos da classe [Observable]. O código anterior era o seguinte:
package dvp.rxjava.observables;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
Este código já foi utilizado no exemplo anterior. Apenas as linhas 21–22 foram alteradas. Por isso, vamos extrair a maior parte deste código para a seguinte classe [ProcessUtils]:
package dvp.rxjava.observables.utils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import rx.Observable;
public class ProcessUtils {
@SafeVarargs
public static void subscribe(int nbObservateurs, IProcess<?>... processes) throws InterruptedException {
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs * processes.length);
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
for (IProcess<?> process : processes) {
Observable<?> obs = process.getObservable();
obs.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos, process.getName()));
}
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- linha 13: o método recebe dois parâmetros:
- nbObservers: o número de observadores para os processos, passado como segundo parâmetro;
- processos: os processos (denominados observáveis) a serem observados. Graças à notação [IProcess<?>], os processos podem emitir elementos de diferentes tipos;
- linha 16: o semáforo deve ficar verde quando todos os observadores tiverem concluído todas as suas observações. O valor inicial do semáforo é, portanto, o número de observadores multiplicado pelo número de observações;
- Linhas 20–25: Cada observador está inscrito em todos os processos que precisa de observar;
- linha 23: recuperar o observável do processo (ver Secção 7.3.1);
- linha 23: um observador está inscrito nele. São passadas quatro informações ao observador:
- o seu nome;
- o semáforo que deve decrementar quando receber a notificação de fim de transmissão do observável que está a observar;
- o método a utilizar quando quiser registar informações na consola;
- o nome do processo que irá observar;
Com estas classes definidas, o Exemplo 10 será o seguinte:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple10 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs = Observable.error(new RuntimeException("Erreur !!!")).subscribeOn(Schedulers.computation());
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1", obs));
}
}
Na linha 11, o método estático [Observable.error] é definido da seguinte forma:
![]() |
A linha 8 configura, portanto, um observável que simplesmente lança uma exceção para o método [onError] dos seus assinantes. A execução produz os seguintes resultados:
main : début observation ------Thread[main] ---- Time[22:618]
main : attente fin observation ------Thread[main] ---- Time[22:636]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[22:638]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[22:638]
Linhas 3 e 4: o método [onError] de ambos os assinantes recebeu a exceção lançada pelo observável.
Esta execução tem uma peculiaridade: os métodos [onCompleted] de ambos os observadores não foram chamados. Como resultado, a barreira não foi baixada e o thread principal permanece bloqueado no método estático [ProcessUtils.subscribe] na linha 3 a seguir:
// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");
Aqui vemos que, se ocorrer um erro no observável, o método [onCompleted] dos assinantes não é chamado. Por isso, modificamos o método [Observer.onError] da seguinte forma:
@Override
public void onError(Throwable e) {
// erreur d'émission
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber[%s, %s].onError (%s)", observerName, processName, e));
}
// fin blocage thread principal
latch.countDown();
}
Adicionamos as linhas 7–8 para libertar o bloqueio em caso de um erro observável. Com este novo código, a execução produz os seguintes resultados:
main : début observation ------Thread[main] ---- Time[40:750]
main : attente fin observation ------Thread[main] ---- Time[40:764]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[40:766]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[40:766]
main : fin observation ------Thread[main] ---- Time[40:767]
Obtemos a linha 5, que não tínhamos antes.
O Exemplo 11 será o seguinte:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple11 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs1 = Observable.empty();
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
Linha 10: O método estático [Observable.empty] cria um observável que não emite elementos. Emite apenas a notificação de fim de emissão;
![]() |
A execução do código do exemplo acima produz os seguintes resultados:
- Linhas 2 e 3: Vemos que ambos os observadores recebem a notificação de fim de transmissão sem terem recebido quaisquer elementos anteriormente.
Pode-se questionar para que serve, na verdade, este método. Pode ser utilizado de forma análoga a uma coleção, inicialmente vazia, à qual são posteriormente adicionados elementos:
Na linha 3, fundimos o observável inicial obs (linha 1) com outros observáveis.
O Exemplo 12 ilustra o método estático [Observable.never]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple12 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs1 = Observable.never();
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
O método estático [Observable.never] cria um observável que nunca emite:
![]() |
A execução do exemplo produz os seguintes resultados:
Linha 2: o thread principal aguarda indefinidamente. Isto acontece porque nenhum observável emite a notificação [onCompleted], o que permite que o semáforo (barreira) fique verde (baixar a barreira).
7.4. Multithreading
7.4.1. Exemplo 13: thread de ação, thread de observador
Na Secção 7.1.3, criámos um observável utilizando o método estático [Observable.create]:
![]() |
- o método [create] devolve um tipo Observable<T>;
- o parâmetro do método [create] é uma função do tipo [Observable.OnSubscribe<T>] definida da seguinte forma:
![]() |
O tipo [Observable.OnSubscribe<T>] é uma interface funcional que, por sua vez, estende a interface funcional [Action1<Subscriber<? super T>>]. O método [call] desta interface espera um tipo [Subscriber] (assinante, observador). No restante deste documento, por vezes referir-nos-emos ao tipo [Observable.OnSubscribe<T>] como uma ação. Iremos criar ações personalizadas que terão um nome. Estas serão instâncias da seguinte interface [IProcessAction]:
![]() |
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcessAction<T> extends Observable.OnSubscribe<T> {
// action has a name
public String getName();
}
- linha 5: a interface [IProcessAction<T>] tem todas as características da interface [Observable.OnSubscribe<T>];
- linha 8: possui também um método [getName] que devolve o nome da instância que implementa a interface;
Vamos utilizar a seguinte ação denominada [ProcessAction01]:
package dvp.rxjava.observables.utils;
import java.util.Random;
import rx.Subscriber;
import rx.functions.Func1;
public class ProcessAction01<T> implements IProcessAction<T> {
// data
private String name;
private int nbValues;
private Func1<Integer, T> func1;
// manufacturers
public ProcessAction01(String name, int nbValues, Func1<Integer, T> func1) {
this.name = name;
this.nbValues = nbValues;
this.func1 = func1;
}
@Override
public void call(Subscriber<? super T> subscriber) {
ProcessUtils.showInfos.accept(String.format("Observable (%s) call start", getName()));
for (int i = 0; i < nbValues; i++) {
// waiting
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
// error
ProcessUtils.showInfos.accept(String.format("Observable (%s) onError", getName()));
subscriber.onError(e);
}
// element emission
T value = func1.call(i);
ProcessUtils.showInfos.accept(String.format("Observable (%s,%s) onNext (%s)", getName(), i, value));
subscriber.onNext(value);
}
// finish
ProcessUtils.showInfos.accept(String.format("Observable (%s) onCompleted", getName()));
subscriber.onCompleted();
}
@Override
public String getName() {
return name;
}
}
- linha 8: a classe [ProcessAction01<T>] implementa a interface [IProcessAction<T>] e, por conseguinte, a interface [Observable.OnSubscribe<T>];
- linha 11: o nome da ação;
- linha 12: o número de valores a emitir;
- linha 13: uma instância do tipo [Func1<Integer, T>] que recebe um inteiro e produz um tipo T a ser emitido pelo observável (linhas 35 e 37);
- linhas 16–20: passamos o nome da ação, o número de valores a emitir e a função de emissão para o construtor;
- linhas 23–42: o código do processo;
- linha 23: o método [call] recebe como parâmetro o subscritor do observável associado ao processo;
- linha 28: o processo emite os seus elementos após uma espera de duração aleatória;
- linha 32: emissão de um erro;
- linha 37: uma emissão normal;
- linha 41: emite a notificação de fim de emissão;
- linhas 25–38: a ação emite nbValues números reais após um tempo de espera aleatório (linha 30);
- linha 35: o valor a ser emitido é fornecido pela função [func1] passada como parâmetro ao construtor (linha 16);
Reestruturamos a classe [Process] (ver Secção 7.3.1) para que também possa ser construída com uma ação nomeada. Adicionamos o seguinte construtor:
public Process(IProcessAction<T> na, Scheduler schedulerObserved, Scheduler schedulerObserver) {
// nom process=nom action
name = na.getName();
// action --> observable
observable = Observable.create(na);
// thread d'exécution du processus observé
if (schedulerObserved != null) {
observable = observable.subscribeOn(schedulerObserved);
}
// thread d'observation de l'observateur
if (schedulerObserver != null) {
observable = observable.observeOn(schedulerObserver);
}
}
- Linha 1: O construtor recebe 3 parâmetros:
- a ação nomeada que será utilizada para construir o observável (linha 5);
- o agendador do processo observado (pode ser nulo);
- o agendador do observador (pode ser nulo);
- linha 5: o observável é criado a partir da ação passada como parâmetro;
O código seguinte [Exemplo 13] observa diferentes observáveis:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple13 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 1, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// process 3
Process<Integer> process3 = new Process<>(new ProcessAction01<Integer>("process3", 3, i -> i * 2), null,
Schedulers.computation());
// process 4
Process<Boolean> process4 = new Process<>(new ProcessAction01<Boolean>("process4", 4, i -> i % 2 == 0), null, null);
// subscriptions
ProcessUtils.subscribe(1, process1);
ProcessUtils.subscribe(1, process2);
ProcessUtils.subscribe(1, process3);
ProcessUtils.subscribe(1, process4);
}
}
- linhas 13–15: o process1 produz 1 número real numa thread de computação que será observado noutra thread de computação;
- linhas 17–18: o processo2 produz 2 cadeias de caracteres numa thread de computação, e não é dada qualquer indicação relativamente à thread do observador. Os resultados mostram que a observação ocorre, por predefinição, na mesma thread que a execução do processo;
- linhas 20–21: o processo3 produz 3 inteiros numa thread não especificada, que serão observados numa thread de computação. Os resultados mostram que o processo é executado por predefinição na thread principal;
- linha 23: o processo process4 produz 4 valores booleanos numa thread não especificada, que serão observados numa thread não especificada. Os resultados mostram que a execução do processo e a sua observação ocorrem, por predefinição, na thread principal;
O resultado da execução deste código é o seguinte:
- O processo process1 gera 1 número real (linha 4) na thread de computação [RxComputationThreadPool-4], que é observado na thread de computação [RxComputationThreadPool-3] (linha 6);
- O processo process2 produz 2 cadeias de caracteres (linhas 12, 14) na thread de computação [RxComputationThreadPool-5], que são observadas nessa mesma thread (linhas 13, 15);
- O processo process3 produz 3 inteiros (linhas 21, 23, 25) na thread principal, que são observados na thread de computação [RxComputationThreadPool-6] (linhas 22, 24, 28);
- o processo process4 produz 4 valores booleanos (linhas 34, 36, 38, 40) na thread principal, que são observados nessa mesma thread principal (linhas 33, 35, 37, 39);
Convidamos o leitor a acompanhar o exposto acima:
- o ciclo de vida do processo observado e da sua thread;
- o ciclo de vida do seu observador e da sua thread;
Grande parte do apelo das bibliotecas Rx reside neste multithreading, que o programador não precisa de gerir por conta própria.
7.5. Combinações de múltiplos observáveis
7.5.1. Exemplo-14: Combinação de dois observáveis com [Observable.merge]
Apresentamos agora métodos estáticos da classe [Observable] que permitem combinar múltiplos observáveis num único observável de resultado.
O primeiro exemplo deste tipo é o seguinte:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple14 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// merge
Process<?> process12 = new Process<>("process12",
Observable.merge(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- linhas 15–17: um processo denominado [process1] emitirá 3 números reais numa thread de computação. Será também observado numa thread de computação;
- linhas 19–20: um processo denominado [process2] emitirá 2 cadeias de caracteres numa thread de computação. A thread de observação não está especificada. Vimos anteriormente que, neste caso, a thread de observação é a thread de computação;
- linha 23: os dois processos são fundidos, ou seja, é criado um observável cujos elementos provêm simultaneamente de ambos os processos. O método estático [Observable.merge] é utilizado para este efeito:
![]() |
Ao contrário do que o diagrama acima possa sugerir, durante a fusão, os elementos do fluxo 1 podem ser intercalados entre os elementos do fluxo 2. Isto é demonstrado pelos resultados da execução:
- linha 3: o processo [process1] é executado na thread de computação [RxComputationThreadPool-4];
- linha 4: o processo [process2] está a ser executado na thread de computação [RxComputationThreadPool-5];
- linha 9: o processo [process12] é observado na thread de computação [RxComputationThreadPool-3]. Não conheço a regra que levou a esta escolha;
- linhas 9–11: vemos que o observador observa elementos de ambos os processos [process1] (linha 5) e [process2] (linhas 6, 7), mesmo que nenhum deles tenha terminado (há mistura);
- o processo [process12] termina (linha 17) quando ambos os processos, process1 e process2, terminam;
7.5.2. Exemplo-15: Concatenar dois observáveis com [Observable.concat]
Vamos agora examinar o seguinte código:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple15 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, Schedulers.computation());
// concat
Process<?> process12 = new Process<>("process12",
Observable.concat(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- linhas 15–17: um processo denominado [process1] emitirá 3 números reais numa thread de computação. Será também observado numa thread de computação;
- linhas 19–20: um processo denominado [process2] emitirá 2 cadeias de caracteres numa thread não especificada, neste caso a thread principal por predefinição. Será observado numa thread de computação;
- linha 23: os dois processos são concatenados, ou seja, é criado um observável cujos elementos provêm de ambos os processos. Os valores emitidos não são misturados. O processo [process12] emitirá primeiro todos os valores do processo [process1] e, em seguida, os do processo [process2]. O método estático [Observable.concat] é utilizado para este efeito:
![]() |
Os resultados da execução são os seguintes:
- linhas 3-10: o processo [process1] é executado e o processo [process12] emite os valores emitidos por [process1];
- linha 9: o processo [process1] terminou;
- linhas 11-17: o processo [process2] é executado e o processo [process12] emite os valores emitidos pelo [process2];
Há uma peculiaridade em relação ao processo2: não especificámos um segmento de execução. Seria, portanto, de esperar que o segmento principal fosse utilizado por predefinição. No entanto, não é esse o caso. O segmento de execução foi o segmento de computação [RxComputationThreadPool-3] (linha 11). Por conseguinte, quando não é especificado nenhum segmento de execução ou de observação, não podemos fazer quaisquer suposições sobre qual o segmento que será escolhido.
7.5.3. Exemplo-16: Combinar dois observáveis com [Observable.zip]
Vamos agora examinar o código seguinte:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;
public class Exemple16 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, null);
// 2-process combination function
FuncN<String> funcn = new FuncN<String>() {
@Override
public String call(Object... args) {
if (args.length == 2) {
return String.format("double=%s, string=%s", args[0], args[1]);
} else {
throw new RuntimeException("la fonction attend 2 paramètres exactement");
}
}
};
// zip of the 2 processes
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- linhas 16–18: um processo denominado [process1] emitirá 3 números reais numa thread de computação. Será também observado numa thread de computação;
- linhas 20–21: um processo denominado [process2] emitirá 2 cadeias de caracteres numa thread não especificada. A thread de observação também não é especificada;
- linhas 23–32: instância de um tipo [FuncN<String>] com uma classe anónima. FuncN é uma interface funcional:
![]() |
O método [FuncN.call] espera um conjunto de objetos e devolve um tipo R. A função [funcn] será utilizada para combinar os processos process1 e process2 nessa ordem. No método [FuncN.call]:
- args[0] será um Double;
- args[1] será um String;
Aqui, o resultado de [funcn.call] será a string da linha 27. A construção deste resultado não requer o conhecimento dos tipos dos argumentos do método de chamada.
Os dois processos são combinados da seguinte forma:
// zip des 2 processus
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
O método [Observable.zip] funciona da seguinte forma:
![]() |
Vemos que:
- o primeiro argumento de zip é um Iterable<Observable>. No nosso exemplo, temos um parâmetro real do tipo List<Observable> composto pelos nossos dois observáveis;
- o segundo argumento de zip é do tipo FuncN. No nosso exemplo, o parâmetro real é [funcn];
A execução produz os seguintes resultados:
- linhas 7, 11: o process12 emite dois elementos;
- linha 8: o elemento adicional emitido pelo processo1, que não tem correspondência no processo2, não é emitido pelo processo resultante processo12;
Vemos que o process2, ao qual não tinha sido atribuída nem uma thread de execução nem uma thread de observação, utilizou a thread principal para ambas.
7.5.4. Exemplo-17: Combinar dois observáveis com [Observable.combineLatest]
Vamos agora examinar o seguinte código:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple17 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null,
Schedulers.computation());
// combining the 2 processes
Process<Double> process12 = new Process<>("process12",
Observable.combineLatest(process1.getObservable(), process2.getObservable(), (d1, d2) -> d1 + d2));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- linhas 14–16: um processo denominado [process1] emitirá 3 números reais numa thread de computação. Também será observado numa thread de computação;
- linhas 18–20: um processo denominado [process2] emitirá 2 números reais numa thread não vinculada. Estes serão observados numa thread de computação;
- linha 23: os dois observáveis são combinados utilizando o seguinte método estático [Observable.combineLatest]:
![]() |
O observável [combineLatest] funciona da seguinte forma: quando um dos dois observáveis emite um elemento E1, esse elemento é combinado pela função [combineFunction] com o último elemento emitido pelo outro observável.
A execução deste código produz o seguinte resultado:
- Linha 5: A saída do process2 (56) é combinada com o último elemento produzido pelo process1 (54, linha 4) e produz o resultado mostrado na linha 7;
- linha 6: a saída do process1 (51,6) é combinada com o último elemento produzido pelo process2 (56, linha 5) e produz o resultado da linha 8;
- linha 9: a saída do processo2 (261,8) é combinada com o último elemento emitido pelo processo1 (51,6, linha 6) e produz o resultado da linha 12;
- linha 13: a emissão do processo1 (80,39) é combinada com o último elemento emitido pelo processo2 (261,8, linha 9) e produz o resultado da linha 15;
Esta é uma variante do observável [zip] em que, desta vez, os elementos combinados não são necessariamente os elementos na mesma posição nos fluxos. Note-se aqui que o processo2, ao qual não tinha sido atribuída nenhuma thread de execução, foi executado na thread principal (linha 2).
7.5.5. Exemplo-18: Combinação de dois observáveis com [Observable.amb]
Vamos agora examinar o código seguinte:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple18 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null, null);
// combining the 2 processes
Process<Double> process12 = new Process<>("process12",
Observable.amb(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- linhas 14–16: um processo denominado [process1] emitirá 3 números reais numa thread de computação. Também será observado numa thread de computação;
- linhas 18–20: um processo denominado [process2] emitirá 2 números reais numa thread não vinculada. Estes serão observados numa thread não vinculada;
- linha 22: os dois observáveis são combinados utilizando o seguinte método estático [Observable.amb]:
![]() |
Conforme ilustrado no diagrama acima, o observável [Observable.amb(Observable o1, Observable o2)] emite os elementos do observável que emite primeiro. Isto é confirmado pelos resultados do exemplo apresentado:
- linha 4: o processo2 é o primeiro a emitir;
- Linhas 8, 12: o process12 emite todos os elementos emitidos pelo process2 (linhas 4, 11);
7.6. Cadeia de processamento para um observável
7.6.1. Exemplo-19: transformação de um observável com [Observable.map]
Nos exemplos anteriores, examinámos várias combinações de dois observáveis num terceiro observável. Apresentamos agora métodos estáticos da classe [Observable] que permitem operações de transformação, filtragem e agregação num observável. Aqui encontraremos métodos análogos aos da classe [Stream] estudada na Secção 5.
O nosso primeiro exemplo será o seguinte:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple19 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>("process2",
process1.getObservable().map(d -> String.format("valeur-%s", d)));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- linhas 14–16: um processo denominado process1 emitirá 3 números reais numa thread de computação. Também será observado numa thread de computação;
- linhas 17–18: os números emitidos pelo process1 serão convertidos em cadeias de caracteres num process2;
- linha 20: observamos o process2;
O método [Observable.map] na linha 18 é análogo ao método [Stream.map] discutido na Secção 5.5:
![]() |
Os resultados do exemplo são os seguintes:
- linhas 4, 5 e 8: as emissões do process1. Estes são números reais;
- linhas 6, 7, 10: as emissões observadas do process2. Estas são cadeias de caracteres;
7.6.2. Exemplo-20: filtrar um observável com [Observable.filter]
O exemplo será o seguinte:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple20 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- linhas 11-12: um processo chamado process1 emitirá números inteiros de 0 a 2 numa thread de trabalho. Também será observado numa thread de trabalho;
- linha 14: os números emitidos pelo process1 serão filtrados de modo a que apenas os números pares sejam retidos no process2;
- linha 20: observamos o process2;
O método [Observable.filter] na linha 18 é análogo ao método [Stream.filter] discutido na Secção 5.4:
![]() |
Os resultados do exemplo são os seguintes:
- linhas 4, 5 e 7: emissões do process1;
- linhas 6, 9: as emissões observadas do process2. Estes são os elementos pares do process1;
7.6.3. Exemplo 21: transformando um observável com [Observable.flatMap]
O exemplo será o seguinte:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- linhas 12-13: um processo chamado process1 emitirá números inteiros de 0 a 2 numa thread de computação. Será também observado numa thread de computação;
- linhas 15–18: cada número n emitido por process1 é transformado num observável que emite os 3 números (10*n, 10*n+1, 10*n+2). Se tivéssemos usado o método [map] na linha 15, o process2 emitiria um tipo Observable<Integer> em vez de um tipo Integer. O método [flatMap] utilizado permite-nos achatar esta sequência de elementos do tipo Observable<Integer> numa sequência de elementos do tipo Integer composta por cada elemento de cada Observable<Integer>;
- linha 20: observamos o process2;
O método [Observable.flatMap] na linha 15 é análogo ao método [Stream.flatMap] discutido na Secção 5.6.12:
![]() |
Os resultados do exemplo são os seguintes:
- linhas 5-7: as três emissões do process2 a seguir à emissão na linha 4 do process1;
- linhas 9-11: as três emissões do process2 a seguir à emissão na linha 8 do process1;
- linhas 14-16: as três emissões do processo2 a seguir à emissão na linha 12 do processo1;
O código seguinte mostra como criar um tipo Observable<Integer[]> a partir do processo1 [Exemplo 21b]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21b {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer[]> process2 = new Process<>("process2", process1.getObservable().map(i -> {
int value = i * 10;
return new Integer[] { value, value + 1, value + 2 };
}));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- linha 14: é utilizado o método [Observable.map];
- linha 16: que retorna um tipo Integer[];
Os resultados são os seguintes:
- linhas 6, 7, 10: vemos os resultados do map;
Todas estas transformações observáveis podem ser encadeadas, uma vez que cada transformação produz uma nova observável. Isto é demonstrado no exemplo seguinte [Exemplo21c]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21c {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- linhas 15–18: o flatMap é seguido por um filter;
Os resultados da execução são os seguintes:
- linhas 8-13: process2 emitiu apenas os elementos pares de flatMap;
Um método semelhante ao [flatMap] é o método [flatMapIterable], ilustrado pelo seguinte exemplo [Exemplo21d]:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21d {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMapIterable(i -> {
int value = i * 10;
return Arrays.asList(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
Linha 16: Em vez de usar o método [flatMap], usamos o método [flatMapIterable]. Neste caso, a função de transformação deve produzir um tipo Iterable<T> (linha 18) em vez de um tipo Observable<T>.
Obtemos os mesmos resultados que antes.
Voltemos à definição do método [flatMap]:
![]() |
Como mostrado acima, um elemento azul [3] foi inserido entre os dois elementos verdes [1-2]. Isto significa que, ao achatar Observable<T>s, o método [flatMap] preserva a ordem de emissão destes vários observáveis internos. Isto é demonstrado pelo seguinte exemplo [Exemplo21e]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21e {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().flatMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
- linhas 11-12: o processo process1 emite os inteiros [0,1];
- linhas 14-15: o processo2 emite os números inteiros [10,11,12];
- linhas 17-18: cada elemento emitido pelo processo1 está associado ao observável do processo2. Isto significa que:
- o elemento [0] do processo1 será associado a uma observável que emite [10,11,12];
- o mesmo se aplica ao elemento 1;
No final, os 6 números [10, 11, 12, 10, 11, 12] serão emitidos. Queremos ver em que ordem.
Os resultados da execução são os seguintes:
Podemos ver que a ordem de emissão do process3 foi: [10, 10, 11, 12, 11, 12] (linhas 11, 12, 14, 17, 19, 22). Portanto, os elementos emitidos pelo process2 ficaram, de facto, misturados. Podemos evitar isto utilizando o método [concatMap] em vez do método [flatMap]. Isto é demonstrado pelo código seguinte [Exemplo21ef]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21ef {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().concatMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
Na linha 18, substituímos [flatMap] por [concatMap]. Os resultados da execução são os seguintes:
Vemos que a ordem de emissão do processo3 foi: [10, 11, 12, 10, 11, 12] (linhas 12–14, 17, 19, 22). Os elementos emitidos pelo processo2 não foram baralhados.
Outra variante do método [map] é o método [switchMap]:
![]() |
Acima, a partir do observável [1], são criados três outros observáveis [2] com dois elementos, que são então achatados como em [flatMap] [3]. Note-se que o resultado tem 5 elementos, e não 6. Isto deve-se ao facto de, antes de o segundo observável emitir o seu segundo elemento [6], o terceiro observável emitir o seu primeiro elemento [5], fazendo com que o segundo observável seja descartado. Portanto, o elemento [6] não é encontrado no observável resultante [3].
Para ilustrar [switchMap], utilizaremos o seguinte exemplo [Exemplo21eg]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21eg {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().switchMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
A execução do exemplo produz os seguintes resultados:
- process1 emite 2 elementos que dão origem a 2 observáveis process2 de 3 elementos;
- linha 14: o observador recebe o elemento n.º 0 emitido pelo primeiro observável process2 na linha 6;
- linha 15: o observador recebe o elemento #0 emitido pela segunda observável do processo2 na linha 13. A história não explica por que razão não recebeu anteriormente os elementos 1 e 2 emitidos pela primeira observável do processo2 nas linhas 7 e 8. De qualquer forma, a primeira observável do processo2 é abandonada;
- no final, o observador vê apenas 4 elementos (linhas 14, 15, 17, 20) em vez dos 6 que foram emitidos;
7.6.4. Exemplos-22: Outros métodos da classe [Observable]
A classe [Observable] inclui muitos métodos da classe [Stream] que funcionam de forma semelhante. Aqui estão alguns deles. Apresentaremos simplesmente o código e os seus resultados.
[Exemplo 22a - take=limit]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22a {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).take(3));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
resultados
[Exemplo 22b - takeLast]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22b {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).takeLast(2));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
resultados
[Exemplo 22c - ignorar]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22c {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).skip(5).take(2));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
resultados
[Exemplo 22d - reduzir]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22d {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).reduce(0, (i, a) -> i + a));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- linha 10: calcula a soma dos elementos no observável. O resultado é um observável que emite essa soma;
resultados
[Exemplo 22e - todos]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22e {
public static void main(String[] args) throws InterruptedException {
// process
Process<Boolean> process = new Process<>("process", Observable.range(1, 10).all(i -> i > 10));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- linha 10: retorna um Observable<Boolean> que emite o elemento true se o predicado do método [all] for verdadeiro para todos os elementos; caso contrário, retorna false;
resultados
[Exemplo 22f - contagem]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22f {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).count());
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- linha 10: [Observable.count] cria um observável de 1 elemento que é a soma dos elementos observados;
resultados
[Exemplo 22g - distinto]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22g {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.just(1, 2, 1, 3).distinct());
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
resultados
[ Exemplo 22h - groupBy, asObservable]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.observables.GroupedObservable;
public class Exemple22h {
public static void main(String[] args) throws InterruptedException {
// process
Observable<GroupedObservable<Boolean, Integer>> obs = Observable.range(1, 10).groupBy(i -> i % 2 == 0);
Process<Integer> process = new Process<>("process", obs.concatMap(g -> g.asObservable()));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- Linha 11: O método [groupBy] agrupa os 10 elementos emitidos em dois grupos: números pares e números ímpares. O resultado é um Observable<GroupedObservable<Boolean, Integer>>, ou seja, um observável cujos elementos são do tipo GroupedObservable<Boolean, Integer>, onde Boolean é o tipo da chave do grupo (false, true neste caso) e é também o tipo do resultado da lambda passada como parâmetro ao método [groupBy], e Integer é o tipo dos elementos do grupo;
- linha 12: o tipo GroupedObservable possui um método [asObservable] que nos permite criar um observável a partir deste tipo. Teremos, portanto, dois tipos Observable<Integer>, um para números pares e outro para números ímpares. A partir destes dois observáveis, o método [concatMap] criará um único observável;
resultados
[Exemplo 22i - carimbo de data/hora]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
import rx.schedulers.Timestamped;
public class Exemple22i {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Timestamped<Integer>> process2 = new Process<>("process2", process1.getObservable().timestamp());
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- na linha 15, o método [timestamp] associa um carimbo de data/hora a cada elemento processado do observável;
resultados
Neste exemplo, é difícil perceber o que as informações de carimbo de data/hora representam:
- linhas 4-5: vemos que o elemento 1 do processo1 foi emitido 139 ms após o elemento 0;
- linhas 6 e 7: vemos que o elemento 1 do processo2 foi observado 234 ms após o elemento 0;
- linhas 5 e 8: vemos que o elemento 2 do processo1 foi emitido 33 ms após o elemento 1;
- linhas 7 e 10: vemos que o elemento 2 do processo2 foi observado 37 ms após o elemento 1;
Estes atrasos devem-se ao facto de as threads para observar e executar os observáveis não serem as mesmas. Se substituirmos as linhas 12–13 pelas seguintes linhas (Exemplo 22j):
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
null);
- Linhas 2–3: Não especificamos o segmento de observação. Sabemos que, neste caso, o observável é observado no local onde é executado;
Isto produz os seguintes resultados:
- linhas 4 e 6: o process1 emite o seu elemento n.º 1 587 ms após o seu elemento n.º 0;
- linhas 5 e 7: o observador observa estes dois elementos com um intervalo de 586 ms;
- linhas 6 e 8: o processo1 emite o seu elemento #2 396 ms após o seu elemento #1;
- linhas 7 e 9: o observador observa estes dois elementos com uma diferença de tempo de 396 ms;
Aqui, os valores dos carimbos de data/hora são consistentes: representam com precisão o tempo de transmissão do elemento.
7.7. Agendadores
7.7.1. Exemplo 23: o agendador [Schedulers.computation]
Vamos agora examinar os agendadores de execução. A observação será feita na thread de execução.
O tema dos agendadores é um pouco obscuro. Os vários agendadores são apresentados nesta pergunta no site StackOverflow [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
Tentaremos ilustrar a utilização destes diferentes agendadores com exemplos. O primeiro ilustra o agendador [Schedulers.computation]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple23 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.computation(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
- linhas 14–19: criamos uma matriz de 10 processos em execução numa thread de computação;
- linha 17: cada processo gera um número real aleatório;
- linha 21: subscrevemos todos estes processos;
Os resultados são os seguintes:
- linhas 2-10: os primeiros 8 processos iniciam-se em 8 threads diferentes (a máquina utilizada tem 8 núcleos). Note-se que todos eles iniciam-se aproximadamente ao mesmo tempo;
- linhas 17-19: 3 processos terminam, libertando assim 3 threads;
- linhas 23-24: os dois últimos processos podem então iniciar, utilizando 2 das threads assim libertadas;
Podemos, portanto, concluir que o agendador [Schedulers.computation] fornece um conjunto de n threads, onde n é o número de núcleos na máquina. As threads são executadas em paralelo nestes núcleos.
7.7.2. Exemplo-24: o agendador [Schedulers.io]
Executamos o código anterior com o agendador [Schedulers.io]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple24 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.io(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
- linha 18: os processos são executados utilizando os threads do agendador [Schedulers.io];
Isto produz os seguintes resultados:
- linhas 2-10: cada um dos 10 processos inicia numa thread diferente. Ao contrário do caso anterior, todos os processos conseguiram ser iniciados. Note-se que estes inícios demoram 6 ms, enquanto anteriormente demoravam 1 ms;
- linhas 13-18: os observáveis são emitidos um após o outro e não de forma quase paralela, como era o caso anteriormente;
Qual é a diferença entre os agendadores [Schedulers.io] e [Schedulers.computation]? Pode encontrar uma resposta na URL [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
7.7.3. Exemplo-25: o agendador [Schedulers.newThread]
Executamos o código anterior utilizando o agendador [Schedulers.newThread]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple25 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.newThread(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
Os resultados obtidos são os mesmos que com o agendador [Schedulers.io]:
Na URL [http://stackoverflow.com/questions/33415881/retrofit-with-rxjava-schedulers-newthread-vs-schedulers-io], explica-se que o agendador [Schedulers.io] fornece um conjunto de threads, o que o agendador [Schedulers.newThread] não faz. Um conjunto de threads cria automaticamente um conjunto de threads. Este aloca-as aos processos que delas necessitam. Quando estes processos terminam, as suas threads não são eliminadas, mas regressam ao conjunto e podem então ser reutilizadas por outro processo. Isto é mais eficiente do que criar e eliminar threads constantemente. Por conseguinte, é preferível utilizar o agendador [Schedulers.io].
7.7.4. Exemplo 26: Os programadores [Schedulers.immediate, Schedulers.trampoline]
Voltemos à explicação fornecida para estes dois agendadores:
![]() |
A explicação é bastante fácil de compreender, mas quando tentamos ilustrá-la, percebemos que ainda não a compreendemos bem. Foi o livro *Learning Reactive Programming With Java 8* que me ajudou a criar um exemplo baseado num que se encontra nesse livro, mas simplificado. Aqui está:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
public class Exemple26 {
public static void main(String[] args) throws InterruptedException {
// a scheduler
Scheduler scheduler = Schedulers.immediate();
// a worker of this scheme
Worker worker = scheduler.createWorker();
// an Action0 type to be executed on the worker
Action0 action02 = new Action0() {
@Override
public void call() {
// log action02
ProcessUtils.showInfos.accept("action02");
}
};
// an Action0 type to be executed on the worker
Action0 action01 = new Action0() {
@Override
public void call() {
// program a new action on the same worker
worker.schedule(action02);
// log action01
ProcessUtils.showInfos.accept("action01");
}
};
// action01 is programmed on the worker
worker.schedule(action01);
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- linha 17: um agendador. Este será [Schedulers.immediate], como mostrado aqui, ou [Schedulers.trampoline] mais tarde;
- linha 19: Ações do tipo Action0 (linhas 21, 20) podem ser executadas nos trabalhadores do agendador. O método [Scheduler.createWorker] cria um trabalhador. O método [Worker.schedule(Action0)] executa um tipo Action0 através de um trabalhador;
- linhas 21–27: uma primeira ação chamada [action02] que será executada (linha 40) pelo trabalhador da linha 19;
- linhas 30–38: uma segunda ação chamada [action01]. Tem a particularidade de fazer com que a action02 seja executada no mesmo trabalhador que ela própria (linha 34). É aqui que reside a diferença entre [Schedulers.immediate] e [Schedulers.trampoline]:
- se o agendador for [Schedulers.immediate], então na linha 34, a ação action02 será executada imediatamente (daí o nome do agendador) e a ação action01 atualmente em execução será interrompida. Veremos então aparecer a mensagem da linha 25. Assim que a ação action02 terminar, a ação action01 será retomada e veremos a mensagem da linha 36;
- se o agendador for [Schedulers.trampoline], então, na linha 34, a ação action02 é colocada na fila. Ela não será executada até que a tarefa atual, action01, esteja concluída. A mensagem na linha 36 aparecerá então. Assim que a ação action01 estiver concluída, a ação action02 será executada e a mensagem na linha 25 aparecerá;
A execução do código acima produz os seguintes resultados:
Se, na linha 17, utilizarmos o agendador [Schedulers.trampoline], obtemos os resultados opostos:
Dito isto, é difícil estabelecer uma ligação com observáveis. Não encontrei nenhum exemplo convincente que demonstrasse a vantagem de executar um observável numa destas duas threads. Aqui está um, no entanto, que não me parece nada natural:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
public class Exemple27 {
public static void main(String[] args) throws InterruptedException {
// Worker
Worker worker = Schedulers.immediate().createWorker();
// Worker worker = Schedulers.trampoline().createWorker();
// observable 1 sur worker
worker.schedule(() -> Observable.range(1, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
// observable 2 on same worker
worker.schedule(() -> Observable.range(100, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
}
}));
}
}));
}
}
- linhas 13–14: é criado um trabalhador utilizando um dos dois agendadores [Schedulers.immediate] e [Schedulers.trampoline];
- linha 16: um primeiro observável obs1 é agendado neste trabalhador para emitir os números [1,2]
- linha 22: cada vez que um elemento deste observável obs1 é observado, a observação de um segundo observável obs2 é iniciada no mesmo trabalhador para emitir os números [100,101];
Com o agendador [Schedulers.immediate], obtemos os seguintes resultados:
Já com o agendador [Schedulers.trampoline], obtemos os seguintes resultados:
7.8. Conclusão
Ainda há muito a fazer. Para obter uma compreensão mais profunda da biblioteca RxJava, encorajamos os leitores a continuar a sua aprendizagem utilizando as referências fornecidas no início deste documento. No entanto, dispomos agora dos conhecimentos básicos necessários para utilizar o RxJava em ambientes Swing e Android. É isso que iremos demonstrar a seguir.








































