Skip to content

7. A biblioteca RxJava

A biblioteca RxJava baseia-se no seguinte conceito: um fluxo de elementos do tipo T Observable<T> é observado por um ou mais subscritores (assinantes, observadores, consumidores) Subscriber<T>. A biblioteca RxJava permite que o fluxo Observable<T> seja executado num thread T1 e o seu observador Subscriber<T> num thread T2, sem que o programadortenha de se preocupar com a gestão do ciclo de vida dessas threads nem com problemas naturalmente complexos, tais como a partilha de dados entre threads e a sincronização das mesmas para executar uma tarefa global. Facilita, assim, a programação assíncrona.

Um fluxo Observable<T> produz elementos do tipo T, observáveis à medida que são produzidos. Se o observador e o observável (termo que designa o tipo Observable<T>, por conveniência) se encontrarem na mesma thread, então o observável só pode produzir o elemento (i+1) quando o observador tiver consumido o elemento i. São poucos os casos em que esta arquitetura se revela vantajosa. Se o observador e o observável não se encontrarem no mesmo thread, então o observável e o seu observador têm comportamentos autónomos: o observável produz ao seu próprio ritmo e o observador consome ao seu próprio ritmo. É aí que reside o interesse da biblioteca. Até agora, temos falado sempre de um único observador. Na realidade, um observável pode ter um número qualquer de observadores.

A biblioteca RxJava está particularmente bem adaptada à arquitetura apresentada no parágrafo 2 da introdução e que aqui recordamos:

Image

  • em [1], uma camada de serviços presta serviços, alguns dos quais demoram a ser obtidos (por exemplo, pedidos de rede);
  • esta camada de serviços é invocada por uma interface gráfica [1] (Swing, Android, JavaFx). Se a camada de serviços for executada no mesmo thread que o método [swing] que a utiliza, a interface gráfica fica congelada (não reativa) enquanto aguarda o resultado do serviço;
  • em [2], uma fina camada de adaptação implementada com RxJava permite apresentar à camada gráfica uma implementação assíncrona do mesmo serviço: este pode ser executado numa thread diferente daquela do método da camada gráfica que o invoca. Neste caso, a interface gráfica [3] mantém-se responsiva: o utilizador pode continuar a interagir com ela, por exemplo, iniciar um novo pedido de rede em paralelo com o primeiro e, sobretudo, é possível oferecer-lhe a possibilidade de cancelar processamentos demasiado demorados, algo impossível se a interface gráfica estiver congelada;
  • a chamada [4] é síncrona, enquanto a chamada [5-6] é assíncrona;

Nesta arquitetura, a camada [2] fornece serviços que devolvem tipos Observable<T>, aos quais os métodos da camada gráfica [3] podem subscrever. Um serviço da camada [2] fornece então os seus resultados um a um e a camada [3] pode reagir a cada um deles, atualizando, por exemplo, um ou vários componentes da interface gráfica.

A classe Observable<T> possui várias dezenas de métodos. Esta é uma das dificuldades da biblioteca: é muito rica e é difícil compreender todas as suas possibilidades. Vamos apresentar algumas delas. O domínio dos restantes métodos virá com o tempo.

7.1. Criar observáveis e subscrevê-los

7.1.1. Exemplo-01: o método [Observable.from]

  

Consideremos o seguinte código:


package dvp.rxjava.observables;

import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;

import java.util.Arrays;

public class Exemple01 {
  public static void main(String[] args) {
    // observáveis de inteiros
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
    });
  }
}
  • linha 12: cria-se um tipo Observable<Integer> a partir de uma lista de inteiros.

A classe Observable<T> é um fluxo de elementos do tipo T que podem ser observados, de preferência de forma assíncrona, mas não necessariamente, à medida que são produzidos. A sua definição é a seguinte:

 

Como já foi referido, a classe Observable<T> possui várias dezenas de métodos. Alguns são semelhantes aos da classe Stream<T> analisada no parágrafo 5. A documentação da RxJava inclui «diagramas de mármore» [2] que ilustram o funcionamento destes métodos:

  • a linha 3 ilustra as emissões do observável ao longo do tempo;
  • o método [4] é aplicado aos elementos emitidos pelo observável. Em geral, produz um novo observável;
  • a linha 5 mostra o novo observável obtido;

O método [Observable.from] tem a seguinte assinatura:

 

O método estático [Observable.from] permite criar um Observable<T> a partir de uma coleção de elementos do tipo T. É uma forma muito simples de começar a trabalhar com observáveis. A linha:


    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));

irá, portanto, emitir três elementos. Não os emite imediatamente. Irá emiti-los na íntegra sempre que um observador se registar. É o que se denomina um observável «frio». O observável reemite os seus elementos para cada novo subscritor.

Pode-se considerar a instrução anterior como uma ação de configuração do observável. Este é configurado uma vez e executado n vezes, caso surjam n subscritores.

Como é que se subscreve?

Uma forma de o fazer é utilizar o método [Observable.subscribe], cuja definição utilizada aqui é a seguinte:

 
  • o primeiro parâmetro [Action1<T> onNext] (ver parágrafo 6.2) do método é o método a executar quando o observável emite um novo elemento T;
  • o segundo parâmetro [Action1<Throwable> onError] do método é o método a executar quando o observável lança uma exceção;
  • o terceiro parâmetro [Action0 onComplete] (ver parágrafo 6.1) do método é o método a executar quando o observável lança uma exceção;
  • o método devolve um tipo [Subscription];

O tipo [Subscription] representa uma subscrição do observável. A sua definição é a seguinte:

 

O interesse desta interface [1] reside no seu método [2], que permite cancelar uma subscrição.

No nosso exemplo, o código da subscrição do observável é o seguinte:


    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
});
  • linha 1: o resultado do tipo [Subscription] é ignorado;
  • linhas 1-15: os três parâmetros são instâncias de classes anónimas. Também utilizaremos lambdas. A vantagem das classes anónimas é que se consegue ver claramente os tipos de dados esperados pelo único método dessas classes;
  • linhas 2-5: implementação do primeiro parâmetro do tipo [Action1<Integer>];
  • linhas 6-10: implementação do segundo parâmetro do tipo [Action1<Throwable>];
  • linhas 11-15: implementação do terceiro parâmetro do tipo [Action0];

O código completo é o seguinte:


package dvp.rxjava.observables;

import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;

import java.util.Arrays;

public class Exemple01 {
  public static void main(String[] args) {
    // observáveis de inteiros
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    // assinatura
    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
    });
  }
}

O observável da linha 12 começa a emitir os seus 3 elementos assim que o método [subscribe] é chamado na linha 14. A partir desse momento:

  • a cada elemento emitido, as linhas 15-18 são executadas.
  • no final dos 3 elementos, as linhas 24-29 são executadas;
  • as linhas 19-24 nunca serão executadas, uma vez que o observável não emite aqui nenhuma exceção;

Por predefinição, o observável e o observador são executados no mesmo thread. Existem alguns observáveis predefinidos que são executados num thread diferente do thread principal (neste caso, o thread do método main), mas para a maioria deles não é esse o caso. Assim, tudo ocorre aqui na thread do método [main]:

  • o observável emite o elemento 1;
  • as linhas 15-18 são executadas e apresentam esse elemento;
  • o observável emite o elemento 2;
  • as linhas 15-18 são executadas e exibem esse elemento;
  • o observável emite o elemento 3;
  • as linhas 15-18 são executadas e apresentam esse elemento;
  • o observável emite a notificação [completed];
  • as linhas 24-29 são executadas;

É isto que mostram os resultados obtidos:

1
2
3
4
next : 1
next : 2
next : 3
completed

A classe [Exemple02] retoma a classe [Exemple01], utilizando desta vez funções lambda como parâmetros do método [Observable.subscribe]:


package dvp.rxjava.observables;

import java.util.Arrays;

import rx.Observable;

public class Exemple02 {
  public static void main(String[] args) {
    // observáveis de inteiros
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    // assinatura
    obs1.subscribe(
      (integer) -> System.out.printf("next : %s%n", integer),
      (th) -> System.out.println(th),
      () -> System.out.println("completed"));
  }
}

7.1.2. Exemplo-03: a classe Observer

  

O método [Observable.subscribe], que permite subscrever um observável, tem várias versões, incluindo a seguinte:


package dvp.rxjava.observables;

import java.util.Arrays;

import rx.Observable;
import rx.Observer;

public class Exemple03 {
    public static void main(String[] args) {
        // observáveis de inteiros
        Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
        // assinatura
        obs1.subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("completed");
            }

            @Override
            public void onError(Throwable th) {
                System.out.printf("throwable %s", th);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.printf("next : %s%n", integer);
            }
        });
    };
}

Na linha 13, em vez de passar três parâmetros ao método [subscribe], passa-se-lhe um tipo [Observer], como se segue:

 

O tipo [Observer] é uma interface com três métodos:

  • [onNext(T t)], que é chamado sempre que o observável emite um elemento t;
  • [onError(Throwable th)], que é chamado quando o observável lança uma exceção th;
  • [onCompleted], que é chamado quando o observável indica que terminou de emitir;

O funcionamento do código é semelhante ao explicado anteriormente. Obtêm-se os seguintes resultados:

1
2
3
4
next : 1
next : 2
next : 3
completed

7.1.3. Exemplo-04: o método [Observable.create]

  

O método estático Observable.create é definido da seguinte forma:

 
  • o método [create] devolve um tipo Observable<T>;
  • o parâmetro do método [create] é uma função do tipo [Observable.OnSubscribe<T>] definida da seguinte forma:
 

O tipo [Observable.OnSubscribe<T>] é uma interface funcional que, por sua vez, estende a interface funcional [Action1<Subscriber<? super T>>]. O método [call] desta interface espera um tipo [Subscriber] (assinante, subscritor, observador) definido da seguinte forma:

 

Vê-se em [1] que a classe [Subscriber<T>] implementa a interface [Observer<T>] apresentada no parágrafo 7.1.2.

Por fim, o método [<T> Observable.create]:

  • espera como parâmetro uma instância do tipo [Observable.OnSubscribe<T>] com o único método de assinatura: void call(Subscriber<T> s). O tipo [Subscriber<T>] estende o tipo [Observer<T>] e, por conseguinte, dispõe dos métodos onNext, onError, onCompleted;
  • retorna um tipo Observable<T>;

O método [<T> Observable.create] devolve um observável configurado. Ainda não houve qualquer emissão de elementos. Quando um subscritor [Subscriber<T> s] subscreve este observável, é então chamado o método [void call(s)] da função passada como parâmetro do método [<T> Observable.create]. A sua função consiste em emitir elementos t do tipo T e em chamar o método [s.onNext(t)] do observador a cada emissão. Quando este estiver concluído, o método [s.onCompleted(t)] do observador deve ser chamado e o método [call] deve terminar. Se o método [call] encontrar uma exceção th, o método [s.onError(th)] do observador deve ser chamado e o método [call] deve terminar;

Para ilustrar este funcionamento complexo, utilizaremos o seguinte código [Exemple04]:


package dvp.rxjava.observables;

import rx.Observable;
import rx.Subscriber;

import java.util.Random;

public class Exemple04 {
    public static void main(String[] args) {
        // configuração observável de números reais
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                for (int i = 0; i < 3; i++) {
                    // emissão do elemento i
                    subscriber.onNext(new Random((i + 1)).nextDouble());
                }
                // fim da emissão
                subscriber.onCompleted();
            }
        });
        // assinatura e, consequentemente, emissão
        obs1.subscribe((d) -> System.out.printf("onNext %s%n", d), (th) -> System.out.printf("onError %s%n", th),
                () -> System.out.println("onCompleted"));
    }
}
  • linha 11: cria-se um observável que emite tipos Double;
  • linhas 11-21: o parâmetro do método [create] é instanciado com uma classe anónima que possui o método único [call] das linhas 12-20. O observável criado na linha 11 está pronto para emitir, mas só o fará quando um observador for registado;
  • linhas 13-21: o método [call] recebe a referência de um observador;
  • linhas 14-17: emissão de 3 elementos para o observador;
  • linha 19: notificação de fim de transmissão ao observador;
  • linhas 23-24: subscrição do observável da linha 11. Implementam-se os três parâmetros [onNext, onError, onCompleted] do método [subscribe] através de três funções lambda. Esta subscrição irá criar o subscritor [Subscriber<Double>], que será passado para o método [call] da linha 13. A emissão de elementos terá então início;
  • tudo ocorre no mesmo thread: observável e observador;

Obtêm-se os seguintes resultados:

1
2
3
4
onNext 0.7308781907032909
onNext 0.7311469360199058
onNext 0.731057369148862
onCompleted

O método [Observable.create] permite criar um observável a partir de qualquer fenómeno. Foi este método que utilizámos no parágrafo 2 da secção «Descoberta», para transformar uma interface síncrona numa interface assíncrona.

7.1.4. Exemplo-05: refatoração de [Exemple-04]

  

O exemplo seguinte apresenta uma nova versão do método estático [Observable.subscribe]:


package dvp.rxjava.observables;

import rx.Observable;
import rx.Subscriber;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

public class Exemple05 {
    public static void main(String[] args) {
        // configuração de um observável de números reais
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // espera
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // erro
                        subscriber.onError(e);
                    }
                    // ação
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // concluído
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });

        // um subscritor
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }

            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };

        // subscrição
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        showInfos("après souscription");

    }

    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}
  • linha 56: a nova versão do método estático [Observable.subscribe] aceita como parâmetro o tipo [Subscriber] que apresentámos no parágrafo anterior;
  • linhas 37-52: o subscritor (assinante, observador). Este implementa a interface Observer com os seus três métodos onNext, onError e onCompleted;
  • linhas 61-64: a partir de agora, vamos concentrar-nos nos threads em que o observável e o seu observador são executados;
  • linha 62: o nome do thread;
  • linha 63: a hora atual expressa em segundos e milissegundos. Isto vai permitir-nos acompanhar ao longo do tempo a emissão de elementos pelo observável e o seu processamento pelo observador;
  • este código tem a mesma funcionalidade que o código anterior. Simplesmente refatorámos este último;

Os resultados obtidos são os seguintes:

avant souscription ------Thread[main] ---- Time[31:685]
Observable.call start ------Thread[main] ---- Time[31:691]
Observable.call onNext(80.39999999999999) ------Thread[main] ---- Time[32:194]
Subscriber.onNext (80.39999999999999) ------Thread[main] ---- Time[32:195]
Observable.call onNext(73.2) ------Thread[main] ---- Time[32:595]
Subscriber.onNext (73.2) ------Thread[main] ---- Time[32:595]
Observable.call onNext(106.8) ------Thread[main] ---- Time[32:897]
Subscriber.onNext (106.8) ------Thread[main] ---- Time[32:897]
Observable.call onCompleted ------Thread[main] ---- Time[32:898]
Subscriber.onCompleted ------Thread[main] ---- Time[32:898]
après souscription ------Thread[main] ---- Time[32:899]
  • linha 1 dos resultados: antes da linha 56 do código, ainda não aconteceu nada. O observável foi simplesmente configurado;
  • linha 2 dos resultados: a linha 56 do código provoca a chamada do método [call] da linha 15. Na linha 3, o número real 80,39 é enviado para o observador;
  • linha 4: o observador recebe o número enviado;
  • linhas 5-8: o processo anterior repete-se duas vezes;
  • linha 9: o observável envia a notificação de fim de transmissão;
  • linha 10: o observador recebe-a;
  • linha 11: exibida pela linha 57 do código;

Vemos, portanto, que a única linha 56 de subscrição provocou a exibição das linhas 2 a 10 dos resultados. Quando se começa a utilizar a biblioteca RxJava, questiona-se como as coisas se encadeiam umas às outras e, em particular, as ligações que unem o observador e o observável. Vemos aqui que a linha 56, a subscrição do observável,

  • provocou a emissão de todos os elementos do observável;
  • que o observável e o observador são executados no mesmo thread;
  • que, por causa disso, observa-se a sequência: emissão do elemento i, observação do elemento i, emissão do elemento (i+1), observação do elemento (i+1), ...

Recorde-se que o emissor esperava antes de emitir os seus elementos:


                    // em espera
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // erro
                        subscriber.onError(e);
}

onde i na linha 3 representa o número da emissão (0<=i<3). Se observarmos as horas de emissão dos elementos do observável:

  • linhas 2 e 3: o elemento 0 foi transmitido cerca de 500 ms após o início da subscrição;
  • linhas 3 e 5: o elemento 1 foi transmitido cerca de 400 ms após o elemento 0;
  • linhas 5, 7: o elemento 2 foi emitido cerca de 300 ms após o elemento 1;

7.2. Thread de execução, thread de observação

7.2.1. Exemplo-06: observável e observador num thread diferente de [main]

  

Reestruturamos o exemplo anterior da seguinte forma [Exemple06]:


package dvp.rxjava.observables;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class Exemple06 {
    public static void main(String[] args) {

        // guarda-barreira
        CountDownLatch latch = new CountDownLatch(1);

        // configuração de um observável de valores reais
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // em espera
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // erro
                        subscriber.onError(e);
                    }
                    // ação
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // concluído
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });

        // um subscritor
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
                // baixar a barreira
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }

            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };

        // continuação da configuração observável
        obs1 = obs1.subscribeOn(Schedulers.computation());
        // subscrição
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        // espera em frente à barreira
        try {
            showInfos("début attente barrière");
            latch.await();
            showInfos("fin attente barrière");
        } catch (InterruptedException e1) {
            System.out.println(e1);
        }
        showInfos("après souscription");

    }

    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}
  • linha 16: criamos um semáforo com um objeto do tipo [CountDownLatch]. Este objeto serve para sincronizar threads entre si. Aqui, é inicializado com o valor 1, ao qual chamaremos de valor do semáforo. Uma thread entra em espera no semáforo através de uma operação:

latch.await();

A thread fica bloqueada se o valor do guard-barrier for >0. Uma thread pode aumentar ou diminuir o valor interno do guard-barrier. Na linha 48, o valor do guard-barrier é decrementado em 1.

  • Linha 63: o observável é configurado para ser executado num thread fornecido pelo agendador [Schedulers.computation()]. Este agendador pode fornecer tantos threads quantos forem os núcleos da máquina de execução. O parágrafo sobre a aplicação de exemplo demonstrou a utilização de outros agendadores (ver parágrafo 2.8);

O princípio do código é o seguinte:

  • o método [main] é executado na thread principal (main);
  • linha 66: inicia a emissão de elementos do observável. Estes serão emitidos numa thread diferente da thread principal;
  • linha 70: a thread principal fica bloqueada porque o bloqueador tem o valor 1 (ver linha 16). Só poderá continuar quando este valor passar para 0. Isto acontece na linha 48. É o observador que desce o bloqueador quando recebe a notificação de que o observável terminou as suas emissões;

A execução produz os seguintes resultados:

avant souscription ------Thread[main] ---- Time[09:268]
Observable.call start ------Thread[RxComputationThreadPool-1] ---- Time[09:278]
début attente barrière ------Thread[main] ---- Time[09:278]
Observable.call onNext(44.4) ------Thread[RxComputationThreadPool-1] ---- Time[09:783]
Subscriber.onNext (44.4) ------Thread[RxComputationThreadPool-1] ---- Time[09:783]
Observable.call onNext(18.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:183]
Subscriber.onNext (18.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:184]
Observable.call onNext(54.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:486]
Subscriber.onNext (54.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:488]
Observable.call onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[10:489]
Subscriber.onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[10:490]
fin attente barrière ------Thread[main] ---- Time[10:491]
après souscription ------Thread[main] ---- Time[10:493]
  • linha 1: a subscrição vai ocorrer;
  • linha 2: esta subscrição desencadeia a execução do método [call] na thread [RxComputationThreadPool-1]. Temos agora uma execução paralela com duas threads;
  • linha 3: por uma razão ainda não esclarecida, a thread [RxComputationThreadPool-1] cedeu o controlo. A thread [main] assume então o controlo e fica bloqueada pelo bloqueador (linha 70 do código). A partir deste momento, apenas a thread [RxComputationThreadPool-1] pode operar;
  • linhas 4-11: observa-se o comportamento anteriormente observado entre o observável e o seu observador, mas agora tudo decorre no thread [RxComputationThreadPool-1];
  • linhas 12-13: o observador baixou a barreira (linha 48 do código) e o thread [RxComputationThreadPool-1] terminou. O thread [main] assume o controlo e apresenta duas mensagens;

7.2.2. Exemplo-07: observável e observador em dois threads diferentes

  

Alteramos o exemplo anterior da seguinte forma:


package dvp.rxjava.observables;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class Exemple07 {
    public static void main(String[] args) {

        // guarda da barreira
        CountDownLatch latch = new CountDownLatch(1);

        // configuração de um observável de valores reais
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // espera
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // erro
                        subscriber.onError(e);
                    }
                    // ação
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // concluído
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });

        // um subscritor
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
                // baixar a barreira
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }

            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };

        // continuação da configuração observável
        obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
        // subscrição
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        // à espera de que a barreira suba
        try {
            showInfos("début attente barrière");
            latch.await();
            showInfos("fin attente barrière");
        } catch (InterruptedException e1) {
            System.out.println(e1);
        }
        showInfos("après souscription");

    }

    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}

O código é idêntico ao do exemplo anterior, exceto na linha 63:


obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());

que configura o observável (subscribeOn) e o observador (observeOn) para serem executados numa das threads fornecidas pelo agendador [Schedulers.computation()].

Os resultados obtidos são os seguintes:

avant souscription ------Thread[main] ---- Time[09:643]
début attente barrière ------Thread[main] ---- Time[09:656]
Observable.call start ------Thread[RxComputationThreadPool-4] ---- Time[09:656]
Observable.call onNext(39.6) ------Thread[RxComputationThreadPool-4] ---- Time[10:162]
Subscriber.onNext (39.6) ------Thread[RxComputationThreadPool-3] ---- Time[10:163]
Observable.call onNext(98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[10:562]
Subscriber.onNext (98.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[10:564]
Observable.call onNext(46.8) ------Thread[RxComputationThreadPool-4] ---- Time[10:864]
Observable.call onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[10:866]
Subscriber.onNext (46.8) ------Thread[RxComputationThreadPool-3] ---- Time[10:866]
Subscriber.onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[10:868]
fin attente barrière ------Thread[main] ---- Time[10:869]
après souscription ------Thread[main] ---- Time[10:870]

É possível observar os seguintes pontos:

  • o observável é executado na thread [RxComputationThreadPool-4] (linhas 3-4, 6, 8-9);
  • o observador é executado no thread [RxComputationThreadPool-3] (linhas 5, 7, 10-11);
  • que ambos se executam de forma autónoma. Assim, nas linhas 8-9, o observável emite duas notificações (onNext, onCompleted) antes de o observador recuperar a notificação [onNext] (linha 10);

A biblioteca RxJava encarrega-se da transferência de dados (as emissões) da thread do observável para a thread do observador. O programador não precisa de se preocupar com isso.

Já vimos como criar observáveis (Observable.from, Observable.create). Vamos agora ver os observáveis predefinidos da biblioteca RxJava.

7.3. Observáveis predefinidos

7.3.1. Exemplo-08: o método [Observable.range]

 

A partir de agora, vamos utilizar classes dedicadas aos processos observados e aos seus observadores. A ideia é poder registar o seu nome, o seu thread de execução e as horas de execução, de modo a poder acompanhá-los ao longo do tempo.

A classe [Process] será simplesmente um Observable ao qual se pode atribuir um nome. Ela implementará a seguinte interface [IProcess]:


package dvp.rxjava.observables.utils;

import rx.Observable;

public interface IProcess<T> {

    // nome do observável
    public String getName();

    // observável
    public Observable<T> getObservable();

}

Esta interface poderá ser implementada pela seguinte classe [Process<T>]:


package dvp.rxjava.observables.utils;

import rx.Observable;
import rx.Scheduler;

public class Process<T> implements IProcess<T>{

    // nome da variável observável
    protected String name;
    // processo observado
    protected Observable<T> observable;

    // construtores
    public Process(String name, Observable<T> observable) {
        // inicializações locais
        this.name = name;
        this.observable = observable;
    }

    // getters e setters
    public String getName() {
        return name;
    }

    public Observable<T> getObservable() {
        return observable;
    }

}
  • linha 9: o nome do processo;
  • linha 11: a observável observada;
  • linhas 14-18: o construtor;

O observador será, por sua vez, descrito pela seguinte classe [Observateur]:


package dvp.rxjava.observables.utils;

import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import rx.Subscriber;

public class Observateur<T> extends Subscriber<T> {

...
}
  • na linha 11, a classe Observateur<T> estende a classe Subscriber<T> que apresentámos brevemente no parágrafo 7.1.3. Iremos utilizá-la como argumento do método [Observable.subscribe]:

// execução observável (observação)
obs1.subscribe(observateur);

O método [Observable.subscribe] utilizado na linha 2 acima tem a seguinte definição:

 

A função do [Subscriber] consiste principalmente em gerir os elementos emitidos pelo observável ao qual se subscreveu, através dos métodos da interface [Observer]: onNext, onError, onCompleted. A classe [Subscriber] possui os seguintes métodos:

 

No código da classe [Observateur], utilizaremos o método [1] isUnsubscribed para verificar se a subscrição do subscritor foi cancelada ou não. A classe [Observateur<T>] completa é a seguinte:


package dvp.rxjava.observables.utils;

import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import rx.Subscriber;

public class Observateur<T> extends Subscriber<T> {

    // um semáforo
    private CountDownLatch latch;
    // um método de visualização
    private Consumer<String> showInfos;
    // o nome do observador
    private String observerName;
    // o nome do processo observado
    private String processName;

    // construtores
    public Observateur() {

    }

    public Observateur(String name, CountDownLatch latch, Consumer<String> showInfos, String observedName) {
        this.observerName = name;
        this.latch = latch;
        this.showInfos = showInfos;
        this.processName = observedName;
    }

    // --------------------------- implementação da interface Observer<T>
    @Override
    public void onCompleted() {
        // fim das transmissões
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber [%s,%s].onCompleted", observerName, processName));
        }
        // fim do bloqueio do thread principal
        latch.countDown();
    }

    @Override
    public void onError(Throwable e) {
        // erro de emissão
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber [%s, %s].onError (%s)", observerName, processName, e));
        }
    }

    @Override
    public void onNext(T value) {
        // uma emissão adicional
        if (!isUnsubscribed()) {
            try {
                showInfos.accept(String.format("Subscriber [%s,%s] : onNext (%s)", observerName, processName,
                        new ObjectMapper().writeValueAsString(value)));
            } catch (JsonProcessingException e) {
                showInfos.accept(String.format("Subscriber [%s,%s].onNext (%s)", observerName, processName, e));
            }
        }
    }
}
  • Para além das características de um Subscriber, o observador Observateur incluirá as seguintes informações:
    • linha 14: um guard-barrier ou semáforo que servirá para bloquear o thread principal até que o observador tenha recebido todos os elementos emitidos pelo observável. Isto ocorrerá na linha 36 do código, quando o observador receber do observável a notificação de fim de emissão;
    • linha 16: uma instância Consumer<String> que servirá para exibir uma mensagem na consola;
    • linha 18: o nome do observador, para os distinguir uns dos outros quando houver vários;
    • linha 20: o nome do processo observado;
  • linhas 36, 46, 54: os métodos [onCompleted, onError, onNext] da interface [Observer<T>] implementada pela classe abstrata [Subscriber<T>]. Esta classe não os implementa. Por isso, é necessário fazê-lo nas suas classes filhas. Antes de realizar qualquer ação nestes métodos, verifica-se se o observador não foi desabonado do observável que está a observar;
  • linha 59: o método [onNext] do observador escreve a cadeia jSON do elemento recebido. Isto permitir-nos-á apresentar vários tipos de elementos;

Dito isto, vamos analisar um novo método da classe Observable, o método [range]:

 

O observável Observable.range(n,m) emite (m) inteiros que vão de n a n+m-1. Analisamo-lo com o código [Exemple08] seguinte:


package dvp.rxjava.observables.exemples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple08 {
    public static void main(String[] args) throws InterruptedException {

        // número de observadores
        final int nbObservateurs = 2;

        // semáforo
        CountDownLatch latch = new CountDownLatch(nbObservateurs);

        // configuração observável
        Observable<Integer> obs1 = Observable.range(15, 3).subscribeOn(Schedulers.computation());
        // execução observável (observação)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos,"obs1"));
        }
        // espera
        showInfos.accept("main : attente fin observation");
        latch.await();
        // fim
        showInfos.accept("main : fin observation");
    }

    // visualizações
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • linha 16: vamos utilizar dois observadores;
  • linha 19: o semáforo é inicializado com o valor 2, porque vamos colocar cada observador num thread diferente. O thread principal terá, portanto, de aguardar a conclusão dos dois threads de observação;
  • linha 22: configuramos o observável de forma a que seja executado num thread do agendador [Schedulers.computation()]. O observador estará no mesmo thread que o observável;
  • linhas 25-27: subscrevemos dois observadores ao observável. Isto irá desencadear a execução completa deste para cada um dos observadores: os números inteiros 15, 16 e 17 serão emitidos;
  • linha 30: o thread principal aguarda a conclusão dos observadores;

Os resultados obtidos são os seguintes:

main : début observation ------Thread[main] ---- Time[27:875]
main : attente fin observation ------Thread[main] ---- Time[27:893]
Subscriber[observateur[1],obs1] : onNext (15) ------Thread[RxComputationThreadPool-2] ---- Time[28:245]
Subscriber[observateur[0],obs1] : onNext (15) ------Thread[RxComputationThreadPool-1] ---- Time[28:245]
Subscriber[observateur[1],obs1] : onNext (16) ------Thread[RxComputationThreadPool-2] ---- Time[28:247]
Subscriber[observateur[0],obs1] : onNext (16) ------Thread[RxComputationThreadPool-1] ---- Time[28:248]
Subscriber[observateur[1],obs1] : onNext (17) ------Thread[RxComputationThreadPool-2] ---- Time[28:249]
Subscriber[observateur[1],obs1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[28:250]
Subscriber[observateur[0],obs1] : onNext (17) ------Thread[RxComputationThreadPool-1] ---- Time[28:251]
Subscriber[observateur[0],obs1].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[28:252]
main : fin observation ------Thread[main] ---- Time[28:252]
  • linha 2: o thread principal fica bloqueado, à espera que os dois observadores terminem;
  • linhas 3-4: verifica-se que o observador 0 está no thread [RxComputationThreadPool-1] e o observador 1 no thread [RxComputationThreadPool-2];
  • linhas 3-10: verifica-se que os dois observadores recebem exatamente os mesmos elementos;

Vamos utilizar a classe Observateur assim definida para ilustrar o comportamento de outros tipos de observáveis.

7.3.2. Exemplo-09: os métodos Observable.[interval, take, doNext]

  
 

Este exemplo ilustra a utilização do observável Observable.interval (intervalo longo, unidade TimeUnit), que emite inteiros longos a intervalos de tempo regulares. É importante referir o ponto [1]: por predefinição, o observável [Observable.interval] é executado numa das threads do agendador [Schedulers.computation].

O código será o seguinte:


package dvp.rxjava.observables.exemples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;

public class Exemple09 {
    public static void main(String[] args) throws InterruptedException {

        // número de observadores
        final int nbObservateurs = 2;

        // semáforo
        CountDownLatch latch = new CountDownLatch(nbObservateurs);

        // configuração observável
        Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
                .doOnNext(l -> showInfos.accept(l.toString()));
        // execução observável (observação)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
                    "obs1"));
        }
        // espera
        showInfos.accept("main : attente fin observation");
        latch.await();
        // fim
        showInfos.accept("main : fin observation");
    }

    // visualizações
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • linha 22: o observável emite inteiros longos a cada 500 milissegundos. A série começa com o número 0;
  • linha 22: este observável emite um número infinito de valores. O método [Observable.take(n)] cria um novo observável que conserva apenas os primeiros n elementos emitidos;
 

Voltemos ao código do observável:


Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));

Na linha 2, o método [Observable.doOnNext] é executado sempre que o observável emite um novo elemento. Isto é frequentemente utilizado para registar informações. Neste caso, pretendemos registar a data de emissão dos elementos para verificar se o intervalo de 500 milissegundos está a ser cumprido. O método [Observable.doOnNext] não altera o observável ao qual se aplica. A sua definição é a seguinte:

 

A execução produz os seguintes resultados:

main : début observation ------Thread[main] ---- Time[55:892]
main : attente fin observation ------Thread[main] ---- Time[55:911]
0 ------Thread[RxComputationThreadPool-1] ---- Time[56:412]
0 ------Thread[RxComputationThreadPool-2] ---- Time[56:413]
Subscriber[observateur [1],obs1] : onNext (0) ------Thread[RxComputationThreadPool-2] ---- Time[56:723]
Subscriber[observateur [0],obs1] : onNext (0) ------Thread[RxComputationThreadPool-1] ---- Time[56:723]
1 ------Thread[RxComputationThreadPool-1] ---- Time[56:906]
Subscriber[observateur [0],obs1] : onNext (1) ------Thread[RxComputationThreadPool-1] ---- Time[56:908]
1 ------Thread[RxComputationThreadPool-2] ---- Time[56:912]
Subscriber[observateur [1],obs1] : onNext (1) ------Thread[RxComputationThreadPool-2] ---- Time[56:914]
2 ------Thread[RxComputationThreadPool-1] ---- Time[57:405]
Subscriber[observateur [0],obs1] : onNext (2) ------Thread[RxComputationThreadPool-1] ---- Time[57:407]
Subscriber[observateur [0],obs1].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[57:408]
2 ------Thread[RxComputationThreadPool-2] ---- Time[57:412]
Subscriber[observateur [1],obs1] : onNext (2) ------Thread[RxComputationThreadPool-2] ---- Time[57:414]
Subscriber[observateur [1],obs1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[57:415]
main : fin observation ------Thread[main] ---- Time[57:416]
  • linhas 3, 7 e 11: verifica-se que, aproximadamente, o intervalo de emissão é próximo de 500 ms;
  • Os dois observadores estão, evidentemente, em dois threads diferentes, apesar de o observável não ter sido configurado para ser executado com um agendador específico. Trata-se do funcionamento por predefinição do observável [Observable.interval] que vemos aqui;

7.3.3. Exemplos-10/12: os métodos Observable.[error, empty, never]

 

A partir de agora, seremos mais concisos nas nossas ilustrações dos métodos da classe [Observable]. O código anterior era o seguinte:


package dvp.rxjava.observables;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import rx.Observable;

public class Exemple09 {
    public static void main(String[] args) throws InterruptedException {

        // número de observadores
        final int nbObservateurs = 2;

        // semáforo
        CountDownLatch latch = new CountDownLatch(nbObservateurs);

        // configuração observável
        Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
                .doOnNext(l -> showInfos.accept(l.toString()));
        // execução observável (observação)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
                    "obs1"));
        }
        // espera
        showInfos.accept("main : attente fin observation");
        latch.await();
        // fim
        showInfos.accept("main : fin observation");
    }

    // visualizações
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}

Este código já tinha sido utilizado no exemplo anterior. Apenas as linhas 21 e 22 mudavam. Vamos, portanto, factorizar a maior parte deste código na seguinte classe [ProcessUtils]:


package dvp.rxjava.observables.utils;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import rx.Observable;

public class ProcessUtils {

    @SafeVarargs
    public static void subscribe(int nbObservateurs, IProcess<?>... processes) throws InterruptedException {

        // semáforo
        CountDownLatch latch = new CountDownLatch(nbObservateurs * processes.length);

        // execução observável (observação)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            for (IProcess<?> process : processes) {
                Observable<?> obs = process.getObservable();
                obs.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos, process.getName()));
            }
        }
        // espera
        showInfos.accept("main : attente fin observation");
        latch.await();
        // fim
        showInfos.accept("main : fin observation");
    }

    // visualizações
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • linha 13: o método aceita dois parâmetros:
    • nbObservateurs: o número de observadores dos processos passados como segundo parâmetro;
    • processes: os processos (observáveis nomeados) a observar. Graças à notação [IProcess<?>], os processos poderão emitir elementos de tipos diferentes;
  • linha 16: o semáforo deve passar para verde quando todos os observadores tiverem concluído todas as suas observações. O valor inicial do semáforo é, portanto, o número de observadores multiplicado pelo número de observações;
  • linhas 20-25: cada observador é subscrito a todos os processos que deve observar;
  • linha 23: recupera-se o observável a partir do processo (ver parágrafo 7.3.1);
  • linha 23: subscreve-se um observador a essa observável. São transmitidas a este observador quatro informações:
    • o seu nome;
    • o semáforo que deve decrementar quando receber a notificação de fim de emissão da variável observável que está a observar;
    • o método a utilizar quando quiser registar informações na consola;
    • o nome do processo que vai observar;

Uma vez definidas estas classes, o exemplo 10 será o seguinte:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple10 {
    public static void main(String[] args) throws InterruptedException {
        // configuração observável
        Observable<?> obs = Observable.error(new RuntimeException("Erreur !!!")).subscribeOn(Schedulers.computation());
        // execução (observação) observável
        ProcessUtils.subscribe(2,new Process<>("process1", obs));
    }
}

Na linha 11, o método estático [Observable.error] é definido da seguinte forma:

 

A linha 8 configura, portanto, um observável que se limita a lançar uma exceção para o método [onError] dos seus subscritores. A execução produz os seguintes resultados:


main : début observation ------Thread[main] ---- Time[22:618]
main : attente fin observation ------Thread[main] ---- Time[22:636]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[22:638]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[22:638]

Nas linhas 3 e 4, o método [onError] dos dois subscritores recebeu a exceção lançada pelo observável.

Esta execução tem uma particularidade: os métodos [onCompleted] dos dois observadores não foram chamados. Consequentemente, a barreira não foi baixada e o thread principal permanece bloqueado no método estático [ProcessUtils.subscribe] na linha 3 seguinte:


// em espera
showInfos.accept("main : attente fin observation");
latch.await();
// fim
showInfos.accept("main : fin observation");

Verifica-se aqui que, em caso de erro do observável, o método [onCompleted] dos subscritores não é chamado. Modificamos, então, o método [Observateur.onError] da seguinte forma:


    @Override
    public void onError(Throwable e) {
        // erro de emissão
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber[%s, %s].onError (%s)", observerName, processName, e));
        }
        // fim do bloqueio do thread principal
        latch.countDown();
}

Adicionamos as linhas 7-8 para eliminar a barreira em caso de erro do observável. Com este novo código, a execução produz os seguintes resultados:


main : début observation ------Thread[main] ---- Time[40:750]
main : attente fin observation ------Thread[main] ---- Time[40:764]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[40:766]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[40:766]
main : fin observation ------Thread[main] ---- Time[40:767]

Obtenemos a linha 5, que não tínhamos obtido anteriormente.

O exemplo 11 será o seguinte:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple11 {
    public static void main(String[] args) throws InterruptedException {
        // configuração observável
        Observable<?> obs1 = Observable.empty();
        // execução (observação) observável
        ProcessUtils.subscribe(2,new Process<>("process1",obs1));
    }
}

Na linha 10, o método estático [Observable.empty] cria um observável que não emite nenhum elemento. Este emite apenas a notificação de fim de emissão;

 

A execução do código do exemplo acima produz os seguintes resultados:

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[37:073]
Subscriber[observateur[0],process1].onCompleted ------Thread[main] ---- Time[37:086]
Subscriber[observateur[1],process1].onCompleted ------Thread[main] ---- Time[37:086]
main : attente fin observation ------Thread[main] ---- Time[37:087]
main : fin observation ------Thread[main] ---- Time[37:087]
  • linhas 2 e 3: verifica-se que os dois observadores recebem a notificação de fim de emissão sem terem recebido quaisquer elementos anteriormente.

Podemos questionar-nos sobre qual será a utilidade deste método. Podemos utilizá-lo de forma análoga a uma coleção, inicialmente vazia, na qual se acumulam posteriormente elementos:

1
2
3
4
Observable obs=Observable.empty() ;
for(Observable o : observables){
    obs=obs.mergeWith(o) ;
}

Na linha 3, funde-se o observável inicial obs (linha 1) com outros observáveis.

O exemplo 12 ilustra o método estático [Observable.never]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple12 {
    public static void main(String[] args) throws InterruptedException {
        // configuração observável
        Observable<?> obs1 = Observable.never();
        // execução (observação) observável
        ProcessUtils.subscribe(2,new Process<>("process1",obs1));
    }
}

O método estático [Observable.never] cria um observável que nunca emite:

 

A execução do exemplo produz os seguintes resultados:

main : début observation ------Thread[main] ---- Time[27:018]
main : attente fin observation ------Thread[main] ---- Time[27:030]

Na linha 2, o thread principal fica à espera indefinidamente. Com efeito, nenhum observável emite a notificação [onCompleted] que permite que o semáforo (barreira) passe para verde (baixar a barreira).

7.4. Multi-threading

7.4.1. Exemplo 13: thread de ação, thread de observação

No parágrafo 7.1.3, criámos um observável com o método estático [Observable.create]:

 
  • o método [create] devolve um tipo Observable<T>;
  • o parâmetro do método [create] é uma função do tipo [Observable.OnSubscribe<T>] definida da seguinte forma:
 

O tipo [Observable.OnSubscribe<T>] é uma interface funcional que, por sua vez, estende a interface funcional [Action1<Subscriber<? super T>>]. O método [call] desta interface espera um tipo [Subscriber] (assinante, subscritor, observador). No restante deste documento, por vezes referir-nos-emos ao tipo [Observable.OnSubscribe<T>] como uma ação. Vamos criar ações personalizadas que terão um nome. Serão instâncias da seguinte interface [IProcessAction]:

  

package dvp.rxjava.observables.utils;

import rx.Observable;

public interface IProcessAction<T> extends Observable.OnSubscribe<T> {

    // a ação tem um nome
    public String getName();
}
  • linha 5: a interface [IProcessAction<T>] possui todas as características da interface [Observable.OnSubscribe<T>];
  • linha 8: além disso, possui um método [getName] que devolve o nome da instância que implementa a interface;

Vamos utilizar a seguinte ação denominada [ProcessAction01]:


package dvp.rxjava.observables.utils;

import java.util.Random;

import rx.Subscriber;
import rx.functions.Func1;

public class ProcessAction01<T> implements IProcessAction<T> {

    // dados
    private String name;
    private int nbValues;
    private Func1<Integer, T> func1;

    // construtores
    public ProcessAction01(String name, int nbValues, Func1<Integer, T> func1) {
        this.name = name;
        this.nbValues = nbValues;
        this.func1 = func1;
    }

    @Override
    public void call(Subscriber<? super T> subscriber) {
        ProcessUtils.showInfos.accept(String.format("Observable (%s) call start", getName()));
        for (int i = 0; i < nbValues; i++) {
            // espera
            try {
                Thread.sleep(new Random().nextInt(500));
            } catch (InterruptedException e) {
                // erro
                ProcessUtils.showInfos.accept(String.format("Observable (%s) onError", getName()));
                subscriber.onError(e);
            }
            // emissão de um elemento
            T value = func1.call(i);
            ProcessUtils.showInfos.accept(String.format("Observable (%s,%s) onNext (%s)", getName(), i, value));
            subscriber.onNext(value);
        }
        // concluído
        ProcessUtils.showInfos.accept(String.format("Observable (%s) onCompleted", getName()));
        subscriber.onCompleted();
    }

    @Override
    public String getName() {
        return name;
    }

}
  • linha 8: a classe [ProcessAction01<T>] implementa a interface [IProcessAction<T>] e, por conseguinte, a interface [Observable.OnSubscribe<T>];
  • linha 11: o nome da ação;
  • linha 12: o número de valores a emitir;
  • linha 13: uma instância do tipo [Func1<Integer, T>] que, a partir de um inteiro, cria um tipo T que será emitido pelo observável (linhas 35 e 37);
  • linhas 16-20: passam-se ao construtor o nome da ação, o número de valores a emitir e a função de emissão;
  • linhas 23-42: o código do processo;
  • linha 23: o método [call] recebe como parâmetro o subscritor do observável associado ao processo;
  • linha 28: o processo emite os seus elementos após uma espera de duração aleatória;
  • linha 32: a emissão de um erro;
  • linha 37: uma emissão normal;
  • linha 41: emissão da notificação de fim de emissão;
  • linhas 25-38: a ação emite valores reais nbValues após um tempo de espera aleatório (linha 30);
  • linha 35: o valor a emitir é fornecido pela função [func1] passada como parâmetro ao construtor (linha 16);

Reestruturamos a classe [Process] (ver parágrafo 7.3.1) para que possa ser instanciada também com uma ação nomeada. Adicionamos-lhe o seguinte construtor:


public Process(IProcessAction<T> na, Scheduler schedulerObserved, Scheduler schedulerObserver) {
        // nome do processo=nome da ação
        name = na.getName();
        // ação --> observável
        observable = Observable.create(na);
        // thread de execução do processo observado
        if (schedulerObserved != null) {
            observable = observable.subscribeOn(schedulerObserved);
        }
        // thread de observação do observador
        if (schedulerObserver != null) {
            observable = observable.observeOn(schedulerObserver);
        }
    }
  • na linha 1, o construtor aceita 3 parâmetros:
    1. a ação nomeada que servirá para construir o observável (linha 5);
    2. o agendador do processo observado (pode ser null);
    3. o agendador do observador (pode ser null);
  • linha 5: o observável é criado a partir da ação passada como parâmetro;

O código seguinte, [Exemple13], observa diferentes observáveis:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple13 {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 1, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
        // processo 3
        Process<Integer> process3 = new Process<>(new ProcessAction01<Integer>("process3", 3, i -> i * 2), null,
                Schedulers.computation());
        // processo 4
        Process<Boolean> process4 = new Process<>(new ProcessAction01<Boolean>("process4", 4, i -> i % 2 == 0), null, null);
        // subscrições
        ProcessUtils.subscribe(1, process1);
        ProcessUtils.subscribe(1, process2);
        ProcessUtils.subscribe(1, process3);
        ProcessUtils.subscribe(1, process4);
    }
}
  • linhas 13-15: o processo process1 produz 1 número real num thread de cálculo, que será observado noutro thread de cálculo;
  • linhas 17-18: o processo process2 produz 2 cadeias de caracteres num thread de cálculo e não é fornecida qualquer indicação sobre o thread do observador. Os resultados mostram que a observação é feita, por predefinição, no mesmo thread em que o processo é executado;
  • linhas 20-21: o processo process3 produz 3 números inteiros num thread não especificado, que serão observados num thread de cálculo. Os resultados mostram que a execução do processo ocorre, por predefinição, no thread principal;
  • linha 23: o processo process4 produz 4 valores booleanos num thread não imposto, que serão observados num thread não imposto. Os resultados mostram que a execução do processo e a sua observação ocorrem, por predefinição, no thread principal;

O resultado da execução deste código é o seguinte:

main : début observation ------Thread[main] ---- Time[18:642]
main : attente fin observation ------Thread[main] ---- Time[18:660]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[18:660]
Observable (process1,0) onNext (68.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[19:093]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[19:094]
Subscriber[observateur[0],process1] : onNext (68.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[19:396]
Subscriber[observateur[0],process1].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[19:397]
main : fin observation ------Thread[main] ---- Time[19:397]
main : début observation ------Thread[main] ---- Time[19:398]
main : attente fin observation ------Thread[main] ---- Time[19:399]
Observable (process2) call start ------Thread[RxComputationThreadPool-5] ---- Time[19:399]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-5] ---- Time[19:630]
Subscriber[observateur[0],process2] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-5] ---- Time[19:631]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-5] ---- Time[20:094]
Subscriber[observateur[0],process2] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-5] ---- Time[20:095]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[20:096]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[20:096]
main : fin observation ------Thread[main] ---- Time[20:097]
main : début observation ------Thread[main] ---- Time[20:097]
Observable (process3) call start ------Thread[main] ---- Time[20:098]
Observable (process3,0) onNext (0) ------Thread[main] ---- Time[20:188]
Subscriber[observateur[0],process3] : onNext (0) ------Thread[RxComputationThreadPool-6] ---- Time[20:213]
Observable (process3,1) onNext (2) ------Thread[main] ---- Time[20:336]
Subscriber[observateur[0],process3] : onNext (2) ------Thread[RxComputationThreadPool-6] ---- Time[20:338]
Observable (process3,2) onNext (4) ------Thread[main] ---- Time[20:676]
Observable (process3) onCompleted ------Thread[main] ---- Time[20:677]
main : attente fin observation ------Thread[main] ---- Time[20:677]
Subscriber[observateur[0],process3] : onNext (4) ------Thread[RxComputationThreadPool-6] ---- Time[20:678]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[20:679]
main : fin observation ------Thread[main] ---- Time[20:679]
main : début observation ------Thread[main] ---- Time[20:680]
Observable (process4) call start ------Thread[main] ---- Time[20:680]
Observable (process4,0) onNext (true) ------Thread[main] ---- Time[21:065]
Subscriber[observateur[0],process4] : onNext (true) ------Thread[main] ---- Time[21:067]
Observable (process4,1) onNext (false) ------Thread[main] ---- Time[21:187]
Subscriber[observateur[0],process4] : onNext (false) ------Thread[main] ---- Time[21:188]
Observable (process4,2) onNext (true) ------Thread[main] ---- Time[21:624]
Subscriber[observateur[0],process4] : onNext (true) ------Thread[main] ---- Time[21:625]
Observable (process4,3) onNext (false) ------Thread[main] ---- Time[21:765]
Subscriber[observateur[0],process4] : onNext (false) ------Thread[main] ---- Time[21:766]
Observable (process4) onCompleted ------Thread[main] ---- Time[21:767]
Subscriber[observateur[0],process4].onCompleted ------Thread[main] ---- Time[21:767]
main : attente fin observation ------Thread[main] ---- Time[21:767]
main : fin observation ------Thread[main] ---- Time[21:768]
  • o processo process1 produz 1 número real (linha 4) na thread de cálculo [RxComputationThreadPool-4], que é observado na thread de cálculo [RxComputationThreadPool-3] (linha 6);
  • o processo process2 produz 2 cadeias de caracteres (linhas 12, 14) no thread de cálculo [RxComputationThreadPool-5], que são observadas nesse mesmo thread (linhas 13, 15);
  • o processo process3 produz 3 números inteiros (linhas 21, 23, 25) no thread principal, que são observados no thread de cálculo [RxComputationThreadPool-6] (linhas 22, 24, 28);
  • o processo process4 gera 4 valores booleanos (linhas 34, 36, 38, 40) no thread principal, que são observados nesse mesmo thread principal (linhas 33, 35, 37, 39);

Convida-se o leitor a acompanhar o que se segue:

  • o ciclo de vida do processo observado e da sua thread;
  • o ciclo de vida do seu observador e da sua thread;

Grande parte do interesse das bibliotecas Rx reside neste multithreading que o programador não tem de gerir por si próprio.

7.5. Combinações de vários observáveis

7.5.1. Exemplo 14: fundir dois observáveis com [Observable.merge]

Apresentamos agora métodos estáticos da classe [Observable] que permitem combinar vários observáveis num observável resultante.

O primeiro exemplo deste tipo será o seguinte:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.ProcessAction01;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple14 {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
        // fusão
        Process<?> process12 = new Process<>("process12",
                Observable.merge(process1.getObservable(), process2.getObservable()));
        // subscrições
        ProcessUtils.subscribe(1, process12);
    }
}
  • linhas 15-17: um processo denominado [process1] irá emitir 3 números reais num thread de cálculo. Será também observado num thread de cálculo;
  • linhas 19-20: um processo denominado [process2] irá emitir 2 cadeias de caracteres num thread de cálculo. O thread de observação não é imposto. Vimos anteriormente que, neste caso, o thread de observação é o thread de cálculo;
  • linha 23: os dois processos são fundidos, ou seja, cria-se um observável cujos elementos provêm simultaneamente dos dois processos. Para tal, utiliza-se o método estático [Observable.merge]:
 

Ao contrário do que o esquema acima possa sugerir, durante a fusão, os elementos de um fluxo 1 podem intercalar-se entre os elementos de um fluxo 2. É isso que mostram os resultados da execução:

main : début observation ------Thread[main] ---- Time[56:053]
main : attente fin observation ------Thread[main] ---- Time[56:073]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[56:073]
Observable (process2) call start ------Thread[RxComputationThreadPool-5] ---- Time[56:074]
Observable (process1,0) onNext (64.8) ------Thread[RxComputationThreadPool-4] ---- Time[56:263]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-5] ---- Time[56:403]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-5] ---- Time[56:515]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[56:516]
Subscriber[observateur[0],process12] : onNext (64.8) ------Thread[RxComputationThreadPool-3] ---- Time[56:552]
Subscriber[observateur[0],process12] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-3] ---- Time[56:553]
Subscriber[observateur[0],process12] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-3] ---- Time[56:553]
Observable (process1,1) onNext (56.4) ------Thread[RxComputationThreadPool-4] ---- Time[56:716]
Subscriber[observateur[0],process12] : onNext (56.4) ------Thread[RxComputationThreadPool-3] ---- Time[56:718]
Observable (process1,2) onNext (22.8) ------Thread[RxComputationThreadPool-4] ---- Time[57:082]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[57:083]
Subscriber[observateur[0],process12] : onNext (22.8) ------Thread[RxComputationThreadPool-3] ---- Time[57:084]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[57:085]
main : fin observation ------Thread[main] ---- Time[57:085]
  • linha 3: o processo [process1] é executado na thread de cálculo [RxComputationThreadPool-4];
  • linha 4: o processo [process2] é executado na thread de cálculo [RxComputationThreadPool-5];
  • linha 9: o processo [process12] é observado na thread de cálculo [RxComputationThreadPool-3]. Não sei qual foi a regra que levou a esta escolha;
  • linhas 9-11: verifica-se que o observador observa elementos dos dois processos [process1] (linha 5) e [process2] (linhas 6, 7), embora nenhum dos dois esteja concluído (há mistura);
  • o processo [process12] termina (linha 17) quando os dois processos process1 e process2 terminam;

7.5.2. Exemplo 15: concatenar dois observáveis com [Observable.concat]

Analisamos agora o código seguinte:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.ProcessAction01;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple15 {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, Schedulers.computation());
        // concat
        Process<?> process12 = new Process<>("process12",
                Observable.concat(process1.getObservable(), process2.getObservable()));
        // subscrições
        ProcessUtils.subscribe(1, process12);
    }
}
  • linhas 15-17: um processo denominado [process1] irá emitir 3 números reais num thread de cálculo. Será também observado num thread de cálculo;
  • linhas 19-20: um processo denominado [process2] irá emitir 2 cadeias de caracteres numa thread não especificada, neste caso a thread principal por predefinição. Será observado numa thread de cálculo;
  • linha 23: os dois processos são concatenados, ou seja, cria-se um observável cujos elementos provêm dos dois processos. Não há mistura dos valores emitidos. O processo [process12] irá primeiro emitir todos os valores do processo [process1] e, em seguida, os do processo [process2]. Para tal, utiliza-se o método estático [Observable.concat]:
 

Os resultados da execução são os seguintes:

main : début observation ------Thread[main] ---- Time[30:162]
main : attente fin observation ------Thread[main] ---- Time[30:189]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[30:190]
Observable (process1,0) onNext (79.2) ------Thread[RxComputationThreadPool-4] ---- Time[30:681]
Observable (process1,1) onNext (98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[30:792]
Subscriber[observateur[0],process12] : onNext (79.2) ------Thread[RxComputationThreadPool-3] ---- Time[30:975]
Subscriber[observateur[0],process12] : onNext (98.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[30:976]
Observable (process1,2) onNext (84.0) ------Thread[RxComputationThreadPool-4] ---- Time[31:084]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[31:085]
Subscriber[observateur[0],process12] : onNext (84.0) ------Thread[RxComputationThreadPool-3] ---- Time[31:086]
Observable (process2) call start ------Thread[RxComputationThreadPool-3] ---- Time[31:087]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-3] ---- Time[31:556]
Subscriber[observateur[0],process12] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-5] ---- Time[31:557]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-3] ---- Time[31:608]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[31:609]
Subscriber[observateur[0],process12] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-5] ---- Time[31:609]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[31:610]
main : fin observation ------Thread[main] ---- Time[31:611]
  • linhas 3-10: o processo [process1] está a ser executado e o processo [process12] emite os valores gerados pelo [process1];
  • linha 9: o processo [process1] foi concluído;
  • linhas 11-17: o processo [process2] está a ser executado e o processo [process12] transmite os valores emitidos pelo [process2];

Há uma anomalia no processo process2: não tinha sido definido nenhum thread de execução. Seria de esperar que, por predefinição, este fosse o thread principal. No entanto, não é esse o caso. O thread de execução foi o thread de cálculo [RxComputationThreadPool-3] (linha 11). Assim, quando não se define um thread de execução ou de observação, não é possível fazer suposições sobre o thread que será escolhido.

7.5.3. Exemplo 16: combinar duas variáveis observáveis com [Observable.zip]

Analisamos agora o código seguinte:


package dvp.rxjava.observables.exemples;

import java.util.Arrays;
import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;

public class Exemple16 {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, null);
        // função de combinação dos 2 processos
        FuncN<String> funcn = new FuncN<String>() {
            @Override
            public String call(Object... args) {
                if (args.length == 2) {
                    return String.format("double=%s, string=%s", args[0], args[1]);
                } else {
                    throw new RuntimeException("la fonction attend 2 paramètres exactement");
                }
            }
        };
        // compactação dos 2 processos
        Process<String> process12 = new Process<>("process12",
                Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
        // subscrições
        ProcessUtils.subscribe(1, process12);
    }
}
  • linhas 16-18: um processo denominado [process1] irá emitir 3 números reais num thread de cálculo. Será também observado num thread de cálculo;
  • linhas 20-21: um processo denominado [process2] irá emitir 2 cadeias de caracteres num thread não imposto. O thread de observação também não é imposto;
  • linhas 23-32: instanciação de um tipo [FuncN<String>] com uma classe anónima. FuncN é uma interface funcional:
 

O método [FuncN.call] espera um array de objetos e devolve um tipo R. A função [funcn] será utilizada para combinar os processos process1 e process2 nesta ordem. No método [FuncN.call]:

  • args[0] será um Double;
  • args[1] será um String;

Aqui, o resultado de [funcn.call] será a cadeia de caracteres da linha 27. A construção deste resultado não requer o conhecimento dos tipos dos argumentos do método call.

Os dois processos são combinados da seguinte forma:


// compactação dos 2 processos
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));

O método [Observable.zip] funciona da seguinte forma:

 

Verifica-se que:

  • o primeiro argumento do zip é um Iterable<Observable>. No nosso exemplo, temos um parâmetro efetivo do tipo List<Observable>, formado pelos nossos dois observáveis;
  • o segundo argumento do zip é do tipo FuncN. No nosso exemplo, o parâmetro efetivo é [funcn];

A execução produz os seguintes resultados:

main : début observation ------Thread[main] ---- Time[55:636]
Observable (process2) call start ------Thread[main] ---- Time[55:666]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[55:666]
Observable (process1,0) onNext (69.6) ------Thread[RxComputationThreadPool-4] ---- Time[55:902]
Observable (process2,0) onNext (valeur-0) ------Thread[main] ---- Time[56:076]
Observable (process1,1) onNext (82.8) ------Thread[RxComputationThreadPool-4] ---- Time[56:271]
Subscriber[observateur[0],process12] : onNext ("double=69.6, string=valeur-0") ------Thread[main] ---- Time[56:352]
Observable (process1,2) onNext (14.399999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[56:641]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[56:642]
Observable (process2,1) onNext (valeur-1) ------Thread[main] ---- Time[56:778]
Subscriber[observateur[0],process12] : onNext ("double=82.8, string=valeur-1") ------Thread[main] ---- Time[56:779]
Observable (process2) onCompleted ------Thread[main] ---- Time[56:779]
Subscriber[observateur[0],process12].onCompleted ------Thread[main] ---- Time[56:780]
main : attente fin observation ------Thread[main] ---- Time[56:781]
main : fin observation ------Thread[main] ---- Time[56:781]
  • linhas 7, 11: o processo process12 emite dois elementos;
  • linha 8: o elemento adicional emitido pelo processo process1, que não tem um parceiro no processo process2, não é emitido pelo processo de resultado process12;

Verifica-se que o processo process2, ao qual não tinha sido atribuído nem um thread de execução nem um thread de observação, utilizou o thread principal para ambos.

7.5.4. Exemplo 17: combinar dois observáveis com [Observable.combineLatest]

Analisamos agora o código seguinte:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple17 {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<Double> process2 = new Process<>(
                new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null,
                Schedulers.computation());
        // combinação dos 2 processos
        Process<Double> process12 = new Process<>("process12",
                Observable.combineLatest(process1.getObservable(), process2.getObservable(), (d1, d2) -> d1 + d2));
        // subscrições
        ProcessUtils.subscribe(1, process12);
    }
}
  • linhas 14-16: um processo denominado [process1] irá emitir 3 números reais numa thread de cálculo. Será também observado numa thread de cálculo;
  • linhas 18-20: um processo denominado [process2] irá emitir 2 números reais numa thread não imposta. Estes serão observados numa thread de cálculo;
  • linha 23: os dois observáveis são combinados com o seguinte método estático [Observable.combineLatest]:
 

O observável [combineLatest] funciona da seguinte forma: quando um dos dois observáveis emite um elemento E1, esse elemento é combinado por [combineFunction] com o último elemento emitido pelo outro observável.

A execução deste código produz o seguinte resultado:

main : début observation ------Thread[main] ---- Time[01:768]
Observable (process2) call start ------Thread[main] ---- Time[01:791]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[01:791]
Observable (process1,0) onNext (54.0) ------Thread[RxComputationThreadPool-4] ---- Time[01:991]
Observable (process2,0) onNext (56.0) ------Thread[main] ---- Time[02:245]
Observable (process1,1) onNext (51.6) ------Thread[RxComputationThreadPool-4] ---- Time[02:358]
Subscriber[observateur[0],process12] : onNext (110.0) ------Thread[RxComputationThreadPool-5] ---- Time[02:521]
Subscriber[observateur[0],process12] : onNext (107.6) ------Thread[RxComputationThreadPool-5] ---- Time[02:522]
Observable (process2,1) onNext (261.8) ------Thread[main] ---- Time[02:595]
Observable (process2) onCompleted ------Thread[main] ---- Time[02:596]
main : attente fin observation ------Thread[main] ---- Time[02:596]
Subscriber[observateur[0],process12] : onNext (313.40000000000003) ------Thread[RxComputationThreadPool-5] ---- Time[02:597]
Observable (process1,2) onNext (80.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[02:790]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[02:791]
Subscriber[observateur[0],process12] : onNext (342.2) ------Thread[RxComputationThreadPool-3] ---- Time[02:792]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[02:792]
main : fin observation ------Thread[main] ---- Time[02:793]
  • linha 5: a emissão de process2 (56) é combinada com o último elemento emitido por process1 (54, linha 4) e produz o resultado da linha 7;
  • linha 6: a emissão de process1 (51,6) é combinada com o último elemento emitido por process2 (56, linha 5) e produz o resultado da linha 8;
  • linha 9: a emissão de process2 (261,8) é combinada com o último elemento emitido por process1 (51,6, linha 6) e produz o resultado da linha 12;
  • linha 13: a emissão de process1 (80,39) é combinada com o último elemento emitido por process2 (261,8, linha 9) e produz o resultado da linha 15;

Estamos aqui perante uma variante do observável [zip], em que, desta vez, os elementos combinados não são necessariamente os elementos que ocupam a mesma posição nos fluxos. Repara-se aqui que o processo process2, ao qual não tinha sido atribuída nenhuma thread de execução, foi aqui executado na thread principal (linha 2).

7.5.5. Exemplo 18: combinar dois observáveis com [Observable.amb]

Analisamos agora o código seguinte:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple18 {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<Double> process2 = new Process<>(
                new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null, null);
        // combinação dos 2 processos
        Process<Double> process12 = new Process<>("process12",
                Observable.amb(process1.getObservable(), process2.getObservable()));
        // subscrições
        ProcessUtils.subscribe(1, process12);
    }
}
  • linhas 14-16: um processo denominado [process1] irá emitir 3 números reais numa thread de cálculo. Será também observado numa thread de cálculo;
  • linhas 18-20: um processo denominado [process2] irá emitir 2 números reais num thread não restrito. Estes serão observados num thread não restrito;
  • linha 22: os dois observáveis são combinados com o seguinte método estático [Observable.amb]:
 

Como mostra o esquema acima, o observável [Observable.amb(Observable o1, Observable o2)] emite os elementos do observável que os emite em primeiro lugar. O que é confirmado pelos resultados do exemplo apresentado:

main : début observation ------Thread[main] ---- Time[21:594]
Observable (process2) call start ------Thread[main] ---- Time[21:612]
Observable (process1) call start ------Thread[RxComputationThreadPool-3] ---- Time[21:612]
Observable (process2,0) onNext (155.39999999999998) ------Thread[main] ---- Time[21:817]
Observable (process1) onError ------Thread[RxComputationThreadPool-3] ---- Time[21:820]
Observable (process1,0) onNext (90.0) ------Thread[RxComputationThreadPool-3] ---- Time[21:820]
Observable (process1,1) onNext (104.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[21:877]
Subscriber[observateur[0],process12] : onNext (155.39999999999998) ------Thread[main] ---- Time[22:105]
Observable (process1,2) onNext (44.4) ------Thread[RxComputationThreadPool-3] ---- Time[22:122]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[22:123]
Observable (process2,1) onNext (201.6) ------Thread[main] ---- Time[22:581]
Subscriber[observateur[0],process12] : onNext (201.6) ------Thread[main] ---- Time[22:583]
Observable (process2) onCompleted ------Thread[main] ---- Time[22:583]
Subscriber[observateur[0],process12].onCompleted ------Thread[main] ---- Time[22:584]
main : attente fin observation ------Thread[main] ---- Time[22:585]
main : fin observation ------Thread[main] ---- Time[22:586]
  • na linha 4, é o processo process2 que emite primeiro;
  • linhas 8 e 12: o processo process12 emite todos os elementos emitidos pelo processo process2 (linhas 4 e 11);

7.6. Cadeia de processamento de um observável

7.6.1. Exemplo 19: transformar um observável com [Observable.map]

Nos exemplos anteriores, analisámos várias combinações de dois observáveis num terceiro observável. Apresentamos agora métodos estáticos da classe [Observable] que permitem operações de transformação, filtragem e agregação num observável. Encontraremos aqui métodos análogos aos da classe [Stream] estudados no parágrafo 5.

O nosso primeiro exemplo será o seguinte:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple19 {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<String> process2 = new Process<>("process2",
                process1.getObservable().map(d -> String.format("valeur-%s", d)));
        // subscrições
        ProcessUtils.subscribe(1, process2);
    }
}
  • linhas 14-16: um processo denominado process1 irá emitir 3 números reais num thread de cálculo. Será também observado num thread de cálculo;
  • linhas 17-18: os números emitidos por process1 serão transformados em cadeias de caracteres num processo process2;
  • linha 20: observa-se o process2;

O método [Observable.map] da linha 18 é análogo ao método [Stream.map] analisado no parágrafo 5.5:

 

Os resultados do exemplo são os seguintes:

main : début observation ------Thread[main] ---- Time[55:328]
main : attente fin observation ------Thread[main] ---- Time[55:346]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[55:347]
Observable (process1,0) onNext (21.599999999999998) ------Thread[RxComputationThreadPool-4] ---- Time[55:354]
Observable (process1,1) onNext (97.2) ------Thread[RxComputationThreadPool-4] ---- Time[55:512]
Subscriber[observateur[0],process2] : onNext ("valeur-21.599999999999998") ------Thread[RxComputationThreadPool-3] ---- Time[55:615]
Subscriber[observateur[0],process2] : onNext ("valeur-97.2") ------Thread[RxComputationThreadPool-3] ---- Time[55:616]
Observable (process1,2) onNext (98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[55:803]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[55:804]
Subscriber[observateur[0],process2] : onNext ("valeur-98.39999999999999") ------Thread[RxComputationThreadPool-3] ---- Time[55:804]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[55:805]
main : fin observation ------Thread[main] ---- Time[55:805]
  • linhas 4, 5 e 8: os resultados de process1. Trata-se de números reais;
  • linhas 6, 7 e 10: as emissões de process2 observadas. São cadeias de caracteres;

7.6.2. Exemplo-20: filtrar uma variável observável com [Observable.filter]

O exemplo será o seguinte:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple20 {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().filter(i -> i % 2 == 0));
        // subscrições
        ProcessUtils.subscribe(1, process2);
    }
}
  • linhas 11-12: um processo denominado process1 irá emitir os números inteiros de 0 a 2 num thread de cálculo. Será também observado num thread de cálculo;
  • linha 14: os números emitidos por process1 serão filtrados para que apenas os números pares fiquem em process2;
  • linha 20: observa-se o process2;

O método [Observable.filter] da linha 18 é análogo ao método [Stream.filter] analisado no parágrafo 5.4:

 

Os resultados do exemplo são os seguintes:

main : début observation ------Thread[main] ---- Time[30:319]
main : attente fin observation ------Thread[main] ---- Time[30:335]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[30:336]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[30:388]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[30:625]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[30:703]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[30:704]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[30:705]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[30:706]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[30:707]
main : fin observation ------Thread[main] ---- Time[30:707]
  • linhas 4, 5 e 7: as transmissões de process1;
  • linhas 6 e 9: as emissões de process2 observadas. São os elementos de process1 que são pares;

7.6.3. Exemplo 21: transformar um observável com [Observable.flatMap]

O exemplo será o seguinte:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple21 {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
            int value = i * 10;
            return Observable.just(value, value + 1, value + 2);
        }));
        // subscrições
        ProcessUtils.subscribe(1, process2);
    }
}
  • linhas 12-13: um processo denominado process1 irá emitir os números inteiros de 0 a 2 num thread de cálculo. Será também observado num thread de cálculo;
  • linhas 15-18: cada número n emitido por process1 é transformado num observável que emite os 3 números (10*n, 10*n+1, 10*n+2). Se, na linha 15, se utilizasse o método [map], o process2 emitiria um tipo Observable<Integer> e não um tipo Integer. O método [flatMap] utilizado permite simplificar (flatten) esta sequência de elementos do tipo Observable<Integer> numa sequência de elementos do tipo Integer constituída por cada um dos elementos de cada um dos Observable<Integer>;
  • linha 20: observa-se process2;

O método [Observable.flatMap] da linha 15 é análogo ao método [Stream.flatMap] analisado no parágrafo 5.6.12:

 

Os resultados do exemplo são os seguintes:

main : début observation ------Thread[main] ---- Time[31:466]
main : attente fin observation ------Thread[main] ---- Time[31:486]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[31:486]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[31:777]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[32:082]
Subscriber[observateur[0],process2] : onNext (1) ------Thread[RxComputationThreadPool-3] ---- Time[32:085]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[32:087]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[32:192]
Subscriber[observateur[0],process2] : onNext (10) ------Thread[RxComputationThreadPool-3] ---- Time[32:194]
Subscriber[observateur[0],process2] : onNext (11) ------Thread[RxComputationThreadPool-3] ---- Time[32:196]
Subscriber[observateur[0],process2] : onNext (12) ------Thread[RxComputationThreadPool-3] ---- Time[32:197]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[32:686]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[32:687]
Subscriber[observateur[0],process2] : onNext (20) ------Thread[RxComputationThreadPool-3] ---- Time[32:688]
Subscriber[observateur[0],process2] : onNext (21) ------Thread[RxComputationThreadPool-3] ---- Time[32:690]
Subscriber[observateur[0],process2] : onNext (22) ------Thread[RxComputationThreadPool-3] ---- Time[32:692]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[32:693]
main : fin observation ------Thread[main] ---- Time[32:693]
  • linhas 5-7: as três transmissões de process2 na sequência da transmissão da linha 4 de process1;
  • linhas 9-11: as três emissões de process2 na sequência da emissão da linha 8 de process1;
  • linhas 14-16: as três emissões de process2 na sequência da emissão da linha 12 de process1;

O código seguinte mostra como criar um tipo Observable<Integer[]> a partir de process1 e [Exemple21b]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple21b {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<Integer[]> process2 = new Process<>("process2", process1.getObservable().map(i -> {
            int value = i * 10;
            return new Integer[] { value, value + 1, value + 2 };
        }));
        // subscrições
        ProcessUtils.subscribe(1, process2);
    }
}
  • linha 14: utiliza-se o método [Observable.map];
  • linha 16: que devolve um tipo Integer[];

Os resultados são os seguintes:

main : début observation ------Thread[main] ---- Time[58:089]
main : attente fin observation ------Thread[main] ---- Time[58:107]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[58:108]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[58:503]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[58:762]
Subscriber[observateur[0],process2] : onNext ([0,1,2]) ------Thread[RxComputationThreadPool-3] ---- Time[58:792]
Subscriber[observateur[0],process2] : onNext ([10,11,12]) ------Thread[RxComputationThreadPool-3] ---- Time[58:795]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[58:851]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[58:852]
Subscriber[observateur[0],process2] : onNext ([20,21,22]) ------Thread[RxComputationThreadPool-3] ---- Time[58:853]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[58:854]
main : fin observation ------Thread[main] ---- Time[58:854]
  • linhas 6, 7, 10: vemos os resultados do map;

Todas estas transformações de observáveis podem ser encadeadas, uma vez que cada transformação produz um novo observável. É o que ilustra o exemplo seguinte [Exemple21c]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;

public class Exemple21c {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
            int value = i * 10;
            return Observable.just(value, value + 1, value + 2);
        }).filter(i -> i % 2 == 0));
        // subscrições
        ProcessUtils.subscribe(1, process2);
    }
}
  • linhas 15-18: o flatMap é seguido por um filter;

Os resultados da execução são os seguintes:

main : début observation ------Thread[main] ---- Time[37:993]
main : attente fin observation ------Thread[main] ---- Time[38:016]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[38:017]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[38:124]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[38:366]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[38:380]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[38:381]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[38:436]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[38:439]
Subscriber[observateur[0],process2] : onNext (10) ------Thread[RxComputationThreadPool-3] ---- Time[38:441]
Subscriber[observateur[0],process2] : onNext (12) ------Thread[RxComputationThreadPool-3] ---- Time[38:443]
Subscriber[observateur[0],process2] : onNext (20) ------Thread[RxComputationThreadPool-3] ---- Time[38:445]
Subscriber[observateur[0],process2] : onNext (22) ------Thread[RxComputationThreadPool-3] ---- Time[38:446]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[38:447]
main : fin observation ------Thread[main] ---- Time[38:447]
  • linhas 8-13: o process2 emitiu apenas os elementos pares provenientes do flatMap;

Um método semelhante ao [flatMap] é o método [flatMapIterable], ilustrado pelo exemplo seguinte [Exemple21d]:


package dvp.rxjava.observables.exemples;

import java.util.Arrays;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple21d {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMapIterable(i -> {
            int value = i * 10;
            return Arrays.asList(value, value + 1, value + 2);
        }).filter(i -> i % 2 == 0));
        // subscrições
        ProcessUtils.subscribe(1, process2);
    }
}

Na linha 16, em vez de utilizar o método [flatMap], utiliza-se o método [flatMapIterable]. Neste caso, a função de transformação deve produzir um tipo Iterable<T> (linha 18) em vez de um tipo Observable<T>.

Obtêm-se os mesmos resultados que anteriormente.

Voltemos à definição do método [flatMap]:

 

Como se pode ver acima, um elemento azul [3] foi inserido entre os dois elementos verdes [1-2]. Isto significa que, na sua operação de nivelamento dos Observable<T>, o método [flatMap] respeita a ordem de emissão destes diferentes observáveis internos. Isto é demonstrado pelo exemplo seguinte [Exemple21e]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple21e {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // processo 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().flatMap(i -> process2.getObservable()));
        // subscrições
        ProcessUtils.subscribe(1, process3);
    }
}
  • linhas 11-12: o processo process1 gera os números inteiros [0,1];
  • linhas 14-15: o processo process2 emite os números inteiros [10,11,12];
  • linhas 17-18: a cada elemento emitido pelo process1 é associada a observável do processo process2. Isto significa que:
    • ao elemento [0] do process1 será associada uma observável que emite os [10,11,12];
    • o mesmo se aplica ao elemento 1;

No final, serão emitidos os 6 números [10, 11, 12, 10, 11, 12]. Queremos ver em que ordem.

Os resultados da execução são os seguintes:

main : début observation ------Thread[main] ---- Time[22:540]
main : attente fin observation ------Thread[main] ---- Time[22:566]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[22:566]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[22:949]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[22:951]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[23:159]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[23:160]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[23:160]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[23:286]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[23:513]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[23:597]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[23:599]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[23:645]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-5] ---- Time[23:647]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[23:789]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[23:790]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-5] ---- Time[23:791]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[23:976]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[23:978]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[24:184]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[24:184]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[24:186]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[24:187]
main : fin observation ------Thread[main] ---- Time[24:187]

Vemos que a ordem de emissão do processo process3 foi: [10, 10, 11, 12, 11, 12] (linhas 11, 12, 14, 17, 19, 22). Houve, portanto, uma mistura dos elementos emitidos pelo processo process2. É possível evitar isso utilizando o método [concatMap] em vez do método [flatMap]. É isso que mostra o código seguinte [Exemple21ef]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple21ef {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // processo 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().concatMap(i -> process2.getObservable()));
        // subscrições
        ProcessUtils.subscribe(1, process3);
    }
}

Na linha 18, substituímos [flatMap] por [concatMap]. Os resultados da execução são os seguintes:

main : début observation ------Thread[main] ---- Time[45:507]
main : attente fin observation ------Thread[main] ---- Time[45:530]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[45:530]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[45:775]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[45:778]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[45:846]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[45:890]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[45:947]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[45:948]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[46:096]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[46:097]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[46:144]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-5] ---- Time[46:147]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-5] ---- Time[46:148]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[46:149]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[46:364]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-7] ---- Time[46:366]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[46:529]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[46:531]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[46:558]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[46:559]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[46:560]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[46:562]
main : fin observation ------Thread[main] ---- Time[46:562]

Verifica-se que a ordem de emissão do processo process3 foi: [10, 11, 12, 10, 11, 12] (linhas 12-14, 17, 19, 22). Os elementos emitidos pelo processo process2 não foram misturados.

Outra variante do método [map] é o método [switchMap]:

 

Acima, a partir do observável [1], surgem outros 3 observáveis [2] com 2 elementos, que são posteriormente achatados tal como em [flatMap] e [3]. Pode-se observar que o resultado tem 5 elementos e não 6. Isto deve-se ao facto de, antes de o segundo observável emitir o seu elemento n.º 2, [6], o terceiro observável emitir o seu primeiro elemento, [5], o que faz com que o segundo observável seja abandonado. Por conseguinte, o elemento [6] não aparece no observável resultante [3].

Para ilustrar o [switchMap], utilizaremos o seguinte exemplo [Exemple21eg]:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple21eg {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // processo 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().switchMap(i -> process2.getObservable()));
        // subscrições
        ProcessUtils.subscribe(1, process3);
    }
}

A execução do exemplo produz os seguintes resultados:

main : début observation ------Thread[main] ---- Time[02:388]
main : attente fin observation ------Thread[main] ---- Time[02:419]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[02:419]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[02:641]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[02:643]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[02:802]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[02:888]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[02:957]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[02:958]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[03:005]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[03:007]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[03:007]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[03:106]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[03:106]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[03:108]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[03:236]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[03:238]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[03:716]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[03:717]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[03:718]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[03:718]
main : fin observation ------Thread[main] ---- Time[03:719]
  • process1 emite 2 elementos que dão origem a 2 observáveis process2 de 3 elementos;
  • linha 14: o observador recebe o elemento n.º 0 emitido pelo primeiro observável process2, linha 6;
  • linha 15: o observador recebe o elemento n.º 0 emitido pelo segundo observável process2, na linha 13. Não se sabe por que razão não recebeu anteriormente os elementos 1 e 2 emitidos pelo primeiro observável process2 nas linhas 7 e 8. Seja como for, o primeiro observável process2 é abandonado;
  • no final, o observador vê apenas 4 elementos (linhas 14, 15, 17, 20) em vez dos 6 que foram emitidos;

7.6.4. Exemplos-22: outros métodos da classe [Observable]

A classe [Observable] retoma vários métodos da classe [Stream] com um funcionamento análogo. Aqui estão alguns deles. Limitamo-nos a apresentar o código e os respetivos resultados.

[Exemple22a - take=limit]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22a {
    public static void main(String[] args) throws InterruptedException {
        // processo
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).take(3));
        // subscrições
        ProcessUtils.subscribe(1, process);
    }
}

resultados

1
2
3
4
5
6
7
main : début observation ------Thread[main] ---- Time[25:071]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[25:399]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[25:402]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[25:404]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[25:404]
main : attente fin observation ------Thread[main] ---- Time[25:406]
main : fin observation ------Thread[main] ---- Time[25:406]

[Exemple22b - takeLast]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22b {
    public static void main(String[] args) throws InterruptedException {
        // processo
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).takeLast(2));
        // subscrições
        ProcessUtils.subscribe(1, process);
    }
}

resultados

1
2
3
4
5
6
main : début observation ------Thread[main] ---- Time[19:440]
Subscriber[observateur[0],process] : onNext (9) ------Thread[main] ---- Time[19:726]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[19:728]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[19:728]
main : attente fin observation ------Thread[main] ---- Time[19:729]
main : fin observation ------Thread[main] ---- Time[19:730]

[Exemple22c - skip]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22c {
    public static void main(String[] args) throws InterruptedException {
        // processos
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).skip(5).take(2));
        // subscrições
        ProcessUtils.subscribe(1, process);
    }
}

resultados

1
2
3
4
5
6
main : début observation ------Thread[main] ---- Time[16:685]
Subscriber[observateur[0],process] : onNext (6) ------Thread[main] ---- Time[17:002]
Subscriber[observateur[0],process] : onNext (7) ------Thread[main] ---- Time[17:004]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[17:005]
main : attente fin observation ------Thread[main] ---- Time[17:006]
main : fin observation ------Thread[main] ---- Time[17:006]

[Exemple22d - reduce]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22d {
    public static void main(String[] args) throws InterruptedException {
        // processos
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).reduce(0, (i, a) -> i + a));
        // subscrições
        ProcessUtils.subscribe(1, process);
    }
}
  • linha 10: calcula a soma dos elementos do observável. O resultado é um observável que emite essa soma;

resultados

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[52:412]
Subscriber[observateur[0],process] : onNext (55) ------Thread[main] ---- Time[52:640]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[52:640]
main : attente fin observation ------Thread[main] ---- Time[52:642]
main : fin observation ------Thread[main] ---- Time[52:642]

[Exemple22e - all]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22e {
    public static void main(String[] args) throws InterruptedException {
        // processos
        Process<Boolean> process = new Process<>("process", Observable.range(1, 10).all(i -> i > 10));
        // subscrições
        ProcessUtils.subscribe(1, process);
    }
}
  • linha 10: devolve um Observable<Boolean> que emite o elemento true, se o predicado do método [all] for verdadeiro para todos os elementos; caso contrário, devolve false;

resultados

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[59:866]
Subscriber[observateur[0],process] : onNext (false) ------Thread[main] ---- Time[00:069]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[00:070]
main : attente fin observation ------Thread[main] ---- Time[00:071]
main : fin observation ------Thread[main] ---- Time[00:071]

[Exemple22f - count]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22f {
    public static void main(String[] args) throws InterruptedException {
        // processos
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).count());
        // subscrições
        ProcessUtils.subscribe(1, process);
    }
}
  • linha 10: [Observable.count] cria um observável com 1 elemento que corresponde à soma dos elementos observados;

resultados

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[16:409]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[16:634]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[16:634]
main : attente fin observation ------Thread[main] ---- Time[16:635]
main : fin observation ------Thread[main] ---- Time[16:635]

[Exemple22g - distinct]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;

public class Exemple22g {
    public static void main(String[] args) throws InterruptedException {
        // processos
        Process<Integer> process = new Process<>("process", Observable.just(1, 2, 1, 3).distinct());
        // subscrições
        ProcessUtils.subscribe(1, process);
    }
}

resultados

1
2
3
4
5
6
7
main : début observation ------Thread[main] ---- Time[05:373]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[05:594]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[05:595]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[05:596]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[05:597]
main : attente fin observation ------Thread[main] ---- Time[05:597]
main : fin observation ------Thread[main] ---- Time[05:597]

[Exemple22h - groupBy, asObservable]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.observables.GroupedObservable;

public class Exemple22h {
    public static void main(String[] args) throws InterruptedException {
        // processos
        Observable<GroupedObservable<Boolean, Integer>> obs = Observable.range(1, 10).groupBy(i -> i % 2 == 0);
        Process<Integer> process = new Process<>("process", obs.concatMap(g -> g.asObservable()));
        // subscrições
        ProcessUtils.subscribe(1, process);
    }
}
  • linha 11: o método [groupBy] agrupa os 10 elementos emitidos em 2 grupos, os números pares e os números ímpares. O resultado é um tipo Observable<GroupedObservable<Boolean, Integer>>, ou seja, um observável cujos elementos são do tipo GroupedObservable<Boolean, Integer>, em que Boolean é o tipo da chave do grupo (false, true, neste caso) e que é também o tipo do resultado da função lambda passada como parâmetro ao método [groupBy], sendo Integer o tipo dos elementos do grupo;
  • linha 12: o tipo GroupedObservable possui um método [asObservable] que permite criar um observável a partir desse tipo. Teremos, portanto, dois tipos Observable<Integer>, um para os números pares e outro para os números ímpares. A partir destes dois observáveis, o método [concatMap] irá criar um único observável;

resultados

main : début observation ------Thread[main] ---- Time[23:809]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[24:034]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[24:036]
Subscriber[observateur[0],process] : onNext (5) ------Thread[main] ---- Time[24:037]
Subscriber[observateur[0],process] : onNext (7) ------Thread[main] ---- Time[24:038]
Subscriber[observateur[0],process] : onNext (9) ------Thread[main] ---- Time[24:039]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[24:041]
Subscriber[observateur[0],process] : onNext (4) ------Thread[main] ---- Time[24:043]
Subscriber[observateur[0],process] : onNext (6) ------Thread[main] ---- Time[24:044]
Subscriber[observateur[0],process] : onNext (8) ------Thread[main] ---- Time[24:045]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[24:046]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[24:047]
main : attente fin observation ------Thread[main] ---- Time[24:047]
main : fin observation ------Thread[main] ---- Time[24:048]

[Exemple22i - timestamp]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
import rx.schedulers.Timestamped;

public class Exemple22i {
    public static void main(String[] args) throws InterruptedException {
        // processo 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // processo 2
        Process<Timestamped<Integer>> process2 = new Process<>("process2", process1.getObservable().timestamp());
        // subscrições
        ProcessUtils.subscribe(1, process2);
    }
}
  • na linha 15, o método [timestamp] associa uma hora a cada elemento da variável observável processada;

resultados

main : début observation ------Thread[main] ---- Time[59:362]
main : attente fin observation ------Thread[main] ---- Time[59:377]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[59:378]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[59:553]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[59:692]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975259555,"value":0}) ------Thread[RxComputationThreadPool-3] ---- Time[59:789]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975259789,"value":1}) ------Thread[RxComputationThreadPool-3] ---- Time[59:791]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[00:025]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[00:027]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975260026,"value":2}) ------Thread[RxComputationThreadPool-3] ---- Time[00:031]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[00:033]
main : fin observation ------Thread[main] ---- Time[00:034]

Neste exemplo, é difícil determinar o que representa a informação timestamp:

  • linhas 4-5: verifica-se que o elemento 1 de process1 foi emitido 139 ms após o elemento 0;
  • linhas 6 e 7: verifica-se que o elemento 1 de process2 foi observado 234 ms após o elemento 0;
  • linhas 5 e 8: verifica-se que o elemento 2 de process1 foi emitido 33 ms após o elemento 1;
  • linhas 7 e 10: verifica-se que o elemento 2 de process2 foi observado 37 ms após o elemento 1;

Estes desfasamentos devem-se ao facto de os threads de observação e de execução dos observáveis não serem os mesmos. Se substituirmos as linhas 12-13 pelas seguintes (Exemplo22j):


// processo 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
null);
  • linhas 2-3: não se impõe o thread de observação. Sabe-se que, nesse caso, o observável é observado no local onde é executado;

Isto dá os seguintes resultados:

main : début observation ------Thread[main] ---- Time[43:834]
main : attente fin observation ------Thread[main] ---- Time[43:845]
Observable (process1) call start ------Thread[RxComputationThreadPool-1] ---- Time[43:846]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-1] ---- Time[44:291]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976384293,"value":0}) ------Thread[RxComputationThreadPool-1] ---- Time[44:552]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-1] ---- Time[44:878]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976384879,"value":1}) ------Thread[RxComputationThreadPool-1] ---- Time[44:884]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-1] ---- Time[45:274]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976385275,"value":2}) ------Thread[RxComputationThreadPool-1] ---- Time[45:280]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[45:281]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[45:283]
main : fin observation ------Thread[main] ---- Time[45:284]
  • linhas 4 e 6: o processo process1 emite o seu elemento n.º 1 587 ms após o seu elemento n.º 0;
  • linhas 5 e 7: o observador observa estes dois elementos com um intervalo de 586 ms;
  • linhas 6 e 8: o processo process1 emite o seu elemento n.º 2 396 ms após o seu elemento n.º 1;
  • linhas 7 e 9: o observador observa estes dois elementos com um intervalo de 396 ms;

Aqui, os valores do timestamp são coerentes: representam efetivamente a data de emissão do elemento.

7.7. Os agendadores

7.7.1. Exemplo 23: o agendador [Schedulers.computation]

Vamos agora analisar os agendadores de execução. A observação será feita no thread de execução.

O tema dos agendadores é um pouco obscuro. Os diferentes agendadores são apresentados nesta questão no site de StackOverflow [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:

 

Vamos tentar ilustrar a utilização destes diferentes agendadores através de exemplos. O primeiro ilustra o agendador [Schedulers.computation]:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple23 {
    public static void main(String[] args) throws InterruptedException {
        // processos
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.computation(), null);
        }
        // subscrições
        ProcessUtils.subscribe(1, processes);
    }
}
  • linhas 14-19: cria-se uma matriz de 10 processos a serem executados num thread de cálculo;
  • linha 17: cada processo gera um número real aleatório;
  • linha 21: subscrevem-se todos esses processos;

Os resultados são os seguintes:

main : début observation ------Thread[main] ---- Time[01:034]
Observable (process0) call start ------Thread[RxComputationThreadPool-1] ---- Time[01:042]
Observable (process2) call start ------Thread[RxComputationThreadPool-3] ---- Time[01:042]
Observable (process1) call start ------Thread[RxComputationThreadPool-2] ---- Time[01:042]
Observable (process5) call start ------Thread[RxComputationThreadPool-6] ---- Time[01:043]
Observable (process7) call start ------Thread[RxComputationThreadPool-8] ---- Time[01:043]
Observable (process4) call start ------Thread[RxComputationThreadPool-5] ---- Time[01:042]
Observable (process3) call start ------Thread[RxComputationThreadPool-4] ---- Time[01:042]
main : attente fin observation ------Thread[main] ---- Time[01:043]
Observable (process6) call start ------Thread[RxComputationThreadPool-7] ---- Time[01:043]
Observable (process3,0) onNext (70.8) ------Thread[RxComputationThreadPool-4] ---- Time[01:115]
Observable (process1,0) onNext (13.2) ------Thread[RxComputationThreadPool-2] ---- Time[01:153]
Observable (process0,0) onNext (63.599999999999994) ------Thread[RxComputationThreadPool-1] ---- Time[01:215]
Subscriber[observateur[0],process0] : onNext (63.599999999999994) ------Thread[RxComputationThreadPool-1] ---- Time[01:326]
Subscriber[observateur[0],process3] : onNext (70.8) ------Thread[RxComputationThreadPool-4] ---- Time[01:326]
Subscriber[observateur[0],process1] : onNext (13.2) ------Thread[RxComputationThreadPool-2] ---- Time[01:326]
Observable (process3) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[01:326]
Observable (process0) onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[01:326]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[01:327]
Subscriber[observateur[0],process0].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[01:327]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[01:327]
Subscriber[observateur[0],process1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[01:327]
Observable (process8) call start ------Thread[RxComputationThreadPool-1] ---- Time[01:329]
Observable (process9) call start ------Thread[RxComputationThreadPool-2] ---- Time[01:329]
...
main : fin observation ------Thread[main] ---- Time[01:610]
  • linhas 2-10: os primeiros 8 processos iniciam-se em 8 threads diferentes (a máquina utilizada tem 8 núcleos). É possível observar que todos iniciam aproximadamente ao mesmo tempo;
  • linhas 17-19: 3 processos terminam, libertando assim 3 threads;
  • linhas 23-24: os dois últimos processos podem então iniciar, ocupando 2 das threads assim libertadas;

Conclui-se, portanto, que o agendador [Schedulers.computation] fornece um conjunto de n threads, em que n é o número de núcleos da máquina. As threads são executadas em paralelo nesses núcleos.

7.7.2. Exemplo 24: o agendador [Schedulers.io]

Executamos o código anterior com o agendador [Schedulers.io]:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple24 {
    public static void main(String[] args) throws InterruptedException {
        // processos
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.io(), null);
        }
        // subscrições
        ProcessUtils.subscribe(1, processes);
    }
}
  • linha 18: os processos são executados com os threads do agendador [Schedulers.io];

Isto produz os seguintes resultados:

main : début observation ------Thread[main] ---- Time[03:451]
Observable (process0) call start ------Thread[RxCachedThreadScheduler-1] ---- Time[03:459]
Observable (process1) call start ------Thread[RxCachedThreadScheduler-2] ---- Time[03:459]
Observable (process2) call start ------Thread[RxCachedThreadScheduler-3] ---- Time[03:460]
Observable (process3) call start ------Thread[RxCachedThreadScheduler-4] ---- Time[03:460]
Observable (process4) call start ------Thread[RxCachedThreadScheduler-5] ---- Time[03:464]
Observable (process5) call start ------Thread[RxCachedThreadScheduler-6] ---- Time[03:464]
Observable (process6) call start ------Thread[RxCachedThreadScheduler-7] ---- Time[03:465]
Observable (process8) call start ------Thread[RxCachedThreadScheduler-9] ---- Time[03:465]
Observable (process9) call start ------Thread[RxCachedThreadScheduler-10] ---- Time[03:465]
main : attente fin observation ------Thread[main] ---- Time[03:465]
Observable (process7) call start ------Thread[RxCachedThreadScheduler-8] ---- Time[03:465]
Observable (process7,0) onNext (54.0) ------Thread[RxCachedThreadScheduler-8] ---- Time[03:473]
Observable (process8,0) onNext (116.39999999999999) ------Thread[RxCachedThreadScheduler-9] ---- Time[03:500]
Observable (process6,0) onNext (105.6) ------Thread[RxCachedThreadScheduler-7] ---- Time[03:506]
Observable (process0,0) onNext (96.0) ------Thread[RxCachedThreadScheduler-1] ---- Time[03:509]
Observable (process5,0) onNext (25.2) ------Thread[RxCachedThreadScheduler-6] ---- Time[03:583]
Observable (process3,0) onNext (97.2) ------Thread[RxCachedThreadScheduler-4] ---- Time[03:684]
Subscriber[observateur[0],process7] : onNext (54.0) ------Thread[RxCachedThreadScheduler-8] ---- Time[03:685]
Subscriber[observateur[0],process6] : onNext (105.6) ------Thread[RxCachedThreadScheduler-7] ---- Time[03:685]
Subscriber[observateur[0],process0] : onNext (96.0) ------Thread[RxCachedThreadScheduler-1] ---- Time[03:685]
Subscriber[observateur[0],process8] : onNext (116.39999999999999) ------Thread[RxCachedThreadScheduler-9] ---- Time[03:685]
Observable (process0) onCompleted ------Thread[RxCachedThreadScheduler-1] ---- Time[03:686]
Observable (process6) onCompleted ------Thread[RxCachedThreadScheduler-7] ---- Time[03:686]
Observable (process7) onCompleted ------Thread[RxCachedThreadScheduler-8] ---- Time[03:685]
...
main : fin observation ------Thread[main] ---- Time[03:933]
  • linhas 2-10: os 10 processos são iniciados, cada um num thread diferente. Ao contrário do caso anterior, todos os processos puderam ser iniciados. Verifica-se que estes inícios demoram 6 ms, enquanto anteriormente demoravam 1 ms;
  • linhas 13-18: os observáveis emitem um após o outro e não de forma quase paralela, como tinha acontecido anteriormente;

Qual é a diferença entre os agendadores [Schedulers.io] e [Schedulers.computation]? Pode encontrar-se uma resposta no URL [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:

 

7.7.3. Exemplo 25: o agendador [Schedulers.newThread]

Executamos o código anterior com o agendador [Schedulers.newThread]:


package dvp.rxjava.observables.exemples;

import java.util.Random;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;

public class Exemple25 {
    public static void main(String[] args) throws InterruptedException {
        // processos
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.newThread(), null);
        }
        // subscrições
        ProcessUtils.subscribe(1, processes);
    }
}

Os resultados obtidos são os mesmos que com o agendador [Schedulers.io]:

main : début observation ------Thread[main] ---- Time[17:058]
Observable (process0) call start ------Thread[RxNewThreadScheduler-1] ---- Time[17:065]
Observable (process1) call start ------Thread[RxNewThreadScheduler-2] ---- Time[17:065]
Observable (process2) call start ------Thread[RxNewThreadScheduler-3] ---- Time[17:066]
Observable (process3) call start ------Thread[RxNewThreadScheduler-4] ---- Time[17:066]
Observable (process4) call start ------Thread[RxNewThreadScheduler-5] ---- Time[17:068]
Observable (process5) call start ------Thread[RxNewThreadScheduler-6] ---- Time[17:069]
Observable (process6) call start ------Thread[RxNewThreadScheduler-7] ---- Time[17:069]
Observable (process8) call start ------Thread[RxNewThreadScheduler-9] ---- Time[17:069]
Observable (process7) call start ------Thread[RxNewThreadScheduler-8] ---- Time[17:069]
Observable (process9) call start ------Thread[RxNewThreadScheduler-10] ---- Time[17:069]
main : attente fin observation ------Thread[main] ---- Time[17:069]
Observable (process6,0) onNext (25.2) ------Thread[RxNewThreadScheduler-7] ---- Time[17:120]
Observable (process3,0) onNext (39.6) ------Thread[RxNewThreadScheduler-4] ---- Time[17:193]
Observable (process5,0) onNext (21.599999999999998) ------Thread[RxNewThreadScheduler-6] ---- Time[17:212]
Observable (process0,0) onNext (19.2) ------Thread[RxNewThreadScheduler-1] ---- Time[17:273]
Observable (process8,0) onNext (81.6) ------Thread[RxNewThreadScheduler-9] ---- Time[17:308]
Subscriber[observateur[0],process3] : onNext (39.6) ------Thread[RxNewThreadScheduler-4] ---- Time[17:331]
Subscriber[observateur[0],process0] : onNext (19.2) ------Thread[RxNewThreadScheduler-1] ---- Time[17:331]
Subscriber[observateur[0],process6] : onNext (25.2) ------Thread[RxNewThreadScheduler-7] ---- Time[17:331]
Subscriber[observateur[0],process8] : onNext (81.6) ------Thread[RxNewThreadScheduler-9] ---- Time[17:331]
Subscriber[observateur[0],process5] : onNext (21.599999999999998) ------Thread[RxNewThreadScheduler-6] ---- Time[17:331]
Observable (process8) onCompleted ------Thread[RxNewThreadScheduler-9] ---- Time[17:333]
Observable (process5) onCompleted ------Thread[RxNewThreadScheduler-6] ---- Time[17:333]
Observable (process6) onCompleted ------Thread[RxNewThreadScheduler-7] ---- Time[17:332]
Observable (process0) onCompleted ------Thread[RxNewThreadScheduler-1] ---- Time[17:332]
Observable (process3) onCompleted ------Thread[RxNewThreadScheduler-4] ---- Time[17:332]
...
main : fin observation ------Thread[main] ---- Time[17:571]

No URL e no [http://stackoverflow.com/questions/33415881/retrofit-with-rxjava-schedulers-newthread-vs-schedulers-io], explica-se que o agendador [Schedulers.io] fornece um conjunto de threads, o que não acontece com o agendador [Schedulers.newThread]. Um conjunto de threads cria automaticamente um número n de threads. Aloca-as aos processos que delas necessitam. Quando estes terminam, as suas threads não são eliminadas, mas regressam ao conjunto e podem então ser reutilizadas por outro processo. Isto é mais eficiente do que criar e eliminar threads constantemente. Por isso, pode considerar-se que é preferível utilizar o agendador [Schedulers.io].

7.7.4. Exemplo 26: os agendadores [Schedulers.immediate, Schedulers.trampoline]

Voltemos à explicação dada para estes dois agendadores:

 

A explicação é bastante simples de compreender, mas quando se tenta ilustrá-la, percebe-se que não a compreendemos. Foi o livro [Learning Reactive Programming With Java 8] que me permitiu criar um exemplo que retoma um exemplo encontrado nesse livro, mas que o simplifica. É o seguinte:


package dvp.rxjava.observables.exemples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Consumer;

import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.Schedulers;

public class Exemple26 {
    public static void main(String[] args) throws InterruptedException {

        // um agendador
        Scheduler scheduler = Schedulers.immediate();
        // um worker desse agendador
        Worker worker = scheduler.createWorker();
        // um tipo Action0 a ser executado no trabalhador
        Action0 action02 = new Action0() {
            @Override
            public void call() {
                // registo da ação02
                ProcessUtils.showInfos.accept("action02");
            }
        };

        // um tipo Action0 a executar no worker
        Action0 action01 = new Action0() {
            @Override
            public void call() {
                // programa-se uma nova ação no mesmo worker
                worker.schedule(action02);
                // registo da ação01
                ProcessUtils.showInfos.accept("action01");
            }
        };
        // a ação 01 está programada no worker
        worker.schedule(action01);
    }

    // visualizações
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));

}
  • linha 17: um agendador. Será ou o [Schedulers.immediate], como aqui, ou o [Schedulers.trampoline] posteriormente;
  • linha 19: é possível executar ações do tipo Action0 (linhas 21, 20) nos workers do agendador. O método [Scheduler.createWorker] permite criar um worker. O método [Worker.schedule(Action0)] permite que um worker execute um tipo Action0;
  • linhas 21-27: uma primeira ação denominada [action02] que será executada (linha 40) pelo worker da linha 19;
  • linhas 30-38: uma segunda ação denominada [action01]. Esta ação tem a particularidade de fazer com que a ação action02 seja executada no mesmo worker que ela (linha 34). É aqui que reside a diferença entre [Schedulers.immediate] e [Schedulers.trampoline]:
    • se o agendador for [Schedulers.immediate], então, na linha 34, a ação action02 será executada imediatamente (daí o nome do agendador) e a ação action01 em curso será interrompida. Aparecerá então a mensagem da linha 25. Concluída a ação action02, a ação action01 será retomada e aparecerá a mensagem da linha 36;
    • se o agendador for [Schedulers.trampoline], então, na linha 34, a ação action02 é colocada em espera. Só será executada quando a tarefa em curso action01 estiver concluída. Veremos então aparecer a mensagem da linha 36. Concluída a ação action01, a ação action02 será executada e veremos a mensagem da linha 25;

A execução do código acima produz os seguintes resultados:

action02 ------Thread[main] ---- Time[38:480]
action01 ------Thread[main] ---- Time[38:485]

Se, na linha 17, utilizarmos o agendador [Schedulers.trampoline], obtemos os resultados opostos:

action01 ------Thread[main] ---- Time[42:972]
action02 ------Thread[main] ---- Time[42:976]

Dito isto, é difícil estabelecer uma ligação com os observáveis. Não encontrei nenhum exemplo convincente que pudesse demonstrar a vantagem de executar um observável numa destas duas threads. Aqui está, no entanto, um exemplo, mas que não me parece nada natural:


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

public class Exemple27 {
    public static void main(String[] args) throws InterruptedException {

        // Worker
        Worker worker = Schedulers.immediate().createWorker();
        // Worker worker = Schedulers.trampoline().createWorker();
        // observável 1 no worker
        worker.schedule(() -> Observable.range(1, 2).subscribe(new Action1<Integer>() {

            @Override
            public void call(Integer i) {
                ProcessUtils.showInfos.accept(String.valueOf(i));
                // observável 2 no mesmo worker
                worker.schedule(() -> Observable.range(100, 2).subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer i) {
                        ProcessUtils.showInfos.accept(String.valueOf(i));
                    }
                }));
            }
        }));
    }
}
  • linhas 13-14: cria-se um worker a partir de um dos dois agendadores [Schedulers.immediate] e [Schedulers.trampoline];
  • linha 16: um primeiro observável obs1 é programado neste worker para emitir os números [1,2]
  • linha 22: sempre que um elemento deste observável obs1 é observado, é iniciada a observação de um segundo observável obs2 no mesmo worker para emitir os números [100,101];

Com o agendador [Schedulers.immediate], obtêm-se os seguintes resultados:

1
2
3
4
5
6
1 ------Thread[main] ---- Time[44:604]
100 ------Thread[main] ---- Time[44:610]
101 ------Thread[main] ---- Time[44:610]
2 ------Thread[main] ---- Time[44:612]
100 ------Thread[main] ---- Time[44:612]
101 ------Thread[main] ---- Time[44:612]

Já com o agendador [Schedulers.trampoline], obtêm-se os seguintes resultados:

1
2
3
4
5
6
1 ------Thread[main] ---- Time[14:107]
2 ------Thread[main] ---- Time[14:114]
100 ------Thread[main] ---- Time[14:115]
101 ------Thread[main] ---- Time[14:115]
100 ------Thread[main] ---- Time[14:115]
101 ------Thread[main] ---- Time[14:116]

7.8. Conclusion

Ainda há muito a fazer. Para aprofundar os conhecimentos sobre a biblioteca RxJava, convidamos o leitor a prosseguir a sua formação com as referências indicadas no início deste documento. Ainda assim, dispomos das bases necessárias para utilizar o RxJava nos ambientes Swing e Android. É isso que vamos demonstrar agora.