Skip to content
🔗 内容纲要:

操作系统原理之进程同步

标签:操作系统原理

目录

进程程同步简介

在同步(synchronization)的基础上,进程被分为以下两种类型:

  • 独立进程(Independent Process)。一个进程的执行并不影响其他进程的执行。
  • 合作进程(Cooperative Process)。一个进程可以影响或被系统中执行的其他进程所影响。

在合作进程的情况下会出现进程同步问题,因为在合作进程中资源是共享的。

条件竞争(Race Condition):

当一个以上的进程执行相同的代码或访问相同的内存或任何共享变量时,在这种情况下,输出或共享变量的值有可能是错误的,因此,所有的进程都在进行资源竞争,这种情况被称为条件竞争(race condition)。几个进程同时访问和处理同一数据的操作,那么结果就取决于访问发生的特定顺序。竞争条件可能发生在临界区中,当临界区的多个线程执行的结果因线程的执行顺序不同而发生。如果把临界区作为一个原子指令(atomic instruction)来处理,就可以避免临界区的条件竞争。此外,使用锁或原子变量(atomic variables)进行适当的线程同步(thread synchronization)也可以防止出现条件竞争。

临界区(Critical Section)问题:临界区是一个代码段,一次只能由一个进程访问。临界区包含需要同步的共享变量,以保持数据变量的一致性(consistency)。因此,关键部段问题意味着为合作进程需要设计一种访问共享资源而不产生数据不一致的方法。

临界区

在入口(entry section)部分,进程在进入临界区之前需要请求进入。任何解决临界区问题的方案都必须满足三个要求:

  • 相互排斥(Mutual Exclusion)。如果一个进程在其临界区执行,那么其他进程就不允许在临界区执行。
  • 进展(Progress)。如果没有进程在临界区执行,而其他进程在临界区之外等待,那么只有那些没有在其剩余部分(remainder section)执行的进程可以参与决定下一个进入临界区的进程,而且这种选择不能无限期地推迟(postponed indefinitely)。
  • 有限等待(Bounded Waiting)。在一个进程提出进入其临界区的请求后,在该请求被批准之前,允许其他进程进入其临界区的次数必须存在一个约束。

Peterson 方法

彼得森的解决方案是一个经典的基于软件的临界区(critical section)问题的解决方案。在彼得森的解决方案中,我们有两个共享变量:

  • boolean flag[i] 。初始化为 FALSE,最初没有进程对进入临界区感兴趣。
  • int turn 。轮到进入临界区的进程。

image

彼得森的解决方案保留了所有三个条件:

  • 相互排斥得到了保证,因为在任何时候只有一个进程可以访问临界区。
  • 进度也得到保证,因为在临界区之外的进程不会阻止其他进程进入临界区。
  • 由于每个进程都有公平的机会,因此保留了有限的等待。

彼得森解决方案的缺点:

  • 导致繁忙等待(busy waiting)。(在 Peterson 的解决方案中,代码声明 while(flag[j] && turn == j) 要对此负责。忙碌的等待是不受欢迎的,因为它浪费了可以用来执行其他任务的 CPU 周期(CPU cycles))。
  • 它被限制在 2 个进程。
  • 彼得森的解决方案不能用于现代 CPU 架构(modern CPU architectures)。

信号量(Semaphores)

信号量是一种信号机制(signaling mechanism),一个正在等待信号量的线程可以被另一个线程发出信号。这与 mutex(互斥锁)不同,因为 mutex 只能由被调用等待函数(wait function)的线程发出信号。

一个 semaphore 使用两个原子操作(atomic operations),进程同步的等待(wait)和信号(signal)。一个 semaphore 是一个整数变量,它只能通过 wait () 和 signal () 两个操作来访问。

有两种类型的 semaphores:二进制 semaphores 和计数 semaphores。

  • 二进制 semaphores(Binary Semaphores):它们只能是 0 或 1。它们也被称为互斥锁(mutex locks),因为该锁可以提供互斥(mutual exclusion)。所有的进程可以共享同一个初始化为 1 的互斥信号,然后,一个进程必须等待,直到锁变成 0。当它完成其临界区时,它可以将 mutex semaphore 的值重置为 0,其他进程可以进入临界区。
  • 计数 semaphores(Counting Semaphores):它们可以有任何值,并且不受限于某个领域。它们可以用来控制对某一资源的访问,该资源对同时访问(simultaneous accesses)的次数有限制。该信号可以被初始化为该资源的实例数。每当一个进程想使用该资源时,它就会检查剩余实例的数量是否超过零,也就是说,该进程是否有可用的实例。然后,该进程可以进入其临界区,从而使计数信号的值减少 1。在该进程结束对资源实例的使用后,它可以离开临界区,从而使资源的可用实例数增加 1。

同步临界区

什么是临界区?

当一个以上的进程访问同一个代码段时,该段被称为关键段(critical section,临界区)。

临界区包含需要同步的共享变量或资源,以保持数据变量的一致性。简单来说,临界区是一组需要原子化(atomically)执行的指令 / 语句或代码区域,比如访问一个资源(文件、输入或输出端口、全局数据等)。在并发编程(concurrent programming)中,如果一个线程试图改变共享数据的值,同时另一个线程试图读取该值(即跨线程的数据竞争,data race across threads),结果是不可预测的。对这种共享变量(共享内存、共享文件、共享端口等......)的访问是要同步(synchronized)的。很少有编程语言对同步化(synchronization)有内置支持。在编写内核模式程序(kernel-mode programming)(设备驱动程序、内核线程等)时,理解条件竞争(race conditions)的重要性至关重要,因为程序员可以直接访问和修改内核的数据结构。

它可以用下面的伪代码进行可视化:

c
do{
    flag=1;
    while(flag); // (entry section)
        // critical section
    if (!flag)
        // remainder section
} while(true);
1
2
3
4
5
6
7

对临界区的简单解决方案可以考虑如下:

txt
acquireLock();
Process Critical Section
releaseLock();
1
2
3

一个线程在执行临界区之前必须获得一个锁。该锁只能由一个线程获得。在上述伪代码中,有多种方法来实现锁。

原子操作

什么是原子操作(atomic operation)?原子操作的概念有助于理解重入(reentrancy)、临界区(critical section)、线程安全(thread safety)、同步原语(synchronization primitives)等。

原子性和原子操作:

简单来说,原子性(atomicity)就是不可中断性(unbreakability),也就是不间断的操作(uninterrupted operation)。如果两个用户发出一个打印命令,每一次打印都应该是单次尝试(single attempt)。如果打印机驱动程序同时从两个用户那里发送部分数据,那么打印出来的结果将不符合预期。因此,打印机驱动程序必须将打印命令作为不可中断的操作从一个应用程序中发送出去(换句话说,同步访问打印机)。

请注意,关于原子性的数据库术语会有所不同,但概念是相同的。

原子操作的一个典型例子是指令的执行,通常情况下,给执行单元(execution unit)的指令不可能在中间停止。然而,高级语言中的一条语句会产生多条指令,这是非原子性操作(non-atomic operations)的根本原因。

进程间通信(IPC)

一个进程可以有两种类型:

  • 独立进程(Independent process)
  • 合作进程(Co-operating process)

一个独立的进程不受其他进程执行的影响,而一个合作的进程可能受到其他执行进程的影响。尽管人们可以认为那些独立运行的进程会非常有效地执行,但在现实中,有许多情况下可以利用合作的特性(co-operative nature)来提高计算速度、便利性和模块化(modularity)。进程间通信(IPC,Inter-process communication)是一种机制,它允许进程之间相互通信并同步其行动(synchronize their actions)。这些进程之间的通信可以被看作是它们之间的一种合作。进程之间可以通过以下两种方式进行通信:

  • 共享内存(Shared Memory)
  • 消息传递(Message passing)

一个操作系统可以实现这两种通信方法。首先,我们将讨论共享内存的通信方法,然后是消息传递。

使用共享内存的进程之间的通信需要进程共享一些变量,这完全取决于程序员将如何实现它。使用共享内存的一种通信方式可以是这样:假设进程 1 和进程 2 同时(simultaneously)执行,它们共享一些资源或使用另一个进程的一些信息。进程 1 产生关于某些计算或资源被使用的信息,并将其作为记录保存在共享内存(shared memory)中。当进程 2 需要使用共享信息时,它将在存储在共享内存中的记录中检查,并注意到进程 1 生成的信息,并采取相应的行动(act accordingly)。进程可以使用共享内存从另一个进程中提取信息作为记录,以及向其他进程传递任何特定信息。

让我们来讨论一个使用共享内存方法(Shared Memory Method)的进程间通信的例子:

image

共享内存法

例如:生产者 - 消费者问题(Producer-Consumer problem)

有两个进程:生产者和消费者。生产者生产一些物品,消费者消费该物品。这两个进程共享一个共同的空间或内存位置,称为缓冲区(buffer),生产者生产的物品存放在这里,如果需要,消费者从这里消费该物品。

这个问题有两个版本:第一个版本被称为无界缓冲区问题(unbounded buffer problem),在这个问题中,生产者可以不断地生产物品,并且对缓冲区的大小没有限制;第二个版本被称为有界缓冲区问题(bounded buffer problem),在这个问题中,生产者在开始等待消费者消费之前可以生产一定数量的物品。我们将讨论有界缓冲区问题。

首先,生产者和消费者将共享一些共同的内存,然后生产者将开始生产物品。如果生产的项目总数等于缓冲区的大小,生产者将等待让消费者消费它。同样地,消费者将首先检查项目的可用性,如果没有可用的物品,消费者将等待生产者生产它。如果有可用的项目,消费者将消费它们。下面提供了演示的伪代码。

两个进程之间的共享数据:

c
#define buff_max 25
#define mod %

 struct item{

  // different member of the produced data
  // or consumed data
  ---------
 }
 
 // An array is needed for holding the items.
 // This is the shared place which will be
 // access by both process
 // item shared_buff [ buff_max ];
 
 // Two variables which will keep track of
 // the indexes of the items produced by producer
 // and consumer The free index points to
 // the next free index. The full index points to
 // the first full index.
 int free_index = 0;
 int full_index = 0;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

生产者进程代码:

c
item nextProduced;
 
 while(1){
  
  // check if there is no space
  // for production.
  // if so keep waiting.
  // free_index 的下一个坐标(环形)是full_index,说明从 free_index 到 full_index
  // 已经占满了数组,新的产品将没有位置放置
  while((free_index+1) mod buff_max == full_index);
  // 生产新的产品并放入产品池
  shared_buff[free_index] = nextProduced;
  // 后移 free_index(环形)
  free_index = (free_index + 1) mod buff_max;
 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

消费者进程代码:

c
item nextConsumed;
 
 while(1){
  
  // check if there is an available
  // item for consumption.
  // if not keep on waiting for
  // get them produced.
  // free_index 与 full_index 重合表示没有产品池中已经没有产品
  while((free_index == full_index);
  // 消费池中生产日期最长的产品
  nextConsumed = shared_buff[full_index];
  // full_index 后移(环形)
  full_index = (full_index + 1) mod buff_max;
 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

在上面的代码中,当 (free_index+1) mod buff max 是空的时候,生产者将再次开始生产,因为如果它不是空的,这意味着仍然有项目可以被消费者消费,所以不需要生产更多。同样地,如果 free indexfull index 指向同一个索引,这意味着没有物品可以消费。

信息传递法

现在,我们将开始讨论进程之间通过消息传递进行通信的问题。在这种方法中,进程之间的通信不需要使用任何种类的共享内存。如果两个进程 p1 和 p2 想相互通信,它们的程序如下:

  • 建立一个通信链路(communication link)(如果一个链路已经存在,不需要再建立)。
  • 开始使用基本原语(basic primitives)交换信息。我们至少需要两个原语。
    • send(message, destination)send(message)
    • receive(message, host)receive(message)

image

信息大小可以是固定的,也可以是可变大小的。如果是固定大小,对操作系统设计者来说很容易,但对程序员来说很复杂;如果是可变大小,对程序员来说很容易,但对操作系统设计者来说很复杂。一个标准的消息可以有两个部分:消息头部(header)和消息体(body)。消息头部用于存储消息类型(message type)、目的地 ID(destination id)、源 ID(source id)、消息长度(message length)和控制信息(control information)。控制信息包含的信息有:缓冲区空间(buffer space)用尽的处理措施、序列号(sequence number)、优先级(priority)。一般来说,消息是以先进先出的方式(FIFO)发送的。

通信链路传递消息

直接和间接通信链路(Communication Link):

现在,我们将开始讨论实现通信链路的方法。在实现链接时,有一些问题需要牢记在心,比如:

  • 链接是如何建立的?
  • 一个链路可以与两个以上的进程相关联(associated)吗?
  • 每一对通信进程之间可以有多少个链路?
  • 链接的容量(capacity)是多少?链接所能容纳的信息的大小是固定的还是可变的?
  • 一个链接是单向的还是双向的(unidirectional or bi-directional)?

一个链路有一定的容量(capacity),它决定了可以暂时驻留在其中的消息数量,为此,每个链路都有一个与之相关的队列(queue),可以是零容量(zero capacity)、有限界容量(bounded capacity)或无线容量(unbounded capacity)。在零容量的情况下,发送方等待直到接收方通知发送方它已经收到消息。在非零容量(non-zero capacity)的情况下,一个进程在发送操作后不知道对方是否已经收到了消息。为此,发送方必须与接收方进行明确的沟通。链接(link)的实现取决于实际情况(situation),它可以是直接通信链接(direct communication link),也可以是间接通信链接(in-directed communication link)。

当进程使用特定的进程标识符(process identifier)进行通信,但很难提前识别发送方时,就可以实现直接通信链接(Direct Communication links)。例如,打印服务器。

间接通信(In-direct Communication)是通过一个共享邮箱(端口,port)进行的,它由一个消息队列组成。发送者把消息放在邮箱(端口内的消息队列)里,接收者把它们取出来。

交换信息传递信息

同步和异步的消息传递:

一个被阻塞的进程可能是被事件阻塞,如资源的可用性或 I/O 操作的完成。IPC 可以在同一台计算机上的进程之间进行,也可以在不同计算机上运行的进程之间进行,即在网络 / 分布式系统(networked/distributed system)中。在这两种情况下,进程在发送消息或试图接收消息时可能会被阻塞,也可能不会被阻塞,因此消息传递可能是阻塞或非阻塞的(blocking or non-blocking)。

阻断被认为是同步(synchronous)的,阻塞式发送意味着发送者将被阻断,直到消息被接收者收到。同样地,阻塞式接收让接收者阻塞,直到有消息可用。非阻塞被认为是异步(asynchronous)的,非阻塞式发送是指发送者发送消息并继续工作。同样地,非阻塞式接收让接收者收到有效的信息或空信息。

经过仔细的分析,我们可以得出一个结论,对于发送方来说,在消息传递之后,非阻塞是比较自然的,因为可能需要将消息发送到不同的进程。然而,发送方希望在发送失败的情况下得到接收方的确认。同样地,接收者在报告接收后处于阻塞状态是更自然的,因为收到的消息中的信息可能会被用于进一步的执行。同时,如果消息发送不断失败,接收方将不得不无限期(indefinitely)地等待。这就是为什么我们还要考虑消息传递的其他可能性。基本上有三种首选组合:

  • 阻塞式发送和阻塞式接收
  • 非阻塞式发送和非阻塞式接收
  • 非阻塞式发送和阻塞式接收(多数情况下使用)。

在直接消息传递中,想要进行通信的进程必须明确命名通信的接收者或发送者。例如, send(p1, message) 意味着向 p1 发送消息。类似地, receive(p2, message) 意味着从 p2 接收消息。

在这种通信方法中,通信链路自动建立,可以是单向的(unidirectional),也可以是双向的(bidirectional),但一对发送者和接收者之间可以使用一条链路,一对发送者和接收者不应该拥有超过一对链路。

发送和接收之间的对称性(symmetry)和非对称性(asymmetry)也可以实现,即要么两个进程在发送和接收信息时相互命名,要么只有发送方在发送信息时命名接收方,接收方在接收信息时不需要命名发送方。这种通信方法的问题是,如果一个进程的名称发生变化,这种方法将无法工作。

在间接消息传递中,进程使用邮箱(也被称为端口)来发送和接收消息。每个邮箱都有一个唯一的 ID,进程只有在共享一个邮箱时才能通信。只有当进程共享一个共同的邮箱时才建立链接,而且一个链接可以与许多进程相关联。每对进程可以共享几个通信链接,这些链接可以是单向的或双向的。

假设两个进程想通过间接消息传递进行通信,所需的操作是:创建一个邮箱,使用该邮箱发送和接收消息,然后销毁该邮箱。使用的标准原语是: send(A, message) ,这意味着将消息发送到邮箱 A。接收消息的原语也以同样的方式工作,例如: received (A, message) 。这个邮箱的实现有一个问题。假设有两个以上的进程共享同一个邮箱,并且假设进程 p1 向邮箱发送了一个消息,那么哪个进程将是接收方?这可以通过以下方式解决:

强制规定只有两个进程可以共享一个邮箱,或者强制规定在给定时间内只允许一个进程执行接收,或者随机选择任何一个进程并通知发送方为接收方。一个邮箱可以成为单个发送方 / 接收方对的私有邮箱,也可以在多个发送方 / 接收方对之间共享。端口(Port)是这种邮箱的一个实现,可以有多个发送方和一个接收方。它被用于客户端 / 服务器应用程序(在这种情况下,服务器是接收方)。端口由接收进程拥有,并由操作系统在接收进程的请求下创建,并且可以在接收者自身终止时,在同一接收处理器的请求下销毁。强制执行只有一个进程被允许执行接收,可以使用互斥(mutual exclusion)的概念来完成。互斥邮箱(Mutex mailbox)被创建,由 n 个进程共享。发送者是无阻塞的,并发送消息。第一个执行接收的进程将进入临界区,所有其他进程将处于阻塞状态并等待。

现在,让我们用消息传递的概念来讨论生产者 - 消费者问题。生产者在邮箱中放置项目(消息),当邮箱中至少有一条消息时,消费者就可以消费一个项目。以下是代码:

生产者代码:

c
void Producer(void){
  
  int item;
  Message m;
  
  while(1){
   
   receive(Consumer, &m);
   item = produce();
   build_message(&m , item) ;
   send(Consumer, &m);
  }
 }
1
2
3
4
5
6
7
8
9
10
11
12
13

消费者代码:

c
void Consumer(void){
  
  int item;
  Message m;
  
  while(1){
   
   receive(Producer, &m);
   item = extracted_item();
   send(Producer, &m);
   consume_item(item);
  }
 }
1
2
3
4
5
6
7
8
9
10
11
12
13

IPC 系统的例子

IPC 系统的例子如下:

  • Posix:使用共享内存法
  • Mach : 使用信息传递法
  • Windows XP:使用本地过程调用(local procedural calls)的消息传递方式

客户 / 服务器结构(client/server Architecture)中的通信机制:

  • 管道(Pipe)
  • 套接字(Socket)
  • 远程过程调用(RPC,Remote Procedural calls)

Semaphores 信息量

Semaphore 是由 Dijkstra 在 1965 年提出的,这是一项非常重要的技术,通过使用一个简单的整数值来管理并发进程(concurrent processes),这就是所谓的 semaphore。semaphore 是一个简单的整数变量,在线程之间共享。这个变量被用来解决临界区的问题,并在多进程(multiprocessing)环境中实现进程的同步(synchronization)。

信号量的类型

Semaphores 有两种类型:

  • 二进制信息量(Binary Semaphore,二元信号量): 这也被称为 mutex 锁。它只能有两个值 --0 和 1。它的值被初始化为 1。它被用来实现解决多进程的临界区问题。
  • 计数信息量(Counting Semaphore):它的值可以在一个不受限制的范围内。它被用来控制对有多个实例的资源的访问。

现在让我们看看它是如何做到这一点的。

P 操作和 V 操作

首先,看一下可以用来访问和改变 semaphore 变量值的两个操作:

image

关于 P 和 V 操作的一些重点:

  • P 操作(P operation)也被称为等待(wait)、睡眠(sleep)或下降(down)操作,而 V 操作(V operation)也被称为信号(signal)、唤醒(wake-up)或上升(up)操作。
  • 这两个操作都是原子操作,并且信号量总是被初始化为一个,即 s=1。这里的原子性是指在同一时间 / 时刻对变量进行读取、修改和更新,没有抢占,也就是说,在读取、修改和更新之间,没有其他操作会改变该变量。
  • 一个临界区被这两个操作所包围,以实现进程同步。请看下面的图片。进程 P 的临界区位于 P 和 V 操作之间。

image

二元信号量

现在,让我们看看它是如何实现互斥的。假设有两个进程 P1 和 P2,semaphore s 被初始化为 1。现在,如果假设 P1 进入了它的临界区,那么 semaphore s 的值就变成了 0。这样就实现了互斥。请看下面的图片,这就是二进制信号量。

image

Binary semaphores 的实现:

c
struct semaphore {
 enum value(0, 1);

 // q contains all Process Control Blocks (PCBs)
 // corresponding to processes got blocked
 // while performing down operation.
 Queue<process> q;

} P(semaphore s)
{
 if (s.value == 1) {
  s.value = 0;
 }
 else {
  // add the process to the waiting queue
  q.push(P) sleep();
 }
}
V(Semaphore s)
{
 if (s.q is empty) {
  s.value = 1;
 }
 else {

  // select a process from waiting queue
  Process p = q.front();
  // remove the process from wating as it has been
  // sent for CS
  q.pop();
  wakeup(p);
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

局限性:

  • 信号量最大的限制之一是优先级倒置(priority inversion)。
  • 死锁(Deadlock),假设一个进程试图唤醒另一个不在睡眠状态的进程。因此,一个死锁可能会无限期地阻塞。
  • 操作系统必须跟踪所有调用等待和信号的 semaphore。

在这个实现 semaphore 的过程中存在问题:

semaphores 的主要问题是它需要繁忙的等待,如果一个进程处于临界区,那么其他试图进入临界区的进程将一直等待,直到临界区没有被任何进程占据。每当有进程等待时,它就会不断地检查 semaphore 的值(请看在 P 操作中这一行 while (s==0); ),并浪费 CPU 周期(CPU cycle)。

上面的描述是针对二进制信号量的,它只能取 0 和 1 两个值,并确保相互排斥。还有一种类型的信号灯叫做计数信号量,可以取大于 1 的值。

计数信号量

现在假设有一种资源,其实例数为 4,现在我们初始化 S=4,其余部分与二进制信号量相同。每当进程需要该资源时,它就会调用 P 或等待函数,当它完成后,就会调用 V 或信号函数。如果 S 的值变成了 0,那么一个进程必须等待,直到 S 变成正值。例如,假设有 4 个进程 P1、P2、P3、P4,它们都对 S(初始化为 4)调用等待操作。如果另一个进程 P5 想要这个资源,那么它应该等待,直到这四个进程中的一个调用信号函数,并且 semaphore 的值变为正值。

在等待锁的过程中,也有可能出现 "自旋锁"(spinlock),因为这些进程不断地旋转(spin)。为了避免这种情况,下面提供了另一种实现,计数信号量的实现:

c
struct Semaphore {

 int value;

 // q contains all Process Control Blocks(PCBs)
 // corresponding to processes got blocked
 // while performing down operation.
 Queue<process> q;

} P(Semaphore s)
{
 s.value = s.value - 1;
 if (s.value < 0) {

  // add process to queue
  // here p is a process which is currently executing
  q.push(p);
  block();
 }
 else
  return;
}

V(Semaphore s)
{
 s.value = s.value + 1;
 if (s.value <= 0) {

  // remove process p from queue
  Process p = q.pop();
  wakeup(p);
 }
 else
  return;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

在这个实现中,每当进程等待时,它就会被加入到与该信号相关的进程的等待队列中。这是在该进程上通过系统调用 block () 完成的。当一个进程完成后,它会调用信号函数,队列中的一个进程会被恢复。它使用 wakeup () 系统调用。

Mutex vs Semaphore

mutex 和 semaphore 之间的区别是什么?什么时候应该使用 mutex,什么时候应该使用 semaphore?

按照操作系统的术语,mutexes 和 semaphores 是提供同步服务(synchronization services,也叫同步原语,synchronization primitives)的内核资源(kernel resources)。为什么我们需要这样的同步原语?只有一个不就够了吗?为了回答这些问题,我们需要了解一些核心概念。

生产者 / 消费者的问题

请注意,该内容是一个概括性的解释。实际细节因场景而异。

考虑一下标准的生产者 - 消费者问题。假设,我们有一个长度为 4096 字节的缓冲区。一个生产者线程收集数据并将其写入缓冲区。一个消费者线程处理从缓冲区收集的数据。目标是,这两个线程不应该同时运行。

使用 Mutex:

Mutex 提供相互排斥(mutual exclusion),生产者或消费者都可以拥有钥匙(Mutex)并继续他们的工作。只要缓冲区被生产者填满,消费者就需要等待,反之亦然。在任何时间点,只有一个线程可以处理整个缓冲区。这个概念可以用 semaphore 来概括。

使用 semaphore:

semaphore 是一个通用的 mutex。为了代替单个缓冲区,我们可以将 4KB 的缓冲区分成四个 1KB 的缓冲区(相同的资源)。一个 semaphore 可以与这四个缓冲区相关联。消费者和生产者可以同时在不同的缓冲区工作。

误区

在二进制信号量和 mutex 之间存在一个模糊的概念。我们可能遇到过,mutex 是一个二进制信号量。但事实并非如此。mutex 和 semaphore 的目的是不同的。也许,由于它们在实现上的相似性,mutex 会被称为二进制信号量。

严格来说,mutex 是一种锁定机制,用于同步访问一个资源。只有一个任务(可以是线程,也可以是基于操作系统抽象的进程)可以获得该 mutex。这意味着有所有权与 mutex 相关联,并且只有所有者可以释放锁(mutex)。

semaphore 是一种信号机制(ignaling mechanism)("我完成了,你可以继续" 的信号)。例如,如果你在手机上听歌(假设是一个任务),同时你的朋友给你打电话,就会触发一个中断,中断服务程序(ISR)就会给呼叫处理任务发出唤醒信号。

常见问题

  1. 一个线程可以获得一个以上的锁(Mutex)吗?是的,一个线程有可能需要一个以上的资源,因此要有锁。如果任何锁不可用,线程将在锁上等待(阻塞,wait (block))。

  2. 一个 mutex 可以被锁定一次以上吗?一个 mutex 是一个锁。只有一种状态(锁定 / 解锁)与它相关联。然而,一个递归的 mutex(recursive mutex)可以被锁定一次以上(POSIX 兼容系统),其中有一个计数与它相关联,但只保留一个状态(锁定 / 未锁定)。程序员必须使 mutex 解锁的次数与它被锁定的次数相同。

  3. 如果一个非递归的 mutex 被锁定超过一次,会发生什么?死锁(Deadlock)。如果一个已经锁定的 mutex 的线程,试图再次锁定该 mutex,它将进入该 mutex 的等待列表,这将导致死锁。这是因为没有其他线程可以解锁该 mutex。操作系统的实现者可以谨慎地识别 mutex 的所有者,如果它已经被同一个线程锁定,则返回,以防止死锁。

  4. 二进制信号量和互斥(mutex)是一样的吗?不,我们建议将它们分开处理,正如在信号量与锁机制中所解释的那样。但是二进制信号量可能会遇到与 mutex 类似的重要问题(例如优先级倒置(priority inversion))。

  5. 什么是 mutex 和临界区(critical section)?一些操作系统在 API 中使用了相同的单词 critical section。通常情况下,由于与之相关的保护协议(protection protocols),mutex 是一个昂贵的操作。最后,mutex 的目标是原子访问(atomic access)。还有其他的方法来实现原子访问,比如禁用中断(disabling interrupts),这样可以快得多,但会破坏响应性(responsiveness)。替代的 API 使用了禁用中断的方法。

  6. 什么是事件(events)?mutex、semaphore、event、critical section 等的语义都是一样的,都是同步原语(synchronization primitives)。基于使用它们的成本,它们是不同的。我们应该查阅操作系统文档以了解确切的细节。

  7. 我们可以在中断服务程序(ISR)中获取 mutex/semaphore 吗?ISR 将在当前运行线程的上下文中异步运行。我们不建议在 ISR 中查询(阻塞调用,blocking call)同步原语的可用性。ISR 是短暂的,对 mutex/semaphore 的调用可能会阻塞当前运行的线程。然而,一个 ISR 可以给 semaphore 发出信号或解锁 mutex。

  8. 当 mutex/semaphore 不可用时,我们所说的 "线程阻塞在 mutex/semaphore 上" 是什么意思?每个同步原语都有一个与之相关的等待列表(waiting list)。当资源不可用时,请求的线程将从处理器的运行列表(running list)中移到同步原语的等待列表中。当资源可用时,等待列表中优先级较高的线程会获得资源(更准确地说,这取决于调度策略(scheduling policies))。

  9. 当资源不可用时,线程是否必须一直阻塞?没有必要。如果设计确定 "当资源不可用时必须做什么",线程可以承担这项工作(不同的代码分支)。为了支持应用需求,操作系统提供了非阻塞的 API。例如,POSIX pthread_mutex_trylock() API。当 mutex 不可用时,函数立即返回,而 API pthread_mutex_lock() 则阻断线程,直到资源可用。

还可以将 mutex/semaphores 与 Peterson 的算法和 Dekker 的算法进行比较。一本好的参考书是《并发的艺术》(《Art of Concurrency book》)。还可以在 Qt 文档中探讨读锁和写锁。

参考

生产者和消费者问题

生产者消费者问题是一个经典的同步问题。我们可以通过使用 semaphores 来解决这个问题。

一个信号量 S 是一个整数变量,只能通过两个标准操作来访问:wait () 和 signal ()。wait () 操作将 semaphore 的值减少 1,signal () 操作将其值增加 1。

c
wait(S){
while(S<=0);   // busy waiting
S--;
}

signal(S){
S++;
}
1
2
3
4
5
6
7
8

Semaphores 有两种类型:

  • Binary Semaphore - 这类似于 mutex lock,但不是一回事。它只能有两个值 --0 和 1。它的值被初始化为 1。它被用来实现解决多进程的临界区问题。
  • Counting Semaphore - 它的值可以在一个不受限制的领域内。它被用来控制对多实例的资源的访问。

问题描述:

我们有一个固定大小的缓冲区。一个生产者可以生产项目并将其放在缓冲区(buffer)内。消费者可以挑选物品,也可以消费它们。我们需要确保生产者在将物品放入缓冲区的同时,消费者不应该消费任何物品。在这个问题中,缓冲区是临界区(critical section)。

为了解决这个问题,我们需要两个计数信号量(counting semaphores):Full 和 Empty。"Full" 记录任何时候缓冲区内的项目数量,"Empty" 记录未被占用的槽的数量。

信号量的初始化:

  • mutex = 1
  • Full = 0 // 初始时,所有槽位都是空的。因此,满槽为 0
  • Empty = n // 最初所有的槽都是空的

生产者的解决方案:

c
do{

//produce an item

wait(empty);
wait(mutex);

//place in buffer

signal(mutex);
signal(full);

}while(true)
1
2
3
4
5
6
7
8
9
10
11
12
13

当生产者产生一个项目时,"empty" 的值会减少 1,因为现在有一个槽会被填充。mutex 的值也被减少,以防止消费者访问缓冲区。现在,生产者已经放置了物品,因此 "full" 的值增加了 1,mutex 的值也增加了 1,因为生产者的任务已经完成,消费者可以访问缓冲区。

消费者的解决方案:

c
do{

wait(full);
wait(mutex);

// remove item from buffer

signal(mutex);
signal(empty);

// consumes item

}while(true)
1
2
3
4
5
6
7
8
9
10
11
12
13

由于消费者正在从缓冲区移除一个项目,因此 "full" 的值减少了 1,mutex 的值也减少了,所以生产者此时不能访问缓冲区。现在,消费者已经消耗了这个项目,因此 "empty" 的值增加了 1,Mutex 的值也增加了,所以生产者现在可以访问缓冲区。

参考:

用餐哲学家问题

吃饭的哲学家问题(The Dining Philosopher Problem):

吃饭的哲学家问题指出,K 个哲学家围着一张圆桌子坐着,每对哲学家之间有一根筷子。如果一个哲学家能够拿起与他相邻的两根筷子,他就可以吃饭。一根筷子只可以被其相邻的任何一个人拿起,但不能同时拿起。

image

每个哲学家都由以下伪代码表示:

c
process P[i]
 while true do
   {  THINK;
      PICKUP(CHOPSTICK[i], CHOPSTICK[i+1 mod 5]);
      EAT;
      PUTDOWN(CHOPSTICK[i], CHOPSTICK[i+1 mod 5])
   }
1
2
3
4
5
6
7

哲学家有三种状态。思考、饥饿和进食(THINKING, HUNGRY, and EATING)。这里有两个信号量。Mutex 和一个用于哲学家的信号量数组(semaphore array)。Mutex 的使用使得没有两个哲学家可以在同一时间访问 pickup 或 putdown。数组被用来控制每个哲学家的行为。但是,由于编程错误,信号量可能导致死锁。

代码:

c
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

#define N 5
#define THINKING 2
#define HUNGRY 1
#define EATING 0
#define LEFT (phnum + 4) % N
#define RIGHT (phnum + 1) % N

int state[N];
int phil[N] = { 0, 1, 2, 3, 4 };

sem_t mutex;
sem_t S[N];

void test(int phnum)
{
 if (state[phnum] == HUNGRY
  && state[LEFT] != EATING
  && state[RIGHT] != EATING) {
  // state that eating
  state[phnum] = EATING;

  sleep(2);

  printf("Philosopher %d takes fork %d and %d\n",
     phnum + 1, LEFT + 1, phnum + 1);

  printf("Philosopher %d is Eating\n", phnum + 1);

  // sem_post(&S[phnum]) has no effect
  // during takefork
  // used to wake up hungry philosophers
  // during putfork
  sem_post(&S[phnum]);
 }
}

// take up chopsticks
void take_fork(int phnum)
{

 sem_wait(&mutex);

 // state that hungry
 state[phnum] = HUNGRY;

 printf("Philosopher %d is Hungry\n", phnum + 1);

 // eat if neighbours are not eating
 test(phnum);

 sem_post(&mutex);

 // if unable to eat wait to be signalled
 sem_wait(&S[phnum]);

 sleep(1);
}

// put down chopsticks
void put_fork(int phnum)
{

 sem_wait(&mutex);

 // state that thinking
 state[phnum] = THINKING;

 printf("Philosopher %d putting fork %d and %d down\n",
  phnum + 1, LEFT + 1, phnum + 1);
 printf("Philosopher %d is thinking\n", phnum + 1);

 test(LEFT);
 test(RIGHT);

 sem_post(&mutex);
}

void* philosopher(void* num)
{

 while (1) {

  int* i = num;

  sleep(1);

  take_fork(*i);

  sleep(0);

  put_fork(*i);
 }
}

int main()
{

 int i;
 pthread_t thread_id[N];

 // initialize the semaphores
 sem_init(&mutex, 0, 1);

 for (i = 0; i < N; i++)

  sem_init(&S[i], 0, 0);

 for (i = 0; i < N; i++) {

  // create philosopher processes
  pthread_create(&thread_id[i], NULL,
     philosopher, &phil[i]);

  printf("Philosopher %d is thinking\n", i + 1);
 }

 for (i = 0; i < N; i++)

  pthread_join(thread_id[i], NULL);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

参考:

睡眠理发师问题

问题:

有一家理发店,有一个理发师,一张理发椅,如果有顾客坐的话,还有 n 张椅子用来等待顾客。如果没有顾客,那么理发师就睡在自己的椅子上。当有顾客到来时,他必须叫醒理发师。如果有很多顾客,而理发师正在为顾客理发,那么剩下的顾客如果等候室里有空椅子就等待,如果没有空椅子就离开。

image

解决方案:

这个问题的解决方案包括三个信号量。第一个信号量是用于计算等待室中的顾客数量(理发师椅子上的顾客不包括在内,因为他没有在等待)。第二,0 或 1 用来告诉理发师是空闲还是在工作,第三个 mutex 用来提供相互排斥,这是进程执行所需要的。在解决方案中,顾客有在等候室等待的顾客数量的记录,如果顾客的数量等于等候室的椅子数量,那么即将到来的顾客就会离开理发店。

当理发师早上出现时,他执行程序 barber,导致他在 semaphore customers 上阻塞,因为它最初是 0。然后理发师去睡觉,直到第一个顾客出现。

当一个顾客来的时候,他执行 customer 过程,顾客获得了进入临界区的 mutex,如果此后有另一个顾客进入,第二个顾客将不能做任何事情,直到第一个顾客释放了 mutex。然后,客户检查等待室的椅子,如果等待的客户少于椅子的数量,那么他就坐下,否则他就离开并释放 mutex。

如果椅子是空的,那么顾客就坐在等候室里,并增加变量 waiting 的值,同时也增加顾客的 semaphore,如果理发师在睡觉,就把他唤醒。

这时,顾客和理发师都醒了,理发师准备给这个人理发了。理发结束后,顾客退出程序,如果等待室里没有顾客,理发师就会睡觉。

image

c
Semaphore Customers = 0;
Semaphore Barber = 0;
Mutex Seats = 1;
int FreeSeats = N;

Barber {
 while(true) {
   /* waits for a customer (sleeps). */
   down(Customers);

   /* mutex to protect the number of available seats.*/
   down(Seats);

   /* a chair gets free.*/
   FreeSeats++;
   
   /* bring customer for haircut.*/
   up(Barber);
   
   /* release the mutex on the chair.*/
   up(Seats);
   /* barber is cutting hair.*/
 }
}

Customer {
 while(true) {
   /* protects seats so only 1 customer tries to sit
   in a chair if that's the case.*/
   down(Seats); //This line should not be here.
   if(FreeSeats > 0) {
    
    /* sitting down.*/
    FreeSeats--;
    
    /* notify the barber. */
    up(Customers);
    
    /* release the lock */
    up(Seats);
    
    /* wait in the waiting room if barber is busy. */
    down(Barber);
    // customer is having hair cut
   } else {
    /* release the lock */
    up(Seats);
    // customer leaves
   }
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

读写同步问题

考虑这样一种情况:我们有一个许多人共享的文件:

如果其中一个人试图编辑该文件,其他任何人都不应该在同一时间读或写,否则他将看不到变化。然而,如果某个人正在阅读该文件,那么其他人也可能同时阅读它。确切地说,在操作系统中,我们把这种情况称为 "读写问题"(readers-writers problem)。

问题参数:

  • 一组数据在多个进程之间共享
  • 一旦一个写者准备好了,它就进行写。每次只有一个写者可以写
  • 如果一个进程正在写,其他进程不能读它
  • 如果至少有一个读者在读,其他进程就不能写。
  • 读者可以不写,只读

读者比写者有优先权时的解决方案:

这里的优先级是指,如果当前的共享被打开用于读取,则没有读者应该等待。使用了三个变量:mutex, wrt, readcnt 来实现解决方案:

  • semaphore mutex, wrt。semaphore mutex 用于确保在 readcnt 更新时,即任何读者进入或退出临界区时的相互排斥,semaphore wrt 同时被读者和写者使用。
  • int readcnt。readcnt 表示在临界区进行读取的进程数量,最初为 0。

semaphore 的函数:

  • wait () : 递减 semaphore 的值。
  • signal ():增加信号量的值。

写者进程:

  • 写入者请求进入临界区。
  • 如果允许,即 wait () 给出一个真值,它就进入并执行写操作。如果不允许,它将继续等待。
  • 它退出临界区。
c
do {
    // writer requests for critical section
    wait(wrt);  
   
    // performs the write

    // leaves the critical section
    signal(wrt);

} while(true);
1
2
3
4
5
6
7
8
9
10

读者过程:

  • 读者要求进入临界区。
  • 如果允许的话。它将增加临界区中的读者数量。如果这个读者是第一个进入的读者,它会锁定 wrt semaphore,以限制任何写者进入。然后,它向 mutex 发出信号,因为任何其他读者都被允许进入。在进行读取后,它退出临界区。当退出时,它检查是否有更多的读者在里面,如果没有更多读者,它释放信号信量 "wrt",因为现在写者可以进入临界区。
  • 如果不允许,它将继续等待。
c
do {
    
   // Reader wants to enter the critical section
   wait(mutex);

   // The number of readers has now increased by 1
   readcnt++;                          

   // there is atleast one reader in the critical section
   // this ensure no writer can enter if there is even one reader
   // thus we give preference to readers here
   if (readcnt==1)     
      wait(wrt);                    

   // other readers can enter while this current reader is inside 
   // the critical section
   signal(mutex);                   

   // current reader performs reading here
   wait(mutex);   // a reader wants to leave

   readcnt--;

   // that is, no reader is left in the critical section,
   if (readcnt == 0) 
       signal(wrt);         // writers can enter

   signal(mutex); // reader leaves

} while(true);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

因此,信号 "wrt" 在读者和写者之间排队,其方式是如果写者也在那里,则优先考虑读者。因此,没有一个读者仅仅因为一个写作者要求进入临界区而在等待。

Released under the MIT License.