7. مكتبة RxJava
تستند مكتبة RxJava إلى المفهوم التالي: يتم مراقبة دفق العناصر من النوع T Observable<T> بواسطة مشترك واحد أو أكثر (مشتركين، مراقبين، مستهلكين) Subscriber<T>. تسمح مكتبة RxJava بتشغيل دفق Observable<T> في مؤشر الترابط T1 ومراقب Subscriber<T> الخاص به في مؤشر الترابط T2 دون أن يضطر المطور إلى القلق بشأن إدارة دورة حياة مؤشرات الترابط هذه أو المشاكل الصعبة بطبيعتها، مثل مشاركة البيانات بين مؤشرات الترابط ومزامنتها لتنفيذ مهمة عامة. وبالتالي، فهي تسهل البرمجة غير المتزامنة.
ينتج تيار Observable<T> عناصر من النوع T، والتي يمكن ملاحظتها أثناء إنتاجها. إذا كان المراقب والمراقب (مصطلح يستخدم بشكل عام للإشارة إلى نوع Observable<T>) في نفس الخيط، فإن المراقب لا يمكنه إنتاج العنصر (i+1) إلا بعد أن يستهلك المراقب العنصر i. هناك حالات قليلة تكون فيها هذه البنية مفيدة. إذا لم يكن المراقب والمراقب في نفس الخيط، فإن المراقب ومراقبه يتصرفان بشكل مستقل: يصدر المراقب وفقًا لسرعته الخاصة ويستهلك المراقب وفقًا لسرعته الخاصة. هنا تكمن قيمة المكتبة. حتى الآن، لم نناقش سوى مراقب واحد. في الواقع، يمكن أن يكون للمراقب أي عدد من المراقبين.
تعد مكتبة RxJava مناسبة بشكل خاص للبنية الموضحة في القسم 2 من المقدمة والموجزة هنا:

- في [1]، توفر طبقة الخدمة خدمات، يستغرق الحصول على بعضها وقتًا طويلاً (طلبات الشبكة، على سبيل المثال)؛
- يتم استدعاء طبقة الخدمة هذه بواسطة واجهة مستخدم رسومية [1] (Swing، Android، JavaFX). إذا كانت طبقة الخدمة تعمل في نفس مؤشر الترابط الذي تعمل فيه طريقة [Swing] التي تستخدمها، فإن واجهة المستخدم الرسومية تتجمد (تصبح غير مستجيبة) أثناء انتظار نتيجة الخدمة؛
- في [2]، تسمح طبقة تكييف رقيقة تم تنفيذها باستخدام RxJava بعرض طبقة واجهة المستخدم الرسومية بتنفيذ غير متزامن لنفس الخدمة: يمكن أن تعمل هذه الخدمة في مؤشر ترابط مختلف عن طريقة طبقة واجهة المستخدم الرسومية التي تستدعيها. في هذه الحالة، تظل واجهة المستخدم الرسومية [3] مستجيبة: يمكن للمستخدم الاستمرار في التفاعل معها، على سبيل المثال عن طريق تشغيل طلب شبكة جديد بالتوازي مع الأول، والأهم من ذلك، يمكن منح المستخدم خيار إلغاء العمليات التي تستغرق وقتًا طويلاً — وهو أمر مستحيل إذا كانت واجهة المستخدم الرسومية متجمدة؛
- الاستدعاء [4] متزامن، في حين أن الاستدعاءات [5-6] غير متزامنة؛
في هذه البنية، توفر الطبقة [2] خدمات تُرجع أنواع Observable<T> التي يمكن لطرق الطبقة الرسومية [3] الاشتراك فيها. ثم تقوم خدمة في الطبقة [2] بتسليم نتائجها واحدة تلو الأخرى، ويمكن للطبقة [3] الاستجابة لكل منها، على سبيل المثال عن طريق تحديث مكون واحد أو أكثر من مكونات واجهة المستخدم الرسومية.
تحتوي فئة Observable<T> على عشرات الطرق. وهذا أحد التحديات التي تواجه المكتبة: فهي غنية جدًا، ومن الصعب استيعاب كل إمكانياتها. سنقدم بعضًا منها. وسيأتي إتقان الطرق الأخرى مع الوقت.
7.1. إنشاء العناصر القابلة للمراقبة والاشتراك فيها
7.1.1. مثال-01: طريقة [Observable.from]
![]() |
انظر إلى الشفرة التالية:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
- السطر 12: نقوم بإنشاء نوع Observable<Integer> من قائمة من الأعداد الصحيحة.
فئة Observable<T> هي دفق من العناصر من النوع T التي يمكن ملاحظتها — ويفضل أن يكون ذلك بشكل غير متزامن، ولكن ليس بالضرورة — أثناء إنتاجها. وتعريفها كما يلي:
![]() |
كما ذكرنا سابقًا، تحتوي فئة Observable<T> على عشرات الطرق. بعضها مشابه لتلك الموجودة في فئة Stream<T> التي تمت مناقشتها في القسم 5. تتضمن وثائق RxJava "مخططات الكرات" [2] التي توضح كيفية عمل هذه الطرق:
- يوضح السطر 3 انبعاثات المراقب بمرور الوقت؛
- يتم تطبيق الطريقة [4] على العناصر التي تصدرها المراقبة. وهي تنتج عمومًا مراقبة جديدة؛
- يُظهر السطر 5 المراقب الجديد الذي تم الحصول عليه؛
تتميز طريقة [Observable.from] بالسمات التالية:
![]() |
تتيح لك الطريقة الثابتة [Observable.from] إنشاء كائن Observable<T> من مجموعة من العناصر من النوع T. وهذه طريقة بسيطة للغاية للبدء في استخدام الكائنات القابلة للمراقبة. السطر:
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
سيصدر ثلاثة عناصر. ولا يصدرها على الفور. بل سيصدرها بالكامل في كل مرة يسجل فيها مشترك. وهذا ما يُسمى بـ cold observable. يعيد العنصر القابل للمراقبة إصدار عناصره لكل مشترك جديد.
يمكننا اعتبار العبارة السابقة بمثابة إجراء تكوين للمراقب. يتم تكوينه مرة واحدة ويتم تنفيذه n مرات في حالة ظهور n مشترك.
كيف يمكنك الاشتراك؟
إحدى طرق القيام بذلك هي استخدام طريقة [Observable.subscribe]، التي يُستخدم تعريفها هنا على النحو التالي:
![]() |
- المعلمة الأولى [Action1<T> onNext] (انظر القسم 6.2) للطريقة هي الطريقة التي سيتم تنفيذها عندما يصدر المراقب عنصرًا جديدًا T؛
- المعلمة الثانية [Action1<Throwable> onError] للطريقة هي الطريقة التي سيتم تنفيذها عندما يرمي المراقب استثناءً؛
- المعلمة الثالثة [Action0 onComplete] (انظر القسم 6.1) للطريقة هي الطريقة التي سيتم تنفيذها عندما يصدر المراقب استثناءً؛
- تُرجع الطريقة نوع [Subscription]؛
يمثل النوع [Subscription] اشتراكًا في العنصر القابل للمراقبة. وتعريفه كما يلي:
![]() |
تكمن قيمة هذه الواجهة [1] في طريقتها [2]، التي تسمح بإلغاء الاشتراك.
في مثالنا، يكون كود الاشتراك في المراقب كما يلي:
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
- السطر 1: يتم تجاهل النتيجة من النوع [Subscription]؛
- الأسطر 1–15: المعلمات الثلاثة هي مثيلات لفئات مجهولة. سنستخدم أيضًا لامبدا. ميزة الفئات المجهولة هي أن أنواع البيانات المتوقعة من قبل الطريقة الوحيدة لهذه الفئات واضحة للعيان؛
- الأسطر 2–5: تنفيذ المعلمة الأولى من النوع [Action1<Integer>]؛
- الأسطر 6–10: تنفيذ المعلمة الثانية من النوع [Action1<Throwable>]؛
- الأسطر 11–15: تنفيذ المعلمة الثالثة من النوع [Action0]؛
الرمز الكامل هو كما يلي:
package dvp.rxjava.observables;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import java.util.Arrays;
public class Exemple01 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.printf("next : %s%n", integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(throwable);
}
}, new Action0() {
@Override
public void call() {
System.out.println("completed");
}
});
}
}
يبدأ العنصر القابل للملاحظة في السطر 12 في إصدار عناصره الثلاثة بمجرد استدعاء طريقة [subscribe] في السطر 14. ومن تلك النقطة فصاعدًا:
- يتم تنفيذ الأسطر 15-18 لكل عنصر يتم إرساله.
- عند الانتهاء من العناصر الثلاثة، يتم تنفيذ الأسطر 24-29؛
- لن يتم تنفيذ الأسطر 19–24 أبدًا لأن المراقب لا يصدر استثناءً هنا؛
بشكل افتراضي، يعمل المراقب والمراقب في نفس الخيط. هناك بعض المراقبين المحددين مسبقًا الذين يعملون في خيط آخر غير الخيط الرئيسي (هنا، خيط طريقة main)، ولكن بالنسبة لمعظمهم، هذا ليس هو الحال. لذا هنا، كل شيء يحدث في خيط طريقة main:
- يصدر المراقب العنصر 1؛
- يتم تنفيذ الأسطر 15–18 وعرض هذا العنصر؛
- يصدر العنصر القابل للمراقبة العنصر 2؛
- تقوم الأسطر 15–18 بتنفيذ هذا العنصر وعرضه؛
- يُصدر العنصر القابل للملاحظة العنصر 3؛
- تقوم الأسطر 15-18 بتنفيذ وعرض هذا العنصر؛
- يصدر العنصر القابل للمراقبة إشعار [completed]؛
- يتم تنفيذ الأسطر 24-29؛
وهذا ما تظهره النتائج:
تعيد فئة [Example02] استخدام [Example01]، ولكن هذه المرة باستخدام دوال لامدا كمعلمات لطريقة [Observable.subscribe]:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
public class Exemple02 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(
(integer) -> System.out.printf("next : %s%n", integer),
(th) -> System.out.println(th),
() -> System.out.println("completed"));
}
}
7.1.2. مثال-03: فئة المراقب
![]() |
تتوفر طريقة [Observable.subscribe]، التي تتيح لك الاشتراك في كائن قابل للمراقبة، بعدة إصدارات، منها ما يلي:
package dvp.rxjava.observables;
import java.util.Arrays;
import rx.Observable;
import rx.Observer;
public class Exemple03 {
public static void main(String[] args) {
// observable integers
Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3));
// subscription
obs1.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable th) {
System.out.printf("throwable %s", th);
}
@Override
public void onNext(Integer integer) {
System.out.printf("next : %s%n", integer);
}
});
};
}
السطر 13: بدلاً من تمرير ثلاثة معلمات إلى طريقة [subscribe]، نمرر إليها نوع [Observer] كما يلي:
![]() |
نوع [Observer] هو واجهة تحتوي على ثلاث طرق:
- [onNext(T t)]، والتي يتم استدعاؤها في كل مرة يصدر فيها المراقب عنصر t؛
- [onError(Throwable th)]، والتي يتم استدعاؤها عندما يرمي المراقب استثناءً th؛
- [onCompleted]، والتي يتم استدعاؤها عندما يشير العنصر القابل للمراقبة إلى أنه قد انتهى من الإرسال؛
يعمل الكود بشكل مشابه لما تم شرحه سابقًا. نحصل على النتائج التالية:
7.1.3. مثال-04: طريقة [Observable.create]
![]() |
يتم تعريف الطريقة الثابتة Observable.create على النحو التالي:
![]() |
- تُرجع الطريقة [create] نوع Observable<T>؛
- معلمة طريقة [create] هي دالة من النوع [Observable.OnSubscribe<T>] معرَّفة على النحو التالي:
![]() |
النوع [Observable.OnSubscribe<T>] هو واجهة وظيفية تمتد بدورها إلى الواجهة الوظيفية [Action1<Subscriber<? super T>>]. تتوقع طريقة [call] لهذه الواجهة نوع [Subscriber] (المشترك، المراقب) المُعرَّف على النحو التالي:
![]() |
نرى في [1] أن الفئة [Subscriber<T>] تنفذ الواجهة [Observer<T>] المعروضة في القسم 7.1.2.
في النهاية، تأخذ الطريقة [<T> Observable.create]:
- تأخذ كمعلمة مثيل من النوع [Observable.OnSubscribe<T>] مع طريقة واحدة: void call(Subscriber<T> s). النوع [Subscriber<T>] يمتد من النوع [Observer<T>] وبالتالي يحتوي على الطرق onNext و onError و onCompleted؛
- تُرجع نوع Observable<T>؛
تُرجع الطريقة [<T> Observable.create] كائنًا قابلًا للمراقبة تم تكوينه. لم يتم إصدار أي عناصر بعد. عندما يشترك مشترك [Subscriber<T> s] في هذا المراقب، يتم عندئذٍ استدعاء الطريقة [void call(s)] للدالة التي تم تمريرها كمعلمة إلى الطريقة [<T> Observable.create]. وتتمثل مهمتها في إصدار عناصر t من النوع T واستدعاء طريقة المراقب [s.onNext(t)] عند كل إصدار. عند اكتمال ذلك، يجب استدعاء طريقة المراقب [s.onCompleted(t)] ويجب إنهاء طريقة [call]. إذا واجهت طريقة [call] استثناءً th، فيجب استدعاء طريقة المراقب [s.onError(th)] ويجب إنهاء طريقة [call]؛
لتوضيح هذا السلوك المعقد، سنستخدم الكود التالي [Example04]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.util.Random;
public class Exemple04 {
public static void main(String[] args) {
// observable configuration of reals
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
for (int i = 0; i < 3; i++) {
// emission element i
subscriber.onNext(new Random((i + 1)).nextDouble());
}
// end of issue
subscriber.onCompleted();
}
});
// subscription and therefore emission
obs1.subscribe((d) -> System.out.printf("onNext %s%n", d), (th) -> System.out.printf("onError %s%n", th),
() -> System.out.println("onCompleted"));
}
}
- السطر 11: يتم إنشاء عنصر قابل للمراقبة يصدر أنواع Double؛
- الأسطر 11–21: يتم إنشاء مثيل لمعلمة طريقة [create] باستخدام فئة مجهولة تحتوي على طريقة [call] الوحيدة من الأسطر 12–20. العنصر القابل للمراقبة الذي تم إنشاؤه في السطر 11 جاهز للإصدار، ولكنه لن يصدر إلا عند وصول مراقب؛
- الأسطر 13–21: تتلقى طريقة [call] مرجعًا إلى مراقب؛
- الأسطر 14–17: يتم إرسال ثلاثة عناصر إلى المراقب؛
- السطر 19: يُعلم المراقب بأن الإرسال قد انتهى؛
- الأسطر 23-24: الاشتراك في العنصر القابل للمراقبة من السطر 11. نقوم بتنفيذ المعلمات الثلاثة [onNext، onError، onCompleted] لطريقة [subscribe] باستخدام ثلاثة لامبدا. سيؤدي هذا الاشتراك إلى إنشاء المشترك [Subscriber<Double>]، والذي سيتم تمريره إلى طريقة [call] في السطر 13. سيبدأ بعد ذلك بث العناصر؛
- كل شيء يحدث في نفس الخيط: القابل للمراقبة والمراقب؛
نحصل على النتائج التالية:
تسمح لك طريقة [Observable.create] بإنشاء عنصر قابل للمراقبة من أي حدث. هذه هي الطريقة التي استخدمناها في القسم 2 من المقدمة لتحويل واجهة متزامنة إلى واجهة غير متزامنة.
7.1.4. مثال-05: إعادة هيكلة [مثال-04]
![]() |
يقدم المثال التالي نسخة جديدة من الطريقة الثابتة [Observable.subscribe]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class Exemple05 {
public static void main(String[] args) {
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- السطر 56: النسخة الجديدة من الطريقة الثابتة [Observable.subscribe] تقبل نوع [Subscriber] كمعلمة، وهو ما قدمناه في الفقرة السابقة؛
- الأسطر 37–52: المشترك (المراقب). وهو ينفذ واجهة Observer مع طرقها الثلاث onNext و onError و onCompleted؛
- الأسطر 61–64: من الآن فصاعدًا، سنركز على الخيوط التي يعمل فيها المراقب والمراقب الخاص به؛
- السطر 62: اسم الخيط؛
- السطر 63: الوقت الحالي معبراً عنه بالثواني والميلي ثانية. سيسمح لنا هذا بتتبع انبعاث العناصر من قبل المراقب ومعالجتها من قبل المراقب بمرور الوقت؛
- هذا الكود له نفس وظيفة الكود السابق. لقد قمنا ببساطة بإعادة هيكلة هذا الأخير؛
النتائج التي تم الحصول عليها هي كما يلي:
- السطر 1 من النتائج: قبل السطر 56 من الكود، لم يحدث شيء بعد. تم تكوين المراقب ببساطة؛
- السطر 2 من النتائج: السطر 56 من الكود يطلق استدعاءً لطريقة [call] في السطر 15. السطر 3: يتم إرسال الرقم الحقيقي 80.39 إلى المراقب؛
- السطر 4: يتلقى المراقب الرقم الذي تم إرساله؛
- الأسطر 5–8: تتكرر العملية السابقة مرتين؛
- السطر 9: يرسل المراقب إشعار نهاية البث؛
- السطر 10: يتلقى المراقب الإشعار؛
- السطر 11: يتم عرضه بواسطة السطر 57 من الكود؛
يمكننا أن نرى، بالتالي، أن سطر الاشتراك الوحيد 56 تسبب في عرض الأسطر 2–10 من النتائج. عند البدء باستخدام مكتبة RxJava، يتساءل المرء عن كيفية ارتباط الأمور ببعضها البعض، لا سيما الروابط بين المراقب والمراقب. هنا نرى أن السطر 56، وهو الاشتراك في المراقب،
- أدى إلى إصدار جميع عناصر القابل للمراقبة؛
- وأن المراقب والمراقب يعملان في نفس الخيط؛
- وبسبب ذلك، نلاحظ التسلسل التالي: إصدار العنصر i، مراقبة العنصر i، إصدار العنصر (i+1)، مراقبة العنصر (i+1)، ...
تذكر أن المُصدر كان ينتظر قبل إصدار عناصره:
// attente
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// erreur
subscriber.onError(e);
}
حيث يمثل الحرف i في السطر 3 رقم الإرسال (0 <= i < 3). إذا نظرنا إلى أوقات إرسال عناصر المتغير القابل للملاحظة:
- السطران 2 و 3: تم إصدار العنصر 0 بعد حوالي 500 مللي ثانية من بدء الاشتراك؛
- السطران 3 و 5: تم إصدار العنصر 1 بعد حوالي 400 مللي ثانية من العنصر 0؛
- السطران 5 و7: تم إصدار العنصر 2 بعد حوالي 300 مللي ثانية من العنصر 1؛
7.2. خيط التنفيذ، خيط المراقبة
7.2.1. مثال-06: المراقب والمراقب في مؤشر ترابط بخلاف [main]
![]() |
نقوم بإعادة هيكلة المثال السابق على النحو التالي [مثال 06]:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple06 {
public static void main(String[] args) {
// gatekeeper
CountDownLatch latch = new CountDownLatch(1);
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// we lower the barrier
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite observable configuration
obs1 = obs1.subscribeOn(Schedulers.computation());
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// waiting at the gate
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
- السطر 16: نقوم بإنشاء حاجز أمان (إشارة) باستخدام كائن [CountDownLatch]. يُستخدم هذا الكائن لمزامنة الخيوط مع بعضها البعض. هنا، يتم تهيئته بالقيمة 1، والتي سنشير إليها بقيمة الحاجز (أو الإشارة). ينتظر الخيط الحاجز باستخدام العملية التالية:
latch.await();
يتم حظر الخيط إذا كانت قيمة المزلاج >0. يمكن للخيط زيادة أو تقليل القيمة الداخلية للمزلاج. السطر 48: يتم تقليل قيمة المزلاج بمقدار 1.
- السطر 63: يتم تكوين المراقب (observable) للتشغيل على مؤشر ترابط يوفره المجدول [Schedulers.computation()]. يمكن لهذا المجدول توفير عدد من مؤشرات الترابط يساوي عدد النوى الموجودة في جهاز التنفيذ. أظهر القسم الخاص بالتطبيق النموذجي استخدام مجدولات أخرى (انظر القسم 2.8)؛
مبدأ الكود هو كما يلي:
- تعمل الطريقة [main] في الخيط الرئيسي؛
- السطر 66: يبدأ في إرسال عناصر من المراقب. سيتم إرسال هذه العناصر على مؤشر ترابط بخلاف مؤشر الترابط الرئيسي؛
- السطر 70: يتم حظر الخيط الرئيسي لأن الحاجز له القيمة 1 (انظر السطر 16). ولا يمكنه الاستمرار إلا عندما تتغير هذه القيمة إلى 0. ويحدث هذا في السطر 48. والمراقب هو الذي يخفض الحاجز عندما يتلقى الإشعار بأن العنصر القابل للمراقبة قد انتهى من الإرسال؛
يؤدي التنفيذ إلى النتائج التالية:
- السطر 1: الاشتراك على وشك أن يتم؛
- السطر 2: يؤدي هذا إلى تشغيل طريقة [call] على الخيط [RxComputationThreadPool-1]. لدينا الآن تشغيل متوازي مع خيطين؛
- السطر 3: لسبب غير معروف، تخلى مؤشر الترابط [RxComputationThreadPool-1] عن التحكم. ثم يتولى مؤشر الترابط [main] التحكم ويتم حظره بواسطة الحاجز (السطر 70 من الكود). من هذه النقطة فصاعدًا، يمكن لمؤشر الترابط [RxComputationThreadPool-1] فقط العمل؛
- الأسطر 4-11: نلاحظ السلوك الذي رأيناه سابقًا بين القابل للملاحظة والمراقب، ولكن كل شيء يحدث الآن في الخيط [RxComputationThreadPool-1]؛
- السطور 12-13: قام المراقب بخفض الحاجز (السطر 48 من الكود) وانتهى مؤشر الترابط [RxComputationThreadPool-1]. يتولى مؤشر الترابط [main] التحكم ويعرض رسالتين؛
7.2.2. مثال-07: العنصر القابل للمراقبة والمراقب في خيطين مختلفين
![]() |
نقوم بتعديل المثال السابق على النحو التالي:
package dvp.rxjava.observables;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Exemple07 {
public static void main(String[] args) {
// gatekeeper
CountDownLatch latch = new CountDownLatch(1);
// configuration of a real observable
Observable<Double> obs1 = Observable.create(new Observable.OnSubscribe<Double>() {
@Override
public void call(Subscriber<? super Double> subscriber) {
showInfos("Observable.call start");
for (int i = 0; i < 3; i++) {
// waiting
try {
Thread.sleep(500 - i * 100);
} catch (InterruptedException e) {
// error
subscriber.onError(e);
}
// action
double value = new Random().nextInt(100) * 1.2;
showInfos(String.format("Observable.call onNext(%s)", value));
subscriber.onNext(value);
}
// finish
showInfos(String.format("Observable.call onCompleted"));
subscriber.onCompleted();
}
});
// a subscriber
Subscriber<Double> subscriber = new Subscriber<Double>() {
@Override
public void onCompleted() {
showInfos("Subscriber.onCompleted");
// we lower the barrier
latch.countDown();
}
@Override
public void onError(Throwable e) {
showInfos(String.format("Subscriber.onError (%s)", e));
}
@Override
public void onNext(Double aDouble) {
showInfos(String.format("Subscriber.onNext (%s)", aDouble));
}
};
// suite observable configuration
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
// subscription
showInfos("avant souscription");
obs1.subscribe(subscriber);
// waiting in front of the barrier
try {
showInfos("début attente barrière");
latch.await();
showInfos("fin attente barrière");
} catch (InterruptedException e1) {
System.out.println(e1);
}
showInfos("après souscription");
}
private static void showInfos(String message) {
System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message, Thread.currentThread().getName(),
new SimpleDateFormat("ss:SSS").format(new Date()));
}
}
الرمز مطابق للرمز في المثال السابق باستثناء السطر 63:
obs1 = obs1.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation());
الذي يقوم بتكوين العنصر القابل للمراقبة (subscribeOn) والمراقب (observeOn) للتشغيل على أحد الخيوط التي يوفرها المجدول [Schedulers.computation()].
النتائج التي تم الحصول عليها هي كما يلي:
يمكن ملاحظة النقاط التالية:
- يعمل المراقب في الخيط [RxComputationThreadPool-4] (الأسطر 3–4، 6، 8–9)؛
- يعمل المراقب في مؤشر الترابط [RxComputationThreadPool-3] (الأسطر 5 و7 و10-11)؛
- يعملان بشكل مستقل. وبالتالي، في الأسطر 8–9، يصدر المراقب إخطارين (onNext، onCompleted) قبل أن يسترد المراقب إخطار [onNext] (السطر 10)؛
تتولى مكتبة RxJava نقل البيانات (الإصدارات) من مؤشر الترابط الخاص بالمراقب إلى مؤشر الترابط الخاص بالمراقب. ولا داعي للمطور أن يقلق بشأن هذا الأمر.
لقد رأينا كيفية إنشاء العناصر القابلة للمراقبة (Observable.from، Observable.create). الآن دعونا نلقي نظرة على العناصر القابلة للمراقبة المحددة مسبقًا في مكتبة RxJava.
7.3. المراقبون المحددون مسبقًا
7.3.1. مثال-08: طريقة [Observable.range]
![]() | ![]() |
من الآن فصاعدًا، سنستخدم فئات مخصصة للعمليات المراقبة ومراقبيها. الفكرة هي أن نتمكن من تسجيل أسمائها وخيوط تنفيذها وأوقات تنفيذها حتى نتمكن من تتبعها بمرور الوقت.
ستكون فئة [Process] ببساطة كائنًا قابلًا للمراقبة يمكننا تسميته. وستقوم بتنفيذ واجهة [IProcess] التالية:
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcess<T> {
// name of observable
public String getName();
// observable
public Observable<T> getObservable();
}
يمكن تنفيذ هذه الواجهة من خلال فئة [Process<T>] التالية:
package dvp.rxjava.observables.utils;
import rx.Observable;
import rx.Scheduler;
public class Process<T> implements IProcess<T>{
// observable name
protected String name;
// observed process
protected Observable<T> observable;
// manufacturers
public Process(String name, Observable<T> observable) {
// local initializations
this.name = name;
this.observable = observable;
}
// getters and setters
public String getName() {
return name;
}
public Observable<T> getObservable() {
return observable;
}
}
- السطر 9: اسم العملية؛
- السطر 11: القيمة القابلة للملاحظة؛
- الأسطر 14–18: المنشئ؛
سيتم وصف المراقب بواسطة فئة [Observer] التالية:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
...
}
- السطر 11: تمتد فئة Observateur<T> من فئة Subscriber<T>، التي قدمناها بإيجاز في القسم 7.1.3. وسنستخدمها كحجة لطريقة [Observable.subscribe]:
// exécution observable (observation)
obs1.subscribe(observateur);
تحتوي طريقة [Observable.subscribe] المستخدمة في السطر 2 أعلاه على التعريف التالي:
![]() |
يتمثل دور [Subscriber] بشكل أساسي في إدارة العناصر التي يصدرها العنصر القابل للمراقبة الذي اشترك فيه باستخدام طرق واجهة [Observer]: onNext، onError، onCompleted. تحتوي فئة [Subscriber] على الطرق التالية:
![]() |
في كود فئة [المراقب]، سنستخدم طريقة [1] isUnsubscribed لتحديد ما إذا كان اشتراك المشترك قد تم إلغاؤه أم لا. فئة [المراقب<T>] الكاملة هي كما يلي:
package dvp.rxjava.observables.utils;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Subscriber;
public class Observateur<T> extends Subscriber<T> {
// a gatekeeper (semaphore)
private CountDownLatch latch;
// a display method
private Consumer<String> showInfos;
// observer's name
private String observerName;
// the name of the observed process
private String processName;
// manufacturers
public Observateur() {
}
public Observateur(String name, CountDownLatch latch, Consumer<String> showInfos, String observedName) {
this.observerName = name;
this.latch = latch;
this.showInfos = showInfos;
this.processName = observedName;
}
// --------------------------- implementation interface Observer<T>
@Override
public void onCompleted() {
// end of issues
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s,%s].onCompleted", observerName, processName));
}
// end of main thread lock
latch.countDown();
}
@Override
public void onError(Throwable e) {
// emission error
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber [%s, %s].onError (%s)", observerName, processName, e));
}
}
@Override
public void onNext(T value) {
// an additional show
if (!isUnsubscribed()) {
try {
showInfos.accept(String.format("Subscriber [%s,%s] : onNext (%s)", observerName, processName,
new ObjectMapper().writeValueAsString(value)));
} catch (JsonProcessingException e) {
showInfos.accept(String.format("Subscriber [%s,%s].onNext (%s)", observerName, processName, e));
}
}
}
}
- بالإضافة إلى خصائص المشترك، سيحمل المراقب المعلومات التالية:
- السطر 14: حاجز أو إشارة ستُستخدم لحجب الخيط الرئيسي حتى يتلقى المراقب جميع العناصر التي أرسلها القابل للمراقبة. سيحدث هذا في السطر 36 من الكود عندما يتلقى المراقب إشعار نهاية الإرسال من القابل للمراقبة؛
- السطر 16: مثيل Consumer<String> الذي سيُستخدم لعرض رسالة على وحدة التحكم؛
- السطر 18: اسم المراقب، ويُستخدم للتمييز بين المراقبين عند وجود أكثر من مراقب؛
- السطر 20: اسم العملية المراقبة؛
- السطور 36 و46 و54: طرق [onCompleted وonError وonNext] لواجهة [Observer<T>] التي تنفذها الفئة المجردة [Subscriber<T>]. لا تنفذ هذه الفئة هذه الطرق. لذلك يجب القيام بذلك في فئاتها الفرعية. قبل القيام بأي شيء في هذه الطرق، نتحقق مما إذا كان المراقب قد تم إلغاء اشتراكه من العنصر القابل للمراقبة الذي يراقبه؛
- السطر 59: تكتب طريقة [onNext] للمراقب سلسلة JSON للعنصر المستلم. سيسمح لنا ذلك بعرض أنواع مختلفة من العناصر؛
بعد ذلك، دعونا نفحص طريقة جديدة للفئة Observable، وهي طريقة [range]:
![]() |
تُصدر طريقة Observable.range(n,m) أعدادًا صحيحة (m) تتراوح من n إلى n+m-1. سنستكشفها باستخدام كود [Example08] التالي:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple08 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Integer> obs1 = Observable.range(15, 3).subscribeOn(Schedulers.computation());
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos,"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- السطر 16: سنستخدم مراقبين اثنين؛
- السطر 19: يتم تهيئة الحاجز (الإشارة) على القيمة 2 لأننا سنضع كل مراقب على مؤشر ترابط مختلف. وبالتالي، سيتعين على مؤشر الترابط الرئيسي انتظار انتهاء مؤشري ترابط المراقبين؛
- السطر 22: نقوم بتكوين المراقب بحيث يعمل على مؤشر ترابط من المجدول [Schedulers.computation()]. سيكون المراقب على نفس مؤشر الترابط الذي يعمل عليه المراقب؛
- الأسطر 25–27: نقوم بتسجيل مشاهدين اثنين على العنصر القابل للمراقبة. سيؤدي هذا إلى تشغيل العنصر القابل للمراقبة بالكامل لكل مشاهد: سيتم إصدار الأعداد الصحيحة 15 و16 و17؛
- السطر 30: ينتظر الخيط الرئيسي انتهاء المراقبين؛
النتائج التي تم الحصول عليها هي كما يلي:
- السطر 2: الخيط الرئيسي محجوب، في انتظار انتهاء المراقبين الاثنين؛
- السطران 3-4: نرى أن المراقب 0 موجود على الخيط [RxComputationThreadPool-1] والمراقب 1 على الخيط [RxComputationThreadPool-2]؛
- الأسطر 3-10: نرى أن كلا المراقبين يتلقيا العناصر نفسها تمامًا؛
سنستخدم فئة Observer المحددة هنا لتوضيح سلوك أنواع أخرى من العناصر القابلة للمراقبة.
7.3.2. مثال-09: طرق Observable.[interval، take، doNext]
![]() |
![]() |
يوضح هذا المثال استخدام المراقب Observable.interval(long interval, TimeUnit unit)، الذي يصدر أعدادًا صحيحة طويلة على فترات منتظمة. لاحظ النقطة [1]: بشكل افتراضي، يعمل المراقب [Observable.interval] على أحد خيوط المجدول [Schedulers.computation].
سيكون الكود كما يلي:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.Observateur;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- السطر 22: يصدر المراقب أعدادًا صحيحة طويلة كل 500 مللي ثانية. تبدأ التسلسل بالرقم 0؛
- السطر 22: يصدر هذا المراقب عددًا لا نهائيًا من القيم. تُنشئ طريقة [Observable.take(n)] مراقبًا جديدًا يحتفظ فقط بأول n عنصر من العناصر الصادرة؛
![]() |
دعونا نراجع كود المراقب:
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
السطر 2: يتم تنفيذ طريقة [Observable.doOnNext] في كل مرة يصدر فيها العنصر القابل للمراقبة عنصرًا جديدًا. وغالبًا ما تُستخدم هذه الطريقة لتسجيل المعلومات. هنا، نريد تسجيل تاريخ إصدار العناصر للتحقق من الحفاظ على الفاصل الزمني البالغ 500 مللي ثانية. لا تقوم طريقة [Observable.doOnNext] بتعديل العنصر القابل للمراقبة الذي يتم تطبيقها عليه. وتعريفها كما يلي:
![]() |
يؤدي التنفيذ إلى النتائج التالية:
- السطور 3 و7 و11: نرى أن الفاصل الزمني للإصدار يبلغ حوالي 500 مللي ثانية؛
- المراقبان موجودان بالفعل على خيطين مختلفين على الرغم من أن القابل للمراقبة لم يتم تكوينه للتشغيل مع جدولة محددة. هذا هو السلوك الافتراضي للقابل للمراقبة [Observable.interval] الذي نراه هنا؛
7.3.3. أمثلة-10/12: طرق Observable.[error, empty, never]
![]() | ![]() |
من الآن فصاعدًا، سنكون أكثر إيجازًا في توضيحنا لطرق فئة [Observable]. كان الكود السابق كما يلي:
package dvp.rxjava.observables;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import rx.Observable;
public class Exemple09 {
public static void main(String[] args) throws InterruptedException {
// number of observers
final int nbObservateurs = 2;
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs);
// observable configuration
Observable<Long> obs1 = Observable.interval(500L, TimeUnit.MILLISECONDS).take(3)
.doOnNext(l -> showInfos.accept(l.toString()));
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
obs1.subscribe(new Observateur<>(String.format("observateur [%d]", i), latch, showInfos,
"obs1"));
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
تم استخدام هذا الكود بالفعل في المثال السابق. لم يتغير سوى السطور 21–22. لذلك سنقوم بفصل معظم هذا الكود إلى فئة [ProcessUtils] التالية:
package dvp.rxjava.observables.utils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import rx.Observable;
public class ProcessUtils {
@SafeVarargs
public static void subscribe(int nbObservateurs, IProcess<?>... processes) throws InterruptedException {
// semaphore
CountDownLatch latch = new CountDownLatch(nbObservateurs * processes.length);
// observable performance (observation)
showInfos.accept("main : début observation");
for (int i = 0; i < nbObservateurs; i++) {
for (IProcess<?> process : processes) {
Observable<?> obs = process.getObservable();
obs.subscribe(new Observateur<>(String.format("observateur[%d]", i), latch, showInfos, process.getName()));
}
}
// waiting
showInfos.accept("main : attente fin observation");
latch.await();
// end
showInfos.accept("main : fin observation");
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- السطر 13: تأخذ الطريقة معلمتين:
- nbObservers: عدد المراقبين للعمليات التي تم تمريرها كمعلمة ثانية؛
- العمليات: العمليات (المسماة «الملاحظات») المقرر ملاحظتها. وبفضل صيغة [IProcess<?>]، يمكن للعمليات أن تصدر عناصر من أنواع مختلفة؛
- السطر 16: يجب أن يتحول لون الإشارة إلى الأخضر عندما يكمل جميع المراقبين جميع عمليات المراقبة الخاصة بهم. وبالتالي، فإن القيمة الأولية للإشارة هي عدد المراقبين مضروبًا في عدد عمليات المراقبة؛
- الأسطر 20-25: كل مراقب مشترك في جميع العمليات التي يحتاج إلى مراقبة؛
- السطر 23: استرداد المراقب من العملية (انظر القسم 7.3.1)؛
- السطر 23: المشاهد مشترك فيها. يتم تمرير أربع معلومات إلى المشاهد:
- اسمه؛
- الإشارة التي يجب عليه تخفيضها عندما يتلقى إشعار نهاية الإرسال من العنصر القابل للمراقبة الذي يراقبه؛
- الطريقة التي يجب استخدامها عندما يرغب في تسجيل المعلومات في وحدة التحكم؛
- اسم العملية التي سيراقبها؛
بعد تعريف هذه الفئات، سيكون المثال 10 كما يلي:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple10 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs = Observable.error(new RuntimeException("Erreur !!!")).subscribeOn(Schedulers.computation());
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1", obs));
}
}
السطر 11، يتم تعريف الطريقة الثابتة [Observable.error] على النحو التالي:
![]() |
وبالتالي، فإن السطر 8 يُهيئ متغيرًا قابلًا للمراقبة يقوم ببساطة بإرسال استثناء إلى طريقة [onError] الخاصة بالمشتركين فيه. ويؤدي التنفيذ إلى النتائج التالية:
main : début observation ------Thread[main] ---- Time[22:618]
main : attente fin observation ------Thread[main] ---- Time[22:636]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[22:638]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[22:638]
السطران 3 و 4: تلقت طريقة [onError] لكل من المشتركين الاستثناء الذي أطلقه المراقب.
يتميز هذا التنفيذ بخاصية فريدة: لم يتم استدعاء طرق [onCompleted] لكل من المراقبين. ونتيجة لذلك، لم يتم خفض الحاجز، وظل الخيط الرئيسي محجوبًا في الطريقة الثابتة [ProcessUtils.subscribe] في السطر 3 التالي:
// attente
showInfos.accept("main : attente fin observation");
latch.await();
// fin
showInfos.accept("main : fin observation");
نلاحظ هنا أنه في حالة حدوث خطأ في العنصر القابل للمراقبة، لا يتم استدعاء طريقة [onCompleted] الخاصة بالمشتركين. لذلك نقوم بتعديل طريقة [Observer.onError] على النحو التالي:
@Override
public void onError(Throwable e) {
// erreur d'émission
if (!isUnsubscribed()) {
showInfos.accept(String.format("Subscriber[%s, %s].onError (%s)", observerName, processName, e));
}
// fin blocage thread principal
latch.countDown();
}
نضيف السطرين 7 و8 لإلغاء القفل في حالة حدوث خطأ ملحوظ. مع هذا الكود الجديد، ينتج عن التنفيذ النتائج التالية:
main : début observation ------Thread[main] ---- Time[40:750]
main : attente fin observation ------Thread[main] ---- Time[40:764]
Subscriber[observateur[0], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-1] ---- Time[40:766]
Subscriber[observateur[1], process1].onError (java.lang.RuntimeException: Erreur !!!) ------Thread[RxComputationThreadPool-2] ---- Time[40:766]
main : fin observation ------Thread[main] ---- Time[40:767]
نحصل على السطر 5، الذي لم يكن موجودًا من قبل.
سيكون المثال 11 كما يلي:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple11 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs1 = Observable.empty();
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
السطر 10: تُنشئ الطريقة الثابتة [Observable.empty] كائنًا قابلًا للمراقبة لا يصدر أي عناصر. فهو يصدر فقط إشعار نهاية الإصدار؛
![]() |
يؤدي تنفيذ الكود في المثال أعلاه إلى النتائج التالية:
- السطران 2 و 3: نرى أن كلا المراقبين يتلقيا إشعار نهاية البث دون أن يكونا قد تلقيا أي عناصر مسبقًا.
قد يتساءل المرء عن الغرض الفعلي من استخدام هذه الطريقة. يمكن استخدامها بطريقة مشابهة لمجموعة، تكون فارغة في البداية، ثم تضاف إليها العناصر:
في السطر 3، ندمج المراقب الأولي obs (السطر 1) مع المراقبين الآخرين.
يوضح المثال 12 الطريقة الثابتة [Observable.never]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple12 {
public static void main(String[] args) throws InterruptedException {
// observable configuration
Observable<?> obs1 = Observable.never();
// observable execution (observation)
ProcessUtils.subscribe(2,new Process<>("process1",obs1));
}
}
تقوم الطريقة الثابتة [Observable.never] بإنشاء كائن قابل للمراقبة لا يصدر أبدًا:
![]() |
يؤدي تشغيل المثال إلى النتائج التالية:
السطر 2: ينتظر الخيط الرئيسي إلى أجل غير مسمى. ويرجع ذلك إلى عدم قيام أي عنصر قابل للمراقبة بإصدار إشعار [onCompleted]، مما يسمح للإشارة (الحاجز) بالتحول إلى اللون الأخضر (خفض الحاجز).
7.4. التعدد الخيطي
7.4.1. المثال 13: مؤشر ترابط الإجراء، مؤشر ترابط المراقب
في القسم 7.1.3، أنشأنا عنصرًا قابلًا للمراقبة باستخدام الطريقة الثابتة [Observable.create]:
![]() |
- تُرجع الطريقة [create] نوع Observable<T>؛
- معلمة طريقة [create] هي دالة من النوع [Observable.OnSubscribe<T>] معرَّفة على النحو التالي:
![]() |
النوع [Observable.OnSubscribe<T>] هو واجهة وظيفية تمتد بدورها إلى الواجهة الوظيفية [Action1<Subscriber<? super T>>]. تتوقع طريقة [call] لهذه الواجهة نوع [Subscriber] (مشترك، مراقب). في بقية هذا المستند، سنشير أحيانًا إلى النوع [Observable.OnSubscribe<T>] باعتباره إجراءً. سننشئ إجراءات مخصصة سيكون لها اسم. وستكون هذه مثيلات للواجهة [IProcessAction] التالية:
![]() |
package dvp.rxjava.observables.utils;
import rx.Observable;
public interface IProcessAction<T> extends Observable.OnSubscribe<T> {
// action has a name
public String getName();
}
- السطر 5: تحتوي الواجهة [IProcessAction<T>] على جميع خصائص الواجهة [Observable.OnSubscribe<T>]؛
- السطر 8: كما تحتوي على طريقة [getName] التي تُرجع اسم المثيل الذي يُنفذ الواجهة؛
سنستخدم الإجراء التالي المسمى [ProcessAction01]:
package dvp.rxjava.observables.utils;
import java.util.Random;
import rx.Subscriber;
import rx.functions.Func1;
public class ProcessAction01<T> implements IProcessAction<T> {
// data
private String name;
private int nbValues;
private Func1<Integer, T> func1;
// manufacturers
public ProcessAction01(String name, int nbValues, Func1<Integer, T> func1) {
this.name = name;
this.nbValues = nbValues;
this.func1 = func1;
}
@Override
public void call(Subscriber<? super T> subscriber) {
ProcessUtils.showInfos.accept(String.format("Observable (%s) call start", getName()));
for (int i = 0; i < nbValues; i++) {
// waiting
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
// error
ProcessUtils.showInfos.accept(String.format("Observable (%s) onError", getName()));
subscriber.onError(e);
}
// element emission
T value = func1.call(i);
ProcessUtils.showInfos.accept(String.format("Observable (%s,%s) onNext (%s)", getName(), i, value));
subscriber.onNext(value);
}
// finish
ProcessUtils.showInfos.accept(String.format("Observable (%s) onCompleted", getName()));
subscriber.onCompleted();
}
@Override
public String getName() {
return name;
}
}
- السطر 8: الفئة [ProcessAction01<T>] تنفذ الواجهة [IProcessAction<T>] وبالتالي الواجهة [Observable.OnSubscribe<T>]؛
- السطر 11: اسم الإجراء؛
- السطر 12: عدد القيم المراد إصدارها؛
- السطر 13: مثيل من النوع [Func1<Integer, T>] يأخذ عددًا صحيحًا وينتج نوعًا T ليتم إرساله بواسطة المراقب (السطران 35 و37)؛
- الأسطر 16–20: نمرر اسم الإجراء، وعدد القيم المراد إصدارها، ووظيفة الإصدار إلى المنشئ؛
- الأسطر 23–42: كود العملية؛
- السطر 23: تأخذ طريقة [call] كمعلمة المشترك في المراقب المرتبط بالعملية؛
- السطر 28: تصدر العملية عناصرها بعد انتظار لمدة عشوائية؛
- السطر 32: إصدار خطأ؛
- السطر 37: إصدار عادي؛
- السطر 41: إصدار إشعار نهاية الإصدار؛
- الأسطر 25-38: تصدر العملية أرقامًا حقيقية nbValues بعد فترة انتظار عشوائية (السطر 30)؛
- السطر 35: يتم توفير القيمة المراد إصدارها بواسطة الدالة [func1] التي يتم تمريرها كمعلمة إلى المنشئ (السطر 16)؛
نقوم بإعادة هيكلة فئة [Process] (انظر القسم 7.3.1) بحيث يمكن أيضًا إنشاءها باستخدام إجراء مسمى. نضيف المنشئ التالي:
public Process(IProcessAction<T> na, Scheduler schedulerObserved, Scheduler schedulerObserver) {
// nom process=nom action
name = na.getName();
// action --> observable
observable = Observable.create(na);
// thread d'exécution du processus observé
if (schedulerObserved != null) {
observable = observable.subscribeOn(schedulerObserved);
}
// thread d'observation de l'observateur
if (schedulerObserver != null) {
observable = observable.observeOn(schedulerObserver);
}
}
- السطر 1: يأخذ المنشئ 3 معلمات:
- الإجراء المسمى الذي سيتم استخدامه لإنشاء المراقب (السطر 5)؛
- مجدول العملية المراقبة (قد يكون فارغًا)؛
- مجدول المراقب (قد يكون فارغًا)؛
- السطر 5: يتم إنشاء القابل للمراقبة من الإجراء الذي تم تمريره كمعلمة؛
يرصد الكود التالي [مثال 13] عناصر قابلة للمراقبة مختلفة:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple13 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 1, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// process 3
Process<Integer> process3 = new Process<>(new ProcessAction01<Integer>("process3", 3, i -> i * 2), null,
Schedulers.computation());
// process 4
Process<Boolean> process4 = new Process<>(new ProcessAction01<Boolean>("process4", 4, i -> i % 2 == 0), null, null);
// subscriptions
ProcessUtils.subscribe(1, process1);
ProcessUtils.subscribe(1, process2);
ProcessUtils.subscribe(1, process3);
ProcessUtils.subscribe(1, process4);
}
}
- السطور 13–15: تنتج عملية 1 رقمًا حقيقيًا واحدًا على مؤشر ترابط حسابي سيتم ملاحظته على مؤشر ترابط حسابي آخر؛
- السطور 17–18: تنتج العملية 2 سلسلتين على خيط حسابي، ولا توجد أي إشارة إلى خيط المراقب. تظهر النتائج أن المراقبة تحدث افتراضيًا على نفس الخيط الذي يتم فيه تنفيذ العملية؛
- السطور 20-21: تنتج العملية 3 ثلاثة أعداد صحيحة على مؤشر ترابط غير محدد، والتي سيتم ملاحظتها على مؤشر ترابط حسابي. تظهر النتائج أن العملية تعمل بشكل افتراضي على مؤشر الترابط الرئيسي؛
- السطر 23: تنتج العملية process4 4 قيم منطقية على مؤشر ترابط غير محدد، والتي سيتم ملاحظتها على مؤشر ترابط غير محدد. تظهر النتائج أن تنفيذ العملية وملاحظتها يحدثان افتراضيًا على مؤشر الترابط الرئيسي؛
نتيجة تنفيذ هذا الرمز هي كما يلي:
- تُنتج العملية «process1» عددًا حقيقيًا واحدًا (السطر 4) على خيط الحساب [RxComputationThreadPool-4]، والذي يتم رصده على خيط الحساب [RxComputationThreadPool-3] (السطر 6)؛
- تنتج العملية process2 سلسلتين (السطران 12 و 14) على مؤشر ترابط الحساب [RxComputationThreadPool-5]، والتي يتم ملاحظتها على نفس مؤشر الترابط (السطران 13 و 15)؛
- تنتج العملية process3 3 أعداد صحيحة (السطور 21 و23 و25) على الخيط الرئيسي، والتي يتم ملاحظتها على خيط الحساب [RxComputationThreadPool-6] (السطور 22 و24 و28)؛
- تنتج العملية process4 4 قيم منطقية (الأسطر 34 و 36 و 38 و 40) على الخيط الرئيسي، والتي يتم ملاحظتها على نفس الخيط الرئيسي (الأسطر 33 و 35 و 37 و 39)؛
ندعو القارئ إلى متابعة ما يلي:
- دورة حياة العملية الملاحظة وخيطها؛
- دورة حياة المراقب الخاص بها وخيطها؛
يكمن جزء كبير من جاذبية مكتبات Rx في هذا التعدد في الخيوط، والذي لا يتعين على المطور إدارته بنفسه.
7.5. تركيبات من عدة عناصر قابلة للمراقبة
7.5.1. مثال-14: دمج عنصرين قابلين للمراقبة باستخدام [Observable.merge]
نقدم الآن طرقًا ثابتة لفئة [Observable] تسمح بدمج عدة عناصر قابلة للمراقبة في عنصر واحد قابل للمراقبة.
المثال الأول من هذا النوع هو كما يلي:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple14 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), Schedulers.computation(), null);
// merge
Process<?> process12 = new Process<>("process12",
Observable.merge(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- السطور 15–17: ستصدر عملية باسم [process1] 3 أرقام حقيقية على خيط حسابي. كما سيتم ملاحظتها على خيط حسابي؛
- السطور 19–20: ستصدر عملية باسم [process2] سلسلتين على خيط حسابي. لم يتم تحديد خيط المراقبة. وقد رأينا سابقًا أن خيط المراقبة في هذه الحالة هو الخيط الحسابي؛
- السطر 23: يتم دمج العمليتين، أي يتم إنشاء عنصر قابل للمراقبة تأتي عناصره في وقت واحد من كلتا العمليتين. تُستخدم الطريقة الثابتة [Observable.merge] لهذا الغرض:
![]() |
على عكس ما قد يوحي به الرسم البياني أعلاه، أثناء الدمج، يمكن أن تتداخل عناصر من الدفق 1 بين عناصر الدفق 2. وهذا ما تظهره نتائج التنفيذ:
- السطر 3: يتم تشغيل العملية [process1] على مؤشر ترابط الحساب [RxComputationThreadPool-4]؛
- السطر 4: العملية [process2] تعمل على مؤشر ترابط الحساب [RxComputationThreadPool-5]؛
- السطر 9: تتم مراقبة العملية [process12] على مؤشر ترابط الحساب [RxComputationThreadPool-3]. لا أعرف القاعدة التي أدت إلى هذا الاختيار؛
- الأسطر 9–11: نرى أن المراقب يراقب عناصر من كل من العمليتين [process1] (السطر 5) و[process2] (السطران 6 و7) على الرغم من أن أيا منهما لم تنته بعد (هناك اختلاط)؛
- تنتهي العملية [process12] (السطر 17) عندما تنتهي كلتا العمليتين، process1 و process2؛
7.5.2. المثال 15: ربط عنصرين قابلين للمراقبة باستخدام [Observable.concat]
سنقوم الآن بفحص الكود التالي:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessAction01;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple15 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, Schedulers.computation());
// concat
Process<?> process12 = new Process<>("process12",
Observable.concat(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- السطور 15–17: ستصدر عملية باسم [process1] 3 أرقام حقيقية على مؤشر ترابط حسابي. وسيتم ملاحظتها أيضًا على مؤشر ترابط حسابي؛
- السطور 19–20: ستصدر عملية تسمى [process2] سلسلتين على مؤشر ترابط غير محدد، وهو هنا مؤشر الترابط الرئيسي الافتراضي. وسيتم ملاحظتها على مؤشر ترابط حسابي؛
- السطر 23: يتم ربط العمليتين، أي يتم إنشاء عنصر قابل للمراقبة تأتي عناصره من كلتا العمليتين. لا يتم خلط القيم الصادرة. ستصدر العملية [process12] أولاً جميع القيم من العملية [process1] ثم تلك من العملية [process2]. تُستخدم الطريقة الثابتة [Observable.concat] لهذا الغرض:
![]() |
نتائج التنفيذ هي كما يلي:
- الأسطر 3-10: يتم تشغيل العملية [process1] وتقوم العملية [process12] بإصدار القيم التي أصدرتها [process1]؛
- السطر 9: انتهت العملية [process1]؛
- الأسطر 11-17: يتم تشغيل العملية [process2] وتقوم العملية [process12] بإصدار القيم التي أصدرتها [process2]؛
هناك أمر غريب بخصوص العملية 2: لم نحدد خيط تنفيذ. لذلك قد يتوقع المرء أن يتم استخدام الخيط الرئيسي بشكل افتراضي. لكن هذا ليس هو الحال. كان خيط التنفيذ هو خيط الحساب [RxComputationThreadPool-3] (السطر 11). لذلك، عندما لا يتم تحديد خيط تنفيذ أو مراقبة، لا يمكننا وضع أي افتراضات حول الخيط الذي سيتم اختياره.
7.5.3. المثال 16: دمج عنصرين قابلين للمراقبة باستخدام [Observable.zip]
سنقوم الآن بفحص الكود التالي:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;
public class Exemple16 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>(
new ProcessAction01<String>("process2", 2, i -> String.format("valeur-%s", i)), null, null);
// 2-process combination function
FuncN<String> funcn = new FuncN<String>() {
@Override
public String call(Object... args) {
if (args.length == 2) {
return String.format("double=%s, string=%s", args[0], args[1]);
} else {
throw new RuntimeException("la fonction attend 2 paramètres exactement");
}
}
};
// zip of the 2 processes
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- السطور 16–18: ستصدر عملية باسم [process1] 3 أرقام حقيقية على مؤشر ترابط حسابي. كما سيتم ملاحظتها على مؤشر ترابط حسابي؛
- السطور 20–21: ستصدر عملية باسم [process2] سلسلتين على مؤشر ترابط غير محدد. كما أن مؤشر ترابط المراقبة غير محدد أيضًا؛
- الأسطر 23–32: إنشاء مثيل لنوع [FuncN<String>] باستخدام فئة مجهولة. FuncN هي واجهة وظيفية:
![]() |
تتوقع طريقة [FuncN.call] مصفوفة من الكائنات وتُرجع نوعًا R. ستُستخدم الدالة [funcn] لدمج العمليتين process1 و process2 بهذا الترتيب. في طريقة [FuncN.call]:
- سيكون args[0] من النوع Double؛
- سيكون args[1] من نوع String؛
هنا، ستكون نتيجة [funcn.call] هي السلسلة من السطر 27. لا يتطلب إنشاء هذه النتيجة معرفة أنواع الحجج لطريقة الاستدعاء.
يتم دمج العمليتين على النحو التالي:
// zip des 2 processus
Process<String> process12 = new Process<>("process12",
Observable.zip(Arrays.asList(process1.getObservable(), process2.getObservable()), funcn));
تعمل طريقة [Observable.zip] على النحو التالي:
![]() |
نلاحظ أن:
- الحجة الأولى لـ zip هي Iterable<Observable>. في مثالنا، لدينا معلمة فعلية من النوع List<Observable> تتكون من المراقبين لدينا؛
- الحجة الثانية لـ zip هي من النوع FuncN. في مثالنا، المعلمة الفعلية هي [funcn]؛
يؤدي التنفيذ إلى النتائج التالية:
- السطران 7 و 11: process12 يصدر عنصرين؛
- السطر 8: العنصر الإضافي الذي أصدرته العملية 1، والذي ليس له نظير في العملية 2، لا تصدره العملية الناتجة process12؛
نلاحظ أن العملية 2، التي لم يُخصص لها خيط تنفيذ ولا خيط مراقبة، استخدمت الخيط الرئيسي لكليهما.
7.5.4. مثال-17: دمج عنصرين قابلين للمراقبة باستخدام [Observable.combineLatest]
سنقوم الآن بفحص الكود التالي:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple17 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null,
Schedulers.computation());
// combining the 2 processes
Process<Double> process12 = new Process<>("process12",
Observable.combineLatest(process1.getObservable(), process2.getObservable(), (d1, d2) -> d1 + d2));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- الأسطر 14–16: ستصدر عملية باسم [process1] 3 أرقام حقيقية على خيط حسابي. كما سيتم ملاحظتها على خيط حسابي؛
- الأسطر 18–20: ستصدر عملية باسم [process2] عددين حقيقيين على مؤشر ترابط غير مرتبط. وسيتم ملاحظتهما على مؤشر ترابط حسابي؛
- السطر 23: يتم دمج الملاحظتين باستخدام الطريقة الثابتة التالية [Observable.combineLatest]:
![]() |
تعمل المراقبة [combineLatest] على النحو التالي: عندما تصدر إحدى المراقبتين عنصرًا E1، يتم دمج هذا العنصر بواسطة [combineFunction] مع آخر عنصر أصدرته المراقبة الأخرى.
يؤدي تنفيذ هذا الكود إلى النتيجة التالية:
- السطر 5: يتم دمج الناتج من process2 (56) مع العنصر الأخير الذي أخرجه process1 (54، السطر 4) وينتج النتيجة الموضحة في السطر 7؛
- السطر 6: يتم دمج الناتج من process1 (51.6) مع العنصر الأخير الذي أخرجته process2 (56، السطر 5) وينتج النتيجة الموضحة في السطر 8؛
- السطر 9: يتم دمج الناتج من العملية 2 (261.8) مع العنصر الأخير الذي أخرجته العملية 1 (51.6، السطر 6) وينتج النتيجة الموضحة في السطر 12؛
- السطر 13: يتم دمج الانبعاث من العملية 1 (80.39) مع العنصر الأخير المنبعث من العملية 2 (261.8، السطر 9) وينتج النتيجة الموضحة في السطر 15؛
هذا هو أحد أشكال المقياس [zip] حيث، هذه المرة، العناصر المدمجة ليست بالضرورة العناصر الموجودة في نفس الموضع في التدفقات. لاحظ هنا أن العملية 2، التي لم يتم تخصيص أي مؤشر ترابط للتنفيذ لها، تم تنفيذها على مؤشر الترابط الرئيسي (السطر 2).
7.5.5. مثال-18: دمج عنصرين قابلين للملاحظة باستخدام [Observable.amb]
سنقوم الآن بفحص الكود التالي:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple18 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Double> process2 = new Process<>(
new ProcessAction01<Double>("process2", 2, i -> new Random().nextInt(200) * 1.4), null, null);
// combining the 2 processes
Process<Double> process12 = new Process<>("process12",
Observable.amb(process1.getObservable(), process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process12);
}
}
- الأسطر 14–16: ستصدر عملية باسم [process1] 3 أرقام حقيقية على مؤشر ترابط حسابي. كما سيتم ملاحظتها على مؤشر ترابط حسابي؛
- الأسطر 18–20: ستصدر عملية باسم [process2] عددين حقيقيين على مؤشر ترابط غير مرتبط. وسيتم ملاحظتهما على مؤشر ترابط غير مرتبط؛
- السطر 22: يتم دمج الملاحظتين باستخدام الطريقة الثابتة التالية [Observable.amb]:
![]() |
كما هو موضح في الرسم البياني أعلاه، فإن المراقب [Observable.amb(Observable o1, Observable o2)] يصدر عناصر المراقب الذي يصدر أولاً. وهذا ما تؤكده نتائج المثال المقدم:
- السطر 4: عملية 2 هي أول عملية تصدر؛
- السطران 8 و 12: process12 يصدر جميع العناصر التي أصدرها process2 (السطران 4 و 11)؛
7.6. سلسلة المعالجة لعنصر قابل للمراقبة
7.6.1. مثال-19: تحويل عنصر قابل للملاحظة باستخدام [Observable.map]
في الأمثلة السابقة، درسنا تركيبات مختلفة من عنصرين قابلين للملاحظة لتكوين عنصر ثالث قابل للملاحظة. نقدم الآن طرقًا ثابتة لفئة [Observable] تسمح بعمليات التحويل والتصفية والتجميع على عنصر قابل للملاحظة. سنجد هنا طرقًا مشابهة لتلك الموجودة في فئة [Stream] التي درسناها في القسم 5.
سيكون مثالنا الأول كما يلي:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple19 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Double> process1 = new Process<>(
new ProcessAction01<Double>("process1", 3, i -> new Random().nextInt(100) * 1.2), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<String> process2 = new Process<>("process2",
process1.getObservable().map(d -> String.format("valeur-%s", d)));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- السطور 14–16: ستصدر عملية تُسمى process1 3 أرقام حقيقية على خيط حسابي. كما سيتم رصدها على خيط حسابي؛
- السطور 17–18: سيتم تحويل الأرقام التي أصدرتها العملية 1 إلى سلاسل في العملية 2؛
- السطر 20: نراقب العملية 2؛
طريقة [Observable.map] في السطر 18 مشابهة لطريقة [Stream.map] التي تمت مناقشتها في القسم 5.5:
![]() |
نتائج المثال هي كما يلي:
- السطور 4 و 5 و 8: الانبعاثات من process1. هذه أرقام حقيقية؛
- السطور 6 و7 و10: الانبعاثات الملاحظة من process2. هذه سلاسل نصية؛
7.6.2. المثال-20: تصفية متغير قابل للملاحظة باستخدام [Observable.filter]
سيكون المثال كما يلي:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple20 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- السطران 11-12: ستصدر عملية تسمى process1 أعدادًا صحيحة من 0 إلى 2 على مؤشر ترابط عامل. وسيتم أيضًا مراقبة ذلك على مؤشر ترابط عامل؛
- السطر 14: سيتم تصفية الأرقام التي تصدرها العملية 1 بحيث يتم الاحتفاظ بالأرقام الزوجية فقط في العملية 2؛
- السطر 20: نراقب العملية 2؛
طريقة [Observable.filter] في السطر 18 مشابهة لطريقة [Stream.filter] التي تمت مناقشتها في القسم 5.4:
![]() |
نتائج المثال هي كما يلي:
- السطور 4 و 5 و 7: انبعاثات من process1؛
- السطران 6 و9: الانبعاثات الملاحظة من عملية 2. هذه هي العناصر ذات الأرقام الزوجية من عملية 1؛
7.6.3. المثال 21: تحويل متغير قابل للملاحظة باستخدام [Observable.flatMap]
سيكون المثال كما يلي:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21 {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- السطران 12-13: ستصدر عملية تسمى process1 أعدادًا صحيحة من 0 إلى 2 على خيط حسابي. كما سيتم ملاحظتها على خيط حسابي؛
- الأسطر 15–18: يتم تحويل كل رقم n يصدره process1 إلى عنصر قابل للمراقبة يصدر الأرقام الثلاثة (10*n، 10*n+1، 10*n+2). إذا كنا قد استخدمنا طريقة [map] في السطر 15، لكانت العملية 2 قد أصدرت نوع Observable<Integer> بدلاً من نوع Integer. تسمح لنا طريقة [flatMap] المستخدمة بتسوية هذه السلسلة من العناصر من نوع Observable<Integer> إلى سلسلة من العناصر من نوع Integer تتكون من كل عنصر من كل Observable<Integer>؛
- السطر 20: نراقب process2؛
طريقة [Observable.flatMap] في السطر 15 مشابهة لطريقة [Stream.flatMap] التي تمت مناقشتها في القسم 5.6.12:
![]() |
نتائج المثال هي كما يلي:
- الأسطر 5-7: الإصدارات الثلاثة من process2 التي تلي الإصدار في السطر 4 من process1؛
- الأسطر 9-11: الانبعاثات الثلاثة من process2 التي تلي الانبعاث في السطر 8 من process1؛
- الأسطر 14-16: الانبعاثات الثلاثة من العملية 2 التي تلي الانبعاث في السطر 12 من العملية 1؛
يوضح الكود التالي كيفية إنشاء نوع Observable<Integer[]> من العملية 1 [مثال 21ب]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21b {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer[]> process2 = new Process<>("process2", process1.getObservable().map(i -> {
int value = i * 10;
return new Integer[] { value, value + 1, value + 2 };
}));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- السطر 14: يتم استخدام طريقة [Observable.map]؛
- السطر 16: والتي تُرجع نوع Integer[]؛
النتائج هي كما يلي:
- السطور 6 و7 و10: نرى نتائج الخريطة؛
يمكن ربط كل هذه التحويلات القابلة للملاحظة ببعضها البعض لأن كل تحويل ينتج متغيرًا جديدًا قابلًا للملاحظة. ويتضح ذلك في المثال التالي [مثال 21ج]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.schedulers.Schedulers;
public class Exemple21c {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMap(i -> {
int value = i * 10;
return Observable.just(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- الأسطر 15–18: يتبع flatMap مرشح؛
نتائج التنفيذ هي كما يلي:
- السطور 8-13: أصدرت process2 العناصر الزوجية فقط من flatMap؛
هناك طريقة مشابهة لـ [flatMap] وهي طريقة [flatMapIterable]، كما يوضح المثال التالي [مثال 21د]:
package dvp.rxjava.observables.exemples;
import java.util.Arrays;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21d {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>("process2", process1.getObservable().flatMapIterable(i -> {
int value = i * 10;
return Arrays.asList(value, value + 1, value + 2);
}).filter(i -> i % 2 == 0));
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
السطر 16: بدلاً من استخدام طريقة [flatMap]، نستخدم طريقة [flatMapIterable]. في هذه الحالة، يجب أن تنتج دالة التحويل نوع Iterable<T> (السطر 18) بدلاً من نوع Observable<T>.
نحصل على نفس النتائج كما في السابق.
لنعد إلى تعريف طريقة [flatMap]:
![]() |
كما هو موضح أعلاه، تم إدراج عنصر أزرق [3] بين العنصرين الأخضرين [1-2]. وهذا يعني أنه عند تسطيح Observable<T>s، تحافظ طريقة [flatMap] على ترتيب إصدار هذه المراقبات الداخلية المختلفة. ويتضح ذلك من خلال المثال التالي [Example21e]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21e {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().flatMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
- السطران 11-12: العملية process1 تصدر الأعداد الصحيحة [0,1]؛
- السطران 14-15: العملية 2 تصدر الأعداد الصحيحة [10، 11، 12]؛
- السطران 17-18: يرتبط كل عنصر تصدره العملية 1 بالمقياس القابل للملاحظة الخاص بالعملية 2. وهذا يعني أن:
- سيتم ربط العنصر [0] من العملية 1 بمتغير قابل للملاحظة يصدر [10,11,12]؛
- وينطبق الأمر نفسه على العنصر 1؛
في النهاية، سيتم إصدار الأرقام الستة [10، 11، 12، 10، 11، 12]. نريد أن نرى بأي ترتيب.
نتائج التنفيذ هي كما يلي:
يمكننا أن نرى أن ترتيب الإرسال لـ process3 كان: [10, 10, 11, 12, 11, 12] (الأسطر 11، 12، 14، 17، 19، 22). لذلك، كانت العناصر التي أرسلها process2 مختلطة بالفعل. يمكننا تجنب ذلك باستخدام طريقة [concatMap] بدلاً من طريقة [flatMap]. ويتضح ذلك من خلال الكود التالي [مثال 21ef]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21ef {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().concatMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
في السطر 18، استبدلنا [flatMap] بـ [concatMap]. نتائج التنفيذ هي كما يلي:
نلاحظ أن ترتيب الإرسال في العملية 3 كان: [10، 11، 12، 10، 11، 12] (الأسطر 12–14، 17، 19، 22). أما العناصر التي أرسلتها العملية 2 فلم يتم خلط ترتيبها.
هناك نوع آخر من طريقة [map] وهو طريقة [switchMap]:
![]() |
في المثال أعلاه، من المراقب [1]، يتم إنشاء ثلاثة مراقبين آخرين [2] يحتوي كل منهم على عنصرين، ثم يتم تسويتها كما في [flatMap] [3]. لاحظ أن النتيجة تحتوي على 5 عناصر، وليس 6. ويرجع ذلك إلى أنه قبل أن يصدر المراقب الثاني عنصره الثاني [6]، يصدر المراقب الثالث عنصره الأول [5]، مما يؤدي إلى تجاهل المراقب الثاني. وبالتالي، لا يوجد العنصر [6] في المراقب الناتج [3].
لتوضيح [switchMap]، سنستخدم المثال التالي [Example21eg]:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple21eg {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 2, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Integer> process2 = new Process<>(new ProcessAction01<>("process2", 3, i -> i + 10),
Schedulers.computation(), Schedulers.computation());
// process 3
Process<Integer> process3 = new Process<>("process3",
process1.getObservable().switchMap(i -> process2.getObservable()));
// subscriptions
ProcessUtils.subscribe(1, process3);
}
}
يؤدي تشغيل المثال إلى النتائج التالية:
- process1 يصدر عنصرين يؤديان إلى ظهور عنصرين قابلين للمراقبة في process2 مكونين من 3 عناصر؛
- السطر 14: يتلقى المراقب العنصر رقم 0 الذي أصدرته المراقبة الأولى لـ process2 في السطر 6؛
- السطر 15: يتلقى المراقب العنصر رقم 0 الذي أصدرته المراقبة الثانية لعملية 2 في السطر 13. لا تشرح القصة سبب عدم تلقيه سابقًا العنصرين 1 و 2 اللذين أصدرتهما المراقبة الأولى لعملية 2 في السطرين 7 و 8. على أي حال، يتم التخلي عن المراقبة الأولى لعملية 2؛
- في النهاية، يرى المراقب 4 عناصر فقط (الأسطر 14 و15 و17 و20) بدلاً من الـ6 التي تم إرسالها؛
7.6.4. أمثلة-22: طرق أخرى لفئة [Observable]
تتضمن فئة [Observable] العديد من الطرق من فئة [Stream] التي تعمل بطريقة مماثلة. فيما يلي بعض منها. سنقدم ببساطة الكود ونتائجه.
[مثال 22أ - take=limit]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22a {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).take(3));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
النتائج
[مثال 22ب - takeLast]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22b {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).takeLast(2));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
النتائج
[مثال 22c - تخطي]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22c {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).skip(5).take(2));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
النتائج
[مثال 22d - reduce]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22d {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).reduce(0, (i, a) -> i + a));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- السطر 10: يحسب مجموع العناصر في المراقب. والنتيجة هي مراقب يصدر هذا المجموع؛
النتائج
[المثال 22e - الكل]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22e {
public static void main(String[] args) throws InterruptedException {
// process
Process<Boolean> process = new Process<>("process", Observable.range(1, 10).all(i -> i > 10));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- السطر 10: يُرجع Observable<Boolean> الذي يُصدر العنصر true إذا كان مسند الطريقة [all] صحيحًا لجميع العناصر، و false في الحالات الأخرى؛
النتائج
[مثال 22f - العد]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22f {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.range(1, 10).count());
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- السطر 10: [Observable.count] ينشئ عنصرًا قابلًا للمراقبة مكونًا من عنصر واحد يمثل مجموع العناصر المراقبة؛
النتائج
[مثال 22g - مميز]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
public class Exemple22g {
public static void main(String[] args) throws InterruptedException {
// process
Process<Integer> process = new Process<>("process", Observable.just(1, 2, 1, 3).distinct());
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
النتائج
[ المثال 22h - groupBy، asObservable]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.observables.GroupedObservable;
public class Exemple22h {
public static void main(String[] args) throws InterruptedException {
// process
Observable<GroupedObservable<Boolean, Integer>> obs = Observable.range(1, 10).groupBy(i -> i % 2 == 0);
Process<Integer> process = new Process<>("process", obs.concatMap(g -> g.asObservable()));
// subscriptions
ProcessUtils.subscribe(1, process);
}
}
- السطر 11: تقوم الطريقة [groupBy] بتجميع العناصر العشرة التي تم إصدارها في مجموعتين: الأعداد الزوجية والأعداد الفردية. والنتيجة هي Observable<GroupedObservable<Boolean, Integer>>، أي، عنصر قابل للمراقبة عناصره من النوع GroupedObservable<Boolean, Integer>، حيث Boolean هو نوع مفتاح المجموعة (false، true في هذه الحالة) وهو أيضًا نوع نتيجة اللامدا التي تم تمريرها كمعلمة إلى طريقة [groupBy]، و Integer هو نوع عناصر المجموعة؛
- السطر 12: يحتوي نوع GroupedObservable على طريقة [asObservable] تسمح لنا بإنشاء كائن قابل للمراقبة من هذا النوع. وبالتالي سيكون لدينا نوعان من Observable<Integer>، أحدهما للأرقام الزوجية والآخر للأرقام الفردية. ومن هذين الكائنين القابلين للمراقبة، ستقوم طريقة [concatMap] بإنشاء كائن واحد؛
النتائج
[مثال 22i - طابع زمني]
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
import rx.schedulers.Timestamped;
public class Exemple22i {
public static void main(String[] args) throws InterruptedException {
// process 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
Schedulers.computation());
// process 2
Process<Timestamped<Integer>> process2 = new Process<>("process2", process1.getObservable().timestamp());
// subscriptions
ProcessUtils.subscribe(1, process2);
}
}
- السطر 15، تربط طريقة [timestamp] طابعًا زمنيًا بكل عنصر تمت معالجته من العنصر القابل للمراقبة؛
النتائج
في هذا المثال، من الصعب تحديد ما تمثله معلومات الطابع الزمني:
- السطران 4-5: نرى أن العنصر 1 من العملية 1 تم إرساله بعد 139 مللي ثانية من العنصر 0؛
- السطران 6 و7: نرى أن العنصر 1 من العملية 2 تمت ملاحظته بعد 234 مللي ثانية من العنصر 0؛
- السطران 5 و8: نرى أن العنصر 2 من العملية 1 تم إصداره بعد 33 مللي ثانية من العنصر 1؛
- السطران 7 و 10: نرى أن العنصر 2 من العملية 2 تمت ملاحظته بعد 37 مللي ثانية من العنصر 1؛
تعود هذه التأخيرات إلى حقيقة أن خيوط مراقبة وتنفيذ العناصر القابلة للمراقبة ليست هي نفسها. إذا استبدلنا السطرين 12-13 بالسطور التالية (المثال 22j):
// processus 1
Process<Integer> process1 = new Process<>(new ProcessAction01<>("process1", 3, i -> i), Schedulers.computation(),
null);
- السطران 2–3: لا نحدد مؤشر ترابط المراقبة. ونعلم أنه في هذه الحالة، تتم مراقبة المتغير القابل للمراقبة في المكان الذي يتم فيه تنفيذه؛
وهذا يعطي النتائج التالية:
- السطران 4 و 6: process1 يصدر العنصر رقم 1 بعد 587 مللي ثانية من العنصر رقم 0؛
- السطران 5 و 7: يراقب المراقب هذين العنصرين بفارق 586 مللي ثانية؛
- السطران 6 و 8: تطلق العملية 1 العنصر رقم 2 بعد 396 مللي ثانية من العنصر رقم 1؛
- السطران 7 و9: يلاحظ المراقب هذين العنصرين بفارق زمني قدره 396 مللي ثانية؛
هنا، قيم الطوابع الزمنية متسقة: فهي تمثل بدقة وقت إرسال العنصر.
7.7. المجدولون
7.7.1. المثال 23: المجدول [Schedulers.computation]
سنقوم الآن بفحص مجدولات التنفيذ. وستتم الملاحظة على مؤشر ترابط التنفيذ.
موضوع المجدولات غامض إلى حد ما. يتم عرض المجدولات المختلفة في هذا السؤال على موقع StackOverflow [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
سنحاول توضيح استخدام برامج الجدولة المختلفة هذه باستخدام أمثلة. يوضح المثال الأول برنامج الجدولة [Schedulers.computation]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple23 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.computation(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
- الأسطر 14–19: نقوم بإنشاء مصفوفة من 10 عمليات تعمل على مؤشر ترابط حسابي؛
- السطر 17: تولد كل عملية عددًا حقيقيًا عشوائيًا؛
- السطر 21: نشترك في جميع هذه العمليات؛
والنتائج هي كما يلي:
- السطور 2-10: تبدأ العمليات الثماني الأولى على 8 خيوط مختلفة (تحتوي الآلة المستخدمة على 8 نوى). لاحظ أن جميعها تبدأ في نفس الوقت تقريبًا؛
- الأسطر 17-19: تنتهي 3 عمليات، مما يؤدي إلى تحرير 3 خيوط؛
- السطور 23-24: يمكن عندئذٍ أن تبدأ العمليتان الأخيرتان باستخدام خيطين من الخيوط التي تم تحريرها؛
وبالتالي يمكننا أن نستنتج أن المجدول [Schedulers.computation] يوفر مجموعة من n خيوط، حيث n هو عدد النوى في الجهاز. يتم تنفيذ الخيوط بالتوازي على هذه النوى.
7.7.2. مثال-24: المجدول [Schedulers.io]
نقوم بتشغيل الكود السابق باستخدام المجدول [Schedulers.io]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple24 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.io(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
- السطر 18: يتم تشغيل العمليات باستخدام خيوط المجدول [Schedulers.io]؛
وينتج عن ذلك النتائج التالية:
- الأسطر 2-10: تبدأ كل عملية من العمليات العشر على خيط مختلف. على عكس الحالة السابقة، تمكنت جميع العمليات من التشغيل. لاحظ أن عمليات التشغيل هذه تستغرق 6 مللي ثانية، بينما كانت تستغرق 1 مللي ثانية سابقًا؛
- الأسطر 13-18: يتم إصدار القيم القابلة للملاحظة واحدة تلو الأخرى وليس بشكل متوازٍ كما كان الحال سابقًا؛
ما الفرق بين جدولي [Schedulers.io] و [Schedulers.computation]؟ يمكن العثور على إجابة في الرابط [http://stackoverflow.com/questions/31276164/rxjava-schedulers-use-cases]:
![]() |
7.7.3. مثال-25: برنامج الجدولة [Schedulers.newThread]
نقوم بتشغيل الكود السابق باستخدام المجدول [Schedulers.newThread]:
package dvp.rxjava.observables.exemples;
import java.util.Random;
import dvp.rxjava.observables.utils.Process;
import dvp.rxjava.observables.utils.ProcessAction01;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.schedulers.Schedulers;
public class Exemple25 {
public static void main(String[] args) throws InterruptedException {
// processes
@SuppressWarnings("unchecked")
Process<Double> processes[] = new Process[10];
for (int i = 0; i < processes.length; i++) {
processes[i] = new Process<>(
new ProcessAction01<Double>(String.format("process%s", i), 1, value -> new Random().nextInt(100) * 1.2),
Schedulers.newThread(), null);
}
// subscriptions
ProcessUtils.subscribe(1, processes);
}
}
النتائج التي تم الحصول عليها هي نفسها التي تم الحصول عليها مع المجدول [Schedulers.io]:
في الرابط [http://stackoverflow.com/questions/33415881/retrofit-with-rxjava-schedulers-newthread-vs-schedulers-io]، يوضح أن المجدول [Schedulers.io] يوفر مجموعة خيوط، وهو ما لا يفعله المجدول [Schedulers.newThread]. يقوم تجمع الخيوط تلقائيًا بإنشاء مجموعة من الخيوط. ويقوم بتخصيصها للعمليات التي تحتاج إليها. وعندما تنتهي هذه العمليات، لا يتم حذف خيوطها بل تعود إلى التجمع ويمكن إعادة استخدامها من قبل عملية أخرى. وهذا أكثر كفاءة من إنشاء الخيوط وحذفها باستمرار. لذلك، يفضل استخدام المجدول [Schedulers.io].
7.7.4. المثال 26: المجدولان [Schedulers.immediate، Schedulers.trampoline]
لنعد إلى الشرح المقدم لهذين المجدولين:
![]() |
التفسير سهل الفهم إلى حد ما، لكن عندما تحاول توضيحه، تدرك أنك لم تستوعبه حقًا. كان كتاب *Learning Reactive Programming With Java 8* هو الذي ساعدني في إنشاء مثال مستند إلى مثال موجود في ذلك الكتاب، لكن بشكل مبسط. وإليك المثال:
package dvp.rxjava.observables.exemples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.function.Consumer;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
public class Exemple26 {
public static void main(String[] args) throws InterruptedException {
// a scheduler
Scheduler scheduler = Schedulers.immediate();
// a worker of this scheme
Worker worker = scheduler.createWorker();
// an Action0 type to be executed on the worker
Action0 action02 = new Action0() {
@Override
public void call() {
// log action02
ProcessUtils.showInfos.accept("action02");
}
};
// an Action0 type to be executed on the worker
Action0 action01 = new Action0() {
@Override
public void call() {
// program a new action on the same worker
worker.schedule(action02);
// log action01
ProcessUtils.showInfos.accept("action01");
}
};
// action01 is programmed on the worker
worker.schedule(action01);
}
// displays
static Consumer<String> showInfos = message -> System.out.printf("%s ------Thread[%s] ---- Time[%s]%n", message,
Thread.currentThread().getName(), new SimpleDateFormat("ss:SSS").format(new Date()));
}
- السطر 17: جدولة. ستكون إما [Schedulers.immediate] كما هو موضح هنا أو [Schedulers.trampoline] لاحقًا؛
- السطر 19: يمكن تنفيذ الإجراءات من النوع Action0 (السطران 21 و20) على عمال المجدول. تقوم طريقة [Scheduler.createWorker] بإنشاء عامل. تقوم طريقة [Worker.schedule(Action0)] بتنفيذ نوع Action0 عبر عامل؛
- الأسطر 21–27: إجراء أول يسمى [action02] سيتم تنفيذه (السطر 40) بواسطة العامل من السطر 19؛
- الأسطر 30–38: إجراء ثانٍ يسمى [action01]. له ميزة خاصة تتمثل في التسبب في تنفيذ action02 على نفس العامل الذي ينفذ هو نفسه (السطر 34). هنا يكمن الفرق بين [Schedulers.immediate] و [Schedulers.trampoline]:
- إذا كان المجدول هو [Schedulers.immediate]، ففي السطر 34، سيتم تنفيذ الإجراء action02 على الفور (ومن هنا جاء اسم المجدول) وسيتم مقاطعة الإجراء action01 قيد التشغيل حاليًا. سنرى بعد ذلك ظهور الرسالة من السطر 25. بمجرد انتهاء action02، سيستأنف action01 وسنرى الرسالة من السطر 36؛
- إذا كان المجدول هو [Schedulers.trampoline]، ففي السطر 34، يتم وضع الإجراء action02 في قائمة الانتظار. ولن يتم تنفيذه حتى تكتمل المهمة الحالية، action01. ثم ستظهر الرسالة الموجودة في السطر 36. وبمجرد اكتمال action01، سيتم تنفيذ action02، وستظهر الرسالة الموجودة في السطر 25؛
يؤدي تنفيذ الكود أعلاه إلى النتائج التالية:
إذا استخدمنا، في السطر 17، المجدول [Schedulers.trampoline]، فسنحصل على نتائج معاكسة:
ومع ذلك، من الصعب الربط بين ذلك وبين العناصر القابلة للمراقبة. لم أجد مثالاً مقنعاً يوضح فائدة تنفيذ عنصر قابل للمراقبة على أحد هذين الخيطين. لكن إليك مثالاً، لا أجده طبيعياً على الإطلاق:
package dvp.rxjava.observables.exemples;
import dvp.rxjava.observables.utils.ProcessUtils;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
public class Exemple27 {
public static void main(String[] args) throws InterruptedException {
// Worker
Worker worker = Schedulers.immediate().createWorker();
// Worker worker = Schedulers.trampoline().createWorker();
// observable 1 sur worker
worker.schedule(() -> Observable.range(1, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
// observable 2 on same worker
worker.schedule(() -> Observable.range(100, 2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
ProcessUtils.showInfos.accept(String.valueOf(i));
}
}));
}
}));
}
}
- السطران 13–14: يتم إنشاء عامل باستخدام أحد المجدولين [Schedulers.immediate] و [Schedulers.trampoline]؛
- السطر 16: تم جدولة المقياس الأول obs1 على هذا العامل لإصدار الأرقام [1,2]
- السطر 22: في كل مرة يتم فيها ملاحظة عنصر من هذا المقياس القابل للملاحظة obs1، يتم إطلاق ملاحظة مقياس قابل للملاحظة ثانٍ obs2 على نفس العامل لإصدار الأرقام [100،101]؛
باستخدام المجدول [Schedulers.immediate]، نحصل على النتائج التالية:
بينما مع المجدول [Schedulers.trampoline]، نحصل على النتائج التالية:
7.8. الخلاصة
لا يزال هناك الكثير مما يجب القيام به. لفهم مكتبة RxJava بشكل أعمق، ننصح القراء بمواصلة التعلم باستخدام المراجع المذكورة في بداية هذا المستند. ومع ذلك، لدينا الآن الأساسيات اللازمة لاستخدام RxJava في بيئات Swing وAndroid. وهذا ما سنقوم بتوضيحه في الفقرة التالية.








































