Skip to content

8. Swing 环境中的 RxJava

8.1. 引言

在此,我们将重新审视第 2 节中介绍的 Swing 应用程序。

  

要在 Swing 环境中使用 RxJava,我们将使用 RxSwing 库,该库为 RxJava 添加了在 Swing 环境中非常有用的类和接口。为此,Swing 示例的 Gradle 文件如下:

  

buildscript {
    repositories {
        mavenCentral()
    }
}
apply plugin: 'java'
jar {
    baseName = 'exemples-01'
    version = '0.0.1-SNAPSHOT'
}
repositories {
    mavenCentral()
}
dependencies {
    compile('io.reactivex:rxswing:0.25.0')
    compile('io.reactivex:rxjava:1.1.3')
    compile('com.fasterxml.jackson.core:jackson-databind:2.7.3')
}
task wrapper(type: Wrapper) {
    gradleVersion = '2.9'
}
  • 第 15 行:对 RxSwing 的依赖;

我们将仅使用一个 RxSwing 专用的对象:调度器 [SwingScheduler.getInstance()],它负责在 Swing 事件循环线程上执行/监听可观察对象。我们将专门利用它来监听事件循环线程之外运行的可观察对象。让我们回顾一下示例应用程序的架构:

Image

  • 异步服务层提供返回可观察对象的方法。我们在事件循环线程以外的线程中执行这些可观察对象。这样,GUI 就能保持响应性,能够对用户输入做出反应。最明显的例子就是允许用户点击 [取消] 按钮来中断耗时过长的异步操作。要实现这一点,GUI 必须被冻结
  • Swing 层需要处理异步操作返回的结果,并利用这些结果更新 GUI。然而,这只能在事件循环线程中进行。为此,这些结果会在调度器 [SwingScheduler.getInstance()] 中被监听;

因此,在 GUI 事件处理代码中,与异步层 [rxService] 的交互采用以下形式:


Observable obs=rxService.doSomething(...).subscribeOn(Schedulers.computation()).observeOn(SwingScheduler.getInstance()) ;

其中调度器 [Schedulers.computation()] 可根据具体用例替换为其他调度器。

建议读者重新阅读第 2 段。现在,读者已具备充分理解该段落所需的知识。

8.2. 代码结构

该代码实现了以下架构:

Image

实现该架构的 IntelliJ IDEA 项目如下:

  
  • [rxswing.service] 包实现了同步(IService、Service)和异步(IRxService、RxService)服务层;
  • [rxswing.ui] 包实现了 Swing 接口;

8.3. 运行项目

要在 IntelliJ IDEA 中运行该项目,请按照以下步骤操作:

 

8.4. 同步服务

Image

  

同步服务层提供了以下 [IService] 接口:


package dvp.rxswing.service;
 
public interface IService {
  // random numbers in the [a,b] interval
  // n numbers are generated with n itself a random number in the interval [minCount, maxCount]
  // numbers are generated after a delay of milliseconds,
  // where [delay] is itself a random number in the interval [minDelay, maxDelay]
  public ServiceResponse getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay);
}

服务响应的 [ServiceResponse] 类型如下:


package dvp.rxswing.service;
 
import java.util.List;
 
public class ServiceResponse {
 
  // service waiting time
  private int delay;
  // random numbers
  private List<Integer> aleas;
  // execution thread
  private String executedOn;
 
  // manufacturers
 
  public ServiceResponse() {
      // execution thread
    executedOn = Thread.currentThread().getName();
  }
 
  public ServiceResponse(int delay, List<Integer> aleas) {
      // local builder
    this();
    // other initializations
    this.delay = delay;
    this.aleas = aleas;
  }
 
  // getters and setters
...
}

[IService] 接口由以下 [Service] 类实现:


package dvp.rxswing.service;
 
import java.util.*;

public class Service implements IService {
 
  @Override
  public ServiceResponse getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay) {
    // random numbers in the [a,b] interval
    // n numbers are generated with n itself a random number in the interval [minCount, maxCount]
    // numbers are generated after a delay of milliseconds,
    // where [delay] is itself a random number in the interval [minDelay, maxDelay]
 
    // some checks
    List<String> messages = new ArrayList<>();
    int erreur = 0;
    if (a < 0) {
      messages.add("Le nombre a de l'intervalle [a,b] de génération doit être supérieur à 0");
      erreur |= 2;
    }
    if (a >= b) {
      messages.add("Dans l'intervalle [a,b] de génération, on doit avoir a< b");
      erreur |= 4;
    }
    if (minCount < 0) {
      messages.add("Le nombre min de l'intervalle [min,count] du nombre de valeurs générées doit être supérieur à 0");
      erreur |= 16;
    }
    if (minCount > maxCount) {
      messages.add("Dans l'intervalle [min,count] du nombre de valeurs générées, on doit avoir min<= max");
      erreur |= 32;
    }
    if (minDelay < 0) {
      messages.add("Le nombre min de l'intervalle [min,count] du délai d'attente doit être supérieur à 0");
      erreur |= 64;
    }
    if (minCount > maxCount) {
      messages.add("Dans l'intervalle [min,count] du délai d'attente, on doit avoir min<= max");
      erreur |= 128;
    }
    if (maxDelay > 5000) {
      messages.add("L'attente en millisecondes avant la génération des nombres doit être dans l'intervalle [0,5000]");
      erreur |= 256;
    }
    // mistakes?
    if (!messages.isEmpty()) {
      throw new AleasException(String.join(" [---] ", messages), erreur);
    }
    // random number generator
    Random random = new Random();
    // waiting?
    int delay = minDelay + random.nextInt(maxDelay - minDelay + 1);
    if (delay > 0) {
      try {
        Thread.sleep(delay);
      } catch (InterruptedException e) {
        throw new AleasException(String.format("[%s : %s]", e.getClass().getName(), e.getMessage()), 1024);
      }
    }
    // result generation
    int count = minCount + random.nextInt(maxCount - minCount + 1);
    List<Integer> nombres = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      nombres.add(a + random.nextInt(b - a + 1));
    }
    // return result
    return new ServiceResponse(delay,nombres);
  }
 
}

该服务使用的异常类 [AleasException] 如下:


package dvp.rxswing.service;
 
public class AleasException extends RuntimeException {
 
    private static final long serialVersionUID = 1L;
    // error code
  private int code;
 
  // manufacturers
  public AleasException() {
  }
 
  public AleasException(String detailMessage, int code) {
    super(detailMessage);
    this.code = code;
  }
 
  public AleasException(Throwable throwable, int code) {
    super(throwable);
    this.code = code;
  }
 
  public AleasException(String detailMessage, Throwable throwable, int code) {
    super(detailMessage, throwable);
    this.code = code;
  }
 
  // getters and setters
...
}
  • 第 3 行:它继承了 [RuntimeException] 类。因此,这是一个未处理的异常;
  • 第 7 行:它为其父类添加了一个错误代码(0 表示无错误);

8.5. 异步服务

Image

  

异步服务层提供了以下 [IRxService] 接口:


package dvp.rxswing.service;
 
import dvp.rxswing.ui.UiResponse;
import rx.Observable;
 
public interface IRxService {
  // random numbers in the [a,b] interval
  // n numbers are generated with n itself a random number in the interval [minCount, maxCount]
  // numbers are generated after a delay of milliseconds,
  // where [delay] is itself a random number in the interval [minDelay, maxDelay]
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse);
}
  • 第 11 行:该服务的 [getAleas] 方法现在返回一个可观察对象;

[getAleas] 方法返回一个类型为 [UiResponse] 的响应,该响应专用于 [Ui] 层。该类型的定义如下:


package dvp.rxswing.ui;
 
import dvp.rxswing.service.ServiceResponse;
 
import java.text.SimpleDateFormat;
import java.util.Calendar;
 
public class UiResponse {
 
  // customer id
  private int idClient;
  // service response
  private ServiceResponse serviceResponse;
  // observation thread name
  private String observedOn;
  // query time
  private String requestAt;
  // response time
  private String responseAt;
 
  // manufacturers
 
  public UiResponse() {
      // observation thread
    observedOn = Thread.currentThread().getName();
    // query time
    requestAt = getTimeStamp();
  }
 
  // private methods
 
  private String getTimeStamp() {
    return new SimpleDateFormat("hh:mm:ss:SSS").format(Calendar.getInstance().getTime());
  }
 
  // getters and setters
...
}
  • 随机数位于第 13 行的字段中;
  • 其余字段用于指定异步服务可观察对象的执行线程和观察线程,以及向服务发出的请求和收到的响应的时间戳;

该异步接口由以下 [RxService] 类实现:


package dvp.rxswing.service;
 
import dvp.rxswing.ui.UiResponse;
import rx.Observable;
 
public class RxService implements IRxService {
 
  // synchronous service
  private IService service;
 
  // manufacturer
  public RxService(IService service) {
    this.service = service;
  }
 
  @Override
  public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
      // we create an observable emitting the value rendered by the synchronous service
    return Observable.create(subscriber -> {
      try {
          // synchronous call
        uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
        // the result is passed on to the observer
        subscriber.onNext(uiResponse);
      } catch (Exception e) {
          // we pass the error to the observer
        subscriber.onError(e);
      } finally {
          // the observer is informed that the emissions are finished
        subscriber.onCompleted();
      }
    });
  }
}
  • 第 12–14 行:异步服务的 [RxService] 类由同步接口 [IService] 的实例构建而成;
  • 第 20–33 行:构建可观察对象,即 [getAleas] 方法的结果;
  • 第 22 行:调用同步方法 [service.getAleas]。其类型为 [ServiceResponse] 的结果被封装在类型为 [UiResponse] 的对象中,以便提供给 [swing] 层。该对象最初作为方法调用参数(第 17 行中的最后一个参数)传入;
  • 第 24 行:将 [UiResponse] 发送给观察者(即 [swing] 层)。 [UiResponse] 对象不仅包含第 22 行同步服务生成的信息,还包含第 17 行调用 [getAleas] 方法时生成的其他信息。这就是为什么调用方法将 [UiResponse] 对象作为参数传递给 [getAleas] 方法(第 17 行,最后一个参数);
  • 第 30 行:我们不要忘记通知发射结束。这里有一个可观察对象,它只发射一个值:即同步服务返回的那个值;
  • 第 27 行:我们向观察者通知任何错误;

8.6. 图形用户界面

Image

  
  • 图形用户界面是使用 [NetBeans] IDE 构建的,该 IDE 拥有一个功能强大的图形化编辑器。该编辑器生成了 [AbstractJFrameAleas.form] 文件,该文件仅限此 IDE 使用;
  • [AbstractJFrameAleas] 类同样由 NetBeans 图形化编辑器生成。随后对其进行了如下重构:我们希望处理的 GUI 事件在 [AbstractJFrameAleas] 类中通过抽象方法进行处理,这些方法由子类 [JFrameAleasEvents] 实现。最终,
    • 抽象类 [AbstractJFrameAleas] 负责构建和显示图形用户界面;
    • 子类 [JFrameAleasEvents] 负责处理其事件;

[Request] 选项卡的 GUI 组件如下:

 
编号
类型
名称
角色
1
JTabbedPane
jTabbedPane1
一个带标签的容器。包含两个标签页(JPanel):[jPanelRequest] 用于请求,[jPanelResponse] 用于响应;
2
JTextField
jTextFieldNbValues
向随机数服务发送的请求数量。若异步服务在调度器 [Schedulers.io] 上运行,这些请求将共享一个处理器;
3
JTextField
jTextFieldA
区间 [a,b] 中的端点 a
4
JTextField
jTextFieldB
区间 [a,b] 的端点 b
5
JTextField
jTextFieldMinCount
minCount区间 [minCount, maxCount] 的端点
6
JTextField
jTextFieldMaxCount
[minCount, maxCount] 区间的上限
7
JTextField
jTextFieldMinDelay
minDelay 区间 [minDelay, maxDelay] 的下限
8
JTextField
jTextFieldMaxDelay
[minDelay, maxDelay] 区间的 maxDelay 边界
9
JCheckBox
jCheckBoxRxSwing
如果复选框被选中,则向异步接口发送请求。否则,则向同步接口发送请求
10
JComboBox
jComboBoxSchedulers
对于异步请求,将使用此处选定的调度器进行执行
11
JButton
jButtonGenerate
启动对同步或异步服务的请求执行

[响应] 选项卡的 GUI 组件如下:

 
编号
类型
名称
角色
1
JLabel
jLabelDuration
请求的总执行时间(以毫秒为单位)
2
JLabel
jLabelNbResponses
观察到的响应总数(可能与请求数不同,因为每个请求可能会返回多个待观察的值)
3
JList
jListNumbers
显示已观测(已接收)的值
4
JButton
jButtonCancel
取消当前正在执行的请求

8.7. 图形用户界面的实例化

  

[JFrameAleasEvents] 类负责处理 GUI 事件,包括对 [Generate] 按钮的点击。这是一个可执行类,运行于以下上下文中:


public class JFrameAleasEvents extends AbstractJFrameAleas {
 
    private static final long serialVersionUID = 1L;
    // synchronous generation service
    private IService service;
    // asynchronous generation service
    private IRxService rxService;
 
    // seizures
    private int nbRequests;
    private int a;
    private int b;
    private int minDelay;
    private int maxDelay;
    private int minCount;
    private int maxCount;
 
    // error messages
    private final String jLabelNbValuesErrorText = "Tapez un nombre entier >=1";
    private final String jLabelCountErrorText = "minCount doit être >=0 et maxCount>=minCount ";
    private final String jLabelDelayErrorText = "minDelay doit être >=0 et maxDelay>=minDelay et  maxDelay<=5000";
    private final String jLabelIntervalErrorText = "a doit être >=0 et b>=a ";
 
    // subscriptions to observables
    protected List<Subscription> subscriptions = new ArrayList<Subscription>();
    // start-end of execution
    private long debut;
    // mapper jSON
    private ObjectMapper jsonMapper;
    // answer model
    private DefaultListModel<String> model;
 
    // manufacturer
    public JFrameAleasEvents() {
        // parent
        super();
        // local
        initJFrame();
        // services
        service = new Service();
        rxService = new RxService(service);
        // mapper jSON
        jsonMapper = new ObjectMapper();
    }
 
    private void initJFrame() {
        // hide error messages
        jLabelCountError.setText("");
        jLabelDelayError.setText("");
        jLabelIntervalError.setText("");
        jLabelNbValuesError.setText("");
        // hide texts by default
        jTextFieldA.setText("100");
        jTextFieldB.setText("200");
        jTextFieldMinCount.setText("5");
        jTextFieldMaxCount.setText("10");
        jTextFieldMinDelay.setText("100");
        jTextFieldMaxDelay.setText("500");
        jTextFieldNbValeurs.setText("10");
        jLabelDuree.setText("");
        // answer model
        model = new DefaultListModel<>();
        jListNumbers.setModel(model);
        // number of hearts
        System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
    }
 
    public static void main(String args[]) {
        try {
            UIManager.setLookAndFeel(UIManager.getSystemLookAndFeelClassName());
        } catch (UnsupportedLookAndFeelException | ClassNotFoundException | InstantiationException
                | IllegalAccessException e) {
            System.out.println(e);
            System.exit(0);
        }
 
        /* Create and display the form */
        java.awt.EventQueue.invokeLater(() -> {
            new JFrameAleasEvents().setVisible(true);
        });
    }
  • 第 1 行:[JFrameAleasEvents] 类继承自 [AbstractJFrameAleas] 类,而 [AbstractJFrameAleas] 类又继承自 Swing 的 [JFrame] 类。因此,[JFrameAleasEvents] 类是一个 Swing 窗口;
  • 第 68–75 行:将要执行的 [main] 方法;
  • 第 70 行:设置 GUI 的外观和风格
  • 第 79 行:调用 [JFrameAleasEvents] 类的构造函数:GUI 将被构建并初始化。完成后,它会被显示出来;
  • 第 34–44 行:构造函数;
  • 第 36 行:调用父类构造函数将初始化 GUI。此时,GUI 的外观完全符合开发者的设计,但尚未显示;
  • 第 38 行:初始化 GUI 的某些组件;
  • 第 40 行:同步服务的实例化;
  • 第 41 行:实例化异步服务;

8.8. 同步请求的执行

点击 [Generate] 按钮将触发以下 [doGenerate] 方法的执行:


    @Override
    protected void doGenerate() {
        // saisies valides ?
        if (!isPageValid()) {
            return;
        }
        // rx ou pas ?
        if (jCheckBoxRxSwing.isSelected()) {
            // requêtes asynchrones
            doGenerateWithRxService();
        } else {
            // requêtes synchrones
            doGenerateWithService();
        }
}
  • 第 4–6 行:我们验证用户输入是否有效。关于 [isPageValid] 方法,我们不再赘述,因为它很简单;
  • 第 8 行:我们检查 RxSwing 复选框的状态;
  • 第 13 行:我们同步执行请求;

[doGenerateWithService] 方法如下:


    // synchronous generation
    private void doGenerateWithService() {
        // start waiting
        beginWaiting();
        try {
            for (int i = 0; i < nbRequests; i++) {
                // response preparation
                UiResponse uiResponse = new UiResponse();
                // customer no
                uiResponse.setIdClient(i);
                // synchronous call
                uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
                // response time
                uiResponse.setResponseAt();
                // update the JList model with the responses received
                model.add(0, jsonMapper.writeValueAsString(uiResponse));
                // update number of responses
                jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
            }
        } catch (JsonProcessingException | RuntimeException e) {
            JOptionPane.showMessageDialog(this, getInfoForThrowable("L'erreur suivante s'est produite", e), "Informations",
                    JOptionPane.PLAIN_MESSAGE);
        }
        // end waiting
        endWaiting();
}
  • 第 12 行:对随机数生成服务进行同步调用;
  • [doGenerateWithService] 方法完全在 Swing 事件循环线程内执行。在该方法执行完毕之前,GUI 不会处理任何新事件。此时 GUI 处于冻结状态。因此,例如第 16 行和第 18 行中的 GUI 更新将永远不会被显示。它们只会显示最终值,且这将在所有请求执行完毕后发生;

[beginWaiting] 方法(第 4 行)如下:


    private void beginWaiting() {
        // buttons
        jButtonGenerate.setVisible(false);
        jButtonCancel.setVisible(true);
        // wait slider
        jTabbedPane1.setCursor(Cursor.getPredefinedCursor(Cursor.WAIT_CURSOR));
        jButtonCancel.setCursor(Cursor.getDefaultCursor());
        // raz answers
        model.clear();
        // rx subscriptions
        subscriptions.clear();
        // the response view is displayed
        jTabbedPane1.setSelectedIndex(1);
        jLabelNbReponses.setText("0");
        jLabelDuree.setText("");
        // start of execution
        debut = new Date().getTime();
}
  • 第 3 行:[Generate] 按钮被隐藏。这会触发一个事件,该事件同样只能在所有请求执行完毕后才执行。不过,我们从未看到它被隐藏,因为 [doGenerateWithService] 方法第 25 行中的 [endWaiting] 方法会将其再次显示;
  • 第 13 行:我们选择 [Response] 选项卡以查看响应的到达情况。同样,该事件也仅在所有请求完成后才会执行,届时我们将一次性看到所有响应,而我们原本希望看到它们依次到达;

同步接口显然存在不足。这些不足可以通过异步接口来克服。

8.9. 执行异步请求

执行异步请求的代码如下:


private void doGenerateWithRxService() {
        // début attente
        beginWaiting();
        // on va obtenir les nombres aléatoires sous la forme d'un observable
        Observable<UiResponse> observable = Observable.empty();
        // Schéduler d'exécution des différents observables
        Scheduler[] schedulers = { Schedulers.io(), Schedulers.computation(), Schedulers.newThread(),
                Schedulers.trampoline(), Schedulers.immediate() };
        Scheduler scheduler = schedulers[jComboBoxSchedulers.getSelectedIndex()];
        // configuration des observables
        for (int i = 0; i < nbRequests; i++) {
            // préparation réponse
            UiResponse uiResponse = new UiResponse();
            uiResponse.setIdClient(i);
            // l'observable est configuré pour s'exécuter sur le schéduler choisi par l'utilisateur
            // puis cumul de l'observable obtenu à l'observable du tout
            observable = observable.mergeWith(
                    rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(scheduler));
        }
        // observateur
        observable = observable.observeOn(SwingScheduler.getInstance());
        // pour l'instant, on a juste fait de la configuration
        // aucune requête n'a encore été faite au service synchrone de génération des nombres aléatoires
        // on s'abonne à l'observable - c'est ce qui va provoquer l'appel au service synchrone de génération des nombres aléatoires
        try {
            // il n'y a ici qu'un abonnement - le résultat est une souscription
            subscriptions.add(observable.subscribe(
                    // notification d'émission
                    uiResponse -> {
                        // on met à jour l'Ui avec la réponse
                        // ceci est possible car l'observation a lieu dans le thread de l'Ui
                        updateUi(uiResponse);
                    } ,
                    // notification d'erreur
                    th -> {
                        // cas d'erreur - on l'affiche
                        String message = getInfoForThrowable("L'erreur suivante s'est produite", th);
                        JOptionPane.showMessageDialog(this, message, "Informations", JOptionPane.PLAIN_MESSAGE);
                        // annulation requêtes
                        doCancel();
                    } ,
                    // notification [onCompleted]
                    // fin de l'attente
                    this::endWaiting));
        } catch (Throwable th) {
            // cas d'exception + générale - on l'affiche
            String message = getInfoForThrowable("L'erreur suivante s'est produite", th);
            JOptionPane.showMessageDialog(this, message, "Informations", JOptionPane.PLAIN_MESSAGE);
            // on annule les requêtes
            doCancel();
        }
    }
  • 第 3 行:更新 GUI 以指示正在进行一项可能耗时较长的操作;
  • 第 5 行:创建一个空的可观察对象。该可观察对象将由 [Swing] 层进行监听;
  • 第 7 行:可能的调度器数组;
  • 第 9 行:我们已赋予用户选择查询执行调度器的权限。此处获取用户所选的调度器;
  • 第 11–19 行:每个请求返回一个可观察对象,其元素会被合并(mergeWith)(第 17 行)到第 5 行创建的可观察对象中;
  • 第 13–14 行:构建 [UiResponse] 对象。请注意,该对象既是 [RxService.getAleas] 方法的输入参数,也是其返回结果(第 17–18 行);
  • 第 14 行:每个请求都通过一个编号进行标识,此处称为 [idClient]。这是必要的,因为在异步环境中,响应的接收顺序可能与请求的发送顺序不同。[idClient] 使我们能够确定响应属于哪个请求;
  • 第 17–18 行:通过 [rxService.getRandom] 发起异步请求。该请求将在用户选择的调度器上执行。其结果(类型为 Observable<UiResponse>)与第 5 行的可观察对象进行组合。需要注意的是,[rxService.getAleas] 方法在此处被执行并返回一个可观察对象。 但这并不意味着随机数已被生成。实际上,可观察对象只有在被订阅时才会执行。目前尚未发生这种情况;
  • 第 21 行:这是关键指令:我们指定第 5 行可观察对象发出的元素应在 UI 线程上进行观察。此处我们使用了 RxSwing 库特有的调度器;
  • 第 25–51 行:我们订阅第 5 行中的可观察对象。此时才会向生成随机数的同步服务请求随机数。关键部分在于第 29–33 行的指令。其余部分主要处理错误情况以及来自可观察对象的 [onCompleted] 通知;
  • 第 28–44 行:请记住,我们要求在 UI 线程上观察第 5 行中的进程。因此,第 28–44 行的代码在 UI 线程上运行;
  • 第 29–33 行:我们处理可观察对象的 [onNext] 通知。我们接收由被观察进程发出的 [UiResponse] 类型数据。这是某个异步请求的结果。我们使用此响应更新用户界面;
  • 第 34–41 行:我们处理可观察对象的 [onError] 通知。我们显示一个显示错误的对话框(第 37–38 行),然后取消请求(第 40 行);
  • 第 42–44 行:处理可观察对象的 [onCompleted] 通知。更新 GUI 以显示所请求的服务已完成。第 44 行也可以如下编写
 ()->{endWaiting();}

在此,我们选择使用方法引用;

  • 第 45–51 行:某些异常不会经过第 34–41 行。这种情况发生在请求过多时。一旦超过某个阈值(该阈值取决于运行时环境),就会引发 [StackOverflowError],该异常由第 45–51 行处理;
  • 第 27 行:订阅会生成一个 [Subscription] 类型的对象,并将其添加到订阅列表中。在此处,该列表仅包含一个元素;

第 32 行:我们使用以下 [updateUi] 方法更新用户界面:


    private void updateUi(UiResponse uiResponse) {
        // response time
        uiResponse.setResponseAt();
        // observation thread
        uiResponse.setObservedOn();
        // number of responses
        jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
        // running time
        jLabelDuree.setText(String.valueOf(new Date().getTime() - debut));
        // add the jSON response string to the JList response template
        try {
            model.add(0, jsonMapper.writeValueAsString(uiResponse));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
}

这里我们可以看到图形用户界面的组件被更新了(第 7、9、12 行)。要实现这一点,你必须位于 UI 线程(事件循环)中。

[endWaiting] 方法如下:


    private void endWaiting() {
        // generate] button visible
        jButtonGenerate.setVisible(true);
        // hidden [Cancel] button
        jButtonCancel.setVisible(false);
        // hidden wait cursor
        jTabbedPane1.setCursor(Cursor.getDefaultCursor());
        // selected answers tab
        jTabbedPane1.setSelectedIndex(1);
        // duration updated one last time
        jLabelDuree.setText(String.valueOf(new Date().getTime() - debut));
}

当异步请求执行过程中发生错误,或用户点击 [取消] 按钮时,会调用 [doCancel] 方法。其代码如下:


// les souscriptions aux observables
    private List<Subscription> subscriptions = new ArrayList<Subscription>();
....
 
    @Override
    protected void doCancel() {
        // fin attente
        endWaiting();
        // dans le cas de souscriptions
        if (jCheckBoxRxSwing.isSelected() && subscriptions != null) {
            subscriptions.forEach(Subscription::unsubscribe);
            //subscriptions.forEach(s -> s.unsubscribe());
        }
    }
 
  • 第 2 行:[subscriptions] 是一个订阅列表;
  • 第 11 行:所有订阅均被取消;
  • 第 12 行:第 11 行的另一种写法。此处的 [forEach] 方法期望接收一个 Consumer<Subscription> 类型的实例(参见第 4.4 节);

让我们回到 [doGenerateWithService] 方法的代码:它可以分解为两个步骤:

  1. 可观察对象配置步骤。此操作在 [doGenerateWithService] 方法的调用者线程中进行,即 UI 线程;
  2. 订阅操作,该操作将触发可观察对象的执行;

如果可观察对象使用了 [Schedulers.computation()、Scheduler.io()、Schedulers.newThread()] 中的任一调度器,则它们将在 UI 线程之外执行。这些不同的线程将争夺机器的核心资源。 由于请求属于长时间运行的操作(数百毫秒),在 UI 线程中执行的 [doGenerateWithService] 方法将在请求返回响应之前完成。然而,该方法是在 [Generate] 按钮的点击事件上执行的。一旦该事件处理完毕,UI 线程便能继续处理后续事件。 此类事件有多个。因此,[beginWaiting] 方法已设置了多个:


    private void beginWaiting() {
        // buttons
        jButtonGenerate.setVisible(false);
        jButtonCancel.setVisible(true);
        // waiting cursor
        jTabbedPane1.setCursor(Cursor.getPredefinedCursor(Cursor.WAIT_CURSOR));
        jButtonCancel.setCursor(Cursor.getDefaultCursor());
        // raz answers
        model.clear();
        // rx subscriptions
        subscriptions.clear();
        // the response view is displayed
        jTabbedPane1.setSelectedIndex(1);
        jLabelNbReponses.setText("0");
        jLabelDuree.setText("");
        // start of execution
        debut = new Date().getTime();
}

这段代码的几乎每一行都会影响图形用户界面。更新并非立即发生:事件会被放入事件循环队列中。一旦处理完 [Generate] 按钮的点击事件,这些事件就会依次执行,用户便能看到图形用户界面的变化:

  • 显示 [Response] 选项卡(第 13 行),并为其关联加载指示器(第 6 行)
  • 其 [Cancel] 按钮显示出来(第 4 行),用户可以点击它;
  • 响应的 JList 被清空(第 9 行);
  • 显示响应数量的 JLabel 显示 0;
  • 显示执行时间的 JLabel 显示空字符串;

在查询执行过程中,UI线程可以定期访问处理器,从而处理待处理事件。其中包括由[updateUi]方法设置的事件:


    private void updateUi(UiResponse uiResponse) {
        // response time
        uiResponse.setResponseAt();
        // observation thread
        uiResponse.setObservedOn();
        // number of responses
        jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
        // running time
        jLabelDuree.setText(String.valueOf(new Date().getTime() - debut));
        // add the jSON response string to the JList response template
        try {
            model.add(0, jsonMapper.writeValueAsString(uiResponse));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
}

当 UI 线程处于活动状态时:

  • 显示响应数量的 JLabel 被更新(第 7 行);
  • 显示执行时间的 JLabel 被更新(第 9 行);
  • 通过其模型更新响应的 JList(第 12 行);

这使得用户能够查看查询执行的进度。此外,用户还可以通过 [Cancel] 按钮取消查询。这正是将异步服务置于 [Swing] 层前端的核心意义所在,而 RxJava 则是实现这一功能的首选技术。

最后需要注意的是,如果用户选择了 [Schedulers.immediate()] 或 [Schedulers.trampoline()] 中的任一调度器,则可观察对象将在与调用者相同的主线程(即 UI 线程)上执行。这将导致同步行为。

第 2.8.12.8.22.8.3 2.8.4 节展示了使用不同调度器所获得的结果。