7. RxJava 库
RxJava 库基于以下概念:由一个或多个订阅者(Subscriber<T>,也称为观察者或消费者)观察的 Observable<T> 类型元素流。 RxJava 库允许 Observable<T> 流在线程 T1 中运行,其 Subscriber<T> 观察者在线程 T2 中运行,而开发者无需担心管理这些线程的生命周期,也无需处理诸如线程间数据共享和为执行全局任务而进行同步等自然而然的难题。因此,它简化了异步编程。
一个 Observable<T> 流会生成类型为 T 的元素,这些元素可在生成时被观察。如果观察者和可观察对象(该术语通常泛指 Observable<T> 类型)位于同一线程中,则可观察对象只能在观察者消耗完元素 i 之后,才生成元素 (i+1)。这种架构仅在少数情况下有用。 如果观察者和可观察对象不在同一线程中,则可观察对象及其观察者将独立运行:可观察对象按自己的节奏发布元素,观察者按自己的节奏消费元素。这正是该库的价值所在。到目前为止,我们只讨论了一个观察者。实际上,一个可观察对象可以拥有任意数量的观察者。
RxJava 库特别适合用于实现简介第 2 节中描述并在此总结的架构:

- 在 [1] 中,服务层提供各项服务,其中部分服务获取耗时较长(例如网络请求);
- 该服务层由图形用户界面 [1](Swing、Android、JavaFX)调用。如果服务层与调用它的 [Swing] 方法运行在同一线程中,图形用户界面在等待服务结果时会冻结(变得无响应);
- 在[2]中,通过RxJava实现的轻量级适配层使GUI层能够获得该服务的异步实现:该服务可在与调用它的GUI层方法不同的线程中运行。 在此情况下,GUI [3] 保持响应:用户可以继续与其交互,例如在第一个网络请求并行时触发新的网络请求,最重要的是,用户可以取消耗时过长的进程——如果 GUI 冻结,这是不可能的;
- 调用 [4] 是同步的,而调用 [5-6] 则是异步的;
在此架构中,第 [2] 层提供返回 Observable<T> 类型的服务,图形层 [3] 的方法可以订阅这些类型。随后,第 [2] 层中的服务会逐一交付其结果,而第 [3] 层可以对每个结果做出响应,例如通过更新图形用户界面的一个或多个组件。
Observable<T> 类拥有数十种方法。这是该库面临的挑战之一:它功能非常丰富,很难掌握其所有可能性。我们将介绍其中的一部分。掌握其余方法则需要随着时间的推移。
7.1. 创建可观察对象并订阅
7.1.1. 示例-01:[Observable.from] 方法
![]() |
请看以下代码:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
- 第 12 行:我们从一个整数列表创建一个 Observable<Integer> 类型。
Observable<T> 类是一个由类型 T 的元素组成的流,这些元素在生成时可以被观察——最好是异步观察,但并非必须如此。其定义如下:
![]() |
如前所述,Observable<T> 类拥有数十种方法。其中一些与第 5 节中讨论的 Stream<T> 类的方法相似。RxJava 文档中包含“大理石图”[2],用于说明这些方法的工作原理:
- 第 3 行展示了可观察对象随时间推移的发射过程;
- 方法 [4] 被应用于可观察对象发出的元素。它通常会生成一个新的可观察对象;
- 第 5 行展示了获得的新可观察对象;
[Observable.from] 方法的签名如下:
![]() |
静态方法 [Observable.from] 允许您从一组类型为 T 的元素中创建一个 Observable<T>。这是入门可观察对象的一种非常简单的方法。以下代码行:
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
因此将发出三个元素。它不会立即发出这些元素,而是在每次订阅者注册时完整地发出所有元素。这被称为冷可观察对象。该可观察对象会为每个新订阅者重新发出其元素。
我们可以将上述语句视为对可观察对象的配置操作。它仅配置一次,但若出现 n 个订阅者,则会执行 n 次。
如何订阅?
一种实现方式是使用 [Observable.subscribe] 方法,此处采用的定义如下:
![]() |
- 该方法的第一个参数 [Action1<T> onNext](参见第 6.2 节)是当可观察对象发出新元素 T 时要执行的方法;
- 该方法的第二个参数 [Action1<Throwable> onError] 是当可观察对象抛出异常时要执行的方法;
- 该方法的第三个参数 [Action0 onComplete](参见第 6.1 节)是在可观察对象完成时执行的方法;
- 该方法返回类型 [Subscription];
类型 [Subscription] 表示对可观察对象的订阅。其定义如下:
![]() |
该接口 [1] 的价值在于其方法 [2],该方法允许取消订阅。
在我们的示例中,订阅可观察对象的代码如下:
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
- 第 1 行:类型为 [Subscription] 的结果被忽略;
- 第 1–15 行:这三个参数是匿名类的实例。我们还将使用 lambda 表达式。匿名类的优势在于,这些类中单个方法所期望的数据类型一目了然;
- 第 2–5 行:类型为 [Action1<Integer>] 的第一个参数的实现;
- 第 6–10 行:类型为 [Action1<Throwable>] 的第二个参数的实现;
- 第 11–15 行:类型为 [Action0] 的第三个参数的实现;
完整的代码如下:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
第 12 行的可观察对象会在第 14 行调用 [subscribe] 方法后立即开始发布其三个元素。从那时起:
- 对于每个发出的元素,都会执行第 15–18 行代码。
- 当 3 个元素全部发送完毕后,第 24–29 行代码执行;
- 第19–24行将永远不会被执行,因为可观察对象在此处不会抛出异常;
默认情况下,可观察对象和观察者运行在同一个线程中。虽然有少数预定义的可观察对象会在主线程(此处指 main 方法的线程)之外的线程中运行,但大多数情况并非如此。因此,在此示例中,所有操作都在 main 方法的线程中进行:
- 可观察对象会发出元素 1;
- 第 15–18 行代码执行并显示该元素;
- 可观察对象发布元素 2;
- 第 15–18 行执行并显示该元素;
- 该可观察对象会发布元素 3;
- 第 15–18 行执行并显示此元素;
- 可观察对象发出 [completed] 通知;
- 第 24–29 行执行;
结果如下所示:
[Example02] 类复用了 [Example01],这次将 lambda 函数作为 [Observable.subscribe] 方法的参数:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
public class Exemple02 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(
(integer) -> System.out.printf("next : %s%n", integer),
(th) -> System.out.println(th),
() -> System.out.println("completed"));
}
}
7.1.2. 示例-03:观察者类
![]() |
[Observable.subscribe] 方法允许您订阅一个可观察对象,该方法有多种版本,包括以下几种:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
import rx.Observer;
public class Exemple03 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable th) {
System.out.printf("throwable %s", th);
}
@Override
public void onNext(Integer integer) {
System.out.printf("next : %s%n", integer);
}
});
};
}
第 13 行:我们不再向 [subscribe] 方法传递三个参数,而是像下面这样向其传递一个 [Observer] 类型:
![]() |
[Observer] 类型是一个包含三个方法的接口:
- [onNext(T t)],该方法在可观察对象每次发出元素 t 时被调用;
- [onError(Throwable th)],当可观察对象抛出异常 th 时被调用;
- [onCompleted],当可观察对象表示已完成发布时被调用;
该代码的工作原理与之前解释的类似。我们得到以下结果:
7.1.3. 示例-04:[Observable.create] 方法
![]() |
静态方法 Observable.create 的定义如下:
![]() |
- [create] 方法返回类型 Observable<T>;
- [create] 方法的参数是一个类型为 [Observable.OnSubscribe<T>] 的函数,定义如下:
![]() |
类型 [Observable.OnSubscribe<T>] 是一个函数式接口,它本身继承自函数式接口 [Action1<Subscriber<? super T>>]。该接口的 [call] 方法期望接收一个定义如下、类型为 [Subscriber] (subscriber, observer) 的对象:
![]() |
我们在 [1] 中看到,类 [Subscriber<T>] 实现了第 7.1.2 节中介绍的接口 [Observer<T>]。
最终,方法 [<T> Observable.create]:
- 其参数为类型 [Observable.OnSubscribe<T>] 的实例,该实例仅包含一个方法:void call(Subscriber<T> s)。类型 [Subscriber<T>] 继承自类型 [Observer<T>],因此具有 onNext、onError 和 onCompleted 方法;
- 并返回类型 Observable<T>;
[<T> Observable.create] 方法返回一个已配置的可观察对象。 目前尚未发出任何元素。当订阅者 [Subscriber<T> s] 订阅此可观察对象时,将调用作为 [<T> Observable.create] 方法参数传递的函数中的 [void call(s)] 方法。其作用是发出类型为 T 的元素 t,并在每次发出时调用观察者的 [s.onNext(t)] 方法。 当此过程完成后,必须调用观察者的 [s.onCompleted(t)] 方法,并且 [call] 方法必须终止。如果 [call] 方法遇到异常 th,则必须调用观察者的 [s.onError(th)] 方法,并且 [call] 方法必须终止;
为说明这一复杂行为,我们将使用以下代码 [Example04]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.util.Random;
public class Exemple04 {
public static void main(String[] args) {
// observable configuration of reals
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
for (int i = 0; i < 3; i++) {
// emission element i
subscriber.onNext(new Random((i + 1)).nextDouble());
}
// end of issue
subscriber.onCompleted();
}
});
// subscription and therefore emission
obs1.subscribe((d) -> System.out.printf("onNext %s%n", d), (th) -> System.out.printf("onError %s%n", th),
() -> System.out.println("onCompleted"));
}
}
- 第 11 行:创建了一个发射 Double 类型的数据的可观察对象;
- 第 11–21 行:使用第 12–20 行中包含单个 [call] 方法的匿名类对 [create] 方法的参数进行实例化。第 11 行创建的可观察对象已准备好进行发射,但它仅在观察者到达时才会发射;
- 第 13–21 行:[call] 方法接收一个观察者的引用;
- 第 14–17 行:向观察者发射三个元素;
- 第 19 行:通知观察者已发送完毕;
- 第 23–24 行:订阅第 11 行创建的可观察对象。我们使用三个 lambda 表达式来实现 [subscribe] 方法的三个参数 [onNext, onError, onCompleted]。此订阅将创建订阅者 [Subscriber<Double>],该订阅者将被传递给第 13 行的 [call] 方法。随后将开始发射元素;
- 所有操作均在同一线程中进行:可观察对象和观察者;
我们得到以下结果:
[Observable.create] 方法允许您从任何事件创建一个可观察对象。这是我们在入门指南第 2 节中用来将同步接口转换为异步接口的方法。
7.1.4. 示例-05:[示例-04]的重构
![]() |
以下示例展示了静态方法 [Observable.subscribe] 的新版本:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class Exemple05 {
public static void main(String[] args) {
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- 第 56 行:静态方法 [Observable.subscribe] 的新版本接受 [Subscriber] 类型作为参数,我们在上一段中已介绍过该类型;
- 第 37–52 行:订阅者(观察者)。它实现了 Observer 接口,包含 onNext、onError 和 onCompleted 这三个方法;
- 第 61–64 行:从这里开始,我们将重点关注可观察对象及其观察者运行的线程;
- 第 62 行:线程名称;
- 第 63 行:以秒和毫秒为单位的当前时间。这将使我们能够随时间推移追踪可观察对象的元素发布及其在观察者中的处理过程;
- 这段代码的功能与前面的代码相同。我们只是对后者进行了重构;
所得结果如下:
- 结果第 1 行:在代码第 56 行之前,尚未发生任何事情。可观察对象仅被配置;
- 结果第 2 行:代码第 56 行触发了第 15 行 [call] 方法的调用。第 3 行:实数 80.39 被发给观察者;
- 第4行:观察者接收到了发出的数值;
- 第5–8行:上述过程重复两次;
- 第 9 行:可观察对象发送广播结束通知;
- 第 10 行:观察者接收该通知;
- 第 11 行:由代码第 57 行显示;
因此,我们可以看到,第 56 行单行订阅导致了结果中的第 2–10 行被显示出来。初次接触 RxJava 库时,人们往往会疑惑各项内容是如何关联在一起的,尤其是观察者与可观察对象之间的联系。在这里,我们可以看到第 56 行——对可观察对象的订阅——
- 触发了可观察对象所有元素的发射;
- 可知可观察对象与观察者运行在同一线程中;
- 正因如此,我们观察到以下序列:发射元素 i,观察元素 i,发射元素 (i+1),观察元素 (i+1),……
回想一下,发射器在发射元素之前是处于等待状态的:
// attente
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erreur
subscriber.onError(e);
}
其中第 3 行中的 i 代表事件的序列号(0 <= i < 3)。如果我们观察可观察对象各元素的触发时间:
- 第 2、3 行:元素 0 是在订阅开始后约 500 毫秒发出的;
- 第 3、5 行:元素 1 约在元素 0 之后 400 毫秒被发出;
- 第 5、7 行:元素 2 约在元素 1 之后 300 毫秒被发出;
7.2. 执行线程、观察线程
7.2.1. 示例-06:位于 [main] 之外的线程中的 Observable 和观察者
![]() |
我们将前一个示例重构如下 [示例06]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple06 {
public static void main(String[] args) {
// gatekeeper
CountDownLatch latch = new CountDownLatch(1);
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// we lower the barrier
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite observable configuration
obs1 = obs1.subscribeOn(Schedulers.computation());
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// waiting at the gate
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- 第 16 行:我们使用 [CountDownLatch] 对象创建一个护栏(信号量)。该对象用于在线程之间进行同步。此处,它被初始化为值 1,我们将此值称为护栏(或信号量)值。一个线程通过以下操作等待护栏:
latch.await();
如果锁的值大于 0,线程将被阻塞。线程可以对锁的内部值进行递增或递减操作。第 48 行:将锁的值递减 1。
- 第 63 行:将可观察对象配置为在调度器 [Schedulers.computation()] 提供的线程上运行。该调度器可提供与执行机器核心数相同数量的线程。示例应用程序部分演示了其他调度器的使用(参见第 2.8 节);
该代码的工作原理如下:
- [main] 方法在主线程中运行;
- 第 66 行:开始从可观察对象中发出元素。这些元素将在主线程以外的线程上发出;
- 第 70 行:由于屏障的值为 1(参见第 16 行),主线程被阻塞。只有当该值变为 0 时,主线程才能继续执行。这一变化发生在第 48 行。当观察者接收到可观察对象已完成发射的通知时,它会将屏障值设为 0;
执行结果如下:
- 第 1 行:订阅即将开始;
- 第2行:这触发了线程[RxComputationThreadPool-1]上[call]方法的执行。现在有两个线程并行执行;
- 第 3 行:由于未知原因,[RxComputationThreadPool-1] 线程已让出控制权。[main] 线程随后接管控制权,并被屏障阻塞(代码第 70 行)。从这一刻起,只有 [RxComputationThreadPool-1] 线程可以运行;
- 第 4–11 行:我们观察到可观察对象与其观察者之间此前出现的行为,但现在一切都在 [RxComputationThreadPool-1] 线程中进行;
- 第 12–13 行:观察者已下放屏障(代码第 48 行),[RxComputationThreadPool-1] 线程已终止。[main] 线程接管控制权并显示两条消息;
7.2.2. 示例-07:位于两个不同线程中的可观察对象与观察者
![]() |
我们将前面的示例修改如下:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple07 {
public static void main(String[] args) {
// gatekeeper
CountDownLatch latch = new CountDownLatch(1);
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// we lower the barrier
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite observable configuration
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// waiting in front of the barrier
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
该代码与前一个示例完全相同,仅第 63 行有所不同:
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
该行将可观察对象(subscribeOn)和观察者(observeOn)配置为在调度器 [Schedulers.computation()] 提供的线程之一上运行。
所得结果如下:
可以注意到以下几点:
- 可观察对象在线程 [RxComputationThreadPool-4] 中运行(第 3–4 行、第 6 行、第 8–9 行);
- 观察者运行在线程 [RxComputationThreadPool-3] 中(第 5、7、10–11 行);
- 它们独立运行。因此,在第 8–9 行,可观察对象在观察者获取 [onNext] 通知(第 10 行)之前,已发出两个通知(onNext、onCompleted);
RxJava 库负责处理从可观察对象线程到观察者线程的数据传输(发布)。开发者无需为此担心。
我们已经了解了如何创建可观察对象(Observable.from、Observable.create)。现在让我们看看 RxJava 库中的预定义可观察对象。
7.3. 预定义的可观察对象
7.3.1. 示例-08:[Observable.range] 方法
![]() | ![]() |
从现在开始,我们将为被观察的过程及其观察者使用专门的类。这样做的目的是能够记录它们的名称、执行线程以及执行时间,以便我们能够随时间推移对其进行追踪。
[Process] 类将仅仅是一个我们可以命名的 Observable。它将实现以下 [IProcess] 接口:
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcess<T> {
// name of observable
public String getName();
// observable
public Observable<T> getObservable();
}
该接口可通过以下 [Process<T>] 类实现:
package dvp.rxjava.observables.utils;
import rx.Observable;
import rx.Scheduler;
public class Process<T> implements IProcess<T>{
// observable name
protected String name;
// observed process
protected Observable<T> observable;
// manufacturers
public Process(String name, Observable<T> observable) {
// local initializations
this.name = name;
this.observable = observable;
}
// getters and setters
public String getName() {
return name;
}
public Observable<T> getObservable() {
return observable;
}
}
- 第 9 行:进程名称;
- 第 11 行:被观测的可观测量;
- 第14–18行:构造函数;
观察者将由以下 [Observer] 类描述:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
...
}
- 第 11 行:Observateur<T> 类继承自 Subscriber<T> 类,我们在第 7.1.3 节中曾简要介绍过该类。我们将把它作为 [Observable.subscribe] 方法的参数:
// exécution observable (observation)
obs1.subscribe(observateur);
上文第 2 行中使用的 [Observable.subscribe] 方法定义如下:
![]() |
[Subscriber] 的主要作用是利用 [Observer] 接口的方法(onNext、onError、onCompleted)来管理其订阅的 Observable 所发出的元素。[Subscriber] 类具有以下方法:
![]() |
在 [Observer] 类的代码中,我们将使用 [1] isUnsubscribed 方法来判断订阅者的订阅是否已被取消。完整的 [Observer<T>] 类如下:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
// a gatekeeper (semaphore)
private CountDownLatch latch;
// a display method
private Consumer<String> showInfos;
// observer's name
private String observerName;
// the name of the observed process
private String processName;
// manufacturers
public Observateur() {
}
public Observateur(String name, CountDownLatch latch, Consumer<String> showInfos, String observedName) {
this.observerName = name;
this.latch = latch;
this.showInfos = showInfos;
this.processName = observedName;
}
// --------------------------- implementation interface Observer<T>
@Override
public void onCompleted() {
// end of issues
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s,%s].onCompleted", observerName, processName));
}
// end of main thread lock
latch.countDown();
}
@Override
public void onError(Throwable e) {
// emission error
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s, %s].onError (%s)", observerName, processName, e));
}
}
@Override
public void onNext(T value) {
// an additional show
if (!isUnsubscribed()) {
try {
showInfos.accept(String.format("Subscriber [%s,%s] : onNext (%s)", observerName, processName,
new ObjectMapper().writeValueAsString(value)));
} catch (JsonProcessingException e) {
showInfos.accept(String.format("Subscriber [%s,%s].onNext (%s)", observerName, processName, e));
}
}
}
}
- 除了订阅者的特性外,观察者还将携带以下信息:
- 第 14 行:一个屏障或信号量,用于阻塞主线程,直到观察者接收完可观察对象发出的所有元素。这将在代码第 36 行发生,即观察者收到可观察对象发出的发射结束通知时;
- 第 16 行:一个 Consumer<String> 实例,用于在控制台显示消息;
- 第 18 行:观察者的名称,用于在存在多个观察者时进行区分;
- 第 20 行:被观察进程的名称;
- 第 36、46、54 行:抽象类 [Subscriber<T>] 所实现的 [Observer<T>] 接口中的 [onCompleted, onError, onNext] 方法。该类本身并未实现这些方法,因此必须由其子类来实现。在这些方法中执行任何操作之前,我们会先检查观察者是否已从其正在观察的可观察对象上取消订阅;
- 第 59 行:观察者的 [onNext] 方法会写入接收到的元素的 JSON 字符串。这将使我们能够显示各种类型的元素;
话虽如此,让我们来研究一下 Observable 类的一个新方法——[range] 方法:
![]() |
Observable.range(n, m) 会发出 (m) 个整数,范围从 n 到 n+m-1。我们将通过以下 [Example08] 代码来探索它:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple08 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Integer> obs1 = Observable.range(15, 3).subscribeOn(Schedulers.computation());
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos,"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- 第 16 行:我们将使用两个观察者;
- 第 19 行:将护栏(信号量)初始化为 2,因为我们将把每个观察者放置在不同的线程上。因此,主线程必须等待两个观察者线程都完成后才能继续;
- 第 22 行:我们将可观察对象配置为在调度器 [Schedulers.computation()] 管理的线程上运行。观察者将与可观察对象位于同一线程上;
- 第 25–27 行:我们将两个观察者订阅到可观察对象上。这将触发可观察对象为每个观察者执行完整过程:将发出整数 15、16 和 17;
- 第 30 行:主线程等待观察者完成;
所得结果如下:
- 第 2 行:主线程被阻塞,等待两个观察者完成;
- 第 3-4 行:我们看到观察者 0 位于线程 [RxComputationThreadPool-1] 上,观察者 1 位于线程 [RxComputationThreadPool-2] 上;
- 第3-10行:我们可以看到两个观察者接收到的元素完全相同;
我们将使用此处定义的 Observer 类来说明其他类型可观察对象的行为。
7.3.2. 示例-09:Observable 的 [interval, take, doNext] 方法
![]() |
![]() |
此示例演示了 Observable.interval(long interval, TimeUnit unit) 可观察对象的使用,该对象会以固定间隔发出长整型数据。请注意要点 [1]:默认情况下,[Observable.interval] 可观察对象会在 [Schedulers.computation] 调度器的某个线程上运行。
代码如下:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- 第 22 行:该可观察对象每 500 毫秒发出一个长整型。序列从数字 0 开始;
- 第 22 行:该可观察对象会发出无限数量的值。[Observable.take(n)] 方法会创建一个新的可观察对象,该对象仅保留前 n 个发出的元素;
![]() |
让我们重新审视一下该可观察对象的代码:
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
第 2 行:每当可观察对象(Observable)发出新元素时,[Observable.doOnNext] 方法就会被执行。这通常用于记录日志信息。在此,我们希望记录元素的发出时间,以验证 500 毫秒的间隔是否得到保持。[Observable.doOnNext] 方法不会修改其所应用的可观察对象。其定义如下:
![]() |
执行后得到以下结果:
- 第 3、7 和 11 行:我们可以看到,数据发布间隔约为 500 毫秒;
- 尽管该可观察对象并未配置为使用特定的调度程序运行,但这两个观察者确实位于两个不同的线程上。这是我们在此处看到的 [Observable.interval] 可观察对象的默认行为;
7.3.3. 示例-10/12:Observable.[error, empty, never] 方法
![]() | ![]() |
从现在开始,我们将更简洁地演示 [Observable] 类的各种方法。之前的代码如下:
package dvp.rxjava.observables;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
这段代码在之前的示例中已经用过。只有第 21–22 行发生了变化。因此,我们将这段代码的大部分提取出来,放入以下 [ProcessUtils] 类中:
package dvp.rxjava.observables.utils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import rx.Observable;
public class ProcessUtils {
@SafeVarargs
public static void subscribe(int nbObservateurs, IProcess<?>... processes) throws InterruptedException {
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs * processes.length);
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
for (IProcess<?> process : processes) {
Observable<?> obs = process.getObservable();
obs.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos, process.getName()));
}
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- 第 13 行:该方法接受两个参数:
- nbObservers:作为第二个参数传递的进程观察者数量;
- 进程:待观察的进程(称为可观察对象)。得益于 [IProcess<?>] 语法,进程可以发出不同类型的元素;
- 第 16 行:当所有观察者都完成了所有观察操作时,信号量必须变为绿色。因此,信号量的初始值等于观察者数量乘以观察次数;
- 第 20–25 行:每个观察者都订阅了其需要观察的所有进程;
- 第 23 行:从进程中获取可观察对象(参见第 7.3.1 节);
- 第 23 行:观察者已订阅该可观察对象。向观察者传递四项信息:
- 其名称;
- 当它从所观察的可观察对象收到传输结束通知时,必须递减的信号量;
- 当它需要将信息记录到控制台时应使用的方法;
- 它将观察的进程名称;
定义了这些类后,示例 10 将如下所示:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple10 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs = Observable.error(new RuntimeException("Erreur !!!")).subscribeOn(Schedulers.computation());
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1", obs));
}
}
第 11 行,静态方法 [Observable.error] 的定义如下:
![]() |
因此,第 8 行配置了一个可观察对象,该对象仅向其订阅者的 [onError] 方法抛出异常。执行结果如下:
main : début observation ------Thread[main] ---- Time[22:618]
main : attente fin observation ------Thread[main] ---- Time[22:636]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[22:638]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[22:638]
第 3 行和第 4 行:两个订阅者的 [onError] 方法都接收到可观察对象抛出的异常。
本次执行有一个特殊之处:两个观察者的 [onCompleted] 方法均未被调用。因此,屏障未被放下,主线程仍被阻塞在静态方法 [ProcessUtils.subscribe] 的第 3 行:
// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");
这里我们可以看到,如果可观察对象中发生错误,订阅者的 [onCompleted] 方法就不会被调用。因此,我们将 [Observer.onError] 方法修改如下:
@Override
public void onError(Throwable e) {
// erreur d'émission
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber[%s, %s].onError (%s)", observerName, processName, e));
}
// fin blocage thread principal
latch.countDown();
}
我们添加第 7–8 行代码,以便在发生可观察错误时释放锁。使用这段新代码,执行结果如下:
main : début observation ------Thread[main] ---- Time[40:750]
main : attente fin observation ------Thread[main] ---- Time[40:764]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[40:766]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[40:766]
main : fin observation ------Thread[main] ---- Time[40:767]
我们得到了第5行,这是之前没有的。
示例 11 将如下所示:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple11 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs1 = Observable.empty();
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
第 10 行:静态方法 [Observable.empty] 创建了一个不发出任何元素的可观察对象。它仅发出发射结束通知;
![]() |
执行上述示例中的代码将得到以下结果:
- 第 2 行和第 3 行:我们可以看到,两个观察者都收到了广播结束通知,而此前并未收到任何元素。
有人可能会好奇,这种方法实际上有什么用处。它的用法类似于一个集合:初始为空,随后向其中添加元素:
在第 3 行,我们将初始可观察对象 obs(第 1 行)与其他可观察对象合并。
示例 12 演示了静态方法 [Observable.never]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple12 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs1 = Observable.never();
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
静态方法 [Observable.never] 会创建一个永远不会发出的可观察对象:
![]() |
运行该示例将得到以下结果:
第 2 行:主线程无限期等待。这是因为没有可观察对象发出 [onCompleted] 通知,该通知会使信号量(屏障)变为绿色(降低屏障)。
7.4. 多线程
7.4.1. 示例-13:操作线程、观察者线程
在第 7.1.3 节中,我们使用静态方法 [Observable.create] 创建了一个可观察对象:
![]() |
- [create] 方法返回类型 Observable<T>;
- [create] 方法的参数是一个类型为 [Observable.OnSubscribe<T>] 的函数,定义如下:
![]() |
类型 [Observable.OnSubscribe<T>] 是一个函数式接口,它本身继承自函数式接口 [Action1<Subscriber<? super T>>]。该接口的 [call] 方法期望接收类型 [Subscriber](即订阅者,观察者)。在本文档的其余部分中,我们将有时将类型 [Observable.OnSubscribe<T>] 称为操作。 我们将创建具有名称的自定义操作。这些操作将是以下 [IProcessAction] 接口的实例:
![]() |
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcessAction<T> extends Observable.OnSubscribe<T> {
// action has a name
public String getName();
}
- 第 5 行:接口 [IProcessAction<T>] 具有接口 [Observable.OnSubscribe<T>] 的所有特征;
- 第 8 行:它还具有一个 [getName] 方法,该方法返回实现该接口的实例的名称;
我们将使用名为 [ProcessAction01] 的以下操作:
package dvp.rxjava.observables.utils;
import java.util.Random;
import rx.Subscriber;
import rx.functions.Func1;
public class ProcessAction01<T> implements IProcessAction<T> {
// data
private String name;
private int nbValues;
private Func1<Integer, T> func1;
// manufacturers
public ProcessAction01(String name, int nbValues, Func1<Integer, T> func1) {
this.name = name;
this.nbValues = nbValues;
this.func1 = func1;
}
@Override
public void call(Subscriber<? super T> subscriber) {
ProcessUtils.showInfos.accept(String.format("Observable (%s) call start", getName()));
for (int i = 0; i < nbValues; i++) {
// waiting
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
// error
ProcessUtils.showInfos.accept(String.format("Observable (%s) onError", getName()));
subscriber.onError(e);
}
// element emission
T value = func1.call(i);
ProcessUtils.showInfos.accept(String.format("Observable (%s,%s) onNext (%s)", getName(), i, value));
subscriber.onNext(value);
}
// finish
ProcessUtils.showInfos.accept(String.format("Observable (%s) onCompleted", getName()));
subscriber.onCompleted();
}
@Override
public String getName() {
return name;
}
}
- 第 8 行:类 [ProcessAction01<T>] 实现了接口 [IProcessAction<T>],因此也实现了接口 [Observable.OnSubscribe<T>];
- 第 11 行:操作的名称;
- 第 12 行:要发出的值个数;
- 第 13 行:一个类型为 [Func1<Integer, T>] 的实例,它接受一个整数并生成一个类型 T,该类型将由可观察对象(第 35 行和第 37 行)发出;
- 第 16–20 行:我们将动作名称、要发出的值个数以及发射函数传递给构造函数;
- 第 23–42 行:进程代码;
- 第 23 行:[call] 方法的参数是与该进程关联的可观察对象的订阅者;
- 第 28 行:进程在等待随机时长后发出其元素;
- 第 32 行:发出错误;
- 第 37 行:正常发射;
- 第 41 行:发出发射结束通知;
- 第 25–38 行:该操作在经过随机等待时间(第 30 行)后,发射 nbValues 个实数;
- 第 35 行:待发出的值由作为构造函数参数传递的 [func1] 函数提供(第 16 行);
我们重构了 [Process] 类(参见第 7.3.1 节),使其也能通过命名动作进行构造。我们添加了以下构造函数:
public Process(IProcessAction<T> na, Scheduler schedulerObserved, Scheduler schedulerObserver) {
// nom process=nom action
name = na.getName();
// action --> observable
observable = Observable.create(na);
// thread d'exécution du processus observé
if (schedulerObserved != null) {
observable = observable.subscribeOn(schedulerObserved);
}
// thread d'observation de l'observateur
if (schedulerObserver != null) {
observable = observable.observeOn(schedulerObserver);
}
}
- 第 1 行:构造函数接受 3 个参数:
- 用于构建可观察对象的命名操作(第5行);
- 被观察进程的调度器(可以为空);
- 观察者的调度器(可以为 null);
- 第 5 行:根据作为参数传递的操作创建可观察对象;
以下代码 [示例13] 观察了不同的可观察对象:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple13 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 1, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// process 3
Process<Integer> process3 = new Process<>(new ProcessAction01<Integer>("process3", 3, i -> i * 2), null,
Schedulers.computation());
// process 4
Process<Boolean> process4 = new Process<>(new ProcessAction01<Boolean>("process4", 4, i -> i % 2 == 0), null, null);
// subscriptions
ProcessUtils.subscribe(1, process1);
ProcessUtils.subscribe(1, process2);
ProcessUtils.subscribe(1, process3);
ProcessUtils.subscribe(1, process4);
}
}
- 第 13–15 行:process1 在一个计算线程上生成 1 个实数,该值将在另一个计算线程上被观察;
- 第 17–18 行:process2 在一个计算线程上生成 2 个字符串,且未指定观察者的线程。结果显示,观察默认发生在与进程执行相同的线程上;
- 第 20–21 行:process3 在未指定的线程上生成 3 个整数,这些整数将在一个计算线程上被观察。结果表明,该进程默认在主线程上运行;
- 第 23 行:process4 进程在未指定的线程上生成 4 个布尔值,这些值将在未指定的线程上被观察。结果表明,该进程的执行及其观察默认都在主线程上进行;
执行此代码的结果如下:
- process1 进程在计算线程 [RxComputationThreadPool-4] 上生成 1 个实数(第 4 行),该值在计算线程 [RxComputationThreadPool-3] 上被观察到(第 6 行);
- 过程 process2 在计算线程 [RxComputationThreadPool-5] 上生成 2 个字符串(第 12、14 行),并在该线程上被观察到(第 13、15 行);
- process3 在主线程上生成 3 个整数(第 21、23、25 行),这些值在计算线程 [RxComputationThreadPool-6] 上被观察到(第 22、24、28 行);
- process4 进程在主线程上生成 4 个布尔值(第 34、36、38、40 行),并在同一主线程上被观察到(第 33、35、37、39 行);
欢迎读者跟随上述内容:
- 被观察进程及其线程的生命周期;
- 其观察者及其线程的生命周期;
Rx 库的吸引力很大程度上在于这种多线程特性,而开发者无需亲自管理。
7.5. 多个可观察对象的组合
7.5.1. 示例-14:使用 [Observable.merge] 合并两个可观察对象
下面我们将介绍 [Observable] 类的静态方法,这些方法允许将多个可观察对象组合成一个结果可观察对象。
此类示例的第一种如下:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple14 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// merge
Process<?> process12 = new Process<>("process12",
Observable.merge(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- 第 15–17 行:一个名为 [process1] 的进程将在计算线程上发出 3 个实数。它也将在计算线程上被观察;
- 第 19–20 行:名为 [process2] 的进程将在一个计算线程上发出 2 个字符串。未指定观察线程。我们之前看到,在这种情况下,观察线程即为计算线程;
- 第 23 行:两个进程被合并,即创建了一个可观察对象,其元素同时来自这两个进程。为此使用了静态方法 [Observable.merge]:
![]() |
与上图所示不同,在合并过程中,流 1 的元素可以穿插在流 2 的元素之间。执行结果展示了这一点:
- 第 3 行:进程 [process1] 在计算线程 [RxComputationThreadPool-4] 上运行;
- 第 4 行:进程 [process2] 在计算线程 [RxComputationThreadPool-5] 上运行;
- 第 9 行:进程 [process12] 在计算线程 [RxComputationThreadPool-3] 上被观察。我不清楚导致这一选择的规则;
- 第 9–11 行:我们看到,尽管 [process1](第 5 行)和 [process2](第 6、7 行)均未完成,观察者仍会观察到这两个进程的元素(存在混合);
- 当两个进程 process1 和 process2 均结束时,进程 [process12] 终止(第 17 行);
7.5.2. 示例-15:使用 [Observable.concat] 连接两个可观察对象
现在我们将分析以下代码:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple15 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, Schedulers.computation());
// concat
Process<?> process12 = new Process<>("process12",
Observable.concat(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- 第 15–17 行:一个名为 [process1] 的进程将在计算线程上发出 3 个实数。它也将在计算线程上被观察;
- 第 19–20 行:名为 [process2] 的进程将在一个未指定的线程上(此处为默认的主线程)发出 2 个字符串。该进程将在一个计算线程上被观察;
- 第 23 行:将这两个进程进行连接,即创建一个可观察对象,其元素来自这两个进程。输出的值不会被混合。进程 [process12] 将首先输出进程 [process1] 的所有值,然后输出进程 [process2] 的值。为此使用了静态方法 [Observable.concat]:
![]() |
执行结果如下:
- 第 3-10 行:进程 [process1] 运行,进程 [process12] 发布由 [process1] 发出的值;
- 第 9 行:进程 [process1] 已完成;
- 第11-17行:进程 [process2] 运行,进程 [process12] 发布 [process2] 发布的值;
关于进程2有一个特殊之处:我们没有指定执行线程。因此,人们可能会预期默认使用主线程。然而,实际情况并非如此。执行线程是计算线程 [RxComputationThreadPool-3](第11行)。因此,当未指定执行线程或观察线程时,我们无法对将选择哪个线程做出任何假设。
7.5.3. 示例-16:使用 [Observable.zip] 组合两个可观察对象
现在,我们将分析以下代码:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;
public class Exemple16 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, null);
// 2-process combination function
FuncN<String> funcn = new FuncN<String>() {
@Override
public String call(Object... args) {
if (args.length == 2) {
return String.format("double=%s, string=%s", args[0], args[1]);
} else {
throw new RuntimeException("la fonction attend 2 paramètres exactement");
}
}
};
// zip of the 2 processes
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- 第 16–18 行:一个名为 [process1] 的进程将在一个计算线程上发出 3 个实数。它也将在一个计算线程上被观察;
- 第 20–21 行:名为 [process2] 的进程将在一个未指定的线程上输出 2 个字符串。观察线程同样未指定;
- 第 23–32 行:使用匿名类实例化类型 [FuncN<String>]。FuncN 是一个函数式接口:
![]() |
[FuncN.call] 方法期望接收一个对象数组,并返回类型 R。函数 [funcn] 将用于按该顺序组合进程 process1 和 process2。在 [FuncN.call] 方法中:
- args[0] 将是 Double 类型;
- args[1] 将是 String 类型;
在此,[funcn.call] 的结果将是第 27 行中的字符串。构造此结果无需知道调用方法的参数类型。
这两个过程结合如下:
// zip des 2 processus
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
[Observable.zip] 方法的工作原理如下:
![]() |
我们可以看到:
- zip 的第一个参数是 Iterable<Observable>。在我们的示例中,我们有一个实际参数,其类型为 List<Observable>,包含我们的两个可观察对象;
- zip 的第二个参数类型为 FuncN。在我们的示例中,实际参数是 [funcn];
执行结果如下:
- 第 7、11 行:process12 发出两个元素;
- 第 8 行:process1 发出的额外元素在 process2 中没有对应项,因此未被结果进程 process12 发出;
我们可以看到,process2 既未被分配执行线程也未被分配观察线程,因此它同时使用了主线程来处理这两者。
7.5.4. 示例-17:使用 [Observable.combineLatest] 组合两个可观察对象
现在我们将分析以下代码:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple17 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null,
Schedulers.computation());
// combining the 2 processes
Process<Double> process12 = new Process<>("process12",
Observable.combineLatest(process1.getObservable(), process2.getObservable(), (d1, d2) -> d1 + d2));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- 第 14–16 行:一个名为 [process1] 的进程将在计算线程上发出 3 个实数。它也将在计算线程上被观察;
- 第 18–20 行:名为 [process2] 的进程将在一个未绑定线程上发出 2 个实数。这些值将在一个计算线程上被观察;
- 第 23 行:使用以下静态方法 [Observable.combineLatest] 将这两个可观察对象进行组合:
![]() |
[combineLatest] 可观察对象的工作原理如下:当两个可观察对象中的一个发出元素 E1 时,该元素会通过 [combineFunction] 与另一个可观察对象发出的最后一个元素进行组合。
执行此代码将得到以下结果:
- 第 5 行:process2 的输出 (56) 与 process1 输出的最后一个元素 (54,第 4 行) 组合,产生第 7 行所示的结果;
- 第 6 行:process1 的输出 (51.6) 与 process2 输出的最后一个元素 (56,第 5 行) 组合,产生第 8 行中的结果;
- 第 9 行:process2 的输出(261.8)与 process1 输出的最后一个元素(51.6,第 6 行)相组合,产生第 12 行的结果;
- 第 13 行:process1 的输出(80.39)与 process2 输出的最后一个元素(261.8,第 9 行)相组合,产生第 15 行的结果;
这是 [zip] 可观测量的变体,不同之处在于,此次组合的元素不一定是流中相同位置的元素。请注意,未被分配执行线程的 process2 是在主线程上执行的(第 2 行)。
7.5.5. 示例-18:使用 [Observable.amb] 组合两个可观察对象
现在我们将分析以下代码:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple18 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null, null);
// combining the 2 processes
Process<Double> process12 = new Process<>("process12",
Observable.amb(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- 第 14–16 行:一个名为 [process1] 的进程将在一个计算线程上发出 3 个实数。它也将在一个计算线程上被观察;
- 第 18–20 行:名为 [process2] 的进程将在一个未绑定线程上发出 2 个实数。这些值将在一个未绑定线程上被观察;
- 第 22 行:使用以下静态方法 [Observable.amb] 将这两个可观察对象进行组合:
![]() |
如上图所示,可观察对象 [Observable.amb(Observable o1, Observable o2)] 会发出第一个发出元素的可观察对象的元素。这通过所给示例的结果得到了证实:
- 第 4 行:process2 最先发出;
- 第 8、12 行:process12 发布 process2 发布的所有元素(第 4、11 行);
7.6. 可观察对象的处理链
7.6.1. 示例-19:使用 [Observable.map] 转换可观察对象
在之前的示例中,我们探讨了将两个可观察对象组合成第三个可观察对象的各种方式。现在,我们将介绍 [Observable] 类的静态方法,这些方法允许对可观察对象进行转换、过滤和聚合操作。在此,我们将发现与第 5 节中研究的 [Stream] 类的方法类似的方法。
我们的第一个示例如下:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple19 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>("process2",
process1.getObservable().map(d -> String.format("valeur-%s", d)));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- 第 14–16 行:一个名为 process1 的进程将在一个计算线程上发出 3 个实数。它还将在一个计算线程上被观察;
- 第 17–18 行:process1 输出的数值将在 process2 中转换为字符串;
- 第 20 行:我们监视 process2;
第 18 行中的 [Observable.map] 方法与第 5.5 节中讨论的 [Stream.map] 方法类似:
![]() |
该示例的运行结果如下:
- 第 4、5 和 8 行:来自 process1 的发射。这些是实数;
- 第 6、7、10 行:来自 process2 的被观察发射。这些是字符串;
7.6.2. 示例-20:使用 [Observable.filter] 对可观测量进行过滤
示例如下:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple20 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- 第 11-12 行:一个名为 process1 的进程将在一个工作线程上发出 0 到 2 之间的整数。它还将在另一个工作线程上被监听;
- 第 14 行:process1 输出的数字将被过滤,使得 process2 中仅保留偶数;
- 第 20 行:我们监听 process2;
第 18 行中的 [Observable.filter] 方法与第 5.4 节中讨论的 [Stream.filter] 方法类似:
![]() |
该示例的运行结果如下:
- 第 4、5 和 7 行:来自 process1 的发射;
- 第 6、9 行:来自 process2 的被观察到的发射。这些是来自 process1 的偶数索引元素;
7.6.3. 示例-21:使用 [Observable.flatMap] 转换可观察对象
示例如下:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- 第 12–13 行:一个名为 process1 的进程将在一个计算线程上发出 0 到 2 之间的整数。它还将在一个计算线程上被观察;
- 第 15–18 行:process1 发出的每个数字 n 都会被转换为一个可观察对象,该对象会发出 3 个数字 (10*n, 10*n+1, 10*n+2)。 如果我们在第 15 行使用 [map] 方法,process2 发出的将是一个 Observable<Integer> 类型的序列,而非 Integer 类型的序列。所使用的 [flatMap] 方法允许我们将这个 Observable<Integer> 类型的元素序列,展平为一个由每个 Observable<Integer> 中的每个元素组成的 Integer 类型的序列;
- 第 20 行:我们观察 process2;
第 15 行中的 [Observable.flatMap] 方法与第 5.6.12 节中讨论的 [Stream.flatMap] 方法类似:
![]() |
该示例的运行结果如下:
- 第 5-7 行:process1 第 4 行发出数据后,process2 发出的三条数据;
- 第 9-11 行:process1 第 8 行发出信号后,process2 发出的三个信号;
- 第14-16行:process2在process1第12行发出信号后发出的三个信号;
以下代码演示了如何根据 process1 创建 Observable<Integer[]> 类型 [示例21b]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21b {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer[]> process2 = new Process<>("process2", process1.getObservable().map(i -> {
int value = i * 10;
return new Integer[] { value, value + 1, value + 2 };
}));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- 第 14 行:使用了 [Observable.map] 方法;
- 第 16 行:该方法返回 Integer[] 类型;
结果如下:
- 第 6、7、10 行:我们看到了 map 的结果;
所有这些可观测变换都可以进行链式调用,因为每次变换都会产生一个新的可观测量。以下示例 [Example21c] 演示了这一点:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21c {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- 第 15–18 行:flatMap 之后接了一个 filter;
执行结果如下:
- 第 8-13 行:process2 仅从 flatMap 中发出了偶数元素;
与 [flatMap] 类似的方法是 [flatMapIterable] 方法,如下例 [Example21d] 所示:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21d {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMapIterable(i -> {
int value = i * 10;
return Arrays.asList(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
第 16 行:我们不再使用 [flatMap] 方法,而是使用 [flatMapIterable] 方法。在这种情况下,转换函数必须返回 Iterable<T> 类型(第 18 行),而不是 Observable<T> 类型。
我们得到了与之前相同的结果。
让我们回到 [flatMap] 方法的定义:
![]() |
如上所示,在两个绿色元素 [1-2] 之间插入了一个蓝色元素 [3]。这意味着在对 Observable<T> 进行扁平化时,[flatMap] 方法会保留这些内部可观察对象的发射顺序。以下示例 [Example21e] 对此进行了演示:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21e {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().flatMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
- 第 11-12 行:process1 进程发出整数 [0,1];
- 第 14-15 行:process2 发出整数 [10,11,12];
- 第 17-18 行:process1 发出的每个元素都与 process2 的可观测量相关联。这意味着:
- 进程1的元素 [0] 将与一个输出 [10,11,12] 的可观测量相关联;
- 元素 1 也是如此;
最终,将发出 6 个数字 [10, 11, 12, 10, 11, 12]。我们想看看它们的顺序。
执行结果如下:
我们可以看到,process3 的发射顺序是:[10, 10, 11, 12, 11, 12](第 11、12、14、17、19、22 行)。因此,process2 发射的元素确实被打乱了。 我们可以通过使用 [concatMap] 方法代替 [flatMap] 方法来避免这种情况。以下代码 [Example21ef] 演示了这一点:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21ef {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().concatMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
在第 18 行,我们将 [flatMap] 替换为 [concatMap]。执行结果如下:
我们可以看到,进程3的发射顺序为:[10, 11, 12, 10, 11, 12](第12–14、17、19、22行)。进程2发射的元素并未被打乱顺序。
[map] 方法的另一种变体是 [switchMap] 方法:
![]() |
在上例中,从可观察对象 [1] 创建了另外三个包含两个元素的可观察对象 [2],随后像 [flatMap] [3] 那样将其展平。请注意,结果包含 5 个元素,而非 6 个。这是因为在第二个可观察对象发出其第二个元素 [6] 之前,第三个可观察对象已发出其第一个元素 [5],导致第二个可观察对象被丢弃。 因此,在生成的可观察对象 [3] 中找不到元素 [6]。
为说明 [switchMap],我们将使用以下示例 [Example21eg]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21eg {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().switchMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
运行该示例将产生以下结果:
- process1 发出 2 个元素,从而产生 2 个包含 3 个元素的 process2 可观察对象;
- 第 14 行:观察者接收第 6 行中第一个 process2 可观察对象发出的第 0 个元素;
- 第15行:观察者接收了第13行第二个process2可观量发出的第0个元素。该描述未解释为何观察者此前未接收第7行和第8行第一个process2可观量发出的第1和第2个元素。无论如何,第一个process2可观量已被放弃;
- 最终,观察者仅看到4个元素(第14、15、17、20行),而非实际发出的6个;
7.6.4. 示例-22:[Observable] 类的其他方法
[Observable]类包含许多来自[Stream]类的、工作方式类似的方法。以下列举其中几个。我们将仅提供代码及其结果。
[示例22a - take=limit]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22a {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).take(3));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
结果
[示例22b - takeLast]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22b {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).takeLast(2));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
结果
[示例22c - 跳过]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22c {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).skip(5).take(2));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
结果
[示例22d - reduce]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22d {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).reduce(0, (i, a) -> i + a));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- 第 10 行:计算可观察对象中元素的总和。结果是一个会发布该总和的可观察对象;
结果
[示例 22e - 全部]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22e {
public static void main(String[] args) throws InterruptedException {
// process
Process<Boolean> process = new Process<>("process", Observable.range(1, 10).all(i -> i > 10));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- 第 10 行:返回一个 Observable<Boolean>,如果 [all] 方法的谓词对所有元素都为真,则发出 true 元素;否则发出 false;
结果
[示例22f - 计数]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22f {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).count());
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- 第 10 行:[Observable.count] 创建了一个包含 1 个元素的可观察对象,该对象是所观察元素的总和;
结果
[示例22g - 唯一]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22g {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.just(1, 2, 1, 3).distinct());
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
结果
[ Example22h - groupBy, asObservable]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.observables.GroupedObservable;
public class Exemple22h {
public static void main(String[] args) throws InterruptedException {
// process
Observable<GroupedObservable<Boolean, Integer>> obs = Observable.range(1, 10).groupBy(i -> i % 2 == 0);
Process<Integer> process = new Process<>("process", obs.concatMap(g -> g.asObservable()));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- 第 11 行:[groupBy] 方法将发出的 10 个元素分为两组:偶数和奇数。结果是一个 Observable<GroupedObservable<Boolean, Integer>>, 即一个其元素类型为 GroupedObservable<Boolean, Integer> 的可观察对象,其中 Boolean 是分组键的类型(本例中为 false、true),也是作为参数传递给 [groupBy] 方法的 lambda 表达式的返回类型,而 Integer 是分组中元素的类型;
- 第 12 行:GroupedObservable 类型提供了一个 [asObservable] 方法,允许我们以此类型创建可观察序列。因此我们将得到两个 Observable<Integer> 类型,一个用于偶数,另一个用于奇数。基于这两个可观察序列,[concatMap] 方法将生成一个新的可观察序列;
结果
[示例22i - 时间戳]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
import rx.schedulers.Timestamped;
public class Exemple22i {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Timestamped<Integer>> process2 = new Process<>("process2", process1.getObservable().timestamp());
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- 第 15 行,[timestamp] 方法为可观察对象的每个已处理元素关联一个时间戳;
结果
在此示例中,很难判断时间戳信息代表什么:
- 第4-5行:我们看到进程1的元素1在元素0发出后139毫秒被发出;
- 第 6 行和第 7 行:我们看到 process2 的元素 1 在元素 0 之后 234 毫秒被观察到;
- 第 5、8 行:我们看到 process1 的元素 2 在元素 1 发出 33 毫秒后被发出;
- 第 7 行和第 10 行:我们看到 process2 的元素 2 在元素 1 之后 37 毫秒被观察到;
这些延迟是由于观察可观察量和执行可观察量的线程不同所致。如果我们将第 12–13 行替换为以下代码(示例 22j):
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
null);
- 第 2–3 行:我们未指定观察线程。我们知道,在这种情况下,可观察对象会在其执行位置被观察;
这将产生以下结果:
- 第 4 行和第 6 行:process1 在发出元素 #0 之后 587 毫秒发出元素 #1;
- 第 5 行和第 7 行:观察者以 586 毫秒的间隔观察到这两个元素;
- 第6行和第8行:process1在发出元素#1后396毫秒发出元素#2;
- 第 7 行和第 9 行:观察者观察到这两个元素的时间差为 396 毫秒;
此处的时间戳值是一致的:它们准确地反映了元素的传输时间。
7.7. 调度器
7.7.1. 示例-23:[Schedulers.computation] 调度器
接下来我们将探讨执行调度器。我们将重点观察执行线程。
调度器的主题有些晦涩难懂。StackOverflow 网站上的这个问题 [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases] 介绍了各种调度器:
![]() |
我们将尝试通过示例说明这些不同调度器的用法。第一个示例演示了 [Schedulers.computation] 调度器:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple23 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.computation(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
- 第 14–19 行:我们创建一个包含 10 个进程的数组,这些进程在计算线程上运行;
- 第 17 行:每个进程生成一个随机实数;
- 第21行:我们订阅了所有这些进程;
结果如下:
- 第 2-10 行:前 8 个进程在 8 个不同的线程上启动(所用机器有 8 个核心)。请注意,它们都大约在同一时间启动;
- 第 17-19 行:3 个进程终止,从而释放了 3 个线程;
- 第23-24行:最后两个进程随后可以利用这3个已释放的线程中的2个来启动;
因此我们可以得出结论:[Schedulers.computation] 调度器提供了一个包含 n 个线程的线程池,其中 n 是机器上的核心数。这些线程将在这些核心上并行执行。
7.7.2. 示例-24:[Schedulers.io] 调度器
我们使用 [Schedulers.io] 调度器运行前面的代码:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple24 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.io(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
- 第 18 行:进程使用调度器的线程运行 [Schedulers.io];
这将产生以下结果:
- 第 2-10 行:10 个进程各自在不同的线程上启动。与前一个案例不同,所有进程均成功启动。请注意,这些启动操作耗时 6 毫秒,而之前仅需 1 毫秒;
- 第13-18行:可观察对象依次发出信号,远不如之前那样近乎并行;
[Schedulers.io] 和 [Schedulers.computation] 调度器之间有何区别?答案可参见以下链接 [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
7.7.3. 示例-25:[Schedulers.newThread] 调度器
我们使用 [Schedulers.newThread] 调度器运行前面的代码:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple25 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.newThread(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
所得结果与 [Schedulers.io] 调度器相同:
在 URL [http://stackoverflow.com/questions/33415881/retrofit-with-rxjava-schedulers-newthread-vs-schedulers-io] 中解释道,调度器 [Schedulers.io] 提供了一个线程池,而调度器 [Schedulers.newThread] 则不提供。 线程池会自动创建一组线程,并将它们分配给需要它们的进程。当这些进程完成后,其线程不会被删除,而是返回线程池,供其他进程重复使用。这比不断创建和删除线程更为高效。因此,建议使用 [Schedulers.io] 调度器。
7.7.4. 示例 26:调度器 [Schedulers.immediate, Schedulers.trampoline]
让我们回到对这两个调度器的说明:
![]() |
这个解释其实很容易理解,但当你试图用示例说明时,就会发现自己其实并没有真正掌握它。正是《用 Java 8 学习响应式编程》这本书帮助我创建了一个示例,它基于书中一个示例,但进行了简化。如下所示:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
public class Exemple26 {
public static void main(String[] args) throws InterruptedException {
// a scheduler
Scheduler scheduler = Schedulers.immediate();
// a worker of this scheme
Worker worker = scheduler.createWorker();
// an Action0 type to be executed on the worker
Action0 action02 = new Action0() {
@Override
public void call() {
// log action02
ProcessUtils.showInfos.accept("action02");
}
};
// an Action0 type to be executed on the worker
Action0 action01 = new Action0() {
@Override
public void call() {
// program a new action on the same worker
worker.schedule(action02);
// log action01
ProcessUtils.showInfos.accept("action01");
}
};
// action01 is programmed on the worker
worker.schedule(action01);
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- 第 17 行:一个调度器。这可以是此处所示的 [Schedulers.immediate],也可以是后文提到的 [Schedulers.trampoline];
- 第 19 行:类型为 Action0 的操作(第 21、20 行)可在调度器的 worker 上执行。[Scheduler.createWorker] 方法用于创建一个 worker。[Worker.schedule(Action0)] 方法通过 worker 执行类型为 Action0 的操作;
- 第 21–27 行:第一个名为 [action02] 的操作,将由第 19 行创建的 worker 执行(第 40 行);
- 第 30–38 行:第二个名为 [action01] 的动作。它具有一个特殊特性,即会触发 action02 在与其相同的 worker 上执行(第 34 行)。这正是 [Schedulers.immediate] 与 [Schedulers.trampoline] 之间的区别所在:
- 如果调度器是 [Schedulers.immediate],那么在第 34 行,动作 action02 将立即执行(因此得名),而当前正在运行的动作 action01 将被中断。随后我们将看到第 25 行的消息出现。一旦 action02 完成,action01 将恢复运行,我们将会看到第 36 行的消息;
- 如果调度器是 [Schedulers.trampoline],那么在第 34 行,操作 action02 会被加入队列。它不会被执行,直到当前任务 action01 完成。随后第 36 行的消息将出现。一旦 action01 完成,action02 将被执行,第 25 行的消息将出现;
执行上述代码将得到以下结果:
如果在第 17 行使用调度器 [Schedulers.trampoline],则会得到相反的结果:
话虽如此,要将其与可观察对象建立联系却颇具难度。我尚未找到一个能充分展示在上述两个线程之一上执行可观察对象所带来益处的令人信服的示例。不过这里有一个示例,但我觉得它完全不自然:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
public class Exemple27 {
public static void main(String[] args) throws InterruptedException {
// Worker
Worker worker = Schedulers.immediate().createWorker();
// Worker worker = Schedulers.trampoline().createWorker();
// observable 1 sur worker
worker.schedule(() -> Observable.range(1, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
// observable 2 on same worker
worker.schedule(() -> Observable.range(100, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
}
}));
}
}));
}
}
- 第 13–14 行:使用两个调度器 [Schedulers.immediate] 和 [Schedulers.trampoline] 中的一个创建一个工作者;
- 第 16 行:第一个可观察量 obs1 被调度到该 worker 上,以发出数值 [1,2]
- 第 22 行:每次观察到该可观察对象 obs1 的某个元素时,都会在同一工作线程上启动对第二个可观察对象 obs2 的观察,以发出数值 [100,101];
使用 [Schedulers.immediate] 调度器,我们得到以下结果:
而使用 [Schedulers.trampoline] 调度器时,我们得到以下结果:
7.8. 结论
仍有许多工作需要完成。为了更深入地理解 RxJava 库,建议读者继续参考本文开头提供的资料进行学习。尽管如此,我们现在已经掌握了在 Swing 和 Android 环境中使用 RxJava 所需的基础知识。接下来我们将对此进行演示。








































