前言
IPC中介绍过信号量, 为了让进程间通信, 从而多个执行流看到同一份公共资源, 对于并发访问造成数据不一致问题, 就需要把公共资源保护起来, 从而就需要同步与互斥.
信号量共有三个特性:
1. 本质是一把用于描述临界资源资源的数目的计数器
2. 每一个执行流想访问公共资源内的某一份资源,不应该让执行流直接访问,而是先申请信号量资源(对资源预定), 其实就是先对信号量计数器进行P操作. 本质上, 只要P成功. 完成了对资源的预订机制.
3. 信号量也是公共资源, 为了保证自身安全, PV操作是原子的.
为什么使用信号量
我们实现基于阻塞队列的生产者消费者模型时, 为什么使用互斥锁而不是其它的管理同步互斥的资源?
因为我们是把这个队列当作一个整体来用. 每个线程拿到该队列后, 对该队列的Push和pop操作是会互相影响的, 可能涉及空间的重新配置, 所以容器不是线程安全的.
所以阻塞队列的模型有以下缺点:
- 访问临界资源前, 无法得知临界资源的情况, 所以要把临界资源当作一个整体使用
- 多个线程不能同时访问临界资源的不同区域
为了解决上述问题, 现在我们想使用信号量管理同步与互斥. 如果当前存在一个数组公共资源:
1. 使用时并不把公共资源当作整体
2. 多线程不访问临界资源的同一个区域, 而是访问临界资源的不同区域
最好的情况下, 我们为公共资源里的 N 份分别分给 N 个线程去使用, 但是为了保证 N 个资源不分配给多于 N 个线程访问, 我们需要信号量的PV操作去管理资源:
- 申请信号量(P操作)
- 访问N个资源中指定的一个资源
- 释放信号量(V操作)
当信号量内计数器减到0的时候, 说明临界资源的 N 个区域都有线程访问. 其他想访问临界资源的线程由于申请信号量不成功, 就只能在信号量的阻塞队列中等待.
其中与之前使用阻塞队列的最大不同之处在于: 我们通过信号量管理的资源在使用的时候不需要对其进行条件判断, 比如 while(isFull()); 因为之前线程拿到阻塞队列并不知道其中资源的使用情况(空或满), 需要判断; 而信号量管理的资源把判断结合到P操作中, 所以线程通过信号量预定的资源一定是可以使用的.
举个例子, 我们去电影院线下买票时要问一问售票员是否还有票, 但是线上预定好票之后就不会再问了, 直接取票使用.
所以通过使用信号量:
- 线程可以在不访问临界资源的情况下知晓资源的使用情况,
- 允许多线程同时访问共享资源的不同区域, 有效提升了程序的效率。
基于环形队列的生产者消费者模型
1. 认识信号量接口
首先, 信号量也是一个类(sem_t), 也可以构造对象(sem_t sem),对象内也有成员函数:
int sem_init(sem_t* sem, int pshared, unsigned int value);
头文件:semaphore.h
功能:初始化信号量
参数:sem_t* sem 表示需要被初始化的信号量的地址. int pshared表示信号量共享模式, 0表示线程间共享, 非零表示进程间共享, 这里设为0. unsigned int value表示信号量的初始值, 也就是计数器的初始值.
返回值:初始化成功返回0,失败返回-1并将错误码设置进errno
int sem_destroy(sem_t* sem);
头文件:semaphore.h
功能:销毁信号量
参数:sem_t* sem表示需要被销毁的信号量的地址
返回值:初始化成功返回0, 失败返回-1并将错误码设置进errno
int sem_wait(sem_t* sem);
头文件:semaphore.h
功能:申请(等待)信号量, P操作
参数:sem_t* sem 表示需要申请的信号量地址
返回值:初始化成功返回0, 失败返回-1并将错误码设置进errno
int sem_post(sem_t* sem);
头文件:semaphore.h
功能:归还(发布)信号量, V操作
参数:sem_t* sem 表示需要归还的信号量地址。
返回值:初始化成功返回0, 失败返回-1并将错误码设置进errno
注意: 现在操作系统的 IPC 普遍使用 systemV 和 POSIX 两种标准. 正是因为信号量和互斥锁都使用了POSIX标准,所以信号量与互斥锁的接口非常类似
2. 环形队列
数据结构中用数组模拟环形队列时, 通过 下标%总长度 得到逻辑上的环形队列.
基于Ringbuffer的PC模型的基本规则
基于环形队列的生产者消费者模型需要遵守这几个规则:
1. 生产者不能把消费者套圈, 此时生产者生产完最后一个数据, 马上追上消费者, 如果生产者继续生产, 将覆盖原来生产的数据, 不可以.
2. 消费者不能超过生产者
这个也很容易理解, 消费者要等待生产者生产完数据才能消费.
图中消费者消费速度很快, 马上超过生产者, 所以此时就需要等待生产.
但是是否能在同一位置?
结论: 消费者和生产者在同一位置时, 要么为空, 要么为满. 在其它情况下, 两者根本不会指向同一个位置!
a. 为空时, 只允许生产者生产, "只"体现了互斥, "生产者"体现了同步.
b. 为满时, 只运行消费者消费, "只"体现了互斥, "消费者"体现了同步.
数据结构中我们认为 front == rear 为空, (rear+1) % N == front 为满, 此处我们用信号量控制资源, 就不需要这种方式判空和满.
清楚了模型的规则之后, 我们要对资源进行认识, 怎样可以通过信号量来控制空与满?
由于生产者只负责将数据生产到环形队列中, 当环形队列满了以后就不能生产了, 所以它只关心队尾后还有多少空间供它生产数据
由于消费者只负责从环形队列中取数据, 当环形队列空了以后就不能消费了, 所以它只关心队首到队尾有几个数据可以供它消费
所以我们得到:
生产者在意环形队列中空闲空间可存储数据的个数
消费者在意环形队列中数据的个数
于是我们就可以两个信号量来控制:
- 生产者信号量 -> 管理空间
- 消费者信号量 -> 管理数据
生产和消费的伪代码大体上是这样的:
//生产者
P(空间)
生产者行为
V(资源)//消费者
P(资源)
消费者行为
V(空间)
多生产多消费模型
但是前面的理论叙述, 我们只体现出3个关系中"生产者与消费者的互斥与同步", 我们其实还没体现出生产者之间和消费者之间的关系, 于是我们在上面的基础上对临界区加锁:
//生产者
P(空间)
加锁
生产者行为
释放锁
V(资源)//消费者
P(资源)
加锁
消费者行为
释放锁
V(空间)
有一个问题, 申请互斥锁和信号量谁在前比较合适呢?
信号量在前, 申请锁在后
如果申请互斥锁在前, 申请信号量在后 , 会怎么样?
1. 假如现在我们只有一把锁:
如果一个生产者(或消费者)线程申请到锁后,信号量申请失败了,那线程就只能拿着锁阻塞,其他生产者(或消费者)线程就申请不到锁, 产生死锁, 整个程序就卡住了。
2. 假如现在我们有两把锁, 生产和消费各一把:
- 由于有两把锁, 生产和消费互不影响, 程序不会产生死锁.
- 但是对于线程来说, 申请锁是有代价的, 将信号量申请放在前面可以减少申请锁的次数
总结: 其实不管用什么形式控制临界资源去实现PC模型, 只要能满足 理论中的3个关系就可以.
代码实现
现在再来实现RingBuffer, 和之前一样, 用Task充当资源, 生产者线程和消费者线程的行为和基于阻塞队列的PC一样, 通过向队列Push和Pop数据来进行生产和消费:
#include "RingBuffer.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <iostream>
#include <string>std::string opers = "+-*/()";void *produce(void *arg)
{RingBuffer<Task> *prb = static_cast<RingBuffer<Task> *>(arg);while (true){int dataX = rand() % 10;usleep(100);int dataY= rand() % 10;usleep(100);char oper = opers[rand() % opers.size()];Task t(dataX, dataY, oper);prb->Push(t);std::cout << "Producer task: ";t.PrintTask();sleep(1);}}void *consumer(void *arg)
{RingBuffer<Task> *prb = static_cast<RingBuffer<Task> *>(arg);while (true){Task t;prb->Pop(&t);t.Run();usleep(100);std::cout << "Consumer task: ";t.PrintTaskResult();}
}int main()
{srand(time(nullptr));RingBuffer<Task> rb;pthread_t con[3], pro[2];for(auto& c: con)pthread_create(&c, nullptr, consumer, (void *)&rb);for(auto& p: pro)pthread_create(&p, nullptr, produce, (void *)&rb);for(auto& c: con)pthread_join(c, nullptr);for(auto& p: con)pthread_join(p, nullptr);return 0;
}
环形队列中需要有两个信号量控制生产者和消费者的互斥与同步, 两个锁控制生产者之间和消费者之间的互斥 .
此外, 为了插入和取出, 还需要两个下标来记录当前的位置.
#pragma once
#include <semaphore.h>
#include <pthread.h>
#include "LockGuard.hpp"
#include <vector>template <class T>
class RingBuffer
{
public:RingBuffer(int size = 10): _size(size), _q(_size), _c_pos(0), _p_pos(0){pthread_mutex_init(&_p_mutex, nullptr);pthread_mutex_init(&_c_mutex, nullptr);sem_init(&_p_sem, 0, _size);sem_init(&_c_sem, 0, 0);}void P(sem_t *sem){sem_wait(sem);}void V(sem_t *sem){sem_post(sem);}void Push(const T &in){P(&_p_sem);{LockGuard lg(&_p_mutex);_q[_p_pos++] = in; // 放入数据_p_pos %= _size; // 更新生产者位置}V(&_c_sem);}void Pop(T *out){P(&_c_sem);{LockGuard lg(&_c_mutex);*out = _q[_c_pos++]; // 取出数据_c_pos %= _size; // 更新消费者位置}V(&_p_sem);}private:size_t _size;//队列大小std::vector<T> _q;// 环形队列sem_t _p_sem;//生产者信号量, 管理空间sem_t _c_sem;// 消费者信号量, 管理数据pthread_mutex_t _c_mutex;//消费者互斥pthread_mutex_t _p_mutex;//生产者互斥int _c_pos;//消费者当前位置int _p_pos;///生产者当前位置
};
运行程序, 我们有3个消费者和2个生产者. 由于我们假设消费者消费速度很快, 生产者1s生产一个数据, 所以结果呈现出每秒有两个消费者竞争去消费生产者生产的数据.