Skip to content

2. Ein Einführungsbeispiel

Meine ersten Berührungen mit RxJava hatte ich durch Kurse und Tutorials, die ich online gefunden habe. Abgesehen davon, dass die Theorie Konzepte verwendete, die mir nicht vertraut waren und die ich nur schwer verstehen konnte, konnte ich nicht wirklich erkennen, wie sie im Alltag nützlich sein könnte. Wir beginnen daher mit einem Beispiel (einem einfachen, wie ich hoffe), bei dem der Einsatz von RxJava zu einer echten Vereinfachung des Codes führt, und versuchen von dort aus, die wichtigen Elemente dieser Bibliothek herauszuarbeiten.

Die RxJava-Bibliothek basiert auf folgendem Konzept: Ein Strom von Elementen vom Typ T Observable<T> wird von einem oder mehreren Abonnenten (Subscribers, Observers, Consumers) vom Typ Subscriber<T> beobachtet. Die RxJava-Bibliothek ermöglicht es, den Observable<T>-Stream in einem Thread T1 und seinen Subscriber<T>-Beobachter in einem Thread T2 auszuführen, ohne dass sich der Entwickler um die Verwaltung des Lebenszyklus dieser Threads oder um naturgemäß schwierige Probleme wie den Datenaustausch zwischen Threads und deren Synchronisation zur Ausführung einer globalen Aufgabe kümmern muss. Sie erleichtert somit die asynchrone Programmierung.

Ein Observable<T>-Stream erzeugt Elemente vom Typ T, die bei ihrer Erzeugung beobachtet werden können. Befinden sich der Beobachter und das Observable (ein Begriff, der hier im weiteren Sinne für den Typ Observable<T> verwendet wird) im selben Thread, kann das Observable das Element (i+1) erst dann erzeugen, wenn der Beobachter das Element i verarbeitet hat. Es gibt nur wenige Fälle, in denen diese Architektur sinnvoll ist. Befinden sich der Beobachter und das Observable nicht im selben Thread, verhalten sich das Observable und sein Beobachter autonom: Das Observable sendet in seinem eigenen Tempo, und der Beobachter verarbeitet in seinem eigenen Tempo. Hierin liegt der Wert der Bibliothek. Bislang haben wir nur einen einzelnen Beobachter behandelt. In der Realität kann ein Observable eine beliebige Anzahl von Beobachtern haben.

2.1. Die Architektur der Beispielanwendung

Die Beispielanwendung weist die folgende Architektur auf:

Image

  • In [1] generiert eine Service-Schicht Listen mit Zufallszahlen. Diese Schicht läuft im selben Thread wie die [swing]-Methode, die sie nutzt. Sie generiert ihre Zahlen dann synchron;
  • in [2] ermöglicht eine mit RxJava implementierte dünne Anpassungsschicht die asynchrone Bereitstellung desselben Dienstes für die [swing]-Schicht: Dieser Dienst kann in einem anderen Thread als die [swing]-Methode laufen, die ihn nutzt;
  • Der Aufruf [4] ist synchron, während die Aufrufe [5-6] asynchron sind;

Was wir hier zeigen wollen, ist, dass die Rx-Bibliothek es einfach macht, eine synchrone Schnittstelle in eine asynchrone umzuwandeln. Warum ist das nützlich? Ereignisse in einer Swing-Schnittstelle werden in einem Thread verarbeitet, der gemeinhin als Ereignisschleife bezeichnet wird. Ereignisse werden in eine Warteschlange gestellt und nacheinander verarbeitet. Das Ereignis Ei+1 kann erst verarbeitet werden, wenn das vorherige Ereignis Ei vollständig verarbeitet wurde. Es ist daher wichtig, dass die Ereignisbehandlung so kurz wie möglich ist, damit die GUI reaktionsfähig bleibt. Manchmal kann die Bearbeitung eines Ereignisses lange dauern. Dies ist der Fall, wenn die Bearbeitung einen Netzwerkzugriff erfordert. Wenn wir die GUI nicht in einer für den Benutzer inakzeptablen Weise einfrieren wollen, müssen diese Netzwerkoperationen in Threads ausgeführt werden, die von der Ereignisschleife getrennt sind, um diese zu entlasten. Dies führt uns in den Bereich der parallelen Programmierung (bei der mehrere Threads parallel laufen), die zu Recht als schwierig gilt. Die Rx-Bibliothek bietet eine einfache und elegante Lösung für dieses Problem.

Um lang andauernde Prozesse zu simulieren, liefert der Dienst im Beispiel seine Zufallszahlen nach einer bestimmten Verzögerung aus, sodass wir das Verhalten der grafischen Benutzeroberfläche beobachten können.

2.2. Die ausführbare Datei

Die ausführbare Datei für die Beispielanwendung befindet sich im Ordner [dvp/executables] der Beispiele:

Es gibt verschiedene Möglichkeiten, das Archiv [swing-01] auszuführen, je nach Konfiguration des verwendeten Rechners. Sie können beispielsweise die Schritte [1-3] befolgen. Dadurch wird die folgende grafische Benutzeroberfläche angezeigt:

 
  • Die Oberfläche verfügt über zwei Registerkarten [1-2]: eine [Request] zum Senden einer Anfrage an den Zufallszahlengenerator-Dienst und die andere [Response] zur Anzeige der empfangenen Zahlen;
  • Unter [3] geben Sie an, wie viele Anfragen Sie an den Dienst senden möchten;
  • Unter [4] geben Sie den gewünschten Bereich für die Zahlengenerierung [a,b] an;
  • In [5] wird die Anzahl der vom Dienst zurückgegebenen Werte auf eine Zufallszahl innerhalb des vom Benutzer festgelegten Intervalls [minCount, maxCount] gesetzt;
  • In [6] wartet der Dienst vor der Rückgabe seiner Antwort delay Millisekunden, wobei delay eine Zufallszahl innerhalb des vom Benutzer definierten Intervalls [minDelay, maxDelay] ist;
  • Standardmäßig verwendet die [swing]-Schicht die synchrone Schnittstelle des Dienstes. Um die asynchrone Schicht zu verwenden, muss der Benutzer [7] aktivieren. In diesem Fall läuft der Generierungsdienst in Threads, die von der Ereignisschleife der GUI getrennt sind. Die Rx-Bibliothek bietet verschiedene Strategien zur Erzeugung dieser Threads. Der Benutzer kann seine Strategie in [8] auswählen;
  • Die Zahlengenerierung erfolgt über die Schaltfläche [9];
 
  • [10] zeigt die Ergebnisse. Wir werden deren Struktur erläutern;
  • in [11] die Anzahl der erhaltenen Ergebnisse;
  • in [12] die Ausführungszeit in Millisekunden;
  • in [13] hat der Benutzer die Möglichkeit, die Ausführung abzubrechen;

Jedes Ergebnis hat das folgende Format:

{"idClient":0,"serviceResponse":{"delay":412,"aleas":[146,115,128,174,159,112,162,127],"executedOn":"RxComputationThreadPool-6"},"observedOn":"AWT-EventQueue-0","requestAt":"02:42:47:708","responseAt":"02:42:52:931"}
  • [idClient]: die Anfrage-ID. Beachten Sie, dass mehrere Anfragen an den Generierungsdienst gesendet werden;
  • [delay]: die Wartezeit in Millisekunden, die der Dienst vor dem Senden seines Ergebnisses vermerkt hat;
  • [aleas]: die vom Dienst zurückgegebenen Zufallszahlen;
  • [executedOn]: der Name des Threads, in dem der Dienst ausgeführt wurde;
  • [observedOn]: der Name des Threads, der das Ergebnis angezeigt hat. Bei einer Swing-Oberfläche kann dies nur der Event-Loop-Thread sein, hier [AWT-EventQueue-0];
  • [requestAt]: der Zeitpunkt der Anfrage im Format [Stunden:Minuten:Sekunden:Millisekunden];
  • [responseAt]: der Zeitpunkt des Erhalts der Ergebnisse im gleichen Format;

Wir stellen nun die Codeausschnitte vor, die zum Verständnis des Beispiels erforderlich sind.

2.3. Die synchrone Schnittstelle

Image

Die Service-Schicht [1] stellt die folgende Schnittstelle bereit:


public interface IService {
  // random numbers in [a,b]
  // n numbers are generated with random n in the interval [minCount, maxCount]
  // numbers are generated after a delay of milliseconds,
  // where [delay] is a random number in the range [minDelay, maxDelay]
  public ServiceResponse getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay);
}

Die [ServiceResponse] sieht wie folgt aus:


public class ServiceResponse {
 
  // service waiting time
  private int delay;
  // random numbers
  private List<Integer> aleas;
  // execution thread
  private String executedOn;
 
  // manufacturers
 
  public ServiceResponse(int delay, List<Integer> aleas) {
    executedOn = Thread.currentThread().getName();
    this.delay = delay;
    this.aleas = aleas;
  }
 
  // getters and setters
...
}

Die Antwort besteht aus drei Teilen:

  • Zeile 6: die generierten Zufallszahlen;
  • Zeile 4: die vom Dienst beobachtete Wartezeit vor der Rückgabe seines Ergebnisses;
  • Zeile 8: der Ausführungsthread des Dienstes;

2.4. Der synchrone Aufruf

Image

Wir werden nun den synchronen Aufruf [4] detailliert beschreiben, den die [swing]-Schicht an den [1]-Dienst sendet:


  private void doGenerateWithService() {
    // start waiting
    beginWaiting();
    try {
      for (int i = 0; i < nbRequests; i++) {
        UiResponse uiResponse = new UiResponse();
        uiResponse.setIdClient(i);
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        uiResponse.setResponseAt();
        model.add(0, jsonMapper.writeValueAsString(uiResponse));
        jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
      }
    } catch (JsonProcessingException | RuntimeException e) {
      System.out.println(e);
    }
    // end waiting
    endWaiting();
}
  • Zeilen 5–12: Die Schleife, die die vom Benutzer gestellten [nbRequests]-Anfragen verarbeitet;
  • Zeile 8: [service] ist die Implementierung der in Abschnitt 2.3 vorgestellten synchronen [IService]-Schnittstelle;
  • Zeile 10: [model] ist das Modell, das von der JList-Komponente der Registerkarte [Response] angezeigt wird. Die Elemente dieses Modells sind JSON-Strings mit Elementen vom Typ [UiResponse] wie folgt:

public class UiResponse {
 
  // customer id
  private int idClient;
  // service response
  private ServiceResponse serviceResponse;
  // observation thread name
  private String observedOn;
  // query time
  private String requestAt;
  // response time
  private String responseAt;
 
  // manufacturers
 
  public UiResponse() {
    observedOn = Thread.currentThread().getName();
    requestAt = getTimeStamp();
  }
  // private methods
 
  private String getTimeStamp() {
    return new SimpleDateFormat("hh:mm:ss:SSS").format(Calendar.getInstance().getTime());
  }
 
  // getters and setters
...
}
  • Zeile 6: die Antwort vom Dienst zur Nummerngenerierung;
  • Zeile 4: die Nummer der Anfrage, auf die geantwortet wird;
  • Zeile 8: der Thread, der diese Antwort anzeigt. Wie bereits erwähnt, ist dies immer der Event-Loop-Thread;
  • Zeilen 10 und 12: der Zeitpunkt der Anfrage und der Zeitpunkt der Antwort;

2.5. Testen synchroner Aufrufe

Wir führen die folgende Konfiguration aus:

 

Auf der Registerkarte [Response] erhalten wir folgende Ergebnisse:

 
  • In [1-2] haben wir tatsächlich 10 Antworten wie angefordert erhalten. Sie wurden in der Reihenfolge ihres Eintreffens an die erste Position eingefügt. Wir können sehen, dass sie in der Reihenfolge der Anfragen empfangen wurden;
  • Sie wurden alle im Event-Loop-Thread [AWT-EventQueue-0] ausgeführt und angezeigt. Die Anfragen wurden daher nacheinander in diesem Thread ausgeführt. Es gab keine gleichzeitigen Anfragen;
  • Was hier nicht sichtbar ist: Während der Ausführung ist die GUI eingefroren. So ist es beispielsweise nicht möglich, auf die Registerkarte [Response] zuzugreifen, um eingehende Antworten anzuzeigen, oder die Ausführung über die Schaltfläche [Cancel] zu stoppen. Selbst wenn diese Schaltfläche auf der Registerkarte [Request] vorhanden gewesen wäre, wäre sie unbrauchbar gewesen. Tatsächlich gäbe es dann zwei Ereignisse:
    • Klicken auf die Schaltfläche [Generate];
    • Klicken auf die Schaltfläche [Abbrechen];

Der Klick auf die Schaltfläche [Cancel] wird erst verarbeitet, nachdem der durch den Klick auf die Schaltfläche [Generate] ausgelöste Vorgang abgeschlossen ist. Wir haben gerade gesehen, dass dieser Vorgang den Event-Loop-Thread für die gesamte Dauer der Ausführung belegt hat und dadurch die Verarbeitung des Klicks auf die Schaltfläche [Cancel] verhindert hat. Dies ist typischerweise die Art von Situation, in der Rx eine erhebliche Verbesserung bieten kann;

2.6. Die asynchrone Schnittstelle und ihre Implementierung

Wir werden uns nun die Schnittstelle der Ebene [2] und ihre Implementierung mit Rx ansehen. Dies wird nicht sofort verständlich sein. Wir möchten lediglich die Einfachheit des Codes in dieser Implementierung hervorheben.

Image

Die asynchrone Schnittstelle sieht wie folgt aus:


public interface IRxService {
  // random numbers in [a,b]
  // n numbers are generated with random n in the interval [minCount, maxCount]
  // numbers are generated after a delay of milliseconds,
  // where [delay] is a random number in the range [minDelay, maxDelay]
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse);
}

Die Unterschiede zur in Abschnitt 2.3 vorgestellten synchronen Schnittstelle sind wie folgt:

  • Die in Abschnitt 2.3 vorgestellte Klasse [UiResponse] ist nun Teil der Parameter der Methode [getAleas] (Zeile 6). Der Grund dafür ist, dass die Anfragen nun parallel ausgeführt werden und der Dienst eine zufällige Zeit abwartet, bevor er sein Ergebnis zurückgibt, sodass die Antworten nicht in der Reihenfolge der Anfragen bei uns eintreffen. Wir übergeben daher das [UiResponse]-Objekt, das unter anderem die Anfrage-ID enthält:

  // id du client (requête)
  private int idClient;
  // réponse du service
  private ServiceResponse serviceResponse;
  // nom du thread d'observation
  private String observedOn;
  // heure de la requête
  private String requestAt;
  // heure de la réponse
  private String responseAt;
  • Der Antworttyp des asynchronen Dienstes ist vom Typ [Observable<UiResponse>]. Der Typ [Observable<>] wird von der Rx-Bibliothek bereitgestellt. Das Ergebnis [Observable<UiResponse>] zeigt an, dass die Methode [getAleas] einen Strom von Werten vom Typ [UiResponse] bereitstellt, die nacheinander an ihren Beobachter übermittelt werden;

Sehen wir uns nun die Implementierung dieser Schnittstelle an:


public class RxService implements IRxService {
 
  // service
  private IService service;
 
  // manufacturer
  public RxService(IService service) {
    this.service = service;
  }
 
  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
  }
}
  • Zeilen 7–9: Wir übergeben dem Konstruktor eine Referenz auf die synchrone Schnittstelle [IService]. Diese Schnittstelle übernimmt die Erzeugung von Zufallszahlen;
  • das von der Methode [getAleas] zurückgegebene Observable wird durch die statische Methode [Observable.create] erstellt. Diese Methode ermöglicht es uns, aus einer synchronen eine asynchrone Implementierung zu erstellen;
  • Zeile 13: Der Parameter der statischen Methode [Observable.create] ist hier eine Lambda-Funktion, die einen Typ [Subscriber] als Parameter annimmt, wiederum einen Rx-Typ. Ein [Subscriber] ist ein Objekt, das einen Stream von Observables abonniert, d. h. einen Stream von asynchron gelieferten Daten. Hier verwenden wir drei Methoden dieses Subscribers:
    • [Subscriber.onNext], um Daten an ihn zu übergeben (Zeile 16);
    • [Subscriber.onError], um ihm eine Ausnahme zu senden (Zeile 18);
    • [Subscriber.onCompleted], um dem Subscriber mitzuteilen, dass der Datenstrom beendet ist (Zeile 20);

Es kann mehrere Subscriber für dasselbe Observable geben. Hier haben wir nur einen Subscriber, der einen Stream mit einem einzigen Datenelement abonniert, nämlich das in den Zeilen 15–16 erzeugte. Die Daten werden durch die synchrone Implementierung des Dienstes erzeugt (Zeile 15) und an den Subscriber zurückgegeben (Zeile 16).

Auch wenn all dies noch etwas undurchsichtig bleibt, kann man nicht umhin, von der extremen Prägnanz dieser asynchronen Implementierung des Dienstes beeindruckt zu sein.

2.7. Der asynchrone Aufruf

Image

Wir werden nun den synchronen Aufruf [5] untersuchen, den die [swing]-Schicht an den [2]-Dienst sendet:


private void doGenerateWithRxService() {
        // start waiting
        beginWaiting();
        // we ask for the random numbers
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
            UiResponse uiResponse = new UiResponse();
            uiResponse.setIdClient(i);
            // scheduler
            int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
            switch (schedulerIndex) {
            case 0:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
                break;
...
            }
        }
...
    }
  • Zeilen 6–10: Ausführung der vom Benutzer angeforderten [nbRequests]-Anfragen;
  • Zeilen 7–8: Vorbereitung des [UiResponse]-Objekts, das von der [getAleas]-Methode des asynchronen Dienstes (Zeile 13) benötigt wird. Dabei wird hauptsächlich die [idClient] der Anfrage gespeichert;
  • Zeile 13: Die Methode [getAleas] des asynchronen Dienstes wird aufgerufen. Sie gibt ein [Observable<UiResponse>]-Objekt zurück. Dieser Aufruf löst den synchronen Dienst noch nicht aus. Kehren wir zum Code für das asynchrone [getAleas] zurück:

  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
}

Der Code in den Zeilen 4–11, der den synchronen Dienst aufruft, wird nur ausgeführt, wenn sich ein Abonnent registriert. Solange es keine Abonnenten gibt, wird dieser Code nicht ausgeführt.

Kehren wir zum Code für die Methode [doGenerateWithRxService] zurück:

  • Zeile 5: Wir erstellen ein leeres Observable (es wird nichts beobachtet);
  • Zeile 13: Wir erstellen ein Observable, dessen Stream die Zusammenführung der asynchronen Streams [nbRequests] ist, die mit den [nbRequests]-Anfragen verbunden sind. Dies wird mithilfe der Methode [Observable.mergeWith] erreicht, die es ermöglicht, zwei asynchrone Streams zusammenzuführen. In der Rx-Terminologie wird [mergeWith] als Stream-Operator bezeichnet. Diese Operatoren zeichnen sich dadurch aus, dass das Ergebnis der Operation in den meisten Fällen ein weiteres [Observable] ist. Letztendlich bezieht sich die Variable [observables] nach Zeile 17 auf einen einzigen Stream, der aus den [nbRequests] asynchronen Antworten des asynchronen Dienstes besteht;
  • Zeile 13: Die Merge-Operation hätte wie folgt geschrieben werden können:

observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse));

aber wir haben geschrieben:


observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));

Hier haben wir den Operator [subscribeOn] auf das Observable [rxService.getAleas] angewendet. Wie so oft ist das Ergebnis wiederum ein Observable. Der Operator [subscribeOn] legt fest, dass das Observable in einem von einem [Scheduler] bereitgestellten Thread ausgeführt werden muss. Es gibt mehrere mögliche [Scheduler], die für unterschiedliche Situationen geeignet sind. In der GUI haben wir mehrere Optionen bereitgestellt, um zu sehen, wie sie sich unterscheiden:

  

Daraus ergibt sich folgender Code:


    private void doGenerateWithRxService() {
        // start waiting
        beginWaiting();
        // we ask for the random numbers
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
            UiResponse uiResponse = new UiResponse();
            uiResponse.setIdClient(i);
            // scheduler
            int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
            switch (schedulerIndex) {
            case 0:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
                break;
            case 1:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.computation()));
                break;
            case 2:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.newThread()));
                break;
            case 3:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.trampoline()));
                break;
            case 4:
                observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.immediate()));
                break;
            }
        }
...
}

Schauen wir uns den Code in den Zeilen 12–14 noch einmal an. Der Scheduler [Schedulers.io()] weist jedem Observable einen neuen Thread zu. Wenn wir dem Code folgen:

  • Zeile 5: Wir haben ein leeres Observable;
  • Zeile 13, Iteration 1: observables ist die Liste [observable0/thread0] (Observable observable0 läuft auf Thread thread0);
  • Zeile 13, Iteration 2: „observables“ ist die Liste [observable0/thread0, observable1/thread1];
  • usw...

Letztendlich haben wir nach Zeile 28 ein Observable, das aus der Zusammenführung von [nbRequests] Observables resultiert, die auf [nbRequests] verschiedenen Threads laufen. Nicht alle Scheduler funktionieren auf diese Weise, wie wir beim Testen sehen werden.

Schauen wir uns den Code für den Aufruf des asynchronen Dienstes weiter an:


private void doGenerateWithRxService() {
        // start waiting
        beginWaiting();
        // we ask for the random numbers
        Observable<UiResponse> observables = Observable.empty();
        for (int i = 0; i < nbRequests; i++) {
        ...
        }
        // observer
        observables = observables.observeOn(SwingScheduler.getInstance());
        // these observables are executed
        subscriptions.add(observables.subscribe(uiResponse -> {
            updateUi(uiResponse);
        } , th -> {
            System.out.println(th);
            doCancel();
        } , this::doCancel));
    }
  • Wir haben gesehen, dass wir bei Erreichen von Zeile 10 ein einzelnes Observable haben, eine Zusammenführung von [nbRequests] Observables, die je nach dem vom Benutzer gewählten Scheduler auf [nbRequests] verschiedenen Threads ausgeführt werden können oder auch nicht;
  • Zeile 10: Mit dem Operator [observeOn] können wir festlegen, in welchem Thread wir die Daten aus dem Observable abrufen möchten, in diesem Fall die Objekte [nbRequests] vom Typ [UiResponse]. In einer Swing-Oberfläche haben wir keine andere Wahl. Jede Aktualisierung der Schnittstelle muss im Event-Loop-Thread erfolgen. Hier werden die Daten des Observables in einer Swing-JList-Komponente angezeigt. Der Thread [SwingScheduler.getInstance()] stellt den Event-Loop-Thread dar. Die Klasse [SwingScheduler] stammt nicht aus der RxJava-Bibliothek, sondern aus der RxSwing-Bibliothek;
  • wenn wir Zeile 12 erreichen, wurde der synchrone Dienst noch nicht aufgerufen, da das Observable in Zeile 10 noch keinen Abonnenten hat. Die Zeilen 12–17 stellen mithilfe des [subscribe]-Operators einen bereit. Die Parameter dieses Operators sind drei Lambda-Funktionen:
    • Die erste [uiResponse -> {updateUi(uiResponse);}] nimmt als Parameter eines der vom Observable erzeugten [UiResponse]-Objekte entgegen. Zur Erinnerung: Wir werden hier [nbRequests] Objekte dieses Typs haben. Die zugehörige Methode, in diesem Fall updateUi, muss dieses Ergebnis verarbeiten;
    • die zweite [th -> {System.out.println(th);doCancel();}] nimmt einen Typ [Throwable] als Parameter entgegen, in diesem Fall eine Ausnahme, die während der Ausführung des Observables aufgetreten ist. Die zugehörige Methode muss diese Information verarbeiten. Hier zeigen wir sie auf der Konsole an (Zeile 15) und brechen die Ausführung ab, wodurch bestimmte Elemente der GUI aktualisiert werden;
    • das dritte [this::doCancel] wird aufgerufen, wenn das Observable signalisiert, dass es keine Daten mehr zu übertragen hat. Hier ist das Observable die Vereinigung von [nbRequests] Observables. Das resultierende Observable zeigt an, dass es fertig ist, wenn alle Observables, aus denen es besteht, selbst signalisiert haben, dass sie ihre Arbeit beendet haben. Wenn also diese dritte Lambda-Funktion ausgeführt wird, haben wir alle Daten empfangen. Die lokale Methode [doCancel] aktualisiert die GUI, um anzuzeigen, dass die Ausführung abgeschlossen ist;

Die Variable [subscriptions] ist wie folgt definiert:


    // les souscriptions aux observables
protected List<Subscription> subscriptions = new ArrayList<Subscription>();

Der Typ [Subscription] repräsentiert ein Abonnement, d. h. die Verbindung zwischen einem Abonnenten [Subscriber] und dem, was dieser beobachtet [Observable]. Wir haben hier eine Liste von Abonnements verwendet, obwohl es in diesem Beispiel nur eines gibt. Die lokale Methode [doCancel], die ausgeführt wird, wenn das Observable signalisiert, dass es keine Daten mehr zu übertragen hat, lautet wie folgt:


    @Override
    protected void doCancel() {
        // fin attente
        endWaiting();
        // dans le cas de souscriptions
        if (jCheckBoxRxSwing.isSelected() && subscriptions != null) {
            subscriptions.forEach(Subscription::unsubscribe);
        }
}
  • Zeile 7 meldet alle Abonnenten vom Observable ab;

Aus dieser kurzen Erklärung lassen sich folgende wichtige Punkte ableiten:

  • Der Typ [Observable] bezeichnet einen Strom von Werten, die nacheinander an Abonnenten oder Beobachter übermittelt werden;
  • Der Typ [Subscriber] bezeichnet einen Abonnenten vom Typ [Observable];
  • Der Typ [Subscription] bezeichnet ein Abonnement, d. h. die Verbindung zwischen einem [Subscriber] und einem [Observable];
  • der Typ [Observable] unterstützt Operatoren [mergeWith, empty, subscribeOn, observeOn, ...], von denen die meisten Observables erzeugen. Diese Operatoren werden verwendet, um das Observable vor seiner Ausführung zu konfigurieren:
    • was beobachtet werden soll;
    • auf welchem Thread das Observable ausgeführt wird;
    • der Thread, auf dem der Abonnent Daten vom Observable empfängt;
  • Es gibt zwei Arten von Observables: [cold] und [hot]. Ein Cold-Observable wird für jeden neuen Subscriber vollständig ausgeführt. Wenn jede Ausführung dieselben Daten erzeugt, erhält jeder neue Subscriber dieselben Daten wie der vorherige. Ein Hot-Observable erzeugt im Allgemeinen kontinuierlich Daten. Wenn sich ein Subscriber anmeldet, erhält er die Daten, die ab dem Zeitpunkt seiner Anmeldung ausgegeben wurden. Er erhält keine Daten, die möglicherweise zuvor ausgegeben wurden. In unserem Beispiel ist das Observable „cold“: Es wird für jeden neuen Abonnenten vollständig neu ausgeführt. Was wird in unserem Beispiel tatsächlich ausgeführt? Um das herauszufinden, müssen wir zur Definition des beobachteten Observables zurückkehren:

  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
    return Observable.create(subscriber -> {
      try {
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
        subscriber.onError(e);
      } finally {
        subscriber.onCompleted();
      }
    });
}

Für jeden neuen Abonnenten wird die Lambda-Funktion, die ein Parameter der Methode [Observable.create] ist (Zeile 3), erneut ausgeführt. Daher werden die Zeilen 4–11 für jeden neuen Abonnenten [subscriber] ausgeführt;

2.8. Testen asynchroner Aufrufe

Zunächst demonstrieren wir die Auswirkungen der verschiedenen verfügbaren Scheduler. Dazu verwenden wir die folgenden Parameter:

 

Wir setzen [1-2] auf kleine Werte, damit wir auch dann nicht zu lange warten müssen, wenn die Anfragen auf demselben Thread ausgeführt werden.

2.8.1. mit dem [Schedulers.io]-Scheduler

 

Folgende Punkte sind zu beachten:

  • Die Antworten werden in einer Reihenfolge empfangen, die nicht mit der Reihenfolge der Anfragen übereinstimmt (siehe idClient);
  • jede Anfrage wurde in einem anderen Thread ausgeführt;
  • die Benutzeroberfläche ist diesmal nicht mehr eingefroren:
    • Man kann zwischen den Registerkarten wechseln;
    • wir sehen, wie die Daten eingehen;
    • es bleibt keine Zeit, die Schaltfläche [Abbrechen] zu sehen, da die Ausführung zu schnell erfolgt. Wir werden dies in einem weiteren Test hervorheben;

2.8.2. mit dem Scheduler [Schedulers.computation]

 

Folgende Punkte sind zu beachten:

  • Die Antworten werden in einer Reihenfolge empfangen, die nicht mit der Reihenfolge der Anfragen übereinstimmt (siehe idClient);
  • die Anfragen wurden in 8 Threads ausgeführt;
  • Thread Nr. 3 wurde für die Anfragen 8 und 0 verwendet;
  • Thread Nr. 4 wurde für die Anfragen 9 und 1 verwendet;
  • Die anderen Anfragen hatten jeweils einen eigenen Thread;

Der Scheduler [Schedulers.computation] verwendet so viele Threads, wie es Kerne auf dem verwendeten Rechner gibt. Diese Information wird über den Ausdruck [Runtime.getRuntime().availableProcessors()] abgerufen.

2.8.3. mit dem Scheduler [Schedulers.newThread]

 

Das Verhalten ähnelt dem des Schedulers [Schedulers.io].

2.8.4. mit den Schedulern [Schedulers.trampoline, Schedulers.immediate]

 

Das Verhalten ist synchron. Alle Anfragen werden auf dem Event-Loop-Thread ausgeführt. Dieses Ergebnis sollte nicht verallgemeinert werden; es bedeutet vielmehr lediglich, dass in diesem konkreten Beispiel beide Scheduler synchron arbeiteten.

2.9. Randfälle

In diesem Beispiel arbeiten wir mit Schedulern, die asynchronen Betrieb unterstützen. Zunächst erhöhen wir die Anzahl der Anfragen auf 100 unter Verwendung des Schedulers [Schedulers.computation], der hier auf 8 Threads läuft. Wir erhalten das folgende Ergebnis:

 
  • in [1] ist die Schaltfläche [Cancel] vorhanden und kann verwendet werden (asynchroner Betrieb);

Lassen wir nun die Ausführung bis zum Ende laufen:

 

In [2] sehen wir, dass die Ausführung der 100 Anfragen etwa 4 Sekunden gedauert hat (verteilt auf 8 Threads).

Führen wir nun dieselben 100 Anfragen mit dem Scheduler [Schedulers.newThread] aus, der jede Anfrage in einem separaten Thread ausführt:

 

In [1] sehen wir, dass die Ausführung der 100 Anfragen (über 100 Threads) eine halbe Sekunde gedauert hat. Dies ist somit deutlich schneller als mit dem Scheduler [Schedulers.computation].

Führen wir nun unter denselben Bedingungen 800 Anfragen durch, wobei wir weiterhin den Scheduler [Schedulers.newThread] verwenden. Wir erhalten folgende Ergebnisse:

 

Die 800 Anfragen werden in etwa 1 Sekunde ausgeführt.

Wenn wir diese Zahl erhöhen (auf meinem Rechner über 2.500 Anfragen – ausgeführt in 1,5 Sekunden – wobei diese Zahl natürlich stark von der Laufzeitumgebung abhängt), erhalten wir schließlich die folgende Ausnahme:

  

Wir haben es also mit einem Stapelüberlauf zu tun. Tests zeigen, dass das Verhalten des Schedulers [Schedulers.newThread] nicht deterministisch ist. Es kann vorkommen, dass die oben genannte Ausnahme auftritt, Sie neue Tests ausführen, dann zu der Konfiguration zurückkehren, die die Ausnahme verursacht hat, und diese Ausnahme nicht mehr auftritt.

2.10. Fazit

Wir haben ein Beispiel für die Verwendung der Rx-Bibliothek gezeigt. Fassen wir zusammen, was wir gelernt haben:

Wir sind mit der folgenden Architektur gestartet:

Image

  • In [4] führte die [swing]-Schicht synchrone Aufrufe an die [service]-Schicht durch;
  • in [5] führte die [swing]-Schicht asynchrone Aufrufe an die [rxService]-Schicht durch, die wiederum einen synchronen Aufruf [6] an die [service]-Schicht ausführte;

Als Erstes stellten wir fest, dass die Rx-Bibliothek es einfach machte, die asynchrone [rxService]-Schnittstelle aus der synchronen [service]-Schnittstelle zu erstellen (siehe Abschnitt 2.4). Dies ist eine wichtige Erkenntnis, da es bedeutet, dass wir eine synchrone Anwendung leicht zu einer asynchronen weiterentwickeln können.

In der [swing]-Schicht wurden zwei separate Methoden geschrieben:

  • eine, um synchrone Aufrufe an den Dienst zu senden (siehe Abschnitt 2.4);
  • die andere, um asynchrone Aufrufe an ihn zu senden (siehe Abschnitt 2.7);

Das Schreiben asynchroner Aufrufe hat sich als deutlich komplexer erwiesen als das Schreiben synchroner Aufrufe. Dennoch werden diejenigen, die bereits mit paralleler Programmierung gearbeitet haben, bei der mehrere Threads synchronisiert werden müssen, feststellen, dass die Rx-Lösung einfacher zu schreiben ist und alle schwierigen Probleme der Synchronisation und der Kommunikation zwischen Threads vermeidet. In diesem Artikel haben wir die folgenden Kernpunkte hervorgehoben:

  • Der Typ [Observable] bezeichnet einen Strom von Ereignissen (Werten), die asynchron sein können (müssen aber nicht) und die beobachtet werden können;
  • Der Typ [Subscriber] bezeichnet einen Abonnenten eines Typs [Observable];
  • der Typ [Subscription] bezeichnet ein Abonnement, d. h. die Verbindung zwischen einem [Subscriber] und einem [Observable];
  • Der Typ [Observable] unterstützt Operatoren [mergeWith, empty, subscribeOn, observeOn, ...], die meist Observables erzeugen. Diese Operatoren dienen dazu, das Observable vor seiner Ausführung zu konfigurieren:
    • was beobachtet werden soll;
    • auf welchem Thread das Observable ausgeführt wird;
    • der Thread, auf dem der Abonnent Daten vom Observable empfängt;
  • Es gibt zwei Arten von Observables: [cold] und [hot]. Ein Cold-Observable wird für jeden neuen Abonnenten vollständig ausgeführt. Wenn jede Ausführung dieselben Daten erzeugt, erhält jeder neue Abonnent dieselben Daten wie der vorherige. Ein Hot-Observable erzeugt im Allgemeinen kontinuierlich Daten. Wenn sich ein Abonnent anmeldet, erhält er die Daten, die ab dem Zeitpunkt seiner Anmeldung ausgegeben wurden. Er erhält keine Daten, die möglicherweise zuvor ausgegeben wurden. In unserem Beispiel ist das Observable „cold“: Es wird für jeden neuen Abonnenten vollständig neu ausgeführt.

Nachdem wir nun ein Beispiel gesehen haben, das den Nutzen der Rx-Bibliothek verdeutlicht, werden wir sie genauer untersuchen.

Die Rx-Bibliothek verfügt über viele Methoden mit generischen Parametern in ihren Signaturen. Wir werden diese Signaturen kurz durchgehen (Abschnitt 3). Die Parameter dieser Methoden sind meist funktionale Schnittstellen (Java 8), d. h. Schnittstellen mit nur einer einzigen Methode. Die tatsächlichen Parameter müssen daher Instanzen dieser Schnittstellen sein. Vor Java 8 war es gängige Praxis, eine Schnittstelle mithilfe einer anonymen Klasse zu implementieren. In Java 8 ist es bei funktionalen Schnittstellen prägnanter, diese mithilfe einer Lambda-Funktion zu implementieren. Wir werden diese daher vorstellen (Abschnitt 4). Anschließend stellen wir die Klasse [Stream] vor (Abschnitt 5), mit der Sie Java-Sammlungen mithilfe von Lambda-Funktionen verarbeiten können. Diese Klasse ist interessant, da die Klasse [Observable] von RxJava

  • bestimmte Methoden;
  • die gleiche Art, Methoden miteinander zu verketten, um dasselbe Observable zu verarbeiten;

Anschließend werden wir die funktionalen Schnittstellen vorstellen, die spezifisch für die RxJava-Bibliothek sind (Abschnitt 6). Wir fahren fort mit den Hauptelementen der Rx-Bibliothek [Observable, Subscriber, Subscription, Operatoren] (Abschnitt 7). Die Klasse [Observable] verfügt über Dutzende von Operatoren, die ihrerseits mehrfach überladen sind. Dies führt zunächst zu einer erheblichen Komplexität, da sich diese Operatoren und ihre Überladungen manchmal nur in einem einzigen Detail unterscheiden und es ohne Erfahrung schwierig ist, zu wissen, welcher Operator zu verwenden ist. Wir werden nur eine begrenzte Anzahl von Operatoren vorstellen und ihre Überladungen meist außer Acht lassen.

Der gesamte vorangegangene Abschnitt wird anhand der RxJava-Bibliothek in einfachen Konsolenanwendungen behandelt. Sobald wir die RxJava-Bibliothek beherrschen, werden wir sie in zwei Arten von grafischen Anwendungen einsetzen:

  • In Abschnitt 8 werden wir die Beispiel-Swing-Anwendung erneut betrachten, um sie genauer zu untersuchen. Wir werden dann die RxSwing-Bibliothek verwenden;
  • in Abschnitt 9 erstellen wir eine Android-Anwendung unter Verwendung der RxAndroid-Bibliothek;

Sobald all dies erledigt ist, verfügt der Leser über die Werkzeuge, um auf eigenen Beinen zu stehen. Es wird wahrscheinlich einige Zeit dauern, bis er die Rx-Bibliothek intuitiv nutzen kann. Ich fand diese Bibliothek besonders interessant. Allerdings empfand ich sie als schwer verständlich, und die Lernkurve war steil. Ich hoffe, dass dieses Dokument diese Lernkurve für den Leser verkürzt. Meiner Meinung nach ist der Aufwand die Mühe wert.