线程(二)
- 1. 生产消费模型
- 概念:
- 阻塞队列
- 生产消费模型优点
- 代码 - 模拟实现阻塞队列
- 2. POSIX信号量
- 概念
- 有名信号量(Named Semaphore)
- 无名信号量(Unnamed Semaphore):
- 作用
- 初始化信号量 - sem_init (无名信号量)
- 销毁信号量 - sem_destroy
- 等待信号量 - sem_wait
- 释放信号量 - sem_post
- 代码 - POSIX信号量示例
- 3. 可重入和线程安全
- 二者概念
- 常见线程不安全情况
- 常见的线程安全的情况
- 常见不可重入的情况
- 常见的可重入的情况
- 可重入和线程安全关系
- 可重入与线程安全区别
- 4. 线程池 - 代码
- 5. STL容器与线程安全
- 6. 智能指针与线程安全
- 7. 其他常见的锁
1. 生产消费模型
概念:
- 一个交易场所:内存空间
- 两种角色:生产线程,消费线程
- 三种关系:生产和生产(互斥),消费和消费(互斥),生产和消费(互斥和同步)
阻塞队列
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产消费模型优点
- 解耦
- 协调忙闲不均
- 效率高(生产任务和处理任务并发了)
代码 - 模拟实现阻塞队列
//========================== 阻塞队列部分 ==========================//
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <functional>
#include <queue>
using namespace std;const int Capacity = 5; //设定队列长度为5template <class T>class BlockQueue{private:bool IsFull(){return qe.size() == _cap_max;}bool IsEmpty(){return qe.empty();}public:BlockQueue(){_cap_max = Capacity;pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);}void Push(T &in){pthread_mutex_lock(&_mutex);while (IsFull()) //队列为满,停止生产{//等待消费者拿取数据后激活条件变量pthread_cond_wait(&_p_cond,&_mutex);}//队列肯定没满qe.push(in);pthread_cond_signal(&_c_cond);pthread_mutex_unlock(&_mutex);}T Pop(){pthread_mutex_lock(&_mutex);while (IsEmpty()) //队列为空,停止取数据{//等待生产者生产数据后激活条件变量pthread_cond_wait(&_c_cond,&_mutex);}//队列肯定至少有一个数据T ret = qe.front();qe.pop();pthread_cond_signal(&_p_cond);pthread_mutex_unlock(&_mutex);return ret;}private:queue<T> qe;size_t _cap_max;pthread_mutex_t _mutex;pthread_cond_t _c_cond;pthread_cond_t _p_cond;};
//========================== 主函数部分 ==========================//
#include"BlockQueue.hpp"pthread_mutex_t Gmutex = PTHREAD_MUTEX_INITIALIZER;
void* Consumer(void* bq)
{if(bq){BlockQueue<int>* out = static_cast<BlockQueue<int>*>(bq);while(true){int tmp = out->Pop();cout<<"Consumer get a data: "<<tmp<<endl;sleep(1);}}return nullptr;
}
void* Productor(void* bq)
{if(bq){BlockQueue<int>* in = static_cast<BlockQueue<int>*>(bq);int cnt = 0;while(true){//这里加锁原因:可能多个线程拿同一个cnt进阻塞队列等待pthread_mutex_lock(&Gmutex);in->Push(cnt);cnt++;pthread_mutex_unlock(&Gmutex);}}return nullptr;
}
int main()
{pthread_t c1,c2,p1,p2,p3;BlockQueue<int>* bq = new BlockQueue<int>();pthread_create(&c1,nullptr,Consumer,bq);pthread_create(&c2,nullptr,Consumer,bq);pthread_create(&p1,nullptr,Productor,bq);pthread_create(&p2,nullptr,Productor,bq);pthread_create(&p3,nullptr,Productor,bq);pthread_join(c1,nullptr);pthread_join(c2,nullptr);pthread_join(p1,nullptr);pthread_join(p2,nullptr);pthread_join(p3,nullptr);return 0;
}
2. POSIX信号量
概念
有名信号量(Named Semaphore)
有名信号量有一个名字,它可以在文件系统中以一个名字来标识自己。这使得不同的进程可以通过这个名字来找到并使用同一个信号量。有名信号量通常用于进程间通信。例如,在一个多进程的服务器应用程序中,不同的进程可能需要协调对共享数据库的访问,就可以使用有名信号量。
对于有名信号量,可以使用sem_open函数来创建和打开一个有名信号量。它的基本语法是sem_t *sem_open(const char *name, int oflag,…);,其中name是信号量的名字,oflag用于指定打开的方式(如创建新的信号量或者打开已有的信号量)。
无名信号量(Unnamed Semaphore):
无名信号量没有名字,它通常是在内存中分配的一块空间,用于在同进程内的线程之间进行同步和互斥。比如,在一个多线程的图像处理程序中,多个线程可能需要协调对图像缓冲区的访问,此时可以使用无名信号量。
作用
POSIX信号量可用于线程间同步,信号量的本质是计数器。
需包含头文件<semaphore.h>
初始化信号量 - sem_init (无名信号量)
int sem_init(sem_t *sem, int pshared, unsigned int value);
- sem:输出型参数,表示要初始化的信号量
- pshared:0表示线程间共享,非零表示进程间共享
- value:信号量初始值
销毁信号量 - sem_destroy
int sem_destroy(sem_t *sem);
等待信号量 - sem_wait
int sem_wait(sem_t *sem)
- 等待信号量又叫P操作,当一个进程或线程调用sem_wait时,如果信号量的值大于0,信号量的值就会减1,表示占用了一个资源;如果信号量的值为0,调用者就会阻塞,直到信号量的值大于0。这就像是在等待有可用的资源许可证。
释放信号量 - sem_post
int sem_post(sem_t *sem)
- 释放信号量又叫V操作,它会使信号量的值加1,表示释放了一个资源。如果有其他进程或线程正在等待这个信号量(因为之前调用sem_wait而阻塞),那么其中一个等待者就会被唤醒,去获取刚刚释放的资源。
代码 - POSIX信号量示例
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <unistd.h>
sem_t sem;
void *thread_function(void *arg) {// 等待信号量sem_wait(&sem);printf("Thread %lu is accessing the shared resource.\n", pthread_self());// 模拟线程使用共享资源的时间usleep(200000); // 释放信号量sem_post(&sem);pthread_exit(NULL);
}
int main() {pthread_t threads[5];// 初始化无名信号量,用于线程间同步,初始值为1if (sem_init(&sem, 0, 1)!= 0) {perror("sem_init");return 1;}for (int i = 0; i < 5; i++) {if (pthread_create(&threads[i], NULL, thread_function, NULL)!= 0) {perror("pthread_create");// 销毁信号量,因为线程创建失败sem_destroy(&sem);return 1;}}for (int i = 0; i < 5; i++) {if (pthread_join(threads[i], NULL)!= 0) {perror("pthread_join");// 销毁信号量,因为线程等待失败sem_destroy(&sem);return 1;}}// 销毁信号量if (sem_destroy(&sem)!= 0) {perror("sem_destroy");return 1;}return 0;
}
3. 可重入和线程安全
二者概念
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
常见线程不安全情况
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
常见的线程安全的情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
常见不可重入的情况
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
常见的可重入的情况
- 不使用全局变量或静态变量
- 不使用用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入和线程安全关系
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的
可重入与线程安全区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
4. 线程池 - 代码
// ThreadPool.hpp#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <unistd.h>using namespace std;using func_t = function<void(const string& name)>;
const int Capacity = 5;class Task
{
public:Task() {}Task(int x, int y) : _x(x), _y(y){}void operator()(){cout << "get a task: " << _x << " + " << _y << " = ?" << endl;_result = _x + _y;}void result(){cout << "result: " << _x << " + " << _y << " = " << _result << endl;}private:int _x;int _y;int _result;
};template <class T>
class ThreadPool
{
private:void LockQueue() //对临界资源上锁{pthread_mutex_lock(&_mutex);}void UnlockQueue() //对临界资源解锁{pthread_mutex_unlock(&_mutex);}void Sleep() //等待条件{pthread_cond_wait(&_cond, &_mutex);}void WakeUp() //唤醒单个线程{pthread_cond_signal(&_cond);}void WakeUpAll() //唤醒全部{pthread_cond_broadcast(&_cond);}bool IsEmpty(){return _qe.empty();}void HandlerTask(const string &name){while (true){LockQueue();while (IsEmpty() && _isrunning) // 任务队列为空 且 正在运行{_active_thread_num--;Sleep();_active_thread_num++;}if (IsEmpty() && !_isrunning) // 任务队列为空 且 运行状态为停止{UnlockQueue();break;}// 有任务T t = _qe.front();_qe.pop();UnlockQueue();// 执行任务t();cout << name << " ";t.result();}}public:ThreadPool(size_t thread_cap = Capacity): _thread_cap(thread_cap){func_t func = std::bind(&ThreadPool::HandlerTask, this, placeholders::_1);pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);for (int i = 0; i < _thread_cap; i++){//线程名,当做函数参数传入string s = "thread-" + to_string(i + 1);_threads.emplace_back(func,s);}}void Start(){_isrunning = true;_active_thread_num = _thread_cap;for(auto& td: _threads)td.detach();}void Stop(){LockQueue();_isrunning = false;WakeUpAll();UnlockQueue();}void EQueue(T &in){LockQueue();if (_isrunning){_qe.push(in);if (_active_thread_num < _thread_cap)WakeUp();}UnlockQueue();}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:vector<thread> _threads;queue<T> _qe;bool _isrunning;size_t _active_thread_num;size_t _thread_cap;pthread_cond_t _cond;pthread_mutex_t _mutex;
};
// mian.cc#include"ThreadPool.hpp"int main()
{srand(0);ThreadPool<Task>* tp = new ThreadPool<Task>; //创建线程池tp->Start(); //启动int cnt = 5; //假设只构建5个任务while(cnt--){sleep(1);Task t(rand()%1000,rand()%1000); //创造随机任务tp->EQueue(t); //放入线程池队列}tp->Stop(); //退出线程池sleep(2);return 0;
}
5. STL容器与线程安全
- STL不是线程安全的
- STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响
- 而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶)
- 因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全
6. 智能指针与线程安全
- 对于unique_ptr,仅是在当前代码块范围内生效,不涉及线程安全问题
- 对于shared_ptr,多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数
7. 其他常见的锁
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
- 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
- CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试