2. 入门示例
我最初接触 RxJava 是通过网上找到的课程和教程。除了理论中涉及了一些我不熟悉且难以理解的概念之外,我也实在看不出它在实际应用中有什么用处。 因此,我们将首先通过一个示例(希望是一个简单的示例)来展示:使用 RxJava 如何真正简化代码,并以此为基础,尝试梳理出该库的核心要素。
RxJava 库基于以下概念:由一个或多个订阅者(Subscriber<T>,也称为观察者或消费者)观察的 Observable<T> 类型元素流。 RxJava 库允许 Observable<T> 流在线程 T1 中运行,其 Subscriber<T> 观察者在线程 T2 中运行,而开发者无需担心管理这些线程的生命周期,也无需处理诸如线程间数据共享和为执行全局任务而进行同步等自然而然的难题。因此,它简化了异步编程。
一个 Observable<T> 流会生成类型为 T 的元素,这些元素在生成时即可被观察。如果观察者和可观察对象(此术语泛指 Observable<T> 类型)位于同一线程中,那么只有当观察者消耗了元素 i 之后,可观察对象才能生成元素 (i+1)。这种架构仅在少数情况下有用。 如果观察者和可观察对象不在同一个线程中,那么可观察对象及其观察者将各自独立运行:可观察对象按自己的节奏发布数据,观察者也按自己的节奏消费数据。这正是该库的价值所在。到目前为止,我们只讨论了一个观察者。实际上,一个可观察对象可以拥有任意数量的观察者。
2.1. 示例应用程序的架构
示例应用程序具有以下架构:

- 在 [1] 中,服务层生成随机数列表。该层与调用它的 [swing] 方法运行在同一线程中,因此其生成随机数的过程是同步的;
- 在 [2] 中,一个使用 RxJava 实现的轻量级适配层,允许将同一服务的异步实现呈现给 [swing] 层:该服务可以与调用它的 [swing] 方法在不同的线程中运行;
- 调用 [4] 是同步的,而调用 [5-6] 是异步的;
我们在此想要说明的是,Rx 库能够轻松地将同步接口转换为异步接口。这有什么用处呢?Swing 接口中的事件通常在被称为“事件循环”的线程中处理。事件会被排队并依次处理。只有当前一个事件 Ei 被完全处理完毕后,事件 Ei+1 才能被处理。因此,为了保持 GUI 的响应性,事件处理必须尽可能简短。 有时,处理一个事件可能需要很长时间。如果处理过程涉及网络访问,就会出现这种情况。如果我们不想让 GUI 冻结到用户无法接受的程度,这些网络操作必须在与事件循环分离的线程中执行,以释放事件循环。这将我们带入了并发编程(即多个线程并行运行)的领域,而并发编程被公认为是一项困难的任务。Rx 库为这个问题提供了一个简单而优雅的解决方案。
为了模拟长时间运行的进程,示例中的服务会在经过一定延迟后才返回随机数,以便我们观察图形用户界面的行为。
2.2. 可执行文件
示例应用程序的可执行文件位于示例目录的 [dvp/executables] 文件夹中:
![]() | ![]() |
根据运行机器的配置不同,运行 [swing-01] 压缩包有多种方法。例如,您可以按照步骤 [1-3] 进行操作。这将显示以下图形用户界面:
![]() |
- 该界面包含两个选项卡 [1-2]:一个是 [Request],用于向随机数生成器服务发送请求;另一个是 [Response],用于显示接收到的数字;
- 在 [3] 中,您指定希望向该服务发送多少次请求;
- 在 [4] 中,您指定所需的随机数生成范围 [a,b];
- 在 [5] 中,服务返回的数值个数将是一个落在用户设定的区间 [minCount, maxCount] 内的随机数;
- 在 [6] 中,服务在返回响应前将等待 delay 毫秒,其中 delay 是用户定义区间 [minDelay, maxDelay] 内的随机数;
- 默认情况下,[swing] 层将使用该服务的同步接口。若要使用异步层,用户需勾选 [7]。此时,生成服务将在与 GUI 事件循环分离的线程中运行。Rx 库提供了多种生成这些线程的策略,用户可在 [8] 中选择其策略;
- 数字生成通过按钮 [9] 进行;
![]() |
- [10] 展示了结果。我们将解释其结构;
- 在 [11] 中,表示获得的结果数量;
- 在[12]中,显示以毫秒为单位的执行时间;
- 在[13]中,用户可以选择取消执行;
每个结果采用以下格式:
{"idClient":0,"serviceResponse":{"delay":412,"aleas":[146,115,128,174,159,112,162,127],"executedOn":"RxComputationThreadPool-6"},"observedOn":"AWT-EventQueue-0","requestAt":"02:42:47:708","responseAt":"02:42:52:931"}
- [idClient]:请求 ID。请注意,会向生成服务发送多个请求;
- [delay]:服务在发送结果前观察到的等待时间(单位为毫秒);
- [aleas]:服务返回的随机数;
- [executedOn]:服务运行的线程名称;
- [observedOn]:显示结果的线程名称。对于 Swing 界面,这只能是事件循环线程,此处为 [AWT-EventQueue-0];
- [requestAt]:请求时间,格式为 [时:分:秒:毫秒];
- [responseAt]:接收结果的时间,格式同上;
接下来我们将展示理解本示例所需的代码片段。
2.3. 同步接口

服务层 [1] 提供以下接口:
public interface IService {
// random numbers in [a,b]
// n numbers are generated with random n in the interval [minCount, maxCount]
// numbers are generated after a delay of milliseconds,
// where [delay] is a random number in the range [minDelay, maxDelay]
public ServiceResponse getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay);
}
[ServiceResponse] 如下所示:
public class ServiceResponse {
// service waiting time
private int delay;
// random numbers
private List<Integer> aleas;
// execution thread
private String executedOn;
// manufacturers
public ServiceResponse(int delay, List<Integer> aleas) {
executedOn = Thread.currentThread().getName();
this.delay = delay;
this.aleas = aleas;
}
// getters and setters
...
}
答案分为三部分:
- 第6行:生成的随机数;
- 第4行:服务在返回结果前观察到的等待时间;
- 第8行:服务的执行线程;
2.4. 同步调用

接下来我们将详细说明 [swing] 层对 [1] 服务发起的同步调用 [4]:
private void doGenerateWithService() {
// start waiting
beginWaiting();
try {
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
uiResponse.setResponseAt();
model.add(0, jsonMapper.writeValueAsString(uiResponse));
jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
}
} catch (JsonProcessingException | RuntimeException e) {
System.out.println(e);
}
// end waiting
endWaiting();
}
- 第 5–12 行:处理用户发起的 [nbRequests] 个请求的循环;
- 第 8 行:[service] 是第 2.3 节中介绍的同步 [IService] 接口的实现;
- 第 10 行:[model] 是 [Response] 选项卡中 JList 组件所显示的模型。该模型的元素是 [UiResponse] 类型的 JSON 字符串,具体如下:
public class UiResponse {
// customer id
private int idClient;
// service response
private ServiceResponse serviceResponse;
// observation thread name
private String observedOn;
// query time
private String requestAt;
// response time
private String responseAt;
// manufacturers
public UiResponse() {
observedOn = Thread.currentThread().getName();
requestAt = getTimeStamp();
}
// private methods
private String getTimeStamp() {
return new SimpleDateFormat("hh:mm:ss:SSS").format(Calendar.getInstance().getTime());
}
// getters and setters
...
}
- 第 6 行:来自号码生成服务的响应;
- 第 4 行:正在响应的请求编号;
- 第 8 行:显示此响应的线程。如前所述,这始终是事件循环线程;
- 第 10 行和第 12 行:请求的时间和响应的时间;
2.5. 测试同步调用
我们运行以下配置:
![]() |
在 [响应] 选项卡中,我们得到以下结果:
![]() |
- 在[1-2]中,我们确实如请求所示收到了10个响应。它们按照到达顺序被插入到队列的首位。我们可以看到,它们的接收顺序与请求顺序一致;
- 它们均在事件循环线程 [AWT-EventQueue-0] 中执行并显示。因此,这些请求在此线程中依次执行。不存在并发请求;
- 这里未显示的是,在执行过程中,GUI 处于冻结状态。例如,无法访问 [Response] 选项卡查看传入的响应,也无法使用 [Cancel] 按钮停止执行。即使该按钮出现在 [Request] 选项卡上,也无法使用。实际上,此时将存在两个事件:
- 点击 [Generate] 按钮;
- 点击 [取消] 按钮;
只有在由点击 [Generate] 按钮触发的操作完成后,[Cancel] 按钮的点击才会被处理。我们刚才看到,该操作在整个执行过程中都占用了事件循环线程,从而阻止了对 [Cancel] 按钮点击的处理。这通常是 Rx 能够带来显著改进的典型场景;
2.6. 异步接口及其实现
接下来我们将探讨第 [2] 层的接口及其基于 Rx 的实现。这可能不会立刻让人一目了然。我们只是想突出该实现中代码的简洁性。

异步接口如下:
public interface IRxService {
// random numbers in [a,b]
// n numbers are generated with random n in the interval [minCount, maxCount]
// numbers are generated after a delay of milliseconds,
// where [delay] is a random number in the range [minDelay, maxDelay]
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse);
}
与第 2.3 节中介绍的同步接口相比,其区别如下:
- 第 2.3 节中介绍的 [UiResponse] 类现在已成为 [getAleas] 方法的参数之一(第 6 行)。原因在于,由于请求现在并行运行,且服务会在返回结果前等待一段随机时间,因此响应不会按请求的顺序返回给我们。 因此,我们传递 [UiResponse] 对象,该对象包含请求 ID 等信息:
// id du client (requête)
private int idClient;
// réponse du service
private ServiceResponse serviceResponse;
// nom du thread d'observation
private String observedOn;
// heure de la requête
private String requestAt;
// heure de la réponse
private String responseAt;
- 该异步服务的响应类型为 [Observable<UiResponse>]。类型 [Observable<>] 由 Rx 库提供。结果 [Observable<UiResponse>] 表示 [getAleas] 方法提供了一个 [UiResponse] 类型的值流,这些值会被逐个推送给其观察者;
现在让我们看看该接口的实现:
public class RxService implements IRxService {
// service
private IService service;
// manufacturer
public RxService(IService service) {
this.service = service;
}
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
}
- 第 7–9 行:我们向构造函数传入同步接口 [IService] 的引用。该接口将负责生成随机数;
- 由 [getAleas] 方法返回的可观察对象通过静态方法 [Observable.create] 构建。该方法允许我们基于同步实现构建异步实现;
- 第 13 行:静态方法 [Observable.create] 的参数是一个 lambda 函数,该函数接受 [Subscriber] 类型作为参数,这同样是一个 Rx 类型。[Subscriber] 是一个订阅可观察对象流的对象,即订阅异步传输的数据流。在此,我们使用该订阅器的三个方法:
- [Subscriber.onNext] 用于向其传递数据(第 16 行);
- [Subscriber.onError] 用于向其发送异常(第 18 行);
- [Subscriber.onCompleted] 用于告知订阅者数据流已结束(第 20 行);
同一个可观察对象可以有多个订阅者。在此,我们仅有一个订阅者订阅单条数据流,即第 15–16 行生成的数据。该数据由服务的同步实现生成(第 15 行),并返回给订阅者(第 16 行)。
即使这一切仍有些晦涩难懂,但人们仍会为该服务的这种异步实现所展现出的极致简洁性而惊叹。
2.7. 异步调用

接下来,我们将考察由 [swing] 层对 [2] 服务发出的同步调用 [5]:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
// scheduler
int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
switch (schedulerIndex) {
case 0:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
break;
...
}
}
...
}
- 第 6–10 行:执行用户请求的 [nbRequests] 个请求;
- 第 7–8 行:准备异步服务 [getAleas] 方法(第 13 行)所需的 [UiResponse] 对象。这主要涉及记录请求的 [idClient];
- 第 13 行:调用异步服务的 [getAleas] 方法。该方法返回一个 [Observable<UiResponse>] 对象。此调用尚未触发同步服务。让我们回到异步 [getAleas] 的代码:
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
第 4 至 11 行的代码(调用同步服务)仅在订阅者注册时才会执行。只要没有订阅者,这段代码就不会被执行。
让我们回到 [doGenerateWithRxService] 方法的代码:
- 第 5 行:我们创建一个空的 Observable(未观察任何内容);
- 第 13 行:我们创建一个 Observable,其数据流将合并与 [nbRequests] 请求关联的 [nbRequests] 个异步数据流。这是通过 [Observable.mergeWith] 方法实现的,该方法允许合并两个异步数据流。 在 Rx 术语中,[mergeWith] 被称为流操作符。这类操作符的特点是,其运算结果在大多数情况下是另一个 [Observable]。最终,在第 17 行之后,变量 [observables] 指代一个由异步服务发出的 [nbRequests] 个异步响应组成的单一数据流;
- 第 13 行:合并操作也可以这样写:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse));
但我们写的是:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
在此,我们对可观察对象 [rxService.getAleas] 使用了 [subscribeOn] 运算符。与通常情况一样,其结果仍是可观察对象。 [subscribeOn] 运算符指定该可观察对象必须在 [Scheduler] 提供的线程中执行。有几种可能的 [Scheduler] 适用于不同的情况。在 GUI 中,我们提供了几个选项以展示它们之间的差异:
![]() |
这将生成以下代码:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
// scheduler
int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
switch (schedulerIndex) {
case 0:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
break;
case 1:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.computation()));
break;
case 2:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.newThread()));
break;
case 3:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.trampoline()));
break;
case 4:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.immediate()));
break;
}
}
...
}
让我们重新审视第 12–14 行的代码。调度器 [Schedulers.io()] 为每个可观察对象分配一个新线程。如果我们追踪代码:
- 第 5 行:我们有一个空的可观察对象;
- 第 13 行,第 1 次迭代:observables 是列表 [observable0/thread0](在线程 thread0 上运行的 Observable observable0);
- 第 13 行,第 2 次迭代:observables 是一个列表 [observable0/thread0, observable1/thread1];
- 等等……
最终,在第 28 行之后,我们得到一个由 [nbRequests] 个在 [nbRequests] 个不同线程上运行的可观察对象合并而成的可观察对象。并非所有调度器都按此方式工作,这一点我们将在测试过程中看到。
让我们继续分析调用异步服务的代码:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
...
}
// observer
observables = observables.observeOn(SwingScheduler.getInstance());
// these observables are executed
subscriptions.add(observables.subscribe(uiResponse -> {
updateUi(uiResponse);
} , th -> {
System.out.println(th);
doCancel();
} , this::doCancel));
}
- 我们已经看到,当执行到第 10 行时,我们有一个单一的可观察对象,它是 [nbRequests] 个可观察对象的合并结果,这些可观察对象可能会在 [nbRequests] 个不同的线程上运行,也可能不会,这取决于用户选择的调度器;
- 第 10 行:[observeOn] 运算符允许我们指定从哪个线程检索可观察对象的数据,在本例中是类型为 [UiResponse] 的 [nbRequests] 对象。在 Swing 界面中,我们别无选择。 对界面的任何更新都必须在事件循环线程中进行。在此,可观察对象的数据将显示在 Swing JList 组件中。[SwingScheduler.getInstance()] 表示事件循环线程。[SwingScheduler] 类并非来自 RxJava 库,而是来自 RxSwing 库;
- 当执行到第 12 行时,同步服务仍未被调用,因为第 10 行的可观察对象尚未拥有订阅者。第 12–17 行通过 [subscribe] 运算符提供了一个订阅者。该运算符的参数是三个 lambda 函数:
- 第一个 [uiResponse -> {updateUi(uiResponse);}] 将可观察对象生成的 [UiResponse] 对象之一作为参数。请注意,此处将有 [nbRequests] 个此类对象。关联方法(本例中为 updateUi)必须处理该结果;
- 第二个 [th -> {System.out.println(th);doCancel();}] 将 [Throwable] 类型作为参数,此处指可观察对象执行过程中发生的异常。关联方法必须处理此信息。在此,我们将其显示在控制台上(第 15 行)并取消执行,这将更新 GUI 的某些元素;
- 第三个 [this::doCancel] 在可观察对象发出已无数据可传输的信号时被调用。此处,该可观察对象是 [nbRequests] 个可观察对象的并集。当构成它的所有可观察对象都已发出工作完成的信号时,该结果可观察对象将指示其已完成。 因此,当这个第三个 lambda 函数被执行时,我们已经接收到了所有数据。本地方法 [doCancel] 会更新 GUI 以反映执行已完成;
[subscriptions] 变量定义如下:
// les souscriptions aux observables
protected List<Subscription> subscriptions = new ArrayList<Subscription>();
[Subscription] 类型表示一个订阅,即订阅者 [Subscriber] 与其所观察的对象 [Observable] 之间的关联。虽然本例中仅有一个订阅,但我们在此使用了订阅列表。当可观察对象发出已无数据可传输的信号时,将执行局部方法 [doCancel],其实现如下:
@Override
protected void doCancel() {
// fin attente
endWaiting();
// dans le cas de souscriptions
if (jCheckBoxRxSwing.isSelected() && subscriptions != null) {
subscriptions.forEach(Subscription::unsubscribe);
}
}
- 第 7 行将所有订阅者从可观察对象中取消订阅;
通过这简短的说明,我们可以总结出以下要点:
- 类型 [Observable] 表示一个值流,这些值会逐个推送给订阅者或观察者;
- 类型 [Subscriber] 表示 [Observable] 类型的订阅者;
- [Subscription] 类型表示一个订阅,即 [Subscriber] 与 [Observable] 之间的关联;
- [Observable] 类型支持 [mergeWith, empty, subscribeOn, observeOn, ...] 等操作符,其中大部分会生成可观察对象。这些操作符用于在可观察对象运行前对其进行配置:
- 观察什么;
- 可观察对象运行的线程;
- 订阅者从可观察对象接收数据的线程;
- 可观察对象有两种类型:[cold] 和 [hot]。冷可观察对象(cold observable)会为每个新订阅者完全执行一次。如果每次执行产生相同的数据,则每个新订阅者接收的数据与前一个订阅者相同。热可观察对象(hot observable)通常会持续产生数据。当订阅者订阅时,他们接收的是从订阅时刻起发出的数据,而不会接收之前可能已发出的数据。 在我们的示例中,该可观察对象是冷的:对于每个新订阅者,它都会被完全重新执行。我们的示例中实际执行了什么?要弄清楚这一点,我们需要回到被观察的可观察对象的定义:
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
对于每个新的订阅者,作为 [Observable.create] 方法参数的 lambda 函数(第 3 行)都会被重新执行。因此,第 4–11 行代码会针对每个新的订阅者 [subscriber] 执行;
2.8. 测试异步调用
首先,我们将演示不同调度程序的效果。为此,我们使用以下参数:
![]() |
我们将 [1-2] 设置为较小的数值,这样即使请求在同一个线程上执行,我们也不必等待太久。
2.8.1. 使用 [Schedulers.io] 调度器
![]() |
需要注意以下几点:
- 响应的接收顺序与请求的顺序不一致(参见 idClient);
- 每个请求都在不同的线程中运行;
- 此次 GUI 不再卡死:
- 可以切换标签页;
- 可以看到数据正在传入;
- 由于执行速度过快,来不及看到 [取消] 按钮。我们将在另一项测试中重点展示这一点;
2.8.2. 使用 [Schedulers.computation] 调度器
![]() |
需要注意以下几点:
- 响应的接收顺序与请求的顺序不一致(参见 idClient);
- 请求是在 8 个线程中执行的;
- 第 3 个线程用于处理请求 8 和 0;
- 线程 #4 用于处理请求 9 和 1;
- 其他请求各自都有一个不同的线程;
调度器 [Schedulers.computation] 使用的线程数等于当前机器的核心数。该信息通过表达式 [Runtime.getRuntime().availableProcessors()] 获取。
2.8.3. 使用 [Schedulers.newThread] 调度器
![]() |
其行为与 [Schedulers.io] 调度器类似。
2.8.4. 使用调度器 [Schedulers.trampoline, Schedulers.immediate]
![]() |
该行为是同步的。所有请求都在事件循环线程上执行。这一结果不应被泛化;它仅仅意味着在这个具体示例中,这两个调度器都以同步方式运行。
2.9. 边界情况
在此示例中,我们将使用支持异步操作的调度器。首先,我们使用 [Schedulers.computation] 调度器将请求数增加到 100 个,该调度器在此处运行于 8 个线程上。我们得到以下结果:
![]() |
- 在 [1] 中,[取消] 按钮可见且可用(异步操作);
现在,让我们让执行过程运行到结束:
![]() |
我们在[2]中看到,执行这100个请求大约耗时4秒(由8个线程共同完成)。
现在,让我们使用 [Schedulers.newThread] 调度器来运行这 100 个请求,该调度器会将每个请求分配到单独的线程中执行:
![]() |
在[1]中,我们可以看到执行这100个请求(分布在100个线程上)仅耗时半秒。因此,这比使用[Schedulers.computation]调度器要快得多。
现在,让我们在相同条件下发起800次请求,仍使用[Schedulers.newThread]调度器。我们得到以下结果:
![]() |
这800次请求大约在1秒内执行完毕。
当我们增加这个数量(在我的机器上超过2,500个请求——耗时1.5秒——当然,这个数字在很大程度上取决于运行时环境)时,最终会出现以下异常:
![]() |
因此,我们遇到了栈溢出。测试表明,[Schedulers.newThread] 调度器的行为是不确定的。您可能会遇到上述异常,运行新的测试,然后回到引发异常的配置,却不再遇到该异常。
2.10. 结论
我们演示了一个使用 Rx 库的示例。让我们总结一下所学内容:
我们最初采用的是以下架构:

- 在 [4] 中,[swing] 层向 [service] 层发起同步调用;
- 在 [5] 中,[swing] 层向 [rxService] 层发起异步调用,而 [rxService] 层随后向 [service] 层发起同步调用 [6];
我们首先注意到的是,Rx 库使得从同步的 [service] 接口创建异步的 [rxService] 接口变得非常简单(参见第 2.4 节)。这是一个重要的启示,因为这意味着我们可以轻松地将同步应用程序演进为异步应用程序。
在 [swing] 层中,编写了两个独立的方法:
实践证明,编写异步调用比编写同步调用复杂得多。尽管如此,那些曾处理过涉及多线程且需要同步的并发编程的人会发现,Rx 解决方案更易于编写,并且能够避免所有棘手的同步和线程间通信问题。在本文中,我们强调了以下要点:
- [Observable] 类型表示一个事件(值)流,该流可能(但不必)是异步的,且可以被观察;
- [Subscriber] 类型表示 [Observable] 类型的订阅者;
- [Subscription] 类型表示一个订阅,即 [Subscriber] 与 [Observable] 之间的关联;
- [Observable] 类型支持 [mergeWith、empty、subscribeOn、observeOn 等] 运算符,这些运算符大多会生成可观察对象。这些运算符用于在可观察对象运行前对其进行配置:
- 观察什么;
- 可观察对象运行的线程;
- 订阅者从可观察对象接收数据所使用的线程;
- 可观察对象分为两种类型:[冷] 和 [热]。冷可观察对象会在每个新订阅者加入时完全重新执行一次。如果每次执行产生的数据相同,那么每个新订阅者收到的数据将与前一个订阅者相同。热可观察对象通常会持续产生数据。当订阅者订阅时,他们将收到从订阅时刻起发出的数据,而不会收到之前可能已发出的任何数据。 在我们的示例中,该可观察对象属于冷可观察对象:对于每个新订阅者,它都会被完全重新执行。
既然我们已经通过一个示例了解了 Rx 库的价值,接下来我们将对其进行更深入的探讨。
Rx 库中有许多方法的签名中包含泛型参数。我们将简要回顾这些签名(第 3 节)。这些方法的参数大多是函数式接口(Java 8),即仅包含一个方法的接口。因此,实际参数必须是这些接口的实例。在 Java 8 之前,通常使用匿名类来实现接口。 在 Java 8 中,如果接口是函数式接口,使用 lambda 表达式来实现会更加简洁。因此,我们将介绍 lambda 表达式(第 4 节)。完成这一步后,我们将介绍 [Stream] 类(第 5 节),它允许您使用 lambda 表达式处理 Java 集合。这个类之所以有趣,是因为 RxJava 的 [Observable] 类借用了:
- 某些方法;
- 将方法串联起来处理同一可观察对象的方式;
随后我们将介绍 RxJava 库特有的函数式接口(第 6 节)。接着我们将继续讲解 Rx 库的主要元素 [Observable、Subscriber、Subscription、运算符](第 7 节)。 [Observable] 类拥有数十个运算符,而这些运算符本身又被重载了多次。这最初会带来相当大的复杂性,因为这些运算符及其重载版本有时仅在某个细节上有所不同,若缺乏经验,很难判断该使用哪个运算符。我们将仅介绍数量有限的运算符,并且大多数情况下会忽略它们的重载版本。
上一节的全部内容将通过在简单的控制台应用程序中使用 RxJava 库来讲解。掌握 RxJava 库后,我们将将其应用于两类图形化应用程序:
- 在第 8 节中,我们将重新审视那个 Swing 示例应用程序,并对其进行更深入的探讨。随后我们将使用 RxSwing 库;
- 在第 9 节中,我们将使用 RxAndroid 库创建一个 Android 应用程序;
完成上述内容后,读者将掌握独立开发所需的工具。虽然要能直观地运用 Rx 库可能还需要一些时间,但我认为这个库特别有趣。不过,我发现它难以理解,学习曲线也相当陡峭。希望本文能帮助读者缩短这一学习曲线。在我看来,这绝对值得付出努力。
















