2. An introductory example
My first exposure to RxJava came through courses and tutorials I found online. Aside from the fact that the theory used concepts I wasn’t familiar with and had trouble understanding, I couldn’t really see how it could be useful in real life. We will therefore begin by presenting an example (a simple one, I hope) where the use of RxJava leads to a real simplification of the code, and from there, we will try to identify the important elements of this library.
The RxJava library is based on the following concept: a stream of elements of type T, represented by the type Observable<T>, is observed by one or more subscribers (also called observers or consumers), represented by the type Subscriber<T>. The library allows the Observable<T> stream to run in a thread T1 and its Subscriber<T> observer in a thread T2 without the developer having to manage the lifecycle of these threads or deal with inherently difficult issues such as sharing data between threads and synchronizing them to carry out a global task. It therefore facilitates asynchronous programming.
An Observable<T> stream produces elements of type T, which are observable as they are produced. If the observer and the observable (a term used loosely to refer to the Observable<T> type) are in the same thread, then the observable can only produce element (i+1) once the observer has consumed element i. There are few cases where this architecture is useful. If the observer and the observable are not in the same thread, then the observable and its observer behave autonomously: the observable emits at its own pace and the observer consumes at its own pace. This is where the library’s value lies. So far, we have only discussed a single observer. In reality, an observable can have any number of observers.
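The difference between the two situations can be illustrated without RxJava. The sketch below uses only the JDK; the class name [PacingSketch] and its methods are hypothetical names chosen for this illustration. In the first method, producer and observer share a thread, so each emission is followed immediately by its consumption. In the second, the observer has its own thread and the producer merely queues its values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

// Plain-JDK illustration (no RxJava): a producer pushes values to an observer callback.
final class PacingSketch {

    // Producer and observer share the caller's thread: emission and consumption strictly alternate.
    static List<String> sameThread(int count) {
        List<String> log = new ArrayList<>();
        IntConsumer observer = i -> log.add("consume " + i);
        for (int i = 0; i < count; i++) {
            log.add("emit " + i);
            observer.accept(i); // the producer is blocked until the observer returns
        }
        return log;
    }

    // The observer runs on its own thread: the producer only queues values and moves on.
    static List<String> separateThreads(int count) {
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService observerThread = Executors.newSingleThreadExecutor();
        for (int i = 0; i < count; i++) {
            final int value = i;
            observerThread.execute(() -> consumed.add("consume " + value));
        }
        observerThread.shutdown();
        try {
            observerThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(sameThread(2));      // [emit 0, consume 0, emit 1, consume 1]
        System.out.println(separateThreads(3)); // three "consume" entries
    }
}
```

This is only an analogy: RxJava takes care of the hand-over between threads itself, which is precisely what the rest of this section shows.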
2.1. The architecture of the example application
The example application has the following architecture:

- in [1], a service layer generates lists of random numbers. This layer runs in the same thread as the [swing] method that uses it, and therefore generates its numbers synchronously;
- in [2], a thin adaptation layer implemented with RxJava allows an asynchronous implementation of the same service to be presented to the [swing] layer: this service can run in a different thread from the [swing] method that uses it;
- Call [4] is synchronous, whereas calls [5-6] are asynchronous;
What we want to demonstrate here is that the Rx library makes it easy to transform a synchronous interface into an asynchronous one. Why is this useful? Events in a Swing interface are processed in a thread commonly referred to as the event loop. Events are queued and processed one after another. Event Ei+1 can only be processed once the previous event Ei has been fully processed. It is therefore important that event handling be as brief as possible so that the GUI remains responsive. Sometimes, handling an event can take a long time. This is the case if the handling involves network access. If we do not want to freeze the GUI in a way that is unacceptable to the user, these network operations must be performed in threads separate from the event loop to free it up. This brings us into the realm of concurrent programming (where multiple threads run in parallel), which is rightly considered difficult. The Rx library provides a simple and elegant solution to this problem.
To simulate long-running processes, the service in the example delivers its random numbers after a certain delay so that we can observe the behavior of the graphical user interface.
2.2. The executable
The executable for the example application is located in the [dvp/executables] folder of the examples:
![]() | ![]() |
There are various ways to run the [swing-01] archive depending on the configuration of the machine used to run it. For example, you can follow the process [1-3]. This will display the following graphical user interface:
![]() |
- The interface has two tabs [1-2]: one [Request] for sending a request to the random number generator service, and the other [Response] for displaying the received numbers;
- In [3], you specify how many requests you want to make to the service;
- In [4], you specify the desired number generation range [a,b];
- In [5], the number of values returned by the service will be a random number within the interval [minCount, maxCount] set by the user;
- in [6], before returning its response, the service will wait delay milliseconds, where delay is a random number within the user-defined interval [minDelay, maxDelay];
- By default, the [swing] layer will use the service’s synchronous interface. To use the asynchronous layer, the user will check [7]. In this case, the generation service will run in threads separate from the GUI’s event loop. The Rx library offers various strategies for generating these threads. The user can select their strategy in [8];
- number generation is performed using button [9];
![]() |
- [10] displays the results. We will explain their structure;
- in [11], the number of results obtained;
- in [12], the execution time in milliseconds;
- in [13], the user has the option to cancel the execution;
Each result has the following format:
{"idClient":0,"serviceResponse":{"delay":412,"aleas":[146,115,128,174,159,112,162,127],"executedOn":"RxComputationThreadPool-6"},"observedOn":"AWT-EventQueue-0","requestAt":"02:42:47:708","responseAt":"02:42:52:931"}
- [idClient]: the request ID. Note that multiple requests are sent to the generation service;
- [delay]: the wait time in milliseconds that the service observed before sending its result;
- [aleas]: the random numbers returned by the service;
- [executedOn]: the name of the thread in which the service ran;
- [observedOn]: the name of the thread that displayed the result. With a Swing interface, this can only be the event loop thread, here [AWT-EventQueue-0];
- [requestAt]: the time of the request in the format [hours:minutes:seconds:milliseconds];
- [responseAt]: the time the results were received in the same format;
We will now present the code snippets necessary for understanding the example.
2.3. The synchronous interface

The service layer [1] presents the following interface:
public interface IService {
// random numbers in [a,b]
// n numbers are generated, where n is a random number in the interval [minCount, maxCount]
// the numbers are generated after a delay of delay milliseconds,
// where [delay] is a random number in the interval [minDelay, maxDelay]
public ServiceResponse getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay);
}
The [ServiceResponse] is as follows:
public class ServiceResponse {
// service wait time
private int delay;
// random numbers
private List<Integer> aleas;
// execution thread
private String executedOn;
// constructors
public ServiceResponse(int delay, List<Integer> aleas) {
executedOn = Thread.currentThread().getName();
this.delay = delay;
this.aleas = aleas;
}
// getters and setters
...
}
The response has three parts:
- [aleas]: the random numbers generated;
- [delay]: the wait time observed by the service before returning its result;
- [executedOn]: the name of the service’s execution thread;
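The implementation of the service is not shown here. As an illustration, a minimal sketch of what such a synchronous service could look like is given below. Only the signature of [getAleas] comes from the text; the class name [ServiceSketch], the reduced [Response] class and the helper [between] are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical implementation sketch; only the getAleas signature comes from the text.
final class ServiceSketch {

    // reduced stand-in for the ServiceResponse class
    static final class Response {
        final int delay;
        final List<Integer> aleas;
        final String executedOn = Thread.currentThread().getName();

        Response(int delay, List<Integer> aleas) {
            this.delay = delay;
            this.aleas = aleas;
        }
    }

    private final Random random = new Random();

    // random integer in [min, max], both bounds included (assumes min <= max)
    private int between(int min, int max) {
        return min + random.nextInt(max - min + 1);
    }

    Response getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay) {
        // simulate a long-running operation by blocking the calling thread
        int delay = between(minDelay, maxDelay);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // n random numbers in [a,b], where n is itself random in [minCount, maxCount]
        int n = between(minCount, maxCount);
        List<Integer> aleas = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            aleas.add(between(a, b));
        }
        return new Response(delay, aleas);
    }

    public static void main(String[] args) {
        Response response = new ServiceSketch().getAleas(100, 200, 3, 5, 10, 50);
        System.out.println(response.delay + " ms, " + response.aleas + ", on " + response.executedOn);
    }
}
```

The important point is that the call blocks its caller for the whole delay, which is what makes the synchronous version problematic in a GUI.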
2.4. The synchronous call

We will now detail the synchronous call [4] made by the [swing] layer to the [1] service:
private void doGenerateWithService() {
// start waiting
beginWaiting();
try {
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
uiResponse.setResponseAt();
model.add(0, jsonMapper.writeValueAsString(uiResponse));
jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
}
} catch (JsonProcessingException | RuntimeException e) {
System.out.println(e);
}
// end waiting
endWaiting();
}
- the [for] loop processes the [nbRequests] requests made by the user;
- [service] is the implementation of the synchronous [IService] interface presented in Section 2.3;
- [model] is the model displayed by the JList component of the [Response] tab. The elements of this model are JSON strings of elements of type [UiResponse] as follows:
public class UiResponse {
// client ID
private int idClient;
// service response
private ServiceResponse serviceResponse;
// name of the observation thread
private String observedOn;
// request time
private String requestAt;
// response time
private String responseAt;
// constructors
public UiResponse() {
observedOn = Thread.currentThread().getName();
requestAt = getTimeStamp();
}
// private methods
private String getTimeStamp() {
return new SimpleDateFormat("hh:mm:ss:SSS").format(Calendar.getInstance().getTime());
}
// getters and setters
...
}
- [serviceResponse]: the response from the number generation service;
- [idClient]: the number of the request being responded to;
- [observedOn]: the thread displaying this response. As mentioned, this will always be the event loop thread;
- [requestAt] and [responseAt]: the time of the request and the time of the response;
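A remark on the timestamp format: in [SimpleDateFormat], the pattern letters [hh] denote the hour on a 12-hour clock, whereas [HH] denotes the hour on a 24-hour clock. The sketch below shows the difference on a fixed time of day; the class name [TimeStampSketch] is hypothetical, and whether a 24-hour display was intended in the application is not stated in the text.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Locale;

// Illustration of the 12-hour (hh) and 24-hour (HH) pattern letters of SimpleDateFormat.
final class TimeStampSketch {

    // formats the fixed time hourOfDay:05:07.000 with the given pattern
    static String format(String pattern, int hourOfDay) {
        Calendar calendar = new GregorianCalendar(2020, Calendar.JANUARY, 1, hourOfDay, 5, 7);
        return new SimpleDateFormat(pattern, Locale.ROOT).format(calendar.getTime());
    }

    public static void main(String[] args) {
        System.out.println(format("hh:mm:ss:SSS", 14)); // 02:05:07:000
        System.out.println(format("HH:mm:ss:SSS", 14)); // 14:05:07:000
    }
}
```

With [hh], a request made at 14:05 and one made at 02:05 produce the same string, which matters only if the timestamps are compared across the two halves of the day.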
2.5. Testing synchronous calls
We run the following configuration:
![]() |
We obtain the following results in the [Response] tab:
![]() |
- In [1-2], we indeed received 10 responses as requested. They were inserted in the first position in the order they arrived. We can see that they were received in the order of the requests;
- They were all executed and displayed in the event loop thread [AWT-EventQueue-0]. The requests were therefore executed one after the other in this thread. There were no concurrent requests;
- what is not visible here is that during execution, the GUI is frozen. For example, there is no way to access the [Response] tab to view incoming responses or to stop execution using the [Cancel] button. Even if this button had been present on the [Request] tab, it would have been unusable. In fact, there would then be two events:
- clicking the [Generate] button;
- clicking the [Cancel] button;
The click on the [Cancel] button is only handled after the operation triggered by the click on the [Generate] button has finished. We have just seen that this operation occupied the event loop thread for the entire duration of execution, thereby preventing the handling of the click on the [Cancel] button. This is typically the kind of situation where Rx can provide a significant improvement;
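The queuing behavior described above can be reproduced without Swing. In the sketch below, a single-threaded executor stands in for the event loop; the class name [EventLoopSketch] and the log messages are assumptions made for this illustration, not code from the application.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Swing event loop: one thread, one FIFO queue of events.
final class EventLoopSketch {

    // Queues a slow [Generate] handler, then a [Cancel] click, and returns the order of handling.
    static List<String> run(long generateMillis) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        // event 1: the [Generate] click, whose handler occupies the loop
        eventLoop.execute(() -> {
            try {
                Thread.sleep(generateMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add("generate finished");
        });
        // event 2: the [Cancel] click, queued right away but handled only afterwards
        eventLoop.execute(() -> log.add("cancel handled"));
        eventLoop.shutdown();
        try {
            eventLoop.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(200)); // [generate finished, cancel handled]
    }
}
```

However early the second event is queued, it is handled only once the first handler has returned. Moving the long-running work off this thread is the only way to keep the queue moving.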
2.6. The asynchronous interface and its implementation
We will now look at the interface of layer [2] and its implementation with Rx. This will not be immediately clear. We simply want to highlight the simplicity of the code in this implementation.

The asynchronous interface is as follows:
public interface IRxService {
// random numbers in [a,b]
// n numbers are generated, where n is a random number in the interval [minCount, maxCount]
// the numbers are generated after a delay of delay milliseconds,
// where [delay] is a random number in the interval [minDelay, maxDelay]
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse);
}
The differences from the synchronous interface presented in Section 2.3 are as follows:
- the [UiResponse] class presented in Section 2.4 is now one of the parameters of the [getAleas] method. The reason is that, because requests now run in parallel and the service waits a random amount of time before returning its result, the responses will not come back in the order of the requests. We therefore pass the [UiResponse] object, which contains, among other information, the request ID:
// client ID (request)
private int idClient;
// service response
private ServiceResponse serviceResponse;
// name of the observation thread
private String observedOn;
// request time
private String requestAt;
// response time
private String responseAt;
- The asynchronous service returns the type [Observable<UiResponse>]. The [Observable<>] type is provided by the Rx library. This result type indicates that the [getAleas] method provides a stream of values of type [UiResponse], which are pushed one by one to their observer;
Now let’s look at the implementation of this interface:
public class RxService implements IRxService {
// service
private IService service;
// constructor
public RxService(IService service) {
this.service = service;
}
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
}
- the constructor receives a reference to the synchronous interface [IService]. This interface will handle the generation of random numbers;
- the observable returned by the [getAleas] method is constructed by the static method [Observable.create]. This method allows us to build an asynchronous implementation from a synchronous one;
- the parameter of the static method [Observable.create] is here a lambda function that takes a [Subscriber] type as a parameter, again an Rx type. A [Subscriber] is an object that subscribes to an observable, i.e., a stream of data that may be delivered asynchronously. Here, we use three methods of this subscriber:
- [Subscriber.onNext] to pass data to it;
- [Subscriber.onError] to send it an exception;
- [Subscriber.onCompleted] to indicate to the subscriber that the data stream has ended;
There can be multiple subscribers to the same observable. Here, we will have only one subscriber subscribing to a stream of a single piece of data. The data is produced by the service’s synchronous implementation (the call to [service.getAleas]) and passed to the subscriber (the call to [subscriber.onNext]).
Even if all of this remains somewhat obscure, one cannot help but be struck by the extreme conciseness of this asynchronous implementation of the service.
2.7. The asynchronous call

We will now examine the asynchronous call [5] made by the [swing] layer to the [2] service:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// request random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
// scheduler
int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
switch (schedulerIndex) {
case 0:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
break;
...
}
}
...
}
- the [for] loop executes the [nbRequests] requests made by the user;
- the first two statements of the loop prepare the [UiResponse] object required by the [getAleas] method of the asynchronous service. This mainly involves recording the [idClient] of the request;
- in [case 0], the [getAleas] method of the asynchronous service is called. It returns an [Observable<UiResponse>] object. This call does not yet invoke the synchronous service. Let’s return to the code for the asynchronous [getAleas]:
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
The body of the lambda passed to [Observable.create], which calls the synchronous service, is executed only when a subscriber subscribes. As long as there are no subscribers, this code is not executed.
Let’s return to the code for the [doGenerateWithRxService] method:
- before the loop, we create an empty observable (nothing is observed);
- in the loop, we build an observable whose stream will be the merge of the [nbRequests] asynchronous streams associated with the [nbRequests] requests. This is achieved using the [Observable.mergeWith] method, which merges two asynchronous streams. In Rx terminology, [mergeWith] is called a stream operator. The result of such an operator is, in most cases, another [Observable]. Ultimately, after the loop, the variable [observables] refers to a single stream consisting of the [nbRequests] asynchronous responses produced by the asynchronous service;
- the merge operation could have been written as:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse));
but we wrote:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
Here, we used the [subscribeOn] operator on the observable [rxService.getAleas]. As is often the case, the result is again an observable. The [subscribeOn] operator specifies that the observable must be executed in a thread provided by a [Scheduler]. There are several possible [Schedulers] suited to different situations. In the GUI, we’ve provided several options to see how they differ:
![]() |
This results in the following code:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// request random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
// scheduler
int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
switch (schedulerIndex) {
case 0:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
break;
case 1:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.computation()));
break;
case 2:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.newThread()));
break;
case 3:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.trampoline()));
break;
case 4:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.immediate()));
break;
}
}
...
}
Let’s revisit the [case 0] branch. The scheduler [Schedulers.io()] runs each observable on a thread taken from a pool that grows as needed, so in practice each observable here gets its own thread. If we follow the code:
- before the loop: we have an empty observable;
- iteration 1: [observables] is the merge [observable0/thread0] (observable observable0 running on thread thread0);
- iteration 2: [observables] is the merge [observable0/thread0, observable1/thread1];
- etc.
Ultimately, after the loop, we have an observable resulting from the merge of [nbRequests] observables running on [nbRequests] different threads. Not all schedulers work this way, as we will see during testing.
Let’s continue examining the code for calling the asynchronous service:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// request random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
...
}
// observer
observables = observables.observeOn(SwingScheduler.getInstance());
// execute these observables
subscriptions.add(observables.subscribe(uiResponse -> {
updateUi(uiResponse);
} , th -> {
System.out.println(th);
doCancel();
} , this::doCancel));
}
- We have seen that after the loop, we have a single observable, a merge of [nbRequests] observables that may or may not run on [nbRequests] different threads, depending on the scheduler chosen by the user;
- The [observeOn] operator allows us to specify on which thread we want to receive the data from the observable, in this case the [nbRequests] objects of type [UiResponse]. In a Swing interface, we have no choice: any update to the interface must be performed in the event loop thread. Here, the observable’s data will be displayed in a Swing JList component. The [SwingScheduler.getInstance()] scheduler represents the event loop thread. The [SwingScheduler] class does not come from the RxJava library but from the RxSwing library;
- after the call to [observeOn], the synchronous service has still not been called because the observable does not yet have a subscriber. The call to [subscribe] provides one. The parameters of this method are three lambda functions:
- the first [uiResponse -> {updateUi(uiResponse);}] takes as a parameter one of the [UiResponse] objects produced by the observable. Recall that here, we will have [nbRequests] objects of this type. The associated method, updateUi in this case, must process this result;
- the second [th -> {System.out.println(th);doCancel();}] takes a [Throwable] type as a parameter, in this case an exception that occurred during the observable’s execution. The associated method must process this information. Here, we display it on the console and cancel the execution, which will update certain elements of the GUI;
- the third [this::doCancel] is called when the observable signals that it has no more data to transmit. Here, the observable is the union of [nbRequests] observables. The resulting observable will indicate that it has finished when all the observables composing it have themselves signaled that they have finished their work. So when this third lambda function is executed, we have received all the data. The local method [doCancel] updates the GUI to reflect that the execution is complete;
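To make the roles of the threads more concrete, here is a plain-JDK analogy of what the chain [subscribeOn / mergeWith / observeOn / subscribe] achieves. It is not RxJava code and does not claim to reflect how RxJava is implemented: the class [MergeSketch], the executor names and the log messages are all assumptions made for this illustration. A pool of worker threads plays the role of the scheduler given to [subscribeOn], a single thread named "ui" plays the role of the event loop given to [observeOn], and a counter detects the moment when every request has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogy (no RxJava): workers stand for subscribeOn, the "ui" thread for observeOn.
final class MergeSketch {

    // assumes nbRequests >= 1
    static List<String> run(int nbRequests) {
        List<String> log = new ArrayList<>(); // only modified by the "ui" thread
        ExecutorService workers = Executors.newCachedThreadPool();
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        AtomicInteger remaining = new AtomicInteger(nbRequests);
        CountDownLatch done = new CountDownLatch(1);
        for (int i = 0; i < nbRequests; i++) {
            final int idClient = i;
            workers.execute(() -> {
                // counterpart of onNext: the result is handed over to the ui thread
                ui.execute(() -> log.add("next " + idClient + " on " + Thread.currentThread().getName()));
                // counterpart of onCompleted for the merged stream: only after the last request
                if (remaining.decrementAndGet() == 0) {
                    ui.execute(() -> {
                        log.add("completed");
                        done.countDown();
                    });
                }
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        workers.shutdown();
        ui.shutdown();
        return log;
    }

    public static void main(String[] args) {
        run(5).forEach(System.out::println);
    }
}
```

The responses may be logged in any order, but always on the "ui" thread, and the completion entry always comes last. The Rx version expresses the same thing in a handful of operator calls, with no explicit counter or latch.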
The [subscriptions] variable is defined as follows:
// subscriptions to observables
protected List<Subscription> subscriptions = new ArrayList<Subscription>();
The [Subscription] type represents a subscription, i.e., the link between a subscriber [Subscriber] and what they are observing [Observable]. We have used a list of subscriptions here, although in this example there is only one. The local method [doCancel], executed when the observable signals that it has no more data to transmit, is as follows:
@Override
protected void doCancel() {
// End waiting
endWaiting();
// if there are subscriptions
if (jCheckBoxRxSwing.isSelected() && subscriptions != null) {
subscriptions.forEach(Subscription::unsubscribe);
}
}
- The call to [subscriptions.forEach(Subscription::unsubscribe)] unsubscribes all subscribers from the observable;
From this brief explanation, we can take away the following key points:
- the type [Observable] denotes a stream of values, which are pushed one by one to subscribers or observers;
- the [Subscriber] type denotes a subscriber of the [Observable] type;
- the [Subscription] type denotes a subscription, i.e., the link between a [Subscriber] and an [Observable];
- the [Observable] type supports operators [mergeWith, empty, subscribeOn, observeOn, ...], most of which produce observables. These operators are used to configure the observable before it runs:
- what to observe;
- the thread on which the observable runs;
- the thread on which the subscriber receives data from the observable;
- There are two types of observables: [cold] and [hot]. A cold observable is fully executed for each new subscriber. If each execution produces the same data, each new subscriber receives the same data as the previous one. A hot observable generally produces data continuously. When a subscriber subscribes, they receive the data emitted from the time of their subscription. They do not receive data that may have been emitted previously. In our example, the observable is cold: it is fully re-executed for each new subscriber. What is actually executed in our example? To find out, we need to go back to the definition of the observed observable:
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
For each new subscriber, the lambda function passed to the [Observable.create] method is re-executed. The call to the synchronous service is therefore made again for each new subscriber [subscriber];
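The two properties just described (nothing runs before a subscription, and everything runs again for each subscriber) can be observed on a toy model. The class below is deliberately not the RxJava API: [ToyObservable] and [ToySubscriber] are hypothetical types that only mimic the shape of [Observable.create] and [subscribe], using nothing but the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model, NOT the RxJava API: it only mimics the shape of Observable.create and subscribe.
final class ToyObservable<T> {

    // reduced counterpart of the three Subscriber callbacks used in the text
    interface ToySubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    private final Consumer<ToySubscriber<T>> producer;

    private ToyObservable(Consumer<ToySubscriber<T>> producer) {
        this.producer = producer;
    }

    // creation only stores the producer code; nothing is executed yet
    static <T> ToyObservable<T> create(Consumer<ToySubscriber<T>> producer) {
        return new ToyObservable<>(producer);
    }

    // the producer code runs once per subscription
    void subscribe(ToySubscriber<T> subscriber) {
        producer.accept(subscriber);
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToyObservable<Integer> observable = ToyObservable.<Integer>create(subscriber -> {
            log.add("producer runs");
            subscriber.onNext(42); // stands for the result of the synchronous service
            subscriber.onCompleted();
        });
        log.add("created");
        ToySubscriber<Integer> observer = new ToySubscriber<Integer>() {
            @Override public void onNext(Integer value) { log.add("next " + value); }
            @Override public void onError(Throwable error) { log.add("error " + error); }
            @Override public void onCompleted() { log.add("completed"); }
        };
        observable.subscribe(observer); // first subscriber
        observable.subscribe(observer); // second subscriber: the producer runs again
        return log;
    }

    public static void main(String[] args) {
        // [created, producer runs, next 42, completed, producer runs, next 42, completed]
        System.out.println(demo());
    }
}
```

The entry "created" appears before any "producer runs" entry, and the producer runs twice because there are two subscriptions. This is the behavior of a cold observable.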
2.8. Testing asynchronous calls
We start by demonstrating the effect of the different schedulers available. To do this, we use the following parameters:
![]() |
We set [1-2] to small values so that even if the requests are executed on the same thread, we still don’t have to wait too long.
2.8.1. with the [Schedulers.io] scheduler
![]() |
The following points can be noted:
- the responses are received in an order that does not match the order of the requests (see idClient);
- each request ran in a different thread;
- the GUI is no longer frozen this time:
- you can switch between tabs;
- we see the data coming in;
- there is no time to see the [Cancel] button because execution is too fast. We will highlight this in another test;
2.8.2. with the [Schedulers.computation] scheduler
![]() |
The following points can be noted:
- the responses are received in an order that does not match the order of the requests (see idClient);
- the requests were executed in 8 threads;
- thread #3 was used for requests 8 and 0;
- thread #4 was used for requests 9 and 1;
- the other requests each had a different thread;
The scheduler [Schedulers.computation] uses as many threads as there are cores on the machine being used. This information is obtained via the expression [Runtime.getRuntime().availableProcessors()].
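The reuse of threads seen above is what one would also observe with a fixed-size thread pool from the JDK. The sketch below is an analogy only: the class [FixedPoolSketch] is hypothetical, and RxJava’s own scheduler may distribute work among its threads differently from a JDK pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy (no RxJava): a fixed pool sized to the number of cores.
final class FixedPoolSketch {

    // runs nbRequests short tasks and returns the names of the threads that executed them
    static Set<String> threadsUsed(int nbRequests) {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < nbRequests; i++) {
            pool.execute(() -> {
                names.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(20); // stands for the delay of the service
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        // never more distinct names than cores, however many requests are submitted
        System.out.println(threadsUsed(32));
    }
}
```

As soon as there are more requests than threads, some threads necessarily serve several requests one after another.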
2.8.3. with the [Schedulers.newThread] scheduler
![]() |
The behavior is similar to that of the [Schedulers.io] scheduler.
2.8.4. with the schedulers [Schedulers.trampoline, Schedulers.immediate]
![]() |
The behavior is synchronous. All requests are executed on the event loop thread. This result should not be generalized; rather, it simply means that in this specific example, both schedulers operated synchronously.
2.9. Edge cases
In this example, we will work with schedulers that support asynchronous operation. First, we increase the number of requests to 100 using the [Schedulers.computation] scheduler, which runs on 8 threads here. We obtain the following result:
![]() |
- in [1], the [Cancel] button is present and usable (asynchronous operation);
Now, let’s let the execution run to completion:
![]() |
We see in [2] that executing the 100 requests took about 4 seconds (across 8 threads).
Now, let’s run these same 100 requests using the [Schedulers.newThread] scheduler, which executes each request on a separate thread:
![]() |
In [1], we see that executing the 100 requests (across 100 threads) took half a second. This is therefore significantly faster than with the [Schedulers.computation] scheduler.
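A rough model helps explain this gap. Each request spends most of its time waiting, so a thread can serve only one request at a time. With a limited number of threads, the requests are therefore processed in successive rounds. The sketch below computes the number of rounds; the class name [RoundsEstimate] is hypothetical and the model is a back-of-the-envelope estimate, not a measurement.

```java
// Rough back-of-the-envelope model, not a measurement.
final class RoundsEstimate {

    // number of successive rounds needed when nbRequests blocking tasks share nbThreads threads
    static int rounds(int nbRequests, int nbThreads) {
        return (nbRequests + nbThreads - 1) / nbThreads; // integer ceiling of nbRequests / nbThreads
    }

    public static void main(String[] args) {
        System.out.println(rounds(100, 8));   // 13
        System.out.println(rounds(100, 100)); // 1
    }
}
```

With 8 threads, the 100 requests need 13 rounds of waiting, whereas with 100 threads all the waits overlap in a single round. The elapsed time is then dominated by the longest delay, which is consistent in order of magnitude with the difference observed.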
Now, let’s make 800 requests under the same conditions, still using the [Schedulers.newThread] scheduler. We get the following results:
![]() |
The 800 requests are executed in about 1 second.
When we increase this number further (beyond 2,500 requests on my machine, which were executed in 1.5 seconds; this threshold is, of course, highly dependent on the runtime environment), we eventually get the following exception:
![]() |
We therefore have a stack overflow. Tests show that the behavior of the [Schedulers.newThread] scheduler is not deterministic. You may encounter the previous exception, run new tests, then return to the configuration that caused the exception and no longer encounter it.
2.10. Conclusion
We have demonstrated an example of using the Rx library. Let’s summarize what we’ve learned:
We started with the following architecture:

- in [4], the [swing] layer made synchronous calls to the [service] layer;
- in [5], the [swing] layer made asynchronous calls to the [rxService] layer, which in turn made a synchronous call [6] to the [service] layer;
The first thing we saw was that the Rx library made it easy to create the asynchronous [rxService] interface from the synchronous [service] interface (see Section 2.6). This is an important lesson because it means that we can easily evolve a synchronous application into an asynchronous one.
In the [swing] layer, two separate methods were written:
- one to make synchronous calls to the service (see section 2.4);
- the other to make asynchronous calls to it (see section 2.7);
Writing asynchronous calls has proven to be significantly more complex than writing synchronous calls. Nevertheless, those who have worked with concurrent programming involving multiple threads that need to be synchronized will find that the Rx solution is simpler to write and avoids all the difficult problems of synchronization and inter-thread communication. In this article, we have highlighted the following key points:
- the [Observable] type denotes a stream of events (values) that may (but need not) be asynchronous and that can be observed;
- the [Subscriber] type denotes a subscriber to an [Observable] type;
- the [Subscription] type denotes a subscription, i.e., the link between a [Subscriber] and an [Observable];
- the [Observable] type supports operators [mergeWith, empty, subscribeOn, observeOn, ...] that mostly produce observables. These operators are used to configure the observable before it runs:
- what to observe;
- the thread on which the observable runs;
- the thread on which the subscriber receives data from the observable;
- there are two types of observables: [cold] and [hot]. A cold observable is fully executed for each new subscriber. If each execution produces the same data, each new subscriber receives the same data as the previous one. A hot observable generally produces data continuously. When a subscriber subscribes, they receive the data emitted from the time of their subscription. They do not receive any data that may have been emitted previously. In our example, the observable is cold: it is fully re-executed for each new subscriber.
Now that we’ve seen an example demonstrating the value of the Rx library, we’ll explore it in more detail.
The Rx library has many methods with generic parameters in their signatures. We’ll briefly review these signatures (section 3). The parameters of these methods are mostly functional interfaces (Java 8), i.e., interfaces with only a single method. The actual parameters must therefore be instances of these interfaces. Before Java 8, it was common practice to implement an interface using an anonymous class. With Java 8, if the interface is a functional interface, it is more concise to implement it using a lambda function. We will therefore introduce these (Section 4). Once this has been done, we will introduce the [Stream] class (Section 5), which allows you to process Java collections using lambda functions. This class is interesting because RxJava’s [Observable] class borrows:
- certain methods;
- the same way of chaining methods together to process the same observable;
We will then introduce the functional interfaces specific to the RxJava library (section 6). We will continue with the main elements of the Rx library [Observable, Subscriber, Subscription, operators] (section 7). The [Observable] class has dozens of operators that are themselves overloaded multiple times. This initially creates significant complexity because these operators and their overloads sometimes differ by only a single detail, and it is difficult, without experience, to know which operator to use. We will present only a limited number of operators, and most of the time we will ignore their overloads.
The entire previous section will be covered using the RxJava library in simple console applications. Once we have mastered the RxJava library, we will use it in two types of graphical applications:
- in Section 8, we will revisit the example Swing application to explore it in greater detail. We will then use the RxSwing library;
- in Section 9, we will create an Android application using the RxAndroid library;
Once all this is done, the reader will have the tools to stand on their own two feet. It will likely take some time before they can use the Rx library intuitively. I found this library particularly interesting. However, I found it complex to understand, and the learning curve was steep. I hope this document will shorten that learning curve for the reader. It seems to me that it’s well worth the effort.
















