20. Asynchronous Programming with RxJava
Recommended reading: [Introduction to RxJava. Application to Swing and Android environments.]
In this chapter, we revisit Chapter 17.6, where we built a client/server application with the following architecture:
![]() |
Certain user actions on the Swing interface in [1] trigger actions all the way to the database in [3] via an HTTP network [2]. Because of this, the response to the user’s action may take more or less time to arrive. It would be helpful to include a loading indicator on the user interface with an option to cancel the initiated operation if it takes too long. In Chapter 17.6, every user action requiring data exchange with the server is synchronous. The event handler executed by the code does not complete until the response is received. During this time, the graphical interface is frozen: it does not respond to new user actions. These are simply queued to be processed once the currently running event handler has finished. Thus, if a cancel button were displayed, the user could click it, but nothing would happen until the current operation was finished. The cancel button would then serve no purpose.
For a click on the cancel button to take effect, the event handler that started the operation must already have returned. To achieve this, the handler must launch the potentially long-running operation asynchronously:
- the event handler initiates the long-running operation but does not wait for its result and returns control to the UI thread that handles the graphical interface events. The long-running operation is launched on a thread different from the UI thread, which prevents the latter from being blocked;
- If the user clicks the cancel button before the long-running operation completes, the idle UI thread can handle this event. The long-running operation can then be abandoned by ignoring its result;
- If the long-running operation has not been canceled, the arrival of the response will trigger an event in the UI thread. If the UI thread is idle, it will then execute the code associated with this event, which will process the response;
The user interface will function as before. If the server’s response times are fast, the user will not notice a difference. If they are noticeable, the user will see a cancel button appear and will have the option to interrupt the current operation.
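The mechanism described above can be sketched without any UI or Rx library. The following is a minimal illustration using only the JDK; the class [CancellableCallSketch], the [slowCall] method and the "response"/"nothing" strings are hypothetical names chosen for the sketch, not code from the elections project. The calling thread plays the role of the UI thread: it launches the slow call on a worker thread, stays free to register a cancellation, and a cancelled operation is abandoned simply by ignoring its result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: every name here is hypothetical, none comes from the elections project.
class CancellableCallSketch {

    // Stand-in for a slow HTTP exchange with the server.
    static String slowCall(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "response";
    }

    // Launches the slow call on a worker thread and returns what the UI would display.
    static String run(long callMillis, boolean userCancels) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        AtomicBoolean cancelled = new AtomicBoolean(false);
        AtomicReference<String> displayed = new AtomicReference<>("nothing");
        try {
            // The calling thread does not wait here: it only registers what to do with the result.
            CompletableFuture<Void> done = CompletableFuture
                    .supplyAsync(() -> slowCall(callMillis), worker)
                    .thenAccept(response -> {
                        // A cancelled operation is abandoned by ignoring its result.
                        if (!cancelled.get()) {
                            displayed.set(response);
                        }
                    });
            // The calling thread is free, so it can react to a click on the cancel button.
            if (userCancels) {
                cancelled.set(true);
            }
            done.join(); // only so that the sketch can return a value
            return displayed.get();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(200, false));
        System.out.println(run(200, true));
    }
}
```

In a real Swing application the result would have to be handed back to the UI thread before touching any component; the sketch leaves that part out.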
The [Rx] library enables asynchronous programming. Its major advantage lies in the fact that it has been ported to numerous environments (Java, .NET, JS, etc.) and that proficiency in one environment can be easily transferred to another. Here, we will refer to Chapter 2 of the document [Introduction to RxJava. Application to Swing and Android Environments]. The reader is encouraged to read it. In the following sections, we will use code from the examples in this chapter.
We will evolve the application’s architecture as follows:
![]() |
- In [1], we insert an [RxJava] layer between the [Swing] layer and the [business logic] layer. The methods in the latter will now be called asynchronously;
We will proceed in several steps:
- Step 1: The [business logic, DAO] layer currently presents a synchronous interface to the [UI] layer. We will transform it into an asynchronous [RxJava, business logic, DAO] layer;
- Step 2: We will convert the synchronous console application into an application that remains synchronous but uses the asynchronous interface [RxJava, business logic, DAO];
- Step 3: We will convert the synchronous Swing application into an asynchronous Swing application;
20.1. Step 1
We are converting the current synchronous layer [business logic, DAO] into an asynchronous layer [RxJava, business logic, DAO].
20.1.1. Creation
We start with the Maven project from Chapter 17.4, which we open in NetBeans:
![]() | ![]() |
We duplicate this project [1] (copy/paste) into a new project [elections-rxjava-business-dao-security-webjson] [2].
20.1.2. Maven Configuration
We update the [pom.xml] file of the new project to add the dependency on the [RxJava] library:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>istia.st.elections</groupId>
<artifactId>elections-metier-dao-security-rxjava-webjson</artifactId>
<version>0.0.1-SNAPSHOT</version>
<description>jUnit client for the web server / JSON</description>
<name>elections-metier-dao-security-rxjava-webjson</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.2.7.RELEASE</version>
</parent>
<dependencies>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<!-- JSON library used by Spring -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- component used by Spring RestTemplate -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<!-- Google Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
<scope>test</scope>
</dependency>
<!-- logging library -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<!-- Spring Boot Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/io.reactivex/rxjava -->
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
</plugin>
</plugins>
</build>
</project>
- the last <dependency> element of the listing (groupId [io.reactivex], artifactId [rxjava], version 1.2.0) adds the dependency on the RxJava library;
20.1.3. Asynchronous implementation of the [business] layer
![]() |
To implement the [RxJava, business] layer, we add an asynchronous interface [IRxElectionsMetier] [1] and its implementation [RxElectionsMetier] [2] to the project:
![]() |
The [IRxElectionsMetier] interface is the asynchronous interface for the [RxJava, business] layer. Its code is as follows:
package elections.security.client.metier;
import elections.security.client.entities.VoterList;
import elections.security.client.entities.User;
import rx.Observable;
public interface IRxElectionsMetier {
// authentication
Observable<Void> authenticate(User user);
// get the candidate lists
Observable<VoterList[]> getVoterLists(User user);
// the number of seats to be filled
Observable<Integer> getNbSeatsToBeFilled(User user);
// the electoral threshold
Observable<Double> getVotingThreshold(User user);
// recording the results
Observable<Void> recordResults(User user, VoterList[] voterLists);
// seat calculation
Observable<VoterList[]> calculateSeats(User user, VoterList[] voterLists);
}
The [IRxElectionsMetier] interface mirrors the methods of the [IElectionsMetier] interface, but where a method M of [IElectionsMetier] returns a result of type T, the method M of [IRxElectionsMetier] returns a result of type Observable<T>. The [Observable] type is provided by the RxJava library. An Observable<T> provides the [subscribe] method, through which the caller receives the values of type T emitted by the observable. Three events are associated with this method:
- onNext(T result), which notifies that a result of type T is available. The asynchronous operation may return multiple results;
- onError(Throwable th), which notifies that the asynchronous operation encountered an error;
- onCompleted(), which notifies that the asynchronous operation has completed;
Until the [Observable.subscribe] method is called, the asynchronous operation associated with the observable is not initiated. The code that calls a method M of the [IRxElectionsMetier] interface does not obtain the expected result T, but an Observable<T> type that will later allow it to obtain the result T by calling the [Observable.subscribe] method.
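To make this deferred execution concrete, here is a minimal sketch written with the JDK only. The [LazySource] class is a hypothetical miniature that imitates the behaviour described above; it is not the RxJava [Observable] type and does not reproduce its API. It shows that creating the source runs nothing, and that the work and the three notifications only happen inside [subscribe].

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical miniature, NOT the RxJava API: it only imitates "nothing runs before subscribe".
class LazySource<T> {
    private final Supplier<T> work;

    LazySource(Supplier<T> work) {
        this.work = work; // the work is stored, not executed
    }

    void subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onCompleted) {
        try {
            onNext.accept(work.get()); // the work is executed here, on the caller's thread
            onCompleted.run();
        } catch (RuntimeException e) {
            onError.accept(e); // an error ends the emissions: onCompleted is not called
        }
    }
}

class LazySourceDemo {
    // Returns the order in which the events occurred.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        LazySource<Integer> source = new LazySource<>(() -> {
            log.add("work executed");
            return 42;
        });
        log.add("source created");
        source.subscribe(
                value -> log.add("onNext " + value),
                error -> log.add("onError"),
                () -> log.add("onCompleted"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

The log starts with "source created" and only then records "work executed": building the source and running its work are two separate moments.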
The [RxElectionsMetier] implementation of the [IRxElectionsMetier] interface is as follows:
package elections.security.client.metier;
import elections.security.client.entities.VoterList;
import elections.security.client.entities.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.Observable;
@Component
public class RxElectionsMetier implements IRxElectionsMetier {
@Autowired
private IElectionsMetier business;
@Override
public Observable<Void> authenticate(User user) {
...
}
@Override
public Observable<VoterList[]> getVoterLists(User user) {
return Observable.create(subscriber -> {
try {
// call synchronous method then respond to subscriber
subscriber.onNext(business.getVoterLists(user));
// signal the end of the observable
subscriber.onCompleted();
} catch (Exception e) {
// propagate the exception
subscriber.onError(e);
}
});
}
@Override
public Observable<Integer> getNbSeatsToBeFilled(User user) {
...
}
@Override
public Observable<Double> getVotingThreshold(User user) {
...
}
@Override
public Observable<Void> recordResults(User user, VoterList[] voterLists) {
...
}
@Override
public Observable<VoterList[]> calculateSeats(User user, VoterList[] voterLists) {
...
}
}
- the [business] field annotated with [@Autowired]: Spring injection of the synchronous business layer;
- we will comment on the [getVoterLists] method, which instead of returning a [VoterList[]] type returns an [Observable<VoterList[]>] type;
- the static method [Observable.create] creates an Observable from a function that receives a [Subscriber]. The [Subscriber] type represents a subscriber to the stream of results produced by the observed process (the Observable). It provides three methods:
- [Subscriber.onNext] to receive a result from the observed process;
- [Subscriber.onError] to receive an exception from the observed process. After an exception, the [Observable] type no longer emits results;
- [Subscriber.onCompleted] to receive the end-of-emission signal from the observed process. Here, the observed process emits only one element. Note that this signal is not emitted if an exception occurs. This is the default behavior of Observables: the emission of an exception also signals the end of emissions. Subscribers are aware of this;
- the [Observable.create] method takes an [Observable.OnSubscribe] type as a parameter. This type is a functional interface. This concept was introduced with Java 8 and refers to an interface with a single abstract method. Here, the single method of the [Observable.OnSubscribe<T>] interface is as follows:
void call(Subscriber<? super T> subscriber)
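To implement a functional interface whose single method is m(param1, param2, ..., paramn), you can use the following simplified syntax (a lambda expression):
(param1, param2, ..., paramn) -> { code of the method }
This is what is done in the [getVoterLists] method: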
- [subscriber] is the parameter of the [Observable.OnSubscribe.call] method;
- the block between the braces [{...}] is the code we want to provide to the [call] method;
- [business.getVoterLists(user)]: we request the voter lists synchronously from the injected [business] layer. We will therefore have to wait for the result. When it is received, it is passed to the subscriber’s [onNext] method;
- [subscriber.onCompleted()]: we expect a single result. Once it has been emitted, we notify the subscriber that the observed process has finished emitting results;
- [subscriber.onError(e)]: in case of an error, the exception is passed to the subscriber’s [onError] method, which also ends the emissions;
It is important to remember that the [RxElectionsMetier.getVoterLists] method returns a type Observable<VoterList[]> and not the type VoterList[] itself. The calling code must call the [subscribe] method of this observable for the function passed to [Observable.create] to be executed and to deliver the voter lists via [onNext].
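The lambda syntax can be compared with the older anonymous-class syntax on a small standalone case. In the sketch below, the [Callback] interface is a hypothetical functional interface invented for the illustration (it has the same shape as a one-parameter [call] method, but it is not an RxJava type). Both methods build an equivalent implementation of it.

```java
// Sketch only: Callback is a hypothetical interface, not an RxJava type.
class LambdaSyntaxSketch {

    // A functional interface: exactly one abstract method.
    @FunctionalInterface
    interface Callback<T> {
        void call(T value);
    }

    // Pre-Java 8 style: an anonymous class implements the single method.
    static String viaAnonymousClass() {
        StringBuilder out = new StringBuilder();
        Callback<String> callback = new Callback<String>() {
            @Override
            public void call(String value) {
                out.append("got ").append(value);
            }
        };
        callback.call("x");
        return out.toString();
    }

    // Java 8 style: the lambda (value) -> {...} supplies the body of call.
    static String viaLambda() {
        StringBuilder out = new StringBuilder();
        Callback<String> callback = value -> {
            out.append("got ").append(value);
        };
        callback.call("x");
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(viaAnonymousClass());
        System.out.println(viaLambda());
    }
}
```

The compiler infers from the target type [Callback<String>] that [value] is a String and that the lambda body implements [call].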
The code for the other methods is similar:
package elections.security.client.metier;
import elections.security.client.entities.VoterList;
import elections.security.client.entities.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.Observable;
@Component
public class RxElectionsMetier implements IRxElectionsMetier {
@Autowired
private IElectionsMetier business;
@Override
public Observable<Void> authenticate(User user) {
return Observable.create(subscriber -> {
try {
// call synchronous method
business.authenticate(user);
// signal the end of the observable
subscriber.onCompleted();
} catch (Exception e) {
// propagate the exception
subscriber.onError(e);
}
});
}
@Override
public Observable<VoterList[]> getVoterLists(User user) {
return Observable.create(subscriber -> {
try {
// call synchronous method then respond to subscriber
subscriber.onNext(business.getVoterLists(user));
// signal the end of the observable
subscriber.onCompleted();
} catch (Exception e) {
// propagate the exception
subscriber.onError(e);
}
});
}
@Override
public Observable<Integer> getNbSeatsToBeFilled(User user) {
return Observable.create(subscriber -> {
try {
// call synchronous method then respond to subscriber
subscriber.onNext(business.getNbSeatsToBeFilled(user));
// signal the end of the observable
subscriber.onCompleted();
} catch (Exception e) {
// propagate the exception
subscriber.onError(e);
}
});
}
@Override
public Observable<Double> getVotingThreshold(User user) {
return Observable.create(subscriber -> {
try {
// call synchronous method then respond to subscriber
subscriber.onNext(business.getVotingThreshold(user));
// signal the end of the observable
subscriber.onCompleted();
} catch (Exception e) {
// propagate the exception
subscriber.onError(e);
}
});
}
@Override
public Observable<Void> recordResults(User user, VoterList[] voterLists) {
return Observable.create(subscriber -> {
try {
// call synchronous method
business.recordResults(user, voterLists);
// signal the end of the observable
subscriber.onCompleted();
} catch (Exception e) {
// propagate the exception
subscriber.onError(e);
}
});
}
@Override
public Observable<VoterList[]> calculateSeats(User user, VoterList[] voterLists) {
return Observable.create(subscriber -> {
try {
// call synchronous method then respond to subscriber
subscriber.onNext(business.calculateSeats(user, voterLists));
// signal the end of the observable
subscriber.onCompleted();
} catch (Exception e) {
// propagate the exception
subscriber.onError(e);
}
});
}
}
- in the [authenticate] and [recordResults] methods, the subscriber's [onNext] method is not called because the subscriber does not expect any results;
20.1.4. JUnit tests for the [business] layer
![]() |
20.1.4.1. Test01
We revisit the unit test [Test01] discussed in Section 17.4.4. It was designed to make synchronous calls to the [IElectionsMetier] interface. We modify it so that it makes synchronous calls to the new [IRxElectionsMetier] interface. It is indeed possible to make synchronous calls to an asynchronous RxJava interface. The code becomes as follows:
package elections.security.client.metier.junit;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import elections.security.client.config.MetierConfig;
import elections.security.client.entities.ElectionsException;
import elections.security.client.entities.VoterList;
import elections.security.client.entities.User;
import elections.security.client.metier.IRxElectionsMetier;
import rx.observables.BlockingObservable;
@SpringApplicationConfiguration(classes = MetierConfig.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class Test01 {
// [electionsMetier] layer
@Autowired
private IRxElectionsMetier electionsMetier;
// JSON mapper
private final ObjectMapper mapper = new ObjectMapper();
// users
static private User admin;
static private User user;
static private User unknown;
@BeforeClass
public static void initTest() {
admin = new User("admin", "admin");
user = new User("user", "user");
unknown = new User("x", "y");
}
@Test()
public void checkUserUser() {
ElectionsException se = null;
try {
BlockingObservable.from(electionsMetier.authenticate(user)).firstOrDefault(null);
} catch (ElectionsException e) {
se = e;
}
Assert.assertNotNull(se);
Assert.assertEquals("403 Forbidden", se.getErrors().get(0));
}
@Test()
public void checkUserUnknown() {
ElectionsException se = null;
try {
BlockingObservable.from(electionsMetier.authenticate(unknown)).firstOrDefault(null);
} catch (ElectionsException e) {
se = e;
}
Assert.assertNotNull(se);
Assert.assertEquals("401 Unauthorized", se.getErrors().get(0));
}
@Test()
public void checkUserAdmin() {
ElectionsException se = null;
try {
BlockingObservable.from(electionsMetier.authenticate(admin)).firstOrDefault(null);
} catch (ElectionsException e) {
se = e;
}
Assert.assertNull(se);
}
/**
* check 1: seat allocation method—the lists are hard-coded
*/
@Test
public void calculateSeats1() {
// Create an array of the 7 candidate lists
VoterList[] lists = new VoterList[7];
lists[0] = new VoterList("A", 32000, 0, false);
lists[1] = new VoterList("B", 25000, 0, false);
lists[2] = new VoterList("C", 16000, 0, false);
lists[3] = new VoterList("D", 12000, 0, false);
lists[4] = new VoterList("E", 8000, 0, false);
lists[5] = new VoterList("F", 4500, 0, false);
lists[6] = new VoterList("G", 2500, 0, false);
// calculate the seats for each list
lists = BlockingObservable.from(electionsMetier.calculateSeats(admin, lists)).first();
// verify the results
Assert.assertEquals(2, lists[0].getSeats());
Assert.assertFalse(lists[0].isEliminated());
Assert.assertEquals(2, lists[1].getSeats());
Assert.assertFalse(lists[1].isEliminated());
Assert.assertEquals(1, lists[2].getSeats());
Assert.assertFalse(lists[2].isEliminated());
Assert.assertEquals(1, lists[3].getSeats());
Assert.assertFalse(lists[3].isEliminated());
Assert.assertEquals(0, lists[4].getSeats());
Assert.assertFalse(lists[4].isEliminated());
Assert.assertEquals(0, lists[5].getSeats());
Assert.assertTrue(lists[5].isEliminated());
Assert.assertEquals(0, lists[6].getSeats());
Assert.assertTrue(lists[6].isEliminated());
}
/**
* Check 2: Seat calculation method—we request the lists from the [business] layer, then hard-code the
* votes
*/
@Test
public void calculateSeats2() {
// create an array of the 7 candidate lists
VoterList[] lists = BlockingObservable.from(electionsMetier.getVoterLists(admin)).first();
// hard-code the votes
lists[0].setVotes(32000);
lists[1].setVotes(25000);
lists[2].setVotes(16000);
lists[3].setVotes(12000);
lists[4].setVotes(8000);
lists[5].setVotes(4500);
lists[6].setVotes(2500);
// Calculate the number of seats won by each list
lists = BlockingObservable.from(electionsMetier.calculateSeats(admin, lists)).first();
// verify the results
Assert.assertEquals(2, lists[0].getSeats());
Assert.assertFalse(lists[0].isEliminated());
Assert.assertEquals(2, lists[1].getSeats());
Assert.assertFalse(lists[1].isEliminated());
Assert.assertEquals(1, lists[2].getSeats());
Assert.assertFalse(lists[2].isEliminated());
Assert.assertEquals(1, lists[3].getSeats());
Assert.assertFalse(lists[3].isEliminated());
Assert.assertEquals(0, lists[4].getSeats());
Assert.assertFalse(lists[4].isEliminated());
Assert.assertEquals(0, lists[5].getSeats());
Assert.assertTrue(lists[5].isEliminated());
Assert.assertEquals(0, lists[6].getSeats());
Assert.assertTrue(lists[6].isEliminated());
}
/**
* Verification 3: seat calculation method throws an exception
*/
@Test(expected = ElectionsException.class)
public void calculateSeats3() {
// Create an array of 25 candidate lists, each with 1 vote
VoterList[] lists = new VoterList[25];
// all 25 lists will have the same number of votes (4%)
for (int i = 0; i < lists.length; i++) {
lists[i] = new VoterList("List" + (i + 1), 1, 0, false);
}
// Calculate seats - normally we should get an ElectionsException
// with a 5% electoral threshold
BlockingObservable.from(electionsMetier.calculateSeats(admin, lists)).first();
}
/**
* recording election results
*
* @throws JsonProcessingException
*/
@Test
public void writeElectionResults() throws JsonProcessingException {
// Create an array of the 7 candidate lists
VoterList[] lists = BlockingObservable.from(electionsMetier.getVoterLists(admin)).first();
// hard-code the votes
lists[0].setVotes(32000);
lists[1].setVotes(25000);
lists[2].setVotes(16000);
lists[3].setVotes(12000);
lists[4].setVotes(8000);
lists[5].setVotes(4500);
lists[6].setVotes(2500);
// Calculate the number of seats won by each list
lists = BlockingObservable.from(electionsMetier.calculateSeats(admin, lists)).first();
// display the results
for (int i = 0; i < lists.length; i++) {
System.out.println(mapper.writeValueAsString(lists[i]));
}
// save the results to the database
BlockingObservable.from(electionsMetier.recordResults(admin, lists)).firstOrDefault(null);
// check the results
lists = BlockingObservable.from(electionsMetier.getVoterLists(admin)).first();
// display the results
for (int i = 0; i < lists.length; i++) {
System.out.println(mapper.writeValueAsString(lists[i]));
}
Assert.assertEquals(2, lists[0].getSeats());
Assert.assertFalse(lists[0].isEliminated());
Assert.assertEquals(2, lists[1].getSeats());
Assert.assertFalse(lists[1].isEliminated());
Assert.assertEquals(1, lists[2].getSeats());
Assert.assertFalse(lists[2].isEliminated());
Assert.assertEquals(1, lists[3].getSeats());
Assert.assertFalse(lists[3].isEliminated());
Assert.assertEquals(0, lists[4].getSeats());
Assert.assertFalse(lists[4].isEliminated());
Assert.assertEquals(0, lists[5].getSeats());
Assert.assertTrue(lists[5].isEliminated());
Assert.assertEquals(0, lists[6].getSeats());
Assert.assertTrue(lists[6].isEliminated());
}
}
Let's examine the changes:
- the expression [BlockingObservable.from(observable).first()]:
- subscribes to the observable passed as a parameter to the static method [from];
- triggers the execution of the code associated with the observable;
- waits to receive the first result. This is therefore a synchronous operation;
For [electionsMetier.authenticate], we use the [firstOrDefault(null)] method instead because this observable completes without emitting a result, in which case [first()] would throw a [NoSuchElementException]. The result of the [firstOrDefault(null)] method will therefore be null, a value that is not used here;
We repeat this pattern throughout the rest of the code whenever we want to call the [business] layer.
The unit test [Test01] must pass:
![]() |
Task: Verify that the [Test01] test passes.
20.1.4.2. Test02
We modify the [Test01] test to now test the asynchronous interface [IRxElectionsMetier] by making asynchronous calls to its methods.
Let’s examine an initial test:
// thread synchronization semaphore
private CountDownLatch latch;
// -----------------------------------
private ElectionsException checkUserUserException;
@Test()
public void checkUserUser() throws InterruptedException {
// semaphore set to 1
latch = new CountDownLatch(1);
// asynchronous operation
electionsMetier.authenticate(user).subscribeOn(Schedulers.io())
.subscribe((result) -> {
},
(th) -> {
checkUserUserException = (ElectionsException) th;
latch.countDown();
},
() -> {
latch.countDown();
});
// wait for semaphore
latch.await();
// check results
Assert.assertNotNull(checkUserUserException);
Assert.assertEquals("403 Forbidden", checkUserUserException.getErrors().get(0));
}
- the [latch] field: a semaphore (here a [java.util.concurrent.CountDownLatch]) is a tool used to synchronize threads with each other. Threads are execution flows running in parallel. To execute task T1, thread [Thread1] may need task T2, executed by thread [Thread2], to be completed. It then waits for thread [Thread2] to send it a signal indicating that task T2 is complete. There are various ways to manage this synchronization between two threads. The method used here is as follows:
- [latch = new CountDownLatch(1)]: thread [Thread1] creates a semaphore with the value 1;
- [subscribeOn(Schedulers.io())]: thread [Thread1] arranges for the observed process to run on another thread [Thread2]. This is achieved using the syntax:
electionsMetier.authenticate(user).subscribeOn(Schedulers.io())
The [Observable.subscribeOn] method sets the scheduler, and therefore the thread, on which the observed process will run. The parameter of [subscribeOn] is a scheduler backed by a thread pool. The RxJava library provides several schedulers suited to different situations. [Schedulers.io()] is the one intended for blocking I/O such as network operations;
- the [subscribe] call: the operation
electionsMetier.authenticate(user).subscribeOn(Schedulers.io()).subscribe(...)
executes the synchronous operation encapsulated in the observable [authenticate(user)]. Because this synchronous operation is launched on a thread other than [Thread1], the [subscribe] call returns without waiting for the result and [Thread1] moves on to the next statement;
- [latch.await()]: the [Thread1] thread pauses and waits for the semaphore to reach 0 (it is currently at 1);
- the [subscribe] method takes three lambda functions as parameters:
- the first [(result)->{...}] is called every time the observable [authenticate(user)] emits a result [result]. Here we have an observable [authenticate(user)] that does something but emits no result. The lambda [(result)->{}] will therefore never be called. That is why its code is empty here [{}];
- the second [(th)->{...}] takes a [Throwable] type as a parameter. It is called when the observable’s execution encounters an exception. Here, we handle the [Throwable th] parameter as follows:
- we store it in the [checkUserUserException] field of the test class, of type [ElectionsException], because the executed observable only throws this type of exception;
- [latch.countDown()] brings the semaphore from 1 to 0 to indicate that the [Thread2] thread has finished its work;
- the third [()->{...}] is called when the observable has no more elements to emit. We handle this event as follows:
- [latch.countDown()] brings the semaphore from 1 to 0 to indicate that thread [Thread2] has finished its work;
Note that the third lambda is not called if an exception occurs. This is why we also had to call [latch.countDown()] in the second lambda;
- the assertions: when we reach them, the observable has finished its work. We can then perform the same checks as in test [Test01];
Let’s examine another test:
// -----------------------------------
private ElectionsException calculSieges1Exception;
private VoterList[] seatCalculationLists1;
@Test
public void calculateSeats1() throws InterruptedException {
// create an array of 7 candidate lists
VoterList[] lists = new VoterList[7];
lists[0] = new VoterList("A", 32000, 0, false);
lists[1] = new VoterList("B", 25000, 0, false);
lists[2] = new VoterList("C", 16000, 0, false);
lists[3] = new VoterList("D", 12000, 0, false);
lists[4] = new VoterList("E", 8000, 0, false);
lists[5] = new VoterList("F", 4500, 0, false);
lists[6] = new VoterList("G", 2500, 0, false);
// 1-count semaphore
latch = new CountDownLatch(1);
// asynchronous operation
// calculate the seats for each list
electionsMetier.calculateSeats(admin, lists).subscribeOn(Schedulers.io())
.subscribe((result) -> {
seatCalculationLists1 = result;
},
(th) -> {
calculSieges1Exception = (ElectionsException) th;
latch.countDown();
},
() -> {
latch.countDown();
});
// wait for semaphore
latch.await();
// check the results
Assert.assertNull(calculSieges1Exception);
Assert.assertEquals(2, seatCalculationLists1[0].getSeats());
Assert.assertFalse(seatCalculationLists1[0].isEliminated());
Assert.assertEquals(2, seatCalculationLists1[1].getSeats());
Assert.assertFalse(seatCalculationLists1[1].isEliminated());
Assert.assertEquals(1, seatCalculationLists1[2].getSeats());
Assert.assertFalse(seatCalculationLists1[2].isEliminated());
Assert.assertEquals(1, seatCalculationLists1[3].getSeats());
Assert.assertFalse(seatCalculationLists1[3].isEliminated());
Assert.assertEquals(0, seatCalculationLists1[4].getSeats());
Assert.assertFalse(seatCalculationLists1[4].isEliminated());
Assert.assertEquals(0, seatCalculationLists1[5].getSeats());
Assert.assertTrue(seatCalculationLists1[5].isEliminated());
Assert.assertEquals(0, seatCalculationLists1[6].getSeats());
Assert.assertTrue(seatCalculationLists1[6].isEliminated());
}
- the [subscribe] call: asynchronous execution of the observable [electionsMetier.calculateSeats(admin, lists)];
- the first lambda: the execution of the observable emits a [VoterList[]] value, which is stored in the [seatCalculationLists1] field of the test class;
- the assertions: these checks are those from the [Test01] test, to which we have added the initial [assertNull] check that ensures no exception occurred;
The entire [Test02] test is available in the course materials.
Task: Run the [Test02] test and verify that it passes.
20.1.4.3. Test03
Test [Test03] does the same thing as test [Test01]: it tests the [IRxElectionsMetier] interface using synchronous calls to that interface. It is a copy of test [Test02] with two differences:
- the observables are no longer executed in a thread different from the one running the tests. When thread [Thread1] executes the [subscribe] method of an observable, it initiates an HTTP operation to the server also on thread [Thread1]. The entire [subscribe] method then becomes synchronous;
- since there is now only one thread, thread synchronization becomes unnecessary and the semaphore disappears;
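The difference between the two situations (work executed on the calling thread, or handed to another thread) can be observed with the JDK alone. The sketch below is an assumption-laden illustration: [CallerThreadSketch] is a hypothetical class, a single-thread executor stands in for what [subscribeOn] provides, and a direct call stands in for a [subscribe] without [subscribeOn].

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: hypothetical names; the executor plays the role that subscribeOn plays in RxJava.
class CallerThreadSketch {

    // Returns true when the work ran on the thread that called this method.
    static boolean workRanOnCallerThread(boolean useWorkerThread) {
        Thread caller = Thread.currentThread();
        Thread[] ranOn = new Thread[1];
        Runnable work = () -> ranOn[0] = Thread.currentThread();

        if (useWorkerThread) {
            // Analogous to Test02: the work is handed to another thread, so we must wait for it.
            ExecutorService pool = Executors.newSingleThreadExecutor();
            try {
                pool.submit(work).get();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            } finally {
                pool.shutdown();
            }
        } else {
            // Analogous to Test03: the work runs in the caller's thread, no waiting is needed.
            work.run();
        }
        return ranOn[0] == caller;
    }

    public static void main(String[] args) {
        System.out.println(workRanOnCallerThread(false));
        System.out.println(workRanOnCallerThread(true));
    }
}
```

When the work runs on the caller's thread, the call only returns once the work is finished, which is why no semaphore is needed in that case.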
Here are two examples of tests:
// -----------------------------------
private ElectionsException checkUserUserException;
@Test()
public void checkUserUser() throws InterruptedException {
// synchronous operation
electionsMetier.authenticate(user)
.subscribe((result) -> {
},
(th) -> {
checkUserUserException = (ElectionsException) th;
},
() -> {
});
// verify results
Assert.assertNotNull(checkUserUserException);
Assert.assertEquals("403 Forbidden", checkUserUserException.getErrors().get(0));
}
- the [subscribe] call: no [subscribeOn] is specified, so by default the method [electionsMetier.authenticate(user).subscribe] runs in the calling code's thread. This is therefore a synchronous operation;
// -----------------------------------
private ElectionsException calculSieges1Exception;
private VoterList[] voterLists1;
@Test
public void calculateSeats1() throws InterruptedException {
// create an array of 7 candidate lists
VoterList[] lists = new VoterList[7];
lists[0] = new VoterList("A", 32000, 0, false);
lists[1] = new VoterList("B", 25000, 0, false);
lists[2] = new VoterList("C", 16000, 0, false);
lists[3] = new VoterList("D", 12000, 0, false);
lists[4] = new VoterList("E", 8000, 0, false);
lists[5] = new VoterList("F", 4500, 0, false);
lists[6] = new VoterList("G", 2500, 0, false);
// synchronous operation
// calculate the seats for each list
electionsMetier.calculateSeats(admin, lists)
.subscribe((result) -> {
voterLists1 = result;
},
(th) -> {
calculSieges1Exception = (ElectionsException) th;
},
() -> {
});
// check the results
Assert.assertNull(calculSieges1Exception);
Assert.assertEquals(2, voterLists1[0].getSeats());
Assert.assertFalse(voterLists1[0].isEliminated());
Assert.assertEquals(2, voterLists1[1].getSeats());
Assert.assertFalse(voterLists1[1].isEliminated());
Assert.assertEquals(1, voterLists1[2].getSeats());
Assert.assertFalse(voterLists1[2].isEliminated());
Assert.assertEquals(1, voterLists1[3].getSeats());
Assert.assertFalse(voterLists1[3].isEliminated());
Assert.assertEquals(0, voterLists1[4].getSeats());
Assert.assertFalse(voterLists1[4].isEliminated());
Assert.assertEquals(0, voterLists1[5].getSeats());
Assert.assertTrue(voterLists1[5].isEliminated());
Assert.assertEquals(0, voterLists1[6].getSeats());
Assert.assertTrue(voterLists1[6].isEliminated());
}
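Both tests read their result fields immediately after [subscribe] returns. This only works because, with no scheduler specified, the whole chain runs in the calling thread. The following sketch illustrates this with a hand-written stand-in rather than RxJava itself: [MiniObservable] and the other names are hypothetical and only mimic the three-callback shape of [subscribe].

```java
import java.util.function.Consumer;

class SyncSubscribeSketch {

    /** Hypothetical stand-in for an observable that emits one value, then completes. */
    interface MiniObservable<T> {
        void subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onCompleted);
    }

    /** The work runs in whichever thread calls subscribe: no scheduler is involved. */
    static MiniObservable<String> emitProducerThreadName() {
        return (onNext, onError, onCompleted) -> {
            try {
                onNext.accept(Thread.currentThread().getName());
                onCompleted.run();
            } catch (RuntimeException e) {
                onError.accept(e);
            }
        };
    }

    // field written by the callback, like the result fields of the tests above
    private String producerThread;

    /** True if the callback already ran, on the caller's thread, when subscribe returned. */
    boolean resultIsReadyAfterSubscribe() {
        emitProducerThreadName().subscribe(name -> producerThread = name, th -> { }, () -> { });
        return Thread.currentThread().getName().equals(producerThread);
    }

    public static void main(String[] args) {
        System.out.println(new SyncSubscribeSketch().resultIsReadyAfterSubscribe());
    }
}
```

Because no other thread is involved, the field is already set when [subscribe] returns, so no semaphore is needed before the assertions.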
Task: Run the test [Test03] and verify that it passes.
20.2. Step 2
We will now port the synchronous console application from Chapter 17.5 to an application that remains synchronous but relies on the asynchronous interface of the [RxJava, business, DAO] layers:
![]() |
We start with the project [elections-console-business-dao-security-webjson] [1] from Chapter 17.5, which we duplicate into a new project [elections-console-rxjava-business-dao-security-webjson] [2]:
![]() | ![]() |
- In [3-4], in the new project, we remove the dependency on the old synchronous [business] layer;
![]() | ![]() |
- In [5-9], we add a dependency on the new asynchronous [business] layer;
![]() | ![]() |
- in [10-14], we rename the [ElectionsConsole] class to [ElectionsConsole01];
Similarly, we rename the [BootElectionsConsole] class to [BootElectionsConsole01]:
![]() |
The current code for the [BootElectionsConsole01] class is as follows:
package elections.security.client.boot;
import elections.security.client.console.IElectionsUI;
public class BootElectionsConsole01 extends AbstractBootElections {
public static void main(String[] arguments) {
new BootElectionsConsole01().run();
}
@Override
protected IElectionsUI getUI() {
return ctx.getBean("electionsConsole", IElectionsUI.class);
}
}
- Line 13: Because we changed the class name from [ElectionsConsole] to [ElectionsConsole01], we must now write:
return ctx.getBean("electionsConsole01", IElectionsUI.class);
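The bean name follows the class name because, when a [@Component] class is registered by component scanning without an explicit name, Spring names the bean after the simple class name with its first letter in lowercase. We assume here that this is how [ElectionsConsole01] is registered, since the Spring configuration is not shown. The sketch below reproduces the naming rule with the JDK method [Introspector.decapitalize]; the class [DefaultBeanNameSketch] is hypothetical.

```java
import java.beans.Introspector;

class DefaultBeanNameSketch {

    /** Lowercases the first letter of a simple class name, JavaBeans style. */
    static String defaultBeanName(String simpleClassName) {
        return Introspector.decapitalize(simpleClassName);
    }

    public static void main(String[] args) {
        System.out.println(defaultBeanName("ElectionsConsole"));   // electionsConsole
        System.out.println(defaultBeanName("ElectionsConsole01")); // electionsConsole01
    }
}
```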
Let’s go back to the code for the [ElectionsConsole01] class:
@Component
public class ElectionsConsole01 implements IElectionsUI {
@Autowired
private IElectionsMetier electionsMetier;
@Autowired
private User admin;
@Override
public void run() {
// the competing lists
VoterList[] lists;
// data entry
try (Scanner keyboard = new Scanner(System.in)) {
// request the competing lists from the [business] layer
lists = electionsMetier.getVoterLists(admin);
...
// calculate the seats
lists = electionsMetier.calculateSeats(admin, lists);
// save the results
electionsMetier.recordResults(admin, lists);
...
}
If we follow the example of test [Test01] in section 20.1.4.1, lines 5, 17, 20, and 22 will change as follows:
@Component
public class ElectionsConsole01 implements IElectionsUI {
@Autowired
private IRxElectionsMetier electionsMetier;
@Autowired
private User admin;
@Override
public void run() {
// the competing lists
VoterList[] lists;
// data entry
try (Scanner keyboard = new Scanner(System.in)) {
// request the competing lists from the [business] layer
lists = BlockingObservable.from(electionsMetier.getVoterLists(admin)).first();
...
// calculate the seats
lists = BlockingObservable.from(electionsMetier.calculateSeats(admin, lists)).first();
// record the results: the observable emits no item, so we block until it completes
BlockingObservable.from(electionsMetier.recordResults(admin, lists)).lastOrDefault(null);
...
}
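The idea behind [BlockingObservable] is to start an asynchronous operation and make the caller wait for its outcome, so that the console code keeps its sequential structure. As a rough analogy using only the JDK (this is not RxJava, and every name below is hypothetical), the same pattern can be sketched with [CompletableFuture], where [join] plays the role of the blocking call:

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingCallSketch {

    /** Hypothetical asynchronous service call: the work runs on the given executor. */
    static CompletableFuture<int[]> calculateSeatsAsync(int[] votes, ExecutorService io) {
        return CompletableFuture.supplyAsync(() -> {
            int[] seats = new int[votes.length];
            // placeholder computation, not the real seat allocation rule
            for (int i = 0; i < votes.length; i++) {
                seats[i] = votes[i] / 10000;
            }
            return seats;
        }, io);
    }

    /** Synchronous facade: the caller blocks until the asynchronous result is available. */
    static int[] calculateSeatsBlocking(int[] votes) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            return calculateSeatsAsync(votes, io).join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] seats = calculateSeatsBlocking(new int[] {32000, 25000, 4500});
        System.out.println(Arrays.toString(seats)); // [3, 2, 0]
    }
}
```

The work still runs on another thread, but the caller does nothing else in the meantime, which is why the application remains synchronous from the user's point of view.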
Task: Configure the project to run the [BootElectionsConsole01] class with the three parameters [SS, Hours worked, Days worked] and verify that running the project as configured produces the expected results.
Task: Configure the project to run the pair [BootElectionsConsole02, ElectionsConsole02], where the class [ElectionsConsole02] has been written following the model of the [Test02] test in Section 20.1.4.2.
Task: Configure the project to run the pair [BootElectionsConsole03, ElectionsConsole03], where the class [ElectionsConsole03] has been written following the model of the test [Test03] in Section 20.1.4.3.
20.3. Step 3
We will now proceed to port the Swing application to an asynchronous environment.
![]() |
We start by duplicating the [elections-swing-business-dao-security-webjson] [1] project from Chapter 17.6 into a new project [elections-swing-rxjava-business-dao-security-webjson] [2]:
![]() | ![]() |
- in [3, 4], we remove the dependency on the synchronous [console] layer;
![]() | ![]() |
- in [5-9], we add a dependency on the asynchronous [console] layer;
The [swing] layer will make true asynchronous calls to the [business] layer. When calling a method on the latter, there will be two threads:
- the UI thread, which handles events;
- an I/O thread that will execute the HTTP call to the server;
Throughout the asynchronous call, we should display a loading image and a cancel button. We won’t do that here: it is left to you as an improvement to the application. The changes are made in the two classes that call the [business] layer:
![]() |
20.3.1. Maven Configuration
Here, we will use the [RxSwing] library, which extends the [RxJava] library with features available only in a Swing environment. To do this, we modify the [pom.xml] file as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>istia.st.elections</groupId>
<artifactId>elections-swing-rxjava-business-dao-security-webjson</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>elections-swing-rxjava-business-dao-security-webjson</name>
<description>Asynchronous Swing layer for the web client / JSON</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- RxSwing -->
<!-- https://mvnrepository.com/artifact/io.reactivex/rxswing -->
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxswing</artifactId>
<version>0.27.0</version>
</dependency>
<!-- lower layers -->
<dependency>
<groupId>istia.st.elections</groupId>
<artifactId>elections-console-rxjava-business-dao-security-webjson</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
20.3.2. The [ElectionsConnectForm] class
In asynchronous mode, the [ElectionsConnectForm] class becomes the following:
package elections.security.client.swing;
import elections.security.client.console.IElectionsUI;
import elections.security.client.entities.User;
import java.awt.Dimension;
import java.awt.Toolkit;
import javax.swing.SwingUtilities;
import elections.security.client.business.IRxElectionsMetier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.schedulers.Schedulers;
import rx.schedulers.SwingScheduler;
@Component
public class ElectionsConnectForm extends AbstractElectionsConnectForm implements IElectionsUI {
private static final long serialVersionUID = 1L;
// reference to the asynchronous [business] layer
@Autowired
private IRxElectionsMetier business;
// logged-in user
private User user;
// main form
@Autowired
private ElectionsMainForm electionsMainForm;
// UI session
@Autowired
private UiSession uiSession;
@Override
protected void doConnect() {
if (isPageValid()) {
// user authentication
business.authenticate(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance()).subscribe(
// no response
(result) -> {
},
// exception handling
(th) -> {
// log the error
String info = getInfoForException("The following errors occurred:", th);
// display the info
jTextPaneErrors.setText(info);
jTextPaneErrors.setCaretPosition(0);
},
// authentication is complete
() -> {
// store the user in the session
uiSession.setUser(user);
// the login view is hidden
setVisible(false);
// display the main view
electionsMainForm.run();
});
}
}
// initializations
@Override
protected void init() {
...
}
@Override
public void run() {
// display the GUI
SwingUtilities.invokeLater(new Runnable() {
public void run() {
init();
setVisible(true);
}
});
}
private boolean isPageValid() {
...
}
private String getInfoForException(String message, Throwable ex) {
...
}
}
- lines 36–63: the [doConnect] method is executed when the user selects the [Connect] menu option:
![]() |
Everything happens on line 40:
business.authenticate(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance()).subscribe(...)
- the observed process is [business.authenticate(user)];
- it will be executed on an I/O thread taken from the [Schedulers.io()] pool;
- it will be observed on the UI thread, the one that handles Swing interface events [observeOn(SwingScheduler.getInstance())]. The method [SwingScheduler.getInstance()] returns a scheduler that runs code on this thread; [SwingScheduler] is a class provided by the [RxSwing] library. This is mandatory: the result of an asynchronous operation is usually used to modify elements of the Swing interface, and Swing components are not thread-safe, so they must only be modified on the UI thread. Lines 41–61 must therefore execute on the UI thread, which [observeOn(SwingScheduler.getInstance())] guarantees;
Let’s comment on the rest of the code:
- lines 42–43: these lines are there only to comply with the syntax of the [subscribe] method. They will never be executed because the [business.authenticate(user)] process emits no result;
- lines 45–52: when an exception is received, it is displayed;
- lines 54–61: executed when the [business.authenticate(user)] process signals the end of its emissions;
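The division of labor between [subscribeOn] and [observeOn] can be illustrated without RxJava. The sketch below is only an analogy built on JDK classes: [CompletableFuture.supplyAsync] runs the work on one executor and [whenCompleteAsync] runs the callback on another. The class name and the thread names are hypothetical, and a single-thread executor stands in for the Swing UI thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {

    /** Runs a task on an "io" thread and its callback on a "ui" thread; returns both thread names. */
    static String[] runAndObserve() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        // stand-in for the Swing UI thread; in a Swing program, the Executor
        // SwingUtilities::invokeLater could be passed instead
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        String[] threads = new String[2];
        try {
            CompletableFuture
                // the observed work (role of subscribeOn)
                .supplyAsync(() -> Thread.currentThread().getName(), io)
                // the callback that would update the GUI (role of observeOn)
                .whenCompleteAsync((producer, th) -> {
                    threads[0] = producer;
                    threads[1] = Thread.currentThread().getName();
                }, ui)
                .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = runAndObserve();
        System.out.println("work ran on " + threads[0] + ", callback ran on " + threads[1]);
    }
}
```

The call to [join] is there only so that the sketch can return its result; an event handler would return immediately and let the callback run later on the UI thread.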
20.3.3. The [ElectionsMainForm] class
![]() |
20.3.3.1. Initializing the graphical user interface
package elections.security.client.swing;
import elections.security.client.console.IElectionsUI;
import elections.security.client.entities.VoterList;
import elections.security.client.entities.User;
import elections.security.client.business.IRxElectionsMetier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.schedulers.Schedulers;
import rx.schedulers.SwingScheduler;
import javax.swing.*;
import java.awt.*;
import java.util.ArrayList;
import java.util.List;
@Component
public class ElectionsMainForm extends AbstractElectionsMainForm implements IElectionsUI {
private static final long serialVersionUID = 1L;
// reference to the asynchronous [business] layer
@Autowired
private IRxElectionsMetier business;
// UI session
@Autowired
private UiSession uiSession;
// logged-in user
private User user;
// JList models
private DefaultListModel<String> voiceNamesModel = null;
private DefaultListModel<String> resultsModel = null;
// competing lists
private VoterList[] lists;
// lists entered by the user
private final List<VoterList> enteredLists = new ArrayList<>();
private VoterList[] tEnteredLists;
// initializations
@Override
protected void init() {
// generate components via the parent class
super.init();
// form state
Utilities.setEnabled(new JLabel[]{jLabelAdd, jLabelCalculate, jLabelSave, jLabelDelete}, false);
Utilities.setEnabled(
new JMenuItem[]{jMenuItemAdd, jMenuItemCalculate, jMenuItemSave, jMenuItemDelete}, false);
// Center the window
Dimension screenSize = Toolkit.getDefaultToolkit().getScreenSize();
Dimension frameSize = getSize();
if (frameSize.height > screenSize.height) {
frameSize.height = screenSize.height;
}
if (frameSize.width > screenSize.width) {
frameSize.width = screenSize.width;
}
setLocation((screenSize.width - frameSize.width) / 2, (screenSize.height - frameSize.height) / 2);
// logged-in user
user = uiSession.getUser();
// local initializations
voiceNamesModel = new DefaultListModel<>();
jListVoiceNames.setModel(voiceNamesModel);
resultsModel = new DefaultListModel<>();
jListResults.setModel(resultsModel);
// request the lists from the [business] layer
business.getVoterLists(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
.subscribe(
// response
electoralLists -> {
// store the lists
lists = electoralLists;
},
// exception
(th) -> showException(th),
// end of observable
() -> {
// next step
doInitStep2();
});
}
...
- line 46: the [init] method is executed when the associated window is about to be displayed. Its purpose is to initialize components [1-3] below:
![]() |
- lines 71–85: the candidate lists are requested asynchronously (component [1]);
- line 71: the observed process is [business.getVoterLists(user)]. It is executed on an I/O thread [subscribeOn(Schedulers.io())] and observed on the UI thread [observeOn(SwingScheduler.getInstance())];
- lines 74–77: the result returned by the observed process is stored in the [lists] field on line 38;
- line 79: any exception is handled by the following method:
private void showException(Throwable th) {
// display the exception
jTextPaneMessages.setText(getInfoForException("The following errors occurred: ", th));
jTextPaneMessages.setCaretPosition(0);
}
- lines 81–84: these lines are executed when the observed process completes; they are not executed if an exception occurred. The [doInitStep2] method performs step 2 of the initialization as follows:
private void doInitStep2() {
// associate the list names with the jComboBoxNomsListes combo box
for (int i = 0; i < lists.length; i++) {
jComboBoxNomsListes.addItem(String.format("%s - %s", lists[i].getId(), lists[i].getNom()));
}
// number of seats to be filled
business.getNumberOfSeatsToBeFilled(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
.subscribe(
// response
nbSeatsToFill -> {
// initialize the label associated with this information
jLabelSAP.setText(jLabelSAP.getText() + nbSeatsToFill);
},
// exception
(th) -> showException(th),
// end of observable
() -> {
// next step
doInitStep3();
});
}
- lines 3–5: we use the result from the previous step to populate the dropdown list with the names of the candidate lists;
- lines 7–20: we request the number of seats to be filled asynchronously;
- line 7: the observed process is [business.getNumberOfSeatsToBeFilled(user)]. It is executed on an I/O thread [subscribeOn(Schedulers.io())] and observed on the UI thread [observeOn(SwingScheduler.getInstance())];
- lines 10–13: the result returned by the process is used to update the graphical user interface;
- line 15: any exceptions are displayed;
- lines 17–20: upon receiving the end signal from the observable, we proceed to step 3 of the initialization process;
Step 3 of the initialization is handled by the following code:
private void doInitStep3() {
// electoral threshold
business.getVotingThreshold(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
.subscribe(
// response
votingThreshold -> {
// initialize the label associated with this information
jLabelSE.setText(jLabelSE.getText() + votingThreshold);
},
// exception
(th) -> showException(th),
// end of observable
() -> {
});
}
- lines 3–14: the electoral threshold is requested asynchronously;
- line 3: the observed process is [business.getVotingThreshold(user)]. It is executed on an I/O thread [subscribeOn(Schedulers.io())] and observed on the UI thread [observeOn(SwingScheduler.getInstance())];
- lines 6-9: the result returned by the process is used to update the GUI;
- line 11: any exceptions are displayed;
- lines 13–14: upon receiving the end signal from the observable, no action is taken: the GUI initialization process is complete;
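The three initialization steps form a chain: each asynchronous call starts only after the previous one has completed, and an error stops the chain. As an analogy using JDK classes only (not the RxJava code above), the same sequencing can be sketched with [CompletableFuture.thenCompose]. The class, the method names and the step labels are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InitChainSketch {

    /** Hypothetical asynchronous call: logs its name on the executor's thread, then completes. */
    static CompletableFuture<String> step(String name, List<String> log, ExecutorService io) {
        return CompletableFuture.supplyAsync(() -> {
            log.add(name);
            return name;
        }, io);
    }

    /** Starts each step only once the previous one has completed; returns the order of execution. */
    static List<String> runInit() {
        ExecutorService io = Executors.newFixedThreadPool(3);
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            step("lists", log, io)
                .thenCompose(r -> step("seats", log, io))      // runs only if "lists" succeeded
                .thenCompose(r -> step("threshold", log, io))  // runs only if "seats" succeeded
                .join();
        } finally {
            io.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runInit()); // [lists, seats, threshold]
    }
}
```

Even though the pool has three threads, the steps never overlap, because each one is created inside the continuation of the previous one.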
20.3.3.2. Calculating the seats won by the various lists
The [doCalculate] method is responsible for calculating the number of seats won by the various lists:
@Override
protected void doCalculate() {
tEnteredLists = enteredLists.toArray(new VoterList[0]);
// calculate seats
String info = null;
business.calculateSeats(user, tEnteredLists).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
.subscribe(
// process result
result -> consumeResultSeats(result),
// exception handling
th -> showException(th),
// end of observable
() -> {
}
);
}
- lines 6–15: the seats obtained by the various lists are calculated asynchronously;
- line 6: the observed process is [business.calculateSeats(user, tEnteredLists)]. It is executed on an I/O thread [subscribeOn(Schedulers.io())] and observed on the UI thread [observeOn(SwingScheduler.getInstance())];
- line 9: the result returned by the process is used by the method [consumeResultSeats];
- line 11: any exceptions are displayed;
- lines 13–14: upon receiving the end signal from the observable, no action is taken;
Line 9: the [consumeResultSeats] method processes the result returned by the observed process, updating the candidate lists with their [seats, eliminated] fields:
private void consumeResultSeats(VoterList[] tEnteredLists) {
// store the result
this.tEnteredLists = tEnteredLists;
// display results
resultsModel.clear();
for (int i = 0; i < tEnteredLists.length; i++) {
resultsModel.addElement(tEnteredLists[i].toString());
}
// Update form state
Utilities.setEnabled(new JLabel[]{jLabelSave}, true);
Utilities.setEnabled(new JLabel[]{jLabelCalculate}, false);
Utilities.setEnabled(new JMenuItem[]{jMenuItemSave}, true);
Utilities.setEnabled(new JMenuItem[]{jMenuItemCalculate}, false);
jTextPaneMessages.setText("Calculation complete");
}
- lines 4-14: the result is used to update the GUI;
20.3.3.3. Recording the election results
The election results are saved using the following [doEnregistrer] method:
@Override
protected void doEnregistrer() {
// we request the recording from the [business] layer
business.recordResults(user, tEnteredLists).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
.subscribe(
// process the result - there isn't one here
(param) -> {
},
// exception handling
(th) -> showException(th),
// observable end
() -> {
// update the form
Utilities.setEnabled(new JLabel[]{jLabelSave}, false);
Utilities.setEnabled(new JMenuItem[]{jMenuItemSave}, false);
jTextPaneMessages.setText("Results saved");
}
);
}
- lines 4–17: the election results are saved asynchronously;
- line 4: the observed process is [business.recordResults(user, tEnteredLists)]. It is executed on an I/O thread [subscribeOn(Schedulers.io())] and observed on the UI thread [observeOn(SwingScheduler.getInstance())];
- lines 7–8: these lines will never be executed because the observed process does not return a result;
- line 10: any exception is displayed;
- lines 14–16: upon receiving the end signal from the observable, the GUI is updated;
Task: Verify that the Swing application works. Then modify the GUI and the code so that during an asynchronous operation with the web server/JSON, a loading image appears along with an option to cancel the current operation.
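As a starting point for the cancellation part of this task: in RxJava 1.x, [subscribe] returns a [Subscription] whose [unsubscribe] method stops the delivery of results to the subscriber. The sketch below shows the same idea with JDK classes only, as an analogy and not as the solution: the pending operation is canceled before the simulated server reply arrives, and the callback that would update the GUI never runs. All names are hypothetical, and a single-thread executor stands in for the UI thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelSketch {

    /** Cancels a pending operation; returns true if its result callback still ran. */
    static boolean callbackRanAfterCancel() {
        ExecutorService io = Executors.newSingleThreadExecutor();
        ExecutorService ui = Executors.newSingleThreadExecutor(); // stand-in for the UI thread
        CountDownLatch serverReplied = new CountDownLatch(1);
        AtomicBoolean callbackRan = new AtomicBoolean(false);
        try {
            // long-running operation: waits for a simulated server reply
            CompletableFuture<String> pending = CompletableFuture.supplyAsync(() -> {
                try {
                    serverReplied.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "response";
            }, io);
            // callback that would update the GUI with the response
            pending.thenAcceptAsync(response -> callbackRan.set(true), ui);
            // the user clicks [Cancel] before the reply arrives
            pending.cancel(false);
            // the reply arrives afterwards: it is ignored
            serverReplied.countDown();
            io.shutdown();
            ui.shutdown();
            io.awaitTermination(5, TimeUnit.SECONDS);
            ui.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return callbackRan.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackRanAfterCancel()); // false
    }
}
```

Note that the background work itself is not interrupted here: it runs to its end and its result is simply discarded, which matches the approach of abandoning an operation by ignoring its result.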