Preface:
The code is fairly short; here is a quick walk-through of how it is structured:
SyncQueue.hpp implements a synchronized queue with a maximum size of MaxTaskCount. It is a class template, and inside the thread pool it plays the role of holding tasks until a thread from the thread group picks them up. The underlying container is a std::list<T>, which we nevertheless name m_queue because we use the list to emulate a queue. std::vector would be a poor choice here: we constantly insert (add tasks) and erase (take tasks), which is expensive for a vector (see how std::vector is laid out internally).
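As a quick illustration of that access pattern (a standalone sketch, not code from the project), std::list gives O(1) push at the back and O(1) pop at the front, which is exactly the FIFO behavior the task queue needs:
#include <list>
#include <iostream>

int main()
{
    std::list<int> queue;          // list used as a FIFO: cheap at both ends
    for (int i = 0; i < 3; ++i)
        queue.push_back(i);        // enqueue (add a task)
    while (!queue.empty())
    {
        std::cout << queue.front() << '\n';
        queue.pop_front();         // dequeue (take a task)
    }
    return 0;
}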
We also add a mutex m_mtx, because access to the queue has to be thread safe. Since the std::unique_lock has to modify m_mtx even inside const member functions, we declare it mutable (mutable allows a non-static data member to be modified inside a const member function).
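A minimal sketch of why mutable matters (the class name here is made up for illustration): a const query such as Size() still has to lock the mutex, which only compiles because the mutex is mutable:
#include <list>
#include <mutex>

template <class T>
class SyncQueueSketch
{
    std::list<T> m_queue;
    mutable std::mutex m_mtx;   // mutable: may be locked even in const member functions
public:
    size_t Size() const
    {
        std::unique_lock<std::mutex> lock(m_mtx);   // locking modifies m_mtx
        return m_queue.size();
    }
};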
std::condition_variable m_notEmpty;
std::condition_variable m_notFull;
These two condition variables synchronize on whether the task queue is empty or full.
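This is the classic bounded-buffer pattern. A stripped-down sketch of how the two condition variables cooperate (simplified relative to the real class, which also has timeouts and a stop flag):
#include <list>
#include <mutex>
#include <condition_variable>

template <class T>
class BoundedQueueSketch
{
    std::list<T> m_queue;
    std::mutex m_mtx;
    std::condition_variable m_notEmpty;   // signaled after a push
    std::condition_variable m_notFull;    // signaled after a pop
    size_t m_maxSize = 2000;
public:
    void Push(T task)
    {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_notFull.wait(lock, [this] { return m_queue.size() < m_maxSize; });
        m_queue.push_back(std::move(task));
        m_notEmpty.notify_one();
    }
    T Pop()
    {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_notEmpty.wait(lock, [this] { return !m_queue.empty(); });
        T task = std::move(m_queue.front());
        m_queue.pop_front();
        m_notFull.notify_one();
        return task;
    }
};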
Add() is the low-level function used to enqueue a task; we wrap it with two overloads, one for lvalues and one for rvalues:
int Put(const T& task)
{
    return Add(task);
}
int Put(T&& task)
{
    return Add(std::forward<T>(task));
}
Likewise there are two overloads of Take.
WaitQueueEmptyStop() is a function that was added later. Suppose we have finished adding tasks and the worker threads are still executing them, but the main thread is about to exit: destroying the thread-pool object eventually calls this function, which checks whether the task queue still holds tasks. If it is not empty, the function releases the lock and sleeps for one second, over and over, until the queue is empty.
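For reference, the implementation (taken from the SyncQueue.hpp listing below) waits on a helper condition variable with a one-second timeout until the queue drains, then raises the stop flag:
void WaitQueueEmptyStop()
{
    std::unique_lock<std::mutex> locker(m_mtx);
    while (!IsEmpty())
    {
        // releases m_mtx for up to one second, then re-checks whether the queue is empty
        m_waitStop.wait_for(locker, std::chrono::seconds(1));
    }
    m_needStop = true;
    m_notFull.notify_all();
    m_notEmpty.notify_all();
}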
CachedThreadPool.hpp contains the thread pool itself. Task is a wrapper around a callable object; what the task queue stores is Tasks, what we add is Tasks, and what gets executed is Tasks. The pool encapsulates the synchronized queue described above.
We use a
std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;
to store each thread's id together with the shared_ptr<std::thread> that manages it, so that we can remove a thread once it has been idle for KeepAliveTimes seconds. That is also why we named it CachedThreadPool: it is a cached thread pool whose thread count floats, constrained by two atomic counters so that there are never too few threads to run the tasks and never too many sitting idle:
std::atomic<int> m_idleThreadSize;
std::atomic<int> m_curThreadSize;
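The growth rule lives in AddnewThread() (reproduced from the CachedThreadPool.hpp listing below): a thread is only spawned when no worker is idle and the current count is still below the maximum:
void AddnewThread()
{
    // grow only if no thread is idle and we are below the maximum
    if (m_idleThreadSize <= 0 && m_curThreadSize < m_maxThreadSize)
    {
        auto tha = std::make_shared<std::thread>(&CachedThreadPool::RunInThread, this);
        std::thread::id tid = tha->get_id();
        m_threadgroup.emplace(tid, std::move(tha));
        m_idleThreadSize += 1;
        m_curThreadSize += 1;
        cerr << "AddnewThread id: " << tid << endl;
        cerr << "m_curThreadSize: " << m_curThreadSize << endl;
    }
}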
In the constructor, Start() launches two threads whose entry function is CachedThreadPool::RunInThread(). Each thread checks whether a task is available; if so it runs it with task(), otherwise it blocks inside m_queue.Take(task).
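The heart of RunInThread() is a take-and-run loop; this is abridged from the full listing below (the idle-timeout bookkeeping that retires surplus threads is omitted here):
void RunInThread()
{
    while (m_running)
    {
        Task task;
        // Take() blocks (with a timeout) until a task is available; it returns 0 on success
        if (!m_queue.Take(task) && m_running)
        {
            m_idleThreadSize -= 1;
            task();                  // execute the task
            m_idleThreadSize += 1;
        }
    }
}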
In the destructor we call StopThreadGroup(), which first calls the queue's WaitQueueEmptyStop() to make sure the task queue has drained, then finishes off the thread group with a range-for:
for (auto& x : m_threadgroup)
{
if (x.second->joinable())
{
x.second->join();
}
}
There is another important member function:
template <class Func, class... Args>
void AddTask(Func&& func, Args&&... args)
{
    auto task = std::make_shared<std::function<void()> >(
        std::bind(std::forward<Func>(func),
                  std::forward<Args>(args)...));
    if (m_queue.Put([task]() { (*task)(); }) != 0)
    {
        cerr << "not add task queue... " << endl;
        (*task)();
    }
    AddnewThread();
}
This is a function template for adding tasks without a return value to the task queue. It combines forwarding references (reference collapsing), variadic templates, std::bind, perfect forwarding, a lambda expression, and a smart pointer; getting it into working shape took quite some effort.
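A minimal usage sketch (hypothetical code, not part of the project's files) shows what the variadic forwarding buys us:
#include <iostream>
#include <string>
#include "CachedThreadPool.hpp"

void hello(int id, const std::string& msg)   // hypothetical task function
{
    std::cout << "task " << id << ": " << msg << std::endl;
}

int main()
{
    myspace::CachedThreadPool pool;
    pool.AddTask(hello, 1, std::string("plain function with arguments"));
    pool.AddTask([] { std::cout << "lambda task" << std::endl; });
    return 0;   // the destructor drains the queue and joins the workers
}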
template <class Func, class... Args>
auto submit(Func&& func, Args&&... args)
{
    using RetType = decltype(func(args...));
    auto task = std::make_shared<std::packaged_task<RetType()>>(
        std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
    std::future<RetType> result = task->get_future();
    if (m_queue.Put([task]() { (*task)(); }) != 0)
    {
        cerr << "not add task queue... " << endl;
        (*task)();
    }
    AddnewThread();
    return result;
}
This is a function template for adding tasks that do return a value; the caller receives the result through the returned std::future.
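A minimal usage sketch (again hypothetical, not in the project's main.cpp) retrieves the result through the returned future:
#include <iostream>
#include "CachedThreadPool.hpp"

int add(int a, int b)   // hypothetical task with a return value
{
    return a + b;
}

int main()
{
    myspace::CachedThreadPool pool;
    std::future<int> result = pool.submit(add, 2, 3);
    std::cout << "2 + 3 = " << result.get() << std::endl;   // blocks until the task has run
    return 0;
}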
main.cpp contains an interesting problem that is worth discussing:
std::shared_ptr<FILE> pf(fopen("test.txt", "w"));
//std::shared_ptr<FILE> pf(fopen("test.txt", "w"),fileDeleter());
main.cpp is our test file. We wanted the test results printed into a file so they would be easier to inspect. At first we used a raw FILE* fp; and called fclose(fp); fp = nullptr; at the end of main.
That did not work. We eventually discovered that those two lines run before the thread-pool object is destroyed, so the file is closed early and the results can no longer be written to it.
In the end we switched to a shared_ptr<FILE> to manage the file, but we first wrote it as in the first line above. Output to the screen was correct, yet writing to the file was not, and the screen output always ended with an exception after the thread-pool object had destructed cleanly and everything seemed settled. That was baffling; I assumed our thread teardown was still broken, but the real culprit was this pointer. pf is a global object, so it is destroyed after the local thread-pool object, and at that point it invokes its default deleter, i.e. delete, which is clearly wrong: a FILE* must be released with fclose. So we wrote a custom deleter to fix the mistake.
struct fileDeleter
{
    void operator()(FILE* fp) const
    {
        if (fp)
        {
            cout << "file close\n";
            fclose(fp);
        }
    }
};
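Using it looks like the corrected line in main.cpp below; a minimal standalone check (assuming the fileDeleter above is in scope, which itself needs <iostream>) would be:
#include <cstdio>
#include <memory>

int main()
{
    // the second constructor argument replaces the default deleter (delete) with fclose
    std::shared_ptr<FILE> pf(fopen("test.txt", "w"), fileDeleter());
    if (pf)
    {
        fprintf(pf.get(), "hello\n");
    }
    return 0;
}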
There were plenty of similar mistakes, for example this earlier one:
In Take() we had mistakenly written: m_queue.pop_back();
The mistake looks trivial but was tricky to find: the program ran smoothly and the output flowed nicely, only the values were wrong. After debugging it by hand I did not know whether to laugh or cry: every time a task was taken, the code popped the still-unexecuted task at the back of the queue.
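The corrected lines in Take() (as in the SyncQueue.hpp listing below) hand out the front element and pop that same element:
task = m_queue.front();   // hand out the oldest task...
m_queue.pop_front();      // ...and remove it from the front, not the newest one at the back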
I am still not good at debugging multithreaded code, and there is a long road ahead, but I enjoy hunting down interesting bugs like these!
The overall logic follows the producer-consumer model, a true classic of OS courses. Thread pools essentially grow out of it and have branched into many flavors: fixed-size, cached, work-stealing, and so on.
Source code:
main.cpp
#define _CRT_SECURE_NO_WARNINGS

// C++ STL
#include <thread>
#include <iostream>
using namespace std;
#include"CachedThreadPool.hpp"
struct fileDeleter
{
    void operator()(FILE* fp) const
    {
        if (fp)
        {
            cout << "file close\n";
            fclose(fp);
        }
    }
};
class Int
{
private:
    int value;
public:
    Int(int x = 0) : value(x)
    {
        cout << "create Int " << value << endl;
    }
    ~Int()
    {
        cout << "destroy Int: " << value << endl;
    }
    Int(const Int& it) : value(it.value)
    {
        cout << "Copy Int " << value << endl;
    }
    Int(Int&& it) : value(it.value)
    {
        cout << "Move create Int : " << value << endl;
        it.value = -1;
    }
    Int& operator=(const Int& it)
    {
        if (this != &it)
        {
            value = it.value;
        }
        cout << "operator=" << endl;
        return *this;
    }
    Int& operator=(Int&& it)
    {
        if (this != &it)
        {
            value = it.value;
            it.value = -1;
        }
        cout << "operator=(Int &&)" << endl;
        return *this;
    }
    Int& operator++()
    {
        value += 1;
        return *this;
    }
    Int operator++(int)
    {
        Int tmp = *this;
        ++*this;
        return tmp;
    }
    bool operator<(const int x) const
    {
        return this->value < x;
    }
    ostream& operator<<(ostream& out) const
    {
        return out << value;
    }
};
ostream& operator<<(ostream& out, const Int& it)
{
    return it << out;
}

std::mutex mtx;
std::shared_ptr<FILE> pf(fopen("test.txt", "w"),fileDeleter());
void print(Int x)
{
    std::lock_guard<std::mutex> lock(mtx);
    //cout << "print x: " << &x << " " << x << endl;
    fprintf(pf.get(), "print x : %d => &x: %p \n", x, &x);
    std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
}
void funa(myspace::CachedThreadPool& mypool)
{
    double dx = 1;
    for (Int i = 0; i < 100; ++i)
    {
        mypool.AddTask(print, i);
    }
    cout << "funa ... end " << endl;
}

int main()
{
    myspace::CachedThreadPool mypool;
    std::thread tha(funa, std::ref(mypool));
    tha.join();
    return 0;
}
SyncQueue.hpp
// C++ STL
#include <list>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <iostream>
using namespace std;

#ifndef SYNC_QUEUE_HPP
#define SYNC_QUEUE_HPP
static const int MaxTaskCount = 2000;
namespace myspace
{
    template <class T>
    class SyncQueue
    {
    private:
        std::list<T> m_queue;
        mutable std::mutex m_mtx;
        std::condition_variable m_notEmpty;
        std::condition_variable m_notFull;
        std::condition_variable m_waitStop;
        int m_maxSize;
        int m_waitTime;
        bool m_needStop;

        bool IsFull() const
        {
            bool full = m_queue.size() >= m_maxSize;
            if (full)
            {
                cerr << "m_queue full .. wait .. " << endl;
            }
            return full;
        }
        bool IsEmpty() const
        {
            bool empty = m_queue.empty();
            if (empty)
            {
                cerr << "m_queue empty... wait ... " << endl;
            }
            return empty;
        }
        template <class F>
        int Add(F&& task)
        {
            std::unique_lock<std::mutex> lock(m_mtx);
            if (!m_notFull.wait_for(lock, std::chrono::seconds(m_waitTime),
                [this]() { return m_needStop || !IsFull(); }))
            {
                return 1; // task queue has reached its limit (timeout)
            }
            if (m_needStop)
            {
                return 2; // task queue stopped
            }
            m_queue.push_back(std::forward<F>(task));
            m_notEmpty.notify_all();
            return 0;
        }
    public:
        SyncQueue(int maxsize = MaxTaskCount, int timeout = 1)
            : m_maxSize(maxsize), m_needStop(false), m_waitTime(timeout)
        {
        }
        ~SyncQueue()
        {
            if (!m_needStop)
            {
                Stop();
            }
        }
        int Put(const T& task)
        {
            return Add(task);
        }
        int Put(T&& task)
        {
            return Add(std::forward<T>(task));
        }
        int Take(std::list<T>& tlist)
        {
            std::unique_lock<std::mutex> lock(m_mtx);
            if (!m_notEmpty.wait_for(lock, std::chrono::seconds(m_waitTime),
                [this]() { return m_needStop || !IsEmpty(); }))
            {
                return 1; // timeout
            }
            if (m_needStop)
            {
                return 2;
            }
            tlist = std::move(m_queue);
            m_notFull.notify_all();
            return 0;
        }
        int Take(T& task)
        {
            std::unique_lock<std::mutex> lock(m_mtx);
            if (!m_notEmpty.wait_for(lock, std::chrono::seconds(m_waitTime),
                [this]() { return m_needStop || !IsEmpty(); }))
            {
                return 1; // timeout
            }
            if (m_needStop)
            {
                return 2;
            }
            task = m_queue.front();
            m_queue.pop_front();
            m_notFull.notify_all();
            return 0;
        }
        void WaitQueueEmptyStop()
        {
            std::unique_lock<std::mutex> locker(m_mtx);
            while (!IsEmpty())
            {
                m_waitStop.wait_for(locker, std::chrono::seconds(1));
            }
            m_needStop = true;
            m_notFull.notify_all();
            m_notEmpty.notify_all();
        }
        void Stop()
        {
            {
                std::unique_lock<std::mutex> lock(m_mtx);
                m_needStop = true;
            }
            m_notEmpty.notify_all();
            m_notFull.notify_all();
        }
        bool Empty() const
        {
            std::unique_lock<std::mutex> lock(m_mtx);
            return m_queue.empty();
        }
        bool Full() const
        {
            std::unique_lock<std::mutex> lock(m_mtx);
            return m_queue.size() >= m_maxSize;
        }
        size_t Size() const
        {
            std::unique_lock<std::mutex> lock(m_mtx);
            return m_queue.size();
        }
        size_t Count() const
        {
            return m_queue.size();
        }
    };
}
#endif
CachedThreadPool.hpp
//C API
#include<time.h>
//OWN
#include "SyncQueue.hpp"
// C++ STL
#include <functional>
#include <unordered_map>
#include <map>
#include <future>
#include <atomic>
#include <memory>
#include <thread>
using namespace std;
#ifndef CACHED_THREAD_POOL_HPP
#define CACHED_THREAD_POOL_HPP

namespace myspace
{
    static const int InitThreadNums = 2;
    static const time_t KeepAliveTimes = 5;

    class CachedThreadPool
    {
    public:
        using Task = std::function<void(void)>;
    private:
        std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;
        int m_coreThreadSize;
        int m_maxThreadSize;
        std::atomic<int> m_idleThreadSize;
        std::atomic<int> m_curThreadSize;
        std::mutex m_mutex;
        myspace::SyncQueue<Task> m_queue;
        std::atomic<bool> m_running;
        std::once_flag m_flag;

        void RunInThread()
        {
            auto tid = std::this_thread::get_id();
            time_t startTime = time(nullptr);
            while (m_running)
            {
                Task task;
                if (m_queue.Size() == 0)
                {
                    time_t curnow = time(nullptr);
                    time_t intervalTime = curnow - startTime;
                    if (intervalTime >= KeepAliveTimes && m_curThreadSize > m_coreThreadSize)
                    {
                        m_threadgroup.find(tid)->second->detach();
                        m_threadgroup.erase(tid);
                        m_curThreadSize -= 1;
                        m_idleThreadSize -= 1;
                        cerr << "delete idle thread tid: " << tid << endl;
                        cerr << "idle thread num: " << m_idleThreadSize << endl;
                        cerr << "cur thread num: " << m_curThreadSize << endl;
                        return;
                    }
                }
                if (!m_queue.Take(task) && m_running)
                {
                    m_idleThreadSize -= 1;
                    task();
                    m_idleThreadSize += 1;
                    startTime = time(nullptr);
                }
            }
        }
        void Start(int numthreas)
        {
            m_running = true;
            m_curThreadSize = numthreas;
            for (int i = 0; i < numthreas; ++i)
            {
                auto tha = std::make_shared<std::thread>(&CachedThreadPool::RunInThread, this);
                std::thread::id tid = tha->get_id();
                m_threadgroup.emplace(tid, std::move(tha));
                m_idleThreadSize += 1;
            }
        }
        void StopThreadGroup()
        {
            m_queue.WaitQueueEmptyStop();
            m_running = false;
            for (auto& x : m_threadgroup)
            {
                if (x.second->joinable())
                {
                    x.second->join();
                }
            }
            m_threadgroup.clear();
        }
        void AddnewThread()
        {
            if (m_idleThreadSize <= 0 && m_curThreadSize < m_maxThreadSize)
            {
                auto tha = std::make_shared<std::thread>(&CachedThreadPool::RunInThread, this);
                std::thread::id tid = tha->get_id();
                m_threadgroup.emplace(tid, std::move(tha));
                m_idleThreadSize += 1;
                m_curThreadSize += 1;
                cerr << "AddnewThread id: " << tid << endl;
                cerr << "m_curThreadSize: " << m_curThreadSize << endl;
            }
        }
    public:
        CachedThreadPool(int initNumThreads = InitThreadNums, int taskQueueSize = MaxTaskCount)
            : m_coreThreadSize(initNumThreads),
              m_maxThreadSize(std::thread::hardware_concurrency() + 1),
              m_idleThreadSize(0),
              m_curThreadSize(0),
              m_queue(taskQueueSize),
              m_running(false)
        {
            Start(m_coreThreadSize);
        }
        ~CachedThreadPool()
        {
            if (m_running)
            {
                Stop();
            }
        }
        void Stop()
        {
            std::call_once(m_flag, [this]() { StopThreadGroup(); });
        }
        template <class Func, class... Args>
        void AddTask(Func&& func, Args&&... args)
        {
            auto task = std::make_shared<std::function<void()> >(
                std::bind(std::forward<Func>(func),
                          std::forward<Args>(args)...));
            if (m_queue.Put([task]() { (*task)(); }) != 0)
            {
                cerr << "not add task queue... " << endl;
                (*task)();
            }
            AddnewThread();
        }
        template <class Func, class... Args>
        auto submit(Func&& func, Args&&... args)
        {
            using RetType = decltype(func(args...));
            auto task = std::make_shared<std::packaged_task<RetType()>>(
                std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
            std::future<RetType> result = task->get_future();
            if (m_queue.Put([task]() { (*task)(); }) != 0)
            {
                cerr << "not add task queue... " << endl;
                (*task)();
            }
            AddnewThread();
            return result;
        }
    };
}
#endif