2. Un esempio introduttivo
Il mio primo approccio con RxJava è avvenuto attraverso corsi e tutorial che ho trovato online. A parte il fatto che la teoria utilizzava concetti che non conoscevo e che facevo fatica a comprendere, non riuscivo davvero a capire come potesse essere utile nella vita reale. Inizieremo quindi presentando un esempio (spero semplice) in cui l’uso di RxJava porta a una reale semplificazione del codice, e da lì cercheremo di identificare gli elementi importanti di questa libreria.
La libreria RxJava si basa sul seguente concetto: uno stream di elementi di tipo T Observable<T> viene osservato da uno o più subscriber (abbonati, osservatori, consumatori) Subscriber<T>. La libreria RxJava permette al flusso Observable<T> di essere eseguito in un thread T1 e al suo osservatore Subscriber<T> in un thread T2 senza che lo sviluppatore debba preoccuparsi di gestire il ciclo di vita di questi thread o di questioni naturalmente complesse, come la condivisione dei dati tra i thread e la loro sincronizzazione per eseguire un'operazione globale. Facilita quindi la programmazione asincrona.
Uno stream Observable<T> produce elementi di tipo T, che sono osservabili man mano che vengono generati. Se l'osservatore e l'osservabile (termine usato in senso lato per riferirsi al tipo Observable<T>) si trovano nello stesso thread, l'osservabile può produrre l'elemento (i+1) solo dopo che l'osservatore ha consumato l'elemento i. Sono pochi i casi in cui questa architettura risulta utile. Se l'osservatore e l'osservabile non si trovano nello stesso thread, l'osservabile e il suo osservatore si comportano in modo autonomo: l'osservabile emette al proprio ritmo e l'osservatore consuma al proprio ritmo. È qui che risiede il valore della libreria. Finora abbiamo discusso solo di un singolo osservatore. In realtà, un osservabile può avere un numero qualsiasi di osservatori.
2.1. L'architettura dell'applicazione di esempio
L'applicazione di esempio presenta la seguente architettura:

- in [1], un livello di servizio genera elenchi di numeri casuali. Questo livello viene eseguito nello stesso thread del metodo [swing] che lo utilizza. Genera quindi i propri numeri in modo sincrono;
- in [2], un sottile livello di adattamento implementato con RxJava consente di presentare al livello [swing] un'implementazione asincrona dello stesso servizio: questo servizio può essere eseguito in un thread diverso dal metodo [swing] che lo utilizza;
- La chiamata [4] è sincrona, mentre le chiamate [5-6] sono asincrone;
Ciò che vogliamo dimostrare qui è che la libreria Rx rende facile trasformare un'interfaccia sincrona in una asincrona. Perché è utile? Gli eventi in un'interfaccia Swing vengono elaborati in un thread comunemente denominato ciclo di eventi. Gli eventi vengono messi in coda ed elaborati uno dopo l'altro. L'evento Ei+1 può essere elaborato solo una volta che l'evento precedente Ei è stato completamente elaborato. È quindi importante che la gestione degli eventi sia il più breve possibile in modo che la GUI rimanga reattiva. A volte, la gestione di un evento può richiedere molto tempo. Questo accade se la gestione comporta l'accesso alla rete. Se non vogliamo che la GUI si blocchi in un modo inaccettabile per l'utente, queste operazioni di rete devono essere eseguite in thread separati dal ciclo di eventi per liberarlo. Questo ci porta nel regno della programmazione concorrente (dove più thread vengono eseguiti in parallelo), che è giustamente considerata difficile. La libreria Rx fornisce una soluzione semplice ed elegante a questo problema.
Per simulare processi di lunga durata, il servizio nell'esempio fornisce i suoi numeri casuali dopo un certo ritardo in modo da poter osservare il comportamento dell'interfaccia grafica utente.
2.2. L'eseguibile
L'eseguibile per l'applicazione di esempio si trova nella cartella [dvp/executables] degli esempi:
![]() | ![]() |
Esistono vari modi per eseguire l'archivio [swing-01] a seconda della configurazione del computer utilizzato. Ad esempio, è possibile seguire la procedura [1-3]. Verrà visualizzata la seguente interfaccia grafica utente:
![]() |
- L'interfaccia presenta due schede [1-2]: una [Request] per l'invio di una richiesta al servizio del generatore di numeri casuali e l'altra [Response] per la visualizzazione dei numeri ricevuti;
- In [3] si specifica il numero di richieste che si desidera inviare al servizio;
- In [4], si specifica l'intervallo di generazione dei numeri desiderato [a,b];
- In [5], il numero di valori restituiti dal servizio sarà un numero casuale compreso nell'intervallo [minCount, maxCount] impostato dall'utente;
- in [6], prima di restituire la risposta, il servizio attenderà delay millisecondi, dove delay è un numero casuale compreso nell'intervallo definito dall'utente [minDelay, maxDelay];
- Per impostazione predefinita, il livello [swing] utilizzerà l'interfaccia sincrona del servizio. Per utilizzare il livello asincrono, l'utente selezionerà [7]. In questo caso, il servizio di generazione verrà eseguito in thread separati dal ciclo di eventi della GUI. La libreria Rx offre varie strategie per la generazione di questi thread. L'utente può selezionare la propria strategia in [8];
- la generazione dei numeri viene eseguita utilizzando il pulsante [9];
![]() |
- [10] mostra i risultati. Ne spiegheremo la struttura;
- in [11], il numero di risultati ottenuti;
- in [12], il tempo di esecuzione in millisecondi;
- in [13], l'utente ha la possibilità di annullare l'esecuzione;
Ogni risultato ha il seguente formato:
{"idClient":0,"serviceResponse":{"delay":412,"aleas":[146,115,128,174,159,112,162,127],"executedOn":"RxComputationThreadPool-6"},"observedOn":"AWT-EventQueue-0","requestAt":"02:42:47:708","responseAt":"02:42:52:931"}
- [idClient]: l'ID della richiesta. Si noti che vengono inviate più richieste al servizio di generazione;
- [delay]: il tempo di attesa in millisecondi osservato dal servizio prima di inviare il risultato;
- [aleas]: i numeri casuali restituiti dal servizio;
- [executedOn]: il nome del thread in cui il servizio è stato eseguito;
- [observedOn]: il nome del thread che ha visualizzato il risultato. Con un'interfaccia Swing, questo può essere solo il thread del ciclo di eventi, qui [AWT-EventQueue-0];
- [requestAt]: l'ora della richiesta nel formato [ore:minuti:secondi:millisecondi];
- [responseAt]: l'ora in cui sono stati ricevuti i risultati nello stesso formato;
Presenteremo ora i frammenti di codice necessari per comprendere l'esempio.
2.3. L'interfaccia sincrona

Il livello di servizio [1] presenta la seguente interfaccia:
public interface IService {
// random numbers in [a,b]
// n numbers are generated with random n in the interval [minCount, maxCount]
// numbers are generated after a delay of milliseconds,
// where [delay] is a random number in the range [minDelay, maxDelay]
public ServiceResponse getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay);
}
Il [ServiceResponse] è il seguente:
public class ServiceResponse {
// service waiting time
private int delay;
// random numbers
private List<Integer> aleas;
// execution thread
private String executedOn;
// manufacturers
public ServiceResponse(int delay, List<Integer> aleas) {
executedOn = Thread.currentThread().getName();
this.delay = delay;
this.aleas = aleas;
}
// getters and setters
...
}
La risposta è composta da tre parti:
- riga 6: i numeri casuali generati;
- riga 4: il tempo di attesa osservato dal servizio prima di restituire il risultato;
- riga 8: il thread di esecuzione del servizio;
2.4. La chiamata sincrona

Descriveremo ora in dettaglio la chiamata sincrona [4] effettuata dal livello [swing] al servizio [1]:
private void doGenerateWithService() {
// start waiting
beginWaiting();
try {
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
uiResponse.setResponseAt();
model.add(0, jsonMapper.writeValueAsString(uiResponse));
jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
}
} catch (JsonProcessingException | RuntimeException e) {
System.out.println(e);
}
// end waiting
endWaiting();
}
- righe 5–12: il ciclo che elabora le [nbRequests] richieste effettuate dall'utente;
- riga 8: [service] è l'implementazione dell'interfaccia sincrona [IService] presentata nella Sezione 2.3;
- riga 10: [model] è il modello visualizzato dal componente JList della scheda [Response]. Gli elementi di questo modello sono stringhe JSON di elementi di tipo [UiResponse] come segue:
public class UiResponse {
// customer id
private int idClient;
// service response
private ServiceResponse serviceResponse;
// observation thread name
private String observedOn;
// query time
private String requestAt;
// response time
private String responseAt;
// manufacturers
public UiResponse() {
observedOn = Thread.currentThread().getName();
requestAt = getTimeStamp();
}
// private methods
private String getTimeStamp() {
return new SimpleDateFormat("hh:mm:ss:SSS").format(Calendar.getInstance().getTime());
}
// getters and setters
...
}
- riga 6: la risposta dal servizio di generazione dei numeri;
- riga 4: il numero della richiesta a cui si sta rispondendo;
- riga 8: il thread che visualizza questa risposta. Come accennato, questo sarà sempre il thread del ciclo di eventi;
- righe 10 e 12: l'ora della richiesta e l'ora della risposta;
2.5. Test delle chiamate sincrone
Eseguiamo la seguente configurazione:
![]() |
Nella scheda [Risposta] otteniamo i seguenti risultati:
![]() |
- In [1-2], abbiamo effettivamente ricevuto 10 risposte come richiesto. Sono state inserite nella prima posizione nell'ordine in cui sono arrivate. Possiamo vedere che sono state ricevute nell'ordine delle richieste;
- Sono state tutte eseguite e visualizzate nel thread del ciclo di eventi [AWT-EventQueue-0]. Le richieste sono state quindi eseguite una dopo l'altra in questo thread. Non c'erano richieste concorrenti;
- ciò che qui non è visibile è che durante l'esecuzione la GUI è bloccata. Ad esempio, non è possibile accedere alla scheda [Response] per visualizzare le risposte in arrivo o interrompere l'esecuzione utilizzando il pulsante [Cancel]. Anche se questo pulsante fosse stato presente nella scheda [Request], sarebbe stato inutilizzabile. Infatti, si sarebbero verificati due eventi:
- fare clic sul pulsante [Generate];
- fare clic sul pulsante [Annulla];
Il clic sul pulsante [Cancel] viene gestito solo dopo che l'operazione innescata dal clic sul pulsante [Generate] è terminata. Abbiamo appena visto che questa operazione ha occupato il thread del ciclo di eventi per l'intera durata dell'esecuzione, impedendo così la gestione del clic sul pulsante [Cancel]. Questo è tipicamente il tipo di situazione in cui Rx può fornire un miglioramento significativo;
2.6. L'interfaccia asincrona e la sua implementazione
Ora esamineremo l'interfaccia del livello [2] e la sua implementazione con Rx. Ciò non risulterà immediatamente chiaro. Vogliamo semplicemente evidenziare la semplicità del codice in questa implementazione.

L'interfaccia asincrona è la seguente:
public interface IRxService {
// random numbers in [a,b]
// n numbers are generated with random n in the interval [minCount, maxCount]
// numbers are generated after a delay of milliseconds,
// where [delay] is a random number in the range [minDelay, maxDelay]
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse);
}
Le differenze rispetto all'interfaccia sincrona presentata nella Sezione 2.3 sono le seguenti:
- la classe [UiResponse] presentata nella Sezione 2.3 fa ora parte dei parametri del metodo [getAleas] (riga 6). Il motivo è che, poiché le richieste ora vengono eseguite in parallelo e il servizio attende un tempo casuale prima di restituire il risultato, le risposte non ci arriveranno nell'ordine delle richieste. Passiamo quindi l'oggetto [UiResponse], che contiene, tra le altre informazioni, l'ID della richiesta:
// id du client (requête)
private int idClient;
// réponse du service
private ServiceResponse serviceResponse;
// nom du thread d'observation
private String observedOn;
// heure de la requête
private String requestAt;
// heure de la réponse
private String responseAt;
- Il tipo di risposta del servizio asincrono è di tipo [Observable<UiResponse>]. Il tipo [Observable<>] è fornito dalla libreria Rx. Il risultato [Observable<UiResponse>] indica che il metodo [getAleas] fornisce un flusso di valori di tipo [UiResponse], che vengono inviati uno per uno al loro osservatore;
Ora diamo un'occhiata all'implementazione di questa interfaccia:
public class RxService implements IRxService {
// service
private IService service;
// manufacturer
public RxService(IService service) {
this.service = service;
}
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
}
- righe 7–9: forniamo al costruttore un riferimento all'interfaccia sincrona [IService]. Questa interfaccia gestirà la generazione di numeri casuali;
- l'osservabile restituito dal metodo [getAleas] viene costruito dal metodo statico [Observable.create]. Questo metodo ci permette di costruire un'implementazione asincrona a partire da una sincrona;
- riga 13: il parametro del metodo statico [Observable.create] è qui una funzione lambda che accetta un tipo [Subscriber] come parametro, ancora una volta un tipo Rx. Un [Subscriber] è un oggetto che si abbona a uno stream di osservabili, ovvero uno stream di dati forniti in modo asincrono. Qui utilizziamo tre metodi di questo subscriber:
- [Subscriber.onNext] per passargli dei dati (riga 16);
- [Subscriber.onError] per inviargli un'eccezione (riga 18);
- [Subscriber.onCompleted] per indicare all'abbonato che il flusso di dati è terminato (riga 20);
Possono esserci più subscriber per lo stesso osservabile. Qui, avremo un solo subscriber che si abbona a un flusso di un singolo dato, quello prodotto nelle righe 15–16. Il dato viene prodotto dall'implementazione sincrona del servizio (riga 15) e restituito al subscriber (riga 16).
Anche se tutto questo rimane un po' oscuro, non si può fare a meno di rimanere colpiti dall'estrema concisione di questa implementazione asincrona del servizio.
2.7. La chiamata asincrona

Esamineremo ora la chiamata sincrona [5] effettuata dal livello [swing] al servizio [2]:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
// scheduler
int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
switch (schedulerIndex) {
case 0:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
break;
...
}
}
...
}
- righe 6–10: esecuzione delle [nbRequests] richieste dall'utente;
- righe 7-8: preparazione dell'oggetto [UiResponse] richiesto dal metodo [getAleas] del servizio asincrono (riga 13). Ciò comporta principalmente la registrazione dell'[idClient] della richiesta;
- riga 13: viene chiamato il metodo [getAleas] del servizio asincrono. Esso restituisce un oggetto [Observable<UiResponse>]. Questa chiamata non invoca ancora il servizio sincrono. Torniamo al codice per il [getAleas] asincrono:
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
Il codice nelle righe 4–11, che chiama il servizio sincrono, viene eseguito solo quando un sottoscrittore si registra. Finché non ci sono sottoscrittori, questo codice non viene eseguito.
Torniamo al codice del metodo [doGenerateWithRxService]:
- riga 5: creiamo un osservabile vuoto (non viene osservato nulla);
- riga 13: creiamo un osservabile il cui flusso sarà la fusione dei flussi asincroni [nbRequests] associati alle richieste [nbRequests]. Ciò si ottiene utilizzando il metodo [Observable.mergeWith], che consente di unire due flussi asincroni. Nella terminologia Rx, [mergeWith] è chiamato operatore di flusso. Questi operatori hanno la caratteristica che il risultato dell'operazione è, nella maggior parte dei casi, un altro [Observable]. In definitiva, dopo la riga 17, la variabile [observables] si riferisce a un unico flusso costituito dalle [nbRequests] risposte asincrone effettuate dal servizio asincrono;
- riga 13: l'operazione di unione avrebbe potuto essere scritta come:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse));
ma abbiamo scritto:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
Qui abbiamo utilizzato l'operatore [subscribeOn] sull'osservabile [rxService.getAleas]. Come spesso accade, il risultato è di nuovo un osservabile. L'operatore [subscribeOn] specifica che l'osservabile deve essere eseguito in un thread fornito da uno [Scheduler]. Esistono diversi [Scheduler] adatti a diverse situazioni. Nella GUI abbiamo fornito diverse opzioni per vedere come differiscono:
![]() |
Il risultato è il seguente codice:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
// scheduler
int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
switch (schedulerIndex) {
case 0:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
break;
case 1:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.computation()));
break;
case 2:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.newThread()));
break;
case 3:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.trampoline()));
break;
case 4:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.immediate()));
break;
}
}
...
}
Rivediamo il codice nelle righe 12–14. Lo scheduler [Schedulers.io()] assegna un nuovo thread a ciascun osservabile. Se seguiamo il codice:
- riga 5: abbiamo un osservabile vuoto;
- riga 13, iterazione 1: observables è la lista [observable0/thread0] (Observable observable0 in esecuzione sul thread thread0);
- riga 13, iterazione 2: observables è la lista [observable0/thread0, observable1/thread1];
- ecc...
Alla fine, dopo la riga 28, abbiamo un osservabile risultante dalla fusione di [nbRequests] osservabili in esecuzione su [nbRequests] thread diversi. Non tutti gli scheduler funzionano in questo modo, come vedremo durante i test.
Continuiamo ad esaminare il codice per la chiamata al servizio asincrono:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
...
}
// observer
observables = observables.observeOn(SwingScheduler.getInstance());
// these observables are executed
subscriptions.add(observables.subscribe(uiResponse -> {
updateUi(uiResponse);
} , th -> {
System.out.println(th);
doCancel();
} , this::doCancel));
}
- Abbiamo visto che quando arriviamo alla riga 10, abbiamo un unico osservabile, un'unione di [nbRequests] osservabili che possono o meno essere eseguiti su [nbRequests] thread diversi, a seconda dello scheduler scelto dall'utente;
- Riga 10: L'operatore [observeOn] ci permette di specificare su quale thread vogliamo recuperare i dati dall'observable, in questo caso gli oggetti [nbRequests] di tipo [UiResponse]. In un'interfaccia Swing, non abbiamo altra scelta. Qualsiasi aggiornamento dell'interfaccia deve essere eseguito nel thread del ciclo di eventi. Qui, i dati dell'osservabile verranno visualizzati in un componente JList di Swing. Il thread [SwingScheduler.getInstance()] rappresenta il thread del ciclo di eventi. La classe [SwingScheduler] non proviene dalla libreria RxJava ma dalla libreria RxSwing;
- quando arriviamo alla riga 12, il servizio sincrono non è ancora stato chiamato perché l'osservabile alla riga 10 non ha ancora un sottoscrittore. Le righe 12–17 ne forniscono uno, utilizzando l'operatore [subscribe]. I parametri di questo operatore sono tre funzioni lambda:
- la prima [uiResponse -> {updateUi(uiResponse);}] prende come parametro uno degli oggetti [UiResponse] prodotti dall'osservabile. Ricordiamo che qui avremo [nbRequests] oggetti di questo tipo. Il metodo associato, in questo caso updateUi, deve elaborare questo risultato;
- il secondo [th -> {System.out.println(th);doCancel();}] accetta come parametro un tipo [Throwable], in questo caso un'eccezione verificatasi durante l'esecuzione dell'osservabile. Il metodo associato deve elaborare questa informazione. Qui, la visualizziamo sulla console (riga 15) e annulliamo l'esecuzione, il che aggiornerà alcuni elementi della GUI;
- il terzo [this::doCancel] viene chiamato quando l'osservabile segnala di non avere più dati da trasmettere. Qui, l'osservabile è l'unione di [nbRequests] osservabili. L'osservabile risultante indicherà di aver terminato quando tutti gli osservabili che lo compongono avranno a loro volta segnalato di aver terminato il proprio lavoro. Quindi, quando viene eseguita questa terza funzione lambda, abbiamo ricevuto tutti i dati. Il metodo locale [doCancel] aggiorna la GUI per riflettere il completamento dell'esecuzione;
La variabile [subscriptions] è definita come segue:
// les souscriptions aux observables
protected List<Subscription> subscriptions = new ArrayList<Subscription>();
Il tipo [Subscription] rappresenta una sottoscrizione, ovvero il collegamento tra un sottoscrittore [Subscriber] e ciò che sta osservando [Observable]. Qui abbiamo utilizzato un elenco di sottoscrizioni, sebbene in questo esempio ce ne sia solo una. Il metodo locale [doCancel], eseguito quando l'osservabile segnala di non avere più dati da trasmettere, è il seguente:
@Override
protected void doCancel() {
// fin attente
endWaiting();
// dans le cas de souscriptions
if (jCheckBoxRxSwing.isSelected() && subscriptions != null) {
subscriptions.forEach(Subscription::unsubscribe);
}
}
- La riga 7 cancella l'iscrizione di tutti gli iscritti dall'osservabile;
Da questa breve spiegazione, possiamo trarre i seguenti punti chiave:
- il tipo [Observable] denota un flusso di valori, che vengono inviati uno alla volta agli iscritti o agli osservatori;
- il tipo [Subscriber] indica un sottoscrittore del tipo [Observable];
- il tipo [Subscription] indica una sottoscrizione, ovvero il collegamento tra un [Subscriber] e un [Observable];
- il tipo [Observable] supporta gli operatori [mergeWith, empty, subscribeOn, observeOn, ...], la maggior parte dei quali produce osservabili. Questi operatori vengono utilizzati per configurare l'osservabile prima che venga eseguito:
- cosa osservare;
- il thread su cui viene eseguito l'osservabile;
- il thread su cui l'abbonato riceve i dati dall'osservabile;
- Esistono due tipi di osservabili: [cold] e [hot]. Un osservabile cold viene eseguito completamente per ogni nuovo sottoscrittore. Se ogni esecuzione produce gli stessi dati, ogni nuovo sottoscrittore riceve gli stessi dati del precedente. Un osservabile hot generalmente produce dati in modo continuo. Quando un sottoscrittore si abbona, riceve i dati emessi a partire dal momento della sua sottoscrizione. Non riceve i dati che potrebbero essere stati emessi in precedenza. Nel nostro esempio, l'osservabile è cold: viene rieseguito completamente per ogni nuovo sottoscrittore. Cosa viene effettivamente eseguito nel nostro esempio? Per scoprirlo, dobbiamo tornare alla definizione dell'osservabile osservato:
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
Per ogni nuovo sottoscrittore, la funzione lambda, che è un parametro del metodo [Observable.create] (riga 3), viene rieseguita. Pertanto, le righe 4–11 vengono eseguite per ogni nuovo sottoscrittore [subscriber];
2.8. Test delle chiamate asincrone
Iniziamo dimostrando l'effetto dei diversi scheduler disponibili. A tal fine, utilizziamo i seguenti parametri:
![]() |
Impostiamo [1-2] su valori bassi in modo che, anche se le richieste vengono eseguite sullo stesso thread, non si debba comunque attendere troppo a lungo.
2.8.1. con lo scheduler [Schedulers.io]
![]() |
Si possono notare i seguenti punti:
- le risposte vengono ricevute in un ordine che non corrisponde all'ordine delle richieste (vedi idClient);
- ogni richiesta è stata eseguita in un thread diverso;
- questa volta l'interfaccia grafica non è più bloccata:
- è possibile passare da una scheda all'altra;
- vediamo i dati in arrivo;
- non c'è il tempo di vedere il pulsante [Annulla] perché l'esecuzione è troppo veloce. Lo evidenzieremo in un altro test;
2.8.2. con lo scheduler [Schedulers.computation]
![]() |
Si possono notare i seguenti punti:
- le risposte vengono ricevute in un ordine che non corrisponde all'ordine delle richieste (vedi idClient);
- le richieste sono state eseguite in 8 thread;
- il thread n. 3 è stato utilizzato per le richieste 8 e 0;
- il thread n. 4 è stato utilizzato per le richieste 9 e 1;
- le altre richieste avevano ciascuna un thread diverso;
Lo scheduler [Schedulers.computation] utilizza tanti thread quanti sono i core presenti sulla macchina in uso. Questa informazione viene ottenuta tramite l'espressione [Runtime.getRuntime().availableProcessors()].
2.8.3. con lo scheduler [Schedulers.newThread]
![]() |
Il comportamento è simile a quello dello scheduler [Schedulers.io].
2.8.4. con gli scheduler [Schedulers.trampoline, Schedulers.immediate]
![]() |
Il comportamento è sincrono. Tutte le richieste vengono eseguite sul thread del ciclo di eventi. Questo risultato non va generalizzato; significa semplicemente che in questo specifico esempio entrambi gli scheduler hanno operato in modo sincrono.
2.9. Casi limite
In questo esempio, lavoreremo con scheduler che supportano il funzionamento asincrono. Innanzitutto, aumentiamo il numero di richieste a 100 utilizzando lo scheduler [Schedulers.computation], che qui viene eseguito su 8 thread. Otteniamo il seguente risultato:
![]() |
- in [1], il pulsante [Cancel] è presente e utilizzabile (funzionamento asincrono);
Ora lasciamo che l'esecuzione vada a buon fine:
![]() |
In [2] vediamo che l'esecuzione delle 100 richieste ha richiesto circa 4 secondi (su 8 thread).
Ora, eseguiamo queste stesse 100 richieste utilizzando lo scheduler [Schedulers.newThread], che esegue ogni richiesta su un thread separato:
![]() |
In [1], vediamo che l'esecuzione delle 100 richieste (su 100 thread) ha richiesto mezzo secondo. Questo è quindi significativamente più veloce rispetto allo scheduler [Schedulers.computation].
Ora, effettuiamo 800 richieste nelle stesse condizioni, utilizzando ancora lo scheduler [Schedulers.newThread]. Otteniamo i seguenti risultati:
![]() |
Le 800 richieste vengono eseguite in circa 1 secondo.
Quando aumentiamo questo numero (oltre le 2.500 richieste sul mio computer — eseguite in 1,5 secondi — questo numero dipende ovviamente in larga misura dall'ambiente di runtime), alla fine otteniamo la seguente eccezione:
![]() |
Si verifica quindi un overflow dello stack. I test dimostrano che il comportamento dello scheduler [Schedulers.newThread] non è deterministico. È possibile che si verifichi l'eccezione descritta in precedenza, che si eseguano nuovi test, che poi si torni alla configurazione che ha causato l'eccezione e che questa non si verifichi più.
2.10. Conclusione
Abbiamo illustrato un esempio di utilizzo della libreria Rx. Riassumiamo ciò che abbiamo appreso:
Abbiamo iniziato con la seguente architettura:

- in [4], il livello [swing] effettuava chiamate sincrone al livello [service];
- in [5], il livello [swing] effettuava chiamate asincrone al livello [rxService], che a sua volta effettuava una chiamata sincrona [6] al livello [service];
La prima cosa che abbiamo notato è che la libreria Rx ha reso facile creare l'interfaccia asincrona [rxService] a partire dall'interfaccia sincrona [service] (vedi Sezione 2.4). Questa è una lezione importante perché significa che possiamo facilmente trasformare un'applicazione sincrona in una asincrona.
Nel livello [swing] sono stati scritti due metodi distinti:
- uno per effettuare chiamate sincrone al servizio (vedere la sezione 2.4);
- l'altro per effettuare chiamate asincrone al servizio (vedi sezione 2.7);
La scrittura di chiamate asincrone si è rivelata significativamente più complessa rispetto a quella delle chiamate sincrone. Tuttavia, chi ha lavorato con la programmazione concorrente che coinvolge più thread che devono essere sincronizzati, troverà che la soluzione Rx è più semplice da scrivere ed evita tutti i difficili problemi di sincronizzazione e comunicazione tra thread. In questo articolo, abbiamo evidenziato i seguenti punti chiave:
- il tipo [Observable] denota un flusso di eventi (valori) che possono (ma non necessariamente) essere asincroni e che possono essere osservati;
- il tipo [Subscriber] indica un sottoscrittore di un tipo [Observable];
- il tipo [Subscription] indica una sottoscrizione, ovvero il collegamento tra un [Subscriber] e un [Observable];
- Il tipo [Observable] supporta operatori [mergeWith, empty, subscribeOn, observeOn, ...] che, nella maggior parte dei casi, generano osservabili. Questi operatori vengono utilizzati per configurare l'osservabile prima della sua esecuzione:
- cosa osservare;
- il thread su cui viene eseguito l'osservabile;
- il thread su cui l'abbonato riceve i dati dall'osservabile;
- esistono due tipi di osservabili: [cold] e [hot]. Un osservabile cold viene eseguito completamente per ogni nuovo sottoscrittore. Se ogni esecuzione produce gli stessi dati, ogni nuovo sottoscrittore riceve gli stessi dati del precedente. Un osservabile hot generalmente produce dati in modo continuo. Quando un sottoscrittore si abbona, riceve i dati emessi dal momento della sua sottoscrizione. Non riceve alcun dato che possa essere stato emesso in precedenza. Nel nostro esempio, l'osservabile è cold: viene completamente rieseguito per ogni nuovo sottoscrittore.
Ora che abbiamo visto un esempio che dimostra il valore della libreria Rx, la esploreremo più nel dettaglio.
La libreria Rx ha molti metodi con parametri generici nelle loro firme. Esamineremo brevemente queste firme (sezione 3). I parametri di questi metodi sono per lo più interfacce funzionali (Java 8), ovvero interfacce con un solo metodo. I parametri effettivi devono quindi essere istanze di queste interfacce. Prima di Java 8, era pratica comune implementare un'interfaccia utilizzando una classe anonima. Con Java 8, se l'interfaccia è un'interfaccia funzionale, è più conciso implementarla utilizzando una funzione lambda. Introdurremo quindi queste ultime (Sezione 4). Una volta fatto ciò, introdurremo la classe [Stream] (Sezione 5), che consente di elaborare le collezioni Java utilizzando le funzioni lambda. Questa classe è interessante perché la classe [Observable] di RxJava prende in prestito:
- alcuni metodi;
- lo stesso modo di concatenare i metodi per elaborare lo stesso osservabile;
Introdurremo quindi le interfacce funzionali specifiche della libreria RxJava (sezione 6). Continueremo con gli elementi principali della libreria Rx [Observable, Subscriber, Subscription, operatori] (sezione 7). La classe [Observable] dispone di decine di operatori che a loro volta sono sovraccaricati più volte. Ciò crea inizialmente una notevole complessità poiché questi operatori e i loro sovraccarichi a volte differiscono solo per un singolo dettaglio ed è difficile, senza esperienza, sapere quale operatore utilizzare. Presenteremo solo un numero limitato di operatori e, nella maggior parte dei casi, ignoreremo i loro sovraccarichi.
L'intera sezione precedente verrà trattata utilizzando la libreria RxJava in semplici applicazioni da console. Una volta acquisita padronanza della libreria RxJava, la useremo in due tipi di applicazioni grafiche:
- nella Sezione 8, riprenderemo l'esempio dell'applicazione Swing per esplorarlo in modo più dettagliato. Utilizzeremo quindi la libreria RxSwing;
- nella Sezione 9, creeremo un'applicazione Android utilizzando la libreria RxAndroid;
Una volta fatto tutto questo, il lettore avrà gli strumenti per camminare con le proprie gambe. Probabilmente ci vorrà un po' di tempo prima che riesca a utilizzare la libreria Rx in modo intuitivo. Ho trovato questa libreria particolarmente interessante. Tuttavia, l'ho trovata complessa da comprendere e la curva di apprendimento è stata ripida. Spero che questo documento accorci tale curva di apprendimento per il lettore. Mi sembra che ne valga davvero la pena.
















