Skip to content

10. 执行线程

10.1. Thread 类

当应用程序启动时,它会在一个称为线程的执行流中运行。在 .NET 中,线程的类模型是 System.Threading.Thread,其定义如下:

制造商

在下面的示例中,我们将仅使用构造函数 [1,3]。构造函数 [1] 接受一个具有签名 [2] 的方法作为参数,即该方法有一个类型为 object 的参数且不返回结果。构造函数 [3] 接受一个具有签名 [4] 的方法作为参数,即该方法没有参数且不返回结果。

属性

一些有用的属性:

  • Thread CurrentThread:静态属性,返回请求此属性的代码所在线程的引用
  • string Name:线程名称
  • bool IsAlive:指示线程是否正在运行。

方法

最常用的方法包括:

  • Start()、Start(object obj):启动线程的异步执行,可通过对象传递信息。
  • Abort()、Abort(object obj):强制终止线程
  • Join():执行 T2.Join 的线程 T1 将被阻塞,直到 T2 完成。该方法有变体,可在设定时间后结束等待。
  • Sleep(int n):静态方法——执行该方法的线程将暂停 n 毫秒。随后该线程将失去处理器控制权,处理器将分配给另一个线程。

让我们看一个简单的示例,演示主执行线程的存在,即类中 Main 函数所在的那个线程:


using System;
using System.Threading;
 
namespace Chap8 {
    class Program {
        static void Main(string[] args) {
             // init current thread
            Thread main = Thread.CurrentThread;
             // display
            Console.WriteLine("Thread courant : {0}", main.Name);
             // we change the name
            main.Name = "main";
             // check
            Console.WriteLine("Thread courant : {0}", main.Name);
 
             // infinite loop
            while (true) {
                 // display
                Console.WriteLine("{0} : {1:hh:mm:ss}", main.Name, DateTime.Now);
                 // temporary shutdown
                Thread.Sleep(1000);
             }//while         
        }
    }
}
  • 第 8 行:获取 [main] 方法所在线程的引用
  • 第 10-14 行:显示并修改其名称
  • 第17-22行:一个每秒显示一条消息的循环
  • 第 21 行:运行 [main] 方法的线程将被挂起 1 秒

屏幕显示结果如下:

1
2
3
4
5
6
7
8
Thread courant :
Thread courant : main
main : 04:19:00
main : 04:19:01
main : 04:19:02
main : 04:19:03
main : 04:19:04
^CAppuyez sur une touche pour continuer...
  • 第 1 行:当前线程没有名称
  • 第 2 行:它有一个名称
  • 第3-7行:每秒显示一次
  • 第8行:程序被Ctrl-C中断。

10.2. 创建执行线程

某些应用程序中,代码片段可能在不同的执行线程中“同时”运行。虽然我们常说线程“同时”运行,但这通常是一种误称。如果机器只有一个处理器(这种情况至今仍很常见),则线程共享该处理器:它们轮流在短时间内(几毫秒)访问处理器。这给人一种并行执行的错觉。 分配给一个线程的时间段取决于多种因素,包括其优先级——该优先级有默认值,但也可以通过编程方式设置。当一个线程拥有处理器时,它通常会正常使用整个分配的时间。然而,它也可以提前释放处理器:

  • 等待某个事件(Wait、Join
  • 通过将自身置于休眠状态一段时间(Sleep
  1. 线程 T 首先由上述创建器之一创建,例如:
Thread thread=new Thread(Start);

其中 Start 是一个具有以下两种签名之一的方法:

void Start();
void Start(object obj);

创建线程并不意味着该线程会立即开始运行。

  1. 线程 T 由 T.Start() 启动:此时,传递给 T 构造函数的 Start 方法将由线程 T 执行。执行 T.Start() 的程序不会等待任务 T 完成:它会立即继续执行下一条指令。这意味着两个任务正在并行运行。在许多情况下,它们需要能够相互通信,以跟踪其协作工作的进度。这就是线程同步的问题。
  2. 一旦启动,线程 T 便会自主运行。当其执行的 Start 方法完成工作后,线程 T 便会停止。
  3. 可以强制终止 T 线程:
    1. T.Abort() 请求 T 线程终止。
  4. 您也可以通过 T.Join() 等待其执行结束。这是一条阻塞指令:执行该指令的程序将被阻塞,直到任务 T 完成工作。这是一种同步手段。

让我们来看一下以下程序:


using System;
using System.Threading;
 
namespace Chap8 {
    class Program {
        public static void Main() {
             // init Current thread
            Thread main = Thread.CurrentThread;
             // name the Thread
            main.Name = "Main";
 
             // creation of execution threads
            Thread[] tâches = new Thread[5];
            for (int i = 0; i < tâches.Length; i++) {
                 // create thread i
                tâches[i] = new Thread(Affiche);
                 // set the thread name
                tâches[i].Name =  i.ToString();
                 // start execution of thread i
                tâches[i].Start();
            }
 
             // end of hand
            Console.WriteLine("Fin du thread {0} à {1:hh:mm:ss}",main.Name,DateTime.Now);
        }
 
        public static void Affiche() {
             // display start of execution
            Console.WriteLine("Début d'exécution de la méthode Affiche dans le Thread {0} : {1:hh:mm:ss}",Thread.CurrentThread.Name,DateTime.Now);
             // sleep for 1 s
            Thread.Sleep(1000);
             // display end of run
            Console.WriteLine("Fin d'exécution de la méthode Affiche dans le Thread {0} : {1:hh:mm:ss}", Thread.CurrentThread.Name, DateTime.Now);
        }
    }
}
  • 第 8-10 行:为执行 [Main] 方法的线程命名
  • 第13-21行:创建并执行5个线程。将线程引用存储在数组中以便后续检索。每个线程执行第27-35行的Poster代码。
  • 第 20 行:启动第 i 个线程。此操作是非阻塞的。第 i 个线程将与启动它的 [Main] 方法线程并行运行。
  • 第24行:执行[Main]方法的线程终止。
  • 第 27-35 行:[Display] 方法执行显示操作。它会显示正在执行该方法的线程名称,以及执行的开始和结束时间。
  • 第 31 行:任何正在执行 [Display] 方法的线程都会暂停 1 秒。随后,处理器将分配给另一个正在等待处理器的线程。暂停秒数结束后,该暂停的线程将成为处理器的候选者。当轮到它时,它将获得处理器。这取决于多种因素,包括其他正在等待处理器的线程的优先级。

结果如下:

Début d'exécution de la méthode Affiche dans le Thread 0 : 10:30:44
Début d'exécution de la méthode Affiche dans le Thread 1 : 10:30:44
Début d'exécution de la méthode Affiche dans le Thread 2 : 10:30:44
Début d'exécution de la méthode Affiche dans le Thread 3 : 10:30:44
Début d'exécution de la méthode Affiche dans le Thread 4 : 10:30:44
Fin du thread Main à 10:30:44
Fin d'exécution de la méthode Affiche dans le Thread 0 : 10:30:45
Fin d'exécution de la méthode Affiche dans le Thread 1 : 10:30:45
Fin d'exécution de la méthode Affiche dans le Thread 2 : 10:30:45
Fin d'exécution de la méthode Affiche dans le Thread 3 : 10:30:45
Fin d'exécution de la méthode Affiche dans le Thread 4 : 10:30:45

这些结果极具启发性:

  • 首先,我们可以看到线程的启动并不阻塞。Main 程序并行启动了 5 个线程的执行,并在它们完成之前就结束了自己的执行。该操作

                // on lance l'exécution du thread i
                tâches[i].Start();

会启动线程 tasks[i] 的执行,但一旦完成,执行会立即继续执行下一条指令,而不会等待该线程执行完毕。

  • 所有创建的线程都必须执行 Affiche 方法。执行顺序是不可预测的。尽管在示例中,执行顺序似乎遵循了请求的顺序,但无法据此得出普遍的 结论。操作系统在此处有 6 个线程和一个处理器。它将根据自身的规则将处理器分配给这 6 个线程。
  • 结果取决于 Sleep 方法。在示例中,线程 0 最先执行 Affiche 方法。 “开始执行”消息显示后,它执行了 Sleep 方法,该方法使其暂停 1 秒。随后它失去了处理器,处理器便可供其他线程使用。示例表明线程 1 将获得该处理器。线程 1 将遵循与其他线程相同的执行路径。当线程 0 的 1 秒睡眠时间结束时,其执行即可恢复。系统将处理器分配给它,它便能完成 Affiche 方法的执行。

让我们修改程序,使用以下指令结束 Main 函数:


             // end of hand
            Console.WriteLine("Fin du thread " + main.Name);
             // stop all threads
Environment.Exit(0);

运行新程序会得到以下结果:

1
2
3
4
5
6
Début d'exécution de la méthode Affiche dans le Thread 0 : 10:33:18
Début d'exécution de la méthode Affiche dans le Thread 1 : 10:33:18
Début d'exécution de la méthode Affiche dans le Thread 2 : 10:33:18
Début d'exécution de la méthode Affiche dans le Thread 3 : 10:33:18
Début d'exécution de la méthode Affiche dans le Thread 4 : 10:33:18
Fin du thread Main à 10:33:18
  • 第 1-5 行:由 Main 创建的线程开始执行,并暂停 1 秒
  • 第 6 行:[Main] 线程重新获得处理器控制权并执行指令:
        Environment.Exit(0);

该指令会停止所有线程,而不仅仅是 Main 线程。

如果 Main 线程希望等待其创建的线程执行完毕,可以使用 JoinThread


        public static void Main() {
...
             // we wait for all threads
            for (int i = 0; i < tâches.Length; i++) {
                 // wait for thread i to finish execution
                tâches[i].Join();
            }
             // end of hand
            Console.WriteLine("Fin du thread {0} à {1:hh:mm:ss}", main.Name, DateTime.Now);
}
  • 第 6 行:[Main] 线程等待每个线程完成。它首先阻塞等待第 1 个线程,然后等待第 2 个线程,依此类推……最后,当它退出第 2-5 行的循环时,它启动的全部 5 个线程均已完成。

结果如下:

Début d'exécution de la méthode Affiche dans le Thread 0 : 10:35:18
Début d'exécution de la méthode Affiche dans le Thread 1 : 10:35:18
Début d'exécution de la méthode Affiche dans le Thread 2 : 10:35:18
Début d'exécution de la méthode Affiche dans le Thread 3 : 10:35:18
Début d'exécution de la méthode Affiche dans le Thread 4 : 10:35:18
Fin d'exécution de la méthode Affiche dans le Thread 0 : 10:35:19
Fin d'exécution de la méthode Affiche dans le Thread 1 : 10:35:19
Fin d'exécution de la méthode Affiche dans le Thread 2 : 10:35:19
Fin d'exécution de la méthode Affiche dans le Thread 3 : 10:35:19
Fin d'exécution de la méthode Affiche dans le Thread 4 : 10:35:19
Fin du thread Main à 10:35:19
  • 第 11 行:[Main] 线程在其启动的线程结束后终止。

10.3. 线程的优势

既然我们已经强调了默认线程(即执行 Main 方法的那个)的存在,并且知道如何创建新的线程,那么让我们来看看线程对我们意味着什么,以及为什么我们要在此介绍它们。有一种应用程序非常适合使用线程,那就是互联网上的客户端-服务器应用程序。 我们将在下一章中介绍它们。在互联网的客户端-服务器应用程序中,位于机器 S1 上的服务器会响应来自远程机器 C1、C2、...、Cn 上客户端的请求。

每天,我们都在使用符合此图示的互联网应用程序:Web服务、电子邮件、论坛浏览、文件传输……在上图中,S1服务器必须同时为Ci客户端提供服务。如果以FTP(文件传输协议)服务器向其客户端传输文件为例,我们知道文件传输有时可能需要几分钟。 当然,客户端不可能独占服务器这么长时间。通常的做法是,服务器创建与客户端数量相等的执行线程。每个线程负责处理特定的客户端。由于处理器在机器的所有活动线程之间循环共享,服务器与每个客户端的交互时间都很短,从而确保了同时服务。

实际上,服务器会使用一个线程池,其中包含数量有限的线程,例如50个。第51个客户端则会被要求等待。

10.4. 线程之间的信息交换

在之前的示例中,线程的初始化方式如下:

Thread t=new Thread(Run);

其中 Run 是一个具有以下签名的方法:

void Run();

也可以使用以下签名:

void Run(object obj);

这允许将信息传递给已启动的线程。例如,

t.Start(obj1);

将启动 t,根据设计,t 随后将执行与其关联的 Run 方法,并向其传递有效参数 obj1。以下是一个示例:


using System;
using System.Threading;

namespace Chap8 {
    class Program4 {
        public static void Main() {
             // init Current thread
            Thread main = Thread.CurrentThread;
             // name the Thread
            main.Name = "Main";
 
             // creation of execution threads
            Thread[] tâches = new Thread[5];
            Data[] data = new Data[5];
            for (int i = 0; i < tâches.Length; i++) {
                 // create thread i
                tâches[i] = new Thread(Sleep);
                 // set the thread name
                tâches[i].Name = i.ToString();
                 // start execution of thread i
                tâches[i].Start(data[i] = new Data { Début = DateTime.Now, Durée = i+1 });
            }
             // we wait for all threads
            for (int i = 0; i < tâches.Length; i++) {
                 // wait for thread i to finish execution
                tâches[i].Join();
                 // result display
                Console.WriteLine("Thread {0} terminé : début {1:hh:mm:ss}, durée programmée {2} s, fin {3:hh:mm:ss}, durée effective {4}",
                    tâches[i].Name,data[i].Début,data[i].Durée,data[i].Fin,(data[i].Fin-data[i].Début));
            }        
             // end of hand
            Console.WriteLine("Fin du thread {0} à {1:hh:mm:ss}", main.Name, DateTime.Now);
        }
 
        public static void Sleep(object infos) {
             // parameter is retrieved
            Data data = (Data)infos;
             // sleep mode for Duration
            Thread.Sleep(data.Durée*1000);
             // end of execution
            data.Fin = DateTime.Now;
        }
    }
 
    internal class Data {
         // miscellaneous information
        public DateTime Début { get; set; }
        public int Durée { get; set; }
        public DateTime Fin { get; set; }
    }
}
  • 第 45-50 行:传递给线程的 [Data] 类型信息:
    • Start:线程执行开始时间——由启动线程设置
    • Duration:被启动线程执行的 Sleep 操作的持续时间(以秒为单位)——由启动线程设定
    • 结束:线程执行结束时间——由被启动的线程设定
  • 第 35-43 行:线程执行的 Sleep 方法的签名是 void Sleep(object obj)。实际参数 obj 的类型将是第 45 行定义的 [Data] 类型。
  • 第 15-22 行:创建 5 个线程
  • 第 17 行:每个线程都关联了第 35 行中的 Sleep 方法
  • 第 21 行:将一个 [Data] 类型的对象传递给启动线程的 Start 方法。在此对象中,我们记录了线程的执行开始时间以及它必须休眠的时长(以秒为单位)。该对象存储在第 14 行的表中。
  • 第24-30行:[Main]线程等待其启动的所有线程完成。
  • 第 28-29 行:[Main] 线程从第 i 个线程中获取 data[i] 对象并显示其内容。
  • 第 35-42 行:线程执行的 Sleep 方法
  • 第 37 行:获取 [Data] 类型参数
  • 第 39 行:使用 Duration 字段参数来设置 Sleep
  • 第 41 行:初始化参数的 End 字段

结果如下:

1
2
3
4
5
6
Thread 0 terminé : début 11:18:50, durée programmée 1 s, fin 11:18:51, durée effective 00:00:01.0156250
Thread 1 terminé : début 11:18:50, durée programmée 2 s, fin 11:18:52, durée effective 00:00:02
Thread 2 terminé : début 11:18:50, durée programmée 3 s, fin 11:18:53, durée effective 00:00:03
Thread 3 terminé : début 11:18:50, durée programmée 4 s, fin 11:18:54, durée effective 00:00:04
Thread 4 terminé : début 11:18:50, durée programmée 5 s, fin 11:18:55, durée effective 00:00:05
Fin du thread Main à 11:18:55

此示例表明两个线程可以交换信息:

  • 启动线程可以通过提供信息来控制被启动线程的执行
  • 被启动的线程可将结果返回给启动线程。

为了让被启动线程知道其等待的结果何时可用,必须在被启动线程完成时向其发出通知。在此,它使用 Join 方法等待被启动线程终止。还有其他方法可以实现相同的效果。我们稍后将探讨这些方法。

10.5. 对共享资源的竞争性访问

10.5.1. 未同步的并发访问

在讨论线程间信息交换的段落中,信息仅在两个线程之间且在非常特定的时刻进行交换。这是典型的参数传递。在其他情况下,信息由多个线程共享,这些线程可能希望同时读取或更新该信息。这引发了信息完整性的问题。假设共享信息是一个结构体 S,其中包含各种信息项 I1、I2、... In。

  • 一个 T1 线程开始更新结构体 S:它修改了字段 I1,但在完成对结构体 S 的全部更新之前被中断
  • 随后,T2线程获取处理器控制权,并读取结构体S以进行决策。此时它读取到的结构体处于不稳定状态:部分字段已更新,而其他字段尚未更新。

我们将这种情况称为对共享资源(此处指 S 结构)的访问,其管理往往相当棘手。让我们通过以下示例来说明可能出现的问题:

  • 一个应用程序将生成 n 个线程,其中 n 作为参数传递
  • 共享资源是一个计数器,每个生成的线程都必须将其递增
  • 在应用程序结束时,计数器的值会被显示出来。因此,我们需要确定 n 的值

程序代码如下:


using System;
using System.Threading;
 
namespace Chap8 {
    class Program {
 
         // class variables
         static int cptrThreads     = 0; // thread counter
 
         //hand
        public static void Main(string[] args) {
             // instructions for use
            const string syntaxe = "pg nbThreads";
            const int nbMaxThreads = 100;
 
             // verification no. of arguments
            if (args.Length != 1) {
                 // error
                Console.WriteLine(syntaxe);
                 // stop
                Environment.Exit(1);
            }
             // argument quality check
            int nbThreads = 0;
            bool erreur = false;
            try {
                nbThreads = int.Parse(args[0]);
                if (nbThreads < 1 || nbThreads > nbMaxThreads)
                    erreur = true;
            } catch {
                 // error
                erreur = true;
            }
             // mistake?
            if (erreur) {
                 // error
                Console.Error.WriteLine("Nombre de threads incorrect (entre 1 et 100)");
                 // end
                Environment.Exit(2);
            }
             // thread creation and generation
            Thread[] threads = new Thread[nbThreads];
            for (int i = 0; i < nbThreads; i++) {
                 // creation
                threads[i] = new Thread(Incrémente);
                 // naming
                threads[i].Name = "" + i;
                 // launch
                threads[i].Start();
            }//for
             // waiting for threads to finish
            for (int i = 0; i < nbThreads; i++) {
                threads[i].Join();
            }
             // counter display
            Console.WriteLine("Nombre de threads générés : " + cptrThreads);
        }
 
        public static void Incrémente() {
             // increases thread counter
             // meter reading
            int valeur = cptrThreads;
             // follow-up
            Console.WriteLine("A {0:hh:mm:ss}, le thread {1}  a lu la valeur du compteur : {2}", DateTime.Now, Thread.CurrentThread.Name, cptrThreads);
             // waiting
            Thread.Sleep(1000);
             // counter incrementation
            cptrThreads = valeur + 1;
             // follow-up
            Console.WriteLine("A {0:hh:mm:ss}, le thread {1}  a écrit la valeur du compteur : {2}", DateTime.Now, Thread.CurrentThread.Name, cptrThreads);
        }
    }
}

我们不再赘述线程创建部分,该内容已在前文中介绍过。接下来,让我们关注第 59 行中的 Increment 方法,每个线程都会调用该方法来递增第 8 行中的静态计数器 cptrThreads

  1. 第 62 行:读取计数器
  2. 第 66 行:线程暂停 1 秒。因此它将失去处理器
  3. 第 68 行:计数器被递增

步骤 2 仅用于强制线程失去处理器控制权。处理器将交给另一个线程。实际上,无法保证线程在读取计数器值与将其值加 1 之间不会被中断。 即使编写 cptrThreads++ 来营造单条指令的假象,在读取计数器值与写入加 1 后的值之间,仍存在失去处理器控制权的风险。事实上,高级操作 cptrThreads++ 在处理器层面上将由多条基础指令组成。因此,步骤 2 中的 1 秒睡眠仅是为了系统化地规避这一风险。

使用 5 个线程获得的结果如下:

A 12:00:56, le thread 3  a lu la valeur du compteur : 0
A 12:00:56, le thread 2  a lu la valeur du compteur : 0
A 12:00:56, le thread 1  a lu la valeur du compteur : 0
A 12:00:56, le thread 0  a lu la valeur du compteur : 0
A 12:00:56, le thread 4  a lu la valeur du compteur : 0
A 12:00:57, le thread 3  a écrit la valeur du compteur : 1
A 12:00:57, le thread 2  a écrit la valeur du compteur : 1
A 12:00:57, le thread 1  a écrit la valeur du compteur : 1
A 12:00:57, le thread 0  a écrit la valeur du compteur : 1
A 12:00:57, le thread 4  a écrit la valeur du compteur : 1
Nombre de threads générés : 1

阅读这些结果,很容易看出发生了什么:

  • 第 1 行:第一个线程读取计数器。它发现值为 0。它暂停 1 秒并失去处理器控制权
  • 第 2 行:第二个线程接管处理器,也读取计数器值。由于前一个线程尚未将其递增,值仍为 0。它同样暂停 1 秒并失去处理器控制权。
  • 第1-5行:在1秒内,所有5个线程都有机会运行并读取到值为0。
  • 第6-10行:当它们依次唤醒时,会将读取到的0进行递增,并将1写入计数器,这一点在第11行的主程序(Main)中得到了验证。

问题出在哪里?第二个线程读取了错误的值,因为第一个线程在完成更新窗口中计数器的任务之前就被中断了。这引出了程序中关键资源和关键区段的概念:

  • 关键资源是指一次只能由一个线程持有的资源。在此,关键资源即为计数器。
  • 程序中的临界区是指线程执行流中的一段指令序列,在此期间线程会访问某个关键资源。必须确保在此临界区期间,只有该线程能够访问该资源。

在我们的示例中,临界区是指读取计数器值与写入新值之间的代码段:


            // lecture compteur
            int valeur = cptrThreads;
            // attente
            Thread.Sleep(1000);
            // incrémentation compteur
cptrThreads = valeur + 1;

要执行这段代码,必须确保只有一个线程在运行。该线程可能会被中断,但在中断期间,其他线程绝不能执行相同的代码。 .NET 平台提供了多种工具来确保关键代码段的单线程访问。让我们来看看其中的一些。

10.5.2. lock 语句

lock 子句用于定义临界区,如下所示:

lock(obj){section critique}

obj 必须是所有运行该临界区线程均可访问的对象引用。该确保每次仅有一个线程执行临界区。前面的示例重写如下:


using System;
using System.Threading;
 
namespace Chap8 {
    class Program2 {
 
         // class variables
         static int cptrThreads     = 0; // thread counter
         static object synchro = new object(); // synchronization object
 
         //hand
        public static void Main(string[] args) {
    ...
             // waiting for threads to finish
            Thread.CurrentThread.Name = "Main";
            for (int i = nbThreads - 1; i >= 0; i--) {
                Console.WriteLine("A {0:hh:mm:ss}, le thread {1} attend la fin du thread {2}", DateTime.Now, Thread.CurrentThread.Name, threads[i].Name);
                threads[i].Join();
                Console.WriteLine("A {0:hh:mm:ss}, le thread {1} a été prévenu de la fin du thread {2}", DateTime.Now, Thread.CurrentThread.Name, threads[i].Name);
            }
             // counter display
            Console.WriteLine("Nombre de threads générés : " + cptrThreads);
        }
 
        public static void Incrémente() {
             // increases thread counter
             // exclusive access to the meter is required
            Console.WriteLine("A {0:hh:mm:ss}, le thread {1}  attend l'autorisation d'entrer dans la section critique", DateTime.Now, Thread.CurrentThread.Name);
            lock (synchro) {
                 // meter reading
                int valeur = cptrThreads;
                 // follow-up
                Console.WriteLine("A {0:hh:mm:ss}, le thread {1}  a lu la valeur du compteur : {2}", DateTime.Now, Thread.CurrentThread.Name, cptrThreads);
                 // waiting
                Thread.Sleep(1000);
                 // counter incrementation
                cptrThreads = valeur + 1;
                 // follow-up
                Console.WriteLine("A {0:hh:mm:ss}, le thread {1}  a écrit la valeur du compteur : {2}", DateTime.Now, Thread.CurrentThread.Name, cptrThreads);
            }
            Console.WriteLine("A {0:hh:mm:ss}, le thread {1} a quitté la section critique", DateTime.Now, Thread.CurrentThread.Name);
        }
    }
}
  • 第 9 行:synchro 是用于同步所有线程的对象。
  • 第 16-23 行:[Main] 方法按线程创建的逆序等待线程。
  • 第 29-40 行:Increment 方法的临界区已被 lock 锁定。

使用 3 个线程获得的结果如下:

A 09:37:09, le thread 0 attend l'autorisation d'entrer dans la section critique
A 09:37:09, le thread 0 a lu la valeur du compteur : 0
A 09:37:09, le thread 1 attend l'autorisation d'entrer dans la section critique
A 09:37:09, le thread 2 attend l'autorisation d'entrer dans la section critique
A 09:37:09, le thread Main attend la fin du thread 2
A 09:37:10, le thread 0 a écrit la valeur du compteur : 1
A 09:37:10, le thread 1 a lu la valeur du compteur : 1
A 09:37:10, le thread 0 a quitté la section critique
A 09:37:11, le thread 1 a écrit la valeur du compteur : 2
A 09:37:11, le thread 1 a quitté la section critique
A 09:37:11, le thread 2 a lu la valeur du compteur : 2
A 09:37:12, le thread 2 a écrit la valeur du compteur : 3
A 09:37:12, le thread 2 a quitté la section critique
A 09:37:12, le thread Main a été prévenu de la fin du thread 2
A 09:37:12, le thread Main attend la fin du thread 1
A 09:37:12, le thread Main a été prévenu de la fin du thread 1
A 09:37:12, le thread Main attend la fin du thread 0
A 09:37:12, le thread Main a été prévenu de la fin du thread 0
Nombre de threads générés : 3
  • 线程 0 是第一个进入临界区的:第 1、2、6、8 行
  • 另外两个线程将被阻塞,直到线程 0 退出临界区:第 3 行和第 4 行
  • 线程 1 接下来执行:第 7、9、10 行
  • 线程 2 随后进入:第 11、12、13 行
  • 第 14 行:正在等待线程 2 完成的 Main 线程收到警告
  • 第 15 行:Main 线程现在正在等待线程 1 完成。该线程已经完成。Main 线程立即收到通知,第 16 行。
  • 第 17-18 行:线程 0 也经历了同样的过程
  • 第 19 行:线程数量正确

10.5.3. Mutex 类

System.Threading.Mutex 类也可用于划分临界区。它在可见性方面与 lock 不同:

  • lock 语句用于同步同一应用程序中的线程
  • Mutex 类允许您同步来自不同应用程序的线程。

我们将使用以下构造函数和方法:

public Mutex()
创建一个 Mutex 对象 M
public bool WaitOne()
执行 M.WaitOne() 的 T1 线程请求同步对象 M 的所有权。如果互斥锁 M 尚未被任何线程持有(如初始状态),则会“授予”给请求它的 T1 线程。 如果稍后 T2 线程执行相同的操作,它将被阻塞。这是因为互斥锁只能属于一个线程。当 T1 线程释放其持有的互斥锁 M 时,互斥锁将被解锁。因此,多个线程在等待互斥锁 M 时可能会被阻塞。
public void ReleaseMutex()
执行 M.ReleaseMutex() 的 T1 线程将放弃对互斥锁 M 的所有权。当 T1 线程失去处理器时,系统可以将其分配给正在等待互斥锁 M 的某个线程。只有一个线程会依次获得它,而其他等待 M 的线程将保持阻塞状态

互斥锁 M 管理对共享资源 R 的访问。线程通过 M.WaitOne() 请求资源 R,并通过 M.ReleaseMutex() 释放它。必须由单个线程在特定时间段内执行的代码段即为共享资源。关键区的执行可通过以下方式进行同步:

M.WaitOne();
// le thread est seul à entrer ici
// section critique
....
M.ReleaseMutex();

其中 M 是一个 Mutex 对象。请务必释放不再需要的 Mutex,以便其他线程能够进入临界区;否则,等待该 Mutex 的线程将永远无法获得处理器访问权限。

如果我们将刚才所学的内容应用到前面的示例中,我们的应用程序将变为如下所示:


using System;
using System.Threading;
 
namespace Chap8 {
    class Program3 {
 
         // class variables
         static int cptrThreads     = 0; // thread counter
         static Mutex synchro = new Mutex(); // synchronization object
 
         //hand
        public static void Main(string[] args) {
    ...
        }
 
        public static void Incrémente() {
....
            synchro.WaitOne();
            try {
...
            } finally {
...
                synchro.ReleaseMutex();
            }
        }
    }
}
  • 第 9 行:线程同步对象现在是一个互斥锁
  • 第 18 行:临界区开始——仅需一个线程进入。我们阻塞直到互斥锁Mutex)释放。
  • 第 33 行:由于 Mutex 必须始终被释放(无论是否发生异常),我们使用 try/finally 块管理临界区,并在 finally 块中释放 Mutex
  • 第 23 行:一旦通过了临界区,Mutex 即被释放。

结果与之前相同。

10.5.4. AutoResetEvent 类

AutoResetEvent 对象是一种屏障,它像前文提到的 lockMutex 这两种工具一样,每次只允许一个线程通过。我们按以下方式创建一个 AutoResetEvent

AutoResetEvent barrière=new AutoresetEvent(bool état);

布尔值状态(état)表示屏障是关闭(false)还是打开(true)。希望通过屏障的线程将按以下方式进行操作:

barrière.WaitOne();
  • 如果屏障处于打开状态,线程将通过,且屏障会在其通过后关闭。如果有多个线程在等待,我们可以确保只有一个线程会通过。
  • 如果屏障处于关闭状态,该线程将被阻塞。另一个线程将在适当的时候将其打开。这个时间完全取决于所处理的问题。屏障将由以下操作打开:
barrière.Set(); 

可能出现线程需要关闭屏障的情况。可通过以下方式实现:

barrière.Reset(); 

如果在前面的示例中,我们将 Mutex 对象替换为 AutoResetEvent 类型的对象,代码将变为:


using System;
using System.Threading;
 
namespace Chap8 {
    class Program4 {
 
         // class variables
         static int cptrThreads     = 0; // thread counter
         static EventWaitHandle synchro = new AutoResetEvent(false); // synchronization object
 
         //hand
        public static void Main(string[] args) {
....
             // we open the critical section barrier
            Console.WriteLine("A {0:hh:mm:ss}, le thread {1} ouvre la barrière de la section critique", DateTime.Now, Thread.CurrentThread.Name);
            synchro.Set();
             // waiting for threads to finish
...
             // counter display
            Console.WriteLine("Nombre de threads générés : " + cptrThreads);
        }
 
        public static void Incrémente() {
             // increases thread counter
             // exclusive access to the meter is required
...
            synchro.WaitOne();
            try {
...
            } finally {
                 // release the resource
...
                synchro.Set();
            }
        }
    }
}
  • 第 9 行:创建了一个处于关闭状态的屏障。它将在 Main 的第 16 行被打开。
  • 第27行:负责递增线程计数器的线程请求进入临界区的授权。各个线程将在关闭的屏障前积压。当Main打开屏障时,其中一个等待的线程将通过。
  • 第33行:当该线程完成工作后,它会重新打开屏障,允许另一个线程进入。

结果与前面的类似。

10.5.5. Interlocked 类

Interlocked 类使得操作组能够原子化。在原子操作组内,要么由运行该组的线程执行所有操作,要么一个都不执行。 不会出现部分操作已执行而部分未执行的状态。MutexAutoResetEvent 等同步对象均旨在实现一组操作的原子性,其实现方式是阻塞线程。Interlocked 类允许您在执行简单但频繁的操作时避免线程阻塞。Interlocked 类提供了以下静态方法:

Image

Incrementally 方法具有以下签名:

public static int Increment(ref int location);

该方法用于增加租金。该操作保证是原子性的

我们的线程计数程序可以如下所示:


using System;
using System.Threading;
 
namespace Chap8 {
    class Program5 {
 
         // class variables
         static int cptrThreads     = 0; // thread counter
 
         //hand
        public static void Main(string[] args) {
...
        }
 
        public static void Incrémente() {
             // increments the thread counter
            Interlocked.Increment(ref cptrThreads);
        }
    }
}
  • 第 17 行:线程计数器被原子地递增。

10.6. 对多个共享资源的竞争性访问

10.6.1. 一个示例

在之前的示例中,不同的线程共享一个资源。如果存在多个资源且它们相互依赖,情况就会变得更加复杂。这可能会导致互锁情况。这种情况也被称为死锁,即两个线程相互等待。考虑以下按时间顺序依次发生的操作:

  • 线程 T1 获取互斥锁 M1 的控制权以访问共享资源 R1
  • 线程 T2 获取互斥锁 M2 的控制权以访问共享资源 R2
  • 线程 T1 请求互斥锁 M2,但被阻塞
  • 线程 T2 请求互斥锁 M1,被阻塞。

在此情况下,线程 T1 和 T2 处于相互等待状态。这种情况发生在线程需要两个共享资源时:资源 R1 由互斥锁 M1 控制,资源 R2 由互斥锁 M2 控制。一种可能的解决方案是同时请求这两个资源,使用单个互斥锁 M。但这并非总是可行,例如,如果涉及耗时且昂贵的资源调配。 另一种解决方案是:如果某个线程已持有 M1 但无法获得 M2,则释放 M1 以避免死锁。

  1. 我们有一个数组,其中一些线程向其中写入数据(写入者),而另一些线程则从中读取数据(读取者)。
  2. 写入者是平等但互斥的:一次只有一个写入者可以向表中输入数据。
  3. 读取者是平等但互斥的:一次只有一个读取者可以读取表中存储的数据。
  4. 只有当写入者将数据存入表中后,读取者才能读取表中的数据;只有当表中的数据被读取者读取后,写入者才能将新数据存入表中。

可以区分两种共享资源:

  • 写字板:每次仅有一名写入者可以访问它。
  • 只读显示板:每次仅有一个读取者可以访问它。

以及这些资源的使用顺序:

  • 读取者必须始终在写入者之后。
  • 写入者必须始终在读取者之后,首次使用除外。

可以通过两个类型为 AutoResetEvent 的屏障来控制对这两项资源的访问:

  • barrier peutEcrire 将控制写入者对白板的访问。
  • barrier peutLire 将控制读取者对板区的访问。
  • barrier peutEcrire 初始创建时处于打开状态,允许第一个写入者通过,并阻塞所有其他写入者。
  • barrier peutLire 创建时初始状态为关闭,阻挡所有读取者。
  • 当写入者完成工作后,他会打开“peutLire”门,让读取者进入。
  • 当读者完成工作后,他会打开“peutEcrire”门,让写入者进入。

演示这种事件驱动同步的程序如下:


using System;
using System.Threading;
 
namespace Chap8 {
    class Program {
         // use of reader and writer threads
         // illustrates the use of synchronization events
 
 
         // class variables
         static int[] data = new int[3    ]; // resource shared between reader and writer threads
         static Random objRandom = new Random(DateTime.Now.Second    ); // a random number generator
         static AutoResetEvent peutLir    e; // indicates that the contents of data can be read
         static AutoResetEvent peutEcrir    e; // indicates that you can write the contents of data
 
         //hand
        public static void Main(string[] args) {
 
             // number of threads to generate
            const int nbThreads = 2;
 
             // flag initialization
             peutLire = new AutoResetEvent(f    als e); // cannot be read yet
             peutEcrire = new AutoResetEvent(    tru e); // we can already write
 
             // creation of reader threads
            Thread[] lecteurs = new Thread[nbThreads];
            for (int i = 0; i < nbThreads; i++) {
                 // creation
                lecteurs[i] = new Thread(Lire);
                lecteurs[i].Name = "L" + i.ToString();
                 // launch
                lecteurs[i].Start();
            }
 
             // creating writer threads
            Thread[] écrivains = new Thread[nbThreads];
            for (int i = 0; i < nbThreads; i++) {
                 // creation
                écrivains[i] = new Thread(Ecrire);
                écrivains[i].Name = "E" + i.ToString();
                 // launch
                écrivains[i].Start();
            }
 
             //end of hand
            Console.WriteLine("Fin de Main...");
        }
 
         // read the contents of the table
        public static void Lire() {
...
        }
 
         // write in the table
        public static void Ecrire() {
....
        }
    }
}
  • 第 11 行:表数据是读取线程和写入线程之间共享的资源。它供读取线程读取,也供写入线程写入。
  • 第 13 行:对象 peutLire 用于通知读取线程它们可以读取数组数据。该对象由填充表数据的写入线程设置为 true。它在第 23 行初始化为 false。写入线程必须先填充数组,然后才能将事件 peutLire 传递给 real
  • 第 14 行:peutEcrire 对象用于通知写入线程可以向数据写入。当读取线程已使用完整个数组数据后,会将其设置为 true。该对象在第 24 行初始化为 true。此时表数据可自由写入。
  • 第27-34行:创建并启动读取线程
  • 第 37-44 行:创建并启动写入线程

由读取线程执行的 Read 方法如下:


public static void Lire() {
             // follow-up
            Console.WriteLine("Méthode [Lire] démarrée par le thread n° {0}", Thread.CurrentThread.Name);
             // we have to wait for reading authorization
            peutLire.WaitOne();
             // table reading
            for (int i = 0; i < data.Length; i++) {
                 //wait 1 s
                Thread.Sleep(1000);
                 // display
                Console.WriteLine("{0:hh:mm:ss} : Le lecteur {1} a lu le nombre {2}", DateTime.Now, Thread.CurrentThread.Name, data[i]);
            }
             // we can write
            peutEcrire.Set();
             // follow-up
            Console.WriteLine("Méthode [Lire] terminée par le thread n° {0}", Thread.CurrentThread.Name);
        }
  • 第 5 行:我们等待一个写入线程发出数组已填满的信号。收到此信号后,等待该信号的读取线程中仅有一个能通过。
  • 第 7-12 行:执行表操作,中间插入 Sleep 调用以强制线程释放处理器。
  • 第 14 行:通知写入线程数组已被读取,可以重新填充。

写入线程执行的 Write 方法如下:


public static void Ecrire() {
             // follow-up
            Console.WriteLine("Méthode [Ecrire] démarrée par le thread n° {0}", Thread.CurrentThread.Name);
             // we have to wait for write authorization
            peutEcrire.WaitOne();
             // writing table
            for (int i = 0; i < data.Length; i++) {
                 //wait 1 s
                Thread.Sleep(1000);
                 // display
                data[i] = objRandom.Next(0, 1000);
                Console.WriteLine("{0:hh:mm:ss} : L'écrivain {1} a écrit le nombre {2}", DateTime.Now, Thread.CurrentThread.Name, data[i]);
            }
            // on peut lire
            peutLire.Set();
             // follow-up
            Console.WriteLine("Méthode [Ecrire] terminée par le thread n° {0}", Thread.CurrentThread.Name);
        }
  • 第 5 行:我们等待一个读取线程发出信号,表明数组已被读取。收到此信号后,等待该信号的写入线程中仅有一个可以通过。
  • 第 7-13 行:执行表操作,中间插入 Sleep 调用以强制线程释放处理器。
  • 第 15 行:通知读取线程数组已填充完毕,可以再次读取。

执行结果如下:

Méthode [Lire] démarrée par le thread n° L0
Méthode [Lire] démarrée par le thread n° L1
Méthode [Ecrire] démarrée par le thread n° E0
Méthode [Ecrire] démarrée par le thread n° E1
Fin de Main...
02:29:18 : L'écrivain E0 a écrit le nombre 607
02:29:19 : L'écrivain E0 a écrit le nombre 805
02:29:20 : L'écrivain E0 a écrit le nombre 650
Méthode [Ecrire] terminée par le thread n° E0
02:29:21 : Le lecteur L0 a lu le nombre 607
02:29:22 : Le lecteur L0 a lu le nombre 805
02:29:23 : Le lecteur L0 a lu le nombre 650
Méthode [Lire] terminée par le thread n° L0
02:29:24 : L'écrivain E1 a écrit le nombre 186
02:29:25 : L'écrivain E1 a écrit le nombre 881
02:29:26 : L'écrivain E1 a écrit le nombre 415
Méthode [Ecrire] terminée par le thread n° E1
02:29:27 : Le lecteur L1 a lu le nombre 186
02:29:28 : Le lecteur L1 a lu le nombre 881
02:29:29 : Le lecteur L1 a lu le nombre 415
Méthode [Lire] terminée par le thread n° L1

以下几点值得注意:

  • 虽然在关键区段 Read 中会失去处理器,但每次只有 1 个驱动器
  • 每次仅有一个写入者,尽管他在 Write 检查段中会失去处理器
  • 读取器仅在表中存在可读取数据时才进行读取
  • 写入者只有在表中的数据被完全读取后才会进行写入

10.6.2. Monitor 类

在上一个示例中:

  • 有两个共享资源需要管理
  • 对于给定的资源,各个线程是平等的。

当写入线程在 peutEcrire.WaitOne 上被阻塞时,其中一个(任意一个)会被 peutEcrire.Set 操作解锁。如果前一个操作涉及为某个特定的写入者打开通道,情况就会变得更加复杂。

这可以类比为一家设有服务窗口的公共机构,每个窗口都负责特定业务。顾客到达后,会从X号窗口的取号机取号,然后就座。每张号牌都有编号,广播会按编号呼叫顾客。等待期间,顾客可以自由活动,可以阅读或打盹。 每次,广播都会宣布“Y号请前往X号窗口”,将他唤醒。如果是自己,顾客便起身前往X号窗口;否则,他继续做自己的事。

我们也可以用类似的方式来处理。以作家为例:

多名作家在同一个窗口前排队
他们的线程被阻塞
通道释放后,下一个写入者的编号被调用
正在读取数组的线程通知写入者们数组已可用。该线程或另一个线程会设置写入者线程使其通过屏障。
每个写入线程检查自己的编号,只有编号被叫到的那个
才会前往窗口。其余写入者重新进入等待状态。
每个线程检查自己是否是被选中的那个。如果是,则通过屏障;如果不是,则返回待机状态。

Monitor 类用于实现此场景。

Image

接下来我们将介绍一种标准构造(模式),该模式由本文开头提及的《C# 3.0》一书“多线程”章节提出,能够解决带有进入条件的屏障问题。

  • 首先,共享某个资源(计数器等)的线程通过一个我们将称之为“令牌”的对象来访问该资源。要打开通往计数器的门,你需要持有令牌才能开启,而令牌只有一个。因此,线程之间必须相互传递该令牌。
object jeton=new object();
  • 要访问计数器,线程首先需要请求:
Monitor.Enter(jeton);

如果令牌可用,则将其分配给执行上一次操作的线程;否则,该线程将因等待令牌而被挂起。

  • 如果对计数器的访问是无序的,即进入的线程顺序无关紧要,那么上述操作就足够了。持有令牌的线程直接访问计数器。如果访问是有序的,持有令牌的线程需检查自身是否满足访问计数器的条件:
while (! jeNeSuisPasCeluiQuiEstAttendu) {Monitor.Wait(jeton);}

如果该线程并非计数器所期待的那个,它将通过归还令牌来放弃当前轮次。该线程进入阻塞状态。一旦令牌再次可用,它就会被唤醒。随后,它将再次检查是否满足前往计数器的条件。释放令牌的操作 Monitor.Wait(token) 仅在该线程持有令牌时才能执行。否则,将抛出异常。

  • 检查进入计数器条件通过的线程将前往计数器:
  1. // 计数器处理
  2. ....

在离开计数器之前,线程必须返回其令牌,否则等待该令牌的线程将无限期阻塞。有两种不同的情况:

  • 第一种情况是,持有令牌的线程也是向等待该令牌的线程发出释放信号的线程。它将按以下方式操作:
1
2
3
4
5
6
7
8
// travail au guichet
....
// modification condition d'accès au guichet
...
// réveil des threads en attente du jeton
Monitor.PulseAll(jeton);
// libération du jeton
Monitor.Exit(jeton);

第 6 行唤醒了正在等待令牌的线程。这意味着它们有资格接收令牌,但并不意味着它们会立即收到令牌。 第 8 行,令牌被释放。所有符合条件的线程将依次获得令牌,具体顺序不可预测。这将使它们有机会再次检查是否满足访问条件。释放令牌的线程已在第 4 行修改了该条件,以允许新线程进入。第一个检查该条件并满足的线程将保留令牌,并依次进入计数器。

  • 第二种情况是:持有令牌的线程并非向等待令牌的线程发出“令牌已释放”信号的那个线程。但它必须释放令牌,因为负责发送此信号的线程必须是令牌持有者。它将通过以下操作实现:
Monitor.Exit(jeton);

令牌现已可用,但等待它的线程(它们已执行了 Wait(token) 操作)并未收到通知。此任务交由另一个线程负责,该线程将在某个时刻执行类似于以下内容的代码:

1
2
3
4
5
6
7
8
// acquisition jeton
Monitor.Enter(jeton);
// modification condition d'accès au guichet
....
// réveil des threads en attente du jeton
Monitor.PulseAll(jeton);
// libération du jeton
Monitor.Exit(jeton);

最终,《C# 3.0》一书“多线程”章节中提出的标准实现如下:

  • define counter access token :
object jeton=new object();
  • 请求访问计数器:
lock(jeton){
    while (! jeNeSuisPasCeluiQuiEstAttendu) 
        Monitor.Wait(jeton);
}
// passage au guichet
...
lock(jeton){...} 

等同于

Monitor.Enter(jeton);
try{...} finally{Monitor.Exit(jeton);}

请注意,在此方案中,一旦通过屏障,令牌就会立即释放。随后另一个线程可以测试访问条件。因此,上述构造会允许所有验证访问条件的线程进入。如果这不是您想要的效果,您可以编写如下代码:

lock(jeton){
    while (! jeNeSuisPasCeluiQuiEstAttendu) 
        Monitor.Wait(jeton);
    // passage au guichet
    ...
}

其中令牌仅在通过柜台后才会被释放。

  • 修改计数器的访问条件并通知其他线程
lock(jeton){
    // modifier la condition d'accès au guichet
    ...
    // en avertir les threads en attente du jeton
    Monitor.PulseAll(jeton);
}

在上文中,访问条件只能由持有令牌的线程修改。你也可以这样写:

    // modifier la condition d'accès au guichet
    ...
    // en avertir les threads en attente du jeton
    Monitor.PulseAll(jeton);
    // libérer le jeton
    Monitor.Exit(jeton);

如果线程已经持有令牌。

有了这些信息,我们可以重写读写器应用程序,为读写器访问各自计数器设定顺序。代码如下:


using System;
using System.Threading;
 
namespace Chap8 {
    class Program2 {
         // use of reader and writer threads
         // illustrates the use of synchronization events
 
 
         // class variables
         static int[] data = new int[3            ]; // resource shared between reader and writer threads
         static Random objRandom = new Random(DateTime.Now.Second    ); // a random number generator
         static object peutLire = new object(        ); // indicates that the contents of data can be read
         static object peutEcrire = new object(    ); // indicates that you can write the contents of data
         static bool lectureAutorisée = fals    e; // to authorize the reading of the table
         static bool écritureAutorisée = fals    e; // to authorize writing in the table
         static string[] ordreLectur    e; // sets the order of readers
         static string[] ordreEcritur    e; // sets the order for writers
         static int lecteurSuivant =     0; // indicates the next drive number
         static int écrivainSuivant =     0; // indicates the number of the following writer
 
         //hand
        public static void Main(string[] args) {
 
             // number of threads to generate
            const int nbThreads = 5;
 
             // creation of reader threads
            Thread[] lecteurs = new Thread[nbThreads];
            for (int i = 0; i < nbThreads; i++) {
                 // creation
                lecteurs[i] = new Thread(Lire);
                lecteurs[i].Name = "L" + i.ToString();
                 // launch
                lecteurs[i].Start();
            }
 
             // create playback order
            ordreLecture = new string[nbThreads];
            for (int i = 0; i < nbThreads; i++) {
                ordreLecture[i] = lecteurs[nbThreads - i - 1].Name;
                Console.WriteLine("Le lecteur {0} est en position {1}", ordreLecture[i], i);
            }
 
             // creating writer threads
            Thread[] écrivains = new Thread[nbThreads];
            for (int i = 0; i < nbThreads; i++) {
                 // creation
                écrivains[i] = new Thread(Ecrire);
                écrivains[i].Name = "E" + i.ToString();
                 // launch
                écrivains[i].Start();
            }
 
             // creation of writing order
            ordreEcriture = new string[nbThreads];
            for (int i = 0; i < nbThreads; i++) {
                ordreEcriture[i] = écrivains[i].Name;
                Console.WriteLine("L'écrivain {0} est en position {1}", ordreEcriture[i], i);
            }
 
             // write authorization
            lock (peutEcrire) {
               écritureAutorisée = true;
                Monitor.Pulse(peutEcrire);
            }
 
 
             //end of hand
            Console.WriteLine("Fin de Main...");
        }
 
         // read the contents of the table
        public static void Lire() {
...
        }
 
         // write in the table
        public static void Ecrire() {
...
        }
    }
}

使用阅览台须遵守以下条件:

  • 第13行:令牌 peutLire
  • 第15行:布尔值 readingAuthorized
  • 第17行:读者排序表。读者按此表中的顺序前往阅览台,该表包含他们的姓名。
  • 第19行:lecteurSuivant 表示下一位获准前往服务台的读者的序号。

前往阅览台须满足以下条件:

  • 第14行:令牌 peutEcrire
  • 第16行:布尔值 writingAuthorized
  • 第18行:有序的写作者表。写作者将按照该表中包含其姓名的顺序前往写字台。
  • 第 20 行:writerNext 表示下一个获准进入计数器的写入者的编号。

代码的其他部分如下:

  • 第29-36行:创建并启动读取线程。由于未获读取授权(第15行),这些线程都将被阻塞。
  • 第 39-43 行:它们通过计数器的顺序将与创建顺序相反。
  • 第 46-53 行:创建并启动写入线程。由于不允许写入(第 16 行),这些线程都将被阻塞。
  • 第 56-60 行:它们通过计数器的顺序将与它们的创建顺序一致。
  • 第 64 行:写入操作已获授权
  • 第 65 行:写入线程收到警告,提示发生了某些变化。

Read 方法如下:


        public static void Lire() {
             // follow-up
            Console.WriteLine("Méthode [Lire] démarrée par le thread n° {0}", Thread.CurrentThread.Name);
             // we have to wait for reading authorization
            lock (peutLire) {
                while (!lectureAutorisée || ordreLecture[lecteurSuivant] != Thread.CurrentThread.Name) {
                    Monitor.Wait(peutLire);
                }
                 // table reading
                for (int i = 0; i < data.Length; i++) {
                     //wait 1 s
                    Thread.Sleep(1000);
                     // display
                    Console.WriteLine("{0:hh:mm:ss} : Le lecteur {1} a lu le nombre {2}", DateTime.Now, Thread.CurrentThread.Name, data[i]);
                }
                 // next reader
                lectureAutorisée = false;
                lecteurSuivant++;
                 // writers are warned that they can write
                lock (peutEcrire) {
                    écritureAutorisée = true;
                    Monitor.PulseAll(peutEcrire);
                }
 
                 // follow-up
                Console.WriteLine("Méthode [Lire] terminée par le thread n° {0}", Thread.CurrentThread.Name);
            }
}
  • 对服务台的所有访问均由第5至27行的控制。获取令牌的读者在其访问服务台期间会一直持有该令牌
  • 第6-8行:在第5行获取令牌的读者,若读取未获授权或轮不到其通过,则释放该令牌。
  • 第10-15行:计数器通行(表操作)
  • 第17-18行:线程修改阅读柜台的访问条件。请注意,它仍持有阅读令牌,且这些修改尚不能允许读者通过。
  • 第 20-23 行:线程更改写入桌的访问条件,并通知所有等待的写入者情况已发生变化。
  • 第27行:锁定结束,令牌peutLire被释放。此时读取线程可在第5行获取该令牌,但因布尔变量readingAuthorized为false,故无法通过访问条件。此外,所有等待peutLire的线程仍保持等待状态,因为PulseAll(peutLire)尚未执行。

Write方法如下:


        public static void Ecrire() {
             // follow-up
            Console.WriteLine("Méthode [Ecrire] démarrée par le thread n° {0}", Thread.CurrentThread.Name);
             // we have to wait for write authorization
            lock (peutEcrire) {
                while (!écritureAutorisée || ordreEcriture[écrivainSuivant] != Thread.CurrentThread.Name) {
                    Monitor.Wait(peutEcrire);
                }
                 // writing table
                for (int i = 0; i < data.Length; i++) {
                     //wait 1 s
                    Thread.Sleep(1000);
                     // display
                    data[i] = objRandom.Next(0, 1000);
                    Console.WriteLine("{0:hh:mm:ss} : L'écrivain {1} a écrit le nombre {2}", DateTime.Now, Thread.CurrentThread.Name, data[i]);
                }
                 // next writer
                écritureAutorisée = false;
                écrivainSuivant++;
                 // readers waiting for the peutLire token are woken up
                lock (peutLire) {
                    lectureAutorisée = true;
                    Monitor.PulseAll(peutLire);
                }
                 // follow-up
                Console.WriteLine("Méthode [Ecrire] terminée par le thread n° {0}", Thread.CurrentThread.Name);
            }
}
  • 对写字台的所有访问均由第5至27行的控制。获取令牌的写作者在其使用写字台期间将一直持有该令牌
  • 第6-8行:在第5行获取令牌的写入者,若未获授权写入或轮不到其通过,则释放该令牌。
  • 第10-16行:计数器操作(表操作)
  • 第18-19行:线程更改写入桌的访问条件。请注意,它仍持有写入令牌,且这些修改尚不能允许写入者通过。
  • 第21-24行:线程修改阅读桌的访问条件,并通知所有等待的读者情况已发生变化。
  • 第27行:锁定结束,令牌peutEcrire被释放。此时写入线程可在第5行获取该令牌,但因布尔writingAuthorized为false,故无法通过访问条件。此外,所有等待peutEcrire的线程仍保持等待状态,直至执行新的PulseAll(peutEcrire)操作。

执行示例如下:

Méthode [Lire] démarrée par le thread n° L0
Méthode [Lire] démarrée par le thread n° L2
Méthode [Lire] démarrée par le thread n° L1
Le lecteur L2 est en position 0
Le lecteur L1 est en position 1
Le lecteur L0 est en position 2
Méthode [Ecrire] démarrée par le thread n° E0
Méthode [Ecrire] démarrée par le thread n° E1
L'écrivain E0 est en position 0
L'écrivain E1 est en position 1
L'écrivain E2 est en position 2
Fin de Main...
Méthode [Ecrire] démarrée par le thread n° E2
12:09:05 : L'écrivain E0 a écrit le nombre 815
12:09:06 : L'écrivain E0 a écrit le nombre 990
12:09:07 : L'écrivain E0 a écrit le nombre 563
Méthode [Ecrire] terminée par le thread n° E0
12:09:08 : Le lecteur L2 a lu le nombre 815
12:09:09 : Le lecteur L2 a lu le nombre 990
12:09:10 : Le lecteur L2 a lu le nombre 563
Méthode [Lire] terminée par le thread n° L2
12:09:11 : L'écrivain E1 a écrit le nombre 411
12:09:12 : L'écrivain E1 a écrit le nombre 11
12:09:13 : L'écrivain E1 a écrit le nombre 54
Méthode [Ecrire] terminée par le thread n° E1
12:09:14 : Le lecteur L1 a lu le nombre 411
12:09:15 : Le lecteur L1 a lu le nombre 11
12:09:16 : Le lecteur L1 a lu le nombre 54
Méthode [Lire] terminée par le thread n° L1
12:09:17 : L'écrivain E2 a écrit le nombre 698
12:09:18 : L'écrivain E2 a écrit le nombre 448
12:09:19 : L'écrivain E2 a écrit le nombre 472
Méthode [Ecrire] terminée par le thread n° E2
12:09:20 : Le lecteur L0 a lu le nombre 698
12:09:21 : Le lecteur L0 a lu le nombre 448
12:09:22 : Le lecteur L0 a lu le nombre 472
Méthode [Lire] terminée par le thread n° L0

10.7. 线程池

迄今为止,为了管理:

  • 我们通过 Thread T = new Thread(...) 创建它们
  • 然后通过 T.Start() 执行

我们在“数据库”一章中看到,某些关系型数据库管理系统(RDBMS)支持建立已打开连接的连接池:

  • 在连接池启动时打开 n 个连接
  • 当线程请求连接时,系统会从连接池中分配一个已打开的连接
  • 当线程关闭连接时,该连接不会被关闭,而是返回给连接池

连接池的使用对代码是透明的。其优势在于性能的提升:建立连接的开销很大。在此情况下,10 个打开的连接即可处理数百个请求。

针对线程也存在类似的系统:

  • 在池启动时会创建min个线程。 min 的值可通过 ThreadPool.SetMinThreads(min1, min2) 进行设置线程池可用于执行异步阻塞或非阻塞任务。第一个参数 min1 设置阻塞线程的数量,第二个参数 min2 设置异步线程的数量。可通过 ThreadPool.GetMinThreads(out min1, out min2) 获取这两个变量的当前值
  • 如果该数量不足,线程池将创建其他线程来响应请求,直至达到最大线程数(max threads)的限制。 max 的值通过 ThreadPool.SetMaxThreads(max1, max2) 进行设置。这两个参数的含义与 SetMinThreads 中的相同。可通过 ThreadPool.GetMaxThreads(out max1, out max2) 获取这两个值的当前值当达到 max1 个线程时,阻塞任务的线程请求将被排队,等待池中释放的空闲线程。

线程池具有以下优势:

  • 与连接池类似,我们节省了线程创建时间:10 个线程即可处理数百个请求。
  • 我们保障了应用程序的安全:通过设置最大线程数,避免因请求过多而导致应用程序瘫痪。这些请求将被放入队列中。

要将任务分配给线程池中的某个线程,可使用以下两种方法之一:

  1. ThreadPool.QueueWorkItem(WaitCallBack)
  2. ThreadPool.QueueWorkItem(WaitCallBack, object)

其中 WaitCallBack 是任何具有签名 void WaitCallBack(object) 的方法。方法 1 要求线程执行 WaitCallBack 方法,但不传递参数。方法 2 执行相同操作,但向 WaitCallBack 传递一个 object 类型的参数。

以下程序演示了这些概念:


using System;
using System.Threading;
 
namespace Chap8 {
    class Program {
        public static void Main() {
             // init Current thread
            Thread main = Thread.CurrentThread;
            // name the Thread
            main.Name = "Main";
 
            // we use a thread pool
            int min1, min2;
            // set the minimum number of blocking threads
            ThreadPool.GetMinThreads(out min1, out min2);
            Console.WriteLine("Nombre minimum de tâches bloquantes dans le pool : {0}", min1);
            Console.WriteLine("Nombre minimum de tâches asynchrones dans le pool : {0}", min2);
            ThreadPool.SetMinThreads(3, min2);
            ThreadPool.GetMinThreads(out min1, out min2);
            Console.WriteLine("Nombre minimum de tâches bloquantes dans le pool après changement : {0}", min1);
            // set the maximum number of blocking threads
            int max1, max2;
            ThreadPool.GetMaxThreads(out max1, out max2);
            Console.WriteLine("Nombre maximum de tâches bloquantes dans le pool : {0}", max1);
            Console.WriteLine("Nombre maximum de tâches asynchrones dans le pool : {0}", max2);
            ThreadPool.SetMaxThreads(5, max2);
            ThreadPool.GetMaxThreads(out max1, out max2);
            Console.WriteLine("Nombre maximum de tâches bloquantes dans le pool après changement : {0}", max1);
            // 7 threads are executed
            for (int i = 0; i < 7; i++) {
                // start execution of thread i in a pool
                ThreadPool.QueueUserWorkItem(Sleep, new Data2 { Numéro = i.ToString(), Début = DateTime.Now, Durée = i + 10 });
            }
             // end of hand
            Console.Write("Tapez [entrée] pour terminer le thread {0} à {1:hh:mm:ss:FF}", main.Name, DateTime.Now);
             // waiting
            Console.ReadLine();
        }
 
        public static void Sleep(object infos) {
            // parameter is retrieved
            Data2 data = infos as Data2;
            Console.WriteLine("A {2:hh:mm:ss:FF}, le thread n° {0} va dormir pendant {1} seconde(s)", data.Numéro, data.Durée,DateTime.Now);
             // pool status
            int cpt1, cpt2;
            ThreadPool.GetAvailableThreads(out cpt1, out cpt2);
            Console.WriteLine("Nombre de threads pour tâches bloquantes disponibles dans le pool : {0}", cpt1);
            // sleep mode for Duration
            Thread.Sleep(data.Durée * 1000);
             // end of execution
            data.Fin = DateTime.Now;
            Console.WriteLine("A {3:hh:mm:ss:FF}, le thread n° {0} se termine. Il était programmé pour durer {1} seconde(s). Il a duré {2} seconde(s)", data.Numéro, data.Durée, data.Fin - data.Début,DateTime.Now);
        }
    }
 
    internal class Data2 {
         // miscellaneous information
        public string Numéro { get; set; }
        public DateTime Début { get; set; }
        public int Durée { get; set; }
        public DateTime Fin { get; set; }
    }
}
  • 第 15-17 行:查询并显示线程池中当前的最小线程数
  • 第 18 行:将阻塞任务的最小线程数更改为 2
  • 第 19-21 行:显示新的最小值
  • 第 22-28 行:采用相同方法将阻塞任务的最大线程数设置为 5
  • 第 30-33 行:在 5 个线程的线程池中执行 7 个任务。其中 5 个任务应各获得 1 个线程,前 2 个任务能快速获得(因为始终有 2 个线程可用),其余 3 个任务的等待时间为 0.5 秒。另有 2 个任务应等待线程空闲。
  • 第 32 行:任务通过传递第 56-62 行定义的 Data2 类型参数,调用第 40-54 行中的 Sleep 方法。
  • 第 40 行:任务调用的 Sleep 方法
  • 第 42 行:获取传递给 Sleep 的参数。
  • 第 43 行:任务在控制台输出自身标识
  • 第 45-47 行:显示当前可用的线程数。我们希望观察其变化情况。
  • 第 49 行:任务暂停几秒钟(阻塞任务)。
  • 第 52 行:当任务恢复运行时,显示其账户的相关信息。

结果如下。

对于线程池中的最小最大线程数:

1
2
3
4
5
6
Nombre minimum de tâches bloquantes dans le pool : 2
Nombre minimum de tâches asynchrones dans le pool : 2
Nombre minimum de tâches bloquantes dans le pool après changement : 3
Nombre maximum de tâches bloquantes dans le pool : 500
Nombre maximum de tâches asynchrones dans le pool : 1000
Nombre maximum de tâches bloquantes dans le pool après changement : 5

要运行 7 个线程:

A 03:07:37:04, le thread n° 0 va dormir pendant 10 seconde(s)
Nombre de threads pour tâches bloquantes disponibles dans le pool : 3
A 03:07:37:04, le thread n° 2 va dormir pendant 12 seconde(s)
Nombre de threads pour tâches bloquantes disponibles dans le pool : 2
A 03:07:37:04, le thread n° 1 va dormir pendant 11 seconde(s)
Nombre de threads pour tâches bloquantes disponibles dans le pool : 2
A 03:07:38:04, le thread n° 3 va dormir pendant 13 seconde(s)
Nombre de threads pour tâches bloquantes disponibles dans le pool : 1
A 03:07:38:54, le thread n° 4 va dormir pendant 14 seconde(s)
Nombre de threads pour tâches bloquantes disponibles dans le pool : 0
A 03:07:47:04, le thread n° 0 se termine. Il était programmé pour durer 10 seconde(s). Il a duré 00:00:10 seconde(s)
A 03:07:47:04, le thread n° 5 va dormir pendant 15 seconde(s)
Nombre de threads pour tâches bloquantes disponibles dans le pool : 0
A 03:07:48:04, le thread n° 1 se termine. Il était programmé pour durer 11 seconde(s). Il a duré 00:00:11 seconde(s)
A 03:07:48:04, le thread n° 6 va dormir pendant 16 seconde(s)
Nombre de threads pour tâches bloquantes disponibles dans le pool : 0
A 03:07:49:04, le thread n° 2 se termine. Il était programmé pour durer 12 seconde(s). Il a duré 00:00:12 seconde(s)
A 03:07:51:04, le thread n° 3 se termine. Il était programmé pour durer 13 seconde(s). Il a duré 00:00:14 seconde(s)
A 03:07:52:54, le thread n° 4 se termine. Il était programmé pour durer 14 seconde(s). Il a duré 00:00:15.5000000 seconde(s)
A 03:08:02:04, le thread n° 5 se termine. Il était programmé pour durer 15 seconde(s). Il a duré 00:00:25 seconde(s)
A 03:08:04:04, le thread n° 6 se termine. Il était programmé pour durer 16 seconde(s). Il a duré 00:00:27 seconde(s)
  • 第1-6行:前3个任务依次执行。它们立即找到1个可用线程(MinThreads=3),随后进入休眠状态。
  • 第7-9行:对于任务3和4,耗时稍长。因为它们各自都找不到空闲线程,因此必须创建一个。该机制最多支持5个线程(MaxThreads=5)。
  • 第 10 行:已无可用线程:任务 5 和 6 将不得不等待。
  • 第11-12行:任务0结束。任务5接管其线程。
  • 第13-14行:任务1结束。任务6获取其线程。
  • 第17-21行:任务依次完成。

10.8. BackgroundWorker 类

10.8.1. 示例 1

BackgroundWorker 类属于 [System.ComponentModel] 命名空间。它的使用方式与线程相同,但具有一些特殊功能,在某些情况下可能比 [Thread] 类更具优势:

  • 它会触发以下事件:
  • DoWork:有线程请求执行 BackgroundWorker
  • ProgressChangedBackgroundWorker 对象调用了 ReportProgress 方法。该事件用于返回完成百分比。
  • RunWorkerCompletedBackgroundWorker 对象已完成其工作。它可能是正常完成的,也可能是因取消或异常而终止的。

这些事件使得 BackgroundWorker 在图形用户界面中非常有用:耗时的任务将委托给 BackgroundWorker,它能够通过 ProgressChanged 事件报告进度,并通过 RunWorkerCompleted 事件报告任务结束。BackgroundWorker 需要完成的工作将由与 DoWork 关联的方法执行。

  • 可以请求取消该任务。在图形界面中,用户可以取消耗时较长的任务。
  • BackgroundWorker 对象属于一个池,并根据需要进行回收利用。需要 BackgroundWorker 的应用程序将从该池中获取它,池会提供一个现有的但未被使用的线程。通过这种方式回收利用线程,而不是每次都创建新线程,可以提高性能。

当计数器的访问未受控时,我们在前面的应用程序中使用此工具:


using System;
using System.Threading;
using System.ComponentModel;
 
namespace Chap8 {
    class Program2 {
        // use of reader and writer threads
        // illustrates the simultaneous use of shared resources and synchronization
 
         // class variables
        const int nbThreads = 2;                    // total number of threads
        static int nbLecteursTerminés = 0;        // number of terminated threads
        static int[] data = new int[5];            // shared array between reader and writer threads
        static object appli;                            // synchronizes access to number of completed threads
        static Random objRandom = new Random(DateTime.Now.Second);    // a random number generator
        static AutoResetEvent peutLire;        // indicates that the contents of the table can be read
        static AutoResetEvent peutEcrire;        // points out that we can write in the table
        static AutoResetEvent finLecteurs;    // signals the end of readers
 
         //hand
        public static void Main(string[] args) {
 
            // give the thread a name
            Thread.CurrentThread.Name = "Main";
 
             // flag initialization
             peutLire = new AutoResetEvent(fals        e); // cannot be read yet
             peutEcrire = new AutoResetEvent(tru    e); // we can already write
            finLecteurs = new AutoResetEvent(false);    // application not completed
 
             // synchronizes access to terminated thread counter
            appli = new object();                
 
             // creation of reader threads
            MyBackgroundWorker[] lecteurs = new MyBackgroundWorker[nbThreads];
            for (int i = 0; i < nbThreads; i++) {
                 // creation
                lecteurs[i] = new MyBackgroundWorker();
                lecteurs[i].Numéro = "L" + i;
                lecteurs[i].DoWork += Lire;
                lecteurs[i].RunWorkerCompleted += EndLecteur;
                 // launch
                lecteurs[i].RunWorkerAsync();
            }
 
            // creating writer threads
            MyBackgroundWorker[] écrivains = new MyBackgroundWorker[nbThreads];
            for (int i = 0; i < nbThreads; i++) {
                 // creation
                écrivains[i] = new MyBackgroundWorker();
                écrivains[i].Numéro = "E" + i;
                écrivains[i].DoWork += Ecrire;
                 // launch
                écrivains[i].RunWorkerAsync();
            }
 
            // wait for all threads to finish
            finLecteurs.WaitOne();
             //end of hand
            Console.WriteLine("Fin de Main...");
        }
 
        public static void EndLecteur(object sender, RunWorkerCompletedEventArgs infos) {
...
        }
 
        // read the contents of the table
        public static void Lire(object sender, DoWorkEventArgs infos) {
...
        }
 
        // write in the table
        public static void Ecrire(object sender, DoWorkEventArgs infos) {
...
        }
    }
 
     // thread
    internal class MyBackgroundWorker : BackgroundWorker {
         // miscellaneous information
        public string Numéro { get; set; }
    }
 
}

我们仅详细说明了更改内容:

  • 第 79-82 行将 Thread 类替换为 MyBackgroundWorkerBackgroundWorker 类的方法已被派生,以便为线程分配一个编号。我们也可以通过在第 43 和 54 行向 RunWorkerAsync 方法传递一个包含线程编号的对象来实现,
  • 第 58 行:Main 方法在所有读取线程完成任务后结束。为此,在第 12 行,计数器 nbReadersTerminated 用于统计已完成工作的读取线程数量。 该计数器由第63至65行的 EndLecteur 方法递增,该方法在每个读取线程终止时执行。正是该过程控制了第18行的 AutoResetEvent finLecteurs,该事件在第59行与 Hand 对象进行了同步
  • 第 16 行:由于可能有多个读取线程同时试图递增计数器 nbReadersTerminated,因此通过同步对象 app 对其提供独占访问。这种情况虽不太可能发生,但理论上存在这种可能性。
  • 第 35-44 行:创建读取线程
  • 第 38 行:创建 MyBackgroundWorker 类型的线程
  • 第 39 行:为其分配一个 No
  • 第 40 行:将其分配给 Read 方法执行
  • 第 41 行:线程结束时将执行 EndLecteur 方法
  • 第 43 行:启动线程
  • 第 47-55 行:创建写入线程
  • 第 50 行:创建 MyBackgroundWorker 类型的线程
  • 第 51 行:为其分配一个 No
  • 第 52 行:为其分配要执行的 Write 操作
  • 第 54 行:启动线程

ReadWrite 方法保持不变。EndLecteur 方法在每个读取线程结束时执行。其代码如下:


        public static void EndLecteur(object sender, RunWorkerCompletedEventArgs infos) {
             // increment no. of completed drives
            lock (appli) {
                nbLecteursTerminés++;
                if (nbLecteursTerminés == nbThreads)
                    finLecteurs.Set();
            }
}

EndLecteur 方法的作用是通知 Main 程序,所有读取器都已完成任务。

  • 第 4 行:计数器 nbReadersTerminated 被递增。
  • 第 5-6 行:如果所有读取器都已完成任务,则将事件 finLecteurs 设置为 true,以防止 Main 继续等待该事件。
  • 由于 EndLecteur 由多个线程执行,因此第 3 行中的保护了前面的临界区。

执行结果与多线程版本相似。

10.8.2. 示例 2

以下代码演示了 BackgroundWorker 类的其他要点:

  • 取消任务的能力
  • 报告任务中抛出的异常
  • 向任务传递 I/O 参数

using System;
using System.Threading;
using System.ComponentModel;
 
namespace Chap8 {
    class Program3 {
 
         // threads
        static BackgroundWorker[] tâches = new BackgroundWorker[5];
 
        public static void Main() {
             // init Current thread
            Thread main = Thread.CurrentThread;
            // name the Thread
            main.Name = "Main";
 
             // thread creation
            for (int i = 0; i < tâches.Length; i++) {
                 // create thread n° i
                tâches[i] = new BackgroundWorker();
                 // initialize it
                tâches[i].DoWork += Sleep;
                tâches[i].RunWorkerCompleted += End;
                tâches[i].WorkerSupportsCancellation = true;
                 // launch it
                tâches[i].RunWorkerAsync(new Data { Numéro = i, Début = DateTime.Now, Durée = i + 1 });
            }
            // cancel the last thread
            tâches[4].CancelAsync();
 
             // end of hand
            Console.WriteLine("Fin du thread {0}, tapez [entrée] pour terminer...", main.Name);
            Console.ReadLine();
            return;
        }
 
        public static void Sleep(object sender, DoWorkEventArgs infos) {
...
        }
 
        public static void End(object sender, RunWorkerCompletedEventArgs infos) {
...
        }
 
        internal class Data {
             // miscellaneous information
            public int Numéro { get; set; }
            public DateTime Début { get; set; }
            public int Durée { get; set; }
            public DateTime Fin { get; set; }
        }
    }
}
  • 第 9 行:BackgroundWorker
  • 第 18-27 行:创建线程
  • 第 20 行:创建线程
  • 第 22 行:该线程将执行第 39-41 行的 Sleep 操作
  • 第 23 行:第 43-45 行中的 End 方法将在线程结束时执行
  • 第 24 行:线程可以被取消
  • 第 26 行:线程以类型为 [Data] 的参数启动,该参数在第 49-52 行中定义。该对象具有以下字段:
    • Number(输入):线程编号
    • Start(输入):线程开始时间
    • Duration(输入):Sleep的运行时间
    • 结束 (退出):线程执行结束
  • 第 29 行:取消线程 4

所有线程随后执行 Sleep 操作:


        public static void Sleep(object sender, DoWorkEventArgs infos) {
             // we use the info parameter
            Data data = (Data)infos.Argument;
             // exception for task no. 3
            if (data.Numéro == 3) {
                throw new Exception("test....");
            }
             // sleep mode for Duration, stopping every second
            for (int i = 1; i <= data.Durée && !tâches[data.Numéro].CancellationPending; i++) {
                 // wait 1 second
                Thread.Sleep(1000);
            }
             // end of execution
            data.Fin = DateTime.Now;
             // initialize the result
            infos.Result = data;
            infos.Cancel = tâches[data.Numéro].CancellationPending;
}
  • 第 1 行:Sleep 方法具有标准的事件处理程序签名。它接收两个参数:
    • sender:事件发送者,此处为执行
    • news:类型为 DoWorkEventArgs,提供关于 DoWork 事件的信息。该参数既用于向线程传输信息,也用于获取其结果。
  • 第 3 行:传递给任务的 RunWorkerAsync 方法的参数位于 infos.Argument 中。
  • 第 5-7 行:针对任务编号 3 抛出异常
  • 第 9-12 行:该线程“以一秒为增量休眠 Duration 秒”,以便在第 9 行进行取消测试。这模拟了一个长时间运行的任务,在此期间线程会定期检查取消请求。为了表示已被取消,该线程必须将属性 infos.Cancel 设置为 true(第 17 行)。
  • 第 16 行:该线程可向调用它的线程返回结果。它将此结果存入 infos.Result 中。

完成后,线程将执行 End next:


public static void End(object sender, RunWorkerCompletedEventArgs infos) {
            // the infos parameter is used to display the result of execution
             // exception?
            if (infos.Error != null) {
                Console.WriteLine("Le thread {1} a rencontré l'erreur suivante : {0}", infos.Error.Message, sender);
            } else
                if (!infos.Cancelled) {
                    Data data = (Data)infos.Result;
                    Console.WriteLine("Thread {0} terminé : début {1:hh:mm:ss}, durée programmée {2} s, fin {3:hh:mm:ss}, durée effective {4}",
                    data.Numéro, data.Début, data.Durée, data.Fin, (data.Fin - data.Début));
                } else {
                    Console.WriteLine("Thread {0} annulé", sender);
                }
        }
  • 第 1 行:End 方法具有标准的事件处理程序签名。它接收两个参数:
    • sender:事件发送者,此处为执行
    • news:类型为 RunWorkerCompletedEventArgs,提供关于 RunWorkerCompleted 事件的信息。
  • 第 4 行:字段 infos.Error(类型为 Exception)仅在发生异常时才会被赋值。
  • 第 7 行:如果线程已被取消,则将布尔类型字段 infos.Cancelled 设置为 true
  • 第 8 行:若未发生异常或取消,则 infos.Result 即为已执行线程的结果。若在此情况下使用该结果(即线程已被取消或抛出异常时),将引发异常。 因此,在第 5 行和第 13 行,我们无法显示已取消或抛出异常的线程编号,因为该编号存储在 infos.Result 中。可以通过继承 BackgroundWorker 类来解决此问题,以便像前一个示例那样存储调用线程与被调用线程之间需要交换的信息。然后,我们使用代表 BackgroundWorkersender 参数,而不是 news

结果如下:

1
2
3
4
5
6
Fin du thread Main. Laissez les autres threads se terminer puis tapez [entrée] pour terminer...
Thread 0 terminé : début 05:19:46, durée programmée 1 s, fin 05:19:47, durée effective 00:00:01
Le thread System.ComponentModel.BackgroundWorker a rencontré l'erreur suivante : test....
Thread System.ComponentModel.BackgroundWorker annulé
Thread 1 terminé : début 05:19:46, durée programmée 2 s, fin 05:19:49, durée effective 00:00:03
Thread 2 terminé : début 05:19:46, durée programmée 3 s, fin 05:19:50, durée effective 00:00:04

10.9. 线程局部数据

10.9.1. 原理

考虑一个三层应用程序:

假设该应用程序是多用户应用程序,例如一个 Web 应用程序。每个用户由一个专用线程处理。该线程的生命周期如下:

  1. 线程被创建或从线程池中获取,以响应用户请求
  2. 如果该请求需要数据,线程将执行 [ui] 层中的方法,该方法会调用 [metier] 层中的方法,进而调用 [dao] 层中的方法。
  3. 线程将响应返回给用户。随后该线程消失或被回收至线程池中。

在操作 2 中,该线程拥有独立数据(即不与其他线程共享)可能很有用。例如,这些数据可能属于该线程正在服务的特定用户。随后,这些数据可在各个层([ui]、[metier]、[dao])中被使用。

Thread 类通过一种私有字典实现了这种场景,其中键的类型为 LocalDataStoreSlot

在线程的私有字典中为该键创建一个条目。
将值数据与线程私有字典中的键关联
从线程的私有字典中检索与该名称关联的值

使用模式可以如下所示:

  • 要创建与当前线程关联的 (,) 对:
Thread.SetData(Thread.GetNamedDataSlot("clé"),valeur);
  • 检索与关联的值:
Thread.GetData(Thread.GetNamedDataSlot("clé"));

10.9.2. 该原理的应用

考虑以下三层应用程序:

假设 [dao] 层管理一个文章数据库,且其接口初始定义如下:


using System.Collections.Generic;
 
namespace Chap8 {
    public interface IDao {
        int InsertArticle(Article article);
        List<Article> GetAllArticles();
        void DeleteAllArticles();
    }
}
  • 第 5 行:将一项数据插入数据库
  • 第6行:从数据库中检索所有文章
  • 第7行:从数据库中删除所有文章

稍后,我们需要一个方法,用于通过事务插入文章数组,因为我们希望以“全有或全无”的模式进行操作:要么所有文章都被插入,要么一个都不插入。然后,我们可以修改接口以集成这一新要求:


using System.Collections.Generic;
 
namespace Chap8 {
    public interface IDao {
        int InsertArticle(Article article);
        void insertArticles(Article[] articles);
        List<Article> GetAllArticles();
        void DeleteAllArticles();
    }
}
  • 第 6 行:将 articles 数组添加到数据库中

后来,在另一个应用中,需要删除保存在列表中的一组文章,且仍处于事务处理中。如我们所见,[DAO] 层将不断扩展以满足不同的业务需求。我们还可以采取另一种方案:

  • 仅将基本操作(InsertArticleDeleteArticleUpdateArticleSelectArticleSelectArticles)放入 [dao] 层
  • 将多篇文章的并发更新操作移交至 [business] 层。这些操作将调用 [dao] 层的基础操作。

此方案的优势在于,同一 [dao] 层可与不同的 [metier] 层配合使用而无需修改。然而,这给事务管理带来了困难,因为需要将待执行的更新操作原子性地归入同一事务中:

  • 事务必须由 [业务] 层在调用 [DAO] 层的方法之前发起
  • [DAO] 层的方法必须感知事务的存在,以便在事务存在时参与其中
  • 事务必须由[业务]层终止。

为确保 [dao] 层的方法能够感知当前事务的存在,我们可以将事务作为参数添加到 [dao] 层的每个方法中。该参数随后将出现在接口方法的签名中,从而将其与特定的数据源(即数据库)建立关联。 线程的本地数据为我们提供了一个更优雅的解决方案:[业务]层将事务放入线程的本地数据中,而[DAO]层将从那里获取它。[DAO]层的方法签名无需更改。

我们将通过以下 Visual Studio 项目来实现这一方案:

  • 在 [1] 中:整个解决方案
  • 在 [2] 中:所使用的引用。由于 [4] 是 SQL Server Compact 数据库,因此需要引用 [System.Data.SqlServerCe]。
  • 在 [3] 中:应用程序的不同层。

基础 [4] 是上一章(特别是第 9.3.1 节)中已使用的 SQL Server Compact 数据库。

 

Article 类

前一张表 [articles] 中的某一行被封装在一个 Article 类型的对象中:


namespace Chap8 {
    public class Article {
         // properties
        public int Id { get; set; }
        public string Nom { get; set; }
        public decimal Prix { get; set; }
        public int StockActuel { get; set; }
        public int StockMinimum { get; set; }
 
         // manufacturers
        public Article() { 
        }
 
        public Article(int id, string nom, decimal prix, int stockActuel, int stockMinimum) {
            Id = id;
            Nom = nom;
            Prix = prix;
            StockActuel = stockActuel;
            StockMinimum = stockMinimum;
        }
 
         // identity
        public override string ToString() {
            return string.Format("[{0},{1},{2},{3},{4}]", Id, Nom, Prix, StockActuel, StockMinimum);
        }
    }
}

层接口 [dao]

[dao] 层的 IDao 接口如下所示:


using System.Collections.Generic;
 
namespace Chap8 {
    public interface IDao {
        int InsertArticle(Article article);
        List<Article> GetAllArticles();
        void DeleteAllArticles();
    }
}
  • 第 5 行:向 [articles] 表中插入一条记录
  • 第 6 行:将表 [articles] 中的所有行放入对象列表 Article
  • 第 7 行:删除表 [articles] 中的所有行

[metier] 层接口

[metier] 层的 IMetier 接口如下:


using System.Collections.Generic;
 
namespace Chap8 {
    interface IMetier {
        void InsertArticlesInTransaction(Article[] articles);
        void InsertArticlesOutOfTransaction(Article[] articles);
        List<Article> GetAllArticles();
        void DeleteAllArticles();
    }
}
  • 第 5 行:在事务内插入一组文章
  • 第 6 行:同上,但不使用事务
  • 第 7 行:获取所有文章的列表
  • 第8行:删除所有文章

实现 [业务] 层

Trade 接口 IMetier 的实现如下:


using System.Collections.Generic;
using System.Data;
using System.Data.SqlServerCe;
using System.Threading;
 
namespace Chap8 {
    public class Metier : IMetier {
         // layer [dao]
        public IDao Dao { get; set; }
         // connecting chain
        public string ConnectionString { get; set; }
 
        // insert an array of articles inside a transaction
        public void InsertArticlesInTransaction(Article[] articles) {
            // create the connection to the
            using (SqlCeConnection connexion = new SqlCeConnection(ConnectionString)) {
                 // opening connection
                connexion.Open();
                 // transaction
                SqlCeTransaction transaction = null;
                try {
                     // start of transaction
                    transaction = connexion.BeginTransaction(IsolationLevel.ReadCommitted);
                    // register the transaction in the thread
                    Thread.SetData(Thread.GetNamedDataSlot("transaction"), transaction);
                     // articles insertion
                    foreach (Article article in articles) {
                        Dao.InsertArticle(article);
                    }
                    // validate the transaction
                    transaction.Commit();
                } catch {
                    // we undo the transaction
                    if (transaction != null)
                        transaction.Rollback();
                }
            }
        }
 
        // insertion of an array of articles without transaction
        public void InsertArticlesOutOfTransaction(Article[] articles) {
             // articles insertion
            foreach (Article article in articles) {
                Dao.InsertArticle(article);
            }
        }
 
         // articles list
        public List<Article> GetAllArticles() {
            return Dao.GetAllArticles();
        }
         // delete all articles
        public void DeleteAllArticles() {
            Dao.DeleteAllArticles();
        }
    }
}

该类具有以下属性:

  • 第 9 行:对 [dao] 层的引用
  • 第 11 行:用于连接文章数据库的连接字符串

我们仅对 InsertArticlesInTransaction 方法进行说明,该方法本身就存在一些难点:

  • 第 16 行:建立与数据库的连接
  • 第 18 行:现在打开
  • 第 23 行:创建事务
  • 第 25 行:将事务保存到线程的本地数据中,并关联 "transaction" 键
  • 第 27-29 行:针对每个待插入的项目,调用 [dao] 层的插入方法
  • 第 21 行和第 32 行:整个数组的插入操作由 try/catch 块控制
  • 第 31 行:若到达此处,说明未发生异常。随后对事务进行验证。
  • 第 34-35 行:已发生异常,事务被回滚
  • 第 37 行:使用 `EXIT` 语句退出。第 18 行打开的连接将自动关闭。

[DAO] 层的实现

Dao 接口 IDao 的实现如下:


using System.Collections.Generic;
using System.Data;
using System.Data.SqlServerCe;
using System.Threading;
 
namespace Chap8 {
    public class Dao : IDao {
         // connecting chain
        public string ConnectionString { get; set; }
         // requests
        public string InsertText { get; set; }
        public string DeleteAllText { get; set; }
        public string GetAllText { get; set; }
 
         // interface implementation

         // article insertion
        public int InsertArticle(Article article) {
            // is there a transaction in progress?
            SqlCeTransaction transaction = Thread.GetData(Thread.GetNamedDataSlot("transaction")) as SqlCeTransaction;
            // retrieve or create connection
            SqlCeConnection connexion = null;
            if (transaction != null) {
                 // recover connection
                connexion = transaction.Connection as SqlCeConnection;
            } else {
                 // create it
                connexion = new SqlCeConnection(ConnectionString);
                connexion.Open();
            }
            try {
                 // preparation of insertion order
                SqlCeCommand sqlCommand = new SqlCeCommand();
                sqlCommand.Transaction = transaction;
                sqlCommand.Connection = connexion;
                sqlCommand.CommandText = InsertText;
                sqlCommand.Parameters.Add("@nom", SqlDbType.NVarChar, 30);
                sqlCommand.Parameters.Add("@prix", SqlDbType.Money);
                sqlCommand.Parameters.Add("@sa", SqlDbType.Int);
                sqlCommand.Parameters.Add("@sm", SqlDbType.Int);
                sqlCommand.Parameters["@nom"].Value = article.Nom;
                sqlCommand.Parameters["@prix"].Value = article.Prix;
                sqlCommand.Parameters["@sa"].Value = article.StockActuel;
                sqlCommand.Parameters["@sm"].Value = article.StockMinimum;
                 // execution
                return sqlCommand.ExecuteNonQuery();
            } finally {
                // if you were not in a transaction, you close the connection
                if (transaction == null) {
                    connexion.Close();
                }
            }
        }
 
         // articles list
        public List<Article> GetAllArticles() {
...
        }
 
         // deletion of articles
        public void DeleteAllArticles() {
...
        }
    }
}

该类具有以下属性:

  • 第 9 行:用于连接 articles 数据库的连接字符串
  • 第 11 行:用于插入项目的 SQL 命令
  • 第 12 行:用于查询所有文章的 SQL 语句
  • 第 13 行:用于获取所有文章的 SQL 语句

这些属性将从以下配置文件 [App.config] 中初始化:


<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <connectionStrings>
        <add name="dbArticlesSqlServerCe" connectionString="Data Source=|DataDirectory|\dbarticles.sdf;Password=dbarticles;" />
    </connectionStrings>
    <appSettings>
        <add key="insertText" value="insert into articles(nom,prix,stockactuel,stockminimum) values(@nom,@prix,@sa,@sm)"/>
        <add key="getAllText" value="select id,nom,prix,stockactuel,stockminimum from articles"/>
        <add key="deleteAllText" value="delete from articles"/>
    </appSettings>
</configuration>

我们对 InsertArticle 方法进行注释:

  • 第 20 行:恢复 [业务] 层在该线程中创建的任何事务
  • 第 23-25 行:如果存在事务,则检索与其关联的连接。
  • 第 26-30 行:否则,创建并打开一个新的连接。
  • 第 33-44 行:准备插入命令。该命令是参数化的(参见 App.config 文件的 g 行)。
  • 第 33 行:创建 Command 对象。
  • 第 34 行:将其与当前事务关联。如果当前事务不存在(transaction=null),这相当于在没有显式事务的情况下执行 SQL 命令。在这种情况下,仍然存在隐式事务。对于 SQL Server CE,此隐式事务默认采用 autocommit 模式:即 SQL 在执行后立即提交
  • 第 35 行:将 Command 对象与当前连接关联
  • 第 36 行:设置待执行的 SQL 文本。这是 App.config 第 g 行中参数化的查询。
  • 第 37-44 行:初始化 4 个查询参数
  • 第 46 行:执行请求。
  • 第 49-51 行:请注意,如果之前没有事务,则已在第 26-30 行中与数据库建立了新连接。在这种情况下,必须关闭该连接。如果存在事务,则不得关闭连接,因为连接由 [业务] 层进行管理。

另外两个方法基于我们在“数据库”章节中所见的内容:


         // list of items
        public List<Article> GetAllArticles() {
             // item list - empty at start
            List<Article> articles = new List<Article>();
             // operation connection
            using (SqlCeConnection connexion = new SqlCeConnection(ConnectionString)) {
                 // opening connection
                connexion.Open();
                 // executes sqlCommand with select query
                SqlCeCommand sqlCommand = new SqlCeCommand(GetAllText, connexion);
                using (SqlCeDataReader reader = sqlCommand.ExecuteReader()) {
                     // operating income
                    while (reader.Read()) {
                         // current line operation
                        articles.Add(new Article(reader.GetInt32(0), reader.GetString(1), reader.GetDecimal(2), reader.GetInt32(3), reader.GetInt32(4)));
                    }
                }
            }
             // we return the result
            return articles;
        }
 
         // article deletion
        public void DeleteAllArticles() {
            using (SqlCeConnection connexion = new SqlCeConnection(ConnectionString)) {
                 // opening connection
                connexion.Open();
                 // executes sqlCommand with update request
                new SqlCeCommand(DeleteAllText, connexion).ExecuteNonQuery();
            }
}

该测试 [控制台] 应用程序

测试 [控制台] 应用程序如下:


using System;
using System.Configuration;
 
namespace Chap8 {
    class Program {
        static void Main(string[] args) {
            // using the configuration file
            string connectionString = null;
            string insertText;
            string getAllText;
            string deleteAllText;
            try {
                 // connecting chain
                connectionString = ConfigurationManager.ConnectionStrings["dbArticlesSqlServerCe"].ConnectionString;
                 // other parameters
                insertText = ConfigurationManager.AppSettings["insertText"];
                getAllText = ConfigurationManager.AppSettings["getAllText"];
                deleteAllText = ConfigurationManager.AppSettings["deleteAllText"];
            } catch (Exception e) {
                Console.WriteLine("Erreur de configuration : {0}", e.Message);
                return;
            }
             // layer creation [dao]
            Dao dao = new Dao();
            dao.ConnectionString = connectionString;
            dao.DeleteAllText = deleteAllText;
            dao.GetAllText = getAllText;
            dao.InsertText = insertText;
             // layer creation [job]
            Metier metier = new Metier();
            metier.Dao = dao;
            metier.ConnectionString = connectionString;
            // we create an array of articles
            Article[] articles = new Article[2];
            for (int i = 0; i < articles.Length; i++) {
                articles[i] = new Article(0, "article", 100, 10, 1);
            }
             // we delete all articles
            Console.WriteLine("Suppression de tous les articles...");
            metier.DeleteAllArticles();
            // insert the table outside the transaction
            Console.WriteLine("Insertion des articles hors transaction...");
            try {
                metier.InsertArticlesOutOfTransaction(articles);
            } catch (Exception e){
                Console.WriteLine("Exception : {0}", e.Message);
            }
            // we display the articles
            Console.WriteLine("Liste des articles");
            AfficheArticles(metier);
             // we delete all articles
            Console.WriteLine("Suppression de tous les articles...");
            metier.DeleteAllArticles();
            // insert the array in a transaction
            Console.WriteLine("Insertion des articles dans une transaction...");
            metier.InsertArticlesInTransaction(articles);
            // we display the articles
            Console.WriteLine("Liste des articles");
            AfficheArticles(metier);
        }
 
        private static void AfficheArticles(IMetier metier) {
            // we display the articles
            foreach(Article article in metier.GetAllArticles()){
                Console.WriteLine(article);
            }
        }
 
    }
}
  • 第 12-22 行:使用文件 [App.config]。
  • 第 24-28 行:实例化并初始化 [dao] 层
  • 第 30-32 行:[metier] 层同样如此
  • 第 34-37 行:创建一个包含 2 条同名记录的表。SQL Server 数据库 [dbarticles.sdf] 中的 [articles] 表对名称设有唯一性约束。 因此将拒绝插入第 2 个条目。若在事务外插入该数组,则先插入第 1 个条目并保留;若在事务内插入,则先插入第 1 个条目,随后在事务回滚时将其删除。
  • 第 39-50 行:在事务外插入 2 个文章数组并进行验证。
  • 第 52-59 行:与上述操作相同,但在事务中执行

结果如下:

1
2
3
4
5
6
7
8
9
Suppression de tous les articles...
Insertion des articles hors transaction...
Exception : A duplicate value cannot be inserted into a unique index. [ Table na
me = ARTICLES,Constraint name = UQ__ARTICLES__0000000000000010 ]
Liste des articles
[7,article,100,10,1]
Suppression de tous les articles...
Insertion des articles dans une transaction...
Liste des articles
  • 第 5-6 行:在事务外插入导致数据库中保留了第一个项目
  • 第 9 行:在事务中插入,数据库中未保留任何项目

10.9.3. 结论

前面的示例展示了线程局部数据在事务管理中的优势。但不应照搬该示例。Spring、Nhibernate 等框架虽然也采用了这种技术,但使其更加透明:[业务]层可以使用事务,而[DAO]层无需知晓。 [DAO] 层代码中不存在事务。这是通过一种名为 AOP(面向切面编程)的代理技术实现的。我们再次强烈建议您使用这些框架。

10.10. 了解更多...

若要深入了解线程同步这一复杂领域,请阅读本文开头提及的《C# 3.0》一书中的“线程”章节。该章节针对不同场景介绍了多种同步技术。