7. La bibliothèque RxJava
La bibliothèque RxJava s'appuie sur le concept suivant : un flux d'éléments de type T Observable<T> est observé par un ou plusieurs souscripteurs (abonnés, observateurs, consommateurs) Subscriber<T>. La bibliothèque RxJava permet que le flux Observable<T> s'exécute dans un thread T1 et son observateur Subscriber<T> dans un thread T2 sans que le développeur n'ait à se soucier de gérer le cycle de vie de ces threads et de problèmes naturellement difficiles, tels que le partage de données entre threads et la synchronisation de ceux-ci pour exécuter une tâche globale. Elle facilite donc la programmation asynchrone.
Un flux Observable<T> produit des éléments de type T, observables au fur et à mesure qu'ils sont produits. Si l'observateur et l'observable (désigne le type Observable<T> par abus de langage) se trouvent dans le même thread, alors l'observable ne peut produire l'élément (i+1) que lorsque l'observateur a consommé l'élément i. Il y a peu de cas où cette architecture présente un intérêt. Si l'observateur et l'observable ne se trouvent pas dans le même thread, alors l'observable et son observateur ont des comportements autonomes : l'observable produit à son rythme et l'observateur consomme à son rythme. C'est là que réside l'intérêt de la bibliothèque. Nous avons toujours parlé jusqu'ici d'un observateur. En réalité, un observable peut avoir un nombre quelconque d'observateurs.
La bibliothèque RxJava est particulièrement bien adaptée à l'architecture vue dans le paragraphe 2 de découverte et qu'on rappelle ici :

- en [1], une couche de service délivre des services dont certains sont long à obtenir (requêtes réseau par exemple) ;
- cette couche de services est invoquée par une interface graphique [1] (Swing, Android, JavaFx). Si la couche de services est exécutée dans le même thread que la méthode [swing] qui l'utilise, l'interface graphique est figée (pas réactive) pendant l'attente du résultat du service ;
- en [2], une mince couche d'adaptation implémentée avec RxJava permet de présenter à la couche graphique une implémentation asynchrone du même service : celui-ci peut être exécuté dans un thread différent de celui de la méthode de la couche graphique qui l'invoque. Dans ce cas, l'interface graphique [3] reste réactive : l'utilisateur peut continuer à interagir avec elle, par exemple déclencher une nouvelle requête réseau en parallèle de la première et surtout, on peut lui offrir la possibilité d'annuler des traitements trop longs, chose impossible si l'interface graphique est figée ;
- l'appel [4] est synchrone alors que l'appel [5-6] est lui asynchrone ;
Dans cette architecture, la couche [2] offre des services rendant des types Observable<T> auxquels les méthodes de la couche graphique [3] peuvent s'abonner. Un service de la couche [2] délivre alors ses résultats un par un et la couche [3] peut réagir à chacun d'eux, en mettant par exemple à jour un ou plusieurs composants de l'interface graphique.
La classe Observable<T> possède plusieurs dizaines de méthodes. C'est l'une des difficultés de la bibliothèque : elle est très riche et il est difficile d'appréhender toutes ses possibilités. Nous allons en présenter certaines. La maîtrise des autres méthodes viendra ensuite avec le temps.
7.1. Créer des observables et s'y abonner
7.1.1. Exemple-01 : la méthode [Observable.from]
![]() |
Considérons le code suivant :
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observables d'entiers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
- ligne 12 : on crée un type Observable<Integer> à partir d'une liste d'entiers.
La classe Observable<T> est un flux d'éléments de type T qu'on peut observer, de préférence de façon asynchrone mais pas forcément, au fur et à mesure qu'ils sont produits. Sa définition est la suivante :
![]() |
Comme il a déjà été dit, la classe Observable<T> a plusieurs dizaines de méthodes. Certaines sont similaires à celles de la classe Stream<T> étudiée au paragraphe 5. La documentation de RxJava inclut des 'marble diagrams' [2] qui illustrent le fonctionnement de ces méthodes :
- la ligne 3 illustre les émissions de l'observable au cours du temps ;
- la méthode [4] est appliquée aux éléments émis par l'observable. Elle produit en général un nouvel observable ;
- la ligne 5 montre le nouvel observable obtenu ;
La méthode [Observable.from] a la signature suivante :
![]() |
La méthode statique [Observable.from] permet de créer un Observable<T> à partir d'une collection d'éléments de type T. C'est une façon très simple de démarrer avec les observables. La ligne :
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
va donc émettre trois éléments. Elle ne les émet pas immédiatement. Elle va les émettre entièrement à chaque fois qu'un observateur se déclarera. C'est ce qu'on appelle un observable froid. L'observable réémet ses éléments pour chaque nouvel abonné.
On peut considérer l'instruction précédente comme une action de configuration de l'observable. Celui-ci est configuré une fois et exécuté n fois si n abonnés se présentent.
Comment s'abonne-t-on ?
Une façon de le faire est d'utiliser la méthode [Observable.subscribe] dont la définition utilisée ici est la suivante :
![]() |
- le 1er paramètre [Action1<T> onNext] (cf paragraphe 6.2) de la méthode est la méthode à exécuter lorsque l'observable émet un nouvel élément T ;
- le deuxième paramètre [Action1<Throwable> onError] de la méthode est la méthode à exécuter lorsque l'observable émet une exception ;
- le troisième paramètre [Action0 onComplete] (cf paragraphe 6.1) de la méthode est la méthode à exécuter lorsque l'observable émet une exception ;
- la méthode rend un type [Subscription] ;
Le type [Subscription] représente un abonnement à l'observable. Sa définition est la suivante :
![]() |
L'intérêt de cette interface [1] réside dans sa méthode [2] qui permet d'annuler un abonnement.
Dans notre exemple, le code de l'abonnement à l'observable est le suivant :
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
- ligne 1 : le résultat de type [Subscription] est ignoré ;
- lignes 1-15 : les trois paramètres sont des instances de classes anonymes. Nous utiliserons également des lambdas. L'intérêt des classes anonymes est qu'on voit clairement les types de données attendus par l'unique méthode de ces classes ;
- lignes 2-5 : implémentation du 1er paramètre de type [Action1<Integer>] ;
- lignes 6-10 : implémentation du second paramètre de type [Action1<Throwable>] ;
- lignes 11-15 : implémentation du troisième paramètre de type [Action0] ;
Le code tout entier est le suivant :
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observables d'entiers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// abonnement
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
L'observable de la ligne 12 commence à émettre ses 3 éléments dès que la méthode [subscribe] est appelée ligne 14. A partir de ce moment :
- à chaque élément émis, les lignes 15-18 s'exécutent.
- à la fin des 3 éléments, les lignes 24-29 s'exécutent ;
- les lignes 19-24 ne seront jamais exécutées car l'observable n'émet pas ici d'exception ;
Par défaut, l'observable et l'observateur s'exécutent dans le même thread. Il existe quelques observables prédéfinis qui s'exécutent dans un thread différent du thread principal (ici le thread de la méthode main), mais pour la plupart d'entre-eux ce n'est pas le cas. Ici donc tout se passe dans le thread de la méthode [main] :
- l'observable émet l'élément 1 ;
- les lignes 15-18 s'exécutent et affichent cet élément ;
- l'observable émet l'élément 2 ;
- les lignes 15-18 s'exécutent et affichent cet élément ;
- l'observable émet l'élément 3 ;
- les lignes 15-18 s'exécutent et affichent cet élément ;
- l'observable émet la notification [completed] ;
- les lignes 24-29 s'exécutent ;
C'est ce que montrent les résultats obtenus :
La classe [Exemple02] reprend [Exemple01] en utilisant cette fois des fonctions lambdas pour paramètres de la méthode [Observable.subscribe] :
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
public class Exemple02 {
public static void main(String[] args) {
// observables d'entiers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// abonnement
obs1.subscribe(
(integer) -> System.out.printf("next : %s%n", integer),
(th) -> System.out.println(th),
() -> System.out.println("completed"));
}
}
7.1.2. Exemple-03 : la classe Observer
![]() |
La méthode [Observable.subscribe] qui permet de s'abonner à un observable a diverses versions dont la suivante :
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
import rx.Observer;
public class Exemple03 {
public static void main(String[] args) {
// observables d'entiers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// abonnement
obs1.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable th) {
System.out.printf("throwable %s", th);
}
@Override
public void onNext(Integer integer) {
System.out.printf("next : %s%n", integer);
}
});
};
}
Ligne 13, au lieu de passer trois paramètres à la méthode [subscribe], on lui passe un type [Observer] suivant :
![]() |
Le type [Observer] est une interface avec trois méthodes :
- [onNext(T t)] qui est appelée chaque fois que l'observable émet un élément t ;
- [onError(Throwable th)] qui est appelée lorsque l'observable émet une exception th ;
- [onCompleted] qui est appelée lorsque l'observable indique qu'il a fini d'émettre ;
Le fonctionnement du code est analogue à celui expliqué précédemment. On obtient les résultats suivants :
7.1.3. Exemple-04 : la méthode [Observable.create]
![]() |
La méthode statique Observable.create est définie de la façon suivante :
![]() |
- la méthode [create] rend un type Observable<T> ;
- le paramètre de la méthode [create] est une fonction de type [Observable.OnSubscribe<T>] définie de la façon suivante :
![]() |
Le type [Observable.OnSubscribe<T>] est une interface fonctionnelle qui elle-même étend l'interface fonctionnelle [Action1<Subscriber<? super T>>]. La méthode [call] de cette interface attend un type [Subscriber] (abonné, souscripteur, observateur) défini de la façon suivante :
![]() |
On voit en [1], que la classe [Subscriber<T>] implémente l'interface [Observer<T>] présentée au paragraphe 7.1.2.
Au final, la méthode [<T> Observable.create] :
- attend comme paramètre une instance de type [Observable.OnSubscribe<T>] ayant l'unique méthode de signature : void call(Subscriber<T> s). Le type [Subscriber<T>] étend le type [Observer<T>] et dispose donc des méthodes onNext, onError, onCompleted ;
- rend un type Observable<T> ;
La méthode [<T> Observable.create] rend un observable configuré. Il n'y a eu encore aucune émission d'éléments. Lorsqu'un abonné [Subscriber<T> s] s'abonne à cet observable la méthode [void call(s)] de la fonction passée en paramètre de la méthode [<T> Observable.create] est alors appelée. Elle a pour rôle d'émettre des éléments t de type T et d'appeler la méthode [s.onNext(t)] de l'observateur à chaque émission. Lorsque celle-ci est terminée, la méthode [s.onCompleted(t)] de l'observateur doit être appelée et la méthode [call] se terminer. Si la méthode [call] rencontre une exception th, la méthode [s.onError(th)] de l'observateur doit être appelée et la méthode [call] se terminer ;
Pour illustrer ce fonctionnement complexe, nous utiliserons le code suivant [Exemple04] :
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.util.Random;
public class Exemple04 {
public static void main(String[] args) {
// configuration observable de réels
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
for (int i = 0; i < 3; i++) {
// émission élément i
subscriber.onNext(new Random((i + 1)).nextDouble());
}
// fin émission
subscriber.onCompleted();
}
});
// abonnement et donc émission
obs1.subscribe((d) -> System.out.printf("onNext %s%n", d), (th) -> System.out.printf("onError %s%n", th),
() -> System.out.println("onCompleted"));
}
}
- ligne 11 : on crée un observable émettant des types Double ;
- lignes 11-21 : le paramètre de la méthode [create] est instancié avec une classe anonyme ayant l'unique méthode [call] des lignes 12-20. L'observable créé en ligne 11 est prêt à émettre mais il n'émettra que lorsqu'un observateur arrivera ;
- lignes 13-21 : la méthode [call] reçoit la référence d'un observateur ;
- lignes 14-17 : émission de 3 éléments vers l'observateur ;
- lignes 19 : notification de fin d'émission à l'observateur ;
- lignes 23-24 : abonnement à l'observable de la ligne 11. On implémente les trois paramètres [onNext, onError, onCompleted] de la méthode [subscribe] par trois lambdas. Cet abonnement va créer l'abonné [Subscriber<Double>] qui va être passé à la méthode [call] de la ligne 13. L'émission d'éléments va alors commencer ;
- tout se passe dans le même thread : observable et observateur ;
On obtient les résultats suivants :
La méthode [Observable.create] permet de créer un observable à partir de n'importe quel phénomène. C'est cette méthode que nous avons utilisée dans le paragraphe 2 de découverte, pour transformer une interface synchrone en interface asynchrone.
7.1.4. Exemple-05 : refactoring de [Exemple-04]
![]() |
L'exemple suivant présente une nouvelle version de la méthode statique [Observable.subscribe] :
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class Exemple05 {
public static void main(String[] args) {
// configuration d'un observables de réels
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// attente
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erreur
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// fini
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// un souscripteur
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// souscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- ligne 56 : la nouvelle version de la méthode statique [Observable.subscribe] admet pour paramètre le type [Subscriber] que nous avons présenté au paragraphe précédent ;
- lignes 37-52 : le souscripteur (abonné, observateur). Il implémente l'interface Observer avec ses trois méthodes onNext, onError, onCompleted ;
- lignes 61-64 : à partir de maintenant nous allons nous intéresser aux threads dans lesquels s'exécutent l'observable et son observateur ;
- ligne 62 : le nom du thread ;
- ligne 63 : l'heure courante exprimée en secondes et millisecondes. Cela va nous permettre de voir dans le temps l'émission d'éléments par l'observable et son traitement par l'observateur ;
- ce code a la même fonctionnalité que le code précédent. On a simplement refactorisé ce dernier ;
Les résultats obtenus sont les suivants :
- ligne 1 des résultats : avant la ligne 56 du code, rien ne s'est encore passé. L'observable a simplement été configuré ;
- ligne 2 des résultats : la ligne 56 du code provoque l'appel de la méthode [call] de la ligne 15. Ligne 3, le réel 80.39 est émis à destination de l'observateur ;
- ligne 4 : l'observateur reçoit le nombre émis ;
- lignes 5-8 : le processus précédent se répète 2 fois ;
- ligne 9 : l'observable envoie la notification de fin d'émission ;
- ligne 10 : l'observateur la reçoit ;
- ligne 11 : affichée par la ligne 57 du code ;
On voit donc que la seule ligne 56 de souscription a provoqué l'affichage des lignes 2-10 des résultats. Lorsqu'on débute avec la bibliothèque RxJava on se demande comment les choses s'enchaînent les unes aux autres et notamment les liens qui lient l'observateur et l'observable. On voit ici que la ligne 56, la souscription à l'observable,
- a provoqué l'émission de tous les éléments de l'observable ;
- que l'observable et l'observateur s'exécutent dans le même thread ;
- qu'à cause de cela, on observe la séquence : émission élément i, observation élément i, émission élément (i+1), observation élément (i+1), ...
On se rappelle que l'émetteur attendait avant d'émettre ses éléments :
// attente
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erreur
subscriber.onError(e);
}
où i ligne 3 représente le n° de l'émission (0<=i<3). Si on observe les heures d'émission des éléments de l'observable :
- lignes 2, 3 : l'élément 0 a été émis environ 500 ms après le début de l'abonnement ;
- lignes 3, 5 : l'élément 1 a été émis environ 400 ms après l'élément 0 ;
- lignes 5, 7 : l'élément 2 a été émis environ 300 ms après l'élément 1 ;
7.2. Thread d'exécution, thread d'observation
7.2.1. Exemple-06 : observable et observateur dans un thread autre que [main]
![]() |
Nous refactorisons l'exemple précédent de la façon suivante [Exemple06] :
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple06 {
public static void main(String[] args) {
// garde-barrière
CountDownLatch latch = new CountDownLatch(1);
// configuration d'un observables de réels
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// attente
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erreur
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// fini
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// un souscripteur
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// on baisse la barrière
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite configuration observable
obs1 = obs1.subscribeOn(Schedulers.computation());
// souscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// attente devant la barrière
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- ligne 16 : on crée un garde-barrière (sémaphore) avec un objet de type [CountDownLatch]. Cet objet sert à synchroniser des threads entre eux. Il est ici initialisé avec la valeur 1 qu'on appellera la valeur du garde-barrière (ou du sémaphore). Un thread se met en attente de la barrière par une opération :
latch.await();
Le thread est bloqué si la valeur du garde-barrière est >0. Un thread peut augmenter / diminuer la valeur interne du garde-barrière. Ligne 48, la valeur du garde-barrière est décrémentée de 1.
- ligne 63 : l'observable est configuré de façon à être exécuté sur un thread fourni par le schéduler [Schedulers.computation()]. Ce schéduler peut fournir autant de threads qu'il y a de coeurs sur la machine d'exécution. Le paragraphe de l'application exemple a montré l'utilisation d'autres schédulers (cf paragraphe 2.8) ;
Le principe du code est le suivant :
- la méthode [main] s'exécute dans le thread principal (main) ;
- ligne 66 : lance l'émission d'éléments de l'observable. Ceux-ci seront émis sur un thread différent du thread principal ;
- ligne 70 : le thread principal est bloqué car le garde-barrière a la valeur 1 (cf ligne 16). Il ne pourra continuer que lorsque cette valeur passera à 0. Cela se passe ligne 48. C'est l'observateur qui abaisse la barrière lorsqu'il reçoit la notification que l'observable a terminé ses émissions ;
L'exécution donne les résultats suivants :
- ligne 1 : la souscription va avoir lieu ;
- ligne 2 : celle-ci déclenche l'exécution de la méthode [call] sur le thread [RxComputationThreadPool-1]. On a maintenant une exécution parallèle avec deux threads ;
- ligne 3 : pour une raison non élucidée, le thread [RxComputationThreadPool-1] a passé la main. Le thread [main] la prend alors et est bloqué par le garde-barrière (ligne 70 du code). A partir de ce moment, seul le thread [RxComputationThreadPool-1] peut opérer ;
- lignes 4-11 : on observe le comportement observé précédemment entre l'observable et son observateur mais tout se passe maintenant le thread [RxComputationThreadPool-1] ;
- lignes 12-13 : l'observateur a baissé la barrière (ligne 48 du code) et le thread [RxComputationThreadPool-1] s'est terminé. Le thread [main] prend la main et affiche deux messages ;
7.2.2. Exemple-07 : observable et observateur dans deux threads différents
![]() |
Nous modifions l'exemple précédent de la façon suivante :
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple07 {
public static void main(String[] args) {
// garde-barrière
CountDownLatch latch = new CountDownLatch(1);
// configuration d'un observables de réels
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// attente
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erreur
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// fini
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// un souscripteur
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// on baisse la barrière
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite configuration observable
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
// souscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// attente de vant la barrière
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
Le code est identique à celui de l'exemple précédent sauf pour la ligne 63 :
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
qui configure l'observable (subscribeOn) et l'observateur (observeOn) pour être exécutés sur l'un des threads fournis par le schéduler [Schedulers.computation()].
Les résultats obtenus sont les suivants :
On peut remarquer les points suivants :
- l'observable s'exécute dans le thread [RxComputationThreadPool-4] (lignes 3-4, 6, 8-9) ;
- l'observateur s'exécute dans le thread [RxComputationThreadPool-3] (lignes 5, 7, 10-11) ;
- qu'ils s'exécutent de façon autonome. Ainsi lignes 8-9, l'observable émet 2 notifications (onNext, onCompleted) avant que l'observateur ne récupère la notification [onNext] (ligne 10) ;
La bibliothèque RxJava s'occupe du passage de données (les émissions) du thread de l'observable au thread de l'observateur. Le développeur n'a pas à s'en soucier.
Nous avons vu comment créer des observables (Observable.from, Observable.create). Nous voyons maintenant les observables prédéfinis de la bibliothèque RxJava.
7.3. Observables prédéfinis
7.3.1. Exemple-08 : la méthode [Observable.range]
![]() | ![]() |
A partir de maintenant, nous allons utiliser des classes dédiées pour les processus observés et leurs observateurs. L'idée est de pouvoir loguer leur nom, leur thread d'exécution et les heures d'exécution afin de pouvoir suivre celles-ci dans le temps.
La classe [Process] sera simplement un Observable que l'on peut nommer. Elle implémentera l'interface [IProcess] suivante :
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcess<T> {
// nom de l'observable
public String getName();
// observable
public Observable<T> getObservable();
}
Cette interface pourra être implémentée par la classe [Process<T>] suivante :
package dvp.rxjava.observables.utils;
import rx.Observable;
import rx.Scheduler;
public class Process<T> implements IProcess<T>{
// nom observable
protected String name;
// processus observé
protected Observable<T> observable;
// constructeurs
public Process(String name, Observable<T> observable) {
// initialisations locales
this.name = name;
this.observable = observable;
}
// getters et setters
public String getName() {
return name;
}
public Observable<T> getObservable() {
return observable;
}
}
- ligne 9 : le nom du process ;
- ligne 11 : l'observable observé ;
- lignes 14-18 : le constructeur ;
L'observateur sera lui décrit par la classe [Observateur] suivante :
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
...
}
- ligne 11, la classe Observateur<T> étend la classe Subscriber<T> que nous avons présentée brièvement au paragraphe 7.1.3. Nous l'utiliserons comme argument de la méthode [Observable.subscribe] :
// exécution observable (observation)
obs1.subscribe(observateur);
La méthode [Observable.subscribe] utilisée ligne 2 ci-dessus a la definition suivante :
![]() |
Le rôle du [Subscriber] est principalement de gérer les éléments émis par l'observable auquel il s'est abonné au moyen des méthodes de l'interface [Observer] : onNext, onError, onCompleted. La classe [Subscriber] a les méthodes suivantes :
![]() |
Dans le code de la classe [Observateur], nous utiliserons la méthode [1] isUnsubscribed, pour savoir si la souscription du souscripteur a été annulée ou pas. La classe [Observateur<T>] complète est la suivante :
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
// un garde-barrière (sémaphore)
private CountDownLatch latch;
// une méthode d'affichage
private Consumer<String> showInfos;
// le nom de l'observateur
private String observerName;
// le nom du processus observé
private String processName;
// constructeurs
public Observateur() {
}
public Observateur(String name, CountDownLatch latch, Consumer<String> showInfos, String observedName) {
this.observerName = name;
this.latch = latch;
this.showInfos = showInfos;
this.processName = observedName;
}
// --------------------------- implémentation interface Observer<T>
@Override
public void onCompleted() {
// fin des émissions
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s,%s].onCompleted", observerName, processName));
}
// fin blocage thread principal
latch.countDown();
}
@Override
public void onError(Throwable e) {
// erreur d'émission
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s, %s].onError (%s)", observerName, processName, e));
}
}
@Override
public void onNext(T value) {
// une émission supplémentaire
if (!isUnsubscribed()) {
try {
showInfos.accept(String.format("Subscriber [%s,%s] : onNext (%s)", observerName, processName,
new ObjectMapper().writeValueAsString(value)));
} catch (JsonProcessingException e) {
showInfos.accept(String.format("Subscriber [%s,%s].onNext (%s)", observerName, processName, e));
}
}
}
}
- en plus des caractéristiques d'un Subscriber, l'observateur Observateur embarquera avec lui les informations suivantes :
- ligne 14 : un garde-barrière ou sémaphore qui servira à bloquer le thread principal jusqu'à ce que l'observateur ait reçu tous les éléments émis par l'observable. Cela se fera ligne 36 du code lorsque l'observateur recevra de l'observable la notification de fin d'émission ;
- ligne 16 : une instance Consumer<String> qui servira à afficher un message sur la console ;
- ligne 18 : le nom de l'observateur pour les distinguer les uns des autres lorsqu'il y en a plusieurs ;
- ligne 20 : le nom du processus observé ;
- lignes 36, 46, 54 : les méthodes [onCompleted, onError, onNext] de l'interface [Observer<T>] implémentée par la classe abstraite [Subscriber<T>]. Cette classe ne les implémente pas. Il faut donc le faire dans ses classes filles. Avant de faire quelque chose dans ces méthodes, on regarde si l'observateur n'a pas été désabonné de l'observable qu'il observe ;
- ligne 59 : la méthode [onNext] de l'observateur écrit la chaîne jSON de l'élément reçu. Cela nous permettra d'afficher divers types d'éléments ;
Ceci posé, étudions une nouvelle méthode de la classe Observable, la méthode [range] :
![]() |
L'observable Observable.range(n,m) émet (m) entiers allant de n à n+m-1. Nous l'étudions avec le code [Exemple08] suivant :
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple08 {
public static void main(String[] args) throws InterruptedException {
// nombre d'observateurs
final int nbObservateurs = 2;
// sémaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// configuration observable
Observable<Integer> obs1 = Observable.range(15, 3).subscribeOn(Schedulers.computation());
// exécution observable (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos,"obs1"));
}
// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");
}
// affichages
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- ligne 16 : on va utiliser deux observateurs ;
- ligne 19 : le garde-barrière (sémaphore) est initialisé à deux parce qu'on va mettre chaque observateur sur un thread différent. Le thread principal devra donc attendre la fin de deux threads d'observation ;
- ligne 22 : on configure l'observable de telle façon à ce qu'il s'exécute sur un thread du schéduler [Schedulers.computation()]. L'observateur sera sur le même thread que l'observable ;
- lignes 25-27 : on abonne deux observateurs à l'observable. Ceci va déclencher l'exécution complète de celui-ci pour chacun des observateurs : les entiers 15, 16 et 17 vont être émis ;
- ligne 30 : le thread principal attend la fin des observateurs ;
Les résultats obtenus sont les suivants :
- ligne 2 : le thread principal est bloqué en attente de la fin des 2 observateurs ;
- lignes 3-4 : on voit que l'observateur 0 est sur le thread [RxComputationThreadPool-1] et l'observateur 1 sur le thread [RxComputationThreadPool-2] ;
- lignes 3-10 : on voit que les deux observateurs reçoivent exactement les mêmes éléments ;
Nous allons utiliser la classe Observateur ainsi définie pour illustrer le comportement d'autres types d'observables.
7.3.2. Exemple-09 : les méthodes Observable.[interval, take, doNext]
![]() |
![]() |
Cet exemple illustre l'utilisation de l'observable Observable.interval(long interval, TimeUnit unit) qui émet des entiers longs à intervalles de temps réguliers. Il faut noter le point [1] : par défaut, l'observable [Observable.interval] s'exécute sur l'un des threads du schéduler [Schedulers.computation].
Le code sera le suivant :
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// nombre d'observateurs
final int nbObservateurs = 2;
// sémaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// configuration observable
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// exécution observable (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");
}
// affichages
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- ligne 22 : l'observable émet des entiers longs toutes les 500 millisecondes. La série commence avec le nombre 0 ;
- ligne 22 : cet observable émet un nombre infini de valeurs. La méthode [Observable.take(n)] crée un nouvel observable qui ne conserve que les n premiers éléments émis ;
![]() |
Revenons sur le code de l'observable :
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
Ligne 2, la méthode [Observable.doOnNext] s'exécute à chaque fois que l'observable émet un nouvel élément. C'est souvent utilisé pour loguer des informations. Ici, on veut loguer la date d'émission des éléments pour vérifier si l'intervalle de 500 millisecondes est bien vérifié. La méthode [Observable.doOnNext] ne modifie pas l'observable sur lequel elle s'applique. Sa définition est la suivante :
![]() |
L'exécution donne les résultats suivants :
- lignes 3, 7 et 11 : on voit qu'approximativement l'intervalle d'émission est proche de 500 ms ;
- les 2 observateurs sont bien sur deux threads différents alors même que l'observable n'avait pas été configuré pour s'exécuter avec un schéduler précis. C'est le fonctionnement par défaut de l'observable [Observable.interval] que l'on voit ici ;
7.3.3. Exemples-10/12 : les méthodes Observable.[error, empty, never]
![]() | ![]() |
Nous allons être désormais plus concis dans nos illustrations des méthodes de la classe [Observable]. Le code précédent était le suivant :
package dvp.rxjava.observables;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// nombre d'observateurs
final int nbObservateurs = 2;
// sémaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// configuration observable
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// exécution observable (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");
}
// affichages
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
Ce code avait déjà été utilisé pour l'exemple précédent. Seules les lignes 21-22 changeaient. Nous allons donc factoriser la majorité de ce code dans la classe [ProcessUtils] suivante :
package dvp.rxjava.observables.utils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import rx.Observable;
public class ProcessUtils {
@SafeVarargs
public static void subscribe(int nbObservateurs, IProcess<?>... processes) throws InterruptedException {
// sémaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs * processes.length);
// exécution observable (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
for (IProcess<?> process : processes) {
Observable<?> obs = process.getObservable();
obs.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos, process.getName()));
}
}
// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");
}
// affichages
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- ligne 13 : la méthode admet deux paramètres :
- nbObservateurs : le nombre d'observateurs des process passés en second paramètre ;
- processes : les processus (observables nommés) à observer. Grâce à la notation [IProcess<?>], les processus pourront émettre des éléments de types différents ;
- ligne 16 : le sémaphore doit passer au vert lorsque tous les observateurs ont terminé toutes leurs observations. La valeur intiale du sémaphore est dont le nombre d'observateurs fois le nombre d'observations ;
- lignes 20-25 : on abonne chaque observateur à tous les processus qu'il faut observer ;
- ligne 23 : on récupère l'observable à partir du process (cf paragraphe 7.3.1) ;
- ligne 23 : on lui abonne un observateur. On passe à celui-ci 4 informations :
- son nom ;
- le sémaphore qu'il doit décrémenter lorsqu'il reçoit la notification de fin d'émission de l'observable qu'il observe ;
- la méthode à utiliser lorsqu'il veut loguer des informations sur la console ;
- le nom du processus qu'il va observer ;
Ces classes étant définies, l'exemple 10 sera le suivant :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple10 {
public static void main(String[] args) throws InterruptedException {
// configuration observable
Observable<?> obs = Observable.error(new RuntimeException("Erreur !!!")).subscribeOn(Schedulers.computation());
// exécution (observation) observable
ProcessUtils.subscribe(2,new Process<>("process1", obs));
}
}
Ligne 11, la méthode statique [Observable.error] est définie de la façon suivante :
![]() |
La ligne 8 configure donc un observable qui se contente d'émettre une exception à destination de la méthode [onError] de ses soucripteurs. L'exécution donne les résultats suivants :
main : début observation ------Thread[main] ---- Time[22:618]
main : attente fin observation ------Thread[main] ---- Time[22:636]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[22:638]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[22:638]
Lignes 3 et 4, la méthode [onError] des deux soucripteurs a reçu l'exception lancée par l'observable.
Cette exécution a une particularité : les méthodes [onCompleted] des deux observateurs n'ont pas été appelées. Du coup, la barrière n'a pas été abaissée et le thread principal reste bloqué dans la méthode statique [ProcessUtils.subscribe] à la ligne 3 suivante :
// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");
On découvre ici qu'en cas d'erreur de l'observable, la méthode [onCompleted] des souscripteurs n'est pas appelée. Nous modifions alors la méthode [Observateur.onError] de la façon suivante :
@Override
public void onError(Throwable e) {
// erreur d'émission
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber[%s, %s].onError (%s)", observerName, processName, e));
}
// fin blocage thread principal
latch.countDown();
}
Nous rajoutons les lignes 7-8 pour lever la barrière en cas d'erreur de l'observable. Avec ce nouveau code, l'exécution donne les résultats suivants :
main : début observation ------Thread[main] ---- Time[40:750]
main : attente fin observation ------Thread[main] ---- Time[40:764]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[40:766]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[40:766]
main : fin observation ------Thread[main] ---- Time[40:767]
Nous obtenons la ligne 5 que nous n'avions pas eue précédemment.
L'exemple 11 sera le suivant :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple11 {
public static void main(String[] args) throws InterruptedException {
// configuration observable
Observable<?> obs1 = Observable.empty();
// exécution (observation) observable
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
Ligne 10, la méthode statique [Observable.empty] crée un observable n'émettant aucun élément. Il n'émet que la notification de fin d'émission ;
![]() |
L'exécution du code de l'exemple ci-dessus donne les résultats suivants :
- lignes 2 et 3 : on voit que les deux observateurs reçoivent la notification de fin d'émission sans avoir reçu d'éléments auparavant.
On peut se demander à quoi peut bien servir cette méthode. On peut s'en servir de façon analogue à une collection, vide au départ, et dans laquelle on cumule ensuite des éléments :
Ligne 3, on fusionne l'observable initial obs (ligne 1) avec d'autres observables.
L'exemple 12 illustre la méthode statique [Observable.never] :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple12 {
public static void main(String[] args) throws InterruptedException {
// configuration observable
Observable<?> obs1 = Observable.never();
// exécution (observation) observable
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
La méthode statique [Observable.never] crée un observable qui n'émet jamais :
![]() |
L'exécution de l'exemple donne les résultats suivants :
Ligne 2, le thread principal attend indéfiniment. En effet, aucun observable n'émet la notification [onCompleted] qui permet de passer le sémaphore (garde-barrière) au vert (baisser la barrière).
7.4. Multi-threading
7.4.1. Exemple-13 : thread d'action, thread d'observation
Nous avons au paragraphe 7.1.3 créé un observable avec la méthode statique [Observable.create] :
![]() |
- la méthode [create] rend un type Observable<T> ;
- le paramètre de la méthode [create] est une fonction de type [Observable.OnSubscribe<T>] définie de la façon suivante :
![]() |
Le type [Observable.OnSubscribe<T>] est une interface fonctionnelle qui elle-même étend l'interface fonctionnelle [Action1<Subscriber<? super T>>]. La méthode [call] de cette interface attend un type [Subscriber] (abonné, souscripteur, observateur). Dans la suite de ce document, nous appellerons parfois le type [Observable.OnSubscribe<T>] une action. Nous allons créer des actions personnalisées qui auront un nom. Ce seront des instances de l'interface [IProcessAction] suivante :
![]() |
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcessAction<T> extends Observable.OnSubscribe<T> {
// l'action a un nom
public String getName();
}
- ligne 5 : l'interface [IProcessAction<T>] a toutes les caractéristiques de l'interface [Observable.OnSubscribe<T>] ;
- ligne 8 : elle a de plus une méthode [getName] qui rend le nom de l'instance implémentant l'interface ;
Nous allons utiliser l'action nommée [ProcessAction01] suivante :
package dvp.rxjava.observables.utils;
import java.util.Random;
import rx.Subscriber;
import rx.functions.Func1;
public class ProcessAction01<T> implements IProcessAction<T> {
// data
private String name;
private int nbValues;
private Func1<Integer, T> func1;
// constructeurs
public ProcessAction01(String name, int nbValues, Func1<Integer, T> func1) {
this.name = name;
this.nbValues = nbValues;
this.func1 = func1;
}
@Override
public void call(Subscriber<? super T> subscriber) {
ProcessUtils.showInfos.accept(String.format("Observable (%s) call start", getName()));
for (int i = 0; i < nbValues; i++) {
// attente
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
// erreur
ProcessUtils.showInfos.accept(String.format("Observable (%s) onError", getName()));
subscriber.onError(e);
}
// émission d'un élément
T value = func1.call(i);
ProcessUtils.showInfos.accept(String.format("Observable (%s,%s) onNext (%s)", getName(), i, value));
subscriber.onNext(value);
}
// fini
ProcessUtils.showInfos.accept(String.format("Observable (%s) onCompleted", getName()));
subscriber.onCompleted();
}
@Override
public String getName() {
return name;
}
}
- ligne 8 : la classe [ProcessAction01<T>] implémente l'interface [IProcessAction<T>] donc l'interface [Observable.OnSubscribe<T>] ;
- ligne 11 : le nom de l'action ;
- ligne 12 : le nombre de valeurs à émettre ;
- ligne 13 : une instance de type [Func1<Integer, T>] qui à partir d'un entier fabrique un type T qui sera émis par l'observable (lignes 35 et 37) ;
- lignes 16-20 : on passe au constructeur et le nom de l'action et le nombre de valeurs à émettre et la fonction d'émission ;
- lignes 23-42 : le code du processus ;
- ligne 23 : la méthode [call] reçoit en paramètre l'abonné à l'observable associé au processus ;
- ligne 28 : le processus émet ses éléments après une attente de durée aléatoire ;
- ligne 32 : l'émission d'une erreur ;
- ligne 37 : une émission normale ;
- ligne 41 : émission de la notification de fin d'émission ;
- lignes 25-38 : l'action émet nbValues réels après un temps d'attente aléatoire (ligne 30) ;
- ligne 35 : la valeur à émettre est fournie par la fonction [func1] passée en paramètre au constructeur (ligne 16) ;
Nous refactorisons la classe [Process] (cf paragraphe 7.3.1) pour qu'elle puisse être construite aussi avec une action nommée. On lui ajoute le constructeur suivant :
public Process(IProcessAction<T> na, Scheduler schedulerObserved, Scheduler schedulerObserver) {
// nom process=nom action
name = na.getName();
// action --> observable
observable = Observable.create(na);
// thread d'exécution du processus observé
if (schedulerObserved != null) {
observable = observable.subscribeOn(schedulerObserved);
}
// thread d'observation de l'observateur
if (schedulerObserver != null) {
observable = observable.observeOn(schedulerObserver);
}
}
- ligne 1, le constructeur admet 3 paramètres :
- l'action nommée qui va servir à construire l'observable (ligne 5) ;
- le schéduler du processus observé (peut être null) ;
- le schéduler de l'observateur (peut être null) ;
- ligne 5 : l'observable est créé à partir de l'action passée en paramètre ;
Le code suivant [Exemple13] observe différents observables :
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple13 {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 1, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// processus 3
Process<Integer> process3 = new Process<>(new ProcessAction01<Integer>("process3", 3, i -> i * 2), null,
Schedulers.computation());
// processus 4
Process<Boolean> process4 = new Process<>(new ProcessAction01<Boolean>("process4", 4, i -> i % 2 == 0), null, null);
// souscriptions
ProcessUtils.subscribe(1, process1);
ProcessUtils.subscribe(1, process2);
ProcessUtils.subscribe(1, process3);
ProcessUtils.subscribe(1, process4);
}
}
- lignes 13-15 : le processus process1 produit 1 nombre réel sur un thread de calcul qui sera observé sur un autre thread de calcul ;
- lignes 17-18 : le processus process2 produit 2 chaînes de caractères sur un thread de calcul et on ne donne pas d'indication sur le thread de l'observateur. Les résultats montrent que l'observation se fait par défaut sur le même thread que celui de l'exécution du processus ;
- lignes 20-21 : le processus process3 produit 3 nombres entiers sur un thread non imposé qui seront observés sur un thread de calcul. Les résultats montrent que l'exécution du processus se fait par défaut sur le thread principal ;
- ligne 23 : le processus process4 produit 4 booléens sur un thread non imposé qui seront observés sur sur un thread non imposé. Les résultats montrent que l'exécution du processus et son observation se font par défaut sur le thread principal ;
Le résultat de l'exécution de ce code est la suivant :
- le processus process1 produit 1 nombre réel (ligne 4) sur le thread de calcul [RxComputationThreadPool-4] qui est observé sur un le thread de calcul [RxComputationThreadPool-3] (ligne 6) ;
- le processus process2 produit 2 chaînes de caractères (lignes 12, 14) sur le thread de calcul [RxComputationThreadPool-5] qui sont observés sur ce même thread (lignes 13, 15) ;
- le processus process3 produit 3 nombres entiers (lignes 21, 23, 25) sur le thread principal qui sont observés sur le thread de calcul [RxComputationThreadPool-6] (lignes 22, 24, 28) ;
- le processus process4 produit 4 booléens (lignes 34, 36, 38, 40) sur le thread principal qui sont observés sur ce même thread principal (lignes 33, 35, 37, 39) ;
Le lecteur est invité à suivre ci-dessus :
- le cycle de vie du processus observé et son thread ;
- le cycle de vie de son observateur et son thread ;
Beaucoup de l'intérêt des bibliothèques Rx repose sur ce multi-threading que le développeur n'a pas à gérer lui-même.
7.5. Combinaisons de plusieurs observables
7.5.1. Exemple-14 : fusionner deux observables avec [Observable.merge]
Nous présentons maintenant des méthodes statiques de la classe [Observable] permettant de combiner plusieurs observables dans un observable résultat.
Le premier exemple de ce genre sera le suivant :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple14 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// merge
Process<?> process12 = new Process<>("process12",
Observable.merge(process1.getObservable(), process2.getObservable()));
// souscriptions
ProcessUtils.subscribe(1, process12);
}
}
- lignes 15-17 : un processus appelé [process1] va émettre 3 nombres réels sur un thread de calcul. Il sera également observé sur un thread de calcul ;
- lignes 19-20 : un processus appelé [process2] va émettre 2 chaînes de caractères sur un thread de calcul. Le thread d'observation n'est pas imposé. On a vu précédemment que dans ce cas, le thread d'observation est le thread de calcul ;
- ligne 23 : les deux processus sont fusionnés, ç-à-d qu'on crée un observable dont les éléments proviennent simultanément des deux processus. On utilise pour cela la méthode statique [Observable.merge] :
![]() |
Contrairement à ce que pourrait laisser croire le schéma ci-dessus, lors de la fusion, les éléments d'un flux 1 peuvent s'intercaler entre les éléments d'un flux 2. C'est ce que montrent les résultats de l'exécution :
- ligne 3 : le process [process1] s'exécute sur le thread de calcul [RxComputationThreadPool-4] ;
- ligne 4 : le process [process2] s'exécute sur le thread de calcul [RxComputationThreadPool-5] ;
- ligne 9 : le process [process12] est observé sur le thread de calcul [RxComputationThreadPool-3]. Je ne connais pas la règle qui a conduit à ce choix ;
- lignes 9-11 : on voit que l'observateur observe des éléments des deux process [process1] (ligne 5) et [process2] (lignes 6, 7) alors qu'aucun des deux n'est terminé (il y a mélange) ;
- le processus [process12] se termine (ligne 17) lorsque les deux processus process1 et process2 sont terminés ;
7.5.2. Exemple-15 : concaténer deux observables avec [Observable.concat]
Nous examinons maintenant le code suivant :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple15 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, Schedulers.computation());
// concat
Process<?> process12 = new Process<>("process12",
Observable.concat(process1.getObservable(), process2.getObservable()));
// souscriptions
ProcessUtils.subscribe(1, process12);
}
}
- lignes 15-17 : un processus appelé [process1] va émettre 3 nombres réels sur un thread de calcul. Il sera également observé sur un thread de calcul ;
- lignes 19-20 : un processus appelé [process2] va émettre 2 chaînes de caractères sur un thread non imposé, ici le thread principal par défaut. Il sera observé sur un thread de calcul ;
- ligne 23 : les deux processus sont concaténés, ç-à-d qu'on crée un observable dont les éléments proviennent des deux processus. Il n'y a pas mélange des valeurs émises. Le processus [process12] va d'abord émettre toutes les valeurs du processus [process1] puis celles du processus [process2]. On utilise pour cela la méthode statique [Observable.concat] :
![]() |
Les résultats de l'exécution sont les suivants :
- lignes 3-10 : le processus [process1] s'exécute et le processus [process12] émet les valeurs émises par [process1] ;
- ligne 9 : le processus [process1] est terminé ;
- lignes 11-17 : le processus [process2] s'exécute et le processus [process12] émet les valeurs émises par [process2] ;
Il y a une bizarrerie pour le processus process2 : on n'avait pas imposé de thread d'exécution. On pouvait alors s'attendre à ce que celui-ci par défaut soit le thread principal. Or il n'en est rien. Le thread d'exécution a été le thread de calcul [RxComputationThreadPool-3] (ligne 11). Donc lorsqu'on n'impose pas de thread d'exécution ou d'observation, on ne peut faire d'hypothèse sur le thread qui sera choisi.
7.5.3. Exemple-16 : combiner deux observables avec [Observable.zip]
Nous examinons maintenant le code suivant :
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;
public class Exemple16 {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, null);
// fonction de combinaison des 2 processus
FuncN<String> funcn = new FuncN<String>() {
@Override
public String call(Object... args) {
if (args.length == 2) {
return String.format("double=%s, string=%s", args[0], args[1]);
} else {
throw new RuntimeException("la fonction attend 2 paramètres exactement");
}
}
};
// zip des 2 processus
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
// souscriptions
ProcessUtils.subscribe(1, process12);
}
}
- lignes 16-18 : un processus appelé [process1] va émettre 3 nombres réels sur un thread de calcul. Il sera également observé sur un thread de calcul ;
- lignes 20-21 : un processus appelé [process2] va émettre 2 chaînes de caractères sur un thread non imposé. Le thread d'observation n'est pas imposé non plus ;
- lignes 23-32 : instanciation d'un type [FuncN<String>] avec une classe anonyme. FuncN est une interface fonctionnelle :
![]() |
La méthode [FuncN.call] attend un tableau d'objets et rend un type R. La fonction [funcn] va être utilisée pour combiner les processus process1 et process2 dans cet ordre. Dans la méthode [FuncN.call] :
- args[0] sera un Double ;
- args[1] sera un String ;
Ici, le résultat de [funcn.call] sera la chaîne de caractères de la ligne 27. La construction de ce résultat ne nécessite pas de connaître les types des arguments de la méthode call.
Les deux processus sont combinés de la façon suivante :
// zip des 2 processus
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
La méthode [Observable.zip] a le fonctionnement suivant :
![]() |
On voit que :
- le 1er argument de zip est un Iterable<Observable>. Dans notre exemple, nous avons un paramètre effectif de type List<Observable> formé de nos deux observables ;
- le second argument de zip est un type FuncN. Dans notre exemple, le paramètre effectif est [funcn] ;
L'exécution donne les résultats suivants :
- lignes 7, 11 : le processus process12 émet deux éléments ;
- ligne 8 : l'élément supplémentaire émis par le processus process1 qui n'a pas de partenaire dans le processus process2 n'est pas émis par le processus résultat process12 ;
On voit que le processus process2 à qui on n'avait imposé ni thread d'exécution, ni thread d'observation a utilisé le thread principal pour les deux.
7.5.4. Exemple-17 : combiner deux observables avec [Observable.combineLatest]
Nous examinons maintenant le code suivant :
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple17 {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null,
Schedulers.computation());
// combinaison des 2 processus
Process<Double> process12 = new Process<>("process12",
Observable.combineLatest(process1.getObservable(), process2.getObservable(), (d1, d2) -> d1 + d2));
// souscriptions
ProcessUtils.subscribe(1, process12);
}
}
- lignes 14-16 : un processus appelé [process1] va émettre 3 nombres réels sur un thread de calcul. Il sera également observé sur un thread de calcul ;
- lignes 18-20 : un processus appelé [process2] va émettre 2 nombres réels sur un thread non imposé. Ils seront observés sur un thread de calcul ;
- ligne 23 : les deux observables sont combinés avec la méthode statique [Observable.combineLatest] suivante :
![]() |
L'observable [combineLatest] fonctionne de la façon suivante : lorsque l'un des deux observables émet un élément E1, cet élément est combiné par [combineFunction] avec le dernier élément émis par l'autre observable.
L'exécution de ce code donne le résultat suivant :
- ligne 5 : l'émission de process2 (56) est combinée avec le dernier élément émis par process1 (54, ligne 4)et produit le résultat de la ligne 7 ;
- ligne 6 : l'émission de process1 (51.6) est combinée avec le dernier élément émis par process2 (56, ligne 5) et produit le résultat de la ligne 8 ;
- ligne 9 : l'émission de process2 (261.8) est combinée avec le dernier élément émis par process1 (51.6, ligne 6) et produit le résultat de la ligne 12 ;
- ligne 13 : l'émission de process1 (80.39) est combinée avec le dernier élément émis par process2 (261.8, ligne 9) et produit le résultat de la ligne 15 ;
On est ici dans une variante de l'observable [zip] où cette fois, les éléments combinés ne sont pas nécessairement les éléments de même position dans les flux. On remarque ici, que le processus process2 auquel on n'avait pas imposé de thread d'exécution a été ici exécuté sur le thread principal (ligne 2).
7.5.5. Exemple-18 : combiner deux observables avec [Observable.amb]
Nous examinons maintenant le code suivant :
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple18 {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null, null);
// combinaison des 2 processus
Process<Double> process12 = new Process<>("process12",
Observable.amb(process1.getObservable(), process2.getObservable()));
// souscriptions
ProcessUtils.subscribe(1, process12);
}
}
- lignes 14-16 : un processus appelé [process1] va émettre 3 nombres réels sur un thread de calcul. Il sera également observé sur un thread de calcul ;
- lignes 18-20 : un processus appelé [process2] va émettre 2 nombres réels sur un thread non imposé. Ils seront observés sur un thread non imposé ;
- ligne 22 : les deux observables sont combinés avec la méthode statique [Observable.amb] suivante :
![]() |
Comme le montre le schéma ci-dessus, l'observable [Observable.amb(Observable o1, Observable o2)] émet les éléments de l'observable qui émet le premier. Ce que confirment les résultats de l'exemple présenté :
- ligne 4, c'est le processus process2 qui émet le premier ;
- lignes 8, 12 : le processus process12 émet tous les éléments émis par le processus process2 (lignes 4, 11) ;
7.6. Chaîne de traitement d'un observable
7.6.1. Exemple-19 : transformer un observable avec [Observable.map]
Dans les exemples précédents, nous avons examiné diverses combinaisons de deux observables en un troisième observable. Nous présentons maintenant des méthodes statiques de la classe [Observable] qui permettent des opérations de transformation, filtrage, agrégation sur un observable. On va retrouver ici des méthodes analogues à celles de la classe [Stream] étudiées au paragraphe 5.
Notre premier exemple sera le suivant :
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple19 {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<String> process2 = new Process<>("process2",
process1.getObservable().map(d -> String.format("valeur-%s", d)));
// souscriptions
ProcessUtils.subscribe(1, process2);
}
}
- lignes 14-16 : un processus appelé process1 va émettre 3 nombres réels sur un thread de calcul. Il sera également observé sur un thread de calcul ;
- lignes 17-18 : les nombres émis par process1 vont être transformés en chaînes de caractères dans un processus process2 ;
- ligne 20 : on observe process2 ;
La méthode [Observable.map] de la ligne 18 est analogue à la méthode [Stream.map] étudiée au paragraphe 5.5 :
![]() |
Les résultats de l'exemple sont les suivants :
- lignes 4, 5 et 8 : les émissions de process1. Ce sont des nombres réels ;
- lignes 6, 7, 10 : les émissions de process2 observées. Ce sont des chaînes de caractères ;
7.6.2. Exemple-20 : filtrer un observable avec [Observable.filter]
L'exemple sera le suivant :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple20 {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().filter(i -> i % 2 == 0));
// souscriptions
ProcessUtils.subscribe(1, process2);
}
}
- lignes 11-12 : un processus appelé process1 va émettre les nombres entiers de 0 à 2 sur un thread de calcul. Il sera également observé sur un thread de calcul ;
- ligne 14 : les nombres émis par process1 vont être filtrés pour ne garder dans process2 que les nombres pairs ;
- ligne 20 : on observe process2 ;
La méthode [Observable.filter] de la ligne 18 est analogue à la méthode [Stream.filter] étudiée au paragraphe 5.4 :
![]() |
Les résultats de l'exemple sont les suivants :
- lignes 4, 5 et 7 : les émissions de process1 ;
- lignes 6, 9 : les émissions de process2 observées. Ce sont les éléments de process1 qui sont pairs ;
7.6.3. Exemple-21 : transformer un observable avec [Observable.flatMap]
L'exemple sera le suivant :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21 {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}));
// souscriptions
ProcessUtils.subscribe(1, process2);
}
}
- lignes 12-13 : un processus appelé process1 va émettre les nombres entiers de 0 à 2 sur un thread de calcul. Il sera également observé sur un thread de calcul ;
- lignes 15-18 : chaque nombre n émis par process1 est tranformé en un observable émettant les 3 nombres (10*n, 10*n+1, 10*n+2). Si ligne 15, on utilisait la méthode [map], process2 émettrait un type Observable<Integer> et non un type Integer. La méthode [flatMap] utilisée permet d'aplatir (flatten) cette suite d'éléments de type Observable<Integer> en une suite d'éléments de type Integer constituée de chacun des éléments de chacun des Observable<Integer> ;
- ligne 20 : on observe process2 ;
La méthode [Observable.flatMap] de la ligne 15 est analogue à la méthode [Stream.flatMap] étudiée au paragraphe 5.6.12 :
![]() |
Les résultats de l'exemple sont les suivants :
- lignes 5-7 : les trois émissions de process2 suite à l'émission de la ligne 4 de process1 ;
- lignes 9-11 : les trois émissions de process2 suite à l'émission de la ligne 8 de process1 ;
- lignes 14-16 : les trois émissions de process2 suite à l'émission de la ligne 12 de process1 ;
Le code suivant montre comment créer un type Observable<Integer[]> à partir de process1 [Exemple21b] :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21b {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<Integer[]> process2 = new Process<>("process2", process1.getObservable().map(i -> {
int value = i * 10;
return new Integer[] { value, value + 1, value + 2 };
}));
// souscriptions
ProcessUtils.subscribe(1, process2);
}
}
- ligne 14 : on utilise la méthode [Observable.map] ;
- ligne 16 : qui retourne un type Integer[] ;
Les résultats sont les suivants :
- lignes 6, 7, 10 : on voit les résultats du map ;
Toutes ces transformations d'observable peuvent être chaînées puisque chaque transformation produit un nouvel observable. C'est ce que montre l'exemple suivant [Exemple21c] :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21c {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// souscriptions
ProcessUtils.subscribe(1, process2);
}
}
- lignes 15-18 : le flatMap est suivi d'un filter ;
Les résultats d'exécution sont les suivants :
- lignes 8-13 : process2 n'a émis que les éléments pairs issus du flatMap ;
Une méthode proche de [flatMap] est la méthode [flatMapIterable] illustrée par l'exemple suivant [Exemple21d] :
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21d {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMapIterable(i -> {
int value = i * 10;
return Arrays.asList(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// souscriptions
ProcessUtils.subscribe(1, process2);
}
}
Ligne 16, au lieu d'utiliser la méthode [flatMap], on utilise la méthode [flatMapIterable]. Dans ce cas, la function de transformation doit produire un type Iterable<T> (ligne 18) au lieu d'un type Observable<T>.
On obtient les mêmes résultats que précédemment.
Revenons à la définition de la méthode [flatMap] :
![]() |
On voit ci-dessus, qu'un élément bleu [3] s'est inséré entre les deux éléments verts [1-2]. Cela veut dire que dans son opération d'aplatissement des Observable<T>, la méthode [flatMap] respecte l'ordre d'émission de ces différents observables internes. Ceci est montré par l'exemple suivant [Exemple21e] :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21e {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// processus 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().flatMap(i -> process2.getObservable()));
// souscriptions
ProcessUtils.subscribe(1, process3);
}
}
- lignes 11-12 : le processus process1 émet les nombres entiers [0,1] ;
- lignes 14-15 : le processus process2 émet les nombres entiers [10,11,12] ;
- lignes 17-18 : à chaque élément émis par process1, on associe l'observable du processus process2. Cela signifie que :
- à l'élément [0] de process1 sera associé un observable émettant les [10,11,12] ;
- idem pour l'élément 1 ;
Au final, les 6 nombres [10, 11, 12, 10, 11, 12] vont être émis. On veut voir dans quel ordre.
Les résultats de l'exécution sont les suivants :
On voit que l'ordre d'émission du processus process3 a été : [10, 10, 11, 12, 11, 12] (lignes 11, 12, 14, 17, 19, 22). Il y a donc bien eu mélange des éléments émis par le processus process2. On peut éviter cela, en utilisant la méthode [concatMap] à la place de la méthode [flatMap]. C'est ce que montre le code suivant [Exemple21ef] :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21ef {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// processus 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().concatMap(i -> process2.getObservable()));
// souscriptions
ProcessUtils.subscribe(1, process3);
}
}
Ligne 18, on a remplacé [flatMap] par [concatMap]. Les résultats de l'exécution sont les suivants :
On voit que l'ordre d'émission du processus process3 a été : [10, 11, 12, 10, 11, 12] (lignes 12-14, 17, 19, 22). Les éléments émis par le processus process2 n'ont pas été mélangés.
Une autre variante de la méthode [map] est la méthode [switchMap] :
![]() |
Ci-dessus, de l'observable [1], naissent 3 autres observables [2] de 2 éléments qui sont ensuite aplatis comme dans [flatMap] [3]. On peut remarquer que le résultat a 5 éléments et non 6. Cela vient du fait qu'avant que le deuxième observable n'émette son élément n° 2 [6], le troisième observable émet lui son premier élément [5], ce qui fait que le deuxième observable est abandonné. On ne retrouve donc pas l'élement [6] dans l'observable résultat [3].
Pour illustrer [switchMap], nous utiliserons l'exemple [Exemple21eg] suivant :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21eg {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// processus 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().switchMap(i -> process2.getObservable()));
// souscriptions
ProcessUtils.subscribe(1, process3);
}
}
L'exécution de l'exemple donne les résultats suivants :
- process1 émet 2 éléments qui donnent naissance à 2 observables process2 de 3 éléments ;
- ligne 14 : l'observateur recoit l'élément n° 0 émis par le 1er observable process2 ligne 6 ;
- ligne 15 : l'observateur recoit l'élément n° 0 émis par le 2ième observable process2 ligne 13. L'histoire ne dit pas pourquoi il n'a pas reçu auparavant les éléments 1 et 2 émis par le 1er observable process2 aux lignes 7 et 8. Toujours est-il que le 1er observable process2 est abandonné ;
- au final, l'observateur ne voit que 4 éléments (lignes 14, 15, 17, 20) au lieu des 6 qui ont été émis ;
7.6.4. Exemples-22 : autres méthodes de la classe [Observable]
La classe [Observable] reprend de nombreuses méthodes de la classe [Stream] avec un fonctionnent analogue. En voici quelques unes. Nous nous contentons de donner le code et ses résultats.
[Exemple22a - take=limit]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22a {
public static void main(String[] args) throws InterruptedException {
// processus
Process<Integer> process = new Process<>("process", Observable.range(1, 10).take(3));
// souscriptions
ProcessUtils.subscribe(1, process);
}
}
résultats
[Exemple22b - takeLast]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22b {
public static void main(String[] args) throws InterruptedException {
// processus
Process<Integer> process = new Process<>("process", Observable.range(1, 10).takeLast(2));
// souscriptions
ProcessUtils.subscribe(1, process);
}
}
résultats
[Exemple22c - skip]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22c {
public static void main(String[] args) throws InterruptedException {
// processus
Process<Integer> process = new Process<>("process", Observable.range(1, 10).skip(5).take(2));
// souscriptions
ProcessUtils.subscribe(1, process);
}
}
résultats
[Exemple22d - reduce]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22d {
public static void main(String[] args) throws InterruptedException {
// processus
Process<Integer> process = new Process<>("process", Observable.range(1, 10).reduce(0, (i, a) -> i + a));
// souscriptions
ProcessUtils.subscribe(1, process);
}
}
- ligne 10 : calcule la somme des éléments de l'observable. Le résultat est un observable qui émet cette somme ;
résultats
[Exemple22e - all]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22e {
public static void main(String[] args) throws InterruptedException {
// processus
Process<Boolean> process = new Process<>("process", Observable.range(1, 10).all(i -> i > 10));
// souscriptions
ProcessUtils.subscribe(1, process);
}
}
- ligne 10 : rend un Observable<Boolean> qui émet l'élément true, si le prédicat de la méthode [all] est vrai pour tous les éléments, false sinon ;
résultats
[Exemple22f - count]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22f {
public static void main(String[] args) throws InterruptedException {
// processus
Process<Integer> process = new Process<>("process", Observable.range(1, 10).count());
// souscriptions
ProcessUtils.subscribe(1, process);
}
}
- ligne 10 : [Observable.count] crée un observable à 1 élément qui est la somme des éléments observés ;
résultats
[Exemple22g - distinct]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22g {
public static void main(String[] args) throws InterruptedException {
// processus
Process<Integer> process = new Process<>("process", Observable.just(1, 2, 1, 3).distinct());
// souscriptions
ProcessUtils.subscribe(1, process);
}
}
résultats
[Exemple22h - groupBy, asObservable]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.observables.GroupedObservable;
public class Exemple22h {
public static void main(String[] args) throws InterruptedException {
// processus
Observable<GroupedObservable<Boolean, Integer>> obs = Observable.range(1, 10).groupBy(i -> i % 2 == 0);
Process<Integer> process = new Process<>("process", obs.concatMap(g -> g.asObservable()));
// souscriptions
ProcessUtils.subscribe(1, process);
}
}
- ligne 11 : la méthode [groupBy] regroupe les 10 éléments émis en 2 groupes, les nombres pairs et les nombres impairs. Le résultat est un type Observable<GroupedObservable<Boolean, Integer>>, ç-à-d un observable dont les éléments sont de type GroupedObservable<Boolean, Integer> où Boolean est le type de la clé du groupe (false, true ici) et qui est aussi le type du résultat du lambda passée en paramètre à la méthode [groupBy], et Integer le type des éléments du groupe ;
- ligne 12 : le type GroupedObservable a une méthode [asObservable] qui permet de créer un observable à partir de ce type. On va donc avoir 2 types Observable<Integer>, l'un pour les nombres pairs, l'autre pour les nombres impairs. De ces deux observables, la méthode [concatMap] va en créer un seul ;
résultats
[Exemple22i - timestamp]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
import rx.schedulers.Timestamped;
public class Exemple22i {
public static void main(String[] args) throws InterruptedException {
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// processus 2
Process<Timestamped<Integer>> process2 = new Process<>("process2", process1.getObservable().timestamp());
// souscriptions
ProcessUtils.subscribe(1, process2);
}
}
- ligne 15, la méthode [timestamp] associe une heure à chaque élément de l'observable traité ;
résultats
Sur cet exemple, il est difficile de dire ce que représente l'information timestamp :
- lignes 4-5 : on voit que l'élément 1 de process1 a été émis 139 ms après l'élément 0 ;
- lignes 6 et 7 : on voit que l'élément 1 de process2 a été observé 234 ms après l'élément 0 ;
- lignes 5, 8 : on voit que l'élément 2 de process1 a été émis 33 ms après l'élément 1 ;
- lignes 7 et 10 : on voit que l'élément 2 de process2 a été observé 37 ms après l'élément 1 ;
Ces décalages sont dus au fait que les threads d'observation et d'exécution des observables ne sont pas les mêmes. Si on remplace les lignes 12-13 par les lignes suivantes (Exemple22j) :
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
null);
- lignes 2-3 : on n'impose pas le thread d'observation. On sait qu'alors dans ce cas, l'observable est observé là où il est exécuté ;
Cela donne les résultats suivants :
- lignes 4 et 6 : le processus process1 émet son élément n° 1 587 ms après son élément n° 0 ;
- lignes 5 et 7 : l'observateur observe ces 2 éléments avec un écart de 586 ms ;
- lignes 6 et 8 : le processus process1 émet son élément n° 2 396 ms après son élément n° 1 ;
- lignes 7 et 9 : l'observateur observe ces 2 éléments avec un écart de 396 ms ;
Ici, les valeurs du timestamp sont cohérentes : elles représentent bien la date d'émission de l'élément.
7.7. Les schédulers
7.7.1. Exemple-23 : le schéduler [Schedulers.computation]
Nous examinons maintenant les schédulers d'exécution. L'observation se fera sur le thread d'exécution.
Le sujet des schédulers est un peu obscur. Les différents schédulers sont présentés dans cette question sur le site de StackOverflow [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
Nous allons tenter d'illustrer l'usage de ces différents schédulers par des exemples. Le premier illustre le schéduler [Schedulers.computation] :
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple23 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.computation(), null);
}
// souscriptions
ProcessUtils.subscribe(1, processes);
}
}
- lignes 14-19 : on crée un tableau de 10 processus s'exécutant sur un thread de calcul ;
- ligne 17 : chaque processus émet un nombre réel aléatoire ;
- ligne 21 : on s'abonne à tous ces processus ;
Les résultats sont les suivants :
- lignes 2-10 : les 8 premiers processus démarrent sur 8 threads différents (la machine utilisée a 8 coeurs). on peut remarquer qu'ils démarrent tous approximativement au même moment ;
- lignes 17-19 : 3 processus se terminent et libèrent ainsi 3 threads ;
- lignes 23-24 : les deux derniers processus peuvent alors démarrer en prenant 2 des threads ainsi libérés ;
On retiendra donc que le schéduler [Schedulers.computation] fournit un pool de n threads, où n est le nombre de coeurs de la machine. Les threads sont exécutés en parallèle sur ces coeurs.
7.7.2. Exemple-24 : le schéduler [Schedulers.io]
Nous faisons exécuter le code précédent avec le schéduler [Schedulers.io] :
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple24 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.io(), null);
}
// souscriptions
ProcessUtils.subscribe(1, processes);
}
}
- ligne 18 : les processus s'exécutent avec les threads du schéduler [Schedulers.io] ;
Cela donne les résultats suivants :
- lignes 2-10 : les 10 processus démarrent chacun sur un thread différent. Contrairement au cas précédent, tous les processus ont pu être lancés. On remarque que ces lancements se font sur une durée de 6 ms alors que précédemment cela avait été 1 ms ;
- lignes 13-18 : les observables émettent les uns après les autres et non en quasi parallèle comme cela avait été le cas précédemment ;
Quelle est la diférence entre les schédulers [Schedulers.io] et [Schedulers.computation] ? Une réponse peut être trouvée à l'URL [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases] :
![]() |
7.7.3. Exemple-25 : le schéduler [Schedulers.newThread]
Nous faisons exécuter le code précédent avec le schéduler [Schedulers.newThread] :
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple25 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.newThread(), null);
}
// souscriptions
ProcessUtils.subscribe(1, processes);
}
}
Les résultats obtenus sont les mêmes qu'avec le schéduler [Schedulers.io] :
A l'URL [http://stackoverflow.com/questions/33415881/retrofit-with-rxjava-schedulers-newthread-vs-schedulers-io], on explique que le schéduler [Schedulers.io] fournit un pool de threads, ce que ne fait pas le schéduler [Schedulers.newThread]. Un pool de threads va créer d'office un nombre n de threads. Il va les allouer aux processus qui en ont besoin. Lorsque ceux-ci sont terminés, leurs threads ne sont pas supprimés mais reviennent dans le pool et peuvent être réutilisés alors par un autre processus. C'est plus économique que de créer / supprimer sans arrêt des threads. Donc on peut penser qu'il est préférable d'utiliser le schéduler [Schedulers.io].
7.7.4. Exemple-26 : les schédulers [Schedulers.immediate, Schedulers.trampoline]
Revenons à l'explication donnée pour ces deux schédulers :
![]() |
L'explication est assez simple à comprendre mais quand on veut l'illustrer on s'aperçoit qu'on ne l'a pas comprise. C'est le livre [Learning Reactive Programming With Java 8] qui m'a permis de créer un exemple qui reprend un exemple trouvé dans ce livre mais le simplifie. C'est le suivant :
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
public class Exemple26 {
public static void main(String[] args) throws InterruptedException {
// un schéduler
Scheduler scheduler = Schedulers.immediate();
// un worker de ce schéduler
Worker worker = scheduler.createWorker();
// un type Action0 à excéuter sur le worker
Action0 action02 = new Action0() {
@Override
public void call() {
// log action02
ProcessUtils.showInfos.accept("action02");
}
};
// un type Action0 à exécuter sur le worker
Action0 action01 = new Action0() {
@Override
public void call() {
// on programme une nouvelle action sur le même worker
worker.schedule(action02);
// log action01
ProcessUtils.showInfos.accept("action01");
}
};
// action01 est programmée sur le worker
worker.schedule(action01);
}
// affichages
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- ligne 17 : un schéduler. Ce sera soit [Schedulers.immediate] comme ici soit [Schedulers.trampoline] ultérieurement ;
- ligne 19 : on peut faire exécuter des actions de type Action0 (lignes 21, 20) sur des workers du schéduler. La méthode [Scheduler.createWorker] permet de créer un worker. La méthode [Worker.schedule(Action0)] permet de faire exécuter un type Action0 par un worker ;
- lignes 21-27 : une première action appelée [action02] qui sera exécutée (ligne 40) par le worker de la ligne 19 ;
- lignes 30-38 : une seconde action appelée [action01]. Elle a la particularité de faire exécuter l'action action02 sur le même worker qu'elle (ligne 34). C'est là que se situe la différence entre [Schedulers.immediate] et [Schedulers.trampoline] :
- si le schéduler est [Schedulers.immediate] alors ligne 34, l'action action02 va être exécutée immédiatement (d'où le nom du schéduler) et l'action action01 en cours va être interrompue. On verra alors apparaître le message de la ligne 25. L'action action02 terminée, l'action action01 va reprendre et on va voir le message de la ligne 36 ;
- si le schéduler est [Schedulers.trampoline] alors ligne 34, l'action action02 est mise en attente. Elle ne sera exécutée que lorsque la tâche en cours action01 sera terminée. On verra alors apparaître le message de la ligne 36. L'action action01 terminée, l'action action02 va s'exécuter et on va voir le message de la ligne 25 ;
L'exécution du code ci-dessus, donne les résultats suivants :
Si ligne 17, on utilise le schéduler [Schedulers.trampoline], on obtient les résultats inverses :
Ceci dit, il est difficile de faire un lien avec les observables. Je n'ai pas trouvé d'exemple convaincant qui aurait pu montrer l'intérêt d'exécuter un observable sur l'un de ces deux threads. En voici néanmoins un mais que je ne trouve pas du tout naturel :
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
public class Exemple27 {
public static void main(String[] args) throws InterruptedException {
// Worker
Worker worker = Schedulers.immediate().createWorker();
// Worker worker = Schedulers.trampoline().createWorker();
// observable 1 sur worker
worker.schedule(() -> Observable.range(1, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
// observable 2 sur même worker
worker.schedule(() -> Observable.range(100, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
}
}));
}
}));
}
}
- lignes 13-14 : on crée un worker à partir d'un des deux schédulers [Schedulers.immediate] et [Schedulers.trampoline] ;
- ligne 16 : un 1er observable obs1 est programmé sur ce worker pour émettre les nombres [1,2]
- ligne 22 : à chaque fois qu'un élément de cet observable obs1 est observé, l'observation d'un second observable obs2 est lancée sur le même worker pour émettre les nombres [100,101] ;
Avec le schéduler [Schedulers.immediate], on obtient les résultats suivants :
Alors qu'avec le schéduler [Schedulers.trampoline], on obtient les résultats suivants :
7.8. Conclusion
Il reste beaucoup à faire. Pour approfondir la bibliothèque RxJava, le lecteur est invité à continuer sa formation avec les références données au début de ce document. Malgré tout, nous avons les bases pour utiliser RxJava dans les environnements Swing et Andoid. C'est ce que nous allons montrer maintenant.








































