2. مثال تمهيدي
كانت أول مرة تعرفت فيها على RxJava من خلال الدورات والبرامج التعليمية التي وجدتها على الإنترنت. وبصرف النظر عن حقيقة أن النظرية استخدمت مفاهيم لم أكن على دراية بها وواجهت صعوبة في فهمها، لم أستطع حقًا أن أرى كيف يمكن أن تكون مفيدة في الحياة الواقعية. لذلك سنبدأ بتقديم مثال (آمل أن يكون بسيطًا) حيث يؤدي استخدام RxJava إلى تبسيط حقيقي للكود، ومن هناك، سنحاول تحديد العناصر المهمة في هذه المكتبة.
تستند مكتبة RxJava إلى المفهوم التالي: يتم مراقبة دفق من العناصر من النوع T Observable<T> بواسطة مشترك واحد أو أكثر (مشتركين، مراقبين، مستهلكين) Subscriber<T>. تسمح مكتبة RxJava بتشغيل دفق Observable<T> في مؤشر ترابط T1 ومراقب Subscriber<T> الخاص به في مؤشر ترابط T2 دون أن يضطر المطور إلى القلق بشأن إدارة دورة حياة هذه المؤشرات أو المشكلات الصعبة بطبيعتها، مثل مشاركة البيانات بين المؤشرات ومزامنتها لتنفيذ مهمة عامة. وبالتالي، فهي تسهل البرمجة غير المتزامنة.
ينتج تيار Observable<T> عناصر من النوع T، والتي يمكن ملاحظتها فور إنتاجها. إذا كان المراقب والمراقب (وهو مصطلح يُستخدم بشكل عام للإشارة إلى النوع Observable<T>) موجودين في نفس الخيط، فإن المراقب لا يمكنه إنتاج العنصر (i+1) إلا بعد أن يكون المراقب قد استهلك العنصر i. وهناك حالات قليلة تكون فيها هذه البنية مفيدة. إذا لم يكن المراقب والمراقب في نفس الخيط، فإن المراقب ومراقبه يتصرفان بشكل مستقل: يصدر المراقب وفقًا لسرعته الخاصة ويستهلك المراقب وفقًا لسرعته الخاصة. هنا تكمن قيمة المكتبة. حتى الآن، لم نناقش سوى مراقب واحد. في الواقع، يمكن أن يكون للمراقب أي عدد من المراقبين.
2.1. بنية التطبيق المثال
يتميز التطبيق النموذجي بالبنية التالية:

- في [1]، تقوم طبقة الخدمة بإنشاء قوائم من الأرقام العشوائية. تعمل هذه الطبقة في نفس الخيط الذي تعمل فيه طريقة [swing] التي تستخدمها. ثم تقوم بإنشاء أرقامها بشكل متزامن؛
- في [2]، تسمح طبقة تكييف رقيقة تم تنفيذها باستخدام RxJava بتقديم تنفيذ غير متزامن لنفس الخدمة إلى طبقة [swing]: يمكن أن تعمل هذه الخدمة في مؤشر ترابط مختلف عن طريقة [swing] التي تستخدمها؛
- الاستدعاء [4] متزامن، في حين أن الاستدعاءات [5-6] غير متزامنة؛
ما نريد توضيحه هنا هو أن مكتبة Rx تسهل تحويل واجهة متزامنة إلى واجهة غير متزامنة. لماذا هذا مفيد؟ تتم معالجة الأحداث في واجهة Swing في مؤشر ترابط يُشار إليه عادةً باسم حلقة الأحداث. يتم وضع الأحداث في قائمة انتظار ومعالجتها واحدة تلو الأخرى. لا يمكن معالجة الحدث Ei+1 إلا بعد معالجة الحدث السابق Ei بالكامل. لذلك من المهم أن تكون معالجة الأحداث قصيرة قدر الإمكان حتى تظل واجهة المستخدم الرسومية (GUI) سريعة الاستجابة. في بعض الأحيان، قد يستغرق معالجة الحدث وقتًا طويلاً. ويحدث هذا إذا كانت المعالجة تتضمن الوصول إلى الشبكة. إذا كنا لا نريد تجميد واجهة المستخدم الرسومية بطريقة غير مقبولة للمستخدم، فيجب تنفيذ عمليات الشبكة هذه في خيوط منفصلة عن حلقة الأحداث لتحريرها. وهذا يقودنا إلى مجال البرمجة المتزامنة (حيث تعمل خيوط متعددة بالتوازي)، والتي تعتبر صعبة بحق. توفر مكتبة Rx حلاً بسيطًا وأنيقًا لهذه المشكلة.
لمحاكاة العمليات طويلة الأمد، تقدم الخدمة في المثال أرقامها العشوائية بعد تأخير معين حتى نتمكن من ملاحظة سلوك واجهة المستخدم الرسومية.
2.2. الملف القابل للتنفيذ
يوجد الملف القابل للتنفيذ لتطبيق المثال في مجلد [dvp/executables] ضمن الأمثلة:
![]() | ![]() |
هناك طرق مختلفة لتشغيل أرشيف [swing-01] اعتمادًا على تكوين الجهاز المستخدم لتشغيله. على سبيل المثال، يمكنك اتباع الخطوات [1-3]. سيؤدي ذلك إلى عرض واجهة المستخدم الرسومية التالية:
![]() |
- تحتوي الواجهة على علامتي تبويب [1-2]: الأولى [Request] لإرسال طلب إلى خدمة مولد الأرقام العشوائية، والأخرى [Response] لعرض الأرقام المستلمة؛
- في [3]، تحدد عدد الطلبات التي تريد إرسالها إلى الخدمة؛
- في [4]، تحدد نطاق توليد الأرقام المطلوب [a,b]؛
- في [5]، سيكون عدد القيم التي تعيدها الخدمة رقمًا عشوائيًا ضمن الفاصل [minCount, maxCount] الذي يحدده المستخدم؛
- في [6]، قبل إرجاع استجابتها، ستنتظر الخدمة تأخيرًا يبلغ ميلي ثانية، حيث يكون التأخير رقمًا عشوائيًا ضمن النطاق المحدد من قبل المستخدم [minDelay, maxDelay]؛
- بشكل افتراضي، ستستخدم طبقة [swing] الواجهة المتزامنة للخدمة. لاستخدام الطبقة غير المتزامنة، سيقوم المستخدم بتحديد [7]. في هذه الحالة، ستعمل خدمة التوليد في خيوط منفصلة عن حلقة أحداث واجهة المستخدم الرسومية. توفر مكتبة Rx استراتيجيات متنوعة لتوليد هذه الخيوط. يمكن للمستخدم تحديد استراتيجيته في [8]؛
- يتم إنشاء الأرقام باستخدام الزر [9]؛
![]() |
- [10] يعرض النتائج. سنشرح هيكلها؛
- في [11]، عدد النتائج التي تم الحصول عليها؛
- في [12]، وقت التنفيذ بالمللي ثانية؛
- في [13]، يتوفر للمستخدم خيار إلغاء التنفيذ؛
كل نتيجة لها التنسيق التالي:
{"idClient":0,"serviceResponse":{"delay":412,"aleas":[146,115,128,174,159,112,162,127],"executedOn":"RxComputationThreadPool-6"},"observedOn":"AWT-EventQueue-0","requestAt":"02:42:47:708","responseAt":"02:42:52:931"}
- [idClient]: معرف الطلب. لاحظ أنه يتم إرسال طلبات متعددة إلى خدمة التوليد؛
- [delay]: وقت الانتظار بالمللي ثانية الذي لاحظته الخدمة قبل إرسال نتيجتها؛
- [aleas]: الأرقام العشوائية التي أعادتها الخدمة؛
- [executedOn]: اسم الخيط الذي تم تشغيل الخدمة فيه؛
- [observedOn]: اسم مؤشر الترابط الذي عرض النتيجة. مع واجهة Swing، لا يمكن أن يكون هذا سوى مؤشر ترابط حلقة الأحداث، وهنا [AWT-EventQueue-0]؛
- [requestAt]: وقت الطلب بالتنسيق [ساعات:دقائق:ثوانٍ:مللي ثانية]؛
- [responseAt]: وقت استلام النتائج بنفس التنسيق؛
سنقدم الآن مقتطفات الكود اللازمة لفهم المثال.
2.3. الواجهة المتزامنة

تقدم طبقة الخدمة [1] الواجهة التالية:
public interface IService {
// random numbers in [a,b]
// n numbers are generated with random n in the interval [minCount, maxCount]
// numbers are generated after a delay of milliseconds,
// where [delay] is a random number in the range [minDelay, maxDelay]
public ServiceResponse getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay);
}
[ServiceResponse] كما يلي:
public class ServiceResponse {
// service waiting time
private int delay;
// random numbers
private List<Integer> aleas;
// execution thread
private String executedOn;
// manufacturers
public ServiceResponse(int delay, List<Integer> aleas) {
executedOn = Thread.currentThread().getName();
this.delay = delay;
this.aleas = aleas;
}
// getters and setters
...
}
تتكون الإجابة من ثلاثة أجزاء:
- السطر 6: الأرقام العشوائية التي تم إنشاؤها؛
- السطر 4: وقت الانتظار الذي تلاحظه الخدمة قبل إرجاع نتيجتها؛
- السطر 8: مؤشر ترابط تنفيذ الخدمة؛
2.4. الاستدعاء المتزامن

سنقوم الآن بتفصيل الاستدعاء المتزامن [4] الذي تقوم به طبقة [swing] للخدمة [1]:
private void doGenerateWithService() {
// start waiting
beginWaiting();
try {
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
uiResponse.setResponseAt();
model.add(0, jsonMapper.writeValueAsString(uiResponse));
jLabelNbReponses.setText(String.valueOf(Integer.parseInt(jLabelNbReponses.getText()) + 1));
}
} catch (JsonProcessingException | RuntimeException e) {
System.out.println(e);
}
// end waiting
endWaiting();
}
- الأسطر 5–12: الحلقة التي تعالج الطلبات [nbRequests] التي قدمها المستخدم؛
- السطر 8: [service] هو تنفيذ واجهة [IService] المتزامنة المعروضة في القسم 2.3؛
- السطر 10: [model] هو النموذج الذي يعرضه مكون JList في علامة التبويب [Response]. عناصر هذا النموذج هي سلاسل JSON لعناصر من النوع [UiResponse] على النحو التالي:
public class UiResponse {
// customer id
private int idClient;
// service response
private ServiceResponse serviceResponse;
// observation thread name
private String observedOn;
// query time
private String requestAt;
// response time
private String responseAt;
// manufacturers
public UiResponse() {
observedOn = Thread.currentThread().getName();
requestAt = getTimeStamp();
}
// private methods
private String getTimeStamp() {
return new SimpleDateFormat("hh:mm:ss:SSS").format(Calendar.getInstance().getTime());
}
// getters and setters
...
}
- السطر 6: الرد من خدمة توليد الأرقام؛
- السطر 4: رقم الطلب الذي يتم الرد عليه؛
- السطر 8: مؤشر الترابط الذي يعرض هذا الرد. كما ذكرنا، سيكون هذا دائمًا مؤشر ترابط حلقة الأحداث؛
- السطران 10 و 12: وقت الطلب ووقت الرد؛
2.5. اختبار المكالمات المتزامنة
نقوم بتشغيل التكوين التالي:
![]() |
نحصل على النتائج التالية في علامة التبويب [Response]:
![]() |
- في [1-2]، تلقينا بالفعل 10 استجابات كما هو مطلوب. تم إدراجها في الموضع الأول حسب ترتيب وصولها. يمكننا أن نرى أنها تم استلامها بترتيب الطلبات؛
- تم تنفيذها جميعًا وعرضها في مؤشر ترابط حلقة الأحداث [AWT-EventQueue-0]. وبالتالي، تم تنفيذ الطلبات واحدًا تلو الآخر في هذا المؤشر. لم تكن هناك طلبات متزامنة؛
- ما لا يظهر هنا هو أن واجهة المستخدم الرسومية تتجمد أثناء التنفيذ. على سبيل المثال، لا توجد طريقة للوصول إلى علامة التبويب [Response] لعرض الردود الواردة أو لإيقاف التنفيذ باستخدام الزر [Cancel]. حتى لو كان هذا الزر موجودًا في علامة التبويب [Request]، لكان غير قابل للاستخدام. في الواقع، سيكون هناك حدثان:
- النقر على زر [Generate]؛
- النقر على زر [Cancel]؛
لا تتم معالجة النقر على زر [Cancel] إلا بعد انتهاء العملية التي تم تشغيلها بالنقر على زر [Generate]. لقد رأينا للتو أن هذه العملية شغلت مؤشر ترابط حلقة الأحداث طوال مدة التنفيذ، مما منع معالجة النقر على زر [Cancel]. هذا هو عادةً نوع الموقف الذي يمكن أن يوفر فيه Rx تحسينًا كبيرًا؛
2.6. الواجهة غير المتزامنة وتنفيذها
سنلقي الآن نظرة على واجهة الطبقة [2] وتنفيذها باستخدام Rx. لن يكون هذا واضحًا على الفور. نريد ببساطة تسليط الضوء على بساطة الكود في هذا التنفيذ.

الواجهة غير المتزامنة هي كما يلي:
public interface IRxService {
// random numbers in [a,b]
// n numbers are generated with random n in the interval [minCount, maxCount]
// numbers are generated after a delay of milliseconds,
// where [delay] is a random number in the range [minDelay, maxDelay]
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse);
}
فيما يلي الاختلافات عن الواجهة المتزامنة المعروضة في القسم 2.3:
- أصبحت فئة [UiResponse] المعروضة في القسم 2.3 الآن جزءًا من معلمات طريقة [getAleas] (السطر 6). والسبب في ذلك هو أن الطلبات تعمل الآن بشكل متوازٍ وأن الخدمة تنتظر فترة زمنية عشوائية قبل إرجاع النتيجة، وبالتالي لن تصلنا الاستجابات بترتيب الطلبات. لذلك، نقوم بتمرير كائن [UiResponse]، الذي يحتوي، من بين معلومات أخرى، على معرف الطلب:
// id du client (requête)
private int idClient;
// réponse du service
private ServiceResponse serviceResponse;
// nom du thread d'observation
private String observedOn;
// heure de la requête
private String requestAt;
// heure de la réponse
private String responseAt;
- نوع الاستجابة للخدمة غير المتزامنة هو من النوع [Observable<UiResponse>]. يتم توفير النوع [Observable<>] بواسطة مكتبة Rx. تشير النتيجة [Observable<UiResponse>] إلى أن الطريقة [getAleas] توفر دفقًا من القيم من النوع [UiResponse]، والتي يتم دفعها واحدة تلو الأخرى إلى المراقب الخاص بها؛
الآن دعونا نلقي نظرة على تنفيذ هذه الواجهة:
public class RxService implements IRxService {
// service
private IService service;
// manufacturer
public RxService(IService service) {
this.service = service;
}
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
}
- الأسطر 7–9: نزود المنشئ بإشارة إلى الواجهة المتزامنة [IService]. ستتولى هذه الواجهة إنشاء الأرقام العشوائية؛
- يتم إنشاء المراقب الذي ترجعته الطريقة [getAleas] بواسطة الطريقة الثابتة [Observable.create]. تسمح لنا هذه الطريقة ببناء تنفيذ غير متزامن من تنفيذ متزامن؛
- السطر 13: المعلمة للطريقة الثابتة [Observable.create] هي هنا دالة لامدا تأخذ نوع [Subscriber] كمعلمة، وهو مرة أخرى نوع Rx. [Subscriber] هو كائن يشترك في دفق من القيم القابلة للملاحظة، أي دفق من البيانات التي يتم تسليمها بشكل غير متزامن. هنا، نستخدم ثلاث طرق لهذا المشترك:
- [Subscriber.onNext] لتمرير البيانات إليه (السطر 16)؛
- [Subscriber.onError] لإرسال استثناء إليه (السطر 18)؛
- [Subscriber.onCompleted] لإعلام المشترك بأن دفق البيانات قد انتهى (السطر 20)؛
يمكن أن يكون هناك عدة مشتركين في نفس العنصر القابل للمراقبة. هنا، سيكون لدينا مشترك واحد فقط يشترك في دفق لبيانات مفردة، وهي تلك التي تم إنتاجها في السطرين 15-16. يتم إنتاج البيانات بواسطة التنفيذ المتزامن للخدمة (السطر 15) وإرجاعها إلى المشترك (السطر 16).
حتى لو بقي كل هذا غامضًا إلى حد ما، لا يسع المرء إلا أن يندهش من الإيجاز الشديد لهذا التنفيذ غير المتزامن للخدمة.
2.7. الاستدعاء غير المتزامن

سنقوم الآن بفحص المكالمة المتزامنة [5] التي أجرتها طبقة [swing] إلى الخدمة [2]:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
// scheduler
int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
switch (schedulerIndex) {
case 0:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
break;
...
}
}
...
}
- السطور 6–10: تنفيذ طلبات [nbRequests] التي طلبها المستخدم؛
- السطور 7-8: إعداد كائن [UiResponse] المطلوب بواسطة طريقة [getAleas] للخدمة غير المتزامنة (السطر 13). يتضمن هذا بشكل أساسي تسجيل [idClient] للطلب؛
- السطر 13: يتم استدعاء طريقة [getAleas] للخدمة غير المتزامنة. وهي تُرجع كائن [Observable<UiResponse>]. لا يؤدي هذا الاستدعاء بعد إلى استدعاء الخدمة المتزامنة. لنعد إلى كود [getAleas] غير المتزامن:
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
يتم تنفيذ الكود الموجود في الأسطر 4–11، الذي يستدعي الخدمة المتزامنة، فقط عند تسجيل أحد المشتركين. وطالما لا يوجد مشتركون، لا يتم تنفيذ هذا الكود.
لنعد إلى الكود الخاص بالطريقة [doGenerateWithRxService]:
- السطر 5: نقوم بإنشاء عنصر قابل للمراقبة فارغ (لا يتم مراقبة أي شيء)؛
- السطر 13: نقوم بإنشاء كائن قابل للمراقبة يكون تياره عبارة عن دمج التيارات غير المتزامنة [nbRequests] المرتبطة بطلبات [nbRequests]. يتم تحقيق ذلك باستخدام طريقة [Observable.mergeWith]، التي تسمح بدمج تيارين غير متزامنين. في مصطلحات Rx، يُطلق على [mergeWith] اسم مشغل الدفق. تتميز هذه المشغلات بأن نتيجة العملية هي، في معظم الحالات، [Observable] آخر. في النهاية، بعد السطر 17، تشير المتغير [observables] إلى دفق واحد يتكون من الاستجابات غير المتزامنة [nbRequests] التي قامت بها الخدمة غير المتزامنة؛
- السطر 13: كان من الممكن كتابة عملية الدمج على النحو التالي:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse));
لكننا كتبنا:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
هنا، استخدمنا عامل [subscribeOn] على المراقب [rxService.getAleas]. وكما هو الحال غالبًا، تكون النتيجة مرة أخرى مراقبًا. يحدد عامل [subscribeOn] أن القابل للملاحظة يجب أن يتم تنفيذه في مؤشر ترابط مقدم من [Scheduler]. هناك العديد من [Schedulers] الممكنة المناسبة لمواقف مختلفة. في واجهة المستخدم الرسومية، قدمنا عدة خيارات لمعرفة كيف تختلف:
![]() |
ينتج عن ذلك الكود التالي:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
UiResponse uiResponse = new UiResponse();
uiResponse.setIdClient(i);
// scheduler
int schedulerIndex = jComboBoxSchedulers.getSelectedIndex();
switch (schedulerIndex) {
case 0:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.io()));
break;
case 1:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.computation()));
break;
case 2:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.newThread()));
break;
case 3:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.trampoline()));
break;
case 4:
observables = observables.mergeWith(rxService.getAleas(a, b, minCount, maxCount, minDelay, maxDelay, uiResponse).subscribeOn(Schedulers.immediate()));
break;
}
}
...
}
دعونا نراجع الكود في الأسطر 12–14. يقوم المجدول [Schedulers.io()] بتعيين مؤشر ترابط جديد لكل عنصر قابل للمراقبة. إذا تابعنا الكود:
- السطر 5: لدينا عنصر قابل للمراقبة فارغ؛
- السطر 13، التكرار 1: العناصر القابلة للمراقبة هي القائمة [observable0/thread0] (العنصر القابل للمراقبة observable0 يعمل على الخيط thread0)؛
- السطر 13، التكرار 2: العناصر القابلة للمراقبة هي القائمة [observable0/thread0, observable1/thread1]؛
- إلخ...
في النهاية، بعد السطر 28، لدينا عنصر قابل للمراقبة ناتج عن دمج [nbRequests] عناصر قابلة للمراقبة تعمل على [nbRequests] خيوط مختلفة. لا تعمل جميع برامج الجدولة بهذه الطريقة، كما سنرى أثناء الاختبار.
دعونا نواصل فحص الكود الخاص باستدعاء الخدمة غير المتزامنة:
private void doGenerateWithRxService() {
// start waiting
beginWaiting();
// we ask for the random numbers
Observable<UiResponse> observables = Observable.empty();
for (int i = 0; i < nbRequests; i++) {
...
}
// observer
observables = observables.observeOn(SwingScheduler.getInstance());
// these observables are executed
subscriptions.add(observables.subscribe(uiResponse -> {
updateUi(uiResponse);
} , th -> {
System.out.println(th);
doCancel();
} , this::doCancel));
}
- لقد رأينا أنه عند الوصول إلى السطر 10، يكون لدينا عنصر قابل للمراقبة واحد، وهو دمج لعناصر [nbRequests] قابلة للمراقبة قد تعمل أو لا تعمل على [nbRequests] خيوط مختلفة، اعتمادًا على المجدول الذي يختاره المستخدم؛
- السطر 10: يتيح لنا عامل [observeOn] تحديد الخيط الذي نريد استرداد البيانات منه من الكائن القابل للمراقبة، وفي هذه الحالة كائنات [nbRequests] من النوع [UiResponse]. وفي واجهة Swing، لا خيار أمامنا. يجب إجراء أي تحديث للواجهة في مؤشر ترابط حلقة الأحداث. هنا، سيتم عرض بيانات العنصر القابل للمراقبة في مكون Swing JList. يمثل مؤشر ترابط [SwingScheduler.getInstance()] مؤشر ترابط حلقة الأحداث. لا تأتي فئة [SwingScheduler] من مكتبة RxJava بل من مكتبة RxSwing؛
- عندما نصل إلى السطر 12، لا تزال الخدمة المتزامنة لم تُستدعَ لأن المراقب في السطر 10 لا يزال يفتقر إلى مشترك. توفر الأسطر 12–17 مشتركًا واحدًا، باستخدام عامل [subscribe]. معلمات هذا العامل هي ثلاث دوال لامدا:
- الأولى [uiResponse -> {updateUi(uiResponse);}] تأخذ كمعلمة أحد كائنات [UiResponse] التي أنتجتها المراقبة. تذكر أننا هنا سنحصل على [nbRequests] كائنات من هذا النوع. يجب أن تعالج الطريقة المرتبطة، وهي updateUi في هذه الحالة، هذه النتيجة؛
- الثانية [th -> {System.out.println(th);doCancel();}] تأخذ نوع [Throwable] كمعلمة، في هذه الحالة استثناء حدث أثناء تنفيذ المراقب. يجب أن تعالج الطريقة المرتبطة هذه المعلومات. هنا، نعرضها على وحدة التحكم (السطر 15) ونلغي التنفيذ، مما سيؤدي إلى تحديث عناصر معينة من واجهة المستخدم الرسومية؛
- يتم استدعاء الثالث [this::doCancel] عندما يشير المراقب إلى أنه لم يعد لديه بيانات لإرسالها. هنا، المراقب هو اتحاد [nbRequests] من المراقبين. سيشير المراقب الناتج إلى أنه قد انتهى عندما يشير جميع المراقبين المكونين له إلى أنهم قد أنهوا عملهم. لذلك، عند تنفيذ دالة لامدا الثالثة هذه، نكون قد تلقينا جميع البيانات. تقوم الطريقة المحلية [doCancel] بتحديث واجهة المستخدم الرسومية لتعكس اكتمال التنفيذ؛
يتم تعريف المتغير [subscriptions] على النحو التالي:
// les souscriptions aux observables
protected List<Subscription> subscriptions = new ArrayList<Subscription>();
يمثل النوع [Subscription] اشتراكًا، أي الرابط بين المشترك [Subscriber] وما يراقبه [Observable]. استخدمنا هنا قائمة بالاشتراكات، على الرغم من وجود اشتراك واحد فقط في هذا المثال. فيما يلي الدالة المحلية [doCancel]، التي يتم تنفيذها عندما يشير المراقب إلى أنه لم يعد لديه بيانات لإرسالها:
@Override
protected void doCancel() {
// fin attente
endWaiting();
// dans le cas de souscriptions
if (jCheckBoxRxSwing.isSelected() && subscriptions != null) {
subscriptions.forEach(Subscription::unsubscribe);
}
}
- السطر 7 يلغي اشتراك جميع المشتركين من المراقب؛
من هذا الشرح الموجز، يمكننا استخلاص النقاط الرئيسية التالية:
- يشير النوع [Observable] إلى دفق من القيم، التي يتم دفعها واحدة تلو الأخرى إلى المشتركين أو المراقبين؛
- يشير النوع [Subscriber] إلى مشترك من النوع [Observable]؛
- يشير النوع [Subscription] إلى اشتراك، أي الرابط بين [Subscriber] و [Observable]؛
- يدعم النوع [Observable] العوامل [mergeWith، empty، subscribeOn، observeOn، ...]، والتي ينتج معظمها قيمًا قابلة للمراقبة. تُستخدم هذه العوامل لتكوين القيمة القابلة للمراقبة قبل تشغيلها:
- ما الذي يجب ملاحظته؛
- الخيط الذي يعمل عليه القابل للمراقبة؛
- الخيط الذي يتلقى عليه المشترك البيانات من القابل للمراقبة؛
- هناك نوعان من العناصر القابلة للمراقبة: [cold] و [hot]. يتم تنفيذ العنصر القابل للمراقبة البارد بالكامل لكل مشترك جديد. إذا أنتج كل تنفيذ نفس البيانات، يتلقى كل مشترك جديد نفس البيانات التي تلقاها المشترك السابق. ينتج العنصر القابل للمراقبة الساخن البيانات بشكل مستمر بشكل عام. عندما يشترك مشترك، يتلقى البيانات الصادرة منذ وقت اشتراكه. ولا يتلقى البيانات التي ربما تم إصدارها مسبقًا. في مثالنا، المراقب بارد: يتم إعادة تنفيذه بالكامل لكل مشترك جديد. ما الذي يتم تنفيذه فعليًا في مثالنا؟ لمعرفة ذلك، نحتاج إلى العودة إلى تعريف المراقب المراقب:
@Override
public Observable<UiResponse> getAleas(int a, int b, int minCount, int maxCount, int minDelay, int maxDelay, UiResponse uiResponse) {
return Observable.create(subscriber -> {
try {
uiResponse.setServiceResponse(service.getAleas(a, b, minCount, maxCount, minDelay, maxDelay));
subscriber.onNext(uiResponse);
} catch (Exception e) {
subscriber.onError(e);
} finally {
subscriber.onCompleted();
}
});
}
بالنسبة لكل مشترك جديد، يتم إعادة تنفيذ دالة لامدا، وهي معلمة لطريقة [Observable.create] (السطر 3). لذلك، يتم تنفيذ الأسطر 4–11 لكل مشترك جديد [subscriber]؛
2.8. اختبار المكالمات غير المتزامنة
نبدأ بتوضيح تأثير المجدولات المختلفة المتاحة. للقيام بذلك، نستخدم المعلمات التالية:
![]() |
نضبط [1-2] على قيم صغيرة بحيث حتى إذا تم تنفيذ الطلبات على نفس الخيط، فلن نضطر إلى الانتظار لفترة طويلة.
2.8.1. باستخدام المجدول [Schedulers.io]
![]() |
يمكن ملاحظة النقاط التالية:
- يتم استلام الردود بترتيب لا يتطابق مع ترتيب الطلبات (انظر idClient)؛
- تم تشغيل كل طلب في مؤشر ترابط مختلف؛
- لم تعد واجهة المستخدم الرسومية متجمدة هذه المرة:
- يمكنك التبديل بين علامات التبويب؛
- نرى البيانات وهي تصل؛
- لا يوجد وقت لرؤية زر [إلغاء] لأن التنفيذ سريع للغاية. سنسلط الضوء على هذا في اختبار آخر؛
2.8.2. باستخدام المجدول [Schedulers.computation]
![]() |
يمكن ملاحظة النقاط التالية:
- يتم استلام الردود بترتيب لا يتطابق مع ترتيب الطلبات (انظر idClient)؛
- تم تنفيذ الطلبات في 8 خيوط؛
- تم استخدام الخيط رقم 3 للطلبين 8 و 0؛
- تم استخدام الخيط رقم 4 للطلبين 9 و 1؛
- كان لكل طلب من الطلبات الأخرى موضوع منفصل؛
يستخدم المجدول [Schedulers.computation] عددًا من الخيوط يساوي عدد النوى الموجودة في الجهاز المستخدم. يتم الحصول على هذه المعلومات عبر التعبير [Runtime.getRuntime().availableProcessors()].
2.8.3. مع المجدول [Schedulers.newThread]
![]() |
السلوك مشابه لسلوك المجدول [Schedulers.io].
2.8.4. مع المجدولين [Schedulers.trampoline، Schedulers.immediate]
![]() |
السلوك متزامن. يتم تنفيذ جميع الطلبات على مؤشر ترابط حلقة الأحداث. لا ينبغي تعميم هذه النتيجة؛ بل يعني ذلك ببساطة أنه في هذا المثال المحدد، عملت كلتا المجدولتين بشكل متزامن.
2.9. الحالات الاستثنائية
في هذا المثال، سنعمل مع جدولات تدعم التشغيل غير المتزامن. أولاً، نزيد عدد الطلبات إلى 100 باستخدام جدولة [Schedulers.computation]، التي تعمل هنا على 8 خيوط. نحصل على النتيجة التالية:
![]() |
- في [1]، زر [Cancel] موجود وقابل للاستخدام (عملية غير متزامنة)؛
الآن، دعونا نترك التنفيذ يستمر حتى النهاية:
![]() |
نرى في [2] أن تنفيذ الطلبات المائة استغرق حوالي 4 ثوانٍ (عبر 8 خيوط).
الآن، دعونا نُشغّل هذه الطلبات المائة نفسها باستخدام مُجدول [Schedulers.newThread]، الذي يُنفّذ كل طلب على خيط منفصل:
![]() |
في [1]، نرى أن تنفيذ الطلبات المائة (عبر 100 مؤشر ترابط) استغرق نصف ثانية. وهذا أسرع بكثير من استخدام المجدول [Schedulers.computation].
الآن، دعونا نرسل 800 طلب في نفس الظروف، مع الاستمرار في استخدام المجدول [Schedulers.newThread]. نحصل على النتائج التالية:
![]() |
يتم تنفيذ الطلبات الـ 800 في حوالي ثانية واحدة.
عندما نزيد هذا العدد (إلى ما يزيد عن 2500 طلب على جهازي — يتم تنفيذها في 1.5 ثانية — وهذا العدد يعتمد بالطبع بشكل كبير على بيئة التشغيل)، نحصل في النهاية على الاستثناء التالي:
![]() |
وبالتالي، لدينا تجاوز سعة المكدس. تظهر الاختبارات أن سلوك المجدول [Schedulers.newThread] غير حتمي. قد تواجه الاستثناء السابق، ثم تجري اختبارات جديدة، ثم تعود إلى التكوين الذي تسبب في الاستثناء ولا تواجهه مرة أخرى.
2.10. الخلاصة
لقد عرضنا مثالاً على استخدام مكتبة Rx. دعونا نلخص ما تعلمناه:
بدأنا بالبنية التالية:

- في [4]، قامت طبقة [swing] بإجراء مكالمات متزامنة إلى طبقة [service]؛
- في [5]، قامت طبقة [swing] بإجراء مكالمات غير متزامنة إلى طبقة [rxService]، والتي قامت بدورها بإجراء مكالمة متزامنة [6] إلى طبقة [service]؛
أول ما لاحظناه هو أن مكتبة Rx سهّلت إنشاء واجهة [rxService] غير المتزامنة من واجهة [service] المتزامنة (انظر القسم 2.4). وهذا درس مهم لأنه يعني أنه يمكننا بسهولة تطوير تطبيق متزامن إلى تطبيق غير متزامن.
في طبقة [swing]، تمت كتابة طريقتين منفصلتين:
- واحدة لإجراء مكالمات متزامنة إلى الخدمة (انظر القسم 2.4)؛
- والأخرى لإجراء مكالمات غير متزامنة إليها (انظر القسم 2.7)؛
وقد ثبت أن كتابة المكالمات غير المتزامنة أكثر تعقيدًا بكثير من كتابة المكالمات المتزامنة. ومع ذلك، فإن أولئك الذين عملوا في البرمجة المتزامنة التي تتضمن خيوطًا متعددة تحتاج إلى التزامن سيجدون أن حل Rx أسهل في الكتابة ويتجنب جميع المشاكل الصعبة المتعلقة بالتزامن والتواصل بين الخيوط. في هذه المقالة، قمنا بتسليط الضوء على النقاط الرئيسية التالية:
- يشير النوع [Observable] إلى دفق من الأحداث (القيم) التي قد تكون (ولكن ليس بالضرورة) غير متزامنة ويمكن ملاحظتها؛
- يشير النوع [Subscriber] إلى مشترك في نوع [Observable]؛
- يشير النوع [Subscription] إلى اشتراك، أي الرابط بين [Subscriber] و [Observable]؛
- يدعم النوع [Observable] عوامل التشغيل [mergeWith، empty، subscribeOn، observeOn، ...] التي تنتج في الغالب كائنات قابلة للمراقبة. تُستخدم عوامل التشغيل هذه لتكوين الكائن القابل للمراقبة قبل تشغيله:
- ما الذي يجب ملاحظته؛
- الخيط الذي يعمل عليه العنصر القابل للمراقبة؛
- الخيط الذي يتلقى عليه المشترك البيانات من القابل للمراقبة؛
- هناك نوعان من القيم القابلة للمراقبة: [cold] و [hot]. يتم تنفيذ القيمة القابلة للمراقبة من النوع [cold] بالكامل لكل مشترك جديد. إذا أنتج كل تنفيذ نفس البيانات، يتلقى كل مشترك جديد نفس البيانات التي تلقاها المشترك السابق. تنتج القيمة القابلة للمراقبة من النوع [hot] البيانات بشكل مستمر بشكل عام. عندما يشترك مشترك ما، يتلقى البيانات التي تم إصدارها منذ وقت اشتراكه. ولا يتلقى أي بيانات قد تكون صدرت سابقًا. في مثالنا، المراقب بارد: يتم إعادة تنفيذه بالكامل لكل مشترك جديد.
الآن بعد أن رأينا مثالاً يوضح قيمة مكتبة Rx، سنستكشفها بمزيد من التفصيل.
تحتوي مكتبة Rx على العديد من الطرق التي تحتوي على معلمات عامة في توقيعاتها. سنستعرض هذه التوقيعات بإيجاز (القسم 3). معلمات هذه الطرق هي في الغالب واجهات وظيفية (Java 8)، أي واجهات تحتوي على طريقة واحدة فقط. لذلك يجب أن تكون المعلمات الفعلية مثيلات لهذه الواجهات. قبل Java 8، كان من الشائع تنفيذ واجهة باستخدام فئة مجهولة. مع Java 8، إذا كانت الواجهة واجهة وظيفية، فمن الأكثر إيجازًا تنفيذها باستخدام دالة لامدا. لذلك سنقدم هذه الدوال (القسم 4). بمجرد الانتهاء من ذلك، سنقدم فئة [Stream] (القسم 5)، التي تسمح لك بمعالجة مجموعات Java باستخدام دوال لامدا. هذه الفئة مثيرة للاهتمام لأن فئة [Observable] في RxJava تستعير:
- طرقًا معينة؛
- نفس طريقة ربط الطرق معًا لمعالجة نفس العنصر القابل للمراقبة؛
سنقدم بعد ذلك الواجهات الوظيفية الخاصة بمكتبة RxJava (القسم 6). سنواصل مع العناصر الرئيسية لمكتبة Rx [Observable، Subscriber، Subscription، operators] (القسم 7). تحتوي فئة [Observable] على العشرات من المشغلات التي يتم تحميلها بشكل زائد عدة مرات. يؤدي هذا في البداية إلى تعقيد كبير لأن هذه المشغلات وأحمالها الزائدة تختلف أحيانًا في تفصيل واحد فقط، ومن الصعب، بدون خبرة، معرفة المشغل الذي يجب استخدامه. سنقدم عددًا محدودًا فقط من المشغلات، وسنتجاهل أحمالها الزائدة في معظم الأحيان.
سيتم تغطية القسم السابق بالكامل باستخدام مكتبة RxJava في تطبيقات وحدة التحكم البسيطة. بمجرد إتقان مكتبة RxJava، سنستخدمها في نوعين من التطبيقات الرسومية:
- في القسم 8، سنعيد النظر في مثال تطبيق Swing لاستكشافه بمزيد من التفصيل. سنستخدم بعد ذلك مكتبة RxSwing؛
- في القسم 9، سننشئ تطبيق Android باستخدام مكتبة RxAndroid؛
بمجرد الانتهاء من كل هذا، سيكون لدى القارئ الأدوات اللازمة ليقف على قدميه. من المحتمل أن يستغرق الأمر بعض الوقت قبل أن يتمكن من استخدام مكتبة Rx بشكل بديهي. لقد وجدت هذه المكتبة مثيرة للاهتمام بشكل خاص. ومع ذلك، وجدت أنها معقدة الفهم، وكان منحنى التعلم حادًا. آمل أن يقلل هذا المستند من منحنى التعلم هذا بالنسبة للقارئ. يبدو لي أن الأمر يستحق العناء.
















