一. 简介
本章我们开始阅读c++ git 高星开源项目ThreadPool
, 这是一个纯c++的线程池项目, 并且代码量极小, 非常适合新手阅读
git地址: progschj / ThreadPool
二. 前提知识
为了面对不同读者对c++掌握情况不同的情况, 这里我会将基本上稍微值得一说的前提知识点, 全部专门写成一篇博客, 同学们在阅读本篇之前, 可以先去阅读前提知识部分
c++源码阅读__ThreadPool__前提基础部分
还有线程的一些基础知识
C++ 多线程 菜鸟教程
三. 源码:
因为源码时c++11的, 所以我们如果用最新的标准是跑不起来的, 所以这里我在下面源码部分把能用最新标准跑的版本的代码贴了出来
修改的地方只有一处, 如下
返回类型的推导:
typename std::result_of<F(Args...)>::type
改为了
typename std::invoke_result<F, Args...>::type
由于此项目比较小, 所以我们直接把代码全部贴出来, 并且在代码中, 用注释附上讲解
#ifndef THREAD_POOL_H
#define THREAD_POOL_H#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <iostream>class ThreadPool {
public:ThreadPool(size_t);// 给线程池设置任务的方法, 核心逻辑template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::invoke_result<F, Args...>::type>;~ThreadPool();
private:// 线程队列的 vectorstd::vector< std::thread > workers;// 任务列表, 在前提知识里, 已经说明了, 通过lambda和bind将方法斗包装为void()类型的// 至于任务的返回值, 通过future实现std::queue< std::function<void()> > tasks;// synchronizationstd::mutex queue_mutex;// 主线程通知子线程的工具std::condition_variable condition;bool stop;
};// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for (size_t i = 0; i < threads; ++i)// 这了workers.emplace_back([](){}) 为什么能够直接生成Thread呢?// 因为emplace_back 和 push_back不同, emplace_back传入的参数可以是 构造函数的 参数, // 所以这里写全了 应该是类似下面的代码// workers.emplace_back( Thread( [](){} ) )workers.emplace_back([this]{for (;;){std::function<void()> task;{// 这里单独{}开一个域, 是因为unique_lock生效的范围是当前作用域std::unique_lock<std::mutex> lock(this->queue_mutex);// 这里condition_variable的wait, 等待一个notifythis->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty()){std::cout << std::this_thread::get_id() << std::endl;return;}task = std::move(this->tasks.front());this->tasks.pop();}// 脱离了lock域, 真正执行方法的地方, 还是多线程的, 如果写在上面的lock域里// 那就变成 "单线程" 了task();}});
}// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) ->std::future<typename std::invoke_result<F, Args...>::type>
{using return_type = typename std::invoke_result<F, Args...>::type;// 为什么要用shared_ptr? 因为后面使用queue.emplace, 会将task传递到queue中,// 当离开此方法时, task因为离开作用域, 会销毁, 而shared_ptr则不会销毁, 而是引用计数-1auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 并且这里要注意, task不是packaged_task, 而是shared_ptrstd::future<return_type> res = task->get_future();{// 为什么这里要开一个单独的作用域呢? 因为这里unique_lock是以作用域进行lock的std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");// 这里通过lambda, 及那个task包装为一个void()的函数, 里面的*task是shared_ptr指针指向的packaged_tasktasks.emplace([task]() { (*task)(); });}// 任务装入queue后, 通知子线程执行condition.notify_one();return res;
}// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{// 同样的原因, unique_lock的生效范围是当前作用域std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();// 执行到notify_all的时候, 在线程的方法里, 实际上已经return了, 这里循环join// 是为了让主线程结束的时候, 子线程也随之结束for (std::thread& worker : workers)worker.join();
}#endif
调用方法
#include "ThreadPool.h"
using namespace std;int main()
{// 线程数量为4的鲜橙汁ThreadPool pool(4);// 8个返回类型都是int的future数组std::vector< std::future<int> > results;for (int i = 0; i < 8; ++i) {auto f = [i]() {std::this_thread::sleep_for(std::chrono::seconds(1));return i;};// 设置任务, 并将返回的future放入results里results.emplace_back(pool.enqueue(f));}// 循环打印结果for (auto&& result : results)std::cout << result.get() << endl;// 设置一个string(const char*, const char*)的方法, 并获取返回的future<string>std::future<string> f = pool.enqueue([](const char* s1, const char* s2) {return string(s1) + s2;}, "hello ", "world");cout << f.get() << endl;std::cout << std::endl;return 0;
}
执行结果
总结
技巧:
- 通过lambda 或者 bind 来改变函数的参数个数
- 通过构造 packaged_task来改变返回值传递的方式, 方便将方法统一放入vector, 并且是异步执行的
- 通过lambda来改变函数返回值类型