Contents
- Thread pool
- locker.h
- threadpool.h
Thread pool
Compared with dynamically creating a child thread for each request, handing work to an already existing child thread is obviously much cheaper. There are several ways for the main thread to decide which worker thread serves a new task:
- The main thread actively picks a worker thread using some algorithm. The simplest and most common choices are random selection and Round Robin (taking workers in turn), but better, smarter algorithms spread tasks more evenly across the worker threads and thereby reduce the overall load on the server (a minimal Round Robin sketch follows this list).
- The main thread and all worker threads synchronize on a shared work queue, and the workers sleep on that queue. When a new task arrives, the main thread appends it to the work queue. This wakes the workers waiting for work, but only one of them "takes over" the new task: it pops the task from the queue and executes it, while the other workers go back to sleep on the queue.
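As a rough illustration of the first approach, here is a minimal Round Robin dispatch sketch. The Task type, the round_robin_dispatcher class, and the per-worker queues are hypothetical and are not part of the code below; a real implementation would also need one lock per queue, which is omitted here for brevity.

    #include <cstddef>
    #include <queue>
    #include <vector>

    struct Task {};                                   // hypothetical task type

    class round_robin_dispatcher {
    public:
        explicit round_robin_dispatcher(std::size_t n) : m_queues(n), m_next(0) {}
        void dispatch(Task* t) {
            m_queues[m_next].push(t);                 // hand the task to one worker's private queue
            m_next = (m_next + 1) % m_queues.size();  // advance to the next worker in turn
        }
    private:
        std::vector<std::queue<Task*> > m_queues;     // one queue per worker thread (unsynchronized in this sketch)
        std::size_t m_next;                           // index of the worker to use next
    };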
In this model, what the worker threads compete for is not the CPU's computing resources but I/O, and I/O is generally slow to process.
A pool is a collection of resources that are fully created and initialized when the server starts; such resources are called static resources.
When the server is done with a resource, it simply puts it back into the pool, so no system call is needed to release it.
Create a webserver directory to hold the following files.
locker.h
#ifndef LOCKER_H
#define LOCKER_H

#include <pthread.h>
#include <exception>
#include <semaphore.h>

// Thread synchronization wrapper classes

// Mutex wrapper
class locker {
public:
    locker() { if (pthread_mutex_init(&m_mutex, NULL) != 0) { throw std::exception(); } }
    ~locker() { pthread_mutex_destroy(&m_mutex); }
    bool lock() { return pthread_mutex_lock(&m_mutex) == 0; }
    bool unlock() { return pthread_mutex_unlock(&m_mutex) == 0; }
    pthread_mutex_t* get() { return &m_mutex; }
private:
    pthread_mutex_t m_mutex;
};

// Condition variable wrapper
class cond {
public:
    cond() { if (pthread_cond_init(&m_cond, NULL) != 0) { throw std::exception(); } }
    ~cond() { pthread_cond_destroy(&m_cond); }
    bool wait(pthread_mutex_t* mutex) { return pthread_cond_wait(&m_cond, mutex) == 0; }
    bool timewait(pthread_mutex_t* mutex, struct timespec t) { return pthread_cond_timedwait(&m_cond, mutex, &t) == 0; }
    bool signal() { return pthread_cond_signal(&m_cond) == 0; }
    bool broadcast() { return pthread_cond_broadcast(&m_cond) == 0; }
private:
    pthread_cond_t m_cond;
};

// Semaphore wrapper
class sem {
public:
    sem() { if (sem_init(&m_sem, 0, 0) != 0) { throw std::exception(); } }
    sem(int num) { if (sem_init(&m_sem, 0, num) != 0) { throw std::exception(); } }
    ~sem() { sem_destroy(&m_sem); }
    // wait on (decrement) the semaphore
    bool wait() { return sem_wait(&m_sem) == 0; }
    // post to (increment) the semaphore
    bool post() { return sem_post(&m_sem) == 0; }
private:
    sem_t m_sem;
};

#endif
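A minimal usage sketch of the locker and sem wrappers, mirroring the producer/consumer pattern the thread pool below relies on. The g_counter variable and the produce_one/consume_one functions are made up for illustration and are not part of locker.h.

    #include "locker.h"
    #include <cstdio>

    static locker g_lock;       // protects g_counter
    static sem    g_items;      // counts items that have been produced
    static int    g_counter = 0;

    void produce_one() {
        g_lock.lock();
        ++g_counter;            // critical section guarded by the mutex
        g_lock.unlock();
        g_items.post();         // signal that one more item is available
    }

    void consume_one() {
        g_items.wait();         // block until an item has been produced
        g_lock.lock();
        printf("items produced so far: %d\n", g_counter);
        g_lock.unlock();
    }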
threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <pthread.h>
#include <list>
#include <exception>
#include <cstdio>
#include "locker.h"

// Thread pool class. It is defined as a template for code reuse;
// the template parameter T is the task class.
template<typename T>
class threadpool {
public:
    threadpool(int thread_number = 8, int max_requests = 10000);
    ~threadpool();
    bool append(T* request);

private:
    static void* worker(void* arg);
    void run();

private:
    int m_thread_number;        // number of threads in the pool
    pthread_t* m_threads;       // thread pool array, of size m_thread_number
    int m_max_requests;         // maximum number of pending requests allowed in the queue
    std::list<T*> m_workqueue;  // request queue
    locker m_queuelocker;       // mutex protecting the request queue
    sem m_queuestat;            // semaphore indicating whether there are tasks to handle
    bool m_stop;                // whether to terminate the threads
};

template<typename T>
threadpool<T>::threadpool(int thread_number, int max_requests)
    : m_thread_number(thread_number), m_max_requests(max_requests),
      m_stop(false), m_threads(NULL) {
    if ((thread_number <= 0) || (max_requests <= 0)) {
        throw std::exception();
    }
    m_threads = new pthread_t[m_thread_number];
    if (!m_threads) {
        throw std::exception();
    }
    // Create thread_number threads and detach them.
    for (int i = 0; i < thread_number; i++) {
        printf("create the %dth thread\n", i);
        if (pthread_create(m_threads + i, NULL, worker, this) != 0) {
            delete[] m_threads;
            throw std::exception();
        }
        if (pthread_detach(m_threads[i])) {
            delete[] m_threads;
            throw std::exception();
        }
    }
}

template<typename T>
threadpool<T>::~threadpool() {
    delete[] m_threads;
    m_stop = true;
}

template<typename T>
bool threadpool<T>::append(T* request) {
    m_queuelocker.lock();
    if (m_workqueue.size() > m_max_requests) {
        m_queuelocker.unlock();
        return false;
    }
    m_workqueue.push_back(request);
    m_queuelocker.unlock();
    m_queuestat.post();
    return true;
}

template<typename T>
void* threadpool<T>::worker(void* arg) {
    threadpool* pool = (threadpool*)arg;
    pool->run();
    return pool;
}

template<typename T>
void threadpool<T>::run() {
    while (!m_stop) {
        m_queuestat.wait();
        m_queuelocker.lock();
        if (m_workqueue.empty()) {
            m_queuelocker.unlock();
            continue;
        }
        T* request = m_workqueue.front();
        m_workqueue.pop_front();
        m_queuelocker.unlock();
        if (!request) {
            continue;
        }
        request->process();
    }
}

#endif
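A hedged usage sketch of the pool: run() calls process() on each dequeued request, so any task class works as long as it exposes a public process() method. The echo_task type and the numbers below are made up for illustration and are not part of threadpool.h.

    #include "threadpool.h"
    #include <cstdio>
    #include <unistd.h>

    struct echo_task {
        int id;
        void process() { printf("handling task %d\n", id); }  // called by a worker thread
    };

    int main() {
        threadpool<echo_task> pool(4, 100);   // 4 worker threads, queue capacity 100
        echo_task tasks[8];
        for (int i = 0; i < 8; i++) {
            tasks[i].id = i;
            pool.append(&tasks[i]);           // enqueue; a sleeping worker is woken to handle it
        }
        sleep(1);                             // crude wait so the detached workers can run before exit
        return 0;
    }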