Skip to content

20. Asynchronous Programming with RxJava

Recommended reading: [Introduction to RxJava. Application to Swing and Android environments.]

In this chapter, we revisit Chapter 17.6, where we built a client/server application with the following architecture:

Certain user actions on the Swing interface in [1] trigger actions all the way to the database in [3] via an HTTP network [2]. Because of this, the response to the user’s action may take more or less time to arrive. It would be helpful to include a loading indicator on the user interface with an option to cancel the initiated operation if it takes too long. In Chapter 17.6, every user action requiring data exchange with the server is synchronous. The event handler executed by the code does not complete until the response is received. During this time, the graphical interface is frozen: it does not respond to new user actions. These are simply queued to be processed once the currently running event handler has finished. Thus, if a cancel button were displayed, the user could click it, but nothing would happen until the current operation was finished. The cancel button would then serve no purpose.

For the click on the cancel button to take effect, the current operation must be completed. To achieve this, it must launch the potentially long-running operation asynchronously:

  • the event handler initiates the long-running operation but does not wait for its result and returns control to the UI thread that handles the graphical interface events. The long-running operation is launched on a thread different from the UI thread, which prevents the latter from being blocked;
  • If the user clicks the cancel button before the long-running operation completes, the idle UI thread can handle this event. The long-running operation can then be abandoned by ignoring its result;
  • If the long-running operation has not been canceled, the arrival of the response will trigger an event in the UI thread. If the UI thread is idle, it will then execute the code associated with this event, which will process the response;

The user interface will function as before. If the server’s response times are fast, the user will not notice a difference. If they are noticeable, the user will see a cancel button appear and will have the option to interrupt the current operation.

The [Rx] library enables asynchronous programming. Its major advantage lies in the fact that it has been ported to numerous environments (Java, .NET, JS, etc.) and that proficiency in one environment can be easily transferred to another. Here, we will refer to Chapter 2 of the document [Introduction to RxJava. Application to Swing and Android Environments]. The reader is encouraged to read it. In the following sections, we will use code from the examples in this chapter.

We will evolve the application’s architecture as follows:

  • In [1], we insert an [RxJava] layer between the [Swing] layer and the [business logic] layer. The methods in the latter will now be called asynchronously;

We will proceed in several steps:

  • Step 1: The [business logic, DAO] layer currently presents a synchronous interface to the [UI] layer. We will transform it into an asynchronous [RxJava, business logic, DAO] layer;
  • Step 2: We will convert the synchronous console application into an application that remains synchronous but uses the asynchronous interface [RxJava, business, DAO];
  • Step 3: We will convert the synchronous Swing application into an asynchronous Swing application;

20.1. Step 1

We are converting the current synchronous layer [business logic, DAO] into an asynchronous layer [RxJava, business logic, DAO].

20.1.1. Creation

We start with the Maven project from Chapter 17.4, which we open in NetBeans:

We duplicate this project [1] (copy/paste) into a new project [elections-rxjava-business-dao-security-webjson] [2].

20.1.2. Maven Configuration

We update the [pom.xml] file of the new project to add the dependency on the [RxJava] library:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>istia.st.elections</groupId>
  <artifactId>elections-metier-dao-security-rxjava-webjson</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <description>Client jUnit du serveur web / jSON</description>
  <name>elections-metier-dao-security-rxjava-webjson</name>
 
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
  </properties>
 
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.2.7.RELEASE</version>
  </parent>
 
  <dependencies>
    <!-- Spring -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
    </dependency>
    <!-- jSON library used by Spring -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- component used by Spring RestTemplate -->
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
    </dependency>
    <!-- Google Guava -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>16.0.1</version>
      <scope>test</scope>
    </dependency>
    <!-- log library -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- Spring Boot Test -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- Spring Boot -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.reactivex/rxjava -->
    <dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxjava</artifactId>
      <version>1.2.0</version>
    </dependency>
 
  </dependencies>
 
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
      </plugin>
    </plugins>
  </build>
</project>
  • lines 65–70: we added the dependency on the RxJava library;

20.1.3. Asynchronous implementation of the [business] layer

To implement the [RxJava, business] layer, we add an asynchronous interface [IRxElectionsMetier] [1] and its implementation [RxElectionsMetier] [2] to the project:

  

The [IRxElectionsMetier] interface is the asynchronous interface for the [RxJava, business] layer. Its code is as follows:


package elections.security.client.metier;
 
import elections.security.client.entities.ListeElectorale;
import elections.security.client.entities.User;
import rx.Observable;
 
public interface IRxElectionsMetier {
 
  // authentication
  Observable<Void> authenticate(User user);
 
  // get the lists in competition
  Observable<ListeElectorale[]> getListesElectorales(User user);
 
  // the number of seats to be filled
  Observable<Integer> getNbSiegesAPourvoir(User user);
 
  // the electoral threshold
  Observable<Double> getSeuilElectoral(User user);
 
  // recording results
  Observable<Void> recordResultats(User user, ListeElectorale[] listesElectorales);
 
  // calculating seats
  Observable<ListeElectorale[]> calculerSieges(User user, ListeElectorale[] listesElectorales);
 
}

The [IRxElectionsMetier] interface inherits the methods from the [IElectionsMetier] interface, but where a method M in the [IElectionsMetier] interface returned a result of type T, the method M in the [IRxElectionsMetier] interface returns a result of type Observable<T>. The [Observable] type is provided by the RxJava library. An Observable<T> type provides the [subscribe] method, which retrieves the type T asynchronously. Three events are associated with this method:

  • onSuccess(T result), which notifies that a result of type T is available. The asynchronous operation may return multiple results;
  • onError(Throwable th), which notifies that the asynchronous operation encountered an error;
  • onCompleted(), which notifies that the asynchronous operation has completed;

Until the [Observable.subscribe] method is called, the asynchronous operation associated with the observable is not initiated. The code that calls a method M of the [IRxElectionsMetier] interface does not obtain the expected result T, but an Observable<T> type that will later allow it to obtain the result T by calling the [Observable.subscribe] method.

The [RxElectionsMetier] implementation of the [IRxElectionsMetier] interface is as follows:


package elections.security.client.metier;
 
import elections.security.client.entities.ListeElectorale;
import elections.security.client.entities.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.Observable;
 
@Component
public class RxElectionsMetier implements IRxElectionsMetier {
 
  @Autowired
  private IElectionsMetier metier;
 
  @Override
  public Observable<Void> authenticate(User user) {
    ...
  }
 
  @Override
  public Observable<ListeElectorale[]> getListesElectorales(User user) {
    return Observable.create(subscriber -> {
      try {
        // call synchronous method then reply to subscriber
        subscriber.onNext(metier.getListesElectorales(user));
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
 
  @Override
  public Observable<Integer> getNbSiegesAPourvoir(User user) {
    ...
  }
 
  @Override
  public Observable<Double> getSeuilElectoral(User user) {
    ...
  }
 
  @Override
  public Observable<Void> recordResultats(User user, ListeElectorale[] listesElectorales) {
    ...
  }
 
  @Override
  public Observable<ListeElectorale[]> calculerSieges(User user, ListeElectorale[] listesElectorales) {
    ...
  }
}
  • lines 12-13: Spring injection of the synchronous business layer;
  • lines 20-34: we will comment on the [getVoterLists] method, which instead of returning a [VoterList[]] type returns an [Observable<VoterList[]>] type;
  • lines 22-32: the static method [Observable.create] allows you to create an Observable from a [Subscriber] type. The [Subscriber] type represents a subscriber to the streams of results produced by the observed process (the Observable). It provides three methods:
    • [Subscriber.onNext] (line 25) to receive a result from the observed process;
    • [Subscriber.onError] (line 30) to receive an exception from the observed process. After an exception, the [Observable] type no longer emits results;
    • [Subscriber.onCompleted] (line 27) to receive the end-of-emission signal from the observed process. Here, the observed process emits only one element. Note that this signal is not emitted if an exception occurs. This is the default behavior of Observables: the emission of an exception also signals the end of emissions. Subscribers are aware of this;
  • lines 22–34: the [Observable.create] method takes an [Observable.OnSubscribe] type as a parameter. This type is a functional interface. This concept was introduced with Java 8 and refers to an interface with a single method. Here, the single method of the [Observable.OnSubscribe] interface is as follows:
T call(Subscriber<T> subscriber)

To implement a single-method functional interface m(param1, param2, ..., paramn), you can use the following simplified syntax:

(param1, param2, ..., paramn) -> { code de la méthode m}

This is what is done in lines 22–34:

  • [subscriber] is the parameter of the [Observable.OnSubscribe.call] method;
  • lines 23–32: the code we want to provide to the [call] method;
  • line 25: we request the voter lists synchronously from the [business] layer injected on line 13. We will therefore have to wait for the result. When it is received, it is passed to the subscriber’s [onNext] method;
  • line 28: in case of an error, the exception is passed to the subscriber’s [onError] method;
  • line 31: We wait for a single result. Once it is obtained (either the voter lists or an exception), we notify the subscriber that the observed process has finished emitting results;

It is important to remember that the [RxElectionsMetier] method returns a type Observable<VoterList[]> and not the type VoterList[] itself. The calling code must call the method Observable<VoterList[]>.subscribe for the code in lines 23–33 to be executed and return the voter lists via line 25.

The code for the other methods is similar:


package elections.security.client.metier;
 
import elections.security.client.entities.ListeElectorale;
import elections.security.client.entities.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.Observable;
 
@Component
public class RxElectionsMetier implements IRxElectionsMetier {
 
  @Autowired
  private IElectionsMetier metier;
 
  @Override
  public Observable<Void> authenticate(User user) {
    return Observable.create(subscriber -> {
      try {
        // synchronous method call
        metier.authenticate(user);
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
 
  }
 
  @Override
  public Observable<ListeElectorale[]> getListesElectorales(User user) {
    return Observable.create(subscriber -> {
      try {
        // call synchronous method then reply to subscriber
        subscriber.onNext(metier.getListesElectorales(user));
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
 
  @Override
  public Observable<Integer> getNbSiegesAPourvoir(User user) {
    return Observable.create(subscriber -> {
      try {
        // call synchronous method then reply to subscriber
        subscriber.onNext(metier.getNbSiegesAPourvoir(user));
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
 
  @Override
  public Observable<Double> getSeuilElectoral(User user) {
    return Observable.create(subscriber -> {
      try {
        // call synchronous method then reply to subscriber
        subscriber.onNext(metier.getSeuilElectoral(user));
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
 
  @Override
  public Observable<Void> recordResultats(User user, ListeElectorale[] listesElectorales) {
    return Observable.create(subscriber -> {
      try {
        // synchronous method call
        metier.recordResultats(user, listesElectorales);
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
 
  @Override
  public Observable<ListeElectorale[]> calculerSieges(User user, ListeElectorale[] listesElectorales) {
    return Observable.create(subscriber -> {
      try {
        // call synchronous method then reply to subscriber
        subscriber.onNext(metier.calculerSieges(user, listesElectorales));
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
}
  • lines 20 and 81: the subscriber's [onNext] method is not called because the subscriber does not expect any results;

20.1.4. JUnit tests for the [business] layer

  

20.1.4.1. Test01

We revisit the unit test [Test01] discussed in Section 17.4.4. It was designed to make synchronous calls to the [IElectionsMetier] interface. We modify it so that it makes synchronous calls to the new [IRxElectionsMetier] interface. It is indeed possible to make synchronous calls to an asynchronous RxJava interface. The code becomes as follows:


package elections.security.client.metier.junit;
 
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import elections.security.client.config.MetierConfig;
import elections.security.client.entities.ElectionsException;
import elections.security.client.entities.ListeElectorale;
import elections.security.client.entities.User;
import elections.security.client.metier.IRxElectionsMetier;
import rx.observables.BlockingObservable;
 
@SpringApplicationConfiguration(classes = MetierConfig.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class Test01 {
 
  // layer [electionsMetier]
  @Autowired
  private IRxElectionsMetier electionsMetier;
 
  // mapper jSON
  private final ObjectMapper mapper = new ObjectMapper();
 
  // users
  static private User admin;
  static private User user;
  static private User unknown;
 
  @BeforeClass
  public static void initTest() {
    admin = new User("admin", "admin");
    user = new User("user", "user");
    unknown = new User("x", "y");
  }
 
  @Test()
  public void checkUserUser() {
    ElectionsException se = null;
    try {
      BlockingObservable.from(electionsMetier.authenticate(user)).firstOrDefault(null);
    } catch (ElectionsException e) {
      se = e;
    }
    Assert.assertNotNull(se);
    Assert.assertEquals("403 Forbidden", se.getErreurs().get(0));
  }
 
  @Test()
  public void checkUserUnknown() {
    ElectionsException se = null;
    try {
      BlockingObservable.from(electionsMetier.authenticate(unknown)).firstOrDefault(null);
    } catch (ElectionsException e) {
      se = e;
    }
    Assert.assertNotNull(se);
    Assert.assertEquals("401 Unauthorized", se.getErreurs().get(0));
  }
 
  @Test()
  public void checkUserAdmin() {
    ElectionsException se = null;
    try {
      BlockingObservable.from(electionsMetier.authenticate(admin)).firstOrDefault(null);
    } catch (ElectionsException e) {
      se = e;
    }
    Assert.assertNull(se);
  }
 
  /**
   * vérification 1 : méthode de calcul des sièges on fixe en dur les listes
   */
  @Test
  public void calculSieges1() {
    // create the table of 7 candidate lists
    ListeElectorale[] listes = new ListeElectorale[7];
    listes[0] = new ListeElectorale("A", 32000, 0, false);
    listes[1] = new ListeElectorale("B", 25000, 0, false);
    listes[2] = new ListeElectorale("C", 16000, 0, false);
    listes[3] = new ListeElectorale("D", 12000, 0, false);
    listes[4] = new ListeElectorale("E", 8000, 0, false);
    listes[5] = new ListeElectorale("F", 4500, 0, false);
    listes[6] = new ListeElectorale("G", 2500, 0, false);
    // the seats for each list are calculated
    listes = BlockingObservable.from(electionsMetier.calculerSieges(admin, listes)).first();
    // check results
    Assert.assertEquals(2, listes[0].getSieges());
    Assert.assertFalse(listes[0].isElimine());
    Assert.assertEquals(2, listes[1].getSieges());
    Assert.assertFalse(listes[1].isElimine());
    Assert.assertEquals(1, listes[2].getSieges());
    Assert.assertFalse(listes[2].isElimine());
    Assert.assertEquals(1, listes[3].getSieges());
    Assert.assertFalse(listes[3].isElimine());
    Assert.assertEquals(0, listes[4].getSieges());
    Assert.assertFalse(listes[4].isElimine());
    Assert.assertEquals(0, listes[5].getSieges());
    Assert.assertTrue(listes[5].isElimine());
    Assert.assertEquals(0, listes[6].getSieges());
    Assert.assertTrue(listes[6].isElimine());
  }
 
  /**
   * vérification 2 : méthode de calcul des sièges on demande les listes à la couche [metier] puis on fixe en dur les
   * voix
   */
  @Test
  public void calculSieges2() {
    // create the table of 7 candidate lists
    ListeElectorale[] listes = BlockingObservable.from(electionsMetier.getListesElectorales(admin)).first();
    // the voices are hard-fixed
    listes[0].setVoix(32000);
    listes[1].setVoix(25000);
    listes[2].setVoix(16000);
    listes[3].setVoix(12000);
    listes[4].setVoix(8000);
    listes[5].setVoix(4500);
    listes[6].setVoix(2500);
    // the seats obtained by each list are calculated
    listes = BlockingObservable.from(electionsMetier.calculerSieges(admin, listes)).first();
    // check results
    Assert.assertEquals(2, listes[0].getSieges());
    Assert.assertFalse(listes[0].isElimine());
    Assert.assertEquals(2, listes[1].getSieges());
    Assert.assertFalse(listes[1].isElimine());
    Assert.assertEquals(1, listes[2].getSieges());
    Assert.assertFalse(listes[2].isElimine());
    Assert.assertEquals(1, listes[3].getSieges());
    Assert.assertFalse(listes[3].isElimine());
    Assert.assertEquals(0, listes[4].getSieges());
    Assert.assertFalse(listes[4].isElimine());
    Assert.assertEquals(0, listes[5].getSieges());
    Assert.assertTrue(listes[5].isElimine());
    Assert.assertEquals(0, listes[6].getSieges());
    Assert.assertTrue(listes[6].isElimine());
  }
 
  /**
   * vérification 3 méthode de calcul des sièges on provoque une exception
   */
  @Test(expected = ElectionsException.class)
  public void calculSieges3() {
    // we create a table of 24 candidate lists, each with 1 vote
    ListeElectorale[] listes = new ListeElectorale[25];
    // all 25 lists will have the same number of votes (4%)
    for (int i = 0; i < listes.length; i++) {
      listes[i] = new ListeElectorale("Liste" + (i + 1), 1, 0, false);
    }
    // calculation of seats - normally there should be a ElectionsException
    // with an electoral threshold of 5%
    BlockingObservable.from(electionsMetier.calculerSieges(admin, listes)).first();
  }
 
  /**
   * enregistrement des résultats de l'élection
   *
   * @throws JsonProcessingException
   */
  @Test
  public void ecritureResultatsElections() throws JsonProcessingException {
    // create the table of 7 candidate lists
    ListeElectorale[] listes = BlockingObservable.from(electionsMetier.getListesElectorales(admin)).first();
    // the voices are hard-fixed
    listes[0].setVoix(32000);
    listes[1].setVoix(25000);
    listes[2].setVoix(16000);
    listes[3].setVoix(12000);
    listes[4].setVoix(8000);
    listes[5].setVoix(4500);
    listes[6].setVoix(2500);
    // the seats obtained by each list are calculated
    listes = BlockingObservable.from(electionsMetier.calculerSieges(admin, listes)).first();
    // display results
    for (int i = 0; i < listes.length; i++) {
      System.out.println(mapper.writeValueAsString(listes[i]));
    }
    // results are entered into the database
    BlockingObservable.from(electionsMetier.recordResultats(admin, listes)).firstOrDefault(null);
    // check results
    listes = BlockingObservable.from(electionsMetier.getListesElectorales(admin)).first();
    // display results
    for (int i = 0; i < listes.length; i++) {
      System.out.println(mapper.writeValueAsString(listes[i]));
    }
    Assert.assertEquals(2, listes[0].getSieges());
    Assert.assertFalse(listes[0].isElimine());
    Assert.assertEquals(2, listes[1].getSieges());
    Assert.assertFalse(listes[1].isElimine());
    Assert.assertEquals(1, listes[2].getSieges());
    Assert.assertFalse(listes[2].isElimine());
    Assert.assertEquals(1, listes[3].getSieges());
    Assert.assertFalse(listes[3].isElimine());
    Assert.assertEquals(0, listes[4].getSieges());
    Assert.assertFalse(listes[4].isElimine());
    Assert.assertEquals(0, listes[5].getSieges());
    Assert.assertTrue(listes[5].isElimine());
    Assert.assertEquals(0, listes[6].getSieges());
    Assert.assertTrue(listes[6].isElimine());
  }
}

Let's examine the changes:

  • line 48: the static method [BlockingObservable.from(Observable).first]:
    • subscribes to the observable passed as a parameter to [from];
    • triggers the execution of the code associated with the observable;
    • waits to receive the first result. This is therefore a synchronous operation;

We use the [firstOrDefault(null)] method here because the observable [metier.authenticate] does not return a result when executed. The result of the [firstOrDefault(null)] method will therefore be null, a value that is not used here;

We repeat this pattern throughout the rest of the code whenever we want to call the [business] layer.

The unit test [Test01] must pass:

 

Task: Verify that the [Test01] test passes.


20.1.4.2. Test02

We modify the [Test01] test to now test the asynchronous interface [IRxElectionsMetier] by making asynchronous calls to its methods.

Let’s examine an initial test:


  // thread synchronization semaphore
  private CountDownLatch latch;
 
  // -----------------------------------
  private ElectionsException checkUserUserException;
 
  @Test()
  public void checkUserUser() throws InterruptedException {
    // 1" semaphore
    latch = new CountDownLatch((1));
    // asynchronous operation
    electionsMetier.authenticate(user).subscribeOn(Schedulers.io())
            .subscribe((result) -> {
            },
                    (th) -> {
                      checkUserUserException = (ElectionsException) th;
                      latch.countDown();
                    },
                    () -> {
                      latch.countDown();
                    });
    // waiting for semaphore
    latch.await();
    // checking results
    Assert.assertNotNull(checkUserUserException);
    Assert.assertEquals("403 Forbidden", checkUserUserException.getErreurs().get(0));
}
  • Line 2: A semaphore is a tool used to synchronize threads with each other. Threads are execution flows running in parallel. To execute task T1, thread [Thread1] may need task T2, executed by thread [Thread2], to be completed. It then waits for thread [Thread2] to send it a signal indicating that task T2 is complete. There are various ways to manage this synchronization between two threads. The method used here is as follows:
    • line 10: thread [Thread1] creates a semaphore with the value 1;
    • line 12: thread [Thread1] creates and launches a thread [Thread2]. This is achieved using the syntax:

electionsMetier.authenticate(user).subscribeOn(Schedulers.io())

The [Observable.subscribeOn] method sets the thread on which the observed process will run. The parameter of [subscribeOn] is a thread pool. The RxJava library provides several pools suited to different situations. The [Schedulers.io()] pool is the one recommended for network operations;

  • (continued)
    • lines 12-13: the operation

electionsMetier.authenticate(user).subscribeOn(Schedulers.io()).subscribe(...)

executes the synchronous operation encapsulated in the observable [authenticate(user)]. But because this synchronous operation is launched on a thread other than [Thread1], the latter does not wait for the response from the [subscribe] method and moves on to the next statement;

  • (continued)
    • line 23: the [Thread1] thread pauses and waits for the semaphore to go to 0 (it is currently at 1);
  • lines 13–21: the [subscribe] method takes three lambda functions as parameters:
    • the first [(result)->{...}] is called every time the observable [authenticate(user)] emits a result [result]. Here we have an observable [authenticate(user)] that does something but emits no result. The lambda [(result)->{}] will therefore never be called. That is why its code is empty here [{}];
    • the second [(th)->{...}] takes a [Throwable] type as a parameter. It is called when the observable’s execution encounters an exception. Here, we handle the [Throwable th] parameter as follows:
      • line 16: we store it in a field of the test class of type [ElectionsException] because the executed observable only throws this type of exception;
      • line 17: we set the semaphore to 0 to indicate that the [Thread2] thread has finished its work;
    • the third [()->{...}] is called when the observable has no more elements to emit. We handle this event as follows:
      • line 20: we set the semaphore to 0 to indicate that thread [Thread2] has finished its work;

Note that the third lambda is not called if an exception occurs. This is why we had to set the semaphore to 0 in line 17 as well;

  • line 25: when we reach this line, the observable has finished its work. We can then perform the same checks as in test [Test01];

Let’s examine another test:


// -----------------------------------
  private ElectionsException calculSieges1Exception;
  private ListeElectorale[] listesCalculSieges1;
 
  @Test
  public void calculSieges1() throws InterruptedException {
    // create the table of 7 candidate lists
    ListeElectorale[] listes = new ListeElectorale[7];
    listes[0] = new ListeElectorale("A", 32000, 0, false);
    listes[1] = new ListeElectorale("B", 25000, 0, false);
    listes[2] = new ListeElectorale("C", 16000, 0, false);
    listes[3] = new ListeElectorale("D", 12000, 0, false);
    listes[4] = new ListeElectorale("E", 8000, 0, false);
    listes[5] = new ListeElectorale("F", 4500, 0, false);
    listes[6] = new ListeElectorale("G", 2500, 0, false);
    // 1" semaphore
    latch = new CountDownLatch((1));
    // asynchronous operation    
    // the seats for each list are calculated
    electionsMetier.calculerSieges(admin, listes).subscribeOn(Schedulers.io())
            .subscribe((result) -> {
              listesCalculSieges1 = result;
            },
                    (th) -> {
                      calculSieges1Exception = (ElectionsException) th;
                      latch.countDown();
                    },
                    () -> {
                      latch.countDown();
                    });
    // waiting for semaphore
    latch.await();
    // check results
    Assert.assertNull(calculSieges1Exception);
    Assert.assertEquals(2, listesCalculSieges1[0].getSieges());
    Assert.assertFalse(listesCalculSieges1[0].isElimine());
    Assert.assertEquals(2, listesCalculSieges1[1].getSieges());
    Assert.assertFalse(listesCalculSieges1[1].isElimine());
    Assert.assertEquals(1, listesCalculSieges1[2].getSieges());
    Assert.assertFalse(listesCalculSieges1[2].isElimine());
    Assert.assertEquals(1, listesCalculSieges1[3].getSieges());
    Assert.assertFalse(listesCalculSieges1[3].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[4].getSieges());
    Assert.assertFalse(listesCalculSieges1[4].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[5].getSieges());
    Assert.assertTrue(listesCalculSieges1[5].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[6].getSieges());
    Assert.assertTrue(listesCalculSieges1[6].isElimine());
  }
  • lines 20–30: asynchronous execution of the observable [electionsMetier.calculateSeats(admin, lists)];
  • lines 21–23: the execution of the observable returns a [VoterList[]] type, which is stored in a field of the test class, line 3;
  • lines 34–48: these checks are those from the [Test01] test, to which we have added the check on line 34 that ensures no exception occurred;

The entire [Test02] test is available in the course materials.


Assignment: Run the [Test02] test and verify that it passes.


20.1.4.3. Test03

Test [Test03] does the same thing as test [Test01]: it tests the [IRxElectionsMetier] interface using synchronous calls to that interface. It is a copy of test [Test02] with two differences:

  • the observables are no longer executed in a thread different from the one running the tests. When thread [Thread1] executes the [subscribe] method of an observable, it initiates an HTTP operation to the server also on thread [Thread1]. The entire [subscribe] method then becomes synchronous;
  • since there is now only one thread, thread synchronization becomes unnecessary and the semaphore disappears;

Here are two examples of tests:


  // -----------------------------------
  private ElectionsException checkUserUserException;
 
  @Test()
  public void checkUserUser() throws InterruptedException {
    // synchronous operation
    electionsMetier.authenticate(user)
            .subscribe((result) -> {
            },
                    (th) -> {
                      checkUserUserException = (ElectionsException) th;
                    },
                    () -> {
                    });
    // checking results
    Assert.assertNotNull(checkUserUserException);
    Assert.assertEquals("403 Forbidden", checkUserUserException.getErreurs().get(0));
}
  • line 7: by default, the method [electionsMetier.authenticate(user).subscribe] runs in the calling code's thread. This is therefore a synchronous operation;

  // -----------------------------------
  private ElectionsException calculSieges1Exception;
  private ListeElectorale[] listesCalculSieges1;
 
  @Test
  public void calculSieges1() throws InterruptedException {
    // create the table of 7 candidate lists
    ListeElectorale[] listes = new ListeElectorale[7];
    listes[0] = new ListeElectorale("A", 32000, 0, false);
    listes[1] = new ListeElectorale("B", 25000, 0, false);
    listes[2] = new ListeElectorale("C", 16000, 0, false);
    listes[3] = new ListeElectorale("D", 12000, 0, false);
    listes[4] = new ListeElectorale("E", 8000, 0, false);
    listes[5] = new ListeElectorale("F", 4500, 0, false);
    listes[6] = new ListeElectorale("G", 2500, 0, false);
    // synchronous operation    
    // the seats for each list are calculated
    electionsMetier.calculerSieges(admin, listes)
            .subscribe((result) -> {
              listesCalculSieges1 = result;
            },
                    (th) -> {
                      calculSieges1Exception = (ElectionsException) th;
                    },
                    () -> {
                    });
    // check results
    Assert.assertNull(calculSieges1Exception);
    Assert.assertEquals(2, listesCalculSieges1[0].getSieges());
    Assert.assertFalse(listesCalculSieges1[0].isElimine());
    Assert.assertEquals(2, listesCalculSieges1[1].getSieges());
    Assert.assertFalse(listesCalculSieges1[1].isElimine());
    Assert.assertEquals(1, listesCalculSieges1[2].getSieges());
    Assert.assertFalse(listesCalculSieges1[2].isElimine());
    Assert.assertEquals(1, listesCalculSieges1[3].getSieges());
    Assert.assertFalse(listesCalculSieges1[3].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[4].getSieges());
    Assert.assertFalse(listesCalculSieges1[4].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[5].getSieges());
    Assert.assertTrue(listesCalculSieges1[5].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[6].getSieges());
    Assert.assertTrue(listesCalculSieges1[6].isElimine());
  }

Task: Run the test [Test03] and verify that it passes.


20.2. Step 2

We will now port the synchronous console application from Chapter 17.5 into an application that remains synchronous but uses the asynchronous interface [RxJava, business logic, DAO];

We start with the project [elections-console-business-dao-security-webjson] [1] from Chapter 17.5, which we duplicate into a new project [elections-console-rxjava-business-dao-security-webjson] [2]:

  • In [3-4], in the new project, we remove the dependency on the old synchronous [business] layer;
  • In [5-9], we add a dependency on the new asynchronous [business] layer;
  • in [10-14], we rename the [ElectionsConsole] class to [ElectionsConsole01];

Similarly, we rename the [BootElectionsConsole] class to [BootElectionsConsole01]:

 

The current code for the [BootElectionsConsole01] class is as follows:


package elections.security.client.boot;
 
import elections.security.client.console.IElectionsUI;
 
 
public class BootElectionsConsole01 extends AbstractBootElections{
    public static void main(String[] arguments) {
        new BootElectionsConsole01().run();
    }
 
    @Override
    protected IElectionsUI getUI() {
        return ctx.getBean("electionsConsole",IElectionsUI.class);
    }
}
  • Line 13: Because we changed the class name from [ElectionsConsole] to [ElectionsConsole01], we must now write:

        return ctx.getBean("electionsConsole01",IElectionsUI.class);

Let’s go back to the code for the [ElectionsConsole01] class:


@Component
public class ElectionsConsole01 implements IElectionsUI {
 
    @Autowired
    private IElectionsMetier electionsMetier;
 
  @Autowired
  private User admin;
 
    @Override
    public void run() {
        // competing lists
        ListeElectorale[] listes;
        // data entry
        try (Scanner clavier = new Scanner(System.in)) {
         // lists in competition are requested from the [metier] layer
         listes = electionsMetier.getListesElectorales(admin);
            ...
        // we calculate the number of seats
        listes=electionsMetier.calculerSieges(admin,listes);
        // we record the results
        electionsMetier.recordResultats(admin,listes);
        ...
}

If we follow the example of test [Test01] in section 20.1.4.1, lines 5, 17, 20, and 22 will change as follows:


@Component
public class ElectionsConsole01 implements IElectionsUI {
 
  @Autowired
  private IRxElectionsMetier electionsMetier;
 
  @Autowired
  private User admin;
 
  @Override
  public void run() {
    // competing lists
    ListeElectorale[] listes;
    // data entry
    try (Scanner clavier = new Scanner(System.in)) {
      // lists in competition are requested from the [metier] layer
      listes = BlockingObservable.from(electionsMetier.getListesElectorales(admin)).first();
      ...
    // we calculate the number of seats
    listes = BlockingObservable.from(electionsMetier.calculerSieges(admin, listes)).first();
    // we record the results
    BlockingObservable.from(electionsMetier.recordResultats(admin, listes));
    ...
  }

Task: Configure the project to run the [BootElectionsConsole01] class with the three parameters [SS, Hours worked, Days worked] and verify that running the project as configured produces the expected results.



Task: Configure the project to run the pair [BootElectionsConsole02, ElectionsConsole02], where the class [ElectionsConsole02] has been written following the model of the [Test02] test in Section 20.1.4.2.



Task: Configure the project to run the pair [BootElectionsConsole03, ElectionsConsole03], where the class [ElectionsConsole03] has been written following the model of the test [Test03] in Section 20.1.4.3.


20.3. Step 3

We will now proceed to port the Swing application to an asynchronous environment.

We start by duplicating the [elections-swing-metier-dao-security-webjson] [1] project from Chapter 17.6 into a new project [elections-swing-rxjava-metier-dao-security-webjson] [2]:

  • in [3, 4], we remove the dependency on the synchronous [console] layer;
  • in [5-9], we add a dependency on the asynchronous console layer;

The [swing] layer will make true asynchronous calls to the [business] layer. When calling a method on the latter, there will be two threads:

  • the UI thread, which handles events;
  • an I/O thread that will execute the HTTP call to the server;

Throughout the asynchronous call, we should display a loading image and a cancel button. We won’t do that here, and it will be suggested to you as an improvement to the application. The changes are made in the two classes that make calls to the [business] layer:

 

20.3.1. Maven Configuration

Here, we will use the [RxSwing] library, which extends the [RxJava] library with features available only in a Swing environment. To do this, we modify the [pom.xml] file as follows:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>istia.st.elections</groupId>
  <artifactId>elections-swing-rxjava-metier-dao-security-webjson</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>elections-swing-rxjava-metier-dao-security-webjson</name>
  <description>couche swing asynchrone du client web / jSON</description>
 
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
 
  <dependencies>
    <!-- RxSwing -->
    <!-- https://mvnrepository.com/artifact/io.reactivex/rxswing -->
    <dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxswing</artifactId>
      <version>0.27.0</version>
    </dependency>
    <!-- lower layers -->
    <dependency>
      <groupId>istia.st.elections</groupId>
      <artifactId>elections-console-rxjava-metier-dao-security-webjson</artifactId>
      <version>0.0.1-SNAPSHOT</version>
    </dependency>
  </dependencies>
 
</project>

20.3.2. The [ElectionsConnectForm] class

In an asynchronous operation, the [ElectionsConnectForm] class becomes the following:


package elections.security.client.swing;
 
import elections.security.client.console.IElectionsUI;
import elections.security.client.entities.User;
 
import java.awt.Dimension;
import java.awt.Toolkit;
import javax.swing.SwingUtilities;
 
import elections.security.client.metier.IRxElectionsMetier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.schedulers.Schedulers;
import rx.schedulers.SwingScheduler;

@Component
public class ElectionsConnectForm extends AbstractElectionsConnectForm implements IElectionsUI {
 
  private static final long serialVersionUID = 1L;
 
  // reference to the asynchronous [business] layer
  @Autowired
  private IRxElectionsMetier metier;
 
  // logged-in user
  private User user;
 
  // main form
  @Autowired
  private ElectionsMainForm electionsMainForm;
 
  // session UI
  @Autowired
  private UiSession uiSession;
 
  @Override
  protected void doConnect() {
    if (isPageValid()) {
      // user authentication
      metier.authenticate(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance()).subscribe(
        // there is no answer
        (result) -> {
        },
        // exception management
        (th) -> {
          // we note the error
          String info = getInfoForException("Les erreurs suivantes se sont produites :", th);
          // display info
          jTextPaneErreurs.setText(info);
          jTextPaneErreurs.setCaretPosition(0);
 
        },
        // authentication is complete
        () -> {
          // the user is stored in the session
          uiSession.setUser(user);
          // connection view is hidden
          setVisible(false);
          // the main view is displayed
          electionsMainForm.run();
        });
    }
  }
 
  // initializations
  @Override
  protected void init() {
    ...
  }
 
  @Override
  public void run() {
    // the graphical interface is displayed
    SwingUtilities.invokeLater(new Runnable() {
      public void run() {
        init();
        setVisible(true);
      }
    });
  }
 
  private boolean isPageValid() {
    ...
  }
 
  private String getInfoForException(String message, Throwable ex) {
    ...
  }
 
}
  • Lines 36–63: The [doConnect] method is executed when the user taps the [Connect] menu option:
 

It's all in line 40:


      metier.authenticate(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance()).subscribe(...)
  • The observed process is [metier.authenticate(user)];
  • it will be executed on an I/O thread taken from the [Schedulers.io()] pool;
  • it will be observed in the UI thread, the one that handles Swing interface events [observeOn(SwingScheduler.getInstance())]. This thread is obtained via the method [SwingScheduler.getInstance()], where [SwingScheduler] is a class provided by the [RxSwing] library. This is mandatory. When the result of the asynchronous operation is obtained, it is often used to modify elements of the Swing interface. However, the Swing interface can only be modified in the UI thread; otherwise, an exception is thrown. Lines 41–61 must therefore execute in the UI thread. This is ensured here by the [observeOn(SwingScheduler.getInstance())] method;

Let’s comment on the rest of the code:

  • lines 42–43: these lines are there to comply with the syntax of the [subscribe] method. They will never be executed because the [metier.authenticate(user)] process returns no result;
  • lines 35–52: when an exception is received, it is displayed;
  • lines 54-61: executed when the [metier.authenticate(user)] process signals the end of its emissions;

20.3.3. The [ElectionsMainForm] class

 

20.3.3.1. Initializing the graphical user interface


package elections.security.client.swing;
 
import elections.security.client.console.IElectionsUI;
import elections.security.client.entities.ListeElectorale;
import elections.security.client.entities.User;
import elections.security.client.metier.IRxElectionsMetier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.schedulers.Schedulers;
import rx.schedulers.SwingScheduler;
 
import javax.swing.*;
import java.awt.*;
import java.util.ArrayList;
import java.util.List;
 
@Component
public class ElectionsMainForm extends AbstractElectionsMainForm implements IElectionsUI {
 
  private static final long serialVersionUID = 1L;
 
  // reference to the asynchronous [business] layer
  @Autowired
  private IRxElectionsMetier metier;
 
  // session UI
  @Autowired
  private UiSession uiSession;
 
  // logged-in user
  private User user;
 
  // list templates JList
  private DefaultListModel<String> modèleNomsVoix = null;
  private DefaultListModel<String> modèleRésultats = null;
 
  // competing lists
  private ListeElectorale[] listes;
 
  // user-entered lists
  private final List<ListeElectorale> listesSaisies = new ArrayList<>();
  private ListeElectorale[] tListesSaisies;
 
  // initializations
  @Override
  protected void init() {
    // generation of components by the parent class
    super.init();
    // form status
    Utilitaires.setEnabled(new JLabel[]{jLabelAjouter, jLabelCalculer, jLabelEnregistrer, jLabelSupprimer}, false);
    Utilitaires.setEnabled(
            new JMenuItem[]{jMenuItemAjouter, jMenuItemCalculer, jMenuItemEnregistrer, jMenuItemSupprimer}, false);
    // center window
    Dimension screenSize = Toolkit.getDefaultToolkit().getScreenSize();
    Dimension frameSize = getSize();
    if (frameSize.height > screenSize.height) {
      frameSize.height = screenSize.height;
    }
    if (frameSize.width > screenSize.width) {
      frameSize.width = screenSize.width;
    }
    setLocation((screenSize.width - frameSize.width) / 2, (screenSize.height - frameSize.height) / 2);
    // logged-in user
    user = uiSession.getUser();
    // local initializations
    modèleNomsVoix = new DefaultListModel<>();
    jListNomsVoix.setModel(modèleNomsVoix);
    modèleRésultats = new DefaultListModel<>();
    jListResultats.setModel(modèleRésultats);
    // lists are requested from the [business] layer
    metier.getListesElectorales(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
            .subscribe(
                    // answer
                    listesElectorales -> {
                      // memorize lists
                      listes = listesElectorales;
                    },
                    // exception
                    (th) -> showException(th),
                    // observable purpose
                    () -> {
                      // next step
                      doInitStep2();
                    });
  }
...
  • line 46: the [init] method is executed when the associated window is about to be displayed. Its purpose is to initialize components [1-3] below:
 
  • lines 71–85: the candidate lists are requested asynchronously (component [1]);
  • line 71: the observed process is [metier.getListesElectorales(user)]. It is executed on an I/O thread [subscribeOn(Schedulers.io())] and observed on the UI thread [observeOn(SwingScheduler.getInstance())];
  • lines 74–77: the result returned by the observed process is stored in the [listes] field on line 38;
  • line 79: any exception is handled by the following method:

  private void showException(Throwable th) {
    // exception is displayed
    jTextPaneMessages.setText(getInfoForException("Les erreurs suivantes se sont produites : ", th));
    jTextPaneMessages.setCaretPosition(0);
}
  • Lines 81–84: At the end of the observed process, lines 81–84 are executed. These lines are not executed if an exception occurred. The [doInitStep2] method performs step 2 of the initialization as follows:

  private void doInitStep2() {
    // associate list names with the jComboBoxNomsListes combo
    for (int i = 0; i < listes.length; i++) {
      jComboBoxNomsListes.addItem(String.format("%s - %s", listes[i].getId(), listes[i].getNom()));
    }
    // number of seats to be filled
    metier.getNbSiegesAPourvoir(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
            .subscribe(
                    // answer
                    nbSiegesAPourvoir -> {
                      // initialize the label linked to this information
                      jLabelSAP.setText(jLabelSAP.getText() + nbSiegesAPourvoir);
                    },
                    // exception
                    (th) -> showException(th),
                    // observable purpose
                    () -> {
                      // next step
                      doInitStep3();
                    });
}
  • lines 3–5: we use the result from the previous step to populate the dropdown list with the names of the candidate lists;
  • lines 7–20: we request the number of seats to be filled asynchronously;
  • line 7: the observed process is [metier.getNbSiegesAPourvoir(user)]. It is executed on an I/O thread [subscribeOn(Schedulers.io())] and observed on the UI thread [observeOn(SwingScheduler.getInstance())];
  • lines 10–13: the result returned by the process is used to update the graphical user interface;
  • line 15: any exceptions are displayed;
  • lines 17–20: upon receiving the end signal from the observable, we proceed to step 3 of the initialization process;

Step 3 of the initialization is handled by the following code:


  private void doInitStep3() {
    // electoral threshold
    metier.getSeuilElectoral(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
            .subscribe(
                    // answer
                    seuilElectoral -> {
                      // initialize the label linked to this information
                      jLabelSE.setText(jLabelSE.getText() + seuilElectoral);
                    },
                    // exception
                    (th) -> showException(th),
                    // observable purpose
                    () -> {
                    });
}
  • lines 3-4: the electoral threshold is requested asynchronously;
  • line 3: the observed process is [business.getVotingThreshold(user)]. It is executed on an I/O thread [subscribeOn(Schedulers.io())] and observed on the UI thread [observeOn(SwingScheduler.getInstance())];
  • lines 6-9: the result returned by the process is used to update the GUI;
  • line 11: any exceptions are displayed;
  • lines 13–14: upon receiving the end signal from the observable, no action is taken: the GUI initialization process is complete;

20.3.3.2. Calculating the seats won by the various lists

The [doCalculer] method is responsible for calculating the number of seats won by the various lists:


  @Override
  protected void doCalculer() {
    tListesSaisies = listesSaisies.toArray(new ListeElectorale[0]);
    // calcul des sièges
    String info = null;
    metier.calculerSieges(user, tListesSaisies).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
            .subscribe(
                    // traitement résultat
                    result -> consumeResultSieges(result),
                    // traitement exception
                    th -> showException(th),
                    // fin observable
                    () -> {
                    }
            );
}
  • lines 6–15: the seats obtained by the various lists are calculated asynchronously;
  • line 6: the observed process is [metier.calculateSeats(user, tListsEntered)]. It is executed on an I/O thread [subscribeOn(Schedulers.io())] and observed on the UI thread [observeOn(SwingScheduler.getInstance())];
  • line 9: the result returned by the process is used by the method [consumeResultSeats];
  • line 11: any exceptions are displayed;
  • lines 13–14: upon receiving the end signal from the observable, no action is taken;

Line 9: the [consumeResultSieges] method processes the result returned by the observed process, updating the candidate lists with their [seats, eliminated] fields:


  private void consumeResultSieges(ListeElectorale[] tListesSaisies) {
    // the result is stored
    this.tListesSaisies = tListesSaisies;
    // display of results
    modèleRésultats.clear();
    for (int i = 0; i < tListesSaisies.length; i++) {
      modèleRésultats.addElement(tListesSaisies[i].toString());
    }
    // maj state form
    Utilitaires.setEnabled(new JLabel[]{jLabelEnregistrer}, true);
    Utilitaires.setEnabled(new JLabel[]{jLabelCalculer}, false);
    Utilitaires.setEnabled(new JMenuItem[]{jMenuItemEnregistrer}, true);
    Utilitaires.setEnabled(new JMenuItem[]{jMenuItemCalculer}, false);
    jTextPaneMessages.setText("Calcul terminé");
}
  • lines 4-14: the result is used to update the GUI;

20.3.3.3. Recording the election results

The election results are saved using the following [doEnregistrer] method:


  @Override
  protected void doEnregistrer() {
    // on demande l'enregistrement à la couche [métier]
    metier.recordResultats(user, tListesSaisies).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
            .subscribe(
                    // traitement du résultat - il n'y en a pas ici
                    (param) -> {
                    },
                    // traitement de l'exception
                    (th) -> showException(th),
                    // fin observable
                    () -> {
                      // maj du formulaire
                      Utilitaires.setEnabled(new JLabel[]{jLabelEnregistrer}, false);
                      Utilitaires.setEnabled(new JMenuItem[]{jMenuItemEnregistrer}, false);
                      jTextPaneMessages.setText("Enregistrement des résultats réalisé");
                    }
            );
}
  • lines 4–17: the election results are saved asynchronously;
  • line 4: the observed process is [business.recordResults(user, tEnteredLists)]. It is executed on an I/O thread [subscribeOn(Schedulers.io())] and observed on the UI thread [observeOn(SwingScheduler.getInstance())];
  • lines 7–8: these lines will never be executed because the observed process does not return a result;
  • line 10: any exception is displayed;
  • lines 14–16: upon receiving the end signal from the observable, the GUI is updated;

Task: Verify that the Swing application works. Then modify the GUI and the code so that during an asynchronous operation with the web server/JSON, a loading image appears along with an option to cancel the current operation.