Skip to content

7. Die RxJava-Bibliothek

Die RxJava-Bibliothek basiert auf folgendem Konzept: Ein Stream von Elementen des Typs T Observable<T> wird von einem oder mehreren Abonnenten (Subscribers, Observers, Consumers) vom Typ Subscriber<T> beobachtet. Die RxJava-Bibliothek ermöglicht es, den Observable<T>-Stream im Thread T1 und seinen Subscriber<T>-Beobachter im Thread T2 auszuführen, ohne dass sich der Entwickler um die Verwaltung des Lebenszyklus dieser Threads oder um naturgemäß schwierige Probleme wie den Datenaustausch zwischen Threads und deren Synchronisation zur Ausführung einer globalen Aufgabe kümmern muss. Sie erleichtert somit die asynchrone Programmierung.

Ein Observable<T>-Stream erzeugt Elemente vom Typ T, die bei ihrer Erzeugung beobachtet werden können. Befinden sich der Beobachter und das Observable (ein Begriff, der im weiteren Sinne für den Typ Observable<T> verwendet wird) im selben Thread, kann das Observable das Element (i+1) erst dann erzeugen, wenn der Beobachter das Element i verarbeitet hat. Es gibt nur wenige Fälle, in denen diese Architektur sinnvoll ist. Befinden sich der Beobachter und das Observable nicht im selben Thread, verhalten sich das Observable und sein Beobachter autonom: Das Observable sendet in seinem eigenen Tempo, und der Beobachter verarbeitet in seinem eigenen Tempo. Hierin liegt der Wert der Bibliothek. Bislang haben wir nur einen einzelnen Beobachter behandelt. In der Realität kann ein Observable eine beliebige Anzahl von Beobachtern haben.

Die RxJava-Bibliothek eignet sich besonders gut für die in Abschnitt 2 der Einleitung beschriebene und hier zusammengefasste Architektur:

Image

  • In [1] stellt eine Service-Schicht Dienste bereit, von denen einige lange Zeit in Anspruch nehmen (z. B. Netzwerkanfragen);
  • diese Service-Schicht wird von einer grafischen Benutzeroberfläche [1] (Swing, Android, JavaFX) aufgerufen. Wenn die Service-Schicht im selben Thread läuft wie die [Swing]-Methode, die sie nutzt, friert die grafische Benutzeroberfläche ein (reagiert nicht mehr), während sie auf das Service-Ergebnis wartet;
  • In [2] ermöglicht eine mit RxJava implementierte dünne Anpassungsschicht, der GUI-Schicht eine asynchrone Implementierung desselben Dienstes bereitzustellen: Dieser Dienst kann in einem anderen Thread als die Methode der GUI-Schicht laufen, die ihn aufruft. In diesem Fall bleibt die GUI [3] reaktionsfähig: Der Benutzer kann weiterhin mit ihr interagieren, beispielsweise indem er parallel zur ersten eine neue Netzwerkanfrage auslöst, und – was am wichtigsten ist – dem Benutzer kann die Möglichkeit gegeben werden, Prozesse abzubrechen, die zu lange dauern – was unmöglich wäre, wenn die GUI eingefroren wäre;
  • Der Aufruf [4] ist synchron, während die Aufrufe [5-6] asynchron sind;

In dieser Architektur stellt die Ebene [2] Dienste bereit, die Observable<T>-Typen zurückgeben, die von den Methoden der grafischen Ebene [3] abonniert werden können. Ein Dienst in Ebene [2] liefert dann seine Ergebnisse nacheinander, und Ebene [3] kann auf jedes einzelne reagieren, beispielsweise durch Aktualisierung einer oder mehrerer Komponenten der grafischen Benutzeroberfläche.

Die Klasse Observable&lt;T&gt; verfügt über Dutzende von Methoden. Dies ist eine der Herausforderungen der Bibliothek: Sie ist sehr umfangreich, und es ist schwierig, alle ihre Möglichkeiten zu erfassen. Wir werden einige davon vorstellen. Die Beherrschung der anderen Methoden wird mit der Zeit kommen.

7.1. Observables erstellen und abonnieren

7.1.1. Beispiel-01: die Methode [Observable.from]

  

Betrachten Sie den folgenden Code:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
 
import java.util.Arrays;
 
public class Exemple01 {
  public static void main(String[] args) {
    // observable integers
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
    });
  }
}
  • Zeile 12: Wir erstellen einen Typ „Observable<Integer>“ aus einer Liste von Ganzzahlen.

Die Klasse *Observable&lt;T&gt;* ist ein Strom von Elementen des Typs T, die – vorzugsweise asynchron, aber nicht zwingend – bei ihrer Erzeugung beobachtet werden können. Ihre Definition lautet wie folgt:

 

Wie bereits erwähnt, verfügt die Klasse Observable&lt;T&gt; über Dutzende von Methoden. Einige ähneln denen der Klasse *Stream&lt;T&gt;*, die in Abschnitt 5 behandelt wurde. Die RxJava-Dokumentation enthält „Marble-Diagramme“ [2], die veranschaulichen, wie diese Methoden funktionieren:

  • Zeile 3 veranschaulicht die Emissionen des Observables im Zeitverlauf;
  • die Methode [4] wird auf die vom Observable emittierten Elemente angewendet. Sie erzeugt in der Regel ein neues Observable;
  • Zeile 5 zeigt das erhaltene neue Observable;

Die Methode [Observable.from] hat die folgende Signatur:

 

Mit der statischen Methode [Observable.from] können Sie aus einer Sammlung von Elementen des Typs T ein Observable<T> erstellen. Dies ist eine sehr einfache Möglichkeit, um mit Observables zu beginnen. Die Zeile:


    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));

gibt daher drei Elemente aus. Sie gibt diese jedoch nicht sofort aus. Sie gibt sie jedes Mal vollständig aus, wenn sich ein Abonnent registriert. Dies wird als „Cold Observable“ bezeichnet. Das Observable gibt seine Elemente für jeden neuen Abonnenten erneut aus.

Wir können uns die vorherige Anweisung als eine Konfigurationsaktion für das Observable vorstellen. Es wird einmal konfiguriert und n-mal ausgeführt, wenn n Abonnenten erscheinen.

Wie abonniert man?

Eine Möglichkeit hierfür ist die Verwendung der Methode [Observable.subscribe], deren Definition hier wie folgt lautet:

 
  • Der erste Parameter [Action1<T> onNext] (siehe Abschnitt 6.2) der Methode ist die Methode, die ausgeführt werden soll, wenn das Observable ein neues Element T ausgibt;
  • der zweite Parameter [Action1<Throwable> onError] der Methode ist die Methode, die ausgeführt werden soll, wenn das Observable eine Ausnahme auslöst;
  • der dritte Parameter [Action0 onComplete] (siehe Abschnitt 6.1) der Methode ist die Methode, die ausgeführt werden soll, wenn das Observable eine Ausnahme auslöst;
  • die Methode gibt einen Typ [Subscription] zurück;

Der Typ [Subscription] stellt ein Abonnement für das Observable dar. Seine Definition lautet wie folgt:

 

Der Wert dieser Schnittstelle [1] liegt in ihrer Methode [2], die es ermöglicht, ein Abonnement zu kündigen.

In unserem Beispiel lautet der Code zum Abonnieren des Observables wie folgt:


    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
});
  • Zeile 1: Das Ergebnis vom Typ [Subscription] wird ignoriert;
  • Zeilen 1–15: Die drei Parameter sind Instanzen anonymer Klassen. Wir werden auch Lambdas verwenden. Der Vorteil anonymer Klassen besteht darin, dass die von der einzigen Methode dieser Klassen erwarteten Datentypen klar erkennbar sind;
  • Zeilen 2–5: Implementierung des ersten Parameters vom Typ [Action1<Integer>];
  • Zeilen 6–10: Implementierung des zweiten Parameters vom Typ [Action1<Throwable>];
  • Zeilen 11–15: Implementierung des dritten Parameters vom Typ [Action0];

Der vollständige Code lautet wie folgt:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
 
import java.util.Arrays;
 
public class Exemple01 {
  public static void main(String[] args) {
    // observable integers
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    // subscription
    obs1.subscribe(new Action1<Integer>() {
      @Override
      public void call(Integer integer) {
        System.out.printf("next : %s%n", integer);
      }
    }, new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {
        System.out.println(throwable);
      }
    }, new Action0() {
      @Override
      public void call() {
        System.out.println("completed");
      }
    });
  }
}

Das Observable in Zeile 12 beginnt mit der Ausgabe seiner drei Elemente, sobald die Methode [subscribe] in Zeile 14 aufgerufen wird. Ab diesem Zeitpunkt:

  • werden für jedes ausgegebene Element die Zeilen 15–18 ausgeführt.
  • Wenn die 3 Elemente ausgegeben sind, werden die Zeilen 24–29 ausgeführt;
  • die Zeilen 19–24 werden niemals ausgeführt, da das Observable hier keine Ausnahme auslöst;

Standardmäßig laufen das Observable und der Observer im selben Thread. Es gibt einige vordefinierte Observables, die in einem anderen Thread als dem Hauptthread (hier dem Thread der main-Methode) laufen, aber für die meisten ist dies nicht der Fall. Hier geschieht also alles im Thread der main-Methode:

  • Das Observable gibt das Element 1 aus;
  • die Zeilen 15–18 werden ausgeführt und zeigen dieses Element an;
  • Das Observable gibt das Element 2 aus;
  • Die Zeilen 15–18 führen dieses Element aus und zeigen es an;
  • Das Observable gibt Element 3 aus;
  • Die Zeilen 15–18 führen dieses Element aus und zeigen es an;
  • das Observable sendet die Benachrichtigung [completed];
  • die Zeilen 24–29 werden ausgeführt;

Das Ergebnis sieht wie folgt aus:

1
2
3
4
next : 1
next : 2
next : 3
completed

Die Klasse [Example02] greift auf [Example01] zurück, wobei diesmal Lambda-Funktionen als Parameter für die Methode [Observable.subscribe] verwendet werden:


package dvp.rxjava.observables;
 
import java.util.Arrays;
 
import rx.Observable;
 
public class Exemple02 {
  public static void main(String[] args) {
    // observable integers
    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
    // subscription
    obs1.subscribe(
      (integer) -> System.out.printf("next : %s%n", integer),
      (th) -> System.out.println(th),
      () -> System.out.println("completed"));
  }
}

7.1.2. Beispiel-03: Die Observer-Klasse

  

Die Methode [Observable.subscribe], mit der Sie ein Observable abonnieren können, gibt es in mehreren Varianten, darunter die folgenden:


package dvp.rxjava.observables;
 
import java.util.Arrays;
 
import rx.Observable;
import rx.Observer;
 
public class Exemple03 {
    public static void main(String[] args) {
        // observable integers
        Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
        // subscription
        obs1.subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
 
            @Override
            public void onError(Throwable th) {
                System.out.printf("throwable %s", th);
            }
 
            @Override
            public void onNext(Integer integer) {
                System.out.printf("next : %s%n", integer);
            }
        });
    };
}

Zeile 13: Anstatt drei Parameter an die [subscribe]-Methode zu übergeben, übergeben wir ihr wie folgt einen Typ [Observer]:

 

Der Typ [Observer] ist eine Schnittstelle mit drei Methoden:

  • [onNext(T t)], die jedes Mal aufgerufen wird, wenn das Observable ein Element t ausgibt;
  • [onError(Throwable th)], die aufgerufen wird, wenn das Observable eine Ausnahme th auslöst;
  • [onCompleted], die aufgerufen wird, wenn das Observable anzeigt, dass es die Ausgabe beendet hat;

Der Code funktioniert ähnlich wie zuvor erläutert. Wir erhalten folgende Ergebnisse:

1
2
3
4
next : 1
next : 2
next : 3
completed

7.1.3. Beispiel 04: Die Methode [Observable.create]

  

Die statische Methode *Observable.create* ist wie folgt definiert:

 
  • Die Methode [create] gibt einen Typ Observable<T> zurück;
  • Der Parameter der [create]-Methode ist eine Funktion vom Typ [Observable.OnSubscribe<T>], die wie folgt definiert ist:
 

Der Typ [Observable.OnSubscribe<T>] ist eine funktionale Schnittstelle, die selbst die funktionale Schnittstelle [Action1<Subscriber<? super T>>] erweitert. Die Methode [call] dieser Schnittstelle erwartet einen Typ [Subscriber] (subscriber, observer), der wie folgt definiert ist:

 

In [1] sehen wir, dass die Klasse [Subscriber<T>] die in Abschnitt 7.1.2 vorgestellte Schnittstelle [Observer<T>] implementiert.

Letztendlich nimmt die Methode [<T> Observable.create]:

  • als Parameter eine Instanz vom Typ [Observable.OnSubscribe<T>] mit einer einzigen Methode: void call(Subscriber<T> s). Der Typ [Subscriber<T>] erweitert den Typ [Observer<T>] und verfügt daher über die Methoden onNext, onError und onCompleted;
  • gibt einen Typ Observable<T> zurück;

Die Methode [<T> Observable.create] gibt ein konfiguriertes Observable zurück. Es wurden noch keine Elemente ausgegeben. Wenn ein Abonnent [Subscriber<T> s] dieses Observable abonniert, wird die Methode [void call(s)] der Funktion aufgerufen, die als Parameter an die Methode [<T> Observable.create] übergeben wurde. Ihre Aufgabe ist es, Elemente t vom Typ T auszugeben und bei jeder Ausgabe die Methode [s.onNext(t)] des Observers aufzurufen. Wenn dies abgeschlossen ist, muss die Methode [s.onCompleted(t)] des Beobachters aufgerufen werden und die Methode [call] muss beendet werden. Wenn die Methode [call] auf eine Ausnahme th stößt, muss die Methode [s.onError(th)] des Beobachters aufgerufen werden und die Methode [call] muss beendet werden;

Um dieses komplexe Verhalten zu veranschaulichen, verwenden wir den folgenden Code [Beispiel04]:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.Subscriber;
 
import java.util.Random;
 
public class Exemple04 {
    public static void main(String[] args) {
        // observable configuration of reals
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                for (int i = 0; i < 3; i++) {
                    // emission element i
                    subscriber.onNext(new Random((i + 1)).nextDouble());
                }
                // end of issue
                subscriber.onCompleted();
            }
        });
        // subscription and therefore emission
        obs1.subscribe((d) -> System.out.printf("onNext %s%n", d), (th) -> System.out.printf("onError %s%n", th),
                () -> System.out.println("onCompleted"));
    }
}
  • Zeile 11: Es wird ein Observable erstellt, das Double-Typen ausgibt;
  • Zeilen 11–21: Der Parameter der Methode [create] wird mit einer anonymen Klasse instanziiert, die die einzige Methode [call] aus den Zeilen 12–20 enthält. Das in Zeile 11 erstellte Observable ist bereit zum Emittieren, wird jedoch erst dann emittieren, wenn ein Observer eintrifft;
  • Zeilen 13–21: Die [call]-Methode erhält eine Referenz auf einen Beobachter;
  • Zeilen 14–17: Drei Elemente werden an den Beobachter ausgegeben;
  • Zeile 19: Benachrichtigt den Beobachter, dass die Ausgabe abgeschlossen ist;
  • Zeilen 23–24: Abonnement des Observables aus Zeile 11. Wir implementieren die drei Parameter [onNext, onError, onCompleted] der [subscribe]-Methode mithilfe von drei Lambdas. Dieses Abonnement erstellt den Abonnenten [Subscriber<Double>], der in Zeile 13 an die [call]-Methode übergeben wird. Die Ausgabe von Elementen beginnt dann;
  • alles geschieht im selben Thread: Observable und Observer;

Wir erhalten die folgenden Ergebnisse:

1
2
3
4
onNext 0.7308781907032909
onNext 0.7311469360199058
onNext 0.731057369148862
onCompleted

Mit der Methode [Observable.create] können Sie aus jedem beliebigen Ereignis ein Observable erstellen. Dies ist die Methode, die wir in Abschnitt 2 der Einführung verwendet haben, um eine synchrone Schnittstelle in eine asynchrone umzuwandeln.

7.1.4. Beispiel-05: Refactoring von [Beispiel-04]

  

Das folgende Beispiel zeigt eine neue Version der statischen Methode [Observable.subscribe]:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.Subscriber;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
 
public class Exemple05 {
    public static void main(String[] args) {
        // configuration of a real observable
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // waiting
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // error
                        subscriber.onError(e);
                    }
                    // action
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // finish
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });
 
        // a subscriber
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
            }
 
            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }
 
            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };
 
        // subscription
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        showInfos("après souscription");
 
    }
 
    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}
  • Zeile 56: Die neue Version der statischen Methode [Observable.subscribe] akzeptiert den Typ [Subscriber] als Parameter, den wir im vorigen Absatz vorgestellt haben;
  • Zeilen 37–52: der Subscriber (Beobachter). Er implementiert die Observer-Schnittstelle mit ihren drei Methoden onNext, onError und onCompleted;
  • Zeilen 61–64: Von hier an konzentrieren wir uns auf die Threads, in denen das Observable und sein Observer ausgeführt werden;
  • Zeile 62: der Name des Threads;
  • Zeile 63: die aktuelle Zeit, ausgedrückt in Sekunden und Millisekunden. Dies ermöglicht es uns, die Ausgabe von Elementen durch das Observable und deren Verarbeitung durch den Observer im Zeitverlauf zu verfolgen;
  • Dieser Code hat dieselbe Funktionalität wie der vorherige Code. Wir haben den letzteren lediglich umgestaltet;

Die erzielten Ergebnisse lauten wie folgt:

avant souscription ------Thread[main] ---- Time[31:685]
Observable.call start ------Thread[main] ---- Time[31:691]
Observable.call onNext(80.39999999999999) ------Thread[main] ---- Time[32:194]
Subscriber.onNext (80.39999999999999) ------Thread[main] ---- Time[32:195]
Observable.call onNext(73.2) ------Thread[main] ---- Time[32:595]
Subscriber.onNext (73.2) ------Thread[main] ---- Time[32:595]
Observable.call onNext(106.8) ------Thread[main] ---- Time[32:897]
Subscriber.onNext (106.8) ------Thread[main] ---- Time[32:897]
Observable.call onCompleted ------Thread[main] ---- Time[32:898]
Subscriber.onCompleted ------Thread[main] ---- Time[32:898]
après souscription ------Thread[main] ---- Time[32:899]
  • Zeile 1 der Ergebnisse: Vor Zeile 56 des Codes ist noch nichts passiert. Das Observable wurde lediglich konfiguriert;
  • Zeile 2 der Ergebnisse: Zeile 56 des Codes löst einen Aufruf der Methode [call] in Zeile 15 aus. Zeile 3: Die reelle Zahl 80,39 wird an den Observer gesendet;
  • Zeile 4: Der Observer empfängt die gesendete Zahl;
  • Zeilen 5–8: Der vorherige Vorgang wiederholt sich zweimal;
  • Zeile 9: Das Observable sendet die Benachrichtigung über das Ende der Übertragung;
  • Zeile 10: Der Beobachter empfängt sie;
  • Zeile 11: wird durch Zeile 57 des Codes angezeigt;

Wir sehen also, dass die einzelne Abonnementzeile 56 dazu geführt hat, dass die Zeilen 2–10 der Ergebnisse angezeigt wurden. Wenn man mit der RxJava-Bibliothek beginnt, fragt man sich, wie die Dinge miteinander verknüpft sind, insbesondere die Verbindungen zwischen dem Observer und dem Observable. Hier sehen wir, dass Zeile 56, das Abonnement des Observables,

  • die Ausgabe aller Elemente des Observables ausgelöst hat;
  • dass das Observable und der Observer im selben Thread laufen;
  • und dass wir aus diesem Grund die folgende Abfolge beobachten: Element i emittieren, Element i beobachten, Element (i+1) emittieren, Element (i+1) beobachten, ...

Erinnern wir uns daran, dass der Emitter vor der Emission seiner Elemente gewartet hat:


                    // attente
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // erreur
                        subscriber.onError(e);
}

wobei i in Zeile 3 die Emissionsnummer darstellt (0 <= i < 3). Betrachten wir die Emissionszeiten der Elemente des Observables:

  • Zeilen 2, 3: Element 0 wurde etwa 500 ms nach Beginn des Abonnements ausgegeben;
  • Zeilen 3, 5: Element 1 wurde etwa 400 ms nach Element 0 emittiert;
  • Zeilen 5, 7: Element 2 wurde etwa 300 ms nach Element 1 emittiert;

7.2. Ausführungsthread, Beobachtungs-Thread

7.2.1. Beispiel-06: Observable und Observer in einem anderen Thread als [main]

  

Wir überarbeiten das vorherige Beispiel wie folgt [Beispiel 06]:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
 
public class Exemple06 {
    public static void main(String[] args) {
 
        // gatekeeper
        CountDownLatch latch = new CountDownLatch(1);
 
        // configuration of a real observable
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // waiting
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // error
                        subscriber.onError(e);
                    }
                    // action
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // finish
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });
 
        // a subscriber
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
                // we lower the barrier
                latch.countDown();
            }
 
            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }
 
            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };
 
        // suite observable configuration
        obs1 = obs1.subscribeOn(Schedulers.computation());
        // subscription
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        // waiting at the gate
        try {
            showInfos("début attente barrière");
            latch.await();
            showInfos("fin attente barrière");
        } catch (InterruptedException e1) {
            System.out.println(e1);
        }
        showInfos("après souscription");
 
    }
 
    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}
  • Zeile 16: Wir erstellen eine Schutzbarriere (Semaphor) mithilfe eines [CountDownLatch]-Objekts. Dieses Objekt dient dazu, Threads miteinander zu synchronisieren. Hier wird es mit dem Wert 1 initialisiert, den wir als Wert der Schutzbarriere (oder des Semaphors) bezeichnen. Ein Thread wartet mit dem folgenden Befehl auf die Schutzbarriere:

latch.await();

Der Thread wird blockiert, wenn der Latch-Wert >0 ist. Ein Thread kann den internen Wert des Latch erhöhen oder verringern. Zeile 48: Der Latch-Wert wird um 1 verringert.

  • Zeile 63: Das Observable ist so konfiguriert, dass es auf einem vom Scheduler bereitgestellten Thread läuft [Schedulers.computation()]. Dieser Scheduler kann so viele Threads bereitstellen, wie es Kerne auf dem Ausführungsrechner gibt. Der Abschnitt zur Beispielanwendung demonstrierte die Verwendung anderer Scheduler (siehe Abschnitt 2.8);

Das Prinzip des Codes ist wie folgt:

  • Die [main]-Methode läuft im Hauptthread;
  • Zeile 66: beginnt mit der Ausgabe von Elementen aus dem Observable. Diese werden auf einem anderen Thread als dem Hauptthread ausgegeben;
  • Zeile 70: Der Hauptthread wird blockiert, da die Barriere den Wert 1 hat (siehe Zeile 16). Er kann erst fortfahren, wenn dieser Wert auf 0 wechselt. Dies geschieht in Zeile 48. Es ist der Beobachter, der die Barriere senkt, wenn er die Benachrichtigung erhält, dass das Observable die Ausgabe beendet hat;

Die Ausführung liefert folgende Ergebnisse:

avant souscription ------Thread[main] ---- Time[09:268]
Observable.call start ------Thread[RxComputationThreadPool-1] ---- Time[09:278]
début attente barrière ------Thread[main] ---- Time[09:278]
Observable.call onNext(44.4) ------Thread[RxComputationThreadPool-1] ---- Time[09:783]
Subscriber.onNext (44.4) ------Thread[RxComputationThreadPool-1] ---- Time[09:783]
Observable.call onNext(18.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:183]
Subscriber.onNext (18.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:184]
Observable.call onNext(54.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:486]
Subscriber.onNext (54.0) ------Thread[RxComputationThreadPool-1] ---- Time[10:488]
Observable.call onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[10:489]
Subscriber.onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[10:490]
fin attente barrière ------Thread[main] ---- Time[10:491]
après souscription ------Thread[main] ---- Time[10:493]
  • Zeile 1: Das Abonnieren steht kurz bevor;
  • Zeile 2: Dies löst die Ausführung der Methode [call] im Thread [RxComputationThreadPool-1] aus. Wir haben nun eine parallele Ausführung mit zwei Threads;
  • Zeile 3: Aus unbekanntem Grund hat der Thread [RxComputationThreadPool-1] die Kontrolle abgegeben. Der [main]-Thread übernimmt daraufhin die Kontrolle und wird durch die Barriere blockiert (Zeile 70 des Codes). Ab diesem Zeitpunkt kann nur noch der Thread [RxComputationThreadPool-1] arbeiten;
  • Zeilen 4–11: Wir beobachten das zuvor gesehene Verhalten zwischen dem Observable und seinem Observer, aber alles findet nun im Thread [RxComputationThreadPool-1] statt;
  • Zeilen 12–13: Der Observer hat die Barriere gesenkt (Zeile 48 des Codes) und der Thread [RxComputationThreadPool-1] wurde beendet. Der Thread [main] übernimmt die Kontrolle und zeigt zwei Meldungen an;

7.2.2. Beispiel-07: Observable und Beobachter in zwei verschiedenen Threads

  

Wir ändern das vorherige Beispiel wie folgt:


package dvp.rxjava.observables;
 
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
 
public class Exemple07 {
    public static void main(String[] args) {
 
        // gatekeeper
        CountDownLatch latch = new CountDownLatch(1);
 
        // configuration of a real observable
        Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
            @Override
            public void call(Subscriber<? super Double> subscriber) {
                showInfos("Observable.call start");
                for (int i = 0; i < 3; i++) {
                    // waiting
                    try {
                        Thread.sleep(500 - i * 100);
                    } catch (InterruptedException e) {
                        // error
                        subscriber.onError(e);
                    }
                    // action
                    double value = new Random().nextInt(100) * 1.2;
                    showInfos(String.format("Observable.call onNext(%s)", value));
                    subscriber.onNext(value);
                }
                // finish
                showInfos(String.format("Observable.call onCompleted"));
                subscriber.onCompleted();
            }
        });
 
        // a subscriber
        Subscriber<Double> subscriber = new Subscriber<Double>() {
            @Override
            public void onCompleted() {
                showInfos("Subscriber.onCompleted");
                // we lower the barrier
                latch.countDown();
            }
 
            @Override
            public void onError(Throwable e) {
                showInfos(String.format("Subscriber.onError (%s)", e));
            }
 
            @Override
            public void onNext(Double aDouble) {
                showInfos(String.format("Subscriber.onNext (%s)", aDouble));
            }
        };
 
        // suite observable configuration
        obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
        // subscription
        showInfos("avant souscription");
        obs1.subscribe(subscriber);
        // waiting in front of the barrier
        try {
            showInfos("début attente barrière");
            latch.await();
            showInfos("fin attente barrière");
        } catch (InterruptedException e1) {
            System.out.println(e1);
        }
        showInfos("après souscription");
 
    }
 
    private static void showInfos(String message) {
        System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
                new SimpleDateFormat("ss:SSS").format(new Date()));
    }
}

Der Code ist bis auf Zeile 63 identisch mit dem des vorherigen Beispiels:


obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());

die das Observable (subscribeOn) und den Observer (observeOn) so konfiguriert, dass sie auf einem der vom Scheduler [Schedulers.computation()] bereitgestellten Threads ausgeführt werden.

Die erhaltenen Ergebnisse lauten wie folgt:

avant souscription ------Thread[main] ---- Time[09:643]
début attente barrière ------Thread[main] ---- Time[09:656]
Observable.call start ------Thread[RxComputationThreadPool-4] ---- Time[09:656]
Observable.call onNext(39.6) ------Thread[RxComputationThreadPool-4] ---- Time[10:162]
Subscriber.onNext (39.6) ------Thread[RxComputationThreadPool-3] ---- Time[10:163]
Observable.call onNext(98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[10:562]
Subscriber.onNext (98.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[10:564]
Observable.call onNext(46.8) ------Thread[RxComputationThreadPool-4] ---- Time[10:864]
Observable.call onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[10:866]
Subscriber.onNext (46.8) ------Thread[RxComputationThreadPool-3] ---- Time[10:866]
Subscriber.onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[10:868]
fin attente barrière ------Thread[main] ---- Time[10:869]
après souscription ------Thread[main] ---- Time[10:870]

Folgende Punkte sind zu beachten:

  • Das Observable läuft im Thread [RxComputationThreadPool-4] (Zeilen 3–4, 6, 8–9);
  • Der Observer läuft im Thread [RxComputationThreadPool-3] (Zeilen 5, 7, 10–11);
  • sie laufen unabhängig voneinander. Daher sendet das Observable in den Zeilen 8–9 zwei Benachrichtigungen (onNext, onCompleted), bevor der Observer die Benachrichtigung [onNext] abruft (Zeile 10);

Die RxJava-Bibliothek übernimmt die Datenübertragung (Emissionen) vom Thread des Observables zum Thread des Observers. Der Entwickler muss sich darum keine Gedanken machen.

Wir haben gesehen, wie man Observables erstellt (Observable.from, Observable.create). Sehen wir uns nun die vordefinierten Observables in der RxJava-Bibliothek an.

7.3. Vordefinierte Observables

7.3.1. Beispiel-08: die Methode [Observable.range]

 

Von nun an werden wir spezielle Klassen für die beobachteten Prozesse und ihre Beobachter verwenden. Die Idee dahinter ist, ihre Namen, ihre Ausführungsthreads und ihre Ausführungszeiten protokollieren zu können, damit wir sie im Zeitverlauf nachverfolgen können.

Die Klasse [Process] wird einfach ein Observable sein, das wir benennen können. Sie wird die folgende Schnittstelle [IProcess] implementieren:


package dvp.rxjava.observables.utils;
 
import rx.Observable;
 
public interface IProcess<T> {
 
    // name of observable
    public String getName();
 
    // observable
    public Observable<T> getObservable();
 
}

Diese Schnittstelle kann durch die folgende Klasse [Process<T>] implementiert werden:


package dvp.rxjava.observables.utils;
 
import rx.Observable;
import rx.Scheduler;
 
public class Process<T> implements IProcess<T>{
 
    // observable name
    protected String name;
    // observed process
    protected Observable<T> observable;
 
    // manufacturers
    public Process(String name, Observable<T> observable) {
        // local initializations
        this.name = name;
        this.observable = observable;
    }
 
    // getters and setters
    public String getName() {
        return name;
    }
 
    public Observable<T> getObservable() {
        return observable;
    }
 
}
  • Zeile 9: der Name des Prozesses;
  • Zeile 11: die beobachtete Größe;
  • Zeilen 14–18: der Konstruktor;

Der Beobachter wird durch die folgende Klasse [Observer] beschrieben:


package dvp.rxjava.observables.utils;
 
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import rx.Subscriber;
 
public class Observateur<T> extends Subscriber<T> {
 
...
}
  • Zeile 11: Die Klasse `Observateur<T>` erweitert die Klasse `Subscriber<T>`, die wir in Abschnitt 7.1.3 kurz vorgestellt haben. Wir werden sie als Argument für die Methode `[Observable.subscribe]` verwenden:

// exécution observable (observation)
obs1.subscribe(observateur);

Die in Zeile 2 oben verwendete Methode [Observable.subscribe] hat folgende Definition:

 

Die Aufgabe des [Subscriber] besteht in erster Linie darin, die vom Observable, den er abonniert hat, ausgegebenen Elemente mithilfe der Methoden der [Observer]-Schnittstelle zu verwalten: onNext, onError, onCompleted. Die Klasse [Subscriber] verfügt über die folgenden Methoden:

 

Im Code für die [Observer]-Klasse verwenden wir die Methode [1] isUnsubscribed, um festzustellen, ob das Abonnement des Subscribers gekündigt wurde oder nicht. Die vollständige [Observer<T>]-Klasse lautet wie folgt:


package dvp.rxjava.observables.utils;
 
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import rx.Subscriber;
 
public class Observateur<T> extends Subscriber<T> {
 
    // a gatekeeper (semaphore)
    private CountDownLatch latch;
    // a display method
    private Consumer<String> showInfos;
    // observer's name
    private String observerName;
    // the name of the observed process
    private String processName;
 
    // manufacturers
    public Observateur() {
 
    }
 
    public Observateur(String name, CountDownLatch latch, Consumer<String> showInfos, String observedName) {
        this.observerName = name;
        this.latch = latch;
        this.showInfos = showInfos;
        this.processName = observedName;
    }
 
    // --------------------------- implementation interface Observer<T>
    @Override
    public void onCompleted() {
        // end of issues
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber [%s,%s].onCompleted", observerName, processName));
        }
        // end of main thread lock
        latch.countDown();
    }
 
    @Override
    public void onError(Throwable e) {
        // emission error
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber [%s, %s].onError (%s)", observerName, processName, e));
        }
    }
 
    @Override
    public void onNext(T value) {
        // an additional show
        if (!isUnsubscribed()) {
            try {
                showInfos.accept(String.format("Subscriber [%s,%s] : onNext (%s)", observerName, processName,
                        new ObjectMapper().writeValueAsString(value)));
            } catch (JsonProcessingException e) {
                showInfos.accept(String.format("Subscriber [%s,%s].onNext (%s)", observerName, processName, e));
            }
        }
    }
}
  • Zusätzlich zu den Eigenschaften eines Subscribers enthält der Observer die folgenden Informationen:
    • Zeile 14: eine Barriere oder ein Semaphor, das verwendet wird, um den Hauptthread zu blockieren, bis der Observer alle vom Observable emittierten Elemente empfangen hat. Dies geschieht in Zeile 36 des Codes, wenn der Observer die Benachrichtigung über das Ende der Emission vom Observable erhält;
    • Zeile 16: eine Consumer<String>-Instanz, die dazu dient, eine Meldung auf der Konsole anzuzeigen;
    • Zeile 18: der Name des Beobachters, der zur Unterscheidung zwischen mehreren Beobachtern dient;
    • Zeile 20: der Name des beobachteten Prozesses;
  • Zeilen 36, 46, 54: die Methoden [onCompleted, onError, onNext] der Schnittstelle [Observer<T>], die von der abstrakten Klasse [Subscriber<T>] implementiert wird. Diese Klasse implementiert sie nicht. Dies muss daher in ihren Unterklassen erfolgen. Bevor in diesen Methoden etwas ausgeführt wird, prüfen wir, ob der Beobachter von dem beobachteten Objekt abgemeldet wurde;
  • Zeile 59: Die [onNext]-Methode des Beobachters schreibt die JSON-Zeichenkette des empfangenen Elements. Dies ermöglicht es uns, verschiedene Arten von Elementen anzuzeigen;

Schauen wir uns nun eine neue Methode der Observable-Klasse an, die [range]-Methode:

 

Das Observable Observable.range(n,m) gibt (m) Ganzzahlen im Bereich von n bis n+m-1 aus. Wir werden dies anhand des folgenden Codes [Beispiel08] untersuchen:


package dvp.rxjava.observables.exemples;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
 
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple08 {
    public static void main(String[] args) throws InterruptedException {
 
        // number of observers
        final int nbObservateurs = 2;
 
        // semaphore
        CountDownLatch latch = new CountDownLatch(nbObservateurs);
 
        // observable configuration
        Observable<Integer> obs1 = Observable.range(15, 3).subscribeOn(Schedulers.computation());
        // observable performance (observation)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos,"obs1"));
        }
        // waiting
        showInfos.accept("main : attente fin observation");
        latch.await();
        // end
        showInfos.accept("main : fin observation");
    }
 
    // displays
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • Zeile 16: Wir verwenden zwei Beobachter;
  • Zeile 19: Die Schutzbarriere (Semaphor) wird auf zwei initialisiert, da wir jeden Beobachter in einem anderen Thread platzieren. Der Hauptthread muss daher warten, bis beide Beobachter-Threads beendet sind;
  • Zeile 22: Wir konfigurieren das Observable so, dass es auf einem Thread des Schedulers läuft [Schedulers.computation()]. Der Beobachter befindet sich auf demselben Thread wie das Observable;
  • Zeilen 25–27: Wir abonnieren zwei Beobachter für das Observable. Dies löst für jeden Beobachter die vollständige Ausführung des Observables aus: Die Ganzzahlen 15, 16 und 17 werden ausgegeben;
  • Zeile 30: Der Hauptthread wartet, bis die Beobachter fertig sind;

Die erhaltenen Ergebnisse lauten wie folgt:

main : début observation ------Thread[main] ---- Time[27:875]
main : attente fin observation ------Thread[main] ---- Time[27:893]
Subscriber[observateur[1],obs1] : onNext (15) ------Thread[RxComputationThreadPool-2] ---- Time[28:245]
Subscriber[observateur[0],obs1] : onNext (15) ------Thread[RxComputationThreadPool-1] ---- Time[28:245]
Subscriber[observateur[1],obs1] : onNext (16) ------Thread[RxComputationThreadPool-2] ---- Time[28:247]
Subscriber[observateur[0],obs1] : onNext (16) ------Thread[RxComputationThreadPool-1] ---- Time[28:248]
Subscriber[observateur[1],obs1] : onNext (17) ------Thread[RxComputationThreadPool-2] ---- Time[28:249]
Subscriber[observateur[1],obs1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[28:250]
Subscriber[observateur[0],obs1] : onNext (17) ------Thread[RxComputationThreadPool-1] ---- Time[28:251]
Subscriber[observateur[0],obs1].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[28:252]
main : fin observation ------Thread[main] ---- Time[28:252]
  • Zeile 2: Der Hauptthread ist blockiert und wartet darauf, dass die beiden Beobachter fertig werden;
  • Zeilen 3–4: Wir sehen, dass Beobachter 0 auf dem Thread [RxComputationThreadPool-1] und Beobachter 1 auf dem Thread [RxComputationThreadPool-2] läuft;
  • Zeilen 3–10: Wir sehen, dass beide Beobachter genau dieselben Elemente erhalten;

Wir werden die hier definierte Observer-Klasse verwenden, um das Verhalten anderer Arten von Observables zu veranschaulichen.

7.3.2. Beispiel-09: Die Methoden Observable.[interval, take, doNext]

  
 

Dieses Beispiel veranschaulicht die Verwendung des Observables *Observable.interval(long interval, TimeUnit unit)*, das in regelmäßigen Abständen lange Ganzzahlen ausgibt. Beachten Sie Punkt [1]: Standardmäßig wird das Observable Observable.interval auf einem der Threads des Schedulers Schedulers.computation ausgeführt.

Der Code sieht wie folgt aus:


package dvp.rxjava.observables.exemples;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
 
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
 
public class Exemple09 {
    public static void main(String[] args) throws InterruptedException {
 
        // number of observers
        final int nbObservateurs = 2;
 
        // semaphore
        CountDownLatch latch = new CountDownLatch(nbObservateurs);
 
        // observable configuration
        Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
                .doOnNext(l -> showInfos.accept(l.toString()));
        // observable performance (observation)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
                    "obs1"));
        }
        // waiting
        showInfos.accept("main : attente fin observation");
        latch.await();
        // end
        showInfos.accept("main : fin observation");
    }
 
    // displays
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • Zeile 22: Das Observable gibt alle 500 Millisekunden lange Ganzzahlen aus. Die Sequenz beginnt mit der Zahl 0;
  • Zeile 22: Dieses Observable gibt eine unendliche Anzahl von Werten aus. Die Methode [Observable.take(n)] erstellt ein neues Observable, das nur die ersten n ausgegebenen Elemente beibehält;
 

Schauen wir uns den Code des Observables noch einmal an:


Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));

Zeile 2: Die Methode [Observable.doOnNext] wird jedes Mal ausgeführt, wenn das Observable ein neues Element ausgibt. Dies wird häufig zum Protokollieren von Informationen verwendet. Hier möchten wir das Ausgabedatum der Elemente protokollieren, um zu überprüfen, ob das Intervall von 500 Millisekunden eingehalten wird. Die Methode [Observable.doOnNext] verändert das Observable, auf das sie angewendet wird, nicht. Ihre Definition lautet wie folgt:

 

Die Ausführung liefert folgende Ergebnisse:

main : début observation ------Thread[main] ---- Time[55:892]
main : attente fin observation ------Thread[main] ---- Time[55:911]
0 ------Thread[RxComputationThreadPool-1] ---- Time[56:412]
0 ------Thread[RxComputationThreadPool-2] ---- Time[56:413]
Subscriber[observateur [1],obs1] : onNext (0) ------Thread[RxComputationThreadPool-2] ---- Time[56:723]
Subscriber[observateur [0],obs1] : onNext (0) ------Thread[RxComputationThreadPool-1] ---- Time[56:723]
1 ------Thread[RxComputationThreadPool-1] ---- Time[56:906]
Subscriber[observateur [0],obs1] : onNext (1) ------Thread[RxComputationThreadPool-1] ---- Time[56:908]
1 ------Thread[RxComputationThreadPool-2] ---- Time[56:912]
Subscriber[observateur [1],obs1] : onNext (1) ------Thread[RxComputationThreadPool-2] ---- Time[56:914]
2 ------Thread[RxComputationThreadPool-1] ---- Time[57:405]
Subscriber[observateur [0],obs1] : onNext (2) ------Thread[RxComputationThreadPool-1] ---- Time[57:407]
Subscriber[observateur [0],obs1].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[57:408]
2 ------Thread[RxComputationThreadPool-2] ---- Time[57:412]
Subscriber[observateur [1],obs1] : onNext (2) ------Thread[RxComputationThreadPool-2] ---- Time[57:414]
Subscriber[observateur [1],obs1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[57:415]
main : fin observation ------Thread[main] ---- Time[57:416]
  • Zeilen 3, 7 und 11: Wir sehen, dass das Emissionsintervall etwa 500 ms beträgt;
  • die beiden Beobachter befinden sich tatsächlich auf zwei verschiedenen Threads, obwohl das Observable nicht für die Ausführung mit einem bestimmten Scheduler konfiguriert wurde. Dies ist das Standardverhalten des hier gezeigten Observables [Observable.interval];

7.3.3. Beispiele-10/12: die Methoden Observable.[error, empty, never]

 

Von nun an werden wir die Methoden der Klasse [Observable] prägnanter veranschaulichen. Der vorherige Code lautete wie folgt:


package dvp.rxjava.observables;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
 
import rx.Observable;
 
public class Exemple09 {
    public static void main(String[] args) throws InterruptedException {
 
        // number of observers
        final int nbObservateurs = 2;
 
        // semaphore
        CountDownLatch latch = new CountDownLatch(nbObservateurs);
 
        // observable configuration
        Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
                .doOnNext(l -> showInfos.accept(l.toString()));
        // observable performance (observation)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
                    "obs1"));
        }
        // waiting
        showInfos.accept("main : attente fin observation");
        latch.await();
        // end
        showInfos.accept("main : fin observation");
    }
 
    // displays
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}

Dieser Code wurde bereits im vorherigen Beispiel verwendet. Nur die Zeilen 21–22 haben sich geändert. Wir werden daher den Großteil dieses Codes in die folgende Klasse [ProcessUtils] auslagern:


package dvp.rxjava.observables.utils;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
 
import rx.Observable;
 
public class ProcessUtils {
 
    @SafeVarargs
    public static void subscribe(int nbObservateurs, IProcess<?>... processes) throws InterruptedException {
 
        // semaphore
        CountDownLatch latch = new CountDownLatch(nbObservateurs * processes.length);
 
        // observable performance (observation)
        showInfos.accept("main : début observation");
        for (int i = 0; i < nbObservateurs; i++) {
            for (IProcess<?> process : processes) {
                Observable<?> obs = process.getObservable();
                obs.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos, process.getName()));
            }
        }
        // waiting
        showInfos.accept("main : attente fin observation");
        latch.await();
        // end
        showInfos.accept("main : fin observation");
    }
 
    // displays
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
  • Zeile 13: Die Methode nimmt zwei Parameter entgegen:
    • nbObservers: die Anzahl der Beobachter für die Prozesse, die als zweiter Parameter übergeben wird;
    • Prozesse: die zu beobachtenden Prozesse (sogenannte „Observables“). Dank der Notation [IProcess<?>] können Prozesse Elemente unterschiedlicher Typen ausgeben;
  • Zeile 16: Das Semaphor muss auf Grün schalten, wenn alle Beobachter alle ihre Beobachtungen abgeschlossen haben. Der Anfangswert des Semaphors ist daher die Anzahl der Beobachter multipliziert mit der Anzahl der Beobachtungen;
  • Zeilen 20–25: Jeder Beobachter ist auf alle Prozesse abonniert, die er beobachten muss;
  • Zeile 23: Abrufen des Observables aus dem Prozess (siehe Abschnitt 7.3.1);
  • Zeile 23: Ein Beobachter ist darauf abonniert. Vier Informationen werden an den Beobachter übergeben:
    • seinen Namen;
    • das Semaphor, das er dekrementieren muss, wenn er die Benachrichtigung über das Ende der Übertragung von der von ihm beobachteten Observable erhält;
    • die Methode, die verwendet werden soll, wenn er Informationen in der Konsole protokollieren möchte;
    • den Namen des Prozesses, den er beobachten wird;

Nachdem diese Klassen definiert sind, sieht Beispiel 10 wie folgt aus:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple10 {
    public static void main(String[] args) throws InterruptedException {
        // observable configuration
        Observable<?> obs = Observable.error(new RuntimeException("Erreur !!!")).subscribeOn(Schedulers.computation());
        // observable execution (observation)
        ProcessUtils.subscribe(2,new Process<>("process1", obs));
    }
}

In Zeile 11 ist die statische Methode [Observable.error] wie folgt definiert:

 

Zeile 8 konfiguriert daher eine Beobachtbare, die lediglich eine Ausnahme an die [onError]-Methode ihrer Abonnenten auslöst. Die Ausführung liefert folgende Ergebnisse:


main : début observation ------Thread[main] ---- Time[22:618]
main : attente fin observation ------Thread[main] ---- Time[22:636]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[22:638]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[22:638]

Zeilen 3 und 4: Die [onError]-Methode beider Subscriber hat die vom Observable ausgelöste Ausnahme empfangen.

Diese Ausführung weist eine Besonderheit auf: Die [onCompleted]-Methoden beider Beobachter wurden nicht aufgerufen. Infolgedessen wurde die Barriere nicht gesenkt, und der Hauptthread bleibt in der statischen Methode [ProcessUtils.subscribe] in der folgenden Zeile 3 blockiert:


// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");

Hier sehen wir, dass die [onCompleted]-Methode der Abonnenten nicht aufgerufen wird, wenn im Observable ein Fehler auftritt. Wir ändern daher die [Observer.onError]-Methode wie folgt:


    @Override
    public void onError(Throwable e) {
        // erreur d'émission
        if (!isUnsubscribed()) {
            showInfos.accept(String.format("Subscriber[%s, %s].onError (%s)", observerName, processName, e));
        }
        // fin blocage thread principal
        latch.countDown();
}

Wir fügen die Zeilen 7–8 hinzu, um die Sperre im Falle eines beobachtbaren Fehlers aufzuheben. Mit diesem neuen Code liefert die Ausführung die folgenden Ergebnisse:


main : début observation ------Thread[main] ---- Time[40:750]
main : attente fin observation ------Thread[main] ---- Time[40:764]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[40:766]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[40:766]
main : fin observation ------Thread[main] ---- Time[40:767]

Wir erhalten Zeile 5, die wir zuvor nicht hatten.

Beispiel 11 sieht wie folgt aus:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple11 {
    public static void main(String[] args) throws InterruptedException {
        // observable configuration
        Observable<?> obs1 = Observable.empty();
        // observable execution (observation)
        ProcessUtils.subscribe(2,new Process<>("process1",obs1));
    }
}

Zeile 10: Die statische Methode [Observable.empty] erstellt ein Observable, das keine Elemente ausgibt. Es gibt lediglich die Benachrichtigung über das Ende der Ausgabe aus;

 

Die Ausführung des Codes im obigen Beispiel liefert folgende Ergebnisse:

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[37:073]
Subscriber[observateur[0],process1].onCompleted ------Thread[main] ---- Time[37:086]
Subscriber[observateur[1],process1].onCompleted ------Thread[main] ---- Time[37:086]
main : attente fin observation ------Thread[main] ---- Time[37:087]
main : fin observation ------Thread[main] ---- Time[37:087]
  • Zeilen 2 und 3: Wir sehen, dass beide Beobachter die Benachrichtigung über das Ende der Übertragung erhalten, ohne zuvor Elemente empfangen zu haben.

Man könnte sich fragen, wozu diese Methode eigentlich dient. Sie kann analog zu einer zunächst leeren Sammlung verwendet werden, in die dann Elemente hinzugefügt werden:

1
2
3
4
Observable obs=Observable.empty() ;
for(Observable o : observables){
    obs=obs.mergeWith(o) ;
}

In Zeile 3 führen wir das ursprüngliche Observable obs (Zeile 1) mit anderen Observables zusammen.

Beispiel 12 veranschaulicht die statische Methode [Observable.never]:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple12 {
    public static void main(String[] args) throws InterruptedException {
        // observable configuration
        Observable<?> obs1 = Observable.never();
        // observable execution (observation)
        ProcessUtils.subscribe(2,new Process<>("process1",obs1));
    }
}

Die statische Methode [Observable.never] erstellt ein Observable, das niemals eine Beobachtung ausgibt:

 

Die Ausführung des Beispiels liefert folgende Ergebnisse:

main : début observation ------Thread[main] ---- Time[27:018]
main : attente fin observation ------Thread[main] ---- Time[27:030]

Zeile 2: Der Hauptthread wartet unbegrenzt. Dies liegt daran, dass kein Observable die Benachrichtigung [onCompleted] ausgibt, wodurch der Semaphor (die Barriere) auf Grün schalten (die Barriere senken) könnte.

7.4. Multithreading

7.4.1. Beispiel 13: Aktions-Thread, Beobachter-Thread

In Abschnitt 7.1.3 haben wir mit der statischen Methode [Observable.create] ein Observable erstellt:

 
  • Die Methode [create] gibt einen Typ Observable<T> zurück;
  • Der Parameter der [create]-Methode ist eine Funktion vom Typ [Observable.OnSubscribe<T>], die wie folgt definiert ist:
 

Der Typ [Observable.OnSubscribe<T>] ist eine funktionale Schnittstelle, die selbst die funktionale Schnittstelle [Action1<Subscriber<? super T>>] erweitert. Die Methode [call] dieser Schnittstelle erwartet einen Typ [Subscriber] (Abonnent, Beobachter). Im weiteren Verlauf dieses Dokuments werden wir den Typ [Observable.OnSubscribe<T>] manchmal als Aktion bezeichnen. Wir werden benutzerdefinierte Aktionen erstellen, die einen Namen haben. Diese sind Instanzen der folgenden Schnittstelle [IProcessAction]:

  

package dvp.rxjava.observables.utils;
 
import rx.Observable;
 
public interface IProcessAction<T> extends Observable.OnSubscribe<T> {
 
    // action has a name
    public String getName();
}
  • Zeile 5: Die Schnittstelle [IProcessAction<T>] weist alle Merkmale der Schnittstelle [Observable.OnSubscribe<T>] auf;
  • Zeile 8: Sie verfügt außerdem über eine [getName]-Methode, die den Namen der Instanz zurückgibt, die die Schnittstelle implementiert;

Wir werden die folgende Aktion mit dem Namen [ProcessAction01] verwenden:


package dvp.rxjava.observables.utils;
 
import java.util.Random;
 
import rx.Subscriber;
import rx.functions.Func1;
 
public class ProcessAction01<T> implements IProcessAction<T> {
 
    // data
    private String name;
    private int nbValues;
    private Func1<Integer, T> func1;
 
    // manufacturers
    public ProcessAction01(String name, int nbValues, Func1<Integer, T> func1) {
        this.name = name;
        this.nbValues = nbValues;
        this.func1 = func1;
    }
 
    @Override
    public void call(Subscriber<? super T> subscriber) {
        ProcessUtils.showInfos.accept(String.format("Observable (%s) call start", getName()));
        for (int i = 0; i < nbValues; i++) {
            // waiting
            try {
                Thread.sleep(new Random().nextInt(500));
            } catch (InterruptedException e) {
                // error
                ProcessUtils.showInfos.accept(String.format("Observable (%s) onError", getName()));
                subscriber.onError(e);
            }
            // element emission
            T value = func1.call(i);
            ProcessUtils.showInfos.accept(String.format("Observable (%s,%s) onNext (%s)", getName(), i, value));
            subscriber.onNext(value);
        }
        // finish
        ProcessUtils.showInfos.accept(String.format("Observable (%s) onCompleted", getName()));
        subscriber.onCompleted();
    }
 
    @Override
    public String getName() {
        return name;
    }
 
}
  • Zeile 8: Die Klasse [ProcessAction01<T>] implementiert die Schnittstelle [IProcessAction<T>] und damit auch die Schnittstelle [Observable.OnSubscribe<T>];
  • Zeile 11: Der Name der Aktion;
  • Zeile 12: die Anzahl der auszugebenden Werte;
  • Zeile 13: eine Instanz vom Typ [Func1<Integer, T>], die eine Ganzzahl entgegennimmt und einen Typ T erzeugt, der vom Observable ausgegeben wird (Zeilen 35 und 37);
  • Zeilen 16–20: Wir übergeben den Aktionsnamen, die Anzahl der auszugebenden Werte und die Ausgabefunktion an den Konstruktor;
  • Zeilen 23–42: der Prozesscode;
  • Zeile 23: Die Methode [call] nimmt als Parameter den Abonnenten des mit dem Prozess verbundenen Observables entgegen;
  • Zeile 28: Der Prozess gibt seine Elemente nach einer Wartezeit von zufälliger Dauer aus;
  • Zeile 32: Emission eines Fehlers;
  • Zeile 37: eine normale Emission;
  • Zeile 41: sendet die Benachrichtigung über das Ende der Ausgabe;
  • Zeilen 25–38: Die Aktion sendet nbValues reelle Zahlen nach einer zufälligen Wartezeit (Zeile 30);
  • Zeile 35: Der auszugebende Wert wird von der Funktion [func1] bereitgestellt, die als Parameter an den Konstruktor übergeben wird (Zeile 16);

Wir refaktorisieren die Klasse [Process] (siehe Abschnitt 7.3.1), damit sie auch mit einer benannten Aktion instanziiert werden kann. Wir fügen den folgenden Konstruktor hinzu:


public Process(IProcessAction<T> na, Scheduler schedulerObserved, Scheduler schedulerObserver) {
        // nom process=nom action
        name = na.getName();
        // action --> observable
        observable = Observable.create(na);
        // thread d'exécution du processus observé
        if (schedulerObserved != null) {
            observable = observable.subscribeOn(schedulerObserved);
        }
        // thread d'observation de l'observateur
        if (schedulerObserver != null) {
            observable = observable.observeOn(schedulerObserver);
        }
    }
  • Zeile 1: Der Konstruktor nimmt 3 Parameter entgegen:
    1. die benannte Aktion, die zum Erstellen des Observables verwendet wird (Zeile 5);
    2. den Scheduler des beobachteten Prozesses (kann null sein);
    3. den Scheduler des Beobachters (kann null sein);
  • Zeile 5: Das Observable wird aus der als Parameter übergebenen Aktion erstellt;

Der folgende Code [Beispiel 13] beobachtet verschiedene Observables:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple13 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 1, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
        // process 3
        Process<Integer> process3 = new Process<>(new ProcessAction01<Integer>("process3", 3, i -> i * 2), null,
                Schedulers.computation());
        // process 4
        Process<Boolean> process4 = new Process<>(new ProcessAction01<Boolean>("process4", 4, i -> i % 2 == 0), null, null);
        // subscriptions
        ProcessUtils.subscribe(1, process1);
        ProcessUtils.subscribe(1, process2);
        ProcessUtils.subscribe(1, process3);
        ProcessUtils.subscribe(1, process4);
    }
}
  • Zeilen 13–15: process1 erzeugt 1 reelle Zahl in einem Berechnungs-Thread, die in einem anderen Berechnungs-Thread beobachtet wird;
  • Zeilen 17–18: process2 erzeugt 2 Zeichenfolgen in einem Rechen-Thread, und es wird kein Hinweis auf den Thread des Beobachters gegeben. Die Ergebnisse zeigen, dass die Beobachtung standardmäßig im selben Thread wie die Prozessausführung erfolgt;
  • Zeilen 20–21: process3 erzeugt 3 Ganzzahlen auf einem nicht näher bezeichneten Thread, die auf einem Berechnungs-Thread beobachtet werden. Die Ergebnisse zeigen, dass der Prozess standardmäßig auf dem Haupt-Thread läuft;
  • Zeile 23: Der Prozess process4 erzeugt 4 Boolesche Werte auf einem nicht spezifizierten Thread, die auf einem nicht spezifizierten Thread beobachtet werden. Die Ergebnisse zeigen, dass die Prozessausführung und ihre Beobachtung standardmäßig auf dem Hauptthread erfolgen;

Das Ergebnis der Ausführung dieses Codes lautet wie folgt:

main : début observation ------Thread[main] ---- Time[18:642]
main : attente fin observation ------Thread[main] ---- Time[18:660]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[18:660]
Observable (process1,0) onNext (68.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[19:093]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[19:094]
Subscriber[observateur[0],process1] : onNext (68.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[19:396]
Subscriber[observateur[0],process1].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[19:397]
main : fin observation ------Thread[main] ---- Time[19:397]
main : début observation ------Thread[main] ---- Time[19:398]
main : attente fin observation ------Thread[main] ---- Time[19:399]
Observable (process2) call start ------Thread[RxComputationThreadPool-5] ---- Time[19:399]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-5] ---- Time[19:630]
Subscriber[observateur[0],process2] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-5] ---- Time[19:631]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-5] ---- Time[20:094]
Subscriber[observateur[0],process2] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-5] ---- Time[20:095]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[20:096]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[20:096]
main : fin observation ------Thread[main] ---- Time[20:097]
main : début observation ------Thread[main] ---- Time[20:097]
Observable (process3) call start ------Thread[main] ---- Time[20:098]
Observable (process3,0) onNext (0) ------Thread[main] ---- Time[20:188]
Subscriber[observateur[0],process3] : onNext (0) ------Thread[RxComputationThreadPool-6] ---- Time[20:213]
Observable (process3,1) onNext (2) ------Thread[main] ---- Time[20:336]
Subscriber[observateur[0],process3] : onNext (2) ------Thread[RxComputationThreadPool-6] ---- Time[20:338]
Observable (process3,2) onNext (4) ------Thread[main] ---- Time[20:676]
Observable (process3) onCompleted ------Thread[main] ---- Time[20:677]
main : attente fin observation ------Thread[main] ---- Time[20:677]
Subscriber[observateur[0],process3] : onNext (4) ------Thread[RxComputationThreadPool-6] ---- Time[20:678]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[20:679]
main : fin observation ------Thread[main] ---- Time[20:679]
main : début observation ------Thread[main] ---- Time[20:680]
Observable (process4) call start ------Thread[main] ---- Time[20:680]
Observable (process4,0) onNext (true) ------Thread[main] ---- Time[21:065]
Subscriber[observateur[0],process4] : onNext (true) ------Thread[main] ---- Time[21:067]
Observable (process4,1) onNext (false) ------Thread[main] ---- Time[21:187]
Subscriber[observateur[0],process4] : onNext (false) ------Thread[main] ---- Time[21:188]
Observable (process4,2) onNext (true) ------Thread[main] ---- Time[21:624]
Subscriber[observateur[0],process4] : onNext (true) ------Thread[main] ---- Time[21:625]
Observable (process4,3) onNext (false) ------Thread[main] ---- Time[21:765]
Subscriber[observateur[0],process4] : onNext (false) ------Thread[main] ---- Time[21:766]
Observable (process4) onCompleted ------Thread[main] ---- Time[21:767]
Subscriber[observateur[0],process4].onCompleted ------Thread[main] ---- Time[21:767]
main : attente fin observation ------Thread[main] ---- Time[21:767]
main : fin observation ------Thread[main] ---- Time[21:768]
  • Der Prozess „process1“ erzeugt im Berechnungs-Thread [RxComputationThreadPool-4] eine reelle Zahl (Zeile 4), die im Berechnungs-Thread [RxComputationThreadPool-3] beobachtet wird (Zeile 6);
  • Der Prozess process2 erzeugt 2 Zeichenfolgen (Zeilen 12, 14) im Berechnungs-Thread [RxComputationThreadPool-5], die im selben Thread beobachtet werden (Zeilen 13, 15);
  • Der Prozess process3 erzeugt 3 Ganzzahlen (Zeilen 21, 23, 25) auf dem Hauptthread, die auf dem Berechnungs-Thread [RxComputationThreadPool-6] beobachtet werden (Zeilen 22, 24, 28);
  • Der Prozess process4 erzeugt 4 Boolesche Werte (Zeilen 34, 36, 38, 40) auf dem Hauptthread, die auf demselben Hauptthread beobachtet werden (Zeilen 33, 35, 37, 39);

Der Leser ist eingeladen, dem oben Gesagten zu folgen:

  • den Lebenszyklus des beobachteten Prozesses und seines Threads;
  • den Lebenszyklus seines Beobachters und dessen Thread;

Ein Großteil der Attraktivität von Rx-Bibliotheken liegt in diesem Multithreading, das der Entwickler nicht selbst verwalten muss.

7.5. Kombinationen mehrerer Observables

7.5.1. Beispiel 14: Zusammenführen von zwei Observables mit [Observable.merge]

Wir stellen nun statische Methoden der Klasse [Observable] vor, die es ermöglichen, mehrere Observables zu einem einzigen Ergebnis-Observable zu kombinieren.

Das erste Beispiel dieser Art lautet wie folgt:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.ProcessAction01;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple14 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
        // merge
        Process<?> process12 = new Process<>("process12",
                Observable.merge(process1.getObservable(), process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process12);
    }
}
  • Zeilen 15–17: Ein Prozess namens [process1] gibt 3 reelle Zahlen auf einem Berechnungs-Thread aus. Er wird ebenfalls auf einem Berechnungs-Thread beobachtet;
  • Zeilen 19–20: Ein Prozess namens [process2] gibt 2 Zeichenfolgen auf einem Berechnungs-Thread aus. Der Beobachtungs-Thread ist nicht angegeben. Wir haben zuvor gesehen, dass in diesem Fall der Beobachtungs-Thread der Berechnungs-Thread ist;
  • Zeile 23: Die beiden Prozesse werden zusammengeführt, d. h., es wird ein Observable erstellt, dessen Elemente gleichzeitig aus beiden Prozessen stammen. Hierfür wird die statische Methode [Observable.merge] verwendet:
 

Entgegen dem, was das obige Diagramm vermuten lässt, können während der Zusammenführung Elemente aus Stream 1 zwischen die Elemente von Stream 2 eingefügt werden. Dies zeigen die Ausführungsergebnisse:

main : début observation ------Thread[main] ---- Time[56:053]
main : attente fin observation ------Thread[main] ---- Time[56:073]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[56:073]
Observable (process2) call start ------Thread[RxComputationThreadPool-5] ---- Time[56:074]
Observable (process1,0) onNext (64.8) ------Thread[RxComputationThreadPool-4] ---- Time[56:263]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-5] ---- Time[56:403]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-5] ---- Time[56:515]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[56:516]
Subscriber[observateur[0],process12] : onNext (64.8) ------Thread[RxComputationThreadPool-3] ---- Time[56:552]
Subscriber[observateur[0],process12] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-3] ---- Time[56:553]
Subscriber[observateur[0],process12] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-3] ---- Time[56:553]
Observable (process1,1) onNext (56.4) ------Thread[RxComputationThreadPool-4] ---- Time[56:716]
Subscriber[observateur[0],process12] : onNext (56.4) ------Thread[RxComputationThreadPool-3] ---- Time[56:718]
Observable (process1,2) onNext (22.8) ------Thread[RxComputationThreadPool-4] ---- Time[57:082]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[57:083]
Subscriber[observateur[0],process12] : onNext (22.8) ------Thread[RxComputationThreadPool-3] ---- Time[57:084]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[57:085]
main : fin observation ------Thread[main] ---- Time[57:085]
  • Zeile 3: Der Prozess [process1] läuft auf dem Berechnungs-Thread [RxComputationThreadPool-4];
  • Zeile 4: Der Prozess [process2] läuft auf dem Berechnungs-Thread [RxComputationThreadPool-5];
  • Zeile 9: Der Prozess [process12] wird auf dem Berechnungs-Thread [RxComputationThreadPool-3] beobachtet. Ich kenne die Regel nicht, die zu dieser Auswahl geführt hat;
  • Zeilen 9–11: Wir sehen, dass der Beobachter Elemente sowohl aus Prozess [process1] (Zeile 5) als auch aus Prozess [process2] (Zeilen 6, 7) beobachtet, obwohl keiner der beiden abgeschlossen ist (es liegt eine Vermischung vor);
  • Der Prozess [process12] wird beendet (Zeile 17), wenn beide Prozesse, process1 und process2, beendet sind;

7.5.2. Beispiel 15: Verkettung zweier Observables mit [Observable.concat]

Wir werden nun den folgenden Code untersuchen:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.ProcessAction01;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple15 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, Schedulers.computation());
        // concat
        Process<?> process12 = new Process<>("process12",
                Observable.concat(process1.getObservable(), process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process12);
    }
}
  • Zeilen 15–17: Ein Prozess namens [process1] gibt 3 reelle Zahlen auf einem Berechnungs-Thread aus. Er wird ebenfalls auf einem Berechnungs-Thread beobachtet;
  • Zeilen 19–20: Ein Prozess namens [process2] gibt 2 Zeichenfolgen auf einem nicht näher bezeichneten Thread aus, hier dem standardmäßigen Hauptthread. Er wird auf einem Berechnungs-Thread beobachtet;
  • Zeile 23: Die beiden Prozesse werden verkettet, d. h., es wird ein Observable erstellt, dessen Elemente aus beiden Prozessen stammen. Die ausgegebenen Werte werden nicht vermischt. Der Prozess [process12] gibt zunächst alle Werte aus dem Prozess [process1] und anschließend die aus dem Prozess [process2] aus. Hierfür wird die statische Methode [Observable.concat] verwendet:
 

Die Ergebnisse der Ausführung lauten wie folgt:

main : début observation ------Thread[main] ---- Time[30:162]
main : attente fin observation ------Thread[main] ---- Time[30:189]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[30:190]
Observable (process1,0) onNext (79.2) ------Thread[RxComputationThreadPool-4] ---- Time[30:681]
Observable (process1,1) onNext (98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[30:792]
Subscriber[observateur[0],process12] : onNext (79.2) ------Thread[RxComputationThreadPool-3] ---- Time[30:975]
Subscriber[observateur[0],process12] : onNext (98.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[30:976]
Observable (process1,2) onNext (84.0) ------Thread[RxComputationThreadPool-4] ---- Time[31:084]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[31:085]
Subscriber[observateur[0],process12] : onNext (84.0) ------Thread[RxComputationThreadPool-3] ---- Time[31:086]
Observable (process2) call start ------Thread[RxComputationThreadPool-3] ---- Time[31:087]
Observable (process2,0) onNext (valeur-0) ------Thread[RxComputationThreadPool-3] ---- Time[31:556]
Subscriber[observateur[0],process12] : onNext ("valeur-0") ------Thread[RxComputationThreadPool-5] ---- Time[31:557]
Observable (process2,1) onNext (valeur-1) ------Thread[RxComputationThreadPool-3] ---- Time[31:608]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[31:609]
Subscriber[observateur[0],process12] : onNext ("valeur-1") ------Thread[RxComputationThreadPool-5] ---- Time[31:609]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-5] ---- Time[31:610]
main : fin observation ------Thread[main] ---- Time[31:611]
  • Zeilen 3–10: Prozess [process1] läuft und Prozess [process12] gibt die von [process1] ausgegebenen Werte weiter;
  • Zeile 9: Prozess [process1] ist beendet;
  • Zeilen 11–17: Prozess [process2] läuft und Prozess [process12] gibt die von [process2] ausgegebenen Werte aus;

Bei Prozess2 gibt es eine Besonderheit: Wir haben keinen Ausführungsthread angegeben. Man könnte daher erwarten, dass standardmäßig der Hauptthread verwendet wird. Dies ist jedoch nicht der Fall. Der Ausführungsthread war der Berechnungsthread [RxComputationThreadPool-3] (Zeile 11). Wenn also kein Ausführungs- oder Beobachtungsthread angegeben ist, können wir keine Annahmen darüber treffen, welcher Thread ausgewählt wird.

7.5.3. Beispiel 16: Kombinieren von zwei Observables mit [Observable.zip]

Wir werden nun den folgenden Code untersuchen:


package dvp.rxjava.observables.exemples;
 
import java.util.Arrays;
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;
 
public class Exemple16 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<String> process2 = new Process<>(
                new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, null);
        // 2-process combination function
        FuncN<String> funcn = new FuncN<String>() {
            @Override
            public String call(Object... args) {
                if (args.length == 2) {
                    return String.format("double=%s, string=%s", args[0], args[1]);
                } else {
                    throw new RuntimeException("la fonction attend 2 paramètres exactement");
                }
            }
        };
        // zip of the 2 processes
        Process<String> process12 = new Process<>("process12",
                Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
        // subscriptions
        ProcessUtils.subscribe(1, process12);
    }
}
  • Zeilen 16–18: Ein Prozess namens [process1] gibt 3 reelle Zahlen auf einem Berechnungs-Thread aus. Er wird ebenfalls auf einem Berechnungs-Thread beobachtet;
  • Zeilen 20–21: Ein Prozess namens [process2] gibt 2 Zeichenfolgen auf einem nicht näher bezeichneten Thread aus. Der Beobachtungs-Thread ist ebenfalls nicht näher bezeichnet;
  • Zeilen 23–32: Instanziierung eines Typs [FuncN<String>] mit einer anonymen Klasse. FuncN ist eine funktionale Schnittstelle:
 

Die Methode [FuncN.call] erwartet ein Array von Objekten und gibt einen Typ R zurück. Die Funktion [funcn] wird verwendet, um die Prozesse process1 und process2 in dieser Reihenfolge zu kombinieren. In der Methode [FuncN.call]:

  • args[0] ein Double sein;
  • args[1] ist ein String;

Hier ist das Ergebnis von [funcn.call] die Zeichenkette aus Zeile 27. Für die Konstruktion dieses Ergebnisses ist es nicht erforderlich, die Typen der Argumente der Aufrufmethode zu kennen.

Die beiden Prozesse werden wie folgt kombiniert:


// zip des 2 processus
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));

Die Methode [Observable.zip] funktioniert wie folgt:

 

Wir sehen, dass:

  • das erste Argument von zip ein Iterable<Observable> ist. In unserem Beispiel haben wir einen tatsächlichen Parameter vom Typ List<Observable>, der aus unseren beiden Observables besteht;
  • das zweite Argument von zip ist vom Typ FuncN. In unserem Beispiel ist der tatsächliche Parameter [funcn];

Die Ausführung liefert die folgenden Ergebnisse:

main : début observation ------Thread[main] ---- Time[55:636]
Observable (process2) call start ------Thread[main] ---- Time[55:666]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[55:666]
Observable (process1,0) onNext (69.6) ------Thread[RxComputationThreadPool-4] ---- Time[55:902]
Observable (process2,0) onNext (valeur-0) ------Thread[main] ---- Time[56:076]
Observable (process1,1) onNext (82.8) ------Thread[RxComputationThreadPool-4] ---- Time[56:271]
Subscriber[observateur[0],process12] : onNext ("double=69.6, string=valeur-0") ------Thread[main] ---- Time[56:352]
Observable (process1,2) onNext (14.399999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[56:641]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[56:642]
Observable (process2,1) onNext (valeur-1) ------Thread[main] ---- Time[56:778]
Subscriber[observateur[0],process12] : onNext ("double=82.8, string=valeur-1") ------Thread[main] ---- Time[56:779]
Observable (process2) onCompleted ------Thread[main] ---- Time[56:779]
Subscriber[observateur[0],process12].onCompleted ------Thread[main] ---- Time[56:780]
main : attente fin observation ------Thread[main] ---- Time[56:781]
main : fin observation ------Thread[main] ---- Time[56:781]
  • Zeilen 7, 11: process12 gibt zwei Elemente aus;
  • Zeile 8: Das von Prozess1 ausgegebene zusätzliche Element, das in Prozess2 keinen Partner hat, wird vom Ergebnisprozess Prozess12 nicht ausgegeben;

Wir sehen, dass process2, dem weder ein Ausführungs- noch ein Beobachtungs-Thread zugewiesen worden war, für beides den Haupt-Thread verwendet hat.

7.5.4. Beispiel 17: Kombinieren von zwei Observables mit [Observable.combineLatest]

Wir werden nun den folgenden Code untersuchen:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple17 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Double> process2 = new Process<>(
                new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null,
                Schedulers.computation());
        // combining the 2 processes
        Process<Double> process12 = new Process<>("process12",
                Observable.combineLatest(process1.getObservable(), process2.getObservable(), (d1, d2) -> d1 + d2));
        // subscriptions
        ProcessUtils.subscribe(1, process12);
    }
}
  • Zeilen 14–16: Ein Prozess namens [process1] gibt 3 reelle Zahlen auf einem Berechnungs-Thread aus. Er wird ebenfalls auf einem Berechnungs-Thread beobachtet;
  • Zeilen 18–20: Ein Prozess namens [process2] gibt 2 reelle Zahlen auf einem ungebundenen Thread aus. Diese werden auf einem Berechnungs-Thread beobachtet;
  • Zeile 23: Die beiden Observables werden mithilfe der folgenden statischen Methode [Observable.combineLatest] kombiniert:
 

Das Observable [combineLatest] funktioniert wie folgt: Wenn eines der beiden Observables ein Element E1 ausgibt, wird dieses Element durch [combineFunction] mit dem zuletzt vom anderen Observable ausgegebenen Element kombiniert.

Die Ausführung dieses Codes liefert das folgende Ergebnis:

main : début observation ------Thread[main] ---- Time[01:768]
Observable (process2) call start ------Thread[main] ---- Time[01:791]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[01:791]
Observable (process1,0) onNext (54.0) ------Thread[RxComputationThreadPool-4] ---- Time[01:991]
Observable (process2,0) onNext (56.0) ------Thread[main] ---- Time[02:245]
Observable (process1,1) onNext (51.6) ------Thread[RxComputationThreadPool-4] ---- Time[02:358]
Subscriber[observateur[0],process12] : onNext (110.0) ------Thread[RxComputationThreadPool-5] ---- Time[02:521]
Subscriber[observateur[0],process12] : onNext (107.6) ------Thread[RxComputationThreadPool-5] ---- Time[02:522]
Observable (process2,1) onNext (261.8) ------Thread[main] ---- Time[02:595]
Observable (process2) onCompleted ------Thread[main] ---- Time[02:596]
main : attente fin observation ------Thread[main] ---- Time[02:596]
Subscriber[observateur[0],process12] : onNext (313.40000000000003) ------Thread[RxComputationThreadPool-5] ---- Time[02:597]
Observable (process1,2) onNext (80.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[02:790]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[02:791]
Subscriber[observateur[0],process12] : onNext (342.2) ------Thread[RxComputationThreadPool-3] ---- Time[02:792]
Subscriber[observateur[0],process12].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[02:792]
main : fin observation ------Thread[main] ---- Time[02:793]
  • Zeile 5: Die Ausgabe von process2 (56) wird mit dem letzten von process1 ausgegebenen Element (54, Zeile 4) kombiniert und liefert das in Zeile 7 gezeigte Ergebnis;
  • Zeile 6: Die Ausgabe von process1 (51,6) wird mit dem letzten von process2 ausgegebenen Element (56, Zeile 5) kombiniert und ergibt das Ergebnis in Zeile 8;
  • Zeile 9: Die Ausgabe von Prozess 2 (261,8) wird mit dem letzten von Prozess 1 ausgegebenen Element (51,6, Zeile 6) kombiniert und ergibt das Ergebnis in Zeile 12;
  • Zeile 13: Die Ausgabe von Prozess1 (80,39) wird mit dem letzten von Prozess2 ausgegebenen Element (261,8, Zeile 9) kombiniert und ergibt das Ergebnis in Zeile 15;

Dies ist eine Variante des Observables [zip], bei der die kombinierten Elemente diesmal nicht unbedingt die Elemente an derselben Position in den Strömen sind. Beachten Sie hierbei, dass Prozess2, dem kein Ausführungsthread zugewiesen worden war, auf dem Hauptthread ausgeführt wurde (Zeile 2).

7.5.5. Beispiel 18: Kombinieren von zwei Observables mit [Observable.amb]

Wir werden nun den folgenden Code untersuchen:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple18 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Double> process2 = new Process<>(
                new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null, null);
        // combining the 2 processes
        Process<Double> process12 = new Process<>("process12",
                Observable.amb(process1.getObservable(), process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process12);
    }
}
  • Zeilen 14–16: Ein Prozess namens [process1] gibt 3 reelle Zahlen auf einem Berechnungs-Thread aus. Er wird ebenfalls auf einem Berechnungs-Thread beobachtet;
  • Zeilen 18–20: Ein Prozess namens [process2] gibt 2 reelle Zahlen auf einem ungebundenen Thread aus. Diese werden auf einem ungebundenen Thread beobachtet;
  • Zeile 22: Die beiden Observables werden mithilfe der folgenden statischen Methode [Observable.amb] kombiniert:
 

Wie in der obigen Abbildung dargestellt, gibt das Observable [Observable.amb(Observable o1, Observable o2)] die Elemente des Observables aus, das zuerst Elemente ausgibt. Dies wird durch die Ergebnisse des vorgestellten Beispiels bestätigt:

main : début observation ------Thread[main] ---- Time[21:594]
Observable (process2) call start ------Thread[main] ---- Time[21:612]
Observable (process1) call start ------Thread[RxComputationThreadPool-3] ---- Time[21:612]
Observable (process2,0) onNext (155.39999999999998) ------Thread[main] ---- Time[21:817]
Observable (process1) onError ------Thread[RxComputationThreadPool-3] ---- Time[21:820]
Observable (process1,0) onNext (90.0) ------Thread[RxComputationThreadPool-3] ---- Time[21:820]
Observable (process1,1) onNext (104.39999999999999) ------Thread[RxComputationThreadPool-3] ---- Time[21:877]
Subscriber[observateur[0],process12] : onNext (155.39999999999998) ------Thread[main] ---- Time[22:105]
Observable (process1,2) onNext (44.4) ------Thread[RxComputationThreadPool-3] ---- Time[22:122]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[22:123]
Observable (process2,1) onNext (201.6) ------Thread[main] ---- Time[22:581]
Subscriber[observateur[0],process12] : onNext (201.6) ------Thread[main] ---- Time[22:583]
Observable (process2) onCompleted ------Thread[main] ---- Time[22:583]
Subscriber[observateur[0],process12].onCompleted ------Thread[main] ---- Time[22:584]
main : attente fin observation ------Thread[main] ---- Time[22:585]
main : fin observation ------Thread[main] ---- Time[22:586]
  • Zeile 4: process2 sendet als erstes;
  • Zeilen 8, 12: process12 sendet alle von process2 gesendeten Elemente (Zeilen 4, 11);

7.6. Verarbeitungskette für ein Observable

7.6.1. Beispiel 19: Transformieren eines Observables mit [Observable.map]

In den vorangegangenen Beispielen haben wir verschiedene Kombinationen von zwei Observables zu einem dritten Observable untersucht. Wir stellen nun statische Methoden der Klasse [Observable] vor, die Transformations-, Filter- und Aggregationsoperationen auf einem Observable ermöglichen. Hier finden wir Methoden, die denen der in Abschnitt 5 behandelten Klasse [Stream] entsprechen.

Unser erstes Beispiel sieht wie folgt aus:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple19 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Double> process1 = new Process<>(
                new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<String> process2 = new Process<>("process2",
                process1.getObservable().map(d -> String.format("valeur-%s", d)));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • Zeilen 14–16: Ein Prozess namens process1 gibt 3 reelle Zahlen in einem Berechnungs-Thread aus. Er wird ebenfalls in einem Berechnungs-Thread beobachtet;
  • Zeilen 17–18: Die von process1 ausgegebenen Zahlen werden in process2 in Zeichenfolgen umgewandelt;
  • Zeile 20: Wir beobachten „process2“;

Die Methode [Observable.map] in Zeile 18 entspricht der in Abschnitt 5.5 behandelten Methode [Stream.map]:

 

Die Ergebnisse des Beispiels lauten wie folgt:

main : début observation ------Thread[main] ---- Time[55:328]
main : attente fin observation ------Thread[main] ---- Time[55:346]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[55:347]
Observable (process1,0) onNext (21.599999999999998) ------Thread[RxComputationThreadPool-4] ---- Time[55:354]
Observable (process1,1) onNext (97.2) ------Thread[RxComputationThreadPool-4] ---- Time[55:512]
Subscriber[observateur[0],process2] : onNext ("valeur-21.599999999999998") ------Thread[RxComputationThreadPool-3] ---- Time[55:615]
Subscriber[observateur[0],process2] : onNext ("valeur-97.2") ------Thread[RxComputationThreadPool-3] ---- Time[55:616]
Observable (process1,2) onNext (98.39999999999999) ------Thread[RxComputationThreadPool-4] ---- Time[55:803]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[55:804]
Subscriber[observateur[0],process2] : onNext ("valeur-98.39999999999999") ------Thread[RxComputationThreadPool-3] ---- Time[55:804]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[55:805]
main : fin observation ------Thread[main] ---- Time[55:805]
  • Zeilen 4, 5 und 8: die Emissionen von process1. Dies sind reelle Zahlen;
  • Zeilen 6, 7, 10: die beobachteten Emissionen von process2. Dies sind Zeichenfolgen;

7.6.2. Beispiel 20: Filtern einer Beobachtungsgröße mit [Observable.filter]

Das Beispiel sieht wie folgt aus:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple20 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().filter(i -> i % 2 == 0));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • Zeilen 11–12: Ein Prozess namens process1 gibt in einem Worker-Thread Ganzzahlen von 0 bis 2 aus. Er wird ebenfalls in einem Worker-Thread beobachtet;
  • Zeile 14: Die von process1 ausgegebenen Zahlen werden gefiltert, sodass in process2 nur gerade Zahlen verbleiben;
  • Zeile 20: Wir beobachten „process2“;

Die Methode [Observable.filter] in Zeile 18 entspricht der in Abschnitt 5.4 behandelten Methode [Stream.filter]:

 

Die Ergebnisse des Beispiels lauten wie folgt:

main : début observation ------Thread[main] ---- Time[30:319]
main : attente fin observation ------Thread[main] ---- Time[30:335]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[30:336]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[30:388]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[30:625]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[30:703]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[30:704]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[30:705]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[30:706]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[30:707]
main : fin observation ------Thread[main] ---- Time[30:707]
  • Zeilen 4, 5 und 7: Emissionen von process1;
  • Zeilen 6, 9: die beobachteten Emissionen von Prozess2. Dies sind die gerade nummerierten Elemente von Prozess1;

7.6.3. Beispiel 21: Transformieren einer Observable mit [Observable.flatMap]

Das Beispiel sieht wie folgt aus:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple21 {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
            int value = i * 10;
            return Observable.just(value, value + 1, value + 2);
        }));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • Zeilen 12–13: Ein Prozess namens process1 gibt in einem Berechnungs-Thread ganze Zahlen von 0 bis 2 aus. Er wird ebenfalls in einem Berechnungs-Thread beobachtet;
  • Zeilen 15–18: Jede von process1 ausgegebene Zahl n wird in ein Observable umgewandelt, das die drei Zahlen (10*n, 10*n+1, 10*n+2) ausgibt. Hätten wir in Zeile 15 die Methode [map] verwendet, würde process2 einen Typ Observable<Integer> anstelle eines Typs Integer ausgeben. Die verwendete Methode [flatMap] ermöglicht es uns, diese Sequenz von Elementen des Typs Observable<Integer> in eine Sequenz von Elementen des Typs Integer zu glätten, die aus jedem Element jedes Observable<Integer> besteht;
  • Zeile 20: Wir beobachten process2;

Die Methode [Observable.flatMap] in Zeile 15 entspricht der in Abschnitt 5.6.12 behandelten Methode [Stream.flatMap]:

 

Die Ergebnisse des Beispiels lauten wie folgt:

main : début observation ------Thread[main] ---- Time[31:466]
main : attente fin observation ------Thread[main] ---- Time[31:486]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[31:486]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[31:777]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[32:082]
Subscriber[observateur[0],process2] : onNext (1) ------Thread[RxComputationThreadPool-3] ---- Time[32:085]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[32:087]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[32:192]
Subscriber[observateur[0],process2] : onNext (10) ------Thread[RxComputationThreadPool-3] ---- Time[32:194]
Subscriber[observateur[0],process2] : onNext (11) ------Thread[RxComputationThreadPool-3] ---- Time[32:196]
Subscriber[observateur[0],process2] : onNext (12) ------Thread[RxComputationThreadPool-3] ---- Time[32:197]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[32:686]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[32:687]
Subscriber[observateur[0],process2] : onNext (20) ------Thread[RxComputationThreadPool-3] ---- Time[32:688]
Subscriber[observateur[0],process2] : onNext (21) ------Thread[RxComputationThreadPool-3] ---- Time[32:690]
Subscriber[observateur[0],process2] : onNext (22) ------Thread[RxComputationThreadPool-3] ---- Time[32:692]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[32:693]
main : fin observation ------Thread[main] ---- Time[32:693]
  • Zeilen 5–7: die drei Emissionen von process2 im Anschluss an die Emission in Zeile 4 von process1;
  • Zeilen 9–11: die drei Emissionen von process2 im Anschluss an die Emission in Zeile 8 von process1;
  • Zeilen 14–16: die drei Emissionen von Prozess2, die auf die Emission in Zeile 12 von Prozess1 folgen;

Der folgende Code zeigt, wie man einen Typ Observable<Integer[]> aus Prozess1 erstellt [Beispiel 21b]:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple21b {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer[]> process2 = new Process<>("process2", process1.getObservable().map(i -> {
            int value = i * 10;
            return new Integer[] { value, value + 1, value + 2 };
        }));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • Zeile 14: Die Methode [Observable.map] wird verwendet;
  • Zeile 16: die einen Typ Integer[] zurückgibt;

Die Ergebnisse lauten wie folgt:

main : début observation ------Thread[main] ---- Time[58:089]
main : attente fin observation ------Thread[main] ---- Time[58:107]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[58:108]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[58:503]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[58:762]
Subscriber[observateur[0],process2] : onNext ([0,1,2]) ------Thread[RxComputationThreadPool-3] ---- Time[58:792]
Subscriber[observateur[0],process2] : onNext ([10,11,12]) ------Thread[RxComputationThreadPool-3] ---- Time[58:795]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[58:851]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[58:852]
Subscriber[observateur[0],process2] : onNext ([20,21,22]) ------Thread[RxComputationThreadPool-3] ---- Time[58:853]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[58:854]
main : fin observation ------Thread[main] ---- Time[58:854]
  • Zeilen 6, 7, 10: Wir sehen die Ergebnisse der Map-Funktion;

All diese beobachtbaren Transformationen können verkettet werden, da jede Transformation eine neue Beobachtbare erzeugt. Dies wird im folgenden Beispiel [Beispiel 21c] veranschaulicht:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Exemple21c {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
            int value = i * 10;
            return Observable.just(value, value + 1, value + 2);
        }).filter(i -> i % 2 == 0));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • Zeilen 15–18: Auf flatMap folgt ein Filter;

Die Ausführungsergebnisse lauten wie folgt:

main : début observation ------Thread[main] ---- Time[37:993]
main : attente fin observation ------Thread[main] ---- Time[38:016]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[38:017]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[38:124]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[38:366]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[38:380]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[38:381]
Subscriber[observateur[0],process2] : onNext (0) ------Thread[RxComputationThreadPool-3] ---- Time[38:436]
Subscriber[observateur[0],process2] : onNext (2) ------Thread[RxComputationThreadPool-3] ---- Time[38:439]
Subscriber[observateur[0],process2] : onNext (10) ------Thread[RxComputationThreadPool-3] ---- Time[38:441]
Subscriber[observateur[0],process2] : onNext (12) ------Thread[RxComputationThreadPool-3] ---- Time[38:443]
Subscriber[observateur[0],process2] : onNext (20) ------Thread[RxComputationThreadPool-3] ---- Time[38:445]
Subscriber[observateur[0],process2] : onNext (22) ------Thread[RxComputationThreadPool-3] ---- Time[38:446]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[38:447]
main : fin observation ------Thread[main] ---- Time[38:447]
  • Zeilen 8–13: process2 hat nur die geraden Elemente aus flatMap ausgegeben;

Eine Methode, die [flatMap] ähnelt, ist die Methode [flatMapIterable], die durch das folgende Beispiel [Beispiel 21d] veranschaulicht wird:


package dvp.rxjava.observables.exemples;
 
import java.util.Arrays;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple21d {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMapIterable(i -> {
            int value = i * 10;
            return Arrays.asList(value, value + 1, value + 2);
        }).filter(i -> i % 2 == 0));
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}

Zeile 16: Anstelle der Methode [flatMap] verwenden wir die Methode [flatMapIterable]. In diesem Fall muss die Transformationsfunktion einen Typ Iterable<T> (Zeile 18) anstelle eines Typs Observable<T> zurückgeben.

Wir erhalten die gleichen Ergebnisse wie zuvor.

Kehren wir zur Definition der Methode [flatMap] zurück:

 

Wie oben gezeigt, wurde ein blaues Element [3] zwischen die beiden grünen Elemente [1-2] eingefügt. Das bedeutet, dass die Methode [flatMap] beim Abflachen von Observable<T>s die Emissionsreihenfolge dieser verschiedenen internen Observables beibehält. Dies wird durch das folgende Beispiel [Beispiel21e] veranschaulicht:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple21e {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // process 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().flatMap(i -> process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process3);
    }
}
  • Zeilen 11–12: Der Prozess „process1“ gibt die ganzen Zahlen [0,1] aus;
  • Zeilen 14–15: Prozess2 gibt die ganzen Zahlen [10, 11, 12] aus;
  • Zeilen 17–18: Jedes von Prozess1 ausgegebene Element wird der Beobachtungsgröße von Prozess2 zugeordnet. Das bedeutet:
    • das Element [0] von Prozess1 einer Beobachtungsgröße zugeordnet wird, die [10,11,12] ausgibt;
    • das Gleiche gilt für das Element 1;

Am Ende werden die 6 Zahlen [10, 11, 12, 10, 11, 12] ausgegeben. Wir wollen sehen, in welcher Reihenfolge.

Die Ausführungsergebnisse lauten wie folgt:

main : début observation ------Thread[main] ---- Time[22:540]
main : attente fin observation ------Thread[main] ---- Time[22:566]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[22:566]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[22:949]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[22:951]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[23:159]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[23:160]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[23:160]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[23:286]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[23:513]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[23:597]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[23:599]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[23:645]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-5] ---- Time[23:647]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[23:789]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[23:790]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-5] ---- Time[23:791]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[23:976]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[23:978]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[24:184]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[24:184]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[24:186]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[24:187]
main : fin observation ------Thread[main] ---- Time[24:187]

Wir sehen, dass die Emissionsreihenfolge von process3 wie folgt war: [10, 10, 11, 12, 11, 12] (Zeilen 11, 12, 14, 17, 19, 22). Daher waren die von process2 emittierten Elemente tatsächlich durcheinander geraten. Wir können dies vermeiden, indem wir die Methode [concatMap] anstelle der Methode [flatMap] verwenden. Dies wird durch den folgenden Code [Beispiel21ef] veranschaulicht:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple21ef {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // process 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().concatMap(i -> process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process3);
    }
}

In Zeile 18 haben wir [flatMap] durch [concatMap] ersetzt. Die Ergebnisse der Ausführung lauten wie folgt:

main : début observation ------Thread[main] ---- Time[45:507]
main : attente fin observation ------Thread[main] ---- Time[45:530]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[45:530]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[45:775]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[45:778]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[45:846]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[45:890]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[45:947]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[45:948]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[46:096]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[46:097]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[46:144]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-5] ---- Time[46:147]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-5] ---- Time[46:148]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[46:149]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[46:364]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-7] ---- Time[46:366]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[46:529]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[46:531]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[46:558]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[46:559]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[46:560]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[46:562]
main : fin observation ------Thread[main] ---- Time[46:562]

Wir sehen, dass die Emissionsreihenfolge von Prozess3 wie folgt war: [10, 11, 12, 10, 11, 12] (Zeilen 12–14, 17, 19, 22). Die von Prozess2 emittierten Elemente wurden nicht durcheinandergewürfelt.

Eine weitere Variante der [map]-Methode ist die [switchMap]-Methode:

 

Oben werden aus dem Observable [1] drei weitere Observables [2] mit jeweils zwei Elementen erstellt, die dann wie bei [flatMap] [3] abgeflacht werden. Beachten Sie, dass das Ergebnis 5 Elemente enthält, nicht 6. Das liegt daran, dass, bevor das zweite Observable sein zweites Element [6] ausgibt, das dritte Observable sein erstes Element [5] ausgibt, wodurch das zweite Observable verworfen wird. Daher ist das Element [6] im resultierenden Observable [3] nicht zu finden.

Zur Veranschaulichung von [switchMap] verwenden wir das folgende Beispiel [Beispiel21eg]:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple21eg {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
                Schedulers.computation(), Schedulers.computation());
        // process 3
        Process<Integer> process3 = new Process<>("process3",
                process1.getObservable().switchMap(i -> process2.getObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process3);
    }
}

Die Ausführung des Beispiels liefert folgende Ergebnisse:

main : début observation ------Thread[main] ---- Time[02:388]
main : attente fin observation ------Thread[main] ---- Time[02:419]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[02:419]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[02:641]
Observable (process2) call start ------Thread[RxComputationThreadPool-6] ---- Time[02:643]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-6] ---- Time[02:802]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-6] ---- Time[02:888]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-6] ---- Time[02:957]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-6] ---- Time[02:958]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[03:005]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[03:007]
Observable (process2) call start ------Thread[RxComputationThreadPool-8] ---- Time[03:007]
Observable (process2,0) onNext (10) ------Thread[RxComputationThreadPool-8] ---- Time[03:106]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[03:106]
Subscriber[observateur[0],process3] : onNext (10) ------Thread[RxComputationThreadPool-5] ---- Time[03:108]
Observable (process2,1) onNext (11) ------Thread[RxComputationThreadPool-8] ---- Time[03:236]
Subscriber[observateur[0],process3] : onNext (11) ------Thread[RxComputationThreadPool-7] ---- Time[03:238]
Observable (process2,2) onNext (12) ------Thread[RxComputationThreadPool-8] ---- Time[03:716]
Observable (process2) onCompleted ------Thread[RxComputationThreadPool-8] ---- Time[03:717]
Subscriber[observateur[0],process3] : onNext (12) ------Thread[RxComputationThreadPool-7] ---- Time[03:718]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-7] ---- Time[03:718]
main : fin observation ------Thread[main] ---- Time[03:719]
  • process1 sendet 2 Elemente aus, die zu 2 process2-Observables mit jeweils 3 Elementen führen;
  • Zeile 14: Der Beobachter empfängt Element Nr. 0, das von der ersten process2-Beobachtungsgröße in Zeile 6 ausgesendet wurde;
  • Zeile 15: Der Beobachter empfängt Element Nr. 0, das von der zweiten Prozess2-Beobachtungsgröße in Zeile 13 ausgesendet wurde. Die Darstellung erklärt nicht, warum er zuvor die Elemente 1 und 2 nicht empfangen hat, die von der ersten Prozess2-Beobachtungsgröße in den Zeilen 7 und 8 ausgesendet wurden. In jedem Fall wird die erste Prozess2-Beobachtungsgröße verworfen;
  • Letztendlich sieht der Beobachter nur 4 Elemente (Zeilen 14, 15, 17, 20) statt der 6, die ausgesendet wurden;

7.6.4. Beispiele-22: Weitere Methoden der Klasse [Observable]

Die Klasse [Observable] enthält viele Methoden aus der Klasse [Stream], die auf ähnliche Weise funktionieren. Hier sind einige davon. Wir geben lediglich den Code und die Ergebnisse an.

[Beispiel 22a – take=limit]


package dvp.rxjava.observables.exemples;

import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22a {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).take(3));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}

Ergebnisse

1
2
3
4
5
6
7
main : début observation ------Thread[main] ---- Time[25:071]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[25:399]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[25:402]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[25:404]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[25:404]
main : attente fin observation ------Thread[main] ---- Time[25:406]
main : fin observation ------Thread[main] ---- Time[25:406]

[Beispiel 22b – takeLast]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22b {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).takeLast(2));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}

Ergebnisse

1
2
3
4
5
6
main : début observation ------Thread[main] ---- Time[19:440]
Subscriber[observateur[0],process] : onNext (9) ------Thread[main] ---- Time[19:726]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[19:728]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[19:728]
main : attente fin observation ------Thread[main] ---- Time[19:729]
main : fin observation ------Thread[main] ---- Time[19:730]

[Beispiel 22c – überspringen]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22c {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).skip(5).take(2));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}

Ergebnisse

1
2
3
4
5
6
main : début observation ------Thread[main] ---- Time[16:685]
Subscriber[observateur[0],process] : onNext (6) ------Thread[main] ---- Time[17:002]
Subscriber[observateur[0],process] : onNext (7) ------Thread[main] ---- Time[17:004]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[17:005]
main : attente fin observation ------Thread[main] ---- Time[17:006]
main : fin observation ------Thread[main] ---- Time[17:006]

[Beispiel 22d – reduce]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22d {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).reduce(0, (i, a) -> i + a));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}
  • Zeile 10: berechnet die Summe der Elemente im Observable. Das Ergebnis ist ein Observable, das diese Summe ausgibt;

Ergebnisse

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[52:412]
Subscriber[observateur[0],process] : onNext (55) ------Thread[main] ---- Time[52:640]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[52:640]
main : attente fin observation ------Thread[main] ---- Time[52:642]
main : fin observation ------Thread[main] ---- Time[52:642]

[Beispiel 22e – alle]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22e {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Boolean> process = new Process<>("process", Observable.range(1, 10).all(i -> i > 10));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}
  • Zeile 10: Gibt ein Observable<Boolean> zurück, das das Element „true“ ausgibt, wenn das Prädikat der Methode [all] für alle Elemente wahr ist, andernfalls „false“;

Ergebnisse

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[59:866]
Subscriber[observateur[0],process] : onNext (false) ------Thread[main] ---- Time[00:069]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[00:070]
main : attente fin observation ------Thread[main] ---- Time[00:071]
main : fin observation ------Thread[main] ---- Time[00:071]

[Beispiel 22f – Zählung]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22f {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.range(1, 10).count());
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}
  • Zeile 10: [Observable.count] erstellt ein Observable mit einem Element, das der Summe der beobachteten Elemente entspricht;

Ergebnisse

1
2
3
4
5
main : début observation ------Thread[main] ---- Time[16:409]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[16:634]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[16:634]
main : attente fin observation ------Thread[main] ---- Time[16:635]
main : fin observation ------Thread[main] ---- Time[16:635]

[Beispiel 22g – distinct]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
 
public class Exemple22g {
    public static void main(String[] args) throws InterruptedException {
        // process
        Process<Integer> process = new Process<>("process", Observable.just(1, 2, 1, 3).distinct());
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}

Ergebnisse

1
2
3
4
5
6
7
main : début observation ------Thread[main] ---- Time[05:373]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[05:594]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[05:595]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[05:596]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[05:597]
main : attente fin observation ------Thread[main] ---- Time[05:597]
main : fin observation ------Thread[main] ---- Time[05:597]

[ Beispiel 22h – groupBy, asObservable]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.observables.GroupedObservable;
 
public class Exemple22h {
    public static void main(String[] args) throws InterruptedException {
        // process
        Observable<GroupedObservable<Boolean, Integer>> obs = Observable.range(1, 10).groupBy(i -> i % 2 == 0);
        Process<Integer> process = new Process<>("process", obs.concatMap(g -> g.asObservable()));
        // subscriptions
        ProcessUtils.subscribe(1, process);
    }
}
  • Zeile 11: Die Methode [groupBy] gruppiert die 10 ausgegebenen Elemente in zwei Gruppen: gerade Zahlen und ungerade Zahlen. Das Ergebnis ist ein Observable<GroupedObservable<Boolean, Integer>>, d. h. ein Observable, dessen Elemente vom Typ GroupedObservable<Boolean, Integer> sind, wobei Boolean der Typ des Gruppenschlüssels ist (in diesem Fall false, true) und auch der Typ des Ergebnisses des Lambda-Ausdrucks, der als Parameter an die [groupBy]-Methode übergeben wird, und Integer der Typ der Elemente der Gruppe ist;
  • Zeile 12: Der Typ GroupedObservable verfügt über eine [asObservable]-Methode, mit der wir aus diesem Typ ein Observable erstellen können. Wir erhalten somit zwei Observable<Integer>-Typen, einen für gerade Zahlen und einen für ungerade Zahlen. Aus diesen beiden Observables erstellt die [concatMap]-Methode ein einziges Observable;

Ergebnisse

main : début observation ------Thread[main] ---- Time[23:809]
Subscriber[observateur[0],process] : onNext (1) ------Thread[main] ---- Time[24:034]
Subscriber[observateur[0],process] : onNext (3) ------Thread[main] ---- Time[24:036]
Subscriber[observateur[0],process] : onNext (5) ------Thread[main] ---- Time[24:037]
Subscriber[observateur[0],process] : onNext (7) ------Thread[main] ---- Time[24:038]
Subscriber[observateur[0],process] : onNext (9) ------Thread[main] ---- Time[24:039]
Subscriber[observateur[0],process] : onNext (2) ------Thread[main] ---- Time[24:041]
Subscriber[observateur[0],process] : onNext (4) ------Thread[main] ---- Time[24:043]
Subscriber[observateur[0],process] : onNext (6) ------Thread[main] ---- Time[24:044]
Subscriber[observateur[0],process] : onNext (8) ------Thread[main] ---- Time[24:045]
Subscriber[observateur[0],process] : onNext (10) ------Thread[main] ---- Time[24:046]
Subscriber[observateur[0],process].onCompleted ------Thread[main] ---- Time[24:047]
main : attente fin observation ------Thread[main] ---- Time[24:047]
main : fin observation ------Thread[main] ---- Time[24:048]

[Beispiel 22i – Zeitstempel]


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
import rx.schedulers.Timestamped;
 
public class Exemple22i {
    public static void main(String[] args) throws InterruptedException {
        // process 1
        Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
                Schedulers.computation());
        // process 2
        Process<Timestamped<Integer>> process2 = new Process<>("process2", process1.getObservable().timestamp());
        // subscriptions
        ProcessUtils.subscribe(1, process2);
    }
}
  • Zeile 15: Die Methode [timestamp] ordnet jedem verarbeiteten Element des Observables einen Zeitstempel zu;

Ergebnisse

main : début observation ------Thread[main] ---- Time[59:362]
main : attente fin observation ------Thread[main] ---- Time[59:377]
Observable (process1) call start ------Thread[RxComputationThreadPool-4] ---- Time[59:378]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-4] ---- Time[59:553]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-4] ---- Time[59:692]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975259555,"value":0}) ------Thread[RxComputationThreadPool-3] ---- Time[59:789]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975259789,"value":1}) ------Thread[RxComputationThreadPool-3] ---- Time[59:791]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-4] ---- Time[00:025]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[00:027]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462975260026,"value":2}) ------Thread[RxComputationThreadPool-3] ---- Time[00:031]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-3] ---- Time[00:033]
main : fin observation ------Thread[main] ---- Time[00:034]

In diesem Beispiel ist es schwer zu erkennen, was die Zeitstempelinformationen bedeuten:

  • Zeilen 4–5: Wir sehen, dass Element 1 von Prozess1 139 ms nach Element 0 ausgegeben wurde;
  • Zeilen 6 und 7: Wir sehen, dass Element 1 von Prozess2 234 ms nach Element 0 beobachtet wurde;
  • Zeilen 5, 8: Wir sehen, dass Element 2 von Prozess1 33 ms nach Element 1 ausgegeben wurde;
  • Zeilen 7 und 10: Wir sehen, dass Element 2 von Prozess2 37 ms nach Element 1 beobachtet wurde;

Diese Verzögerungen sind darauf zurückzuführen, dass die Threads für die Beobachtung und die Ausführung der Observables nicht identisch sind. Wenn wir die Zeilen 12–13 durch die folgenden Zeilen ersetzen (Beispiel 22j):


// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
null);
  • Zeilen 2–3: Wir geben den Beobachtungs-Thread nicht an. Wir wissen, dass in diesem Fall das Observable dort beobachtet wird, wo es ausgeführt wird;

Dies führt zu folgenden Ergebnissen:

main : début observation ------Thread[main] ---- Time[43:834]
main : attente fin observation ------Thread[main] ---- Time[43:845]
Observable (process1) call start ------Thread[RxComputationThreadPool-1] ---- Time[43:846]
Observable (process1,0) onNext (0) ------Thread[RxComputationThreadPool-1] ---- Time[44:291]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976384293,"value":0}) ------Thread[RxComputationThreadPool-1] ---- Time[44:552]
Observable (process1,1) onNext (1) ------Thread[RxComputationThreadPool-1] ---- Time[44:878]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976384879,"value":1}) ------Thread[RxComputationThreadPool-1] ---- Time[44:884]
Observable (process1,2) onNext (2) ------Thread[RxComputationThreadPool-1] ---- Time[45:274]
Subscriber[observateur[0],process2] : onNext ({"timestampMillis":1462976385275,"value":2}) ------Thread[RxComputationThreadPool-1] ---- Time[45:280]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[45:281]
Subscriber[observateur[0],process2].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[45:283]
main : fin observation ------Thread[main] ---- Time[45:284]
  • Zeilen 4 und 6: process1 sendet sein Element Nr. 1 587 ms nach seinem Element Nr. 0;
  • Zeilen 5 und 7: Der Beobachter beobachtet diese beiden Elemente mit einem Abstand von 586 ms;
  • Zeilen 6 und 8: Prozess1 sendet sein Element Nr. 2 396 ms nach seinem Element Nr. 1;
  • Zeilen 7 und 9: Der Beobachter beobachtet diese beiden Elemente mit einem Zeitunterschied von 396 ms;

Hier sind die Zeitstempelwerte konsistent: Sie geben die Übertragungszeit des Elements genau wieder.

7.7. Scheduler

7.7.1. Beispiel 23: Der Scheduler [Schedulers.computation]

Wir werden nun die Ausführungsplaner untersuchen. Die Betrachtung erfolgt auf dem Ausführungsthread.

Das Thema Scheduler ist etwas undurchsichtig. Die verschiedenen Scheduler werden in dieser Frage auf der StackOverflow-Website vorgestellt [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:

 

Wir werden versuchen, die Verwendung dieser verschiedenen Scheduler anhand von Beispielen zu veranschaulichen. Das erste Beispiel veranschaulicht den Scheduler [Schedulers.computation]:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple23 {
    public static void main(String[] args) throws InterruptedException {
        // processes
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.computation(), null);
        }
        // subscriptions
        ProcessUtils.subscribe(1, processes);
    }
}
  • Zeilen 14–19: Wir erstellen ein Array mit 10 Prozessen, die auf einem Berechnungs-Thread laufen;
  • Zeile 17: Jeder Prozess generiert eine zufällige reelle Zahl;
  • Zeile 21: Wir abonnieren alle diese Prozesse;

Die Ergebnisse lauten wie folgt:

main : début observation ------Thread[main] ---- Time[01:034]
Observable (process0) call start ------Thread[RxComputationThreadPool-1] ---- Time[01:042]
Observable (process2) call start ------Thread[RxComputationThreadPool-3] ---- Time[01:042]
Observable (process1) call start ------Thread[RxComputationThreadPool-2] ---- Time[01:042]
Observable (process5) call start ------Thread[RxComputationThreadPool-6] ---- Time[01:043]
Observable (process7) call start ------Thread[RxComputationThreadPool-8] ---- Time[01:043]
Observable (process4) call start ------Thread[RxComputationThreadPool-5] ---- Time[01:042]
Observable (process3) call start ------Thread[RxComputationThreadPool-4] ---- Time[01:042]
main : attente fin observation ------Thread[main] ---- Time[01:043]
Observable (process6) call start ------Thread[RxComputationThreadPool-7] ---- Time[01:043]
Observable (process3,0) onNext (70.8) ------Thread[RxComputationThreadPool-4] ---- Time[01:115]
Observable (process1,0) onNext (13.2) ------Thread[RxComputationThreadPool-2] ---- Time[01:153]
Observable (process0,0) onNext (63.599999999999994) ------Thread[RxComputationThreadPool-1] ---- Time[01:215]
Subscriber[observateur[0],process0] : onNext (63.599999999999994) ------Thread[RxComputationThreadPool-1] ---- Time[01:326]
Subscriber[observateur[0],process3] : onNext (70.8) ------Thread[RxComputationThreadPool-4] ---- Time[01:326]
Subscriber[observateur[0],process1] : onNext (13.2) ------Thread[RxComputationThreadPool-2] ---- Time[01:326]
Observable (process3) onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[01:326]
Observable (process0) onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[01:326]
Observable (process1) onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[01:327]
Subscriber[observateur[0],process0].onCompleted ------Thread[RxComputationThreadPool-1] ---- Time[01:327]
Subscriber[observateur[0],process3].onCompleted ------Thread[RxComputationThreadPool-4] ---- Time[01:327]
Subscriber[observateur[0],process1].onCompleted ------Thread[RxComputationThreadPool-2] ---- Time[01:327]
Observable (process8) call start ------Thread[RxComputationThreadPool-1] ---- Time[01:329]
Observable (process9) call start ------Thread[RxComputationThreadPool-2] ---- Time[01:329]
...
main : fin observation ------Thread[main] ---- Time[01:610]
  • Zeilen 2–10: Die ersten 8 Prozesse starten auf 8 verschiedenen Threads (der verwendete Rechner hat 8 Kerne). Beachten Sie, dass sie alle ungefähr zur gleichen Zeit starten;
  • Zeilen 17–19: 3 Prozesse werden beendet, wodurch 3 Threads freigegeben werden;
  • Zeilen 23–24: Die letzten beiden Prozesse können dann starten, indem sie 2 der so freigewordenen Threads belegen;

Wir können daher schlussfolgern, dass der Scheduler [Schedulers.computation] einen Pool von n Threads bereitstellt, wobei n die Anzahl der Kerne auf dem Rechner ist. Die Threads werden parallel auf diesen Kernen ausgeführt.

7.7.2. Beispiel 24: Der Scheduler [Schedulers.io]

Wir führen den vorherigen Code mit dem Scheduler [Schedulers.io] aus:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple24 {
    public static void main(String[] args) throws InterruptedException {
        // processes
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.io(), null);
        }
        // subscriptions
        ProcessUtils.subscribe(1, processes);
    }
}
  • Zeile 18: Prozesse werden mit den Threads des Schedulers ausgeführt [Schedulers.io];

Dies führt zu folgenden Ergebnissen:

main : début observation ------Thread[main] ---- Time[03:451]
Observable (process0) call start ------Thread[RxCachedThreadScheduler-1] ---- Time[03:459]
Observable (process1) call start ------Thread[RxCachedThreadScheduler-2] ---- Time[03:459]
Observable (process2) call start ------Thread[RxCachedThreadScheduler-3] ---- Time[03:460]
Observable (process3) call start ------Thread[RxCachedThreadScheduler-4] ---- Time[03:460]
Observable (process4) call start ------Thread[RxCachedThreadScheduler-5] ---- Time[03:464]
Observable (process5) call start ------Thread[RxCachedThreadScheduler-6] ---- Time[03:464]
Observable (process6) call start ------Thread[RxCachedThreadScheduler-7] ---- Time[03:465]
Observable (process8) call start ------Thread[RxCachedThreadScheduler-9] ---- Time[03:465]
Observable (process9) call start ------Thread[RxCachedThreadScheduler-10] ---- Time[03:465]
main : attente fin observation ------Thread[main] ---- Time[03:465]
Observable (process7) call start ------Thread[RxCachedThreadScheduler-8] ---- Time[03:465]
Observable (process7,0) onNext (54.0) ------Thread[RxCachedThreadScheduler-8] ---- Time[03:473]
Observable (process8,0) onNext (116.39999999999999) ------Thread[RxCachedThreadScheduler-9] ---- Time[03:500]
Observable (process6,0) onNext (105.6) ------Thread[RxCachedThreadScheduler-7] ---- Time[03:506]
Observable (process0,0) onNext (96.0) ------Thread[RxCachedThreadScheduler-1] ---- Time[03:509]
Observable (process5,0) onNext (25.2) ------Thread[RxCachedThreadScheduler-6] ---- Time[03:583]
Observable (process3,0) onNext (97.2) ------Thread[RxCachedThreadScheduler-4] ---- Time[03:684]
Subscriber[observateur[0],process7] : onNext (54.0) ------Thread[RxCachedThreadScheduler-8] ---- Time[03:685]
Subscriber[observateur[0],process6] : onNext (105.6) ------Thread[RxCachedThreadScheduler-7] ---- Time[03:685]
Subscriber[observateur[0],process0] : onNext (96.0) ------Thread[RxCachedThreadScheduler-1] ---- Time[03:685]
Subscriber[observateur[0],process8] : onNext (116.39999999999999) ------Thread[RxCachedThreadScheduler-9] ---- Time[03:685]
Observable (process0) onCompleted ------Thread[RxCachedThreadScheduler-1] ---- Time[03:686]
Observable (process6) onCompleted ------Thread[RxCachedThreadScheduler-7] ---- Time[03:686]
Observable (process7) onCompleted ------Thread[RxCachedThreadScheduler-8] ---- Time[03:685]
...
main : fin observation ------Thread[main] ---- Time[03:933]
  • Zeilen 2–10: Die 10 Prozesse starten jeweils auf einem anderen Thread. Im Gegensatz zum vorherigen Fall konnten alle Prozesse gestartet werden. Beachten Sie, dass diese Startvorgänge 6 ms dauern, während es zuvor 1 ms waren;
  • Zeilen 13–18: Die Observables werden nacheinander ausgegeben und nicht annähernd parallel, wie es zuvor der Fall war;

Was ist der Unterschied zwischen den Schedulern [Schedulers.io] und [Schedulers.computation]? Eine Antwort finden Sie unter der URL [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:

 

7.7.3. Beispiel 25: Der Scheduler [Schedulers.newThread]

Wir führen den vorherigen Code mit dem Scheduler [Schedulers.newThread] aus:


package dvp.rxjava.observables.exemples;
 
import java.util.Random;
 
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
 
public class Exemple25 {
    public static void main(String[] args) throws InterruptedException {
        // processes
        @SuppressWarnings("unchecked")
        Process<Double> processes[] = new Process[10];
        for (int i = 0; i < processes.length; i++) {
            processes[i] = new Process<>(
                    new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
                    Schedulers.newThread(), null);
        }
        // subscriptions
        ProcessUtils.subscribe(1, processes);
    }
}

Die erhaltenen Ergebnisse sind dieselben wie beim [Schedulers.io]-Scheduler:

main : début observation ------Thread[main] ---- Time[17:058]
Observable (process0) call start ------Thread[RxNewThreadScheduler-1] ---- Time[17:065]
Observable (process1) call start ------Thread[RxNewThreadScheduler-2] ---- Time[17:065]
Observable (process2) call start ------Thread[RxNewThreadScheduler-3] ---- Time[17:066]
Observable (process3) call start ------Thread[RxNewThreadScheduler-4] ---- Time[17:066]
Observable (process4) call start ------Thread[RxNewThreadScheduler-5] ---- Time[17:068]
Observable (process5) call start ------Thread[RxNewThreadScheduler-6] ---- Time[17:069]
Observable (process6) call start ------Thread[RxNewThreadScheduler-7] ---- Time[17:069]
Observable (process8) call start ------Thread[RxNewThreadScheduler-9] ---- Time[17:069]
Observable (process7) call start ------Thread[RxNewThreadScheduler-8] ---- Time[17:069]
Observable (process9) call start ------Thread[RxNewThreadScheduler-10] ---- Time[17:069]
main : attente fin observation ------Thread[main] ---- Time[17:069]
Observable (process6,0) onNext (25.2) ------Thread[RxNewThreadScheduler-7] ---- Time[17:120]
Observable (process3,0) onNext (39.6) ------Thread[RxNewThreadScheduler-4] ---- Time[17:193]
Observable (process5,0) onNext (21.599999999999998) ------Thread[RxNewThreadScheduler-6] ---- Time[17:212]
Observable (process0,0) onNext (19.2) ------Thread[RxNewThreadScheduler-1] ---- Time[17:273]
Observable (process8,0) onNext (81.6) ------Thread[RxNewThreadScheduler-9] ---- Time[17:308]
Subscriber[observateur[0],process3] : onNext (39.6) ------Thread[RxNewThreadScheduler-4] ---- Time[17:331]
Subscriber[observateur[0],process0] : onNext (19.2) ------Thread[RxNewThreadScheduler-1] ---- Time[17:331]
Subscriber[observateur[0],process6] : onNext (25.2) ------Thread[RxNewThreadScheduler-7] ---- Time[17:331]
Subscriber[observateur[0],process8] : onNext (81.6) ------Thread[RxNewThreadScheduler-9] ---- Time[17:331]
Subscriber[observateur[0],process5] : onNext (21.599999999999998) ------Thread[RxNewThreadScheduler-6] ---- Time[17:331]
Observable (process8) onCompleted ------Thread[RxNewThreadScheduler-9] ---- Time[17:333]
Observable (process5) onCompleted ------Thread[RxNewThreadScheduler-6] ---- Time[17:333]
Observable (process6) onCompleted ------Thread[RxNewThreadScheduler-7] ---- Time[17:332]
Observable (process0) onCompleted ------Thread[RxNewThreadScheduler-1] ---- Time[17:332]
Observable (process3) onCompleted ------Thread[RxNewThreadScheduler-4] ---- Time[17:332]
...
main : fin observation ------Thread[main] ---- Time[17:571]

Unter der URL [http://stackoverflow.com/questions/33415881/retrofit-with-rxjava-schedulers-newthread-vs-schedulers-io] wird erklärt, dass der Scheduler [Schedulers.io] einen Thread-Pool bereitstellt, was der Scheduler [Schedulers.newThread] nicht tut. Ein Thread-Pool erstellt automatisch eine Reihe von Threads. Er weist diese den Prozessen zu, die sie benötigen. Wenn diese Prozesse beendet sind, werden ihre Threads nicht gelöscht, sondern kehren in den Pool zurück und können dann von einem anderen Prozess wiederverwendet werden. Dies ist effizienter als das ständige Erstellen und Löschen von Threads. Daher ist es vorzuziehen, den Scheduler [Schedulers.io] zu verwenden.

7.7.4. Beispiel 26: Die Scheduler [Schedulers.immediate, Schedulers.trampoline]

Kehren wir zur Erklärung dieser beiden Scheduler zurück:

 

Die Erklärung ist relativ leicht zu verstehen, aber wenn man versucht, sie zu veranschaulichen, merkt man, dass man sie noch nicht wirklich begriffen hat. Es war das Buch *Learning Reactive Programming With Java 8*, das mir half, ein Beispiel zu erstellen, das auf einem Beispiel aus diesem Buch basiert, aber vereinfacht ist. Hier ist es:


package dvp.rxjava.observables.exemples;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Consumer;
 
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
 
public class Exemple26 {
    public static void main(String[] args) throws InterruptedException {
 
        // a scheduler
        Scheduler scheduler = Schedulers.immediate();
        // a worker of this scheme
        Worker worker = scheduler.createWorker();
        // an Action0 type to be executed on the worker
        Action0 action02 = new Action0() {
            @Override
            public void call() {
                // log action02
                ProcessUtils.showInfos.accept("action02");
            }
        };
 
        // an Action0 type to be executed on the worker
        Action0 action01 = new Action0() {
            @Override
            public void call() {
                // program a new action on the same worker
                worker.schedule(action02);
                // log action01
                ProcessUtils.showInfos.accept("action01");
            }
        };
        // action01 is programmed on the worker
        worker.schedule(action01);
    }
 
    // displays
    static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
            Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
 
}
  • Zeile 17: ein Scheduler. Dies ist entweder [Schedulers.immediate], wie hier gezeigt, oder später [Schedulers.trampoline];
  • Zeile 19: Aktionen vom Typ Action0 (Zeilen 21, 20) können auf den Workern des Schedulers ausgeführt werden. Die Methode [Scheduler.createWorker] erstellt einen Worker. Die Methode [Worker.schedule(Action0)] führt eine Aktion vom Typ Action0 über einen Worker aus;
  • Zeilen 21–27: Eine erste Aktion namens [action02], die (Zeile 40) vom Worker aus Zeile 19 ausgeführt wird;
  • Zeilen 30–38: eine zweite Aktion namens [action01]. Sie hat die Besonderheit, dass sie bewirkt, dass action02 auf demselben Worker wie sie selbst ausgeführt wird (Zeile 34). Hier liegt der Unterschied zwischen [Schedulers.immediate] und [Schedulers.trampoline]:
    • Wenn der Scheduler [Schedulers.immediate] ist, wird in Zeile 34 die Aktion action02 sofort ausgeführt (daher der Name des Schedulers) und die aktuell laufende Aktion action01 wird unterbrochen. Wir sehen dann die Meldung aus Zeile 25 erscheinen. Sobald action02 beendet ist, wird action01 fortgesetzt und wir sehen die Meldung aus Zeile 36;
    • Wenn der Scheduler [Schedulers.trampoline] ist, wird in Zeile 34 die Aktion action02 in die Warteschlange gestellt. Sie wird erst ausgeführt, wenn die aktuelle Aufgabe, action01, abgeschlossen ist. Dann erscheint die Meldung in Zeile 36. Sobald action01 abgeschlossen ist, wird action02 ausgeführt, und die Meldung in Zeile 25 erscheint;

Die Ausführung des obigen Codes liefert folgende Ergebnisse:

action02 ------Thread[main] ---- Time[38:480]
action01 ------Thread[main] ---- Time[38:485]

Wenn wir in Zeile 17 den Scheduler [Schedulers.trampoline] verwenden, erhalten wir die gegenteiligen Ergebnisse:

action01 ------Thread[main] ---- Time[42:972]
action02 ------Thread[main] ---- Time[42:976]

Allerdings ist es schwierig, eine Verbindung zu Observables herzustellen. Ich habe kein überzeugendes Beispiel gefunden, das den Vorteil der Ausführung eines Observables auf einem dieser beiden Threads verdeutlichen könnte. Hier ist jedoch eines, das ich überhaupt nicht natürlich finde:


package dvp.rxjava.observables.exemples;
 
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
 
public class Exemple27 {
    public static void main(String[] args) throws InterruptedException {
 
        // Worker
        Worker worker = Schedulers.immediate().createWorker();
        // Worker worker = Schedulers.trampoline().createWorker();
        // observable 1 sur worker
        worker.schedule(() -> Observable.range(1, 2).subscribe(new Action1<Integer>() {
 
            @Override
            public void call(Integer i) {
                ProcessUtils.showInfos.accept(String.valueOf(i));
                // observable 2 on same worker
                worker.schedule(() -> Observable.range(100, 2).subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer i) {
                        ProcessUtils.showInfos.accept(String.valueOf(i));
                    }
                }));
            }
        }));
    }
}
  • Zeilen 13–14: Ein Worker wird unter Verwendung eines der beiden Scheduler [Schedulers.immediate] und [Schedulers.trampoline] erstellt;
  • Zeile 16: Eine erste Beobachtungsgröße obs1 wird auf diesem Worker eingeplant, um die Zahlen [1,2] auszugeben
  • Zeile 22: Jedes Mal, wenn ein Element dieser Beobachtungsgröße obs1 beobachtet wird, wird die Beobachtung einer zweiten Beobachtungsgröße obs2 auf demselben Worker gestartet, um die Zahlen [100,101] auszugeben;

Mit dem Scheduler [Schedulers.immediate] erhalten wir folgende Ergebnisse:

1
2
3
4
5
6
1 ------Thread[main] ---- Time[44:604]
100 ------Thread[main] ---- Time[44:610]
101 ------Thread[main] ---- Time[44:610]
2 ------Thread[main] ---- Time[44:612]
100 ------Thread[main] ---- Time[44:612]
101 ------Thread[main] ---- Time[44:612]

Mit dem Scheduler [Schedulers.trampoline] erhalten wir hingegen folgende Ergebnisse:

1
2
3
4
5
6
1 ------Thread[main] ---- Time[14:107]
2 ------Thread[main] ---- Time[14:114]
100 ------Thread[main] ---- Time[14:115]
101 ------Thread[main] ---- Time[14:115]
100 ------Thread[main] ---- Time[14:115]
101 ------Thread[main] ---- Time[14:116]

7.8. Fazit

Es gibt noch viel zu tun. Um ein tieferes Verständnis der RxJava-Bibliothek zu erlangen, sollten Leser ihr Lernen anhand der am Anfang dieses Dokuments angegebenen Referenzen fortsetzen. Dennoch verfügen wir nun über die Grundlagen, die für die Verwendung von RxJava in Swing- und Android-Umgebungen erforderlich sind. Genau das werden wir als Nächstes demonstrieren.