同步
同步概念
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
如何理解同步?
假如有一个vip自习室,一天小李拿到钥匙进去了,这个自习室只允许进去一个人,外面还排着很多人要进去,但小李拿到了钥匙,一直自习到中午,小李觉得饿了,想去餐厅吃点饭,但小李正想着出去吃饭,又想到这个自习室是免费的,这便宜不赚白不赚,小李刚走出门,又拿着钥匙回来了,可小李又饿了,又出门,又回来了,这样辗转反复造成了小李自身效率低下,排队的其他人造成饥饿问题。
这样小李并没什么错啊,但是不合理,因为不高效!
所以自习室负责人就想到办法了,就规定每一个自习完成的同学归还钥匙后,1.不能立马申请,2.要二次申请,必须排队(换句话说,其他人也必须排队)
这样就可以保证,所有人访问自习室的过程未来是安全的&&具有一定的顺序性,这就是同步,可以是严格的顺序性,也可以是宏观上的相对的顺序性。
条件变量
条件变量与互斥锁的接口类似,可以选择全局的或者是局部的。
无非多了一个wait接口!还有broadcast(唤醒全部线程)和signal(唤醒一个线程)接口
认识条件变量,我们先讲个小故事吧!
现在有两个人,A和B,一个盘子,一个苹果,一个铃铛,B要把苹果放到盘子里,A去拿盘子里的苹果,盘子就是共享资源,那么就要需要加锁和解锁,A去拿苹果时要判断苹果是否存在,如果存在就进去拿苹果,如果不存在,A就需要到铃铛那里等待,当B把苹果成功放入盘子,B会去敲响铃铛,A就会被唤醒重新判断进去拿苹果,往后再有C,D等等来拿苹果时,都需要判断苹果是否存在,不存在就进入铃铛队列中,等待被唤醒!!!
那么此时的铃铛就是条件变量:1.需要一个线程队列。2.需要有通知机制。
而B可以选择叫醒一个,或者叫醒全部!!!这就是条件变量的理解!
代码测试:
#include<iostream>
#include<unistd.h>
#include<pthread.h>const int num = 5;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
void* Wait(void* args)
{std::string name = static_cast<const char*>(args);while(true){pthread_mutex_lock(&mutex);pthread_cond_wait(&gcond, &mutex);usleep(1000);std::cout << "I am:" << name << std::endl;pthread_mutex_unlock(&mutex);//usleep(1000);}
}
int main()
{pthread_t threads[num];for(int i = 0; i < num; i++){char* name = new char[1024];snprintf(name, 1024, "thread-%d", i + 1);pthread_create(threads + i, nullptr, Wait, (void*)name);usleep(1000);}sleep(1);//主线程唤醒其他线程while(true){//pthread_cond_signal(&gcond);pthread_cond_broadcast(&gcond);std::cout << "唤醒一个线程" << std::endl;sleep(1);}for(int i = 0; i < num; i++){pthread_join(threads[i], nullptr);}return 0;
}
生产消费模型
理论
假设有一个超市生产方便面,有对应多个供应商提供商品,多个消费者进行消费,超市不就是一个巨大的缓存吗!让供应商并发的生成商品,消费者又并发的进行消费。这种模型优点,协调忙闲不均,效率高,解耦,都依赖并发的执行!
所以生产消费模型是多执行流并发的模型!
我们要实现生产消费模型,如何思考切入呢?
“321”原则:
- 一个交易场所(特定数据结构形式存在的一段内存空间)
- 两种角色(生产角色,消费角色)生产线程,消费线程
- 三种关系(生产与生产(互斥),消费与消费(互斥),生产与消费(互斥&&同步))
所以实现生产消费模型本质就是通过代码实现“321”原则,用锁和条件变量(或其他方式)来实现三种关系!!!
基于BlockingQueue的生产者消费者模型实现
BlockingQueue本质就是一个队列,常用来实现生产者消费者模型。在队列为空时获取元素的消费者线程就会被阻塞,直到队列中被生产者放入数据并唤醒阻塞的消费者线程。同样的,如果队列为满,那么此时不能再生产数据,生产者线程就会阻塞,直到消费者从队列中拿走数据并唤醒等待的生产者线程!
BlockQueue.hpp --- 主要是阻塞队列的具体实现
#pragma once
#include<iostream>
#include<string>
#include<queue>const static int defaultcap = 5;
template<class T>
class BlockQueue
{
public:bool IsFull(){return _block_queue.size() == _max_cap;}bool IsEmpty(){return _block_queue.empty();}
public:BlockQueue(int cap = defaultcap):_max_cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);}//假如:两个消费者void Pop(T* out){pthread_mutex_lock(&_mutex);while(IsEmpty()) //if? --》while可以保证代码的鲁棒性(健壮性){//添加尚未满足,但是线程被异常唤醒的情况,叫做伪唤醒!pthread_cond_wait(&_c_cond, &_mutex);//两个消费者都在这里等待,//当两个消费者都被唤醒,其中一个消费者函数返回会往下执行,另一个消费者则会在wait里面的锁内部等待}//1.没有空 || 2. 被唤醒了*out = _block_queue.front();//到阻塞队列消费_block_queue.pop();pthread_mutex_unlock(&_mutex);//唤醒生产者pthread_cond_signal(&_p_cond);}//一个生产者void Equeue(const T& in){pthread_mutex_lock(&_mutex);while(IsFull()) //if?{//满了,生产者不能生产,必须等待//可是在临界区里面啊!//pthread_cond_wait被调用的时候:除了让自己继续排队等待,还会自己释放传入的锁//函数返回时,不就还在临界区了!//返回时:必须先参与锁的竞争,重新加上锁,该函数才会返回pthread_cond_wait(&_p_cond, &_mutex);}//1.没有满 || 2. 被唤醒了_block_queue.push(in);//生产到阻塞队列pthread_mutex_unlock(&_mutex);//唤醒消费者pthread_cond_signal(&_c_cond);//pthread_cond_broadcast:一种场景}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_c_cond);pthread_cond_destroy(&_p_cond);}
private:std::queue<T> _block_queue;//临界资源int _max_cap;pthread_mutex_t _mutex;//消费者的锁pthread_cond_t _c_cond;//消费者的条件变量pthread_cond_t _p_cond;//生产者的条件变量
};
Task.hpp --- 模拟未来要完成的一些任务
#pragma once
#include<iostream>
#include<string>
#include<functional>//typedef std::function<void()> task_t;
using task_t = std::function<void()>;void Download()
{std::cout << "我是一个下载任务" << std::endl;
}
class Task
{
public:Task(){}Task(int x, int y):_x(x), _y(y){}void Excute(){_result = _x + _y;}std::string debug(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";return msg;}void operator()(){Excute();}std::string result(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);return msg;}
private:int _x;int _y;int _result;
};
main.cc --- 测试代码逻辑
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include"BlockQueue.hpp"
#include"Task.hpp"void* Consumer(void* args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t>*>(args);while(true){//sleep(2);//1.获取数据//int data = 0;//Task t;task_t t;bq->Pop(&t);//2.处理数据//t.Excute();t();//std::cout << "Consumer -> " << t.result() << std::endl;}
}
void* Productor(void* args)
{srand(time(nullptr) ^ getpid());BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t>*>(args);while(true){//1.构建数据//int data = rand() % 10 + 1;// int x = rand()%10 + 1;// usleep(x * 1000);// int y = rand()%10 + 1;//Task t(x, y);//2.生产数据bq->Equeue(Download);//std::cout << "Productor -> " << t.debug() << std::endl;std::cout << "Productor -> Download" << std::endl;sleep(1);}
}
int main()
{BlockQueue<task_t> *bq = new BlockQueue<task_t>();pthread_t c1, c2, c3, p1, p2;pthread_create(&c1, nullptr, Consumer, bq);pthread_create(&c2, nullptr, Consumer, bq);pthread_create(&c3, nullptr, Consumer, bq);pthread_create(&p1, nullptr, Productor, bq);pthread_create(&p2, nullptr, Productor, bq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);return 0;
}