Skip to content

20. 使用 RxJava 进行异步编程

推荐阅读:[RxJava 入门:在 Swing 和 Android 环境中的应用]

在本章中,我们将回顾第17.6章,当时我们构建了一个具有以下架构的客户端/服务器应用程序:

在[1]中的Swing界面上,某些用户操作会通过HTTP网络[2]触发一系列操作,最终到达[3]中的数据库。因此,用户操作的响应到达所需的时间可能会有所不同。在用户界面中添加一个加载指示器,并提供在操作耗时过长时取消该操作的选项,将非常有用。 在第17.6章中,所有需要与服务器进行数据交换的用户操作都是同步的。 代码执行的事件处理程序在收到响应之前不会完成。在此期间,图形界面处于冻结状态:它不会响应新的用户操作。这些操作会被简单地排队,待当前运行的事件处理程序完成后再进行处理。因此,如果显示了一个取消按钮,用户虽然可以点击它,但在当前操作完成之前不会发生任何变化。这样一来,取消按钮就失去了作用。

要使“取消”按钮的点击生效,当前操作必须先完成。为实现这一点,必须以异步方式启动该可能耗时较长的操作:

  • 事件处理程序启动该耗时操作,但不等待其结果,而是将控制权交还给处理图形界面事件的 UI 线程。该耗时操作在与 UI 线程不同的线程上运行,从而避免了 UI 线程被阻塞;
  • 如果用户在长时间运行的操作完成前点击了取消按钮,处于空闲状态的 UI 线程即可处理此事件。此时可以通过忽略该操作的结果来终止该长时间运行的操作;
  • 如果长时间运行的操作未被取消,响应的到达将在 UI 线程中触发一个事件。如果 UI 线程处于空闲状态,它将执行与该事件关联的代码,从而处理该响应;

用户界面将照常运行。如果服务器的响应速度很快,用户不会察觉任何差异。如果响应速度明显较慢,用户将看到一个取消按钮出现,并可选择中断当前操作。

[Rx] 库支持异步编程。其主要优势在于已移植到多种环境(Java、.NET、JS 等),且在一种环境中的熟练技能可轻松迁移到另一种环境。在此,我们将参考文档 [《RxJava 入门:Swing 与 Android 环境的应用》]。建议读者阅读该章节。在接下来的章节中,我们将使用该章例题中的代码。

我们将按以下方式演进应用程序的架构:

  • 在[1]中,我们在[Swing]层与[业务逻辑]层之间插入了一个[RxJava]层。业务逻辑层中的方法现在将以异步方式被调用;

我们将分几个步骤进行:

  • 步骤 1:当前 [业务逻辑、DAO] 层向 [UI] 层提供的是同步接口。我们将将其转换为异步的 [RxJava、业务逻辑、DAO] 层;
  • 步骤 2:我们将把同步的控制台应用程序转换为一个仍保持同步但使用异步接口 [RxJava, 业务, DAO] 的应用程序;
  • 步骤 3:我们将把同步的 Swing 应用程序转换为异步的 Swing 应用程序;

20.1. 步骤 1

我们将当前的同步层 [业务逻辑、DAO] 转换为异步层 [RxJava、业务逻辑、DAO]。

20.1.1. 创建

我们从第 17.4 章的 Maven 项目开始,并在 NetBeans 中打开该项目:

我们将该项目 [1] 复制(复制/粘贴)到一个新项目 [elections-rxjava-business-dao-security-webjson] [2] 中。

20.1.2. Maven 配置

我们更新新项目的 [pom.xml] 文件,以添加对 [RxJava] 库的依赖:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>istia.st.elections</groupId>
  <artifactId>elections-metier-dao-security-rxjava-webjson</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <description>Client jUnit du serveur web / jSON</description>
  <name>elections-metier-dao-security-rxjava-webjson</name>
 
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
  </properties>
 
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.2.7.RELEASE</version>
  </parent>
 
  <dependencies>
    <!-- Spring -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
    </dependency>
    <!-- jSON library used by Spring -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- component used by Spring RestTemplate -->
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
    </dependency>
    <!-- Google Guava -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>16.0.1</version>
      <scope>test</scope>
    </dependency>
    <!-- log library -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- Spring Boot Test -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- Spring Boot -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.reactivex/rxjava -->
    <dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxjava</artifactId>
      <version>1.2.0</version>
    </dependency>
 
  </dependencies>
 
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
      </plugin>
    </plugins>
  </build>
</project>
  • 第 65–70 行:我们添加了对 RxJava 库的依赖;

20.1.3. [业务]层的异步实现

为了实现 [RxJava, 业务] 层,我们将异步接口 [IRxElectionsMetier] [1] 及其实现类 [RxElectionsMetier] [2] 添加到项目中:

  

[IRxElectionsMetier] 接口是 [RxJava, 业务] 层的异步接口。其代码如下:


package elections.security.client.metier;
 
import elections.security.client.entities.ListeElectorale;
import elections.security.client.entities.User;
import rx.Observable;
 
public interface IRxElectionsMetier {
 
  // authentication
  Observable<Void> authenticate(User user);
 
  // get the lists in competition
  Observable<ListeElectorale[]> getListesElectorales(User user);
 
  // the number of seats to be filled
  Observable<Integer> getNbSiegesAPourvoir(User user);
 
  // the electoral threshold
  Observable<Double> getSeuilElectoral(User user);
 
  // recording results
  Observable<Void> recordResultats(User user, ListeElectorale[] listesElectorales);
 
  // calculating seats
  Observable<ListeElectorale[]> calculerSieges(User user, ListeElectorale[] listesElectorales);
 
}

[IRxElectionsMetier] 接口继承了 [IElectionsMetier] 接口中的方法,但当 [IElectionsMetier] 接口中的方法 M 返回类型为 T 的结果时,[IRxElectionsMetier] 接口中的方法 M 则返回类型为 Observable<T> 的结果。[Observable] 类型由 RxJava 库提供。 Observable<T> 类型提供了 [subscribe] 方法,该方法以异步方式获取类型 T。此方法关联了三个事件:

  • onSuccess(T result),用于通知类型 T 的结果已可用。异步操作可能返回多个结果;
  • onError(Throwable th),用于通知异步操作遇到错误;
  • onCompleted(),用于通知异步操作已完成;

在调用 [Observable.subscribe] 方法之前,与该可观察对象关联的异步操作不会被启动。调用 [IRxElectionsMetier] 接口的方法 M 的代码不会直接获得预期结果 T,而是获得一个 Observable<T> 类型,该类型允许其通过调用 [Observable.subscribe] 方法在后续获取结果 T。

[IRxElectionsMetier] 接口的 [RxElectionsMetier] 实现如下:


package elections.security.client.metier;
 
import elections.security.client.entities.ListeElectorale;
import elections.security.client.entities.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.Observable;
 
@Component
public class RxElectionsMetier implements IRxElectionsMetier {
 
  @Autowired
  private IElectionsMetier metier;
 
  @Override
  public Observable<Void> authenticate(User user) {
    ...
  }
 
  @Override
  public Observable<ListeElectorale[]> getListesElectorales(User user) {
    return Observable.create(subscriber -> {
      try {
        // call synchronous method then reply to subscriber
        subscriber.onNext(metier.getListesElectorales(user));
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
 
  @Override
  public Observable<Integer> getNbSiegesAPourvoir(User user) {
    ...
  }
 
  @Override
  public Observable<Double> getSeuilElectoral(User user) {
    ...
  }
 
  @Override
  public Observable<Void> recordResultats(User user, ListeElectorale[] listesElectorales) {
    ...
  }
 
  @Override
  public Observable<ListeElectorale[]> calculerSieges(User user, ListeElectorale[] listesElectorales) {
    ...
  }
}
  • 第 12-13 行:对同步业务层进行 Spring 注入;
  • 第 20-34 行:我们将对 [getVoterLists] 方法进行说明,该方法不再返回 [VoterList[]] 类型,而是返回 [Observable<VoterList[]>] 类型;
  • 第 22-32 行:静态方法 [Observable.create] 允许您根据 [Subscriber] 类型创建一个 Observable。[Subscriber] 类型代表了被观察过程(即 Observable)所产生结果流的订阅者。它提供了三个方法:
    • [Subscriber.onNext](第 25 行)用于接收来自被观察过程的结果;
    • [Subscriber.onError](第 30 行)用于接收来自被观察过程的异常。发生异常后,[Observable] 类型将不再发布结果;
    • [Subscriber.onCompleted](第 27 行)用于接收来自被观察过程的发射结束信号。在此,被观察过程仅发射一个元素。请注意,如果发生异常,则不会发出此信号。这是 Observables 的默认行为:异常的发射同时也标志着发射的结束。订阅者会意识到这一点;
  • 第 22–34 行:[Observable.create] 方法将 [Observable.OnSubscribe] 类型作为参数。该类型是一个函数式接口。这一概念随 Java 8 引入,指具有单一方法的接口。此处,[Observable.OnSubscribe] 接口的唯一方法如下:
T call(Subscriber<T> subscriber)

要实现一个单方法函数接口 m(param1, param2, ..., paramn),可以使用以下简化语法:

(param1, param2, ..., paramn) -> { code de la méthode m}

第 22–34 行就是这样实现的:

  • [subscriber] 是 [Observable.OnSubscribe.call] 方法的参数;
  • 第 23–32 行:我们要提供给 [call] 方法的代码;
  • 第 25 行:我们从第 13 行注入的 [business] 层同步请求选民名单。因此,我们需要等待结果。收到结果后,将其传递给订阅者的 [onNext] 方法;
  • 第 28 行:若发生错误,将异常传递给订阅者的 [onError] 方法;
  • 第 31 行:我们等待单个结果。一旦获得结果(无论是选民名单还是异常),便通知订阅者被观察的过程已停止发布结果;

需要特别注意的是,[RxElectionsMetier] 方法返回的是 Observable<VoterList[]> 类型,而非 VoterList[] 类型本身。调用方必须调用 Observable<VoterList[]>.subscribe 方法,才能使第 23–33 行的代码执行,并通过第 25 行返回选民名单。

其他方法的代码类似:


package elections.security.client.metier;
 
import elections.security.client.entities.ListeElectorale;
import elections.security.client.entities.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.Observable;
 
@Component
public class RxElectionsMetier implements IRxElectionsMetier {
 
  @Autowired
  private IElectionsMetier metier;
 
  @Override
  public Observable<Void> authenticate(User user) {
    return Observable.create(subscriber -> {
      try {
        // synchronous method call
        metier.authenticate(user);
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
 
  }
 
  @Override
  public Observable<ListeElectorale[]> getListesElectorales(User user) {
    return Observable.create(subscriber -> {
      try {
        // call synchronous method then reply to subscriber
        subscriber.onNext(metier.getListesElectorales(user));
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
 
  @Override
  public Observable<Integer> getNbSiegesAPourvoir(User user) {
    return Observable.create(subscriber -> {
      try {
        // call synchronous method then reply to subscriber
        subscriber.onNext(metier.getNbSiegesAPourvoir(user));
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
 
  @Override
  public Observable<Double> getSeuilElectoral(User user) {
    return Observable.create(subscriber -> {
      try {
        // call synchronous method then reply to subscriber
        subscriber.onNext(metier.getSeuilElectoral(user));
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
 
  @Override
  public Observable<Void> recordResultats(User user, ListeElectorale[] listesElectorales) {
    return Observable.create(subscriber -> {
      try {
        // synchronous method call
        metier.recordResultats(user, listesElectorales);
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
 
  @Override
  public Observable<ListeElectorale[]> calculerSieges(User user, ListeElectorale[] listesElectorales) {
    return Observable.create(subscriber -> {
      try {
        // call synchronous method then reply to subscriber
        subscriber.onNext(metier.calculerSieges(user, listesElectorales));
        // we signal the end of the observable
        subscriber.onCompleted();
      } catch (Exception e) {
        // we forward the exception
        subscriber.onError(e);
      }
    });
  }
}
  • 第 20 行和第 81 行:由于订阅者不期望任何结果,因此未调用订阅者的 [onNext] 方法;

20.1.4. 针对 [业务] 层的 JUnit 测试

  

20.1.4.1. Test01

我们重新审视第17.4.4节中讨论的单元测试[Test01]。该测试原本设计为对[IElectionsMetier]接口进行同步调用。我们将它修改为对新的[IRxElectionsMetier]接口进行同步调用。实际上,确实可以对异步的RxJava接口进行同步调用。代码如下:


package elections.security.client.metier.junit;
 
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import elections.security.client.config.MetierConfig;
import elections.security.client.entities.ElectionsException;
import elections.security.client.entities.ListeElectorale;
import elections.security.client.entities.User;
import elections.security.client.metier.IRxElectionsMetier;
import rx.observables.BlockingObservable;
 
@SpringApplicationConfiguration(classes = MetierConfig.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class Test01 {
 
  // layer [electionsMetier]
  @Autowired
  private IRxElectionsMetier electionsMetier;
 
  // mapper jSON
  private final ObjectMapper mapper = new ObjectMapper();
 
  // users
  static private User admin;
  static private User user;
  static private User unknown;
 
  @BeforeClass
  public static void initTest() {
    admin = new User("admin", "admin");
    user = new User("user", "user");
    unknown = new User("x", "y");
  }
 
  @Test()
  public void checkUserUser() {
    ElectionsException se = null;
    try {
      BlockingObservable.from(electionsMetier.authenticate(user)).firstOrDefault(null);
    } catch (ElectionsException e) {
      se = e;
    }
    Assert.assertNotNull(se);
    Assert.assertEquals("403 Forbidden", se.getErreurs().get(0));
  }
 
  @Test()
  public void checkUserUnknown() {
    ElectionsException se = null;
    try {
      BlockingObservable.from(electionsMetier.authenticate(unknown)).firstOrDefault(null);
    } catch (ElectionsException e) {
      se = e;
    }
    Assert.assertNotNull(se);
    Assert.assertEquals("401 Unauthorized", se.getErreurs().get(0));
  }
 
  @Test()
  public void checkUserAdmin() {
    ElectionsException se = null;
    try {
      BlockingObservable.from(electionsMetier.authenticate(admin)).firstOrDefault(null);
    } catch (ElectionsException e) {
      se = e;
    }
    Assert.assertNull(se);
  }
 
  /**
   * vérification 1 : méthode de calcul des sièges on fixe en dur les listes
   */
  @Test
  public void calculSieges1() {
    // create the table of 7 candidate lists
    ListeElectorale[] listes = new ListeElectorale[7];
    listes[0] = new ListeElectorale("A", 32000, 0, false);
    listes[1] = new ListeElectorale("B", 25000, 0, false);
    listes[2] = new ListeElectorale("C", 16000, 0, false);
    listes[3] = new ListeElectorale("D", 12000, 0, false);
    listes[4] = new ListeElectorale("E", 8000, 0, false);
    listes[5] = new ListeElectorale("F", 4500, 0, false);
    listes[6] = new ListeElectorale("G", 2500, 0, false);
    // the seats for each list are calculated
    listes = BlockingObservable.from(electionsMetier.calculerSieges(admin, listes)).first();
    // check results
    Assert.assertEquals(2, listes[0].getSieges());
    Assert.assertFalse(listes[0].isElimine());
    Assert.assertEquals(2, listes[1].getSieges());
    Assert.assertFalse(listes[1].isElimine());
    Assert.assertEquals(1, listes[2].getSieges());
    Assert.assertFalse(listes[2].isElimine());
    Assert.assertEquals(1, listes[3].getSieges());
    Assert.assertFalse(listes[3].isElimine());
    Assert.assertEquals(0, listes[4].getSieges());
    Assert.assertFalse(listes[4].isElimine());
    Assert.assertEquals(0, listes[5].getSieges());
    Assert.assertTrue(listes[5].isElimine());
    Assert.assertEquals(0, listes[6].getSieges());
    Assert.assertTrue(listes[6].isElimine());
  }
 
  /**
   * vérification 2 : méthode de calcul des sièges on demande les listes à la couche [metier] puis on fixe en dur les
   * voix
   */
  @Test
  public void calculSieges2() {
    // create the table of 7 candidate lists
    ListeElectorale[] listes = BlockingObservable.from(electionsMetier.getListesElectorales(admin)).first();
    // the voices are hard-fixed
    listes[0].setVoix(32000);
    listes[1].setVoix(25000);
    listes[2].setVoix(16000);
    listes[3].setVoix(12000);
    listes[4].setVoix(8000);
    listes[5].setVoix(4500);
    listes[6].setVoix(2500);
    // the seats obtained by each list are calculated
    listes = BlockingObservable.from(electionsMetier.calculerSieges(admin, listes)).first();
    // check results
    Assert.assertEquals(2, listes[0].getSieges());
    Assert.assertFalse(listes[0].isElimine());
    Assert.assertEquals(2, listes[1].getSieges());
    Assert.assertFalse(listes[1].isElimine());
    Assert.assertEquals(1, listes[2].getSieges());
    Assert.assertFalse(listes[2].isElimine());
    Assert.assertEquals(1, listes[3].getSieges());
    Assert.assertFalse(listes[3].isElimine());
    Assert.assertEquals(0, listes[4].getSieges());
    Assert.assertFalse(listes[4].isElimine());
    Assert.assertEquals(0, listes[5].getSieges());
    Assert.assertTrue(listes[5].isElimine());
    Assert.assertEquals(0, listes[6].getSieges());
    Assert.assertTrue(listes[6].isElimine());
  }
 
  /**
   * vérification 3 méthode de calcul des sièges on provoque une exception
   */
  @Test(expected = ElectionsException.class)
  public void calculSieges3() {
    // we create a table of 24 candidate lists, each with 1 vote
    ListeElectorale[] listes = new ListeElectorale[25];
    // all 25 lists will have the same number of votes (4%)
    for (int i = 0; i < listes.length; i++) {
      listes[i] = new ListeElectorale("Liste" + (i + 1), 1, 0, false);
    }
    // calculation of seats - normally there should be a ElectionsException
    // with an electoral threshold of 5%
    BlockingObservable.from(electionsMetier.calculerSieges(admin, listes)).first();
  }
 
  /**
   * enregistrement des résultats de l'élection
   *
   * @throws JsonProcessingException
   */
  @Test
  public void ecritureResultatsElections() throws JsonProcessingException {
    // create the table of 7 candidate lists
    ListeElectorale[] listes = BlockingObservable.from(electionsMetier.getListesElectorales(admin)).first();
    // the voices are hard-fixed
    listes[0].setVoix(32000);
    listes[1].setVoix(25000);
    listes[2].setVoix(16000);
    listes[3].setVoix(12000);
    listes[4].setVoix(8000);
    listes[5].setVoix(4500);
    listes[6].setVoix(2500);
    // the seats obtained by each list are calculated
    listes = BlockingObservable.from(electionsMetier.calculerSieges(admin, listes)).first();
    // display results
    for (int i = 0; i < listes.length; i++) {
      System.out.println(mapper.writeValueAsString(listes[i]));
    }
    // results are entered into the database
    BlockingObservable.from(electionsMetier.recordResultats(admin, listes)).firstOrDefault(null);
    // check results
    listes = BlockingObservable.from(electionsMetier.getListesElectorales(admin)).first();
    // display results
    for (int i = 0; i < listes.length; i++) {
      System.out.println(mapper.writeValueAsString(listes[i]));
    }
    Assert.assertEquals(2, listes[0].getSieges());
    Assert.assertFalse(listes[0].isElimine());
    Assert.assertEquals(2, listes[1].getSieges());
    Assert.assertFalse(listes[1].isElimine());
    Assert.assertEquals(1, listes[2].getSieges());
    Assert.assertFalse(listes[2].isElimine());
    Assert.assertEquals(1, listes[3].getSieges());
    Assert.assertFalse(listes[3].isElimine());
    Assert.assertEquals(0, listes[4].getSieges());
    Assert.assertFalse(listes[4].isElimine());
    Assert.assertEquals(0, listes[5].getSieges());
    Assert.assertTrue(listes[5].isElimine());
    Assert.assertEquals(0, listes[6].getSieges());
    Assert.assertTrue(listes[6].isElimine());
  }
}

让我们来看看这些改动:

  • 第 48 行:静态方法 [BlockingObservable.from(Observable).first]:
    • 订阅作为 [from] 参数传递的可观察对象;
    • 触发与该可观察对象关联的代码的执行;
    • 等待接收第一个结果。因此这是一项同步操作;

此处使用 [firstOrDefault(null)] 方法,是因为可观察对象 [metier.authenticate] 在执行时不会返回结果。因此 [firstOrDefault(null)] 方法的结果将为 null,该值在此处不会被使用;

在后续代码中,每当需要调用 [business] 层时,我们都会重复使用这一模式。

单元测试 [Test01] 必须通过:

 

任务:验证 [Test01] 测试是否通过。


20.1.4.2. Test02

我们将 [Test01] 测试进行修改,通过对其方法进行异步调用,从而测试异步接口 [IRxElectionsMetier]。

让我们来看一个初始测试:


  // thread synchronization semaphore
  private CountDownLatch latch;
 
  // -----------------------------------
  private ElectionsException checkUserUserException;
 
  @Test()
  public void checkUserUser() throws InterruptedException {
    // 1" semaphore
    latch = new CountDownLatch((1));
    // asynchronous operation
    electionsMetier.authenticate(user).subscribeOn(Schedulers.io())
            .subscribe((result) -> {
            },
                    (th) -> {
                      checkUserUserException = (ElectionsException) th;
                      latch.countDown();
                    },
                    () -> {
                      latch.countDown();
                    });
    // waiting for semaphore
    latch.await();
    // checking results
    Assert.assertNotNull(checkUserUserException);
    Assert.assertEquals("403 Forbidden", checkUserUserException.getErreurs().get(0));
}
  • 第2行:信号量是一种用于在线程之间进行同步的工具。线程是并行运行的执行流。为了执行任务 T1,线程 [Thread1] 可能需要由线程 [Thread2] 执行的任务 T2 先完成。因此,它会等待线程 [Thread2] 向其发送信号,以表明任务 T2 已完成。 管理两个线程之间这种同步的方法多种多样。此处采用的方法如下:
    • 第 10 行:线程 [Thread1] 创建一个值为 1 的信号量;
    • 第 12 行:线程 [Thread1] 创建并启动线程 [Thread2]。实现方式如下:

electionsMetier.authenticate(user).subscribeOn(Schedulers.io())

[Observable.subscribeOn] 方法用于设置被观察过程的运行线程。该方法的参数是一个线程池。RxJava 库提供了多种适用于不同场景的线程池。[Schedulers.io()] 线程池是推荐用于网络操作的;

  • (续)
    • 第 12-13 行:该操作

electionsMetier.authenticate(user).subscribeOn(Schedulers.io()).subscribe(...)

执行了封装在可观察对象 [authenticate(user)] 中的同步操作。但由于该同步操作是在 [Thread1] 以外的线程上启动的,因此 [Thread1] 不会等待 [subscribe] 方法的响应,而是直接执行下一条语句;

  • (续)
    • 第 23 行:[Thread1] 线程暂停并等待信号量变为 0(当前值为 1);
  • 第 13–21 行:[subscribe] 方法接受三个 lambda 函数作为参数:
    • 第一个 [(result)->{...}] 会在可观察对象 [authenticate(user)] 每次发出结果 [result] 时被调用。这里我们有一个可观察对象 [authenticate(user)],它会执行某些操作但不发出结果。因此,lambda 表达式 [(result)->{}] 将永远不会被调用。这就是为什么这里的代码是空的 [{}];
    • 第二个 [(th)->{...}] 接受 [Throwable] 类型的参数。当可观察对象的执行遇到异常时,它会被调用。在此,我们对 [Throwable th] 参数的处理如下:
      • 第 16 行:我们将它存储在测试类的 [ElectionsException] 类型字段中,因为被执行的可观察对象只抛出这种类型的异常;
      • 第 17 行:我们将信号量设置为 0,以表示 [Thread2] 线程已完成工作;
    • 第三个 [()->{...}] 会在可观察对象没有更多元素可发射时被调用。我们按以下方式处理此事件:
      • 第 20 行:我们将信号量设置为 0,以表示 [Thread2] 线程已完成工作;

请注意,如果发生异常,第三个 lambda 表达式将不会被调用。这就是为什么我们必须在第 17 行也将信号量设置为 0;

  • 第 25 行:当执行到此行时,可观察对象已完成工作。此时我们可以执行与测试 [Test01] 中相同的检查;

让我们再看一个测试:


// -----------------------------------
  private ElectionsException calculSieges1Exception;
  private ListeElectorale[] listesCalculSieges1;
 
  @Test
  public void calculSieges1() throws InterruptedException {
    // create the table of 7 candidate lists
    ListeElectorale[] listes = new ListeElectorale[7];
    listes[0] = new ListeElectorale("A", 32000, 0, false);
    listes[1] = new ListeElectorale("B", 25000, 0, false);
    listes[2] = new ListeElectorale("C", 16000, 0, false);
    listes[3] = new ListeElectorale("D", 12000, 0, false);
    listes[4] = new ListeElectorale("E", 8000, 0, false);
    listes[5] = new ListeElectorale("F", 4500, 0, false);
    listes[6] = new ListeElectorale("G", 2500, 0, false);
    // 1" semaphore
    latch = new CountDownLatch((1));
    // asynchronous operation    
    // the seats for each list are calculated
    electionsMetier.calculerSieges(admin, listes).subscribeOn(Schedulers.io())
            .subscribe((result) -> {
              listesCalculSieges1 = result;
            },
                    (th) -> {
                      calculSieges1Exception = (ElectionsException) th;
                      latch.countDown();
                    },
                    () -> {
                      latch.countDown();
                    });
    // waiting for semaphore
    latch.await();
    // check results
    Assert.assertNull(calculSieges1Exception);
    Assert.assertEquals(2, listesCalculSieges1[0].getSieges());
    Assert.assertFalse(listesCalculSieges1[0].isElimine());
    Assert.assertEquals(2, listesCalculSieges1[1].getSieges());
    Assert.assertFalse(listesCalculSieges1[1].isElimine());
    Assert.assertEquals(1, listesCalculSieges1[2].getSieges());
    Assert.assertFalse(listesCalculSieges1[2].isElimine());
    Assert.assertEquals(1, listesCalculSieges1[3].getSieges());
    Assert.assertFalse(listesCalculSieges1[3].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[4].getSieges());
    Assert.assertFalse(listesCalculSieges1[4].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[5].getSieges());
    Assert.assertTrue(listesCalculSieges1[5].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[6].getSieges());
    Assert.assertTrue(listesCalculSieges1[6].isElimine());
  }
  • 第 20–30 行:异步执行可观察对象 [electionsMetier.calculateSeats(admin, lists)];
  • 第 21–23 行:可观察对象的执行返回 [VoterList[]] 类型,该结果存储在测试类的第 3 行字段中;
  • 第 34–48 行:这些检查来自 [Test01] 测试,我们在第 34 行添加了检查,以确保未发生异常;

完整的 [Test02] 测试可在课程材料中找到。


作业:运行 [Test02] 测试,并验证其是否通过。


20.1.4.3. Test03

测试 [Test03] 与测试 [Test01] 功能相同:它通过向 [IRxElectionsMetier] 接口发出同步调用,对该接口进行测试。该测试是测试 [Test02] 的副本,但有两处不同:

  • 可观察对象不再在与运行测试不同的线程中执行。当线程 [Thread1] 执行可观察对象的 [subscribe] 方法时,它会在线程 [Thread1] 上向服务器发起 HTTP 操作。整个 [subscribe] 方法因此变为同步操作;
  • 由于现在仅有一个线程,线程同步变得不再必要,因此信号量被移除;

以下是两个测试示例:


  // -----------------------------------
  private ElectionsException checkUserUserException;
 
  @Test()
  public void checkUserUser() throws InterruptedException {
    // synchronous operation
    electionsMetier.authenticate(user)
            .subscribe((result) -> {
            },
                    (th) -> {
                      checkUserUserException = (ElectionsException) th;
                    },
                    () -> {
                    });
    // checking results
    Assert.assertNotNull(checkUserUserException);
    Assert.assertEquals("403 Forbidden", checkUserUserException.getErreurs().get(0));
}
  • 第 7 行:默认情况下,方法 [electionsMetier.authenticate(user).subscribe] 在调用代码的线程中运行。因此,这是一项同步操作;

  // -----------------------------------
  private ElectionsException calculSieges1Exception;
  private ListeElectorale[] listesCalculSieges1;
 
  @Test
  public void calculSieges1() throws InterruptedException {
    // create the table of 7 candidate lists
    ListeElectorale[] listes = new ListeElectorale[7];
    listes[0] = new ListeElectorale("A", 32000, 0, false);
    listes[1] = new ListeElectorale("B", 25000, 0, false);
    listes[2] = new ListeElectorale("C", 16000, 0, false);
    listes[3] = new ListeElectorale("D", 12000, 0, false);
    listes[4] = new ListeElectorale("E", 8000, 0, false);
    listes[5] = new ListeElectorale("F", 4500, 0, false);
    listes[6] = new ListeElectorale("G", 2500, 0, false);
    // synchronous operation    
    // the seats for each list are calculated
    electionsMetier.calculerSieges(admin, listes)
            .subscribe((result) -> {
              listesCalculSieges1 = result;
            },
                    (th) -> {
                      calculSieges1Exception = (ElectionsException) th;
                    },
                    () -> {
                    });
    // check results
    Assert.assertNull(calculSieges1Exception);
    Assert.assertEquals(2, listesCalculSieges1[0].getSieges());
    Assert.assertFalse(listesCalculSieges1[0].isElimine());
    Assert.assertEquals(2, listesCalculSieges1[1].getSieges());
    Assert.assertFalse(listesCalculSieges1[1].isElimine());
    Assert.assertEquals(1, listesCalculSieges1[2].getSieges());
    Assert.assertFalse(listesCalculSieges1[2].isElimine());
    Assert.assertEquals(1, listesCalculSieges1[3].getSieges());
    Assert.assertFalse(listesCalculSieges1[3].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[4].getSieges());
    Assert.assertFalse(listesCalculSieges1[4].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[5].getSieges());
    Assert.assertTrue(listesCalculSieges1[5].isElimine());
    Assert.assertEquals(0, listesCalculSieges1[6].getSieges());
    Assert.assertTrue(listesCalculSieges1[6].isElimine());
  }

任务:运行测试 [Test03] 并验证其是否通过。


20.2. 步骤 2

接下来,我们将把第 17.5 章中的同步控制台应用程序移植为一个仍保持同步但使用异步接口 [RxJava、业务逻辑、DAO] 的应用程序;

我们从第17.5章的项目 [elections-console-business-dao-security-webjson] [1] 开始将其复制为一个新项目 [elections-console-rxjava-business-dao-security-webjson] [2]:

  • 在新项目中,我们在 [3-4] 处移除了对旧的同步 [business] 层的依赖;
  • 在 [5-9] 中,我们引入了对新的异步 [业务] 层的依赖;
  • 在 [10-14] 中,我们将 [ElectionsConsole] 类重命名为 [ElectionsConsole01];

同样,我们将 [BootElectionsConsole] 类重命名为 [BootElectionsConsole01]:

 

[BootElectionsConsole01] 类的当前代码如下:


package elections.security.client.boot;
 
import elections.security.client.console.IElectionsUI;
 
 
public class BootElectionsConsole01 extends AbstractBootElections{
    public static void main(String[] arguments) {
        new BootElectionsConsole01().run();
    }
 
    @Override
    protected IElectionsUI getUI() {
        return ctx.getBean("electionsConsole",IElectionsUI.class);
    }
}
  • 第 13 行:由于我们将类名从 [ElectionsConsole] 改为 [ElectionsConsole01],现在必须写成:

        return ctx.getBean("electionsConsole01",IElectionsUI.class);

让我们回到 [ElectionsConsole01] 类的代码:


@Component
public class ElectionsConsole01 implements IElectionsUI {
 
    @Autowired
    private IElectionsMetier electionsMetier;
 
  @Autowired
  private User admin;
 
    @Override
    public void run() {
        // competing lists
        ListeElectorale[] listes;
        // data entry
        try (Scanner clavier = new Scanner(System.in)) {
         // lists in competition are requested from the [metier] layer
         listes = electionsMetier.getListesElectorales(admin);
            ...
        // we calculate the number of seats
        listes=electionsMetier.calculerSieges(admin,listes);
        // we record the results
        electionsMetier.recordResultats(admin,listes);
        ...
}

如果我们参照第20.1.4.1节中的测试[Test01]示例,第5、17、20和22行将按如下方式更改:


@Component
public class ElectionsConsole01 implements IElectionsUI {
 
  @Autowired
  private IRxElectionsMetier electionsMetier;
 
  @Autowired
  private User admin;
 
  @Override
  public void run() {
    // competing lists
    ListeElectorale[] listes;
    // data entry
    try (Scanner clavier = new Scanner(System.in)) {
      // lists in competition are requested from the [metier] layer
      listes = BlockingObservable.from(electionsMetier.getListesElectorales(admin)).first();
      ...
    // we calculate the number of seats
    listes = BlockingObservable.from(electionsMetier.calculerSieges(admin, listes)).first();
    // we record the results
    BlockingObservable.from(electionsMetier.recordResultats(admin, listes));
    ...
  }

任务:配置项目以运行 [BootElectionsConsole01] 类,并传入三个参数 [SS, 工作小时数, 工作日数],验证按此配置运行项目是否能产生预期结果。



任务:配置项目以运行 [BootElectionsConsole02, ElectionsConsole02] 这对组合,其中 [ElectionsConsole02] 类是根据第 20.1.4.2 节中的 [Test02] 测试模型编写的。



任务:配置项目以运行 [BootElectionsConsole03, ElectionsConsole03] 这对组合,其中 [ElectionsConsole03] 类是参照第 20.1.4.3 节中的 [Test03] 测试编写的。


20.3. 步骤 3

现在,我们将着手将该 Swing 应用程序移植到异步环境中。

首先,我们将第17.6章中的 [elections-swing-metier-dao-security-webjson] [1] 项目复制到一个新项目 [elections-swing-rxjava-metier-dao-security-webjson] [2] 中:

  • 在 [3, 4] 中,我们移除了对同步 [console] 层的依赖;
  • 在 [5-9] 中,我们引入了对异步控制台层的依赖;

[swing] 层将向 [business] 层发起真正的异步调用。在调用后者的方法时,将存在两个线程:

  • 处理事件的 UI 线程;
  • 一个 I/O 线程,负责执行对服务器的 HTTP 调用;

在整个异步调用过程中,我们应显示加载图标和取消按钮。本文中暂不实现此功能,这将作为应用程序的改进建议提供给您。相关修改将在两个调用 [business] 层的类中进行:

 

20.3.1. Maven 配置

在此,我们将使用 [RxSwing] 库,该库在 [RxJava] 库的基础上扩展了仅在 Swing 环境中可用的一些功能。为此,我们需要按以下方式修改 [pom.xml] 文件:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>istia.st.elections</groupId>
  <artifactId>elections-swing-rxjava-metier-dao-security-webjson</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>elections-swing-rxjava-metier-dao-security-webjson</name>
  <description>couche swing asynchrone du client web / jSON</description>
 
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
 
  <dependencies>
    <!-- RxSwing -->
    <!-- https://mvnrepository.com/artifact/io.reactivex/rxswing -->
    <dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxswing</artifactId>
      <version>0.27.0</version>
    </dependency>
    <!-- lower layers -->
    <dependency>
      <groupId>istia.st.elections</groupId>
      <artifactId>elections-console-rxjava-metier-dao-security-webjson</artifactId>
      <version>0.0.1-SNAPSHOT</version>
    </dependency>
  </dependencies>
 
</project>

20.3.2. [ElectionsConnectForm] 类

在异步操作中,[ElectionsConnectForm] 类将变为如下形式:


package elections.security.client.swing;
 
import elections.security.client.console.IElectionsUI;
import elections.security.client.entities.User;
 
import java.awt.Dimension;
import java.awt.Toolkit;
import javax.swing.SwingUtilities;
 
import elections.security.client.metier.IRxElectionsMetier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.schedulers.Schedulers;
import rx.schedulers.SwingScheduler;

@Component
public class ElectionsConnectForm extends AbstractElectionsConnectForm implements IElectionsUI {
 
  private static final long serialVersionUID = 1L;
 
  // reference to the asynchronous [business] layer
  @Autowired
  private IRxElectionsMetier metier;
 
  // logged-in user
  private User user;
 
  // main form
  @Autowired
  private ElectionsMainForm electionsMainForm;
 
  // session UI
  @Autowired
  private UiSession uiSession;
 
  @Override
  protected void doConnect() {
    if (isPageValid()) {
      // user authentication
      metier.authenticate(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance()).subscribe(
        // there is no answer
        (result) -> {
        },
        // exception management
        (th) -> {
          // we note the error
          String info = getInfoForException("Les erreurs suivantes se sont produites :", th);
          // display info
          jTextPaneErreurs.setText(info);
          jTextPaneErreurs.setCaretPosition(0);
 
        },
        // authentication is complete
        () -> {
          // the user is stored in the session
          uiSession.setUser(user);
          // connection view is hidden
          setVisible(false);
          // the main view is displayed
          electionsMainForm.run();
        });
    }
  }
 
  // initializations
  @Override
  protected void init() {
    ...
  }
 
  @Override
  public void run() {
    // the graphical interface is displayed
    SwingUtilities.invokeLater(new Runnable() {
      public void run() {
        init();
        setVisible(true);
      }
    });
  }
 
  private boolean isPageValid() {
    ...
  }
 
  private String getInfoForException(String message, Throwable ex) {
    ...
  }
 
}
  • 第 36–63 行:当用户点击 [Connect] 菜单选项时,将执行 [doConnect] 方法:
 

答案都在第40行:


      metier.authenticate(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance()).subscribe(...)
  • 被观察的过程是 [metier.authenticate(user)];
  • 它将在从 [Schedulers.io()] 池中获取的 I/O 线程上执行;
  • 该进程将在 UI 线程中被观察,该线程负责处理 Swing 界面事件 [observeOn(SwingScheduler.getInstance())]。该线程通过方法 [SwingScheduler.getInstance()] 获取,其中 [SwingScheduler] 是 [RxSwing] 库提供的类。此步骤是必需的。 当获得异步操作的结果时,通常会将其用于修改 Swing 界面的元素。然而,Swing 界面只能在 UI 线程中进行修改;否则将抛出异常。因此,第 41–61 行必须在 UI 线程中执行。此处通过 [observeOn(SwingScheduler.getInstance())] 方法确保了这一点;

下面对剩余代码进行说明:

  • 第 42–43 行:这些代码是为了符合 [subscribe] 方法的语法要求。它们永远不会被执行,因为 [metier.authenticate(user)] 过程不会返回任何结果;
  • 第 35–52 行:当接收到异常时,会将其显示出来;
  • 第 54–61 行:当 [metier.authenticate(user)] 流程发出结束信号时执行;

20.3.3. [ElectionsMainForm] 类

 

20.3.3.1. 初始化图形用户界面


package elections.security.client.swing;
 
import elections.security.client.console.IElectionsUI;
import elections.security.client.entities.ListeElectorale;
import elections.security.client.entities.User;
import elections.security.client.metier.IRxElectionsMetier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.schedulers.Schedulers;
import rx.schedulers.SwingScheduler;
 
import javax.swing.*;
import java.awt.*;
import java.util.ArrayList;
import java.util.List;
 
@Component
public class ElectionsMainForm extends AbstractElectionsMainForm implements IElectionsUI {
 
  private static final long serialVersionUID = 1L;
 
  // reference to the asynchronous [business] layer
  @Autowired
  private IRxElectionsMetier metier;
 
  // session UI
  @Autowired
  private UiSession uiSession;
 
  // logged-in user
  private User user;
 
  // list templates JList
  private DefaultListModel<String> modèleNomsVoix = null;
  private DefaultListModel<String> modèleRésultats = null;
 
  // competing lists
  private ListeElectorale[] listes;
 
  // user-entered lists
  private final List<ListeElectorale> listesSaisies = new ArrayList<>();
  private ListeElectorale[] tListesSaisies;
 
  // initializations
  @Override
  protected void init() {
    // generation of components by the parent class
    super.init();
    // form status
    Utilitaires.setEnabled(new JLabel[]{jLabelAjouter, jLabelCalculer, jLabelEnregistrer, jLabelSupprimer}, false);
    Utilitaires.setEnabled(
            new JMenuItem[]{jMenuItemAjouter, jMenuItemCalculer, jMenuItemEnregistrer, jMenuItemSupprimer}, false);
    // center window
    Dimension screenSize = Toolkit.getDefaultToolkit().getScreenSize();
    Dimension frameSize = getSize();
    if (frameSize.height > screenSize.height) {
      frameSize.height = screenSize.height;
    }
    if (frameSize.width > screenSize.width) {
      frameSize.width = screenSize.width;
    }
    setLocation((screenSize.width - frameSize.width) / 2, (screenSize.height - frameSize.height) / 2);
    // logged-in user
    user = uiSession.getUser();
    // local initializations
    modèleNomsVoix = new DefaultListModel<>();
    jListNomsVoix.setModel(modèleNomsVoix);
    modèleRésultats = new DefaultListModel<>();
    jListResultats.setModel(modèleRésultats);
    // lists are requested from the [business] layer
    metier.getListesElectorales(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
            .subscribe(
                    // answer
                    listesElectorales -> {
                      // memorize lists
                      listes = listesElectorales;
                    },
                    // exception
                    (th) -> showException(th),
                    // observable purpose
                    () -> {
                      // next step
                      doInitStep2();
                    });
  }
...
  • 第 46 行:当关联的窗口即将显示时,会执行 [init] 方法。其目的是初始化下面的组件 [1-3]:
 
  • 第 71–85 行:异步请求候选人名单(组件 [1]);
  • 第 71 行:被观察的进程是 [metier.getListesElectorales(user)]。它在 I/O 线程上执行 [subscribeOn(Schedulers.io())],并在 UI 线程上进行观察 [observeOn(SwingScheduler.getInstance())];
  • 第 74–77 行:被观察进程返回的结果存储在第 38 行的 [listes] 字段中;
  • 第 79 行:任何异常均由以下方法处理:

  private void showException(Throwable th) {
    // exception is displayed
    jTextPaneMessages.setText(getInfoForException("Les erreurs suivantes se sont produites : ", th));
    jTextPaneMessages.setCaretPosition(0);
}
  • 第 81–84 行:在被观察进程结束时,将执行第 81–84 行。如果发生了异常,则不会执行这些行。[doInitStep2] 方法按以下方式执行初始化的第 2 步:

  private void doInitStep2() {
    // associate list names with the jComboBoxNomsListes combo
    for (int i = 0; i < listes.length; i++) {
      jComboBoxNomsListes.addItem(String.format("%s - %s", listes[i].getId(), listes[i].getNom()));
    }
    // number of seats to be filled
    metier.getNbSiegesAPourvoir(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
            .subscribe(
                    // answer
                    nbSiegesAPourvoir -> {
                      // initialize the label linked to this information
                      jLabelSAP.setText(jLabelSAP.getText() + nbSiegesAPourvoir);
                    },
                    // exception
                    (th) -> showException(th),
                    // observable purpose
                    () -> {
                      // next step
                      doInitStep3();
                    });
}
  • 第 3–5 行:我们使用上一步的结果,将候选名单的名称填入下拉列表中;
  • 第 7–20 行:我们异步请求待填补的席位数量;
  • 第 7 行:被观察的进程是 [metier.getNbSiegesAPourvoir(user)]。它在 I/O 线程上执行 [subscribeOn(Schedulers.io())],并在 UI 线程上进行观察 [observeOn(SwingScheduler.getInstance())];
  • 第 10–13 行:使用该进程返回的结果更新图形用户界面;
  • 第 15 行:显示任何异常;
  • 第 17–20 行:接收到可观察对象发出的结束信号后,我们将进入初始化过程的第 3 步;

初始化过程的第 3 步由以下代码处理:


  private void doInitStep3() {
    // electoral threshold
    metier.getSeuilElectoral(user).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
            .subscribe(
                    // answer
                    seuilElectoral -> {
                      // initialize the label linked to this information
                      jLabelSE.setText(jLabelSE.getText() + seuilElectoral);
                    },
                    // exception
                    (th) -> showException(th),
                    // observable purpose
                    () -> {
                    });
}
  • 第 3-4 行:异步请求选举阈值;
  • 第 3 行:被观察的进程是 [business.getVotingThreshold(user)]。它在 I/O 线程上执行 [subscribeOn(Schedulers.io())],并在 UI 线程上进行观察 [observeOn(SwingScheduler.getInstance())];
  • 第 6-9 行:使用该过程返回的结果更新 GUI;
  • 第 11 行:显示任何异常;
  • 第 13–14 行:收到可观察对象发出的结束信号后,不执行任何操作:GUI 初始化过程已完成;

20.3.3.2. 计算各候选名单获得的席位

[doCalculer] 方法负责计算各候选名单所获席位数:


  @Override
  protected void doCalculer() {
    tListesSaisies = listesSaisies.toArray(new ListeElectorale[0]);
    // calcul des sièges
    String info = null;
    metier.calculerSieges(user, tListesSaisies).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
            .subscribe(
                    // traitement résultat
                    result -> consumeResultSieges(result),
                    // traitement exception
                    th -> showException(th),
                    // fin observable
                    () -> {
                    }
            );
}
  • 第 6–15 行:通过各种列表获得的座位数以异步方式计算;
  • 第 6 行:被观察的过程是 [metier.calculateSeats(user, tListsEntered)]。它在 I/O 线程上执行 [subscribeOn(Schedulers.io())],并在 UI 线程上被观察 [observeOn(SwingScheduler.getInstance())];
  • 第 9 行:该过程返回的结果由方法 [consumeResultSeats] 使用;
  • 第 11 行:显示任何异常;
  • 第13–14行:收到来自可观测量的结束信号后,不采取任何行动;

第 9 行:[consumeResultSieges] 方法处理被观察进程返回的结果,并更新候选人列表中的 [seats, eliminated] 字段:


  private void consumeResultSieges(ListeElectorale[] tListesSaisies) {
    // the result is stored
    this.tListesSaisies = tListesSaisies;
    // display of results
    modèleRésultats.clear();
    for (int i = 0; i < tListesSaisies.length; i++) {
      modèleRésultats.addElement(tListesSaisies[i].toString());
    }
    // maj state form
    Utilitaires.setEnabled(new JLabel[]{jLabelEnregistrer}, true);
    Utilitaires.setEnabled(new JLabel[]{jLabelCalculer}, false);
    Utilitaires.setEnabled(new JMenuItem[]{jMenuItemEnregistrer}, true);
    Utilitaires.setEnabled(new JMenuItem[]{jMenuItemCalculer}, false);
    jTextPaneMessages.setText("Calcul terminé");
}
  • 第4-14行:结果用于更新图形用户界面;

20.3.3.3. 记录选举结果

选举结果通过以下 [doEnregistrer] 方法保存:


  @Override
  protected void doEnregistrer() {
    // on demande l'enregistrement à la couche [métier]
    metier.recordResultats(user, tListesSaisies).subscribeOn(Schedulers.io()).observeOn(SwingScheduler.getInstance())
            .subscribe(
                    // traitement du résultat - il n'y en a pas ici
                    (param) -> {
                    },
                    // traitement de l'exception
                    (th) -> showException(th),
                    // fin observable
                    () -> {
                      // maj du formulaire
                      Utilitaires.setEnabled(new JLabel[]{jLabelEnregistrer}, false);
                      Utilitaires.setEnabled(new JMenuItem[]{jMenuItemEnregistrer}, false);
                      jTextPaneMessages.setText("Enregistrement des résultats réalisé");
                    }
            );
}
  • 第 4–17 行:选举结果以异步方式保存;
  • 第 4 行:被观察的过程是 [business.recordResults(user, tEnteredLists)]。它在 I/O 线程上执行 [subscribeOn(Schedulers.io())],并在 UI 线程上被观察 [observeOn(SwingScheduler.getInstance())];
  • 第 7–8 行:由于被观察过程不返回结果,因此这些行永远不会被执行;
  • 第 10 行:显示任何异常;
  • 第 14–16 行:接收到可观察对象发出的结束信号后,更新 GUI;

任务:验证 Swing 应用程序是否正常工作。然后修改 GUI 和代码,使得在与 Web 服务器/JSON 进行异步操作时,会显示加载图标并提供取消当前操作的选项。