Skip to content

7. A biblioteca RxJava

A biblioteca RxJava baseia-se no seguinte conceito: um fluxo de elementos do tipo T Observable<T> é observado por um ou mais assinantes (assinantes, observadores, consumidores) Subscriber<T>. A biblioteca RxJava permite que o fluxo Observable<T> seja executado na thread T1 e o seu observador Subscriber<T> na thread T2, sem que o programador tenha de se preocupar com a gestão do ciclo de vida dessas threads ou com problemas naturalmente complexos, tais como a partilha de dados entre threads e a sua sincronização para executar uma tarefa global. Facilita, assim, a programação assíncrona.

Um fluxo Observable<T> produz elementos do tipo T, que podem ser observados à medida que são produzidos. Se o observador e o observável (um termo usado de forma genérica para se referir ao tipo Observable<T>) estiverem na mesma thread, então o observável só pode produzir o elemento (i+1) depois de o observador ter consumido o elemento i. Existem poucos casos em que esta arquitetura é útil. Se o observador e o observável não estiverem na mesma thread, então o observável e o seu observador comportam-se de forma autónoma: o observável emite ao seu próprio ritmo e o observador consome ao seu próprio ritmo. É aqui que reside o valor da biblioteca. Até agora, discutimos apenas um único observador. Na realidade, um observável pode ter qualquer número de observadores.

A biblioteca RxJava é particularmente adequada para a arquitetura descrita na Secção 2 da introdução e resumida aqui:

Image

  • em [1], uma camada de serviço fornece serviços, alguns dos quais demoram muito tempo a obter (pedidos de rede, por exemplo);
  • esta camada de serviço é invocada por uma interface gráfica de utilizador [1] (Swing, Android, JavaFX). Se a camada de serviço for executada na mesma thread que o método [Swing] que a utiliza, a interface gráfica de utilizador congela (deixa de responder) enquanto aguarda o resultado do serviço;
  • Em [2], uma camada de adaptação fina implementada com RxJava permite que a camada GUI receba uma implementação assíncrona do mesmo serviço: este serviço pode ser executado numa thread diferente do método da camada GUI que o invoca. Neste caso, a GUI [3] permanece responsiva: o utilizador pode continuar a interagir com ela, por exemplo, acionando um novo pedido de rede em paralelo com o primeiro e, mais importante ainda, pode ter a opção de cancelar processos que demorem demasiado tempo — algo impossível se a GUI estiver congelada;
  • A chamada [4] é síncrona, enquanto as chamadas [5-6] são assíncronas;

Nesta arquitetura, a camada [2] fornece serviços que devolvem tipos Observable<T> aos quais os métodos da camada gráfica [3] podem subscrever. Um serviço na camada [2] entrega então os seus resultados um a um, e a camada [3] pode reagir a cada um deles, por exemplo, atualizando um ou mais componentes da interface gráfica do utilizador.

A classe Observable<T> possui dezenas de métodos. Este é um dos desafios da biblioteca: é muito rica e é difícil compreender todas as suas possibilidades. Apresentaremos alguns deles. O domínio dos outros métodos virá com o tempo.

7.1. Criar observáveis e subscrevê-los

7.1.1. Exemplo-01: o método [Observable.from]

  

Considere o seguinte código:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
 
import java.util.Arrays;
 
public class Exemple01 {
  public static void main(String[] args) {
    // observable integers
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
    });
  }
}
  • Linha 12: Criamos um tipo Observable<Integer> a partir de uma lista de inteiros.

A classe Observable<T> é um fluxo de elementos do tipo T que podem ser observados — de preferência de forma assíncrona, mas não necessariamente — à medida que são produzidos. A sua definição é a seguinte:

 

Como mencionado anteriormente, a classe Observable<T> possui dezenas de métodos. Alguns são semelhantes aos da classe Stream<T>, discutida na Secção 5. A documentação do RxJava inclui «diagramas de bolinhas» [2] que ilustram como estes métodos funcionam:

  • A linha 3 ilustra as emissões do observável ao longo do tempo;
  • o método [4] é aplicado aos elementos emitidos pelo observável. Geralmente, produz um novo observável;
  • a linha 5 mostra o novo observável obtido;

O método [Observable.from] tem a seguinte assinatura:

 

O método estático [Observable.from] permite criar um Observable<T> a partir de uma coleção de elementos do tipo T. Esta é uma forma muito simples de começar a trabalhar com observáveis. A linha:


    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));

irá, portanto, emitir três elementos. Não os emite imediatamente. Irá emiti-los na íntegra sempre que um subscritor se registar. Isto é designado por observável frio. O observável reemite os seus elementos para cada novo subscritor.

Podemos considerar a instrução anterior como uma ação de configuração para o observável. É configurado uma vez e executado n vezes se aparecerem n assinantes.

Como se inscreve?

Uma forma de o fazer é utilizar o método [Observable.subscribe], cuja definição aqui utilizada é a seguinte:

 
  • o primeiro parâmetro [Action1<T> onNext] (ver Secção 6.2) do método é o método a ser executado quando o observável emite um novo elemento T;
  • o segundo parâmetro [Action1<Throwable> onError] do método é o método a ser executado quando o observável lança uma exceção;
  • o terceiro parâmetro [Action0 onComplete] (ver Secção 6.1) do método é o método a ser executado quando o observável emite uma exceção;
  • o método retorna um tipo [Subscription];

O tipo [Subscription] representa uma subscrição do observável. A sua definição é a seguinte:

 

O valor desta interface [1] reside no seu método [2], que permite cancelar uma subscrição.

No nosso exemplo, o código para subscrever o observável é o seguinte:


    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
});
  • linha 1: o resultado do tipo [Subscription] é ignorado;
  • linhas 1–15: os três parâmetros são instâncias de classes anónimas. Também utilizaremos lambdas. A vantagem das classes anónimas é que os tipos de dados esperados pelo único método destas classes são claramente visíveis;
  • linhas 2–5: implementação do primeiro parâmetro do tipo [Action1<Integer>];
  • linhas 6–10: implementação do segundo parâmetro do tipo [Action1<Throwable>];
  • linhas 11–15: implementação do terceiro parâmetro do tipo [Action0];

O código completo é o seguinte:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
 
import java.util.Arrays;
 
public class Exemple01 {
  public static void main(String[] args) {
    // observable integers
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    // subscription
    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
    });
  }
}

O observável na linha 12 começa a emitir os seus três elementos assim que o método [subscribe] é chamado na linha 14. A partir desse momento:

  • para cada elemento emitido, as linhas 15–18 são executadas.
  • quando os 3 elementos estiverem concluídos, as linhas 24–29 são executadas;
  • as linhas 19–24 nunca serão executadas porque o observável não emite uma exceção aqui;

Por predefinição, o observável e o observador são executados na mesma thread. Existem alguns observáveis predefinidos que são executados numa thread diferente da thread principal (aqui, a thread do método main), mas para a maioria deles, não é esse o caso. Assim, aqui, tudo acontece na thread do método main:

  • o observável emite o elemento 1;
  • as linhas 15–18 são executadas e exibem este elemento;
  • o observável emite o elemento 2;
  • as linhas 15–18 executam e apresentam este elemento;
  • o observável emite o elemento 3;
  • as linhas 15–18 executam e exibem este elemento;
  • o observável emite a notificação [completed];
  • as linhas 24–29 são executadas;

Eis o que os resultados mostram:

1
2
3
4
next : 1
next : 2
next : 3
completed

A classe [Example02] reutiliza [Example01], desta vez utilizando funções lambda como parâmetros para o método [Observable.subscribe]:


package dvp.rxjava.observables;
 
import java.util.Arrays;
 
import rx.Observable;
 
public class Exemple02 {
  public static void main(String[] args) {
    // observable integers
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    // subscription
    obs1.subscribe(
      (integer) -> System.out.printf("next : %s%n", integer),
      (th) -> System.out.println(th),
      () -> System.out.println("completed"));
  }
}

7.1.2. Exemplo-03: A Classe Observador

  

O método [Observable.subscribe], que permite subscrever um observável, tem várias versões, incluindo as seguintes:


package dvp.rxjava.observables;
 
import java.util.Arrays;
 
import rx.Observable;
import rx.Observer;
 
public class Exemple03 {
    public static void main(String[] args) {
        // observable integers
        Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
        // subscription
        obs1.subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
 
            @Override
            public void onError(Throwable th) {
                System.out.printf("throwable %s", th);
            }
 
            @Override
            public void onNext(Integer integer) {
                System.out.printf("next : %s%n", integer);
            }
        });
    };
}

Linha 13: Em vez de passar três parâmetros para o método [subscribe], passamos-lhe um tipo [Observer] da seguinte forma:

 

O tipo [Observer] é uma interface com três métodos:

  • [onNext(T t)], que é chamado sempre que o observável emite um elemento t;
  • [onError(Throwable th)], que é chamado quando o observável lança uma exceção th;
  • [onCompleted], que é chamado quando o observável indica que terminou de emitir;

O código funciona de forma semelhante ao que foi explicado anteriormente. Obtemos os seguintes resultados:

1
2
3
4
next : 1
next : 2
next : 3
completed

7.1.3. Exemplo-04: O método [Observable.create]

  

O método estático Observable.create é definido da seguinte forma:

 
  • O método [create] retorna um tipo Observable<T>;
  • o parâmetro do método [create] é uma função do tipo [Observable.OnSubscribe<T>] definida da seguinte forma:
 

O tipo [Observable.OnSubscribe<T>] é uma interface funcional que, por sua vez, estende a interface funcional [Action1<Subscriber<? super T>>]. O método [call] desta interface espera um tipo [Subscriber] (subscriber, observer) definido da seguinte forma:

 

Vemos em [1] que a classe [Subscriber<T>] implementa a interface [Observer<T>] apresentada na Secção 7.1.2.

Em última análise, o método [<T> Observable.create]:

  • toma como parâmetro uma instância do tipo [Observable.OnSubscribe<T>] com um único método: void call(Subscriber<T> s). O tipo [Subscriber<T>] estende o tipo [Observer<T>] e, portanto, possui os métodos onNext, onError e onCompleted;
  • retorna um tipo Observable<T>;

O método [<T> Observable.create] retorna um observável configurado. Ainda não foram emitidos elementos. Quando um subscritor [Subscriber<T> s] subscreve este observável, o método [void call(s)] da função passada como parâmetro ao método [<T> Observable.create] é então chamado. A sua função é emitir elementos t do tipo T e chamar o método do observador [s.onNext(t)] em cada emissão. Quando isto estiver concluído, o método [s.onCompleted(t)] do observador deve ser chamado e o método [call] deve terminar. Se o método [call] encontrar uma exceção th, o método [s.onError(th)] do observador deve ser chamado e o método [call] deve terminar;

Para ilustrar este comportamento complexo, utilizaremos o seguinte código [Exemplo04]:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.Subscriber;
 
import java.util.Random;
 
public class Exemple04 {
    public static void main(String[] args) {
        // observable configuration of reals
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                for (int i = 0; i < 3; i++) {
                    // emission element i
                    subscriber.onNext(new Random((i + 1)).nextDouble());
                }
                // end of issue
                subscriber.onCompleted();
            }
        });
        // subscription and therefore emission
        obs1.subscribe((d) -> System.out.printf("onNext %s%n", d), (th) -> System.out.printf("onError %s%n", th),
                () -> System.out.println("onCompleted"));
    }
}
  • linha 11: é criado um observável que emite tipos Double;
  • linhas 11–21: o parâmetro do método [create] é instanciado com uma classe anónima contendo o único método [call] das linhas 12–20. O observável criado na linha 11 está pronto para emitir, mas só o fará quando um observador chegar;
  • linhas 13–21: o método [call] recebe uma referência a um observador;
  • linhas 14–17: são emitidos três elementos para o observador;
  • linha 19: notifica o observador de que a emissão terminou;
  • linhas 23–24: Subscrição do observável da linha 11. Implementamos os três parâmetros [onNext, onError, onCompleted] do método [subscribe] utilizando três lambdas. Esta subscrição irá criar o subscritor [Subscriber<Double>], que será passado para o método [call] na linha 13. A emissão de elementos terá então início;
  • tudo acontece na mesma thread: observável e observador;

Obtemos os seguintes resultados:

1
2
3
4
onNext 0.7308781907032909
onNext 0.7311469360199058
onNext 0.731057369148862
onCompleted

O método [Observable.create] permite criar um observável a partir de qualquer evento. Este é o método que utilizámos na Secção 2 da introdução para transformar uma interface síncrona numa assíncrona.

7.1.4. Exemplo-05: Refatoração do [Exemplo-04]

  

O exemplo seguinte apresenta uma nova versão do método estático [Observable.subscribe]:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.Subscriber;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
 
public class Exemple05 {
    public static void main(String[] args) {
        // configuration of a real observable
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // waiting
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // error
                        subscriber.onError(e);
                    }
                    // action
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // finish
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });
 
        // a subscriber
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
            }
 
            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }
 
            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };
 
        // subscription
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        showInfos("après souscription");
 
    }
 
    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}
  • linha 56: a nova versão do método estático [Observable.subscribe] aceita o tipo [Subscriber] como parâmetro, que introduzimos no parágrafo anterior;
  • linhas 37–52: o assinante (observador). Ele implementa a interface Observer com os seus três métodos onNext, onError e onCompleted;
  • Linhas 61–64: A partir daqui, vamos concentrar-nos nos threads em que o observável e o seu observador são executados;
  • linha 62: o nome da thread;
  • linha 63: a hora atual expressa em segundos e milissegundos. Isto permitir-nos-á acompanhar ao longo do tempo a emissão de elementos pelo observável e o seu processamento pelo observador;
  • Este código tem a mesma funcionalidade que o código anterior. Simplesmente refatorámos este último;

Os resultados obtidos são os seguintes:

avant souscription ------Thread[main] ---- Time[31:685]
Observable.call start ------Thread[main] ---- Time[31:691]
Observable.call onNext(80.39999999999999) ------Thread[main] ---- Time[32:194]
Subscriber.onNext (80.39999999999999) ------Thread[main] ---- Time[32:195]
Observable.call onNext(73.2) ------Thread[main] ---- Time[32:595]
Subscriber.onNext (73.2) ------Thread[main] ---- Time[32:595]
Observable.call onNext(106.8) ------Thread[main] ---- Time[32:897]
Subscriber.onNext (106.8) ------Thread[main] ---- Time[32:897]
Observable.call onCompleted ------Thread[main] ---- Time[32:898]
Subscriber.onCompleted ------Thread[main] ---- Time[32:898]
après souscription ------Thread[main] ---- Time[32:899]
  • Linha 1 dos resultados: antes da linha 56 do código, ainda não aconteceu nada. O observável foi simplesmente configurado;
  • Linha 2 dos resultados: a linha 56 do código aciona uma chamada ao método [call] na linha 15. Linha 3: o número real 80,39 é emitido para o observador;
  • Linha 4: O observador recebe o número emitido;
  • linhas 5–8: o processo anterior repete-se duas vezes;
  • linha 9: o observável envia a notificação de fim de transmissão;
  • linha 10: o observador recebe-a;
  • linha 11: exibida pela linha 57 do código;

Podemos ver, portanto, que a única linha de subscrição 56 fez com que as linhas 2–10 dos resultados fossem exibidas. Ao começar a utilizar a biblioteca RxJava, questiona-se como as coisas estão interligadas, particularmente as ligações entre o observador e o observável. Aqui vemos que a linha 56, a subscrição do observável,

  • desencadeou a emissão de todos os elementos do observável;
  • que o observável e o observador são executados na mesma thread;
  • e que, por causa disso, observamos a sequência: emitir elemento i, observar elemento i, emitir elemento (i+1), observar elemento (i+1), ...

Lembre-se de que o emissor estava à espera antes de emitir os seus elementos:


                    // attente
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // erreur
                        subscriber.onError(e);
}

onde i na linha 3 representa o número de emissão (0 <= i < 3). Se analisarmos os tempos de emissão dos elementos do observável:

  • linhas 2, 3: o elemento 0 foi emitido aproximadamente 500 ms após o início da subscrição;
  • linhas 3, 5: o elemento 1 foi emitido aproximadamente 400 ms após o elemento 0;
  • linhas 5, 7: o elemento 2 foi emitido aproximadamente 300 ms após o elemento 1;

7.2. Thread de execução, thread de observação

7.2.1. Exemplo-06: Observável e observador numa thread diferente de [main]

  

Reestruturamos o exemplo anterior da seguinte forma [Exemplo 06]:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
 
public class Exemple06 {
    public static void main(String[] args) {
 
        // gatekeeper
        CountDownLatch latch = new CountDownLatch(1);
 
        // configuration of a real observable
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // waiting
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // error
                        subscriber.onError(e);
                    }
                    // action
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // finish
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });
 
        // a subscriber
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
                // we lower the barrier
                latch.countDown();
            }
 
            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }
 
            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };
 
        // suite observable configuration
        obs1 = obs1.subscribeOn(Schedulers.computation());
        // subscription
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        // waiting at the gate
        try {
            showInfos("début attente barrière");
            latch.await();
            showInfos("fin attente barrière");
        } catch (InterruptedException e1) {
            System.out.println(e1);
        }
        showInfos("après souscription");
 
    }
 
    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}
  • Linha 16: Criamos um guardrail (semáforo) utilizando um objeto [CountDownLatch]. Este objeto é utilizado para sincronizar threads entre si. Aqui, é inicializado com o valor 1, ao qual nos referiremos como o valor do guardrail (ou semáforo). Uma thread aguarda o guardrail utilizando a seguinte operação:

latch.await();

A thread fica bloqueada se o valor do latch for >0. Uma thread pode incrementar ou decrementar o valor interno do latch. Linha 48: o valor do latch é decrementado em 1.

  • Linha 63: o observável é configurado para ser executado numa thread fornecida pelo agendador [Schedulers.computation()]. Este agendador pode fornecer tantas threads quantos forem os núcleos na máquina de execução. A secção sobre a aplicação de exemplo demonstrou a utilização de outros agendadores (ver Secção 2.8);

O princípio do código é o seguinte:

  • o método [main] é executado na thread principal;
  • linha 66: começa a emitir elementos a partir do observável. Estes serão emitidos numa thread diferente da thread principal;
  • linha 70: a thread principal é bloqueada porque a barreira tem o valor 1 (ver linha 16). Só pode continuar quando este valor mudar para 0. Isto acontece na linha 48. É o observador que baixa a barreira quando recebe a notificação de que o observável terminou a emissão;

A execução produz os seguintes resultados:

avant souscription ------Thread[main] ---- Time[09:268]
Observable.call start ------Thread[RxComputationThreadPool-1] ---- Time[09:278]
début attente barrière ------Thread[main] ---- Time[09:278]
Observable.call onNext(44.4) ------Thread[RxComputationThreadPool-1] ---- Time[09:783]
Subscriber.onNext (44.4) ------Thread[RxComputationThreadPool-1] ---- Time[09:783]
Observable.call onNext(18.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:183]
Subscriber.onNext (18.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:184]
Observable.call onNext(54.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:486]
Subscriber.onNext (54.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:488]
Observable.call onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[10:489]
Subscriber.onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[10:490]
fin attente barrière ------Thread[main] ---- Time[10:491]
après souscription ------Thread[main] ---- Time[10:493]
  • linha 1: a subscrição está prestes a ocorrer;
  • linha 2: isto desencadeia a execução do método [call] na thread [RxComputationThreadPool-1]. Temos agora uma execução paralela com duas threads;
  • linha 3: por uma razão desconhecida, a thread [RxComputationThreadPool-1] cedeu o controlo. A thread [main] assume então o controlo e é bloqueada pela barreira (linha 70 do código). A partir deste ponto, apenas a thread [RxComputationThreadPool-1] pode operar;
  • linhas 4–11: observamos o comportamento visto anteriormente entre o observável e o seu observador, mas agora tudo ocorre na thread [RxComputationThreadPool-1];
  • linhas 12-13: o observador baixou a barreira (linha 48 do código) e a thread [RxComputationThreadPool-1] foi encerrada. A thread [main] assume o controlo e exibe duas mensagens;

7.2.2. Exemplo-07: Observável e observador em duas threads diferentes

  

Alteramos o exemplo anterior da seguinte forma:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
 
public class Exemple07 {
    public static void main(String[] args) {
 
        // gatekeeper
        CountDownLatch latch = new CountDownLatch(1);
 
        // configuration of a real observable
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // waiting
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // error
                        subscriber.onError(e);
                    }
                    // action
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // finish
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });
 
        // a subscriber
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
                // we lower the barrier
                latch.countDown();
            }
 
            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }
 
            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };
 
        // suite observable configuration
        obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
        // subscription
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        // waiting in front of the barrier
        try {
            showInfos("début attente barrière");
            latch.await();
            showInfos("fin attente barrière");
        } catch (InterruptedException e1) {
            System.out.println(e1);
        }
        showInfos("après souscription");
 
    }
 
    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}

O código é idêntico ao do exemplo anterior, exceto na linha 63:


obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());

que configura o observável (subscribeOn) e o observador (observeOn) para serem executados numa das threads fornecidas pelo agendador [Schedulers.computation()].

Os resultados obtidos são os seguintes:

avant souscription ------Thread[main] ---- Time[09:643]
début attente barrière ------Thread[main] ---- Time[09:656]
Observable.call start ------Thread[RxComputationThreadPool-4] ---- Time[09:656]
Observable.call onNext(39.6) ------Thread[RxComputationThreadPool-4] ---- Time[10:162]
Subscriber.onNext (39.6) ------Thread[RxComputationThreadPool-3] ---- Time[10:163]
Observable.call onNext(98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[10:562]
Subscriber.onNext (98.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[10:564]
Observable.call onNext(46.8) ------Thread[RxComputationThreadPool-4] ---- Time[10:864]
Observable.call onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[10:866]
Subscriber.onNext (46.8) ------Thread[RxComputationThreadPool-3] ---- Time[10:866]
Subscriber.onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[10:868]
fin attente barrière ------Thread[main] ---- Time[10:869]
après souscription ------Thread[main] ---- Time[10:870]

Podem ser observados os seguintes pontos:

  • o observável é executado na thread [RxComputationThreadPool-4] (linhas 3–4, 6, 8–9);
  • o observador é executado na thread [RxComputationThreadPool-3] (linhas 5, 7, 10-11);
  • eles executam-se de forma independente. Assim, nas linhas 8–9, o observável emite duas notificações (onNext, onCompleted) antes de o observador recuperar a notificação [onNext] (linha 10);

A biblioteca RxJava trata da transferência de dados (emissões) da thread do observável para a thread do observador. O programador não precisa de se preocupar com isto.

Já vimos como criar observáveis (Observable.from, Observable.create). Agora, vamos analisar os observáveis predefinidos na biblioteca RxJava.

7.3. Observáveis predefinidos

7.3.1. Exemplo-08: o método [Observable.range]

 

A partir de agora, utilizaremos classes dedicadas para os processos observados e os seus observadores. A ideia é poder registar os seus nomes, os seus threads de execução e os seus tempos de execução, para que possamos acompanhá-los ao longo do tempo.

A classe [Process] será simplesmente um Observable ao qual podemos atribuir um nome. Ela implementará a seguinte interface [IProcess]:


package dvp.rxjava.observables.utils;
 
import rx.Observable;
 
public interface IProcess<T> {
 
    // name of observable
    public String getName();
 
    // observable
    public Observable<T> getObservable();
 
}

Esta interface pode ser implementada pela seguinte classe [Process<T>]:


package dvp.rxjava.observables.utils;
 
import rx.Observable;
import rx.Scheduler;
 
public class Process<T> implements IProcess<T>{
 
    // observable name
    protected String name;
    // observed process
    protected Observable<T> observable;
 
    // manufacturers
    public Process(String name, Observable<T> observable) {
        // local initializations
        this.name = name;
        this.observable = observable;
    }
 
    // getters and setters
    public String getName() {
        return name;
    }
 
    public Observable<T> getObservable() {
        return observable;
    }
 
}
  • linha 9: o nome do processo;
  • linha 11: o observável observado;
  • linhas 14–18: o construtor;

O observador será descrito pela seguinte classe [Observer]:


package dvp.rxjava.observables.utils;
 
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import rx.Subscriber;
 
public class Observateur<T> extends Subscriber<T> {
 
...
}
  • Linha 11: A classe `Observateur<T>` estende a classe `Subscriber<T>`, que apresentámos brevemente na Secção 7.1.3. Iremos utilizá-la como argumento para o método [`Observable.subscribe`]:

// exécution observable (observation)
obs1.subscribe(observateur);

O método [Observable.subscribe] utilizado na linha 2 acima tem a seguinte definição:

 

A função do [Subscriber] consiste principalmente em gerir os elementos emitidos pelo observável ao qual se subscreveu, utilizando os métodos da interface [Observer]: onNext, onError, onCompleted. A classe [Subscriber] possui os seguintes métodos:

 

No código da classe [Observer], utilizaremos o método [1] isUnsubscribed para determinar se a subscrição do subscritor foi cancelada ou não. A classe [Observer<T>] completa é a seguinte:


package dvp.rxjava.observables.utils;
 
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import rx.Subscriber;
 
public class Observateur<T> extends Subscriber<T> {
 
    // a gatekeeper (semaphore)
    private CountDownLatch latch;
    // a display method
    private Consumer<String> showInfos;
    // observer's name
    private String observerName;
    // the name of the observed process
    private String processName;
 
    // manufacturers
    public Observateur() {
 
    }
 
    public Observateur(String name, CountDownLatch latch, Consumer<String> showInfos, String observedName) {
        this.observerName = name;
        this.latch = latch;
        this.showInfos = showInfos;
        this.processName = observedName;
    }
 
    // --------------------------- implementation interface Observer<T>
    @Override
    public void onCompleted() {
        // end of issues
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber [%s,%s].onCompleted", observerName, processName));
        }
        // end of main thread lock
        latch.countDown();
    }
 
    @Override
    public void onError(Throwable e) {
        // emission error
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber [%s, %s].onError (%s)", observerName, processName, e));
        }
    }
 
    @Override
    public void onNext(T value) {
        // an additional show
        if (!isUnsubscribed()) {
            try {
                showInfos.accept(String.format("Subscriber [%s,%s] : onNext (%s)", observerName, processName,
                        new ObjectMapper().writeValueAsString(value)));
            } catch (JsonProcessingException e) {
                showInfos.accept(String.format("Subscriber [%s,%s].onNext (%s)", observerName, processName, e));
            }
        }
    }
}
  • Para além das características de um Assinante, o Observador conterá as seguintes informações:
    • linha 14: uma barreira ou semáforo que será utilizado para bloquear o segmento principal até que o observador tenha recebido todos os elementos emitidos pelo observável. Isto ocorrerá na linha 36 do código quando o observador receber a notificação de fim de emissão do observável;
    • linha 16: uma instância de Consumer<String> que será utilizada para exibir uma mensagem na consola;
    • linha 18: o nome do observador, utilizado para distinguir entre observadores quando existem vários;
    • linha 20: o nome do processo observado;
  • linhas 36, 46, 54: os métodos [onCompleted, onError, onNext] da interface [Observer<T>] implementados pela classe abstrata [Subscriber<T>]. Esta classe não os implementa. Por conseguinte, isto deve ser feito nas suas classes filhas. Antes de fazer qualquer coisa nestes métodos, verificamos se o observador foi cancelado da assinatura do observável que está a observar;
  • linha 59: o método [onNext] do observador escreve a cadeia JSON do elemento recebido. Isto permitir-nos-á apresentar vários tipos de elementos;

Dito isto, vamos examinar um novo método da classe Observable, o método [range]:

 

O observável Observable.range(n,m) emite (m) inteiros que variam de n a n+m-1. Vamos explorá-lo com o seguinte código [Exemplo08]:


package dvp.rxjava.observables.exemples;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
 
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple08 {
    public static void main(String[] args) throws InterruptedException {
 
        // number of observers
        final int nbObservateurs = 2;
 
        // semaphore
        CountDownLatch latch = new CountDownLatch(nbObservateurs);
 
        // observable configuration
        Observable<Integer> obs1 = Observable.range(15, 3).subscribeOn(Schedulers.computation());
        // observable performance (observation)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos,"obs1"));
        }
        // waiting
        showInfos.accept("main : attente fin observation");
        latch.await();
        // end
        showInfos.accept("main : fin observation");
    }
 
    // displays
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • Linha 16: Vamos utilizar dois observadores;
  • linha 19: o guardrail (semáforo) é inicializado com o valor dois, porque colocaremos cada observador numa thread diferente. A thread principal terá, portanto, de esperar que ambas as threads dos observadores terminem;
  • linha 22: configuramos o observável para que seja executado numa thread do agendador [Schedulers.computation()]. O observador estará na mesma thread que o observável;
  • linhas 25–27: subscrevemos dois observadores ao observável. Isto irá desencadear a execução completa do observável para cada observador: os inteiros 15, 16 e 17 serão emitidos;
  • linha 30: a thread principal aguarda que os observadores terminem;

Os resultados obtidos são os seguintes:

main : début observation ------Thread[main] ---- Time[27:875]
main : attente fin observation ------Thread[main] ---- Time[27:893]
Subscriber[observateur[1],obs1] : onNext (15) ------Thread[RxComputationThreadPool-2] ---- Time[28:245]
Subscriber[observateur[0],obs1] : onNext (15) ------Thread[RxComputationThreadPool-1] ---- Time[28:245]
Subscriber[observateur[1],obs1] : onNext (16) ------Thread[RxComputationThreadPool-2] ---- Time[28:247]
Subscriber[observateur[0],obs1] : onNext (16) ------Thread[RxComputationThreadPool-1] ---- Time[28:248]
Subscriber[observateur[1],obs1] : onNext (17) ------Thread[RxComputationThreadPool-2] ---- Time[28:249]
Subscriber[observateur[1],obs1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[28:250]
Subscriber[observateur[0],obs1] : onNext (17) ------Thread[RxComputationThreadPool-1] ---- Time[28:251]
Subscriber[observateur[0],obs1].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[28:252]
main : fin observation ------Thread[main] ---- Time[28:252]
  • linha 2: o thread principal está bloqueado, à espera que os dois observadores terminem;
  • linhas 3-4: vemos que o observador 0 está na thread [RxComputationThreadPool-1] e o observador 1 na thread [RxComputationThreadPool-2];
  • linhas 3-10: vemos que ambos os observadores recebem exatamente os mesmos elementos;

Vamos utilizar a classe Observer aqui definida para ilustrar o comportamento de outros tipos de observáveis.

7.3.2. Exemplo-09: os métodos Observable.[interval, take, doNext]

  
 

Este exemplo ilustra a utilização do observável Observable.interval(long interval, TimeUnit unit), que emite inteiros longos a intervalos regulares. Nota [1]: por predefinição, o observável [Observable.interval] é executado numa das threads do agendador [Schedulers.computation].

O código será o seguinte:


package dvp.rxjava.observables.exemples;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
 
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
 
public class Exemple09 {
    public static void main(String[] args) throws InterruptedException {
 
        // number of observers
        final int nbObservateurs = 2;
 
        // semaphore
        CountDownLatch latch = new CountDownLatch(nbObservateurs);
 
        // observable configuration
        Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
                .doOnNext(l -> showInfos.accept(l.toString()));
        // observable performance (observation)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
                    "obs1"));
        }
        // waiting
        showInfos.accept("main : attente fin observation");
        latch.await();
        // end
        showInfos.accept("main : fin observation");
    }
 
    // displays
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • linha 22: o observável emite inteiros longos a cada 500 milissegundos. A sequência começa com o número 0;
  • linha 22: este observável emite um número infinito de valores. O método [Observable.take(n)] cria um novo observável que retém apenas os primeiros n elementos emitidos;
 

Vamos rever o código do observável:


Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));

Linha 2: O método [Observable.doOnNext] é executado sempre que o observável emite um novo elemento. É frequentemente utilizado para registar informações. Aqui, pretendemos registar a data de emissão dos elementos para verificar se o intervalo de 500 milissegundos está a ser mantido. O método [Observable.doOnNext] não altera o observável ao qual é aplicado. A sua definição é a seguinte:

 

A execução produz os seguintes resultados:

main : début observation ------Thread[main] ---- Time[55:892]
main : attente fin observation ------Thread[main] ---- Time[55:911]
0 ------Thread[RxComputationThreadPool-1] ---- Time[56:412]
0 ------Thread[RxComputationThreadPool-2] ---- Time[56:413]
Subscriber[observateur [1],obs1] : onNext (0) ------Thread[RxComputationThreadPool-2] ---- Time[56:723]
Subscriber[observateur [0],obs1] : onNext (0) ------Thread[RxComputationThreadPool-1] ---- Time[56:723]
1 ------Thread[RxComputationThreadPool-1] ---- Time[56:906]
Subscriber[observateur [0],obs1] : onNext (1) ------Thread[RxComputationThreadPool-1] ---- Time[56:908]
1 ------Thread[RxComputationThreadPool-2] ---- Time[56:912]
Subscriber[observateur [1],obs1] : onNext (1) ------Thread[RxComputationThreadPool-2] ---- Time[56:914]
2 ------Thread[RxComputationThreadPool-1] ---- Time[57:405]
Subscriber[observateur [0],obs1] : onNext (2) ------Thread[RxComputationThreadPool-1] ---- Time[57:407]
Subscriber[observateur [0],obs1].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[57:408]
2 ------Thread[RxComputationThreadPool-2] ---- Time[57:412]
Subscriber[observateur [1],obs1] : onNext (2) ------Thread[RxComputationThreadPool-2] ---- Time[57:414]
Subscriber[observateur [1],obs1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[57:415]
main : fin observation ------Thread[main] ---- Time[57:416]
  • linhas 3, 7 e 11: vemos que o intervalo de emissão é de aproximadamente 500 ms;
  • os dois observadores estão, de facto, em duas threads diferentes, apesar de o observável não ter sido configurado para ser executado com um agendador específico. Este é o comportamento padrão do observável [Observable.interval] que vemos aqui;

7.3.3. Exemplos-10/12: os métodos Observable.[error, empty, never]

 

A partir de agora, seremos mais concisos nas nossas ilustrações dos métodos da classe [Observable]. O código anterior era o seguinte:


package dvp.rxjava.observables;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
 
import rx.Observable;
 
public class Exemple09 {
    public static void main(String[] args) throws InterruptedException {
 
        // number of observers
        final int nbObservateurs = 2;
 
        // semaphore
        CountDownLatch latch = new CountDownLatch(nbObservateurs);
 
        // observable configuration
        Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
                .doOnNext(l -> showInfos.accept(l.toString()));
        // observable performance (observation)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
                    "obs1"));
        }
        // waiting
        showInfos.accept("main : attente fin observation");
        latch.await();
        // end
        showInfos.accept("main : fin observation");
    }
 
    // displays
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}

Este código já foi utilizado no exemplo anterior. Apenas as linhas 21–22 foram alteradas. Por isso, vamos extrair a maior parte deste código para a seguinte classe [ProcessUtils]:


package dvp.rxjava.observables.utils;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
 
import rx.Observable;
 
public class ProcessUtils {
 
    @SafeVarargs
    public static void subscribe(int nbObservateurs, IProcess<?>... processes) throws InterruptedException {
 
        // semaphore
        CountDownLatch latch = new CountDownLatch(nbObservateurs * processes.length);
 
        // observable performance (observation)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            for (IProcess<?> process : processes) {
                Observable<?> obs = process.getObservable();
                obs.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos, process.getName()));
            }
        }
        // waiting
        showInfos.accept("main : attente fin observation");
        latch.await();
        // end
        showInfos.accept("main : fin observation");
    }
 
    // displays
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • linha 13: o método recebe dois parâmetros:
    • nbObservers: o número de observadores para os processos, passado como segundo parâmetro;
    • processos: os processos (denominados observáveis) a serem observados. Graças à notação [IProcess<?>], os processos podem emitir elementos de diferentes tipos;
  • linha 16: o semáforo deve ficar verde quando todos os observadores tiverem concluído todas as suas observações. O valor inicial do semáforo é, portanto, o número de observadores multiplicado pelo número de observações;
  • Linhas 20–25: Cada observador está inscrito em todos os processos que precisa de observar;
  • linha 23: recuperar o observável do processo (ver Secção 7.3.1);
  • linha 23: um observador está inscrito nele. São passadas quatro informações ao observador:
    • o seu nome;
    • o semáforo que deve decrementar quando receber a notificação de fim de transmissão do observável que está a observar;
    • o método a utilizar quando quiser registar informações na consola;
    • o nome do processo que irá observar;

Com estas classes definidas, o Exemplo 10 será o seguinte:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple10 {
    public static void main(String[] args) throws InterruptedException {
        // observable configuration
        Observable<?> obs = Observable.error(new RuntimeException("Erreur !!!")).subscribeOn(Schedulers.computation());
        // observable execution (observation)
        ProcessUtils.subscribe(2,new Process<>("process1", obs));
    }
}

Na linha 11, o método estático [Observable.error] é definido da seguinte forma:

 

A linha 8 configura, portanto, um observável que simplesmente lança uma exceção para o método [onError] dos seus assinantes. A execução produz os seguintes resultados:


main : début observation ------Thread[main] ---- Time[22:618]
main : attente fin observation ------Thread[main] ---- Time[22:636]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[22:638]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[22:638]

Linhas 3 e 4: o método [onError] de ambos os assinantes recebeu a exceção lançada pelo observável.

Esta execução tem uma peculiaridade: os métodos [onCompleted] de ambos os observadores não foram chamados. Como resultado, a barreira não foi baixada e o thread principal permanece bloqueado no método estático [ProcessUtils.subscribe] na linha 3 a seguir:


// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");

Aqui vemos que, se ocorrer um erro no observável, o método [onCompleted] dos assinantes não é chamado. Por isso, modificamos o método [Observer.onError] da seguinte forma:


    @Override
    public void onError(Throwable e) {
        // erreur d'émission
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber[%s, %s].onError (%s)", observerName, processName, e));
        }
        // fin blocage thread principal
        latch.countDown();
}

Adicionamos as linhas 7–8 para libertar o bloqueio em caso de um erro observável. Com este novo código, a execução produz os seguintes resultados:


main : début observation ------Thread[main] ---- Time[40:750]
main : attente fin observation ------Thread[main] ---- Time[40:764]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[40:766]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[40:766]
main : fin observation ------Thread[main] ---- Time[40:767]

Obtemos a linha 5, que não tínhamos antes.

O Exemplo 11 será o seguinte:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple11 {
    public static void main(String[] args) throws InterruptedException {
        // observable configuration
        Observable<?> obs1 = Observable.empty();
        // observable execution (observation)
        ProcessUtils.subscribe(2,new Process<>("process1",obs1));
    }
}

Linha 10: O método estático [Observable.empty] cria um observável que não emite elementos. Emite apenas a notificação de fim de emissão;

 

A execução do código do exemplo acima produz os seguintes resultados:

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[37:073]
Subscriber[observateur[0],process1].onCompleted ------Thread[main] ---- Time[37:086]
Subscriber[observateur[1],process1].onCompleted ------Thread[main] ---- Time[37:086]
main : attente fin observation ------Thread[main] ---- Time[37:087]
main : fin observation ------Thread[main] ---- Time[37:087]
  • Linhas 2 e 3: Vemos que ambos os observadores recebem a notificação de fim de transmissão sem terem recebido quaisquer elementos anteriormente.

Pode-se questionar para que serve, na verdade, este método. Pode ser utilizado de forma análoga a uma coleção, inicialmente vazia, à qual são posteriormente adicionados elementos:

1
2
3
4
Observable obs=Observable.empty() ;
for(Observable o : observables){
    obs=obs.mergeWith(o) ;
}

Na linha 3, fundimos o observável inicial obs (linha 1) com outros observáveis.

O Exemplo 12 ilustra o método estático [Observable.never]:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple12 {
    public static void main(String[] args) throws InterruptedException {
        // observable configuration
        Observable<?> obs1 = Observable.never();
        // observable execution (observation)
        ProcessUtils.subscribe(2,new Process<>("process1",obs1));
    }
}

O método estático [Observable.never] cria um observável que nunca emite:

 

A execução do exemplo produz os seguintes resultados:

main : début observation ------Thread[main] ---- Time[27:018]
main : attente fin observation ------Thread[main] ---- Time[27:030]

Linha 2: o thread principal aguarda indefinidamente. Isto acontece porque nenhum observável emite a notificação [onCompleted], o que permite que o semáforo (barreira) fique verde (baixar a barreira).

7.4. Multithreading

7.4.1. Exemplo 13: thread de ação, thread de observador

Na Secção 7.1.3, criámos um observável utilizando o método estático [Observable.create]:

 
  • o método [create] devolve um tipo Observable<T>;
  • o parâmetro do método [create] é uma função do tipo [Observable.OnSubscribe<T>] definida da seguinte forma:
 

O tipo [Observable.OnSubscribe<T>] é uma interface funcional que, por sua vez, estende a interface funcional [Action1<Subscriber<? super T>>]. O método [call] desta interface espera um tipo [Subscriber] (assinante, observador). No restante deste documento, por vezes referir-nos-emos ao tipo [Observable.OnSubscribe<T>] como uma ação. Iremos criar ações personalizadas que terão um nome. Estas serão instâncias da seguinte interface [IProcessAction]:

  

package dvp.rxjava.observables.utils;
 
import rx.Observable;
 
public interface IProcessAction<T> extends Observable.OnSubscribe<T> {
 
    // action has a name
    public String getName();
}
  • linha 5: a interface [IProcessAction<T>] tem todas as características da interface [Observable.OnSubscribe<T>];
  • linha 8: possui também um método [getName] que devolve o nome da instância que implementa a interface;

Vamos utilizar a seguinte ação denominada [ProcessAction01]:


package dvp.rxjava.observables.utils;
 
import java.util.Random;
 
import rx.Subscriber;
import rx.functions.Func1;
 
public class ProcessAction01<T> implements IProcessAction<T> {
 
    // data
    private String name;
    private int nbValues;
    private Func1<Integer, T> func1;
 
    // manufacturers
    public ProcessAction01(String name, int nbValues, Func1<Integer, T> func1) {
        this.name = name;
        this.nbValues = nbValues;
        this.func1 = func1;
    }
 
    @Override
    public void call(Subscriber<? super T> subscriber) {
        ProcessUtils.showInfos.accept(String.format("Observable (%s) call start", getName()));
        for (int i = 0; i < nbValues; i++) {
            // waiting
            try {
                Thread.sleep(new Random().nextInt(500));
            } catch (InterruptedException e) {
                // error
                ProcessUtils.showInfos.accept(String.format("Observable (%s) onError", getName()));
                subscriber.onError(e);
            }
            // element emission
            T value = func1.call(i);
            ProcessUtils.showInfos.accept(String.format("Observable (%s,%s) onNext (%s)", getName(), i, value));
            subscriber.onNext(value);
        }
        // finish
        ProcessUtils.showInfos.accept(String.format("Observable (%s) onCompleted", getName()));
        subscriber.onCompleted();
    }
 
    @Override
    public String getName() {
        return name;
    }
 
}
  • linha 8: a classe [ProcessAction01<T>] implementa a interface [IProcessAction<T>] e, por conseguinte, a interface [Observable.OnSubscribe<T>];
  • linha 11: o nome da ação;
  • linha 12: o número de valores a emitir;
  • linha 13: uma instância do tipo [Func1<Integer, T>] que recebe um inteiro e produz um tipo T a ser emitido pelo observável (linhas 35 e 37);
  • linhas 16–20: passamos o nome da ação, o número de valores a emitir e a função de emissão para o construtor;
  • linhas 23–42: o código do processo;
  • linha 23: o método [call] recebe como parâmetro o subscritor do observável associado ao processo;
  • linha 28: o processo emite os seus elementos após uma espera de duração aleatória;
  • linha 32: emissão de um erro;
  • linha 37: uma emissão normal;
  • linha 41: emite a notificação de fim de emissão;
  • linhas 25–38: a ação emite nbValues números reais após um tempo de espera aleatório (linha 30);
  • linha 35: o valor a ser emitido é fornecido pela função [func1] passada como parâmetro ao construtor (linha 16);

Reestruturamos a classe [Process] (ver Secção 7.3.1) para que também possa ser construída com uma ação nomeada. Adicionamos o seguinte construtor:


public Process(IProcessAction<T> na, Scheduler schedulerObserved, Scheduler schedulerObserver) {
        // nom process=nom action
        name = na.getName();
        // action --> observable
        observable = Observable.create(na);
        // thread d'exécution du processus observé
        if (schedulerObserved != null) {
            observable = observable.subscribeOn(schedulerObserved);
        }
        // thread d'observation de l'observateur
        if (schedulerObserver != null) {
            observable = observable.observeOn(schedulerObserver);
        }
    }
  • Linha 1: O construtor recebe 3 parâmetros:
    1. a ação nomeada que será utilizada para construir o observável (linha 5);
    2. o agendador do processo observado (pode ser nulo);
    3. o agendador do observador (pode ser nulo);
  • linha 5: o observável é criado a partir da ação passada como parâmetro;

O código seguinte [Exemplo 13] observa diferentes observáveis:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple13 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 1, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
        // process 3
        Process<Integer> process3 = new Process<>(new ProcessAction01<Integer>("process3", 3, i -> i * 2), null,
                Schedulers.computation());
        // process 4
        Process<Boolean> process4 = new Process<>(new ProcessAction01<Boolean>("process4", 4, i -> i % 2 == 0), null, null);
        // subscriptions
        ProcessUtils.subscribe(1, process1);
        ProcessUtils.subscribe(1, process2);
        ProcessUtils.subscribe(1, process3);
        ProcessUtils.subscribe(1, process4);
    }
}
  • linhas 13–15: o process1 produz 1 número real numa thread de computação que será observado noutra thread de computação;
  • linhas 17–18: o processo2 produz 2 cadeias de caracteres numa thread de computação, e não é dada qualquer indicação relativamente à thread do observador. Os resultados mostram que a observação ocorre, por predefinição, na mesma thread que a execução do processo;
  • linhas 20–21: o processo3 produz 3 inteiros numa thread não especificada, que serão observados numa thread de computação. Os resultados mostram que o processo é executado por predefinição na thread principal;
  • linha 23: o processo process4 produz 4 valores booleanos numa thread não especificada, que serão observados numa thread não especificada. Os resultados mostram que a execução do processo e a sua observação ocorrem, por predefinição, na thread principal;

O resultado da execução deste código é o seguinte:

main : début observation ------Thread[main] ---- Time[18:642]
main : attente fin observation ------Thread[main] ---- Time[18:660]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[18:660]
Observable (process1,0) onNext (68.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[19:093]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[19:094]
Subscriber[observateur[0],process1] : onNext (68.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[19:396]
Subscriber[observateur[0],process1].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[19:397]
main : fin observation ------Thread[main] ---- Time[19:397]
main : début observation ------Thread[main] ---- Time[19:398]
main : attente fin observation ------Thread[main] ---- Time[19:399]
Observable (process2) call start ------Thread[RxComputationThreadPool-5] ---- Time[19:399]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-5] ---- Time[19:630]
Subscriber[observateur[0],process2] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-5] ---- Time[19:631]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-5] ---- Time[20:094]
Subscriber[observateur[0],process2] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-5] ---- Time[20:095]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[20:096]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[20:096]
main : fin observation ------Thread[main] ---- Time[20:097]
main : début observation ------Thread[main] ---- Time[20:097]
Observable (process3) call start ------Thread[main] ---- Time[20:098]
Observable (process3,0) onNext (0) ------Thread[main] ---- Time[20:188]
Subscriber[observateur[0],process3] : onNext (0) ------Thread[RxComputationThreadPool-6] ---- Time[20:213]
Observable (process3,1) onNext (2) ------Thread[main] ---- Time[20:336]
Subscriber[observateur[0],process3] : onNext (2) ------Thread[RxComputationThreadPool-6] ---- Time[20:338]
Observable (process3,2) onNext (4) ------Thread[main] ---- Time[20:676]
Observable (process3) onCompleted ------Thread[main] ---- Time[20:677]
main : attente fin observation ------Thread[main] ---- Time[20:677]
Subscriber[observateur[0],process3] : onNext (4) ------Thread[RxComputationThreadPool-6] ---- Time[20:678]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[20:679]
main : fin observation ------Thread[main] ---- Time[20:679]
main : début observation ------Thread[main] ---- Time[20:680]
Observable (process4) call start ------Thread[main] ---- Time[20:680]
Observable (process4,0) onNext (true) ------Thread[main] ---- Time[21:065]
Subscriber[observateur[0],process4] : onNext (true) ------Thread[main] ---- Time[21:067]
Observable (process4,1) onNext (false) ------Thread[main] ---- Time[21:187]
Subscriber[observateur[0],process4] : onNext (false) ------Thread[main] ---- Time[21:188]
Observable (process4,2) onNext (true) ------Thread[main] ---- Time[21:624]
Subscriber[observateur[0],process4] : onNext (true) ------Thread[main] ---- Time[21:625]
Observable (process4,3) onNext (false) ------Thread[main] ---- Time[21:765]
Subscriber[observateur[0],process4] : onNext (false) ------Thread[main] ---- Time[21:766]
Observable (process4) onCompleted ------Thread[main] ---- Time[21:767]
Subscriber[observateur[0],process4].onCompleted ------Thread[main] ---- Time[21:767]
main : attente fin observation ------Thread[main] ---- Time[21:767]
main : fin observation ------Thread[main] ---- Time[21:768]
  • O processo process1 gera 1 número real (linha 4) na thread de computação [RxComputationThreadPool-4], que é observado na thread de computação [RxComputationThreadPool-3] (linha 6);
  • O processo process2 produz 2 cadeias de caracteres (linhas 12, 14) na thread de computação [RxComputationThreadPool-5], que são observadas nessa mesma thread (linhas 13, 15);
  • O processo process3 produz 3 inteiros (linhas 21, 23, 25) na thread principal, que são observados na thread de computação [RxComputationThreadPool-6] (linhas 22, 24, 28);
  • o processo process4 produz 4 valores booleanos (linhas 34, 36, 38, 40) na thread principal, que são observados nessa mesma thread principal (linhas 33, 35, 37, 39);

Convidamos o leitor a acompanhar o exposto acima:

  • o ciclo de vida do processo observado e da sua thread;
  • o ciclo de vida do seu observador e da sua thread;

Grande parte do apelo das bibliotecas Rx reside neste multithreading, que o programador não precisa de gerir por conta própria.

7.5. Combinações de múltiplos observáveis

7.5.1. Exemplo-14: Combinação de dois observáveis com [Observable.merge]

Apresentamos agora métodos estáticos da classe [Observable] que permitem combinar múltiplos observáveis num único observável de resultado.

O primeiro exemplo deste tipo é o seguinte:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.ProcessAction01;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple14 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
        // merge
        Process<?> process12 = new Process<>("process12",
                Observable.merge(process1.getObservable(), process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process12);
    }
}
  • linhas 15–17: um processo denominado [process1] emitirá 3 números reais numa thread de computação. Será também observado numa thread de computação;
  • linhas 19–20: um processo denominado [process2] emitirá 2 cadeias de caracteres numa thread de computação. A thread de observação não está especificada. Vimos anteriormente que, neste caso, a thread de observação é a thread de computação;
  • linha 23: os dois processos são fundidos, ou seja, é criado um observável cujos elementos provêm simultaneamente de ambos os processos. O método estático [Observable.merge] é utilizado para este efeito:
 

Ao contrário do que o diagrama acima possa sugerir, durante a fusão, os elementos do fluxo 1 podem ser intercalados entre os elementos do fluxo 2. Isto é demonstrado pelos resultados da execução:

main : début observation ------Thread[main] ---- Time[56:053]
main : attente fin observation ------Thread[main] ---- Time[56:073]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[56:073]
Observable (process2) call start ------Thread[RxComputationThreadPool-5] ---- Time[56:074]
Observable (process1,0) onNext (64.8) ------Thread[RxComputationThreadPool-4] ---- Time[56:263]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-5] ---- Time[56:403]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-5] ---- Time[56:515]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[56:516]
Subscriber[observateur[0],process12] : onNext (64.8) ------Thread[RxComputationThreadPool-3] ---- Time[56:552]
Subscriber[observateur[0],process12] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-3] ---- Time[56:553]
Subscriber[observateur[0],process12] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-3] ---- Time[56:553]
Observable (process1,1) onNext (56.4) ------Thread[RxComputationThreadPool-4] ---- Time[56:716]
Subscriber[observateur[0],process12] : onNext (56.4) ------Thread[RxComputationThreadPool-3] ---- Time[56:718]
Observable (process1,2) onNext (22.8) ------Thread[RxComputationThreadPool-4] ---- Time[57:082]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[57:083]
Subscriber[observateur[0],process12] : onNext (22.8) ------Thread[RxComputationThreadPool-3] ---- Time[57:084]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[57:085]
main : fin observation ------Thread[main] ---- Time[57:085]
  • linha 3: o processo [process1] é executado na thread de computação [RxComputationThreadPool-4];
  • linha 4: o processo [process2] está a ser executado na thread de computação [RxComputationThreadPool-5];
  • linha 9: o processo [process12] é observado na thread de computação [RxComputationThreadPool-3]. Não conheço a regra que levou a esta escolha;
  • linhas 9–11: vemos que o observador observa elementos de ambos os processos [process1] (linha 5) e [process2] (linhas 6, 7), mesmo que nenhum deles tenha terminado (há mistura);
  • o processo [process12] termina (linha 17) quando ambos os processos, process1 e process2, terminam;

7.5.2. Exemplo-15: Concatenar dois observáveis com [Observable.concat]

Vamos agora examinar o seguinte código:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.ProcessAction01;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple15 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, Schedulers.computation());
        // concat
        Process<?> process12 = new Process<>("process12",
                Observable.concat(process1.getObservable(), process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process12);
    }
}
  • linhas 15–17: um processo denominado [process1] emitirá 3 números reais numa thread de computação. Será também observado numa thread de computação;
  • linhas 19–20: um processo denominado [process2] emitirá 2 cadeias de caracteres numa thread não especificada, neste caso a thread principal por predefinição. Será observado numa thread de computação;
  • linha 23: os dois processos são concatenados, ou seja, é criado um observável cujos elementos provêm de ambos os processos. Os valores emitidos não são misturados. O processo [process12] emitirá primeiro todos os valores do processo [process1] e, em seguida, os do processo [process2]. O método estático [Observable.concat] é utilizado para este efeito:
 

Os resultados da execução são os seguintes:

main : début observation ------Thread[main] ---- Time[30:162]
main : attente fin observation ------Thread[main] ---- Time[30:189]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[30:190]
Observable (process1,0) onNext (79.2) ------Thread[RxComputationThreadPool-4] ---- Time[30:681]
Observable (process1,1) onNext (98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[30:792]
Subscriber[observateur[0],process12] : onNext (79.2) ------Thread[RxComputationThreadPool-3] ---- Time[30:975]
Subscriber[observateur[0],process12] : onNext (98.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[30:976]
Observable (process1,2) onNext (84.0) ------Thread[RxComputationThreadPool-4] ---- Time[31:084]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[31:085]
Subscriber[observateur[0],process12] : onNext (84.0) ------Thread[RxComputationThreadPool-3] ---- Time[31:086]
Observable (process2) call start ------Thread[RxComputationThreadPool-3] ---- Time[31:087]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-3] ---- Time[31:556]
Subscriber[observateur[0],process12] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-5] ---- Time[31:557]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-3] ---- Time[31:608]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[31:609]
Subscriber[observateur[0],process12] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-5] ---- Time[31:609]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[31:610]
main : fin observation ------Thread[main] ---- Time[31:611]
  • linhas 3-10: o processo [process1] é executado e o processo [process12] emite os valores emitidos por [process1];
  • linha 9: o processo [process1] terminou;
  • linhas 11-17: o processo [process2] é executado e o processo [process12] emite os valores emitidos pelo [process2];

Há uma peculiaridade em relação ao processo2: não especificámos um segmento de execução. Seria, portanto, de esperar que o segmento principal fosse utilizado por predefinição. No entanto, não é esse o caso. O segmento de execução foi o segmento de computação [RxComputationThreadPool-3] (linha 11). Por conseguinte, quando não é especificado nenhum segmento de execução ou de observação, não podemos fazer quaisquer suposições sobre qual o segmento que será escolhido.

7.5.3. Exemplo-16: Combinar dois observáveis com [Observable.zip]

Vamos agora examinar o código seguinte:


package dvp.rxjava.observables.exemples;
 
import java.util.Arrays;
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;
 
public class Exemple16 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, null);
        // 2-process combination function
        FuncN<String> funcn = new FuncN<String>() {
            @Override
            public String call(Object... args) {
                if (args.length == 2) {
                    return String.format("double=%s, string=%s", args[0], args[1]);
                } else {
                    throw new RuntimeException("la fonction attend 2 paramètres exactement");
                }
            }
        };
        // zip of the 2 processes
        Process<String> process12 = new Process<>("process12",
                Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
        // subscriptions
        ProcessUtils.subscribe(1, process12);
    }
}
  • linhas 16–18: um processo denominado [process1] emitirá 3 números reais numa thread de computação. Será também observado numa thread de computação;
  • linhas 20–21: um processo denominado [process2] emitirá 2 cadeias de caracteres numa thread não especificada. A thread de observação também não é especificada;
  • linhas 23–32: instância de um tipo [FuncN<String>] com uma classe anónima. FuncN é uma interface funcional:
 

O método [FuncN.call] espera um conjunto de objetos e devolve um tipo R. A função [funcn] será utilizada para combinar os processos process1 e process2 nessa ordem. No método [FuncN.call]:

  • args[0] será um Double;
  • args[1] será um String;

Aqui, o resultado de [funcn.call] será a string da linha 27. A construção deste resultado não requer o conhecimento dos tipos dos argumentos do método de chamada.

Os dois processos são combinados da seguinte forma:


// zip des 2 processus
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));

O método [Observable.zip] funciona da seguinte forma:

 

Vemos que:

  • o primeiro argumento de zip é um Iterable<Observable>. No nosso exemplo, temos um parâmetro real do tipo List<Observable> composto pelos nossos dois observáveis;
  • o segundo argumento de zip é do tipo FuncN. No nosso exemplo, o parâmetro real é [funcn];

A execução produz os seguintes resultados:

main : début observation ------Thread[main] ---- Time[55:636]
Observable (process2) call start ------Thread[main] ---- Time[55:666]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[55:666]
Observable (process1,0) onNext (69.6) ------Thread[RxComputationThreadPool-4] ---- Time[55:902]
Observable (process2,0) onNext (valeur-0) ------Thread[main] ---- Time[56:076]
Observable (process1,1) onNext (82.8) ------Thread[RxComputationThreadPool-4] ---- Time[56:271]
Subscriber[observateur[0],process12] : onNext ("double=69.6, string=valeur-0") ------Thread[main] ---- Time[56:352]
Observable (process1,2) onNext (14.399999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[56:641]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[56:642]
Observable (process2,1) onNext (valeur-1) ------Thread[main] ---- Time[56:778]
Subscriber[observateur[0],process12] : onNext ("double=82.8, string=valeur-1") ------Thread[main] ---- Time[56:779]
Observable (process2) onCompleted ------Thread[main] ---- Time[56:779]
Subscriber[observateur[0],process12].onCompleted ------Thread[main] ---- Time[56:780]
main : attente fin observation ------Thread[main] ---- Time[56:781]
main : fin observation ------Thread[main] ---- Time[56:781]
  • linhas 7, 11: o process12 emite dois elementos;
  • linha 8: o elemento adicional emitido pelo processo1, que não tem correspondência no processo2, não é emitido pelo processo resultante processo12;

Vemos que o process2, ao qual não tinha sido atribuída nem uma thread de execução nem uma thread de observação, utilizou a thread principal para ambas.

7.5.4. Exemplo-17: Combinar dois observáveis com [Observable.combineLatest]

Vamos agora examinar o seguinte código:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple17 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Double> process2 = new Process<>(
                new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null,
                Schedulers.computation());
        // combining the 2 processes
        Process<Double> process12 = new Process<>("process12",
                Observable.combineLatest(process1.getObservable(), process2.getObservable(), (d1, d2) -> d1 + d2));
        // subscriptions
        ProcessUtils.subscribe(1, process12);
    }
}
  • linhas 14–16: um processo denominado [process1] emitirá 3 números reais numa thread de computação. Também será observado numa thread de computação;
  • linhas 18–20: um processo denominado [process2] emitirá 2 números reais numa thread não vinculada. Estes serão observados numa thread de computação;
  • linha 23: os dois observáveis são combinados utilizando o seguinte método estático [Observable.combineLatest]:
 

O observável [combineLatest] funciona da seguinte forma: quando um dos dois observáveis emite um elemento E1, esse elemento é combinado pela função [combineFunction] com o último elemento emitido pelo outro observável.

A execução deste código produz o seguinte resultado:

main : début observation ------Thread[main] ---- Time[01:768]
Observable (process2) call start ------Thread[main] ---- Time[01:791]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[01:791]
Observable (process1,0) onNext (54.0) ------Thread[RxComputationThreadPool-4] ---- Time[01:991]
Observable (process2,0) onNext (56.0) ------Thread[main] ---- Time[02:245]
Observable (process1,1) onNext (51.6) ------Thread[RxComputationThreadPool-4] ---- Time[02:358]
Subscriber[observateur[0],process12] : onNext (110.0) ------Thread[RxComputationThreadPool-5] ---- Time[02:521]
Subscriber[observateur[0],process12] : onNext (107.6) ------Thread[RxComputationThreadPool-5] ---- Time[02:522]
Observable (process2,1) onNext (261.8) ------Thread[main] ---- Time[02:595]
Observable (process2) onCompleted ------Thread[main] ---- Time[02:596]
main : attente fin observation ------Thread[main] ---- Time[02:596]
Subscriber[observateur[0],process12] : onNext (313.40000000000003) ------Thread[RxComputationThreadPool-5] ---- Time[02:597]
Observable (process1,2) onNext (80.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[02:790]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[02:791]
Subscriber[observateur[0],process12] : onNext (342.2) ------Thread[RxComputationThreadPool-3] ---- Time[02:792]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[02:792]
main : fin observation ------Thread[main] ---- Time[02:793]
  • Linha 5: A saída do process2 (56) é combinada com o último elemento produzido pelo process1 (54, linha 4) e produz o resultado mostrado na linha 7;
  • linha 6: a saída do process1 (51,6) é combinada com o último elemento produzido pelo process2 (56, linha 5) e produz o resultado da linha 8;
  • linha 9: a saída do processo2 (261,8) é combinada com o último elemento emitido pelo processo1 (51,6, linha 6) e produz o resultado da linha 12;
  • linha 13: a emissão do processo1 (80,39) é combinada com o último elemento emitido pelo processo2 (261,8, linha 9) e produz o resultado da linha 15;

Esta é uma variante do observável [zip] em que, desta vez, os elementos combinados não são necessariamente os elementos na mesma posição nos fluxos. Note-se aqui que o processo2, ao qual não tinha sido atribuída nenhuma thread de execução, foi executado na thread principal (linha 2).

7.5.5. Exemplo-18: Combinação de dois observáveis com [Observable.amb]

Vamos agora examinar o código seguinte:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple18 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Double> process2 = new Process<>(
                new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null, null);
        // combining the 2 processes
        Process<Double> process12 = new Process<>("process12",
                Observable.amb(process1.getObservable(), process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process12);
    }
}
  • linhas 14–16: um processo denominado [process1] emitirá 3 números reais numa thread de computação. Também será observado numa thread de computação;
  • linhas 18–20: um processo denominado [process2] emitirá 2 números reais numa thread não vinculada. Estes serão observados numa thread não vinculada;
  • linha 22: os dois observáveis são combinados utilizando o seguinte método estático [Observable.amb]:
 

Conforme ilustrado no diagrama acima, o observável [Observable.amb(Observable o1, Observable o2)] emite os elementos do observável que emite primeiro. Isto é confirmado pelos resultados do exemplo apresentado:

main : début observation ------Thread[main] ---- Time[21:594]
Observable (process2) call start ------Thread[main] ---- Time[21:612]
Observable (process1) call start ------Thread[RxComputationThreadPool-3] ---- Time[21:612]
Observable (process2,0) onNext (155.39999999999998) ------Thread[main] ---- Time[21:817]
Observable (process1) onError ------Thread[RxComputationThreadPool-3] ---- Time[21:820]
Observable (process1,0) onNext (90.0) ------Thread[RxComputationThreadPool-3] ---- Time[21:820]
Observable (process1,1) onNext (104.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[21:877]
Subscriber[observateur[0],process12] : onNext (155.39999999999998) ------Thread[main] ---- Time[22:105]
Observable (process1,2) onNext (44.4) ------Thread[RxComputationThreadPool-3] ---- Time[22:122]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[22:123]
Observable (process2,1) onNext (201.6) ------Thread[main] ---- Time[22:581]
Subscriber[observateur[0],process12] : onNext (201.6) ------Thread[main] ---- Time[22:583]
Observable (process2) onCompleted ------Thread[main] ---- Time[22:583]
Subscriber[observateur[0],process12].onCompleted ------Thread[main] ---- Time[22:584]
main : attente fin observation ------Thread[main] ---- Time[22:585]
main : fin observation ------Thread[main] ---- Time[22:586]
  • linha 4: o processo2 é o primeiro a emitir;
  • Linhas 8, 12: o process12 emite todos os elementos emitidos pelo process2 (linhas 4, 11);

7.6. Cadeia de processamento para um observável

7.6.1. Exemplo-19: transformação de um observável com [Observable.map]

Nos exemplos anteriores, examinámos várias combinações de dois observáveis num terceiro observável. Apresentamos agora métodos estáticos da classe [Observable] que permitem operações de transformação, filtragem e agregação num observável. Aqui encontraremos métodos análogos aos da classe [Stream] estudada na Secção 5.

O nosso primeiro exemplo será o seguinte:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple19 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<String> process2 = new Process<>("process2",
                process1.getObservable().map(d -> String.format("valeur-%s", d)));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • linhas 14–16: um processo denominado process1 emitirá 3 números reais numa thread de computação. Também será observado numa thread de computação;
  • linhas 17–18: os números emitidos pelo process1 serão convertidos em cadeias de caracteres num process2;
  • linha 20: observamos o process2;

O método [Observable.map] na linha 18 é análogo ao método [Stream.map] discutido na Secção 5.5:

 

Os resultados do exemplo são os seguintes:

main : début observation ------Thread[main] ---- Time[55:328]
main : attente fin observation ------Thread[main] ---- Time[55:346]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[55:347]
Observable (process1,0) onNext (21.599999999999998) ------Thread[RxComputationThreadPool-4] ---- Time[55:354]
Observable (process1,1) onNext (97.2) ------Thread[RxComputationThreadPool-4] ---- Time[55:512]
Subscriber[observateur[0],process2] : onNext ("valeur-21.599999999999998") ------Thread[RxComputationThreadPool-3] ---- Time[55:615]
Subscriber[observateur[0],process2] : onNext ("valeur-97.2") ------Thread[RxComputationThreadPool-3] ---- Time[55:616]
Observable (process1,2) onNext (98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[55:803]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[55:804]
Subscriber[observateur[0],process2] : onNext ("valeur-98.39999999999999") ------Thread[RxComputationThreadPool-3] ---- Time[55:804]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[55:805]
main : fin observation ------Thread[main] ---- Time[55:805]
  • linhas 4, 5 e 8: as emissões do process1. Estes são números reais;
  • linhas 6, 7, 10: as emissões observadas do process2. Estas são cadeias de caracteres;

7.6.2. Exemplo-20: filtrar um observável com [Observable.filter]

O exemplo será o seguinte:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple20 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().filter(i -> i % 2 == 0));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • linhas 11-12: um processo chamado process1 emitirá números inteiros de 0 a 2 numa thread de trabalho. Também será observado numa thread de trabalho;
  • linha 14: os números emitidos pelo process1 serão filtrados de modo a que apenas os números pares sejam retidos no process2;
  • linha 20: observamos o process2;

O método [Observable.filter] na linha 18 é análogo ao método [Stream.filter] discutido na Secção 5.4:

 

Os resultados do exemplo são os seguintes:

main : début observation ------Thread[main] ---- Time[30:319]
main : attente fin observation ------Thread[main] ---- Time[30:335]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[30:336]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[30:388]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[30:625]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[30:703]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[30:704]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[30:705]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[30:706]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[30:707]
main : fin observation ------Thread[main] ---- Time[30:707]
  • linhas 4, 5 e 7: emissões do process1;
  • linhas 6, 9: as emissões observadas do process2. Estes são os elementos pares do process1;

7.6.3. Exemplo 21: transformando um observável com [Observable.flatMap]

O exemplo será o seguinte:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple21 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
            int value = i * 10;
            return Observable.just(value, value + 1, value + 2);
        }));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • linhas 12-13: um processo chamado process1 emitirá números inteiros de 0 a 2 numa thread de computação. Será também observado numa thread de computação;
  • linhas 15–18: cada número n emitido por process1 é transformado num observável que emite os 3 números (10*n, 10*n+1, 10*n+2). Se tivéssemos usado o método [map] na linha 15, o process2 emitiria um tipo Observable<Integer> em vez de um tipo Integer. O método [flatMap] utilizado permite-nos achatar esta sequência de elementos do tipo Observable<Integer> numa sequência de elementos do tipo Integer composta por cada elemento de cada Observable<Integer>;
  • linha 20: observamos o process2;

O método [Observable.flatMap] na linha 15 é análogo ao método [Stream.flatMap] discutido na Secção 5.6.12:

 

Os resultados do exemplo são os seguintes:

main : début observation ------Thread[main] ---- Time[31:466]
main : attente fin observation ------Thread[main] ---- Time[31:486]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[31:486]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[31:777]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[32:082]
Subscriber[observateur[0],process2] : onNext (1) ------Thread[RxComputationThreadPool-3] ---- Time[32:085]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[32:087]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[32:192]
Subscriber[observateur[0],process2] : onNext (10) ------Thread[RxComputationThreadPool-3] ---- Time[32:194]
Subscriber[observateur[0],process2] : onNext (11) ------Thread[RxComputationThreadPool-3] ---- Time[32:196]
Subscriber[observateur[0],process2] : onNext (12) ------Thread[RxComputationThreadPool-3] ---- Time[32:197]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[32:686]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[32:687]
Subscriber[observateur[0],process2] : onNext (20) ------Thread[RxComputationThreadPool-3] ---- Time[32:688]
Subscriber[observateur[0],process2] : onNext (21) ------Thread[RxComputationThreadPool-3] ---- Time[32:690]
Subscriber[observateur[0],process2] : onNext (22) ------Thread[RxComputationThreadPool-3] ---- Time[32:692]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[32:693]
main : fin observation ------Thread[main] ---- Time[32:693]
  • linhas 5-7: as três emissões do process2 a seguir à emissão na linha 4 do process1;
  • linhas 9-11: as três emissões do process2 a seguir à emissão na linha 8 do process1;
  • linhas 14-16: as três emissões do processo2 a seguir à emissão na linha 12 do processo1;

O código seguinte mostra como criar um tipo Observable<Integer[]> a partir do processo1 [Exemplo 21b]:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple21b {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer[]> process2 = new Process<>("process2", process1.getObservable().map(i -> {
            int value = i * 10;
            return new Integer[] { value, value + 1, value + 2 };
        }));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • linha 14: é utilizado o método [Observable.map];
  • linha 16: que retorna um tipo Integer[];

Os resultados são os seguintes:

main : début observation ------Thread[main] ---- Time[58:089]
main : attente fin observation ------Thread[main] ---- Time[58:107]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[58:108]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[58:503]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[58:762]
Subscriber[observateur[0],process2] : onNext ([0,1,2]) ------Thread[RxComputationThreadPool-3] ---- Time[58:792]
Subscriber[observateur[0],process2] : onNext ([10,11,12]) ------Thread[RxComputationThreadPool-3] ---- Time[58:795]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[58:851]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[58:852]
Subscriber[observateur[0],process2] : onNext ([20,21,22]) ------Thread[RxComputationThreadPool-3] ---- Time[58:853]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[58:854]
main : fin observation ------Thread[main] ---- Time[58:854]
  • linhas 6, 7, 10: vemos os resultados do map;

Todas estas transformações observáveis podem ser encadeadas, uma vez que cada transformação produz uma nova observável. Isto é demonstrado no exemplo seguinte [Exemplo21c]:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple21c {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
            int value = i * 10;
            return Observable.just(value, value + 1, value + 2);
        }).filter(i -> i % 2 == 0));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • linhas 15–18: o flatMap é seguido por um filter;

Os resultados da execução são os seguintes:

main : début observation ------Thread[main] ---- Time[37:993]
main : attente fin observation ------Thread[main] ---- Time[38:016]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[38:017]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[38:124]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[38:366]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[38:380]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[38:381]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[38:436]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[38:439]
Subscriber[observateur[0],process2] : onNext (10) ------Thread[RxComputationThreadPool-3] ---- Time[38:441]
Subscriber[observateur[0],process2] : onNext (12) ------Thread[RxComputationThreadPool-3] ---- Time[38:443]
Subscriber[observateur[0],process2] : onNext (20) ------Thread[RxComputationThreadPool-3] ---- Time[38:445]
Subscriber[observateur[0],process2] : onNext (22) ------Thread[RxComputationThreadPool-3] ---- Time[38:446]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[38:447]
main : fin observation ------Thread[main] ---- Time[38:447]
  • linhas 8-13: process2 emitiu apenas os elementos pares de flatMap;

Um método semelhante ao [flatMap] é o método [flatMapIterable], ilustrado pelo seguinte exemplo [Exemplo21d]:


package dvp.rxjava.observables.exemples;
 
import java.util.Arrays;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple21d {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMapIterable(i -> {
            int value = i * 10;
            return Arrays.asList(value, value + 1, value + 2);
        }).filter(i -> i % 2 == 0));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}

Linha 16: Em vez de usar o método [flatMap], usamos o método [flatMapIterable]. Neste caso, a função de transformação deve produzir um tipo Iterable<T> (linha 18) em vez de um tipo Observable<T>.

Obtemos os mesmos resultados que antes.

Voltemos à definição do método [flatMap]:

 

Como mostrado acima, um elemento azul [3] foi inserido entre os dois elementos verdes [1-2]. Isto significa que, ao achatar Observable<T>s, o método [flatMap] preserva a ordem de emissão destes vários observáveis internos. Isto é demonstrado pelo seguinte exemplo [Exemplo21e]:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple21e {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // process 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().flatMap(i -> process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process3);
    }
}
  • linhas 11-12: o processo process1 emite os inteiros [0,1];
  • linhas 14-15: o processo2 emite os números inteiros [10,11,12];
  • linhas 17-18: cada elemento emitido pelo processo1 está associado ao observável do processo2. Isto significa que:
    • o elemento [0] do processo1 será associado a uma observável que emite [10,11,12];
    • o mesmo se aplica ao elemento 1;

No final, os 6 números [10, 11, 12, 10, 11, 12] serão emitidos. Queremos ver em que ordem.

Os resultados da execução são os seguintes:

main : début observation ------Thread[main] ---- Time[22:540]
main : attente fin observation ------Thread[main] ---- Time[22:566]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[22:566]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[22:949]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[22:951]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[23:159]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[23:160]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[23:160]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[23:286]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[23:513]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[23:597]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[23:599]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[23:645]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-5] ---- Time[23:647]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[23:789]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[23:790]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-5] ---- Time[23:791]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[23:976]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[23:978]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[24:184]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[24:184]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[24:186]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[24:187]
main : fin observation ------Thread[main] ---- Time[24:187]

Podemos ver que a ordem de emissão do process3 foi: [10, 10, 11, 12, 11, 12] (linhas 11, 12, 14, 17, 19, 22). Portanto, os elementos emitidos pelo process2 ficaram, de facto, misturados. Podemos evitar isto utilizando o método [concatMap] em vez do método [flatMap]. Isto é demonstrado pelo código seguinte [Exemplo21ef]:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple21ef {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // process 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().concatMap(i -> process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process3);
    }
}

Na linha 18, substituímos [flatMap] por [concatMap]. Os resultados da execução são os seguintes:

main : début observation ------Thread[main] ---- Time[45:507]
main : attente fin observation ------Thread[main] ---- Time[45:530]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[45:530]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[45:775]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[45:778]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[45:846]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[45:890]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[45:947]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[45:948]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[46:096]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[46:097]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[46:144]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-5] ---- Time[46:147]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-5] ---- Time[46:148]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[46:149]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[46:364]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-7] ---- Time[46:366]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[46:529]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[46:531]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[46:558]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[46:559]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[46:560]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[46:562]
main : fin observation ------Thread[main] ---- Time[46:562]

Vemos que a ordem de emissão do processo3 foi: [10, 11, 12, 10, 11, 12] (linhas 12–14, 17, 19, 22). Os elementos emitidos pelo processo2 não foram baralhados.

Outra variante do método [map] é o método [switchMap]:

 

Acima, a partir do observável [1], são criados três outros observáveis [2] com dois elementos, que são então achatados como em [flatMap] [3]. Note-se que o resultado tem 5 elementos, e não 6. Isto deve-se ao facto de, antes de o segundo observável emitir o seu segundo elemento [6], o terceiro observável emitir o seu primeiro elemento [5], fazendo com que o segundo observável seja descartado. Portanto, o elemento [6] não é encontrado no observável resultante [3].

Para ilustrar [switchMap], utilizaremos o seguinte exemplo [Exemplo21eg]:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple21eg {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // process 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().switchMap(i -> process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process3);
    }
}

A execução do exemplo produz os seguintes resultados:

main : début observation ------Thread[main] ---- Time[02:388]
main : attente fin observation ------Thread[main] ---- Time[02:419]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[02:419]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[02:641]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[02:643]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[02:802]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[02:888]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[02:957]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[02:958]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[03:005]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[03:007]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[03:007]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[03:106]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[03:106]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[03:108]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[03:236]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[03:238]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[03:716]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[03:717]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[03:718]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[03:718]
main : fin observation ------Thread[main] ---- Time[03:719]
  • process1 emite 2 elementos que dão origem a 2 observáveis process2 de 3 elementos;
  • linha 14: o observador recebe o elemento n.º 0 emitido pelo primeiro observável process2 na linha 6;
  • linha 15: o observador recebe o elemento #0 emitido pela segunda observável do processo2 na linha 13. A história não explica por que razão não recebeu anteriormente os elementos 1 e 2 emitidos pela primeira observável do processo2 nas linhas 7 e 8. De qualquer forma, a primeira observável do processo2 é abandonada;
  • no final, o observador vê apenas 4 elementos (linhas 14, 15, 17, 20) em vez dos 6 que foram emitidos;

7.6.4. Exemplos-22: Outros métodos da classe [Observable]

A classe [Observable] inclui muitos métodos da classe [Stream] que funcionam de forma semelhante. Aqui estão alguns deles. Apresentaremos simplesmente o código e os seus resultados.

[Exemplo 22a - take=limit]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22a {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).take(3));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}

resultados

1
2
3
4
5
6
7
main : début observation ------Thread[main] ---- Time[25:071]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[25:399]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[25:402]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[25:404]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[25:404]
main : attente fin observation ------Thread[main] ---- Time[25:406]
main : fin observation ------Thread[main] ---- Time[25:406]

[Exemplo 22b - takeLast]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22b {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).takeLast(2));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}

resultados

1
2
3
4
5
6
main : début observation ------Thread[main] ---- Time[19:440]
Subscriber[observateur[0],process] : onNext (9) ------Thread[main] ---- Time[19:726]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[19:728]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[19:728]
main : attente fin observation ------Thread[main] ---- Time[19:729]
main : fin observation ------Thread[main] ---- Time[19:730]

[Exemplo 22c - ignorar]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22c {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).skip(5).take(2));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}

resultados

1
2
3
4
5
6
main : début observation ------Thread[main] ---- Time[16:685]
Subscriber[observateur[0],process] : onNext (6) ------Thread[main] ---- Time[17:002]
Subscriber[observateur[0],process] : onNext (7) ------Thread[main] ---- Time[17:004]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[17:005]
main : attente fin observation ------Thread[main] ---- Time[17:006]
main : fin observation ------Thread[main] ---- Time[17:006]

[Exemplo 22d - reduzir]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22d {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).reduce(0, (i, a) -> i + a));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}
  • linha 10: calcula a soma dos elementos no observável. O resultado é um observável que emite essa soma;

resultados

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[52:412]
Subscriber[observateur[0],process] : onNext (55) ------Thread[main] ---- Time[52:640]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[52:640]
main : attente fin observation ------Thread[main] ---- Time[52:642]
main : fin observation ------Thread[main] ---- Time[52:642]

[Exemplo 22e - todos]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22e {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Boolean> process = new Process<>("process", Observable.range(1, 10).all(i -> i > 10));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}
  • linha 10: retorna um Observable<Boolean> que emite o elemento true se o predicado do método [all] for verdadeiro para todos os elementos; caso contrário, retorna false;

resultados

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[59:866]
Subscriber[observateur[0],process] : onNext (false) ------Thread[main] ---- Time[00:069]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[00:070]
main : attente fin observation ------Thread[main] ---- Time[00:071]
main : fin observation ------Thread[main] ---- Time[00:071]

[Exemplo 22f - contagem]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22f {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).count());
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}
  • linha 10: [Observable.count] cria um observável de 1 elemento que é a soma dos elementos observados;

resultados

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[16:409]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[16:634]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[16:634]
main : attente fin observation ------Thread[main] ---- Time[16:635]
main : fin observation ------Thread[main] ---- Time[16:635]

[Exemplo 22g - distinto]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22g {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.just(1, 2, 1, 3).distinct());
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}

resultados

1
2
3
4
5
6
7
main : début observation ------Thread[main] ---- Time[05:373]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[05:594]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[05:595]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[05:596]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[05:597]
main : attente fin observation ------Thread[main] ---- Time[05:597]
main : fin observation ------Thread[main] ---- Time[05:597]

[ Exemplo 22h - groupBy, asObservable]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.observables.GroupedObservable;
 
public class Exemple22h {
    public static void main(String[] args) throws InterruptedException {
        // process
        Observable<GroupedObservable<Boolean, Integer>> obs = Observable.range(1, 10).groupBy(i -> i % 2 == 0);
        Process<Integer> process = new Process<>("process", obs.concatMap(g -> g.asObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}
  • Linha 11: O método [groupBy] agrupa os 10 elementos emitidos em dois grupos: números pares e números ímpares. O resultado é um Observable<GroupedObservable<Boolean, Integer>>, ou seja, um observável cujos elementos são do tipo GroupedObservable<Boolean, Integer>, onde Boolean é o tipo da chave do grupo (false, true neste caso) e é também o tipo do resultado da lambda passada como parâmetro ao método [groupBy], e Integer é o tipo dos elementos do grupo;
  • linha 12: o tipo GroupedObservable possui um método [asObservable] que nos permite criar um observável a partir deste tipo. Teremos, portanto, dois tipos Observable<Integer>, um para números pares e outro para números ímpares. A partir destes dois observáveis, o método [concatMap] criará um único observável;

resultados

main : début observation ------Thread[main] ---- Time[23:809]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[24:034]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[24:036]
Subscriber[observateur[0],process] : onNext (5) ------Thread[main] ---- Time[24:037]
Subscriber[observateur[0],process] : onNext (7) ------Thread[main] ---- Time[24:038]
Subscriber[observateur[0],process] : onNext (9) ------Thread[main] ---- Time[24:039]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[24:041]
Subscriber[observateur[0],process] : onNext (4) ------Thread[main] ---- Time[24:043]
Subscriber[observateur[0],process] : onNext (6) ------Thread[main] ---- Time[24:044]
Subscriber[observateur[0],process] : onNext (8) ------Thread[main] ---- Time[24:045]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[24:046]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[24:047]
main : attente fin observation ------Thread[main] ---- Time[24:047]
main : fin observation ------Thread[main] ---- Time[24:048]

[Exemplo 22i - carimbo de data/hora]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
import rx.schedulers.Timestamped;
 
public class Exemple22i {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Timestamped<Integer>> process2 = new Process<>("process2", process1.getObservable().timestamp());
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • na linha 15, o método [timestamp] associa um carimbo de data/hora a cada elemento processado do observável;

resultados

main : début observation ------Thread[main] ---- Time[59:362]
main : attente fin observation ------Thread[main] ---- Time[59:377]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[59:378]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[59:553]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[59:692]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975259555,"value":0}) ------Thread[RxComputationThreadPool-3] ---- Time[59:789]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975259789,"value":1}) ------Thread[RxComputationThreadPool-3] ---- Time[59:791]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[00:025]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[00:027]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975260026,"value":2}) ------Thread[RxComputationThreadPool-3] ---- Time[00:031]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[00:033]
main : fin observation ------Thread[main] ---- Time[00:034]

Neste exemplo, é difícil perceber o que as informações de carimbo de data/hora representam:

  • linhas 4-5: vemos que o elemento 1 do processo1 foi emitido 139 ms após o elemento 0;
  • linhas 6 e 7: vemos que o elemento 1 do processo2 foi observado 234 ms após o elemento 0;
  • linhas 5 e 8: vemos que o elemento 2 do processo1 foi emitido 33 ms após o elemento 1;
  • linhas 7 e 10: vemos que o elemento 2 do processo2 foi observado 37 ms após o elemento 1;

Estes atrasos devem-se ao facto de as threads para observar e executar os observáveis não serem as mesmas. Se substituirmos as linhas 12–13 pelas seguintes linhas (Exemplo 22j):


// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
null);
  • Linhas 2–3: Não especificamos o segmento de observação. Sabemos que, neste caso, o observável é observado no local onde é executado;

Isto produz os seguintes resultados:

main : début observation ------Thread[main] ---- Time[43:834]
main : attente fin observation ------Thread[main] ---- Time[43:845]
Observable (process1) call start ------Thread[RxComputationThreadPool-1] ---- Time[43:846]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-1] ---- Time[44:291]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976384293,"value":0}) ------Thread[RxComputationThreadPool-1] ---- Time[44:552]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-1] ---- Time[44:878]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976384879,"value":1}) ------Thread[RxComputationThreadPool-1] ---- Time[44:884]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-1] ---- Time[45:274]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976385275,"value":2}) ------Thread[RxComputationThreadPool-1] ---- Time[45:280]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[45:281]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[45:283]
main : fin observation ------Thread[main] ---- Time[45:284]
  • linhas 4 e 6: o process1 emite o seu elemento n.º 1 587 ms após o seu elemento n.º 0;
  • linhas 5 e 7: o observador observa estes dois elementos com um intervalo de 586 ms;
  • linhas 6 e 8: o processo1 emite o seu elemento #2 396 ms após o seu elemento #1;
  • linhas 7 e 9: o observador observa estes dois elementos com uma diferença de tempo de 396 ms;

Aqui, os valores dos carimbos de data/hora são consistentes: representam com precisão o tempo de transmissão do elemento.

7.7. Agendadores

7.7.1. Exemplo 23: o agendador [Schedulers.computation]

Vamos agora examinar os agendadores de execução. A observação será feita na thread de execução.

O tema dos agendadores é um pouco obscuro. Os vários agendadores são apresentados nesta pergunta no site StackOverflow [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:

 

Tentaremos ilustrar a utilização destes diferentes agendadores com exemplos. O primeiro ilustra o agendador [Schedulers.computation]:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple23 {
    public static void main(String[] args) throws InterruptedException {
        // processes
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.computation(), null);
        }
        // subscriptions
        ProcessUtils.subscribe(1, processes);
    }
}
  • linhas 14–19: criamos uma matriz de 10 processos em execução numa thread de computação;
  • linha 17: cada processo gera um número real aleatório;
  • linha 21: subscrevemos todos estes processos;

Os resultados são os seguintes:

main : début observation ------Thread[main] ---- Time[01:034]
Observable (process0) call start ------Thread[RxComputationThreadPool-1] ---- Time[01:042]
Observable (process2) call start ------Thread[RxComputationThreadPool-3] ---- Time[01:042]
Observable (process1) call start ------Thread[RxComputationThreadPool-2] ---- Time[01:042]
Observable (process5) call start ------Thread[RxComputationThreadPool-6] ---- Time[01:043]
Observable (process7) call start ------Thread[RxComputationThreadPool-8] ---- Time[01:043]
Observable (process4) call start ------Thread[RxComputationThreadPool-5] ---- Time[01:042]
Observable (process3) call start ------Thread[RxComputationThreadPool-4] ---- Time[01:042]
main : attente fin observation ------Thread[main] ---- Time[01:043]
Observable (process6) call start ------Thread[RxComputationThreadPool-7] ---- Time[01:043]
Observable (process3,0) onNext (70.8) ------Thread[RxComputationThreadPool-4] ---- Time[01:115]
Observable (process1,0) onNext (13.2) ------Thread[RxComputationThreadPool-2] ---- Time[01:153]
Observable (process0,0) onNext (63.599999999999994) ------Thread[RxComputationThreadPool-1] ---- Time[01:215]
Subscriber[observateur[0],process0] : onNext (63.599999999999994) ------Thread[RxComputationThreadPool-1] ---- Time[01:326]
Subscriber[observateur[0],process3] : onNext (70.8) ------Thread[RxComputationThreadPool-4] ---- Time[01:326]
Subscriber[observateur[0],process1] : onNext (13.2) ------Thread[RxComputationThreadPool-2] ---- Time[01:326]
Observable (process3) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[01:326]
Observable (process0) onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[01:326]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[01:327]
Subscriber[observateur[0],process0].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[01:327]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[01:327]
Subscriber[observateur[0],process1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[01:327]
Observable (process8) call start ------Thread[RxComputationThreadPool-1] ---- Time[01:329]
Observable (process9) call start ------Thread[RxComputationThreadPool-2] ---- Time[01:329]
...
main : fin observation ------Thread[main] ---- Time[01:610]
  • linhas 2-10: os primeiros 8 processos iniciam-se em 8 threads diferentes (a máquina utilizada tem 8 núcleos). Note-se que todos eles iniciam-se aproximadamente ao mesmo tempo;
  • linhas 17-19: 3 processos terminam, libertando assim 3 threads;
  • linhas 23-24: os dois últimos processos podem então iniciar, utilizando 2 das threads assim libertadas;

Podemos, portanto, concluir que o agendador [Schedulers.computation] fornece um conjunto de n threads, onde n é o número de núcleos na máquina. As threads são executadas em paralelo nestes núcleos.

7.7.2. Exemplo-24: o agendador [Schedulers.io]

Executamos o código anterior com o agendador [Schedulers.io]:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple24 {
    public static void main(String[] args) throws InterruptedException {
        // processes
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.io(), null);
        }
        // subscriptions
        ProcessUtils.subscribe(1, processes);
    }
}
  • linha 18: os processos são executados utilizando os threads do agendador [Schedulers.io];

Isto produz os seguintes resultados:

main : début observation ------Thread[main] ---- Time[03:451]
Observable (process0) call start ------Thread[RxCachedThreadScheduler-1] ---- Time[03:459]
Observable (process1) call start ------Thread[RxCachedThreadScheduler-2] ---- Time[03:459]
Observable (process2) call start ------Thread[RxCachedThreadScheduler-3] ---- Time[03:460]
Observable (process3) call start ------Thread[RxCachedThreadScheduler-4] ---- Time[03:460]
Observable (process4) call start ------Thread[RxCachedThreadScheduler-5] ---- Time[03:464]
Observable (process5) call start ------Thread[RxCachedThreadScheduler-6] ---- Time[03:464]
Observable (process6) call start ------Thread[RxCachedThreadScheduler-7] ---- Time[03:465]
Observable (process8) call start ------Thread[RxCachedThreadScheduler-9] ---- Time[03:465]
Observable (process9) call start ------Thread[RxCachedThreadScheduler-10] ---- Time[03:465]
main : attente fin observation ------Thread[main] ---- Time[03:465]
Observable (process7) call start ------Thread[RxCachedThreadScheduler-8] ---- Time[03:465]
Observable (process7,0) onNext (54.0) ------Thread[RxCachedThreadScheduler-8] ---- Time[03:473]
Observable (process8,0) onNext (116.39999999999999) ------Thread[RxCachedThreadScheduler-9] ---- Time[03:500]
Observable (process6,0) onNext (105.6) ------Thread[RxCachedThreadScheduler-7] ---- Time[03:506]
Observable (process0,0) onNext (96.0) ------Thread[RxCachedThreadScheduler-1] ---- Time[03:509]
Observable (process5,0) onNext (25.2) ------Thread[RxCachedThreadScheduler-6] ---- Time[03:583]
Observable (process3,0) onNext (97.2) ------Thread[RxCachedThreadScheduler-4] ---- Time[03:684]
Subscriber[observateur[0],process7] : onNext (54.0) ------Thread[RxCachedThreadScheduler-8] ---- Time[03:685]
Subscriber[observateur[0],process6] : onNext (105.6) ------Thread[RxCachedThreadScheduler-7] ---- Time[03:685]
Subscriber[observateur[0],process0] : onNext (96.0) ------Thread[RxCachedThreadScheduler-1] ---- Time[03:685]
Subscriber[observateur[0],process8] : onNext (116.39999999999999) ------Thread[RxCachedThreadScheduler-9] ---- Time[03:685]
Observable (process0) onCompleted ------Thread[RxCachedThreadScheduler-1] ---- Time[03:686]
Observable (process6) onCompleted ------Thread[RxCachedThreadScheduler-7] ---- Time[03:686]
Observable (process7) onCompleted ------Thread[RxCachedThreadScheduler-8] ---- Time[03:685]
...
main : fin observation ------Thread[main] ---- Time[03:933]
  • linhas 2-10: cada um dos 10 processos inicia numa thread diferente. Ao contrário do caso anterior, todos os processos conseguiram ser iniciados. Note-se que estes inícios demoram 6 ms, enquanto anteriormente demoravam 1 ms;
  • linhas 13-18: os observáveis são emitidos um após o outro e não de forma quase paralela, como era o caso anteriormente;

Qual é a diferença entre os agendadores [Schedulers.io] e [Schedulers.computation]? Pode encontrar uma resposta na URL [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:

 

7.7.3. Exemplo-25: o agendador [Schedulers.newThread]

Executamos o código anterior utilizando o agendador [Schedulers.newThread]:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple25 {
    public static void main(String[] args) throws InterruptedException {
        // processes
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.newThread(), null);
        }
        // subscriptions
        ProcessUtils.subscribe(1, processes);
    }
}

Os resultados obtidos são os mesmos que com o agendador [Schedulers.io]:

main : début observation ------Thread[main] ---- Time[17:058]
Observable (process0) call start ------Thread[RxNewThreadScheduler-1] ---- Time[17:065]
Observable (process1) call start ------Thread[RxNewThreadScheduler-2] ---- Time[17:065]
Observable (process2) call start ------Thread[RxNewThreadScheduler-3] ---- Time[17:066]
Observable (process3) call start ------Thread[RxNewThreadScheduler-4] ---- Time[17:066]
Observable (process4) call start ------Thread[RxNewThreadScheduler-5] ---- Time[17:068]
Observable (process5) call start ------Thread[RxNewThreadScheduler-6] ---- Time[17:069]
Observable (process6) call start ------Thread[RxNewThreadScheduler-7] ---- Time[17:069]
Observable (process8) call start ------Thread[RxNewThreadScheduler-9] ---- Time[17:069]
Observable (process7) call start ------Thread[RxNewThreadScheduler-8] ---- Time[17:069]
Observable (process9) call start ------Thread[RxNewThreadScheduler-10] ---- Time[17:069]
main : attente fin observation ------Thread[main] ---- Time[17:069]
Observable (process6,0) onNext (25.2) ------Thread[RxNewThreadScheduler-7] ---- Time[17:120]
Observable (process3,0) onNext (39.6) ------Thread[RxNewThreadScheduler-4] ---- Time[17:193]
Observable (process5,0) onNext (21.599999999999998) ------Thread[RxNewThreadScheduler-6] ---- Time[17:212]
Observable (process0,0) onNext (19.2) ------Thread[RxNewThreadScheduler-1] ---- Time[17:273]
Observable (process8,0) onNext (81.6) ------Thread[RxNewThreadScheduler-9] ---- Time[17:308]
Subscriber[observateur[0],process3] : onNext (39.6) ------Thread[RxNewThreadScheduler-4] ---- Time[17:331]
Subscriber[observateur[0],process0] : onNext (19.2) ------Thread[RxNewThreadScheduler-1] ---- Time[17:331]
Subscriber[observateur[0],process6] : onNext (25.2) ------Thread[RxNewThreadScheduler-7] ---- Time[17:331]
Subscriber[observateur[0],process8] : onNext (81.6) ------Thread[RxNewThreadScheduler-9] ---- Time[17:331]
Subscriber[observateur[0],process5] : onNext (21.599999999999998) ------Thread[RxNewThreadScheduler-6] ---- Time[17:331]
Observable (process8) onCompleted ------Thread[RxNewThreadScheduler-9] ---- Time[17:333]
Observable (process5) onCompleted ------Thread[RxNewThreadScheduler-6] ---- Time[17:333]
Observable (process6) onCompleted ------Thread[RxNewThreadScheduler-7] ---- Time[17:332]
Observable (process0) onCompleted ------Thread[RxNewThreadScheduler-1] ---- Time[17:332]
Observable (process3) onCompleted ------Thread[RxNewThreadScheduler-4] ---- Time[17:332]
...
main : fin observation ------Thread[main] ---- Time[17:571]

Na URL [http://stackoverflow.com/questions/33415881/retrofit-with-rxjava-schedulers-newthread-vs-schedulers-io], explica-se que o agendador [Schedulers.io] fornece um conjunto de threads, o que o agendador [Schedulers.newThread] não faz. Um conjunto de threads cria automaticamente um conjunto de threads. Este aloca-as aos processos que delas necessitam. Quando estes processos terminam, as suas threads não são eliminadas, mas regressam ao conjunto e podem então ser reutilizadas por outro processo. Isto é mais eficiente do que criar e eliminar threads constantemente. Por conseguinte, é preferível utilizar o agendador [Schedulers.io].

7.7.4. Exemplo 26: Os programadores [Schedulers.immediate, Schedulers.trampoline]

Voltemos à explicação fornecida para estes dois agendadores:

 

A explicação é bastante fácil de compreender, mas quando tentamos ilustrá-la, percebemos que ainda não a compreendemos bem. Foi o livro *Learning Reactive Programming With Java 8* que me ajudou a criar um exemplo baseado num que se encontra nesse livro, mas simplificado. Aqui está:


package dvp.rxjava.observables.exemples;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Consumer;
 
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
 
public class Exemple26 {
    public static void main(String[] args) throws InterruptedException {
 
        // a scheduler
        Scheduler scheduler = Schedulers.immediate();
        // a worker of this scheme
        Worker worker = scheduler.createWorker();
        // an Action0 type to be executed on the worker
        Action0 action02 = new Action0() {
            @Override
            public void call() {
                // log action02
                ProcessUtils.showInfos.accept("action02");
            }
        };
 
        // an Action0 type to be executed on the worker
        Action0 action01 = new Action0() {
            @Override
            public void call() {
                // program a new action on the same worker
                worker.schedule(action02);
                // log action01
                ProcessUtils.showInfos.accept("action01");
            }
        };
        // action01 is programmed on the worker
        worker.schedule(action01);
    }
 
    // displays
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
 
}
  • linha 17: um agendador. Este será [Schedulers.immediate], como mostrado aqui, ou [Schedulers.trampoline] mais tarde;
  • linha 19: Ações do tipo Action0 (linhas 21, 20) podem ser executadas nos trabalhadores do agendador. O método [Scheduler.createWorker] cria um trabalhador. O método [Worker.schedule(Action0)] executa um tipo Action0 através de um trabalhador;
  • linhas 21–27: uma primeira ação chamada [action02] que será executada (linha 40) pelo trabalhador da linha 19;
  • linhas 30–38: uma segunda ação chamada [action01]. Tem a particularidade de fazer com que a action02 seja executada no mesmo trabalhador que ela própria (linha 34). É aqui que reside a diferença entre [Schedulers.immediate] e [Schedulers.trampoline]:
    • se o agendador for [Schedulers.immediate], então na linha 34, a ação action02 será executada imediatamente (daí o nome do agendador) e a ação action01 atualmente em execução será interrompida. Veremos então aparecer a mensagem da linha 25. Assim que a ação action02 terminar, a ação action01 será retomada e veremos a mensagem da linha 36;
    • se o agendador for [Schedulers.trampoline], então, na linha 34, a ação action02 é colocada na fila. Ela não será executada até que a tarefa atual, action01, esteja concluída. A mensagem na linha 36 aparecerá então. Assim que a ação action01 estiver concluída, a ação action02 será executada e a mensagem na linha 25 aparecerá;

A execução do código acima produz os seguintes resultados:

action02 ------Thread[main] ---- Time[38:480]
action01 ------Thread[main] ---- Time[38:485]

Se, na linha 17, utilizarmos o agendador [Schedulers.trampoline], obtemos os resultados opostos:

action01 ------Thread[main] ---- Time[42:972]
action02 ------Thread[main] ---- Time[42:976]

Dito isto, é difícil estabelecer uma ligação com observáveis. Não encontrei nenhum exemplo convincente que demonstrasse a vantagem de executar um observável numa destas duas threads. Aqui está um, no entanto, que não me parece nada natural:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
 
public class Exemple27 {
    public static void main(String[] args) throws InterruptedException {
 
        // Worker
        Worker worker = Schedulers.immediate().createWorker();
        // Worker worker = Schedulers.trampoline().createWorker();
        // observable 1 sur worker
        worker.schedule(() -> Observable.range(1, 2).subscribe(new Action1<Integer>() {
 
            @Override
            public void call(Integer i) {
                ProcessUtils.showInfos.accept(String.valueOf(i));
                // observable 2 on same worker
                worker.schedule(() -> Observable.range(100, 2).subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer i) {
                        ProcessUtils.showInfos.accept(String.valueOf(i));
                    }
                }));
            }
        }));
    }
}
  • linhas 13–14: é criado um trabalhador utilizando um dos dois agendadores [Schedulers.immediate] e [Schedulers.trampoline];
  • linha 16: um primeiro observável obs1 é agendado neste trabalhador para emitir os números [1,2]
  • linha 22: cada vez que um elemento deste observável obs1 é observado, a observação de um segundo observável obs2 é iniciada no mesmo trabalhador para emitir os números [100,101];

Com o agendador [Schedulers.immediate], obtemos os seguintes resultados:

1
2
3
4
5
6
1 ------Thread[main] ---- Time[44:604]
100 ------Thread[main] ---- Time[44:610]
101 ------Thread[main] ---- Time[44:610]
2 ------Thread[main] ---- Time[44:612]
100 ------Thread[main] ---- Time[44:612]
101 ------Thread[main] ---- Time[44:612]

Já com o agendador [Schedulers.trampoline], obtemos os seguintes resultados:

1
2
3
4
5
6
1 ------Thread[main] ---- Time[14:107]
2 ------Thread[main] ---- Time[14:114]
100 ------Thread[main] ---- Time[14:115]
101 ------Thread[main] ---- Time[14:115]
100 ------Thread[main] ---- Time[14:115]
101 ------Thread[main] ---- Time[14:116]

7.8. Conclusão

Ainda há muito a fazer. Para obter uma compreensão mais profunda da biblioteca RxJava, encorajamos os leitores a continuar a sua aprendizagem utilizando as referências fornecidas no início deste documento. No entanto, dispomos agora dos conhecimentos básicos necessários para utilizar o RxJava em ambientes Swing e Android. É isso que iremos demonstrar a seguir.