7. La libreria RxJava
La libreria RxJava si basa sul seguente concetto: uno stream di elementi di tipo T Observable<T> viene osservato da uno o più subscriber (sottoscrittori, osservatori, consumatori) Subscriber<T>. La libreria RxJava consente al flusso Observable<T> di essere eseguito nel thread T1 e al suo osservatore Subscriber<T> nel thread T2 senza che lo sviluppatore debba preoccuparsi della gestione del ciclo di vita di questi thread o di problemi intrinsecamente complessi, come la condivisione dei dati tra i thread e la loro sincronizzazione per l'esecuzione di un'operazione globale. Facilita quindi la programmazione asincrona.
Uno stream Observable<T> produce elementi di tipo T, che possono essere osservati man mano che vengono prodotti. Se l'osservatore e l'osservabile (un termine usato in senso lato per riferirsi al tipo Observable<T>) si trovano nello stesso thread, allora l'osservabile può produrre l'elemento (i+1) solo dopo che l'osservatore ha consumato l'elemento i. Ci sono pochi casi in cui questa architettura è utile. Se l'osservatore e l'osservabile non si trovano nello stesso thread, allora l'osservabile e il suo osservatore si comportano in modo autonomo: l'osservabile emette al proprio ritmo e l'osservatore consuma al proprio ritmo. È qui che risiede il valore della libreria. Finora abbiamo discusso solo di un singolo osservatore. In realtà, un osservabile può avere un numero qualsiasi di osservatori.
La libreria RxJava è particolarmente adatta all'architettura descritta nella Sezione 2 dell'introduzione e riassunta qui:

- in [1], un livello di servizio fornisce servizi, alcuni dei quali richiedono molto tempo per essere ottenuti (ad esempio, le richieste di rete);
- questo livello di servizio viene richiamato da un'interfaccia utente grafica [1] (Swing, Android, JavaFX). Se il livello di servizio viene eseguito nello stesso thread del metodo [Swing] che lo utilizza, l'interfaccia utente grafica si blocca (diventa non reattiva) mentre attende il risultato del servizio;
- In [2], un sottile livello di adattamento implementato con RxJava permette al livello GUI di ricevere un'implementazione asincrona dello stesso servizio: questo servizio può essere eseguito in un thread diverso dal metodo del livello GUI che lo invoca. In questo caso, la GUI [3] rimane reattiva: l'utente può continuare a interagire con essa, ad esempio attivando una nuova richiesta di rete in parallelo con la prima e, cosa più importante, all'utente può essere data la possibilità di annullare i processi che richiedono troppo tempo — cosa impossibile se la GUI è bloccata;
- La chiamata [4] è sincrona, mentre le chiamate [5-6] sono asincrone;
In questa architettura, il livello [2] fornisce servizi che restituiscono tipi Observable<T> a cui i metodi del livello grafico [3] possono iscriversi. Un servizio nel livello [2] fornisce quindi i propri risultati uno per uno, e il livello [3] può reagire a ciascuno di essi, ad esempio aggiornando uno o più componenti dell'interfaccia utente grafica.
La classe Observable<T> dispone di decine di metodi. Questa è una delle sfide della libreria: è molto ricca ed è difficile coglierne tutte le possibilità. Ne presenteremo alcune. La padronanza degli altri metodi arriverà con il tempo.
7.1. Creazione di osservabili e sottoscrizione agli stessi
7.1.1. Esempio-01: il metodo [Observable.from]
![]() |
Si consideri il seguente codice:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
- Riga 12: creiamo un tipo Observable<Integer> da un elenco di numeri interi.
La classe Observable<T> è un flusso di elementi di tipo T che possono essere osservati — preferibilmente in modo asincrono, ma non necessariamente — man mano che vengono generati. La sua definizione è la seguente:
![]() |
Come accennato in precedenza, la classe Observable<T> dispone di decine di metodi. Alcuni sono simili a quelli della classe Stream<T> discussa nella Sezione 5. La documentazione di RxJava include dei "diagrammi a biglie" [2] che illustrano il funzionamento di questi metodi:
- La riga 3 illustra le emissioni dell'observable nel tempo;
- il metodo [4] viene applicato agli elementi emessi dall'observable. Generalmente produce un nuovo observable;
- la riga 5 mostra il nuovo osservabile ottenuto;
Il metodo [Observable.from] ha la seguente firma:
![]() |
Il metodo statico [Observable.from] consente di creare un Observable<T> a partire da una collezione di elementi di tipo T. Si tratta di un modo molto semplice per iniziare a utilizzare gli observable. La riga:
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
emetterà quindi tre elementi. Non li emette immediatamente. Li emetterà per intero ogni volta che un sottoscrittore si registra. Questo è chiamato osservabile freddo. L'osservabile riemette i suoi elementi per ogni nuovo sottoscrittore.
Possiamo considerare l'istruzione precedente come un'azione di configurazione per l'observable. Viene configurata una volta ed eseguita n volte se compaiono n sottoscrittori.
Come ci si abbona?
Un modo per farlo è utilizzare il metodo [Observable.subscribe], la cui definizione qui utilizzata è la seguente:
![]() |
- il primo parametro [Action1<T> onNext] (vedi Sezione 6.2) del metodo è il metodo da eseguire quando l'osservabile emette un nuovo elemento T;
- il secondo parametro [Action1<Throwable> onError] del metodo è il metodo da eseguire quando l'osservabile genera un'eccezione;
- il terzo parametro [Action0 onComplete] (vedi Sezione 6.1) del metodo è il metodo da eseguire quando l'osservabile emette un'eccezione;
- il metodo restituisce un tipo [Subscription];
Il tipo [Subscription] rappresenta una sottoscrizione all'osservabile. La sua definizione è la seguente:
![]() |
Il valore di questa interfaccia [1] risiede nel suo metodo [2], che consente di annullare un abbonamento.
Nel nostro esempio, il codice per l'iscrizione all'osservabile è il seguente:
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
- riga 1: il risultato di tipo [Subscription] viene ignorato;
- righe 1–15: i tre parametri sono istanze di classi anonime. Useremo anche le lambda. Il vantaggio delle classi anonime è che i tipi di dati previsti dal singolo metodo di queste classi sono chiaramente visibili;
- righe 2–5: implementazione del primo parametro di tipo [Action1<Integer>];
- righe 6–10: implementazione del secondo parametro di tipo [Action1<Throwable>];
- righe 11–15: implementazione del terzo parametro di tipo [Action0];
Il codice completo è il seguente:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
L'osservabile alla riga 12 inizia a emettere i suoi tre elementi non appena viene chiamato il metodo [subscribe] alla riga 14. Da quel momento in poi:
- per ogni elemento emesso, vengono eseguite le righe 15–18.
- quando i 3 elementi sono terminati, vengono eseguite le righe 24–29;
- le righe 19–24 non verranno mai eseguite perché l'osservabile non emette un'eccezione in questo caso;
Per impostazione predefinita, l'osservabile e l'osservatore vengono eseguiti nello stesso thread. Esistono alcuni osservabili predefiniti che vengono eseguiti in un thread diverso dal thread principale (in questo caso, il thread del metodo main), ma per la maggior parte di essi non è così. Quindi, in questo caso, tutto avviene nel thread del metodo main:
- l'osservabile emette l'elemento 1;
- le righe 15–18 vengono eseguite e visualizzano questo elemento;
- l'osservabile emette l'elemento 2;
- le righe 15–18 eseguono e visualizzano questo elemento;
- l'osservabile emette l'elemento 3;
- le righe 15–18 eseguono e visualizzano questo elemento;
- l'osservabile emette la notifica [completed];
- le righe 24–29 vengono eseguite;
Ecco cosa mostrano i risultati:
La classe [Example02] riutilizza [Example01], questa volta utilizzando funzioni lambda come parametri per il metodo [Observable.subscribe]:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
public class Exemple02 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(
(integer) -> System.out.printf("next : %s%n", integer),
(th) -> System.out.println(th),
() -> System.out.println("completed"));
}
}
7.1.2. Esempio-03: La classe Observer
![]() |
Il metodo [Observable.subscribe], che consente di sottoscrivere un osservabile, ha diverse versioni, tra cui le seguenti:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
import rx.Observer;
public class Exemple03 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable th) {
System.out.printf("throwable %s", th);
}
@Override
public void onNext(Integer integer) {
System.out.printf("next : %s%n", integer);
}
});
};
}
Riga 13: invece di passare tre parametri al metodo [subscribe], gli passiamo un tipo [Observer] come segue:
![]() |
Il tipo [Observer] è un'interfaccia con tre metodi:
- [onNext(T t)], che viene chiamato ogni volta che l'osservabile emette un elemento t;
- [onError(Throwable th)], che viene chiamato quando l'osservabile genera un'eccezione th;
- [onCompleted], che viene chiamato quando l'osservabile indica di aver terminato l'emissione;
Il codice funziona in modo simile a quanto spiegato in precedenza. Otteniamo i seguenti risultati:
7.1.3. Esempio-04: Il metodo [Observable.create]
![]() |
Il metodo statico *Observable.create* è definito come segue:
![]() |
- Il metodo [create] restituisce un tipo Observable<T>;
- il parametro del metodo [create] è una funzione di tipo [Observable.OnSubscribe<T>] definita come segue:
![]() |
Il tipo [Observable.OnSubscribe<T>] è un'interfaccia funzionale che a sua volta estende l'interfaccia funzionale [Action1<Subscriber<? super T>>]. Il metodo [call] di questa interfaccia richiede un tipo [Subscriber] (subscriber, observer) definito come segue:
![]() |
Vediamo in [1] che la classe [Subscriber<T>] implementa l'interfaccia [Observer<T>] presentata nella Sezione 7.1.2.
Infine, il metodo [<T> Observable.create]:
- accetta come parametro un'istanza di tipo [Observable.OnSubscribe<T>] con un unico metodo: void call(Subscriber<T> s). Il tipo [Subscriber<T>] estende il tipo [Observer<T>] e quindi dispone dei metodi onNext, onError e onCompleted;
- restituisce un tipo Observable<T>;
Il metodo [<T> Observable.create] restituisce un osservabile configurato. Non è stato ancora emesso alcun elemento. Quando un sottoscrittore [Subscriber<T> s] si abbona a questo osservabile, viene chiamato il metodo [void call(s)] della funzione passata come parametro al metodo [<T> Observable.create]. Il suo ruolo è quello di emettere elementi t di tipo T e di chiamare il metodo dell’osservatore [s.onNext(t)] ad ogni emissione. Una volta completata l'operazione, deve essere chiamato il metodo [s.onCompleted(t)] dell'osservatore e il metodo [call] deve terminare. Se il metodo [call] incontra un'eccezione th, deve essere chiamato il metodo [s.onError(th)] dell'osservatore e il metodo [call] deve terminare;
Per illustrare questo comportamento complesso, useremo il seguente codice [Esempio04]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.util.Random;
public class Exemple04 {
public static void main(String[] args) {
// observable configuration of reals
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
for (int i = 0; i < 3; i++) {
// emission element i
subscriber.onNext(new Random((i + 1)).nextDouble());
}
// end of issue
subscriber.onCompleted();
}
});
// subscription and therefore emission
obs1.subscribe((d) -> System.out.printf("onNext %s%n", d), (th) -> System.out.printf("onError %s%n", th),
() -> System.out.println("onCompleted"));
}
}
- riga 11: viene creato un osservabile che emette tipi Double;
- righe 11–21: il parametro del metodo [create] viene istanziato con una classe anonima contenente il singolo metodo [call] delle righe 12–20. L'osservabile creato alla riga 11 è pronto per emettere, ma lo farà solo quando arriverà un osservatore;
- righe 13–21: il metodo [call] riceve un riferimento a un osservatore;
- righe 14–17: tre elementi vengono emessi all'osservatore;
- riga 19: notifica all'osservatore che l'emissione è terminata;
- righe 23–24: sottoscrizione all'osservabile della riga 11. Implementiamo i tre parametri [onNext, onError, onCompleted] del metodo [subscribe] utilizzando tre lambda. Questa sottoscrizione creerà il sottoscrittore [Subscriber<Double>], che verrà passato al metodo [call] nella riga 13. L'emissione degli elementi avrà quindi inizio;
- tutto avviene nello stesso thread: osservabile e osservatore;
Otteniamo i seguenti risultati:
Il metodo [Observable.create] consente di creare un osservabile a partire da qualsiasi evento. Questo è il metodo che abbiamo utilizzato nella Sezione 2 dell'introduzione per trasformare un'interfaccia sincrona in una asincrona.
7.1.4. Esempio-05: Rifattorizzazione di [Esempio-04]
![]() |
L'esempio seguente presenta una nuova versione del metodo statico [Observable.subscribe]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class Exemple05 {
public static void main(String[] args) {
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- riga 56: la nuova versione del metodo statico [Observable.subscribe] accetta come parametro il tipo [Subscriber], che abbiamo introdotto nel paragrafo precedente;
- righe 37–52: il subscriber (osservatore). Implementa l'interfaccia Observer con i suoi tre metodi onNext, onError e onCompleted;
- Righe 61–64: Da qui in poi, ci concentreremo sui thread in cui vengono eseguiti l'observable e il suo osservatore;
- riga 62: il nome del thread;
- riga 63: l'ora corrente espressa in secondi e millisecondi. Questo ci permetterà di tracciare nel tempo l'emissione di elementi da parte dell'observable e la loro elaborazione da parte dell'observer;
- Questo codice ha la stessa funzionalità del codice precedente. Abbiamo semplicemente rifattorizzato quest'ultimo;
I risultati ottenuti sono i seguenti:
- Riga 1 dei risultati: prima della riga 56 del codice, non è ancora successo nulla. L'observable è stato semplicemente configurato;
- Riga 2 dei risultati: la riga 56 del codice attiva una chiamata al metodo [call] alla riga 15. Riga 3: il numero reale 80,39 viene emesso all'osservatore;
- Riga 4: l'osservatore riceve il numero emesso;
- righe 5–8: il processo precedente si ripete due volte;
- riga 9: l'osservabile invia la notifica di fine trasmissione;
- riga 10: l'osservatore la riceve;
- riga 11: visualizzata dalla riga 57 del codice;
Possiamo quindi vedere che la singola iscrizione alla riga 56 ha causato la visualizzazione delle righe 2–10 dei risultati. Quando si inizia a utilizzare la libreria RxJava, ci si chiede come le cose siano collegate tra loro, in particolare le connessioni tra l’osservatore e l’osservabile. Qui vediamo che la riga 56, l’iscrizione all’osservabile,
- ha innescato l'emissione di tutti gli elementi dell'osservabile;
- che l'osservabile e l'osservatore girano nello stesso thread;
- e che, per questo motivo, osserviamo la sequenza: emetti l'elemento i, osserva l'elemento i, emetti l'elemento (i+1), osserva l'elemento (i+1), ...
Ricordiamo che l'emittente era in attesa prima di emettere i propri elementi:
// attente
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erreur
subscriber.onError(e);
}
dove i nella riga 3 rappresenta il numero di emissione (0 <= i < 3). Se osserviamo i tempi di emissione degli elementi dell'osservabile:
- righe 2, 3: l'elemento 0 è stato emesso circa 500 ms dopo l'inizio della sottoscrizione;
- righe 3, 5: l'elemento 1 è stato emesso circa 400 ms dopo l'elemento 0;
- righe 5, 7: l'elemento 2 è stato emesso circa 300 ms dopo l'elemento 1;
7.2. Thread di esecuzione, thread di osservazione
7.2.1. Esempio-06: Osservabile e osservatore in un thread diverso da [main]
![]() |
Rifattorizziamo l'esempio precedente come segue [Esempio06]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple06 {
public static void main(String[] args) {
// gatekeeper
CountDownLatch latch = new CountDownLatch(1);
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// we lower the barrier
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite observable configuration
obs1 = obs1.subscribeOn(Schedulers.computation());
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// waiting at the gate
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- Riga 16: creiamo un guardrail (semaforo) utilizzando un oggetto [CountDownLatch]. Questo oggetto viene utilizzato per sincronizzare i thread tra loro. Qui viene inizializzato con il valore 1, che chiameremo valore del guardrail (o semaforo). Un thread attende il guardrail utilizzando la seguente operazione:
latch.await();
Il thread viene bloccato se il valore del latch è >0. Un thread può incrementare o decrementare il valore interno del latch. Riga 48: il valore del latch viene decrementato di 1.
- Riga 63: l'osservabile è configurato per essere eseguito su un thread fornito dallo scheduler [Schedulers.computation()]. Questo scheduler può fornire tanti thread quanti sono i core presenti sulla macchina di esecuzione. La sezione sull'applicazione di esempio ha illustrato l'uso di altri scheduler (vedere la Sezione 2.8);
Il principio del codice è il seguente:
- il metodo [main] viene eseguito nel thread principale;
- riga 66: inizia a emettere elementi dall'osservabile. Questi saranno emessi su un thread diverso dal thread principale;
- riga 70: il thread principale viene bloccato perché la barriera ha il valore 1 (vedi riga 16). Può continuare solo quando questo valore cambia in 0. Ciò avviene alla riga 48. È l'osservatore che abbassa la barriera quando riceve la notifica che l'osservabile ha terminato l'emissione;
L'esecuzione produce i seguenti risultati:
- riga 1: la sottoscrizione sta per avvenire;
- riga 2: questo innesca l'esecuzione del metodo [call] sul thread [RxComputationThreadPool-1]. Ora abbiamo un'esecuzione parallela con due thread;
- riga 3: per una ragione sconosciuta, il thread [RxComputationThreadPool-1] ha ceduto il controllo. Il thread [main] prende quindi il controllo e viene bloccato dalla barriera (riga 70 del codice). Da questo punto in poi, solo il thread [RxComputationThreadPool-1] può operare;
- righe 4–11: osserviamo il comportamento visto in precedenza tra l'osservabile e il suo osservatore, ma ora tutto avviene nel thread [RxComputationThreadPool-1];
- righe 12-13: l'osservatore ha abbassato la barriera (riga 48 del codice) e il thread [RxComputationThreadPool-1] si è terminato. Il thread [main] prende il controllo e visualizza due messaggi;
7.2.2. Esempio-07: Osservabile e osservatore in due thread diversi
![]() |
Modifichiamo l'esempio precedente come segue:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple07 {
public static void main(String[] args) {
// gatekeeper
CountDownLatch latch = new CountDownLatch(1);
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// we lower the barrier
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite observable configuration
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// waiting in front of the barrier
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
Il codice è identico a quello dell'esempio precedente, tranne che per la riga 63:
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
che configura l'osservabile (subscribeOn) e l'osservatore (observeOn) in modo che vengano eseguiti su uno dei thread forniti dallo scheduler [Schedulers.computation()].
I risultati ottenuti sono i seguenti:
Si possono notare i seguenti punti:
- l'osservabile viene eseguito nel thread [RxComputationThreadPool-4] (righe 3–4, 6, 8–9);
- l'osservatore viene eseguito nel thread [RxComputationThreadPool-3] (righe 5, 7, 10-11);
- eseguono in modo indipendente. Pertanto, nelle righe 8–9, l'osservabile emette due notifiche (onNext, onCompleted) prima che l'osservatore recuperi la notifica [onNext] (riga 10);
La libreria RxJava gestisce il trasferimento dei dati (emissioni) dal thread dell'osservabile al thread dell'osservatore. Lo sviluppatore non deve preoccuparsi di questo.
Abbiamo visto come creare osservabili (Observable.from, Observable.create). Ora diamo un'occhiata agli osservabili predefiniti nella libreria RxJava.
7.3. Observable predefiniti
7.3.1. Esempio-08: il metodo [Observable.range]
![]() | ![]() |
D'ora in poi, useremo classi dedicate per i processi osservati e i loro osservatori. L'idea è quella di poter registrare i loro nomi, i thread di esecuzione e i tempi di esecuzione, in modo da poterli monitorare nel tempo.
La classe [Process] sarà semplicemente un Observable a cui potremo assegnare un nome. Implementerà la seguente interfaccia [IProcess]:
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcess<T> {
// name of observable
public String getName();
// observable
public Observable<T> getObservable();
}
Questa interfaccia può essere implementata dalla seguente classe [Process<T>]:
package dvp.rxjava.observables.utils;
import rx.Observable;
import rx.Scheduler;
public class Process<T> implements IProcess<T>{
// observable name
protected String name;
// observed process
protected Observable<T> observable;
// manufacturers
public Process(String name, Observable<T> observable) {
// local initializations
this.name = name;
this.observable = observable;
}
// getters and setters
public String getName() {
return name;
}
public Observable<T> getObservable() {
return observable;
}
}
- riga 9: il nome del processo;
- riga 11: l'osservabile osservato;
- righe 14–18: il costruttore;
L'osservatore sarà descritto dalla seguente classe [Observer]:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
...
}
- Riga 11: La classe `Observateur<T>` estende la classe `Subscriber<T>`, che abbiamo brevemente presentato nella sezione 7.1.3. La useremo come argomento per il metodo [Observable.subscribe]:
// exécution observable (observation)
obs1.subscribe(observateur);
Il metodo [Observable.subscribe] utilizzato nella riga 2 sopra riportata ha la seguente definizione:
![]() |
Il ruolo del [Subscriber] consiste principalmente nel gestire gli elementi emessi dall'osservabile a cui si è iscritto utilizzando i metodi dell'interfaccia [Observer]: onNext, onError, onCompleted. La classe [Subscriber] dispone dei seguenti metodi:
![]() |
Nel codice della classe [Observer], useremo il metodo [1] isUnsubscribed per determinare se l'iscrizione dell'abbonato è stata annullata o meno. La classe completa [Observer<T>] è la seguente:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
// a gatekeeper (semaphore)
private CountDownLatch latch;
// a display method
private Consumer<String> showInfos;
// observer's name
private String observerName;
// the name of the observed process
private String processName;
// manufacturers
public Observateur() {
}
public Observateur(String name, CountDownLatch latch, Consumer<String> showInfos, String observedName) {
this.observerName = name;
this.latch = latch;
this.showInfos = showInfos;
this.processName = observedName;
}
// --------------------------- implementation interface Observer<T>
@Override
public void onCompleted() {
// end of issues
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s,%s].onCompleted", observerName, processName));
}
// end of main thread lock
latch.countDown();
}
@Override
public void onError(Throwable e) {
// emission error
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s, %s].onError (%s)", observerName, processName, e));
}
}
@Override
public void onNext(T value) {
// an additional show
if (!isUnsubscribed()) {
try {
showInfos.accept(String.format("Subscriber [%s,%s] : onNext (%s)", observerName, processName,
new ObjectMapper().writeValueAsString(value)));
} catch (JsonProcessingException e) {
showInfos.accept(String.format("Subscriber [%s,%s].onNext (%s)", observerName, processName, e));
}
}
}
}
- Oltre alle caratteristiche di un Subscriber, l'Observer conterrà le seguenti informazioni:
- riga 14: una barriera o un semaforo che verrà utilizzato per bloccare il thread principale fino a quando l'osservatore non avrà ricevuto tutti gli elementi emessi dall'osservabile. Ciò avverrà alla riga 36 del codice quando l'osservatore riceverà la notifica di fine emissione dall'osservabile;
- riga 16: un'istanza Consumer<String> che verrà utilizzata per visualizzare un messaggio sulla console;
- riga 18: il nome dell'osservatore, utilizzato per distinguere tra gli osservatori quando ce ne sono più di uno;
- riga 20: il nome del processo osservato;
- righe 36, 46, 54: i metodi [onCompleted, onError, onNext] dell'interfaccia [Observer<T>] implementati dalla classe astratta [Subscriber<T>]. Questa classe non li implementa. Ciò deve quindi essere fatto nelle sue classi figlie. Prima di fare qualsiasi cosa in questi metodi, controlliamo se l'osservatore si è disabbonato dall'osservabile che sta osservando;
- riga 59: il metodo [onNext] dell'osservatore scrive la stringa JSON dell'elemento ricevuto. Questo ci permetterà di visualizzare vari tipi di elementi;
Detto questo, esaminiamo un nuovo metodo della classe Observable, il metodo [range]:
![]() |
L'osservabile Observable.range(n,m) emette (m) numeri interi compresi tra n e n+m-1. Lo esploreremo con il seguente codice [Esempio08]:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple08 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Integer> obs1 = Observable.range(15, 3).subscribeOn(Schedulers.computation());
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos,"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- Riga 16: useremo due osservatori;
- riga 19: il guardrail (semaforo) viene inizializzato a due perché collocheremo ciascun osservatore su un thread diverso. Il thread principale dovrà quindi attendere che entrambi i thread degli osservatori abbiano terminato;
- riga 22: configuriamo l'osservabile in modo che venga eseguito su un thread dello scheduler [Schedulers.computation()]. L'osservatore si troverà sullo stesso thread dell'osservabile;
- righe 25–27: iscriviamo due osservatori all'osservabile. Questo attiverà l'esecuzione completa dell'osservabile per ciascun osservatore: verranno emessi i numeri interi 15, 16 e 17;
- riga 30: il thread principale attende che gli osservatori terminino;
I risultati ottenuti sono i seguenti:
- riga 2: il thread principale è bloccato, in attesa che i due osservatori finiscano;
- righe 3-4: vediamo che l'osservatore 0 si trova sul thread [RxComputationThreadPool-1] e l'osservatore 1 sul thread [RxComputationThreadPool-2];
- righe 3-10: vediamo che entrambi gli osservatori ricevono esattamente gli stessi elementi;
Useremo la classe Observer definita qui per illustrare il comportamento di altri tipi di osservabili.
7.3.2. Esempio-09: i metodi Observable.[interval, take, doNext]
![]() |
![]() |
Questo esempio illustra l'uso dell'osservabile Observable.interval(long interval, TimeUnit unit), che emette numeri interi lunghi a intervalli regolari. Nota punto [1]: per impostazione predefinita, l'osservabile [Observable.interval] viene eseguito su uno dei thread dello scheduler [Schedulers.computation].
Il codice sarà il seguente:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- riga 22: l'osservabile emette numeri interi lunghi ogni 500 millisecondi. La sequenza inizia con il numero 0;
- riga 22: questo osservabile emette un numero infinito di valori. Il metodo [Observable.take(n)] crea un nuovo osservabile che conserva solo i primi n elementi emessi;
![]() |
Rivediamo il codice dell'osservabile:
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
Riga 2: Il metodo [Observable.doOnNext] viene eseguito ogni volta che l'osservabile emette un nuovo elemento. Viene spesso utilizzato per registrare informazioni. In questo caso, vogliamo registrare la data di emissione degli elementi per verificare che venga mantenuto l'intervallo di 500 millisecondi. Il metodo [Observable.doOnNext] non modifica l'osservabile a cui viene applicato. La sua definizione è la seguente:
![]() |
L'esecuzione produce i seguenti risultati:
- righe 3, 7 e 11: vediamo che l'intervallo di emissione è di circa 500 ms;
- i due osservatori si trovano effettivamente su due thread diversi, anche se l'osservabile non era stato configurato per essere eseguito con uno scheduler specifico. Questo è il comportamento predefinito dell'osservabile [Observable.interval] che vediamo qui;
7.3.3. Esempi-10/12: i metodi Observable.[error, empty, never]
![]() | ![]() |
D'ora in poi, saremo più concisi nelle nostre illustrazioni dei metodi della classe [Observable]. Il codice precedente era il seguente:
package dvp.rxjava.observables;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
Questo codice è già stato utilizzato nell'esempio precedente. Sono cambiate solo le righe 21–22. Estrarremo quindi la maggior parte di questo codice nella seguente classe [ProcessUtils]:
package dvp.rxjava.observables.utils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import rx.Observable;
public class ProcessUtils {
@SafeVarargs
public static void subscribe(int nbObservateurs, IProcess<?>... processes) throws InterruptedException {
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs * processes.length);
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
for (IProcess<?> process : processes) {
Observable<?> obs = process.getObservable();
obs.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos, process.getName()));
}
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- riga 13: il metodo accetta due parametri:
- nbObservers: il numero di osservatori per i processi passati come secondo parametro;
- processi: i processi (denominati "osservabili") da osservare. Grazie alla notazione [IProcess<?>], i processi possono emettere elementi di tipi diversi;
- riga 16: il semaforo deve diventare verde quando tutti gli osservatori hanno completato tutte le loro osservazioni. Il valore iniziale del semaforo è quindi il numero di osservatori moltiplicato per il numero di osservazioni;
- Righe 20–25: ogni osservatore è iscritto a tutti i processi che deve osservare;
- riga 23: recupera l'osservabile dal processo (vedi Sezione 7.3.1);
- riga 23: un osservatore è iscritto ad esso. All'osservatore vengono passate quattro informazioni:
- il suo nome;
- il semaforo che deve decrementare quando riceve la notifica di fine trasmissione dall'osservabile che sta osservando;
- il metodo da utilizzare quando desidera registrare informazioni sulla console;
- il nome del processo che osserverà;
Una volta definite queste classi, l'Esempio 10 sarà il seguente:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple10 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs = Observable.error(new RuntimeException("Erreur !!!")).subscribeOn(Schedulers.computation());
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1", obs));
}
}
Alla riga 11, il metodo statico [Observable.error] è definito come segue:
![]() |
La riga 8 configura quindi un osservabile che si limita a lanciare un'eccezione al metodo [onError] dei suoi sottoscrittori. L'esecuzione produce i seguenti risultati:
main : début observation ------Thread[main] ---- Time[22:618]
main : attente fin observation ------Thread[main] ---- Time[22:636]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[22:638]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[22:638]
Righe 3 e 4: il metodo [onError] di entrambi gli abbonati ha ricevuto l'eccezione generata dall'osservabile.
Questa esecuzione presenta una particolarità: i metodi [onCompleted] di entrambi gli osservatori non sono stati chiamati. Di conseguenza, la barriera non è stata abbassata e il thread principale rimane bloccato nel metodo statico [ProcessUtils.subscribe] alla riga 3 seguente:
// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");
Qui vediamo che, se si verifica un errore nell'osservabile, il metodo [onCompleted] degli abbonati non viene chiamato. Modifichiamo quindi il metodo [Observer.onError] come segue:
@Override
public void onError(Throwable e) {
// erreur d'émission
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber[%s, %s].onError (%s)", observerName, processName, e));
}
// fin blocage thread principal
latch.countDown();
}
Aggiungiamo le righe 7–8 per rilasciare il blocco in caso di errore rilevabile. Con questo nuovo codice, l'esecuzione produce i seguenti risultati:
main : début observation ------Thread[main] ---- Time[40:750]
main : attente fin observation ------Thread[main] ---- Time[40:764]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[40:766]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[40:766]
main : fin observation ------Thread[main] ---- Time[40:767]
Otteniamo la riga 5, che prima non avevamo.
L'esempio 11 sarà il seguente:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple11 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs1 = Observable.empty();
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
Riga 10: Il metodo statico [Observable.empty] crea un osservabile che non emette alcun elemento. Emette solo la notifica di fine emissione;
![]() |
L'esecuzione del codice nell'esempio sopra riportato produce i seguenti risultati:
- Righe 2 e 3: vediamo che entrambi gli osservatori ricevono la notifica di fine trasmissione senza aver ricevuto alcun elemento in precedenza.
Ci si potrebbe chiedere a cosa serva effettivamente questo metodo. Può essere utilizzato in modo analogo a una collezione, inizialmente vuota, alla quale vengono poi aggiunti degli elementi:
Nella riga 3, uniamo l'osservabile iniziale obs (riga 1) con altri osservabili.
L'esempio 12 illustra il metodo statico [Observable.never]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple12 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs1 = Observable.never();
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
Il metodo statico [Observable.never] crea un osservabile che non emette mai:
![]() |
L'esecuzione dell'esempio produce i seguenti risultati:
Riga 2: il thread principale rimane in attesa a tempo indeterminato. Questo perché nessun osservabile emette la notifica [onCompleted], che consente al semaforo (barriera) di diventare verde (abbassare la barriera).
7.4. Multithreading
7.4.1. Esempio 13: thread di azione, thread di osservazione
Nella Sezione 7.1.3, abbiamo creato un osservabile utilizzando il metodo statico [Observable.create]:
![]() |
- il metodo [create] restituisce un tipo Observable<T>;
- il parametro del metodo [create] è una funzione di tipo [Observable.OnSubscribe<T>] definita come segue:
![]() |
Il tipo [Observable.OnSubscribe<T>] è un'interfaccia funzionale che a sua volta estende l'interfaccia funzionale [Action1<Subscriber<? super T>>]. Il metodo [call] di questa interfaccia richiede un tipo [Subscriber] (sottoscrittore, osservatore). Nel resto di questo documento, a volte ci riferiremo al tipo [Observable.OnSubscribe<T>] come a un'azione. Creeremo azioni personalizzate che avranno un nome. Queste saranno istanze della seguente interfaccia [IProcessAction]:
![]() |
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcessAction<T> extends Observable.OnSubscribe<T> {
// action has a name
public String getName();
}
- riga 5: l'interfaccia [IProcessAction<T>] presenta tutte le caratteristiche dell'interfaccia [Observable.OnSubscribe<T>];
- riga 8: dispone inoltre di un metodo [getName] che restituisce il nome dell'istanza che implementa l'interfaccia;
Useremo la seguente azione denominata [ProcessAction01]:
package dvp.rxjava.observables.utils;
import java.util.Random;
import rx.Subscriber;
import rx.functions.Func1;
public class ProcessAction01<T> implements IProcessAction<T> {
// data
private String name;
private int nbValues;
private Func1<Integer, T> func1;
// manufacturers
public ProcessAction01(String name, int nbValues, Func1<Integer, T> func1) {
this.name = name;
this.nbValues = nbValues;
this.func1 = func1;
}
@Override
public void call(Subscriber<? super T> subscriber) {
ProcessUtils.showInfos.accept(String.format("Observable (%s) call start", getName()));
for (int i = 0; i < nbValues; i++) {
// waiting
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
// error
ProcessUtils.showInfos.accept(String.format("Observable (%s) onError", getName()));
subscriber.onError(e);
}
// element emission
T value = func1.call(i);
ProcessUtils.showInfos.accept(String.format("Observable (%s,%s) onNext (%s)", getName(), i, value));
subscriber.onNext(value);
}
// finish
ProcessUtils.showInfos.accept(String.format("Observable (%s) onCompleted", getName()));
subscriber.onCompleted();
}
@Override
public String getName() {
return name;
}
}
- riga 8: la classe [ProcessAction01<T>] implementa l'interfaccia [IProcessAction<T>] e quindi l'interfaccia [Observable.OnSubscribe<T>];
- riga 11: il nome dell'azione;
- riga 12: il numero di valori da emettere;
- riga 13: un'istanza di tipo [Func1<Integer, T>] che accetta un intero e produce un tipo T da emettere dall'osservabile (righe 35 e 37);
- righe 16–20: passiamo al costruttore il nome dell'azione, il numero di valori da emettere e la funzione di emissione;
- righe 23–42: il codice del processo;
- riga 23: il metodo [call] accetta come parametro il sottoscrittore dell'osservabile associato al processo;
- riga 28: il processo emette i propri elementi dopo un'attesa di durata casuale;
- riga 32: emissione di un errore;
- riga 37: un'emissione normale;
- riga 41: emette la notifica di fine emissione;
- righe 25–38: l'azione emette nbValues numeri reali dopo un tempo di attesa casuale (riga 30);
- riga 35: il valore da emettere è fornito dalla funzione [func1] passata come parametro al costruttore (riga 16);
Rifattorizziamo la classe [Process] (vedi Sezione 7.3.1) in modo che possa essere costruita anche con un'azione denominata. Aggiungiamo il seguente costruttore:
public Process(IProcessAction<T> na, Scheduler schedulerObserved, Scheduler schedulerObserver) {
// nom process=nom action
name = na.getName();
// action --> observable
observable = Observable.create(na);
// thread d'exécution du processus observé
if (schedulerObserved != null) {
observable = observable.subscribeOn(schedulerObserved);
}
// thread d'observation de l'observateur
if (schedulerObserver != null) {
observable = observable.observeOn(schedulerObserver);
}
}
- Riga 1: Il costruttore accetta 3 parametri:
- l'azione denominata che verrà utilizzata per costruire l'osservabile (riga 5);
- lo scheduler del processo osservato (può essere nullo);
- lo scheduler dell'osservatore (può essere nullo);
- riga 5: l'osservabile viene creato dall'azione passata come parametro;
Il codice seguente [Esempio 13] osserva diversi osservabili:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple13 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 1, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// process 3
Process<Integer> process3 = new Process<>(new ProcessAction01<Integer>("process3", 3, i -> i * 2), null,
Schedulers.computation());
// process 4
Process<Boolean> process4 = new Process<>(new ProcessAction01<Boolean>("process4", 4, i -> i % 2 == 0), null, null);
// subscriptions
ProcessUtils.subscribe(1, process1);
ProcessUtils.subscribe(1, process2);
ProcessUtils.subscribe(1, process3);
ProcessUtils.subscribe(1, process4);
}
}
- righe 13–15: process1 produce 1 numero reale su un thread di calcolo che verrà osservato su un altro thread di calcolo;
- righe 17–18: process2 produce 2 stringhe su un thread di calcolo e non viene fornita alcuna indicazione riguardo al thread dell'osservatore. I risultati mostrano che l'osservazione avviene per impostazione predefinita sullo stesso thread dell'esecuzione del processo;
- righe 20–21: il processo 3 produce 3 numeri interi su un thread non specificato, che saranno osservati su un thread di calcolo. I risultati mostrano che il processo viene eseguito per impostazione predefinita sul thread principale;
- riga 23: il processo process4 produce 4 valori booleani su un thread non specificato, che saranno osservati su un thread non specificato. I risultati mostrano che l'esecuzione del processo e la sua osservazione avvengono per impostazione predefinita sul thread principale;
Il risultato dell'esecuzione di questo codice è il seguente:
- Il processo process1 genera un numero reale (riga 4) sul thread di calcolo [RxComputationThreadPool-4], che viene osservato sul thread di calcolo [RxComputationThreadPool-3] (riga 6);
- Il processo process2 produce 2 stringhe (righe 12, 14) sul thread di calcolo [RxComputationThreadPool-5], che vengono osservate su quello stesso thread (righe 13, 15);
- process3 produce 3 numeri interi (righe 21, 23, 25) sul thread principale, che vengono osservati sul thread di calcolo [RxComputationThreadPool-6] (righe 22, 24, 28);
- il processo process4 produce 4 valori booleani (righe 34, 36, 38, 40) sul thread principale, che vengono osservati su quello stesso thread principale (righe 33, 35, 37, 39);
Il lettore è invitato a seguire quanto sopra:
- il ciclo di vita del processo osservato e del suo thread;
- il ciclo di vita del suo osservatore e del suo thread;
Gran parte del fascino delle librerie Rx risiede in questo multithreading, che lo sviluppatore non deve gestire autonomamente.
7.5. Combinazioni di più osservabili
7.5.1. Esempio 14: Unione di due osservabili con [Observable.merge]
Presentiamo ora i metodi statici della classe [Observable] che consentono di combinare più osservabili in un unico osservabile di risultato.
Il primo esempio di questo tipo è il seguente:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple14 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// merge
Process<?> process12 = new Process<>("process12",
Observable.merge(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- righe 15–17: un processo denominato [process1] emetterà 3 numeri reali su un thread di calcolo. Sarà inoltre osservato su un thread di calcolo;
- righe 19–20: un processo denominato [process2] emetterà 2 stringhe su un thread di calcolo. Il thread di osservazione non è specificato. Abbiamo visto in precedenza che in questo caso il thread di osservazione è il thread di calcolo;
- riga 23: i due processi vengono uniti, ovvero viene creato un osservabile i cui elementi provengono simultaneamente da entrambi i processi. A tal fine viene utilizzato il metodo statico [Observable.merge]:
![]() |
Contrariamente a quanto potrebbe suggerire il diagramma sopra riportato, durante l'unione, gli elementi del flusso 1 possono essere intercalati tra gli elementi del flusso 2. Ciò è dimostrato dai risultati dell'esecuzione:
- riga 3: il processo [process1] viene eseguito sul thread di calcolo [RxComputationThreadPool-4];
- riga 4: il processo [process2] è in esecuzione sul thread di calcolo [RxComputationThreadPool-5];
- riga 9: il processo [process12] viene osservato sul thread di calcolo [RxComputationThreadPool-3]. Non conosco la regola che ha portato a questa scelta;
- righe 9–11: vediamo che l'osservatore osserva elementi provenienti da entrambi i processi [process1] (riga 5) e [process2] (righe 6, 7) anche se nessuno dei due è terminato (c'è una commistione);
- il processo [process12] termina (riga 17) quando entrambi i processi, process1 e process2, sono terminati;
7.5.2. Esempio-15: Concatenazione di due osservabili con [Observable.concat]
Esamineremo ora il codice seguente:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple15 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, Schedulers.computation());
// concat
Process<?> process12 = new Process<>("process12",
Observable.concat(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- righe 15–17: un processo denominato [process1] emetterà 3 numeri reali su un thread di calcolo. Sarà inoltre osservato su un thread di calcolo;
- righe 19–20: un processo denominato [process2] emetterà 2 stringhe su un thread non specificato, in questo caso il thread principale predefinito. Sarà osservato su un thread di calcolo;
- riga 23: i due processi vengono concatenati, ovvero viene creato un osservabile i cui elementi provengono da entrambi i processi. I valori emessi non vengono mescolati. Il processo [process12] emetterà prima tutti i valori del processo [process1] e poi quelli del processo [process2]. A questo scopo viene utilizzato il metodo statico [Observable.concat]:
![]() |
I risultati dell'esecuzione sono i seguenti:
- righe 3-10: il processo [process1] viene eseguito e il processo [process12] emette i valori emessi da [process1];
- riga 9: il processo [process1] è terminato;
- righe 11-17: il processo [process2] viene eseguito e il processo [process12] emette i valori emessi da [process2];
C'è una stranezza riguardo al processo 2: non abbiamo specificato un thread di esecuzione. Ci si potrebbe quindi aspettare che venga utilizzato il thread principale per impostazione predefinita. Tuttavia, non è così. Il thread di esecuzione era il thread di calcolo [RxComputationThreadPool-3] (riga 11). Pertanto, quando non viene specificato alcun thread di esecuzione o di osservazione, non possiamo fare alcuna ipotesi su quale thread verrà scelto.
7.5.3. Esempio 16: Combinazione di due osservabili con [Observable.zip]
Esamineremo ora il codice seguente:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;
public class Exemple16 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, null);
// 2-process combination function
FuncN<String> funcn = new FuncN<String>() {
@Override
public String call(Object... args) {
if (args.length == 2) {
return String.format("double=%s, string=%s", args[0], args[1]);
} else {
throw new RuntimeException("la fonction attend 2 paramètres exactement");
}
}
};
// zip of the 2 processes
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- righe 16–18: un processo denominato [process1] emetterà 3 numeri reali su un thread di calcolo. Sarà inoltre osservato su un thread di calcolo;
- righe 20–21: un processo denominato [process2] emetterà 2 stringhe su un thread non specificato. Anche il thread di osservazione è non specificato;
- righe 23–32: istanziazione di un tipo [FuncN<String>] con una classe anonima. FuncN è un'interfaccia funzionale:
![]() |
Il metodo [FuncN.call] accetta un array di oggetti e restituisce un tipo R. La funzione [funcn] verrà utilizzata per combinare i processi process1 e process2 in quell'ordine. Nel metodo [FuncN.call]:
- args[0] sarà un Double;
- args[1] sarà una String;
Qui, il risultato di [funcn.call] sarà la stringa della riga 27. La costruzione di questo risultato non richiede la conoscenza dei tipi degli argomenti del metodo di chiamata.
I due processi vengono combinati come segue:
// zip des 2 processus
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
Il metodo [Observable.zip] funziona come segue:
![]() |
Notiamo che:
- il primo argomento di zip è un Iterable<Observable>. Nel nostro esempio, abbiamo un parametro effettivo di tipo List<Observable> costituito dai nostri due osservabili;
- il secondo argomento di zip è di tipo FuncN. Nel nostro esempio, il parametro effettivo è [funcn];
L'esecuzione produce i seguenti risultati:
- righe 7, 11: process12 emette due elementi;
- riga 8: l'elemento aggiuntivo emesso da process1, che non ha un partner in process2, non viene emesso dal processo risultante process12;
Notiamo che process2, a cui non era stato assegnato né un thread di esecuzione né un thread di osservazione, ha utilizzato il thread principale per entrambi.
7.5.4. Esempio 17: Combinazione di due osservabili con [Observable.combineLatest]
Esamineremo ora il codice seguente:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple17 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null,
Schedulers.computation());
// combining the 2 processes
Process<Double> process12 = new Process<>("process12",
Observable.combineLatest(process1.getObservable(), process2.getObservable(), (d1, d2) -> d1 + d2));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- righe 14–16: un processo denominato [process1] emetterà 3 numeri reali su un thread di calcolo. Sarà inoltre osservato su un thread di calcolo;
- righe 18–20: un processo denominato [process2] emetterà 2 numeri reali su un thread non associato. Questi saranno osservati su un thread di calcolo;
- riga 23: i due osservabili vengono combinati utilizzando il seguente metodo statico [Observable.combineLatest]:
![]() |
L'osservabile [combineLatest] funziona come segue: quando uno dei due osservabili emette un elemento E1, tale elemento viene combinato tramite [combineFunction] con l'ultimo elemento emesso dall'altro osservabile.
L'esecuzione di questo codice produce il seguente risultato:
- Riga 5: L'output di process2 (56) viene combinato con l'ultimo elemento emesso da process1 (54, riga 4) e produce il risultato mostrato nella riga 7;
- riga 6: l'output di process1 (51,6) viene combinato con l'ultimo elemento emesso da process2 (56, riga 5) e produce il risultato della riga 8;
- riga 9: l'output del processo 2 (261,8) viene combinato con l'ultimo elemento emesso dal processo 1 (51,6, riga 6) e produce il risultato della riga 12;
- riga 13: l'emissione da process1 (80,39) viene combinata con l'ultimo elemento emesso da process2 (261,8, riga 9) e produce il risultato della riga 15;
Si tratta di una variante dell'osservabile [zip] in cui, questa volta, gli elementi combinati non sono necessariamente gli elementi nella stessa posizione nei flussi. Si noti qui che il processo2, a cui non era stato assegnato alcun thread di esecuzione, è stato eseguito sul thread principale (riga 2).
7.5.5. Esempio 18: Combinazione di due osservabili con [Observable.amb]
Esamineremo ora il codice seguente:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple18 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null, null);
// combining the 2 processes
Process<Double> process12 = new Process<>("process12",
Observable.amb(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- righe 14–16: un processo denominato [process1] emetterà 3 numeri reali su un thread di calcolo. Sarà inoltre osservato su un thread di calcolo;
- righe 18–20: un processo denominato [process2] emetterà 2 numeri reali su un thread non associato. Essi saranno osservati su un thread non associato;
- riga 22: i due osservabili vengono combinati utilizzando il seguente metodo statico [Observable.amb]:
![]() |
Come mostrato nel diagramma sopra, l'osservabile [Observable.amb(Observable o1, Observable o2)] emette gli elementi dell'osservabile che emette per primo. Ciò è confermato dai risultati dell'esempio presentato:
- riga 4: process2 è il primo a emettere;
- Righe 8, 12: process12 emette tutti gli elementi emessi da process2 (righe 4, 11);
7.6. Catena di elaborazione per un osservabile
7.6.1. Esempio 19: trasformazione di un osservabile con [Observable.map]
Negli esempi precedenti abbiamo esaminato varie combinazioni di due osservabili in un terzo osservabile. Ora presentiamo i metodi statici della classe [Observable] che consentono operazioni di trasformazione, filtraggio e aggregazione su un osservabile. Qui troveremo metodi analoghi a quelli della classe [Stream] studiata nella Sezione 5.
Il nostro primo esempio sarà il seguente:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple19 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>("process2",
process1.getObservable().map(d -> String.format("valeur-%s", d)));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- righe 14–16: un processo denominato process1 emetterà 3 numeri reali su un thread di calcolo. Sarà inoltre monitorato su un thread di calcolo;
- righe 17–18: i numeri emessi da process1 saranno convertiti in stringhe in un process2;
- riga 20: osserviamo process2;
Il metodo [Observable.map] alla riga 18 è analogo al metodo [Stream.map] discusso nella Sezione 5.5:
![]() |
I risultati dell'esempio sono i seguenti:
- righe 4, 5 e 8: le emissioni da process1. Si tratta di numeri reali;
- righe 6, 7, 10: le emissioni osservate da process2. Si tratta di stringhe;
7.6.2. Esempio-20: filtrare un osservabile con [Observable.filter]
L'esempio sarà il seguente:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple20 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- righe 11-12: un processo denominato process1 emetterà numeri interi da 0 a 2 su un thread di lavoro. Verrà inoltre monitorato su un thread di lavoro;
- riga 14: i numeri emessi da process1 saranno filtrati in modo che solo i numeri pari vengano conservati in process2;
- riga 20: osserviamo process2;
Il metodo [Observable.filter] alla riga 18 è analogo al metodo [Stream.filter] discusso nella Sezione 5.4:
![]() |
I risultati dell'esempio sono i seguenti:
- righe 4, 5 e 7: emissioni da process1;
- righe 6, 9: le emissioni osservate da process2. Questi sono gli elementi pari provenienti da process1;
7.6.3. Esempio-21: trasformazione di un osservabile con [Observable.flatMap]
L'esempio sarà il seguente:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- righe 12-13: un processo chiamato process1 emetterà numeri interi da 0 a 2 su un thread di calcolo. Sarà inoltre osservato su un thread di calcolo;
- righe 15–18: ogni numero n emesso da process1 viene trasformato in un osservabile che emette i 3 numeri (10*n, 10*n+1, 10*n+2). Se avessimo utilizzato il metodo [map] alla riga 15, process2 emetterebbe un tipo Observable<Integer> anziché un tipo Integer. Il metodo [flatMap] utilizzato ci permette di appiattire questa sequenza di elementi di tipo Observable<Integer> in una sequenza di elementi di tipo Integer costituita da ciascun elemento di ogni Observable<Integer>;
- riga 20: osserviamo process2;
Il metodo [Observable.flatMap] alla riga 15 è analogo al metodo [Stream.flatMap] discusso nella Sezione 5.6.12:
![]() |
I risultati dell'esempio sono i seguenti:
- righe 5-7: le tre emissioni da process2 che seguono l'emissione alla riga 4 di process1;
- righe 9-11: le tre emissioni da process2 che seguono l'emissione alla riga 8 di process1;
- righe 14-16: le tre emissioni del processo 2 successive all'emissione alla riga 12 del processo 1;
Il codice seguente mostra come creare un tipo Observable<Integer[]> da process1 [Esempio 21b]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21b {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer[]> process2 = new Process<>("process2", process1.getObservable().map(i -> {
int value = i * 10;
return new Integer[] { value, value + 1, value + 2 };
}));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- riga 14: viene utilizzato il metodo [Observable.map];
- riga 16: che restituisce un tipo Integer[];
I risultati sono i seguenti:
- righe 6, 7, 10: vediamo i risultati della mappa;
Tutte queste trasformazioni osservabili possono essere concatenate poiché ogni trasformazione produce una nuova osservabile. Ciò è dimostrato nel seguente esempio [Esempio21c]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21c {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- righe 15–18: flatMap è seguito da un filtro;
I risultati dell'esecuzione sono i seguenti:
- righe 8-13: process2 ha emesso solo gli elementi pari da flatMap;
Un metodo simile a [flatMap] è il metodo [flatMapIterable], illustrato dal seguente esempio [Esempio21d]:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21d {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMapIterable(i -> {
int value = i * 10;
return Arrays.asList(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
Riga 16: invece di utilizzare il metodo [flatMap], utilizziamo il metodo [flatMapIterable]. In questo caso, la funzione di trasformazione deve produrre un tipo Iterable<T> (riga 18) anziché un tipo Observable<T>.
Otteniamo gli stessi risultati di prima.
Torniamo alla definizione del metodo [flatMap]:
![]() |
Come mostrato sopra, un elemento blu [3] è stato inserito tra i due elementi verdi [1-2]. Ciò significa che quando si appiattiscono gli Observable<T>, il metodo [flatMap] preserva l'ordine di emissione di questi vari osservabili interni. Ciò è dimostrato dal seguente esempio [Esempio21e]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21e {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().flatMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
- righe 11-12: il processo process1 emette gli interi [0,1];
- righe 14-15: il processo2 emette i numeri interi [10,11,12];
- righe 17-18: ogni elemento emesso da process1 è associato all'osservabile di process2. Ciò significa che:
- l'elemento [0] di process1 sarà associato a un osservabile che emette [10,11,12];
- lo stesso vale per l'elemento 1;
Alla fine, verranno emessi i 6 numeri [10, 11, 12, 10, 11, 12]. Vogliamo vedere in quale ordine.
I risultati dell'esecuzione sono i seguenti:
Possiamo vedere che l'ordine di emissione di process3 era: [10, 10, 11, 12, 11, 12] (righe 11, 12, 14, 17, 19, 22). Pertanto, gli elementi emessi da process2 erano effettivamente mescolati. Possiamo evitare questo utilizzando il metodo [concatMap] invece del metodo [flatMap]. Ciò è dimostrato dal seguente codice [Esempio21ef]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21ef {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().concatMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
Alla riga 18, abbiamo sostituito [flatMap] con [concatMap]. I risultati dell'esecuzione sono i seguenti:
Si nota che l'ordine di emissione del processo 3 era: [10, 11, 12, 10, 11, 12] (righe 12–14, 17, 19, 22). Gli elementi emessi dal processo 2 non sono stati rimescolati.
Un'altra variante del metodo [map] è il metodo [switchMap]:
![]() |
Nell'esempio sopra, dall'osservabile [1] vengono creati altri tre osservabili [2] con due elementi ciascuno, che vengono poi appiattiti come in [flatMap] [3]. Si noti che il risultato ha 5 elementi, non 6. Questo perché prima che il secondo osservabile emetta il suo secondo elemento [6], il terzo osservabile emette il suo primo elemento [5], causando lo scartamento del secondo osservabile. Pertanto, l'elemento [6] non si trova nell'osservabile risultante [3].
Per illustrare [switchMap], useremo il seguente esempio [Esempio21eg]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21eg {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().switchMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
L'esecuzione dell'esempio produce i seguenti risultati:
- process1 emette 2 elementi che danno origine a 2 osservabili process2 di 3 elementi;
- riga 14: l'osservatore riceve l'elemento n. 0 emesso dal primo osservabile process2 alla riga 6;
- riga 15: l'osservatore riceve l'elemento n. 0 emesso dal secondo osservabile process2 alla riga 13. La storia non spiega perché in precedenza non abbia ricevuto gli elementi 1 e 2 emessi dal primo osservabile process2 alle righe 7 e 8. In ogni caso, il primo osservabile process2 viene abbandonato;
- alla fine, l'osservatore vede solo 4 elementi (righe 14, 15, 17, 20) invece dei 6 che sono stati emessi;
7.6.4. Esempi-22: Altri metodi della classe [Observable]
La classe [Observable] include molti metodi della classe [Stream] che funzionano in modo simile. Eccone alcuni. Ci limiteremo a fornire il codice e i relativi risultati.
[Esempio 22a - take=limit]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22a {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).take(3));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
risultati
[Esempio 22b - takeLast]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22b {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).takeLast(2));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
risultati
[Esempio 22c - salta]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22c {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).skip(5).take(2));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
risultati
[Esempio 22d - reduce]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22d {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).reduce(0, (i, a) -> i + a));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- riga 10: calcola la somma degli elementi nell'osservabile. Il risultato è un osservabile che emette questa somma;
risultati
[Esempio 22e - tutto]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22e {
public static void main(String[] args) throws InterruptedException {
// process
Process<Boolean> process = new Process<>("process", Observable.range(1, 10).all(i -> i > 10));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- riga 10: restituisce un Observable<Boolean> che emette l'elemento true se il predicato del metodo [all] è vero per tutti gli elementi, false in caso contrario;
risultati
[Esempio 22f - conteggio]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22f {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).count());
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- riga 10: [Observable.count] crea un osservabile a 1 elemento che è la somma degli elementi osservati;
risultati
[Esempio 22g - distinto]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22g {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.just(1, 2, 1, 3).distinct());
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
risultati
[ Esempio 22h - groupBy, asObservable]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.observables.GroupedObservable;
public class Exemple22h {
public static void main(String[] args) throws InterruptedException {
// process
Observable<GroupedObservable<Boolean, Integer>> obs = Observable.range(1, 10).groupBy(i -> i % 2 == 0);
Process<Integer> process = new Process<>("process", obs.concatMap(g -> g.asObservable()));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- Riga 11: Il metodo [groupBy] raggruppa i 10 elementi emessi in due gruppi: numeri pari e numeri dispari. Il risultato è un Observable<GroupedObservable<Boolean, Integer>>, ovvero un osservabile i cui elementi sono di tipo GroupedObservable<Boolean, Integer>, dove Boolean è il tipo della chiave del gruppo (false, true in questo caso) ed è anche il tipo del risultato della lambda passata come parametro al metodo [groupBy], mentre Integer è il tipo degli elementi del gruppo;
- riga 12: il tipo GroupedObservable ha un metodo [asObservable] che ci permette di creare un osservabile da questo tipo. Avremo quindi due tipi Observable<Integer>, uno per i numeri pari e l'altro per i numeri dispari. Da questi due osservabili, il metodo [concatMap] ne creerà uno solo;
risultati
[Esempio22i - timestamp]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
import rx.schedulers.Timestamped;
public class Exemple22i {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Timestamped<Integer>> process2 = new Process<>("process2", process1.getObservable().timestamp());
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- alla riga 15, il metodo [timestamp] associa un timestamp a ciascun elemento elaborato dell'osservabile;
risultati
In questo esempio, è difficile capire cosa rappresentino le informazioni relative al timestamp:
- righe 4-5: vediamo che l'elemento 1 del processo1 è stato emesso 139 ms dopo l'elemento 0;
- righe 6 e 7: vediamo che l'elemento 1 del processo 2 è stato osservato 234 ms dopo l'elemento 0;
- righe 5, 8: vediamo che l'elemento 2 del processo1 è stato emesso 33 ms dopo l'elemento 1;
- righe 7 e 10: vediamo che l'elemento 2 del processo 2 è stato osservato 37 ms dopo l'elemento 1;
Questi ritardi sono dovuti al fatto che i thread per l'osservazione e l'esecuzione degli osservabili non sono gli stessi. Se sostituiamo le righe 12–13 con le seguenti righe (Esempio22j):
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
null);
- Righe 2–3: Non specifichiamo il thread di osservazione. Sappiamo che in questo caso l'osservabile viene osservato nel punto in cui viene eseguito;
Ciò produce i seguenti risultati:
- righe 4 e 6: process1 emette il suo elemento n. 1 587 ms dopo il suo elemento n. 0;
- righe 5 e 7: l'osservatore osserva questi due elementi con un intervallo di 586 ms;
- righe 6 e 8: il processo 1 emette il suo elemento n. 2 396 ms dopo il suo elemento n. 1;
- righe 7 e 9: l'osservatore osserva questi due elementi con una differenza temporale di 396 ms;
In questo caso, i valori dei timestamp sono coerenti: rappresentano accuratamente il tempo di trasmissione dell'elemento.
7.7. Scheduler
7.7.1. Esempio-23: lo scheduler [Schedulers.computation]
Esamineremo ora gli scheduler di esecuzione. L'osservazione verrà effettuata sul thread di esecuzione.
L'argomento degli scheduler è piuttosto oscuro. I vari scheduler sono presentati in questa domanda sul sito web StackOverflow [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
Cercheremo di illustrare l'uso di questi diversi scheduler con degli esempi. Il primo illustra lo scheduler [Schedulers.computation]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple23 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.computation(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
- righe 14–19: creiamo un array di 10 processi in esecuzione su un thread di calcolo;
- riga 17: ogni processo genera un numero reale casuale;
- riga 21: ci iscriviamo a tutti questi processi;
I risultati sono i seguenti:
- righe 2-10: i primi 8 processi si avviano su 8 thread diversi (la macchina utilizzata ha 8 core). Si noti che si avviano tutti all'incirca nello stesso momento;
- righe 17-19: 3 processi terminano, liberando così 3 thread;
- righe 23-24: gli ultimi due processi possono quindi avviarsi utilizzando 2 dei thread così liberati;
Possiamo quindi concludere che lo scheduler [Schedulers.computation] fornisce un pool di n thread, dove n è il numero di core presenti sulla macchina. I thread vengono eseguiti in parallelo su questi core.
7.7.2. Esempio 24: lo scheduler [Schedulers.io]
Eseguiamo il codice precedente con lo scheduler [Schedulers.io]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple24 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.io(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
- riga 18: i processi vengono eseguiti utilizzando i thread dello scheduler [Schedulers.io];
Ciò produce i seguenti risultati:
- righe 2-10: i 10 processi partono ciascuno su un thread diverso. A differenza del caso precedente, tutti i processi sono stati in grado di avviarsi. Si noti che questi avvii richiedono 6 ms, mentre in precedenza era 1 ms;
- righe 13-18: gli osservabili emettono uno dopo l'altro e non in modo neanche lontanamente parallelo come nel caso precedente;
Qual è la differenza tra gli scheduler [Schedulers.io] e [Schedulers.computation]? Una risposta è disponibile all'URL [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
7.7.3. Esempio-25: lo scheduler [Schedulers.newThread]
Eseguiamo il codice precedente utilizzando lo scheduler [Schedulers.newThread]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple25 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.newThread(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
I risultati ottenuti sono gli stessi dello scheduler [Schedulers.io]:
All'URL [http://stackoverflow.com/questions/33415881/retrofit-with-rxjava-schedulers-newthread-vs-schedulers-io], viene spiegato che lo scheduler [Schedulers.io] fornisce un pool di thread, cosa che lo scheduler [Schedulers.newThread] non fa. Un pool di thread crea automaticamente un insieme di thread. Li assegna ai processi che ne hanno bisogno. Quando questi processi sono terminati, i loro thread non vengono eliminati ma tornano al pool e possono quindi essere riutilizzati da un altro processo. Questo è più efficiente rispetto alla creazione e all'eliminazione costante dei thread. Pertanto, è preferibile utilizzare lo scheduler [Schedulers.io].
7.7.4. Esempio 26: Gli scheduler [Schedulers.immediate, Schedulers.trampoline]
Torniamo alla spiegazione fornita per questi due scheduler:
![]() |
La spiegazione è abbastanza facile da capire, ma quando provi a illustrarla ti rendi conto di non averla compresa appieno. È stato il libro *Learning Reactive Programming With Java 8* ad aiutarmi a creare un esempio basato su uno presente in quel libro, ma semplificato. Eccolo qui:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
public class Exemple26 {
public static void main(String[] args) throws InterruptedException {
// a scheduler
Scheduler scheduler = Schedulers.immediate();
// a worker of this scheme
Worker worker = scheduler.createWorker();
// an Action0 type to be executed on the worker
Action0 action02 = new Action0() {
@Override
public void call() {
// log action02
ProcessUtils.showInfos.accept("action02");
}
};
// an Action0 type to be executed on the worker
Action0 action01 = new Action0() {
@Override
public void call() {
// program a new action on the same worker
worker.schedule(action02);
// log action01
ProcessUtils.showInfos.accept("action01");
}
};
// action01 is programmed on the worker
worker.schedule(action01);
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- riga 17: uno scheduler. Questo sarà o [Schedulers.immediate] come mostrato qui o [Schedulers.trampoline] in seguito;
- riga 19: le azioni di tipo Action0 (righe 21, 20) possono essere eseguite sui worker dello scheduler. Il metodo [Scheduler.createWorker] crea un worker. Il metodo [Worker.schedule(Action0)] esegue un'azione di tipo Action0 tramite un worker;
- righe 21–27: una prima azione chiamata [action02] che verrà eseguita (riga 40) dal worker della riga 19;
- righe 30–38: una seconda azione chiamata [action01]. Ha la particolarità di far sì che action02 venga eseguita sullo stesso worker di se stessa (riga 34). È qui che risiede la differenza tra [Schedulers.immediate] e [Schedulers.trampoline]:
- se lo scheduler è [Schedulers.immediate], allora alla riga 34 l'azione action02 verrà eseguita immediatamente (da cui il nome dello scheduler) e l'azione action01 attualmente in esecuzione verrà interrotta. Vedremo quindi apparire il messaggio della riga 25. Una volta terminata action02, action01 riprenderà e vedremo il messaggio della riga 36;
- se lo scheduler è [Schedulers.trampoline], allora alla riga 34 l'azione action02 viene messa in coda. Non verrà eseguita finché l'attività corrente, action01, non sarà completata. Apparirà quindi il messaggio alla riga 36. Una volta completata action01, action02 verrà eseguita e apparirà il messaggio alla riga 25;
L'esecuzione del codice sopra riportato produce i seguenti risultati:
Se, alla riga 17, utilizziamo lo scheduler [Schedulers.trampoline], otteniamo risultati opposti:
Detto questo, è difficile stabilire una connessione con gli osservabili. Non ho trovato un esempio convincente che potesse dimostrare il vantaggio di eseguire un osservabile su uno di questi due thread. Eccone uno, però, che non trovo affatto naturale:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
public class Exemple27 {
public static void main(String[] args) throws InterruptedException {
// Worker
Worker worker = Schedulers.immediate().createWorker();
// Worker worker = Schedulers.trampoline().createWorker();
// observable 1 sur worker
worker.schedule(() -> Observable.range(1, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
// observable 2 on same worker
worker.schedule(() -> Observable.range(100, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
}
}));
}
}));
}
}
- righe 13–14: viene creato un worker utilizzando uno dei due scheduler [Schedulers.immediate] e [Schedulers.trampoline];
- riga 16: su questo worker viene pianificato un primo osservabile obs1 che emette i numeri [1,2]
- riga 22: ogni volta che viene osservato un elemento di questo osservabile obs1, viene avviata l'osservazione di un secondo osservabile obs2 sullo stesso worker per emettere i numeri [100,101];
Con lo scheduler [Schedulers.immediate], otteniamo i seguenti risultati:
Mentre con lo scheduler [Schedulers.trampoline] otteniamo i seguenti risultati:
7.8. Conclusione
C'è ancora molto da fare. Per acquisire una comprensione più approfondita della libreria RxJava, invitiamo i lettori a proseguire l'apprendimento utilizzando i riferimenti forniti all'inizio di questo documento. Tuttavia, ora disponiamo delle nozioni di base necessarie per utilizzare RxJava negli ambienti Swing e Android. È proprio ciò che dimostreremo di seguito.








































