7. A biblioteca RxJava
A biblioteca RxJava baseia-se no seguinte conceito: um fluxo de elementos do tipo T Observable<T> é observado por um ou mais subscritores (assinantes, observadores, consumidores) Subscriber<T>. A biblioteca RxJava permite que o fluxo Observable<T> seja executado num thread T1 e o seu observador Subscriber<T> num thread T2, sem que o programadortenha de se preocupar com a gestão do ciclo de vida dessas threads nem com problemas naturalmente complexos, tais como a partilha de dados entre threads e a sincronização das mesmas para executar uma tarefa global. Facilita, assim, a programação assíncrona.
Um fluxo Observable<T> produz elementos do tipo T, observáveis à medida que são produzidos. Se o observador e o observável (termo que designa o tipo Observable<T>, por conveniência) se encontrarem na mesma thread, então o observável só pode produzir o elemento (i+1) quando o observador tiver consumido o elemento i. São poucos os casos em que esta arquitetura se revela vantajosa. Se o observador e o observável não se encontrarem no mesmo thread, então o observável e o seu observador têm comportamentos autónomos: o observável produz ao seu próprio ritmo e o observador consome ao seu próprio ritmo. É aí que reside o interesse da biblioteca. Até agora, temos falado sempre de um único observador. Na realidade, um observável pode ter um número qualquer de observadores.
A biblioteca RxJava está particularmente bem adaptada à arquitetura apresentada no parágrafo 2 da introdução e que aqui recordamos:

- em [1], uma camada de serviços presta serviços, alguns dos quais demoram a ser obtidos (por exemplo, pedidos de rede);
- esta camada de serviços é invocada por uma interface gráfica [1] (Swing, Android, JavaFx). Se a camada de serviços for executada no mesmo thread que o método [swing] que a utiliza, a interface gráfica fica congelada (não reativa) enquanto aguarda o resultado do serviço;
- em [2], uma fina camada de adaptação implementada com RxJava permite apresentar à camada gráfica uma implementação assíncrona do mesmo serviço: este pode ser executado numa thread diferente daquela do método da camada gráfica que o invoca. Neste caso, a interface gráfica [3] mantém-se responsiva: o utilizador pode continuar a interagir com ela, por exemplo, iniciar um novo pedido de rede em paralelo com o primeiro e, sobretudo, é possível oferecer-lhe a possibilidade de cancelar processamentos demasiado demorados, algo impossível se a interface gráfica estiver congelada;
- a chamada [4] é síncrona, enquanto a chamada [5-6] é assíncrona;
Nesta arquitetura, a camada [2] fornece serviços que devolvem tipos Observable<T>, aos quais os métodos da camada gráfica [3] podem subscrever. Um serviço da camada [2] fornece então os seus resultados um a um e a camada [3] pode reagir a cada um deles, atualizando, por exemplo, um ou vários componentes da interface gráfica.
A classe Observable<T> possui várias dezenas de métodos. Esta é uma das dificuldades da biblioteca: é muito rica e é difícil compreender todas as suas possibilidades. Vamos apresentar algumas delas. O domínio dos restantes métodos virá com o tempo.
7.1. Criar observáveis e subscrevê-los
7.1.1. Exemplo-01: o método [Observable.from]
![]() |
Consideremos o seguinte código:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observáveis de inteiros
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
- linha 12: cria-se um tipo Observable<Integer> a partir de uma lista de inteiros.
A classe Observable<T> é um fluxo de elementos do tipo T que podem ser observados, de preferência de forma assíncrona, mas não necessariamente, à medida que são produzidos. A sua definição é a seguinte:
![]() |
Como já foi referido, a classe Observable<T> possui várias dezenas de métodos. Alguns são semelhantes aos da classe Stream<T> analisada no parágrafo 5. A documentação da RxJava inclui «diagramas de mármore» [2] que ilustram o funcionamento destes métodos:
- a linha 3 ilustra as emissões do observável ao longo do tempo;
- o método [4] é aplicado aos elementos emitidos pelo observável. Em geral, produz um novo observável;
- a linha 5 mostra o novo observável obtido;
O método [Observable.from] tem a seguinte assinatura:
![]() |
O método estático [Observable.from] permite criar um Observable<T> a partir de uma coleção de elementos do tipo T. É uma forma muito simples de começar a trabalhar com observáveis. A linha:
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
irá, portanto, emitir três elementos. Não os emite imediatamente. Irá emiti-los na íntegra sempre que um observador se registar. É o que se denomina um observável «frio». O observável reemite os seus elementos para cada novo subscritor.
Pode-se considerar a instrução anterior como uma ação de configuração do observável. Este é configurado uma vez e executado n vezes, caso surjam n subscritores.
Como é que se subscreve?
Uma forma de o fazer é utilizar o método [Observable.subscribe], cuja definição utilizada aqui é a seguinte:
![]() |
- o primeiro parâmetro [Action1<T> onNext] (ver parágrafo 6.2) do método é o método a executar quando o observável emite um novo elemento T;
- o segundo parâmetro [Action1<Throwable> onError] do método é o método a executar quando o observável lança uma exceção;
- o terceiro parâmetro [Action0 onComplete] (ver parágrafo 6.1) do método é o método a executar quando o observável lança uma exceção;
- o método devolve um tipo [Subscription];
O tipo [Subscription] representa uma subscrição do observável. A sua definição é a seguinte:
![]() |
O interesse desta interface [1] reside no seu método [2], que permite cancelar uma subscrição.
No nosso exemplo, o código da subscrição do observável é o seguinte:
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
- linha 1: o resultado do tipo [Subscription] é ignorado;
- linhas 1-15: os três parâmetros são instâncias de classes anónimas. Também utilizaremos lambdas. A vantagem das classes anónimas é que se consegue ver claramente os tipos de dados esperados pelo único método dessas classes;
- linhas 2-5: implementação do primeiro parâmetro do tipo [Action1<Integer>];
- linhas 6-10: implementação do segundo parâmetro do tipo [Action1<Throwable>];
- linhas 11-15: implementação do terceiro parâmetro do tipo [Action0];
O código completo é o seguinte:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observáveis de inteiros
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// assinatura
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
O observável da linha 12 começa a emitir os seus 3 elementos assim que o método [subscribe] é chamado na linha 14. A partir desse momento:
- a cada elemento emitido, as linhas 15-18 são executadas.
- no final dos 3 elementos, as linhas 24-29 são executadas;
- as linhas 19-24 nunca serão executadas, uma vez que o observável não emite aqui nenhuma exceção;
Por predefinição, o observável e o observador são executados no mesmo thread. Existem alguns observáveis predefinidos que são executados num thread diferente do thread principal (neste caso, o thread do método main), mas para a maioria deles não é esse o caso. Assim, tudo ocorre aqui na thread do método [main]:
- o observável emite o elemento 1;
- as linhas 15-18 são executadas e apresentam esse elemento;
- o observável emite o elemento 2;
- as linhas 15-18 são executadas e exibem esse elemento;
- o observável emite o elemento 3;
- as linhas 15-18 são executadas e apresentam esse elemento;
- o observável emite a notificação [completed];
- as linhas 24-29 são executadas;
É isto que mostram os resultados obtidos:
A classe [Exemple02] retoma a classe [Exemple01], utilizando desta vez funções lambda como parâmetros do método [Observable.subscribe]:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
public class Exemple02 {
public static void main(String[] args) {
// observáveis de inteiros
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// assinatura
obs1.subscribe(
(integer) -> System.out.printf("next : %s%n", integer),
(th) -> System.out.println(th),
() -> System.out.println("completed"));
}
}
7.1.2. Exemplo-03: a classe Observer
![]() |
O método [Observable.subscribe], que permite subscrever um observável, tem várias versões, incluindo a seguinte:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
import rx.Observer;
public class Exemple03 {
public static void main(String[] args) {
// observáveis de inteiros
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// assinatura
obs1.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable th) {
System.out.printf("throwable %s", th);
}
@Override
public void onNext(Integer integer) {
System.out.printf("next : %s%n", integer);
}
});
};
}
Na linha 13, em vez de passar três parâmetros ao método [subscribe], passa-se-lhe um tipo [Observer], como se segue:
![]() |
O tipo [Observer] é uma interface com três métodos:
- [onNext(T t)], que é chamado sempre que o observável emite um elemento t;
- [onError(Throwable th)], que é chamado quando o observável lança uma exceção th;
- [onCompleted], que é chamado quando o observável indica que terminou de emitir;
O funcionamento do código é semelhante ao explicado anteriormente. Obtêm-se os seguintes resultados:
7.1.3. Exemplo-04: o método [Observable.create]
![]() |
O método estático Observable.create é definido da seguinte forma:
![]() |
- o método [create] devolve um tipo Observable<T>;
- o parâmetro do método [create] é uma função do tipo [Observable.OnSubscribe<T>] definida da seguinte forma:
![]() |
O tipo [Observable.OnSubscribe<T>] é uma interface funcional que, por sua vez, estende a interface funcional [Action1<Subscriber<? super T>>]. O método [call] desta interface espera um tipo [Subscriber] (assinante, subscritor, observador) definido da seguinte forma:
![]() |
Vê-se em [1] que a classe [Subscriber<T>] implementa a interface [Observer<T>] apresentada no parágrafo 7.1.2.
Por fim, o método [<T> Observable.create]:
- espera como parâmetro uma instância do tipo [Observable.OnSubscribe<T>] com o único método de assinatura: void call(Subscriber<T> s). O tipo [Subscriber<T>] estende o tipo [Observer<T>] e, por conseguinte, dispõe dos métodos onNext, onError, onCompleted;
- retorna um tipo Observable<T>;
O método [<T> Observable.create] devolve um observável configurado. Ainda não houve qualquer emissão de elementos. Quando um subscritor [Subscriber<T> s] subscreve este observável, é então chamado o método [void call(s)] da função passada como parâmetro do método [<T> Observable.create]. A sua função consiste em emitir elementos t do tipo T e em chamar o método [s.onNext(t)] do observador a cada emissão. Quando este estiver concluído, o método [s.onCompleted(t)] do observador deve ser chamado e o método [call] deve terminar. Se o método [call] encontrar uma exceção th, o método [s.onError(th)] do observador deve ser chamado e o método [call] deve terminar;
Para ilustrar este funcionamento complexo, utilizaremos o seguinte código [Exemple04]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.util.Random;
public class Exemple04 {
public static void main(String[] args) {
// configuração observável de números reais
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
for (int i = 0; i < 3; i++) {
// emissão do elemento i
subscriber.onNext(new Random((i + 1)).nextDouble());
}
// fim da emissão
subscriber.onCompleted();
}
});
// assinatura e, consequentemente, emissão
obs1.subscribe((d) -> System.out.printf("onNext %s%n", d), (th) -> System.out.printf("onError %s%n", th),
() -> System.out.println("onCompleted"));
}
}
- linha 11: cria-se um observável que emite tipos Double;
- linhas 11-21: o parâmetro do método [create] é instanciado com uma classe anónima que possui o método único [call] das linhas 12-20. O observável criado na linha 11 está pronto para emitir, mas só o fará quando um observador for registado;
- linhas 13-21: o método [call] recebe a referência de um observador;
- linhas 14-17: emissão de 3 elementos para o observador;
- linha 19: notificação de fim de transmissão ao observador;
- linhas 23-24: subscrição do observável da linha 11. Implementam-se os três parâmetros [onNext, onError, onCompleted] do método [subscribe] através de três funções lambda. Esta subscrição irá criar o subscritor [Subscriber<Double>], que será passado para o método [call] da linha 13. A emissão de elementos terá então início;
- tudo ocorre no mesmo thread: observável e observador;
Obtêm-se os seguintes resultados:
O método [Observable.create] permite criar um observável a partir de qualquer fenómeno. Foi este método que utilizámos no parágrafo 2 da secção «Descoberta», para transformar uma interface síncrona numa interface assíncrona.
7.1.4. Exemplo-05: refatoração de [Exemple-04]
![]() |
O exemplo seguinte apresenta uma nova versão do método estático [Observable.subscribe]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class Exemple05 {
public static void main(String[] args) {
// configuração de um observável de números reais
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// espera
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erro
subscriber.onError(e);
}
// ação
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// concluído
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// um subscritor
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// subscrição
showInfos("avant souscription");
obs1.subscribe(subscriber);
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- linha 56: a nova versão do método estático [Observable.subscribe] aceita como parâmetro o tipo [Subscriber] que apresentámos no parágrafo anterior;
- linhas 37-52: o subscritor (assinante, observador). Este implementa a interface Observer com os seus três métodos onNext, onError e onCompleted;
- linhas 61-64: a partir de agora, vamos concentrar-nos nos threads em que o observável e o seu observador são executados;
- linha 62: o nome do thread;
- linha 63: a hora atual expressa em segundos e milissegundos. Isto vai permitir-nos acompanhar ao longo do tempo a emissão de elementos pelo observável e o seu processamento pelo observador;
- este código tem a mesma funcionalidade que o código anterior. Simplesmente refatorámos este último;
Os resultados obtidos são os seguintes:
- linha 1 dos resultados: antes da linha 56 do código, ainda não aconteceu nada. O observável foi simplesmente configurado;
- linha 2 dos resultados: a linha 56 do código provoca a chamada do método [call] da linha 15. Na linha 3, o número real 80,39 é enviado para o observador;
- linha 4: o observador recebe o número enviado;
- linhas 5-8: o processo anterior repete-se duas vezes;
- linha 9: o observável envia a notificação de fim de transmissão;
- linha 10: o observador recebe-a;
- linha 11: exibida pela linha 57 do código;
Vemos, portanto, que a única linha 56 de subscrição provocou a exibição das linhas 2 a 10 dos resultados. Quando se começa a utilizar a biblioteca RxJava, questiona-se como as coisas se encadeiam umas às outras e, em particular, as ligações que unem o observador e o observável. Vemos aqui que a linha 56, a subscrição do observável,
- provocou a emissão de todos os elementos do observável;
- que o observável e o observador são executados no mesmo thread;
- que, por causa disso, observa-se a sequência: emissão do elemento i, observação do elemento i, emissão do elemento (i+1), observação do elemento (i+1), ...
Recorde-se que o emissor esperava antes de emitir os seus elementos:
// em espera
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erro
subscriber.onError(e);
}
onde i na linha 3 representa o número da emissão (0<=i<3). Se observarmos as horas de emissão dos elementos do observável:
- linhas 2 e 3: o elemento 0 foi transmitido cerca de 500 ms após o início da subscrição;
- linhas 3 e 5: o elemento 1 foi transmitido cerca de 400 ms após o elemento 0;
- linhas 5, 7: o elemento 2 foi emitido cerca de 300 ms após o elemento 1;
7.2. Thread de execução, thread de observação
7.2.1. Exemplo-06: observável e observador num thread diferente de [main]
![]() |
Reestruturamos o exemplo anterior da seguinte forma [Exemple06]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple06 {
public static void main(String[] args) {
// guarda-barreira
CountDownLatch latch = new CountDownLatch(1);
// configuração de um observável de valores reais
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// em espera
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erro
subscriber.onError(e);
}
// ação
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// concluído
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// um subscritor
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// baixar a barreira
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// continuação da configuração observável
obs1 = obs1.subscribeOn(Schedulers.computation());
// subscrição
showInfos("avant souscription");
obs1.subscribe(subscriber);
// espera em frente à barreira
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- linha 16: criamos um semáforo com um objeto do tipo [CountDownLatch]. Este objeto serve para sincronizar threads entre si. Aqui, é inicializado com o valor 1, ao qual chamaremos de valor do semáforo. Uma thread entra em espera no semáforo através de uma operação:
latch.await();
A thread fica bloqueada se o valor do guard-barrier for >0. Uma thread pode aumentar ou diminuir o valor interno do guard-barrier. Na linha 48, o valor do guard-barrier é decrementado em 1.
- Linha 63: o observável é configurado para ser executado num thread fornecido pelo agendador [Schedulers.computation()]. Este agendador pode fornecer tantos threads quantos forem os núcleos da máquina de execução. O parágrafo sobre a aplicação de exemplo demonstrou a utilização de outros agendadores (ver parágrafo 2.8);
O princípio do código é o seguinte:
- o método [main] é executado na thread principal (main);
- linha 66: inicia a emissão de elementos do observável. Estes serão emitidos numa thread diferente da thread principal;
- linha 70: a thread principal fica bloqueada porque o bloqueador tem o valor 1 (ver linha 16). Só poderá continuar quando este valor passar para 0. Isto acontece na linha 48. É o observador que desce o bloqueador quando recebe a notificação de que o observável terminou as suas emissões;
A execução produz os seguintes resultados:
- linha 1: a subscrição vai ocorrer;
- linha 2: esta subscrição desencadeia a execução do método [call] na thread [RxComputationThreadPool-1]. Temos agora uma execução paralela com duas threads;
- linha 3: por uma razão ainda não esclarecida, a thread [RxComputationThreadPool-1] cedeu o controlo. A thread [main] assume então o controlo e fica bloqueada pelo bloqueador (linha 70 do código). A partir deste momento, apenas a thread [RxComputationThreadPool-1] pode operar;
- linhas 4-11: observa-se o comportamento anteriormente observado entre o observável e o seu observador, mas agora tudo decorre no thread [RxComputationThreadPool-1];
- linhas 12-13: o observador baixou a barreira (linha 48 do código) e o thread [RxComputationThreadPool-1] terminou. O thread [main] assume o controlo e apresenta duas mensagens;
7.2.2. Exemplo-07: observável e observador em dois threads diferentes
![]() |
Alteramos o exemplo anterior da seguinte forma:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple07 {
public static void main(String[] args) {
// guarda da barreira
CountDownLatch latch = new CountDownLatch(1);
// configuração de um observável de valores reais
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// espera
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erro
subscriber.onError(e);
}
// ação
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// concluído
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// um subscritor
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// baixar a barreira
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// continuação da configuração observável
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
// subscrição
showInfos("avant souscription");
obs1.subscribe(subscriber);
// à espera de que a barreira suba
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
O código é idêntico ao do exemplo anterior, exceto na linha 63:
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
que configura o observável (subscribeOn) e o observador (observeOn) para serem executados numa das threads fornecidas pelo agendador [Schedulers.computation()].
Os resultados obtidos são os seguintes:
É possível observar os seguintes pontos:
- o observável é executado na thread [RxComputationThreadPool-4] (linhas 3-4, 6, 8-9);
- o observador é executado no thread [RxComputationThreadPool-3] (linhas 5, 7, 10-11);
- que ambos se executam de forma autónoma. Assim, nas linhas 8-9, o observável emite duas notificações (onNext, onCompleted) antes de o observador recuperar a notificação [onNext] (linha 10);
A biblioteca RxJava encarrega-se da transferência de dados (as emissões) da thread do observável para a thread do observador. O programador não precisa de se preocupar com isso.
Já vimos como criar observáveis (Observable.from, Observable.create). Vamos agora ver os observáveis predefinidos da biblioteca RxJava.
7.3. Observáveis predefinidos
7.3.1. Exemplo-08: o método [Observable.range]
![]() | ![]() |
A partir de agora, vamos utilizar classes dedicadas aos processos observados e aos seus observadores. A ideia é poder registar o seu nome, o seu thread de execução e as horas de execução, de modo a poder acompanhá-los ao longo do tempo.
A classe [Process] será simplesmente um Observable ao qual se pode atribuir um nome. Ela implementará a seguinte interface [IProcess]:
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcess<T> {
// nome do observável
public String getName();
// observável
public Observable<T> getObservable();
}
Esta interface poderá ser implementada pela seguinte classe [Process<T>]:
package dvp.rxjava.observables.utils;
import rx.Observable;
import rx.Scheduler;
public class Process<T> implements IProcess<T>{
// nome da variável observável
protected String name;
// processo observado
protected Observable<T> observable;
// construtores
public Process(String name, Observable<T> observable) {
// inicializações locais
this.name = name;
this.observable = observable;
}
// getters e setters
public String getName() {
return name;
}
public Observable<T> getObservable() {
return observable;
}
}
- linha 9: o nome do processo;
- linha 11: a observável observada;
- linhas 14-18: o construtor;
O observador será, por sua vez, descrito pela seguinte classe [Observateur]:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
...
}
- na linha 11, a classe Observateur<T> estende a classe Subscriber<T> que apresentámos brevemente no parágrafo 7.1.3. Iremos utilizá-la como argumento do método [Observable.subscribe]:
// execução observável (observação)
obs1.subscribe(observateur);
O método [Observable.subscribe] utilizado na linha 2 acima tem a seguinte definição:
![]() |
A função do [Subscriber] consiste principalmente em gerir os elementos emitidos pelo observável ao qual se subscreveu, através dos métodos da interface [Observer]: onNext, onError, onCompleted. A classe [Subscriber] possui os seguintes métodos:
![]() |
No código da classe [Observateur], utilizaremos o método [1] isUnsubscribed para verificar se a subscrição do subscritor foi cancelada ou não. A classe [Observateur<T>] completa é a seguinte:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
// um semáforo
private CountDownLatch latch;
// um método de visualização
private Consumer<String> showInfos;
// o nome do observador
private String observerName;
// o nome do processo observado
private String processName;
// construtores
public Observateur() {
}
public Observateur(String name, CountDownLatch latch, Consumer<String> showInfos, String observedName) {
this.observerName = name;
this.latch = latch;
this.showInfos = showInfos;
this.processName = observedName;
}
// --------------------------- implementação da interface Observer<T>
@Override
public void onCompleted() {
// fim das transmissões
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s,%s].onCompleted", observerName, processName));
}
// fim do bloqueio do thread principal
latch.countDown();
}
@Override
public void onError(Throwable e) {
// erro de emissão
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s, %s].onError (%s)", observerName, processName, e));
}
}
@Override
public void onNext(T value) {
// uma emissão adicional
if (!isUnsubscribed()) {
try {
showInfos.accept(String.format("Subscriber [%s,%s] : onNext (%s)", observerName, processName,
new ObjectMapper().writeValueAsString(value)));
} catch (JsonProcessingException e) {
showInfos.accept(String.format("Subscriber [%s,%s].onNext (%s)", observerName, processName, e));
}
}
}
}
- Para além das características de um Subscriber, o observador Observateur incluirá as seguintes informações:
- linha 14: um guard-barrier ou semáforo que servirá para bloquear o thread principal até que o observador tenha recebido todos os elementos emitidos pelo observável. Isto ocorrerá na linha 36 do código, quando o observador receber do observável a notificação de fim de emissão;
- linha 16: uma instância Consumer<String> que servirá para exibir uma mensagem na consola;
- linha 18: o nome do observador, para os distinguir uns dos outros quando houver vários;
- linha 20: o nome do processo observado;
- linhas 36, 46, 54: os métodos [onCompleted, onError, onNext] da interface [Observer<T>] implementada pela classe abstrata [Subscriber<T>]. Esta classe não os implementa. Por isso, é necessário fazê-lo nas suas classes filhas. Antes de realizar qualquer ação nestes métodos, verifica-se se o observador não foi desabonado do observável que está a observar;
- linha 59: o método [onNext] do observador escreve a cadeia jSON do elemento recebido. Isto permitir-nos-á apresentar vários tipos de elementos;
Dito isto, vamos analisar um novo método da classe Observable, o método [range]:
![]() |
O observável Observable.range(n,m) emite (m) inteiros que vão de n a n+m-1. Analisamo-lo com o código [Exemple08] seguinte:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple08 {
public static void main(String[] args) throws InterruptedException {
// número de observadores
final int nbObservateurs = 2;
// semáforo
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// configuração observável
Observable<Integer> obs1 = Observable.range(15, 3).subscribeOn(Schedulers.computation());
// execução observável (observação)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos,"obs1"));
}
// espera
showInfos.accept("main : attente fin observation");
latch.await();
// fim
showInfos.accept("main : fin observation");
}
// visualizações
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- linha 16: vamos utilizar dois observadores;
- linha 19: o semáforo é inicializado com o valor 2, porque vamos colocar cada observador num thread diferente. O thread principal terá, portanto, de aguardar a conclusão dos dois threads de observação;
- linha 22: configuramos o observável de forma a que seja executado num thread do agendador [Schedulers.computation()]. O observador estará no mesmo thread que o observável;
- linhas 25-27: subscrevemos dois observadores ao observável. Isto irá desencadear a execução completa deste para cada um dos observadores: os números inteiros 15, 16 e 17 serão emitidos;
- linha 30: o thread principal aguarda a conclusão dos observadores;
Os resultados obtidos são os seguintes:
- linha 2: o thread principal fica bloqueado, à espera que os dois observadores terminem;
- linhas 3-4: verifica-se que o observador 0 está no thread [RxComputationThreadPool-1] e o observador 1 no thread [RxComputationThreadPool-2];
- linhas 3-10: verifica-se que os dois observadores recebem exatamente os mesmos elementos;
Vamos utilizar a classe Observateur assim definida para ilustrar o comportamento de outros tipos de observáveis.
7.3.2. Exemplo-09: os métodos Observable.[interval, take, doNext]
![]() |
![]() |
Este exemplo ilustra a utilização do observável Observable.interval (intervalo longo, unidade TimeUnit), que emite inteiros longos a intervalos de tempo regulares. É importante referir o ponto [1]: por predefinição, o observável [Observable.interval] é executado numa das threads do agendador [Schedulers.computation].
O código será o seguinte:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// número de observadores
final int nbObservateurs = 2;
// semáforo
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// configuração observável
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// execução observável (observação)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// espera
showInfos.accept("main : attente fin observation");
latch.await();
// fim
showInfos.accept("main : fin observation");
}
// visualizações
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- linha 22: o observável emite inteiros longos a cada 500 milissegundos. A série começa com o número 0;
- linha 22: este observável emite um número infinito de valores. O método [Observable.take(n)] cria um novo observável que conserva apenas os primeiros n elementos emitidos;
![]() |
Voltemos ao código do observável:
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
Na linha 2, o método [Observable.doOnNext] é executado sempre que o observável emite um novo elemento. Isto é frequentemente utilizado para registar informações. Neste caso, pretendemos registar a data de emissão dos elementos para verificar se o intervalo de 500 milissegundos está a ser cumprido. O método [Observable.doOnNext] não altera o observável ao qual se aplica. A sua definição é a seguinte:
![]() |
A execução produz os seguintes resultados:
- linhas 3, 7 e 11: verifica-se que, aproximadamente, o intervalo de emissão é próximo de 500 ms;
- Os dois observadores estão, evidentemente, em dois threads diferentes, apesar de o observável não ter sido configurado para ser executado com um agendador específico. Trata-se do funcionamento por predefinição do observável [Observable.interval] que vemos aqui;
7.3.3. Exemplos-10/12: os métodos Observable.[error, empty, never]
![]() | ![]() |
A partir de agora, seremos mais concisos nas nossas ilustrações dos métodos da classe [Observable]. O código anterior era o seguinte:
package dvp.rxjava.observables;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// número de observadores
final int nbObservateurs = 2;
// semáforo
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// configuração observável
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// execução observável (observação)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// espera
showInfos.accept("main : attente fin observation");
latch.await();
// fim
showInfos.accept("main : fin observation");
}
// visualizações
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
Este código já tinha sido utilizado no exemplo anterior. Apenas as linhas 21 e 22 mudavam. Vamos, portanto, factorizar a maior parte deste código na seguinte classe [ProcessUtils]:
package dvp.rxjava.observables.utils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import rx.Observable;
public class ProcessUtils {
@SafeVarargs
public static void subscribe(int nbObservateurs, IProcess<?>... processes) throws InterruptedException {
// semáforo
CountDownLatch latch = new CountDownLatch(nbObservateurs * processes.length);
// execução observável (observação)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
for (IProcess<?> process : processes) {
Observable<?> obs = process.getObservable();
obs.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos, process.getName()));
}
}
// espera
showInfos.accept("main : attente fin observation");
latch.await();
// fim
showInfos.accept("main : fin observation");
}
// visualizações
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- linha 13: o método aceita dois parâmetros:
- nbObservateurs: o número de observadores dos processos passados como segundo parâmetro;
- processes: os processos (observáveis nomeados) a observar. Graças à notação [IProcess<?>], os processos poderão emitir elementos de tipos diferentes;
- linha 16: o semáforo deve passar para verde quando todos os observadores tiverem concluído todas as suas observações. O valor inicial do semáforo é, portanto, o número de observadores multiplicado pelo número de observações;
- linhas 20-25: cada observador é subscrito a todos os processos que deve observar;
- linha 23: recupera-se o observável a partir do processo (ver parágrafo 7.3.1);
- linha 23: subscreve-se um observador a essa observável. São transmitidas a este observador quatro informações:
- o seu nome;
- o semáforo que deve decrementar quando receber a notificação de fim de emissão da variável observável que está a observar;
- o método a utilizar quando quiser registar informações na consola;
- o nome do processo que vai observar;
Uma vez definidas estas classes, o exemplo 10 será o seguinte:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple10 {
public static void main(String[] args) throws InterruptedException {
// configuração observável
Observable<?> obs = Observable.error(new RuntimeException("Erreur !!!")).subscribeOn(Schedulers.computation());
// execução (observação) observável
ProcessUtils.subscribe(2,new Process<>("process1", obs));
}
}
Na linha 11, o método estático [Observable.error] é definido da seguinte forma:
![]() |
A linha 8 configura, portanto, um observável que se limita a lançar uma exceção para o método [onError] dos seus subscritores. A execução produz os seguintes resultados:
main : début observation ------Thread[main] ---- Time[22:618]
main : attente fin observation ------Thread[main] ---- Time[22:636]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[22:638]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[22:638]
Nas linhas 3 e 4, o método [onError] dos dois subscritores recebeu a exceção lançada pelo observável.
Esta execução tem uma particularidade: os métodos [onCompleted] dos dois observadores não foram chamados. Consequentemente, a barreira não foi baixada e o thread principal permanece bloqueado no método estático [ProcessUtils.subscribe] na linha 3 seguinte:
// em espera
showInfos.accept("main : attente fin observation");
latch.await();
// fim
showInfos.accept("main : fin observation");
Verifica-se aqui que, em caso de erro do observável, o método [onCompleted] dos subscritores não é chamado. Modificamos, então, o método [Observateur.onError] da seguinte forma:
@Override
public void onError(Throwable e) {
// erro de emissão
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber[%s, %s].onError (%s)", observerName, processName, e));
}
// fim do bloqueio do thread principal
latch.countDown();
}
Adicionamos as linhas 7-8 para eliminar a barreira em caso de erro do observável. Com este novo código, a execução produz os seguintes resultados:
main : début observation ------Thread[main] ---- Time[40:750]
main : attente fin observation ------Thread[main] ---- Time[40:764]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[40:766]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[40:766]
main : fin observation ------Thread[main] ---- Time[40:767]
Obtenemos a linha 5, que não tínhamos obtido anteriormente.
O exemplo 11 será o seguinte:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple11 {
public static void main(String[] args) throws InterruptedException {
// configuração observável
Observable<?> obs1 = Observable.empty();
// execução (observação) observável
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
Na linha 10, o método estático [Observable.empty] cria um observável que não emite nenhum elemento. Este emite apenas a notificação de fim de emissão;
![]() |
A execução do código do exemplo acima produz os seguintes resultados:
- linhas 2 e 3: verifica-se que os dois observadores recebem a notificação de fim de emissão sem terem recebido quaisquer elementos anteriormente.
Podemos questionar-nos sobre qual será a utilidade deste método. Podemos utilizá-lo de forma análoga a uma coleção, inicialmente vazia, na qual se acumulam posteriormente elementos:
Na linha 3, funde-se o observável inicial obs (linha 1) com outros observáveis.
O exemplo 12 ilustra o método estático [Observable.never]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple12 {
public static void main(String[] args) throws InterruptedException {
// configuração observável
Observable<?> obs1 = Observable.never();
// execução (observação) observável
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
O método estático [Observable.never] cria um observável que nunca emite:
![]() |
A execução do exemplo produz os seguintes resultados:
Na linha 2, o thread principal fica à espera indefinidamente. Com efeito, nenhum observável emite a notificação [onCompleted] que permite que o semáforo (barreira) passe para verde (baixar a barreira).
7.4. Multi-threading
7.4.1. Exemplo 13: thread de ação, thread de observação
No parágrafo 7.1.3, criámos um observável com o método estático [Observable.create]:
![]() |
- o método [create] devolve um tipo Observable<T>;
- o parâmetro do método [create] é uma função do tipo [Observable.OnSubscribe<T>] definida da seguinte forma:
![]() |
O tipo [Observable.OnSubscribe<T>] é uma interface funcional que, por sua vez, estende a interface funcional [Action1<Subscriber<? super T>>]. O método [call] desta interface espera um tipo [Subscriber] (assinante, subscritor, observador). No restante deste documento, por vezes referir-nos-emos ao tipo [Observable.OnSubscribe<T>] como uma ação. Vamos criar ações personalizadas que terão um nome. Serão instâncias da seguinte interface [IProcessAction]:
![]() |
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcessAction<T> extends Observable.OnSubscribe<T> {
// a ação tem um nome
public String getName();
}
- linha 5: a interface [IProcessAction<T>] possui todas as características da interface [Observable.OnSubscribe<T>];
- linha 8: além disso, possui um método [getName] que devolve o nome da instância que implementa a interface;
Vamos utilizar a seguinte ação denominada [ProcessAction01]:
package dvp.rxjava.observables.utils;
import java.util.Random;
import rx.Subscriber;
import rx.functions.Func1;
public class ProcessAction01<T> implements IProcessAction<T> {
// dados
private String name;
private int nbValues;
private Func1<Integer, T> func1;
// construtores
public ProcessAction01(String name, int nbValues, Func1<Integer, T> func1) {
this.name = name;
this.nbValues = nbValues;
this.func1 = func1;
}
@Override
public void call(Subscriber<? super T> subscriber) {
ProcessUtils.showInfos.accept(String.format("Observable (%s) call start", getName()));
for (int i = 0; i < nbValues; i++) {
// espera
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
// erro
ProcessUtils.showInfos.accept(String.format("Observable (%s) onError", getName()));
subscriber.onError(e);
}
// emissão de um elemento
T value = func1.call(i);
ProcessUtils.showInfos.accept(String.format("Observable (%s,%s) onNext (%s)", getName(), i, value));
subscriber.onNext(value);
}
// concluído
ProcessUtils.showInfos.accept(String.format("Observable (%s) onCompleted", getName()));
subscriber.onCompleted();
}
@Override
public String getName() {
return name;
}
}
- linha 8: a classe [ProcessAction01<T>] implementa a interface [IProcessAction<T>] e, por conseguinte, a interface [Observable.OnSubscribe<T>];
- linha 11: o nome da ação;
- linha 12: o número de valores a emitir;
- linha 13: uma instância do tipo [Func1<Integer, T>] que, a partir de um inteiro, cria um tipo T que será emitido pelo observável (linhas 35 e 37);
- linhas 16-20: passam-se ao construtor o nome da ação, o número de valores a emitir e a função de emissão;
- linhas 23-42: o código do processo;
- linha 23: o método [call] recebe como parâmetro o subscritor do observável associado ao processo;
- linha 28: o processo emite os seus elementos após uma espera de duração aleatória;
- linha 32: a emissão de um erro;
- linha 37: uma emissão normal;
- linha 41: emissão da notificação de fim de emissão;
- linhas 25-38: a ação emite valores reais nbValues após um tempo de espera aleatório (linha 30);
- linha 35: o valor a emitir é fornecido pela função [func1] passada como parâmetro ao construtor (linha 16);
Reestruturamos a classe [Process] (ver parágrafo 7.3.1) para que possa ser instanciada também com uma ação nomeada. Adicionamos-lhe o seguinte construtor:
public Process(IProcessAction<T> na, Scheduler schedulerObserved, Scheduler schedulerObserver) {
// nome do processo=nome da ação
name = na.getName();
// ação --> observável
observable = Observable.create(na);
// thread de execução do processo observado
if (schedulerObserved != null) {
observable = observable.subscribeOn(schedulerObserved);
}
// thread de observação do observador
if (schedulerObserver != null) {
observable = observable.observeOn(schedulerObserver);
}
}
- na linha 1, o construtor aceita 3 parâmetros:
- a ação nomeada que servirá para construir o observável (linha 5);
- o agendador do processo observado (pode ser null);
- o agendador do observador (pode ser null);
- linha 5: o observável é criado a partir da ação passada como parâmetro;
O código seguinte, [Exemple13], observa diferentes observáveis:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple13 {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 1, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// processo 3
Process<Integer> process3 = new Process<>(new ProcessAction01<Integer>("process3", 3, i -> i * 2), null,
Schedulers.computation());
// processo 4
Process<Boolean> process4 = new Process<>(new ProcessAction01<Boolean>("process4", 4, i -> i % 2 == 0), null, null);
// subscrições
ProcessUtils.subscribe(1, process1);
ProcessUtils.subscribe(1, process2);
ProcessUtils.subscribe(1, process3);
ProcessUtils.subscribe(1, process4);
}
}
- linhas 13-15: o processo process1 produz 1 número real num thread de cálculo, que será observado noutro thread de cálculo;
- linhas 17-18: o processo process2 produz 2 cadeias de caracteres num thread de cálculo e não é fornecida qualquer indicação sobre o thread do observador. Os resultados mostram que a observação é feita, por predefinição, no mesmo thread em que o processo é executado;
- linhas 20-21: o processo process3 produz 3 números inteiros num thread não especificado, que serão observados num thread de cálculo. Os resultados mostram que a execução do processo ocorre, por predefinição, no thread principal;
- linha 23: o processo process4 produz 4 valores booleanos num thread não imposto, que serão observados num thread não imposto. Os resultados mostram que a execução do processo e a sua observação ocorrem, por predefinição, no thread principal;
O resultado da execução deste código é o seguinte:
- o processo process1 produz 1 número real (linha 4) na thread de cálculo [RxComputationThreadPool-4], que é observado na thread de cálculo [RxComputationThreadPool-3] (linha 6);
- o processo process2 produz 2 cadeias de caracteres (linhas 12, 14) no thread de cálculo [RxComputationThreadPool-5], que são observadas nesse mesmo thread (linhas 13, 15);
- o processo process3 produz 3 números inteiros (linhas 21, 23, 25) no thread principal, que são observados no thread de cálculo [RxComputationThreadPool-6] (linhas 22, 24, 28);
- o processo process4 gera 4 valores booleanos (linhas 34, 36, 38, 40) no thread principal, que são observados nesse mesmo thread principal (linhas 33, 35, 37, 39);
Convida-se o leitor a acompanhar o que se segue:
- o ciclo de vida do processo observado e da sua thread;
- o ciclo de vida do seu observador e da sua thread;
Grande parte do interesse das bibliotecas Rx reside neste multithreading que o programador não tem de gerir por si próprio.
7.5. Combinações de vários observáveis
7.5.1. Exemplo 14: fundir dois observáveis com [Observable.merge]
Apresentamos agora métodos estáticos da classe [Observable] que permitem combinar vários observáveis num observável resultante.
O primeiro exemplo deste tipo será o seguinte:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple14 {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// fusão
Process<?> process12 = new Process<>("process12",
Observable.merge(process1.getObservable(), process2.getObservable()));
// subscrições
ProcessUtils.subscribe(1, process12);
}
}
- linhas 15-17: um processo denominado [process1] irá emitir 3 números reais num thread de cálculo. Será também observado num thread de cálculo;
- linhas 19-20: um processo denominado [process2] irá emitir 2 cadeias de caracteres num thread de cálculo. O thread de observação não é imposto. Vimos anteriormente que, neste caso, o thread de observação é o thread de cálculo;
- linha 23: os dois processos são fundidos, ou seja, cria-se um observável cujos elementos provêm simultaneamente dos dois processos. Para tal, utiliza-se o método estático [Observable.merge]:
![]() |
Ao contrário do que o esquema acima possa sugerir, durante a fusão, os elementos de um fluxo 1 podem intercalar-se entre os elementos de um fluxo 2. É isso que mostram os resultados da execução:
- linha 3: o processo [process1] é executado na thread de cálculo [RxComputationThreadPool-4];
- linha 4: o processo [process2] é executado na thread de cálculo [RxComputationThreadPool-5];
- linha 9: o processo [process12] é observado na thread de cálculo [RxComputationThreadPool-3]. Não sei qual foi a regra que levou a esta escolha;
- linhas 9-11: verifica-se que o observador observa elementos dos dois processos [process1] (linha 5) e [process2] (linhas 6, 7), embora nenhum dos dois esteja concluído (há mistura);
- o processo [process12] termina (linha 17) quando os dois processos process1 e process2 terminam;
7.5.2. Exemplo 15: concatenar dois observáveis com [Observable.concat]
Analisamos agora o código seguinte:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple15 {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, Schedulers.computation());
// concat
Process<?> process12 = new Process<>("process12",
Observable.concat(process1.getObservable(), process2.getObservable()));
// subscrições
ProcessUtils.subscribe(1, process12);
}
}
- linhas 15-17: um processo denominado [process1] irá emitir 3 números reais num thread de cálculo. Será também observado num thread de cálculo;
- linhas 19-20: um processo denominado [process2] irá emitir 2 cadeias de caracteres numa thread não especificada, neste caso a thread principal por predefinição. Será observado numa thread de cálculo;
- linha 23: os dois processos são concatenados, ou seja, cria-se um observável cujos elementos provêm dos dois processos. Não há mistura dos valores emitidos. O processo [process12] irá primeiro emitir todos os valores do processo [process1] e, em seguida, os do processo [process2]. Para tal, utiliza-se o método estático [Observable.concat]:
![]() |
Os resultados da execução são os seguintes:
- linhas 3-10: o processo [process1] está a ser executado e o processo [process12] emite os valores gerados pelo [process1];
- linha 9: o processo [process1] foi concluído;
- linhas 11-17: o processo [process2] está a ser executado e o processo [process12] transmite os valores emitidos pelo [process2];
Há uma anomalia no processo process2: não tinha sido definido nenhum thread de execução. Seria de esperar que, por predefinição, este fosse o thread principal. No entanto, não é esse o caso. O thread de execução foi o thread de cálculo [RxComputationThreadPool-3] (linha 11). Assim, quando não se define um thread de execução ou de observação, não é possível fazer suposições sobre o thread que será escolhido.
7.5.3. Exemplo 16: combinar duas variáveis observáveis com [Observable.zip]
Analisamos agora o código seguinte:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;
public class Exemple16 {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, null);
// função de combinação dos 2 processos
FuncN<String> funcn = new FuncN<String>() {
@Override
public String call(Object... args) {
if (args.length == 2) {
return String.format("double=%s, string=%s", args[0], args[1]);
} else {
throw new RuntimeException("la fonction attend 2 paramètres exactement");
}
}
};
// compactação dos 2 processos
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
// subscrições
ProcessUtils.subscribe(1, process12);
}
}
- linhas 16-18: um processo denominado [process1] irá emitir 3 números reais num thread de cálculo. Será também observado num thread de cálculo;
- linhas 20-21: um processo denominado [process2] irá emitir 2 cadeias de caracteres num thread não imposto. O thread de observação também não é imposto;
- linhas 23-32: instanciação de um tipo [FuncN<String>] com uma classe anónima. FuncN é uma interface funcional:
![]() |
O método [FuncN.call] espera um array de objetos e devolve um tipo R. A função [funcn] será utilizada para combinar os processos process1 e process2 nesta ordem. No método [FuncN.call]:
- args[0] será um Double;
- args[1] será um String;
Aqui, o resultado de [funcn.call] será a cadeia de caracteres da linha 27. A construção deste resultado não requer o conhecimento dos tipos dos argumentos do método call.
Os dois processos são combinados da seguinte forma:
// compactação dos 2 processos
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
O método [Observable.zip] funciona da seguinte forma:
![]() |
Verifica-se que:
- o primeiro argumento do zip é um Iterable<Observable>. No nosso exemplo, temos um parâmetro efetivo do tipo List<Observable>, formado pelos nossos dois observáveis;
- o segundo argumento do zip é do tipo FuncN. No nosso exemplo, o parâmetro efetivo é [funcn];
A execução produz os seguintes resultados:
- linhas 7, 11: o processo process12 emite dois elementos;
- linha 8: o elemento adicional emitido pelo processo process1, que não tem um parceiro no processo process2, não é emitido pelo processo de resultado process12;
Verifica-se que o processo process2, ao qual não tinha sido atribuído nem um thread de execução nem um thread de observação, utilizou o thread principal para ambos.
7.5.4. Exemplo 17: combinar dois observáveis com [Observable.combineLatest]
Analisamos agora o código seguinte:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple17 {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null,
Schedulers.computation());
// combinação dos 2 processos
Process<Double> process12 = new Process<>("process12",
Observable.combineLatest(process1.getObservable(), process2.getObservable(), (d1, d2) -> d1 + d2));
// subscrições
ProcessUtils.subscribe(1, process12);
}
}
- linhas 14-16: um processo denominado [process1] irá emitir 3 números reais numa thread de cálculo. Será também observado numa thread de cálculo;
- linhas 18-20: um processo denominado [process2] irá emitir 2 números reais numa thread não imposta. Estes serão observados numa thread de cálculo;
- linha 23: os dois observáveis são combinados com o seguinte método estático [Observable.combineLatest]:
![]() |
O observável [combineLatest] funciona da seguinte forma: quando um dos dois observáveis emite um elemento E1, esse elemento é combinado por [combineFunction] com o último elemento emitido pelo outro observável.
A execução deste código produz o seguinte resultado:
- linha 5: a emissão de process2 (56) é combinada com o último elemento emitido por process1 (54, linha 4) e produz o resultado da linha 7;
- linha 6: a emissão de process1 (51,6) é combinada com o último elemento emitido por process2 (56, linha 5) e produz o resultado da linha 8;
- linha 9: a emissão de process2 (261,8) é combinada com o último elemento emitido por process1 (51,6, linha 6) e produz o resultado da linha 12;
- linha 13: a emissão de process1 (80,39) é combinada com o último elemento emitido por process2 (261,8, linha 9) e produz o resultado da linha 15;
Estamos aqui perante uma variante do observável [zip], em que, desta vez, os elementos combinados não são necessariamente os elementos que ocupam a mesma posição nos fluxos. Repara-se aqui que o processo process2, ao qual não tinha sido atribuída nenhuma thread de execução, foi aqui executado na thread principal (linha 2).
7.5.5. Exemplo 18: combinar dois observáveis com [Observable.amb]
Analisamos agora o código seguinte:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple18 {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null, null);
// combinação dos 2 processos
Process<Double> process12 = new Process<>("process12",
Observable.amb(process1.getObservable(), process2.getObservable()));
// subscrições
ProcessUtils.subscribe(1, process12);
}
}
- linhas 14-16: um processo denominado [process1] irá emitir 3 números reais numa thread de cálculo. Será também observado numa thread de cálculo;
- linhas 18-20: um processo denominado [process2] irá emitir 2 números reais num thread não restrito. Estes serão observados num thread não restrito;
- linha 22: os dois observáveis são combinados com o seguinte método estático [Observable.amb]:
![]() |
Como mostra o esquema acima, o observável [Observable.amb(Observable o1, Observable o2)] emite os elementos do observável que os emite em primeiro lugar. O que é confirmado pelos resultados do exemplo apresentado:
- na linha 4, é o processo process2 que emite primeiro;
- linhas 8 e 12: o processo process12 emite todos os elementos emitidos pelo processo process2 (linhas 4 e 11);
7.6. Cadeia de processamento de um observável
7.6.1. Exemplo 19: transformar um observável com [Observable.map]
Nos exemplos anteriores, analisámos várias combinações de dois observáveis num terceiro observável. Apresentamos agora métodos estáticos da classe [Observable] que permitem operações de transformação, filtragem e agregação num observável. Encontraremos aqui métodos análogos aos da classe [Stream] estudados no parágrafo 5.
O nosso primeiro exemplo será o seguinte:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple19 {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<String> process2 = new Process<>("process2",
process1.getObservable().map(d -> String.format("valeur-%s", d)));
// subscrições
ProcessUtils.subscribe(1, process2);
}
}
- linhas 14-16: um processo denominado process1 irá emitir 3 números reais num thread de cálculo. Será também observado num thread de cálculo;
- linhas 17-18: os números emitidos por process1 serão transformados em cadeias de caracteres num processo process2;
- linha 20: observa-se o process2;
O método [Observable.map] da linha 18 é análogo ao método [Stream.map] analisado no parágrafo 5.5:
![]() |
Os resultados do exemplo são os seguintes:
- linhas 4, 5 e 8: os resultados de process1. Trata-se de números reais;
- linhas 6, 7 e 10: as emissões de process2 observadas. São cadeias de caracteres;
7.6.2. Exemplo-20: filtrar uma variável observável com [Observable.filter]
O exemplo será o seguinte:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple20 {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().filter(i -> i % 2 == 0));
// subscrições
ProcessUtils.subscribe(1, process2);
}
}
- linhas 11-12: um processo denominado process1 irá emitir os números inteiros de 0 a 2 num thread de cálculo. Será também observado num thread de cálculo;
- linha 14: os números emitidos por process1 serão filtrados para que apenas os números pares fiquem em process2;
- linha 20: observa-se o process2;
O método [Observable.filter] da linha 18 é análogo ao método [Stream.filter] analisado no parágrafo 5.4:
![]() |
Os resultados do exemplo são os seguintes:
- linhas 4, 5 e 7: as transmissões de process1;
- linhas 6 e 9: as emissões de process2 observadas. São os elementos de process1 que são pares;
7.6.3. Exemplo 21: transformar um observável com [Observable.flatMap]
O exemplo será o seguinte:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21 {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}));
// subscrições
ProcessUtils.subscribe(1, process2);
}
}
- linhas 12-13: um processo denominado process1 irá emitir os números inteiros de 0 a 2 num thread de cálculo. Será também observado num thread de cálculo;
- linhas 15-18: cada número n emitido por process1 é transformado num observável que emite os 3 números (10*n, 10*n+1, 10*n+2). Se, na linha 15, se utilizasse o método [map], o process2 emitiria um tipo Observable<Integer> e não um tipo Integer. O método [flatMap] utilizado permite simplificar (flatten) esta sequência de elementos do tipo Observable<Integer> numa sequência de elementos do tipo Integer constituída por cada um dos elementos de cada um dos Observable<Integer>;
- linha 20: observa-se process2;
O método [Observable.flatMap] da linha 15 é análogo ao método [Stream.flatMap] analisado no parágrafo 5.6.12:
![]() |
Os resultados do exemplo são os seguintes:
- linhas 5-7: as três transmissões de process2 na sequência da transmissão da linha 4 de process1;
- linhas 9-11: as três emissões de process2 na sequência da emissão da linha 8 de process1;
- linhas 14-16: as três emissões de process2 na sequência da emissão da linha 12 de process1;
O código seguinte mostra como criar um tipo Observable<Integer[]> a partir de process1 e [Exemple21b]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21b {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<Integer[]> process2 = new Process<>("process2", process1.getObservable().map(i -> {
int value = i * 10;
return new Integer[] { value, value + 1, value + 2 };
}));
// subscrições
ProcessUtils.subscribe(1, process2);
}
}
- linha 14: utiliza-se o método [Observable.map];
- linha 16: que devolve um tipo Integer[];
Os resultados são os seguintes:
- linhas 6, 7, 10: vemos os resultados do map;
Todas estas transformações de observáveis podem ser encadeadas, uma vez que cada transformação produz um novo observável. É o que ilustra o exemplo seguinte [Exemple21c]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21c {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscrições
ProcessUtils.subscribe(1, process2);
}
}
- linhas 15-18: o flatMap é seguido por um filter;
Os resultados da execução são os seguintes:
- linhas 8-13: o process2 emitiu apenas os elementos pares provenientes do flatMap;
Um método semelhante ao [flatMap] é o método [flatMapIterable], ilustrado pelo exemplo seguinte [Exemple21d]:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21d {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMapIterable(i -> {
int value = i * 10;
return Arrays.asList(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscrições
ProcessUtils.subscribe(1, process2);
}
}
Na linha 16, em vez de utilizar o método [flatMap], utiliza-se o método [flatMapIterable]. Neste caso, a função de transformação deve produzir um tipo Iterable<T> (linha 18) em vez de um tipo Observable<T>.
Obtêm-se os mesmos resultados que anteriormente.
Voltemos à definição do método [flatMap]:
![]() |
Como se pode ver acima, um elemento azul [3] foi inserido entre os dois elementos verdes [1-2]. Isto significa que, na sua operação de nivelamento dos Observable<T>, o método [flatMap] respeita a ordem de emissão destes diferentes observáveis internos. Isto é demonstrado pelo exemplo seguinte [Exemple21e]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21e {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// processo 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().flatMap(i -> process2.getObservable()));
// subscrições
ProcessUtils.subscribe(1, process3);
}
}
- linhas 11-12: o processo process1 gera os números inteiros [0,1];
- linhas 14-15: o processo process2 emite os números inteiros [10,11,12];
- linhas 17-18: a cada elemento emitido pelo process1 é associada a observável do processo process2. Isto significa que:
- ao elemento [0] do process1 será associada uma observável que emite os [10,11,12];
- o mesmo se aplica ao elemento 1;
No final, serão emitidos os 6 números [10, 11, 12, 10, 11, 12]. Queremos ver em que ordem.
Os resultados da execução são os seguintes:
Vemos que a ordem de emissão do processo process3 foi: [10, 10, 11, 12, 11, 12] (linhas 11, 12, 14, 17, 19, 22). Houve, portanto, uma mistura dos elementos emitidos pelo processo process2. É possível evitar isso utilizando o método [concatMap] em vez do método [flatMap]. É isso que mostra o código seguinte [Exemple21ef]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21ef {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// processo 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().concatMap(i -> process2.getObservable()));
// subscrições
ProcessUtils.subscribe(1, process3);
}
}
Na linha 18, substituímos [flatMap] por [concatMap]. Os resultados da execução são os seguintes:
Verifica-se que a ordem de emissão do processo process3 foi: [10, 11, 12, 10, 11, 12] (linhas 12-14, 17, 19, 22). Os elementos emitidos pelo processo process2 não foram misturados.
Outra variante do método [map] é o método [switchMap]:
![]() |
Acima, a partir do observável [1], surgem outros 3 observáveis [2] com 2 elementos, que são posteriormente achatados tal como em [flatMap] e [3]. Pode-se observar que o resultado tem 5 elementos e não 6. Isto deve-se ao facto de, antes de o segundo observável emitir o seu elemento n.º 2, [6], o terceiro observável emitir o seu primeiro elemento, [5], o que faz com que o segundo observável seja abandonado. Por conseguinte, o elemento [6] não aparece no observável resultante [3].
Para ilustrar o [switchMap], utilizaremos o seguinte exemplo [Exemple21eg]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21eg {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// processo 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().switchMap(i -> process2.getObservable()));
// subscrições
ProcessUtils.subscribe(1, process3);
}
}
A execução do exemplo produz os seguintes resultados:
- process1 emite 2 elementos que dão origem a 2 observáveis process2 de 3 elementos;
- linha 14: o observador recebe o elemento n.º 0 emitido pelo primeiro observável process2, linha 6;
- linha 15: o observador recebe o elemento n.º 0 emitido pelo segundo observável process2, na linha 13. Não se sabe por que razão não recebeu anteriormente os elementos 1 e 2 emitidos pelo primeiro observável process2 nas linhas 7 e 8. Seja como for, o primeiro observável process2 é abandonado;
- no final, o observador vê apenas 4 elementos (linhas 14, 15, 17, 20) em vez dos 6 que foram emitidos;
7.6.4. Exemplos-22: outros métodos da classe [Observable]
A classe [Observable] retoma vários métodos da classe [Stream] com um funcionamento análogo. Aqui estão alguns deles. Limitamo-nos a apresentar o código e os respetivos resultados.
[Exemple22a - take=limit]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22a {
public static void main(String[] args) throws InterruptedException {
// processo
Process<Integer> process = new Process<>("process", Observable.range(1, 10).take(3));
// subscrições
ProcessUtils.subscribe(1, process);
}
}
resultados
[Exemple22b - takeLast]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22b {
public static void main(String[] args) throws InterruptedException {
// processo
Process<Integer> process = new Process<>("process", Observable.range(1, 10).takeLast(2));
// subscrições
ProcessUtils.subscribe(1, process);
}
}
resultados
[Exemple22c - skip]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22c {
public static void main(String[] args) throws InterruptedException {
// processos
Process<Integer> process = new Process<>("process", Observable.range(1, 10).skip(5).take(2));
// subscrições
ProcessUtils.subscribe(1, process);
}
}
resultados
[Exemple22d - reduce]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22d {
public static void main(String[] args) throws InterruptedException {
// processos
Process<Integer> process = new Process<>("process", Observable.range(1, 10).reduce(0, (i, a) -> i + a));
// subscrições
ProcessUtils.subscribe(1, process);
}
}
- linha 10: calcula a soma dos elementos do observável. O resultado é um observável que emite essa soma;
resultados
[Exemple22e - all]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22e {
public static void main(String[] args) throws InterruptedException {
// processos
Process<Boolean> process = new Process<>("process", Observable.range(1, 10).all(i -> i > 10));
// subscrições
ProcessUtils.subscribe(1, process);
}
}
- linha 10: devolve um Observable<Boolean> que emite o elemento true, se o predicado do método [all] for verdadeiro para todos os elementos; caso contrário, devolve false;
resultados
[Exemple22f - count]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22f {
public static void main(String[] args) throws InterruptedException {
// processos
Process<Integer> process = new Process<>("process", Observable.range(1, 10).count());
// subscrições
ProcessUtils.subscribe(1, process);
}
}
- linha 10: [Observable.count] cria um observável com 1 elemento que corresponde à soma dos elementos observados;
resultados
[Exemple22g - distinct]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22g {
public static void main(String[] args) throws InterruptedException {
// processos
Process<Integer> process = new Process<>("process", Observable.just(1, 2, 1, 3).distinct());
// subscrições
ProcessUtils.subscribe(1, process);
}
}
resultados
[Exemple22h - groupBy, asObservable]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.observables.GroupedObservable;
public class Exemple22h {
public static void main(String[] args) throws InterruptedException {
// processos
Observable<GroupedObservable<Boolean, Integer>> obs = Observable.range(1, 10).groupBy(i -> i % 2 == 0);
Process<Integer> process = new Process<>("process", obs.concatMap(g -> g.asObservable()));
// subscrições
ProcessUtils.subscribe(1, process);
}
}
- linha 11: o método [groupBy] agrupa os 10 elementos emitidos em 2 grupos, os números pares e os números ímpares. O resultado é um tipo Observable<GroupedObservable<Boolean, Integer>>, ou seja, um observável cujos elementos são do tipo GroupedObservable<Boolean, Integer>, em que Boolean é o tipo da chave do grupo (false, true, neste caso) e que é também o tipo do resultado da função lambda passada como parâmetro ao método [groupBy], sendo Integer o tipo dos elementos do grupo;
- linha 12: o tipo GroupedObservable possui um método [asObservable] que permite criar um observável a partir desse tipo. Teremos, portanto, dois tipos Observable<Integer>, um para os números pares e outro para os números ímpares. A partir destes dois observáveis, o método [concatMap] irá criar um único observável;
resultados
[Exemple22i - timestamp]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
import rx.schedulers.Timestamped;
public class Exemple22i {
public static void main(String[] args) throws InterruptedException {
// processo 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processo 2
Process<Timestamped<Integer>> process2 = new Process<>("process2", process1.getObservable().timestamp());
// subscrições
ProcessUtils.subscribe(1, process2);
}
}
- na linha 15, o método [timestamp] associa uma hora a cada elemento da variável observável processada;
resultados
Neste exemplo, é difícil determinar o que representa a informação timestamp:
- linhas 4-5: verifica-se que o elemento 1 de process1 foi emitido 139 ms após o elemento 0;
- linhas 6 e 7: verifica-se que o elemento 1 de process2 foi observado 234 ms após o elemento 0;
- linhas 5 e 8: verifica-se que o elemento 2 de process1 foi emitido 33 ms após o elemento 1;
- linhas 7 e 10: verifica-se que o elemento 2 de process2 foi observado 37 ms após o elemento 1;
Estes desfasamentos devem-se ao facto de os threads de observação e de execução dos observáveis não serem os mesmos. Se substituirmos as linhas 12-13 pelas seguintes (Exemplo22j):
// processo 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
null);
- linhas 2-3: não se impõe o thread de observação. Sabe-se que, nesse caso, o observável é observado no local onde é executado;
Isto dá os seguintes resultados:
- linhas 4 e 6: o processo process1 emite o seu elemento n.º 1 587 ms após o seu elemento n.º 0;
- linhas 5 e 7: o observador observa estes dois elementos com um intervalo de 586 ms;
- linhas 6 e 8: o processo process1 emite o seu elemento n.º 2 396 ms após o seu elemento n.º 1;
- linhas 7 e 9: o observador observa estes dois elementos com um intervalo de 396 ms;
Aqui, os valores do timestamp são coerentes: representam efetivamente a data de emissão do elemento.
7.7. Os agendadores
7.7.1. Exemplo 23: o agendador [Schedulers.computation]
Vamos agora analisar os agendadores de execução. A observação será feita no thread de execução.
O tema dos agendadores é um pouco obscuro. Os diferentes agendadores são apresentados nesta questão no site de StackOverflow [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
Vamos tentar ilustrar a utilização destes diferentes agendadores através de exemplos. O primeiro ilustra o agendador [Schedulers.computation]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple23 {
public static void main(String[] args) throws InterruptedException {
// processos
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.computation(), null);
}
// subscrições
ProcessUtils.subscribe(1, processes);
}
}
- linhas 14-19: cria-se uma matriz de 10 processos a serem executados num thread de cálculo;
- linha 17: cada processo gera um número real aleatório;
- linha 21: subscrevem-se todos esses processos;
Os resultados são os seguintes:
- linhas 2-10: os primeiros 8 processos iniciam-se em 8 threads diferentes (a máquina utilizada tem 8 núcleos). É possível observar que todos iniciam aproximadamente ao mesmo tempo;
- linhas 17-19: 3 processos terminam, libertando assim 3 threads;
- linhas 23-24: os dois últimos processos podem então iniciar, ocupando 2 das threads assim libertadas;
Conclui-se, portanto, que o agendador [Schedulers.computation] fornece um conjunto de n threads, em que n é o número de núcleos da máquina. As threads são executadas em paralelo nesses núcleos.
7.7.2. Exemplo 24: o agendador [Schedulers.io]
Executamos o código anterior com o agendador [Schedulers.io]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple24 {
public static void main(String[] args) throws InterruptedException {
// processos
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.io(), null);
}
// subscrições
ProcessUtils.subscribe(1, processes);
}
}
- linha 18: os processos são executados com os threads do agendador [Schedulers.io];
Isto produz os seguintes resultados:
- linhas 2-10: os 10 processos são iniciados, cada um num thread diferente. Ao contrário do caso anterior, todos os processos puderam ser iniciados. Verifica-se que estes inícios demoram 6 ms, enquanto anteriormente demoravam 1 ms;
- linhas 13-18: os observáveis emitem um após o outro e não de forma quase paralela, como tinha acontecido anteriormente;
Qual é a diferença entre os agendadores [Schedulers.io] e [Schedulers.computation]? Pode encontrar-se uma resposta no URL [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
7.7.3. Exemplo 25: o agendador [Schedulers.newThread]
Executamos o código anterior com o agendador [Schedulers.newThread]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple25 {
public static void main(String[] args) throws InterruptedException {
// processos
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.newThread(), null);
}
// subscrições
ProcessUtils.subscribe(1, processes);
}
}
Os resultados obtidos são os mesmos que com o agendador [Schedulers.io]:
No URL e no [http://stackoverflow.com/questions/33415881/retrofit-with-rxjava-schedulers-newthread-vs-schedulers-io], explica-se que o agendador [Schedulers.io] fornece um conjunto de threads, o que não acontece com o agendador [Schedulers.newThread]. Um conjunto de threads cria automaticamente um número n de threads. Aloca-as aos processos que delas necessitam. Quando estes terminam, as suas threads não são eliminadas, mas regressam ao conjunto e podem então ser reutilizadas por outro processo. Isto é mais eficiente do que criar e eliminar threads constantemente. Por isso, pode considerar-se que é preferível utilizar o agendador [Schedulers.io].
7.7.4. Exemplo 26: os agendadores [Schedulers.immediate, Schedulers.trampoline]
Voltemos à explicação dada para estes dois agendadores:
![]() |
A explicação é bastante simples de compreender, mas quando se tenta ilustrá-la, percebe-se que não a compreendemos. Foi o livro [Learning Reactive Programming With Java 8] que me permitiu criar um exemplo que retoma um exemplo encontrado nesse livro, mas que o simplifica. É o seguinte:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
public class Exemple26 {
public static void main(String[] args) throws InterruptedException {
// um agendador
Scheduler scheduler = Schedulers.immediate();
// um worker desse agendador
Worker worker = scheduler.createWorker();
// um tipo Action0 a ser executado no trabalhador
Action0 action02 = new Action0() {
@Override
public void call() {
// registo da ação02
ProcessUtils.showInfos.accept("action02");
}
};
// um tipo Action0 a executar no worker
Action0 action01 = new Action0() {
@Override
public void call() {
// programa-se uma nova ação no mesmo worker
worker.schedule(action02);
// registo da ação01
ProcessUtils.showInfos.accept("action01");
}
};
// a ação 01 está programada no worker
worker.schedule(action01);
}
// visualizações
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- linha 17: um agendador. Será ou o [Schedulers.immediate], como aqui, ou o [Schedulers.trampoline] posteriormente;
- linha 19: é possível executar ações do tipo Action0 (linhas 21, 20) nos workers do agendador. O método [Scheduler.createWorker] permite criar um worker. O método [Worker.schedule(Action0)] permite que um worker execute um tipo Action0;
- linhas 21-27: uma primeira ação denominada [action02] que será executada (linha 40) pelo worker da linha 19;
- linhas 30-38: uma segunda ação denominada [action01]. Esta ação tem a particularidade de fazer com que a ação action02 seja executada no mesmo worker que ela (linha 34). É aqui que reside a diferença entre [Schedulers.immediate] e [Schedulers.trampoline]:
- se o agendador for [Schedulers.immediate], então, na linha 34, a ação action02 será executada imediatamente (daí o nome do agendador) e a ação action01 em curso será interrompida. Aparecerá então a mensagem da linha 25. Concluída a ação action02, a ação action01 será retomada e aparecerá a mensagem da linha 36;
- se o agendador for [Schedulers.trampoline], então, na linha 34, a ação action02 é colocada em espera. Só será executada quando a tarefa em curso action01 estiver concluída. Veremos então aparecer a mensagem da linha 36. Concluída a ação action01, a ação action02 será executada e veremos a mensagem da linha 25;
A execução do código acima produz os seguintes resultados:
Se, na linha 17, utilizarmos o agendador [Schedulers.trampoline], obtemos os resultados opostos:
Dito isto, é difícil estabelecer uma ligação com os observáveis. Não encontrei nenhum exemplo convincente que pudesse demonstrar a vantagem de executar um observável numa destas duas threads. Aqui está, no entanto, um exemplo, mas que não me parece nada natural:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
public class Exemple27 {
public static void main(String[] args) throws InterruptedException {
// Worker
Worker worker = Schedulers.immediate().createWorker();
// Worker worker = Schedulers.trampoline().createWorker();
// observável 1 no worker
worker.schedule(() -> Observable.range(1, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
// observável 2 no mesmo worker
worker.schedule(() -> Observable.range(100, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
}
}));
}
}));
}
}
- linhas 13-14: cria-se um worker a partir de um dos dois agendadores [Schedulers.immediate] e [Schedulers.trampoline];
- linha 16: um primeiro observável obs1 é programado neste worker para emitir os números [1,2]
- linha 22: sempre que um elemento deste observável obs1 é observado, é iniciada a observação de um segundo observável obs2 no mesmo worker para emitir os números [100,101];
Com o agendador [Schedulers.immediate], obtêm-se os seguintes resultados:
Já com o agendador [Schedulers.trampoline], obtêm-se os seguintes resultados:
7.8. Conclusion
Ainda há muito a fazer. Para aprofundar os conhecimentos sobre a biblioteca RxJava, convidamos o leitor a prosseguir a sua formação com as referências indicadas no início deste documento. Ainda assim, dispomos das bases necessárias para utilizar o RxJava nos ambientes Swing e Android. É isso que vamos demonstrar agora.








































