C++ 简单线程池实现
实现代码
#include <vector>
#include <thread>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>class ThreadPool {
public:ThreadPool(size_t threads) : stop(false) {for(size_t i = 0; i < threads; ++i) {workers.emplace_back([this] {for(;;) {std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });if(this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});}}template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);if(stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;}~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for(std::thread &worker: workers)worker.join();}private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex queue_mutex;std::condition_variable condition;bool stop;
};
使用示例
#include <iostream>
#include <chrono>int main() {ThreadPool pool(4);// 提交多个任务到线程池std::vector<std::future<int>> results;for(int i = 0; i < 8; ++i) {results.emplace_back(pool.enqueue([i] {std::cout << "hello " << i << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "world " << i << std::endl;return i*i;}));}// 获取结果for(auto && result: results)std::cout << result.get() << ' ';std::cout << std::endl;return 0;
}
解析
ThreadPool 类定义
class ThreadPool {
public:ThreadPool(size_t threads); // 构造函数,指定线程数量~ThreadPool(); // 析构函数template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;private:std::vector<std::thread> workers; // 工作线程集合std::queue<std::function<void()>> tasks; // 任务队列std::mutex queue_mutex; // 任务队列互斥锁std::condition_variable condition; // 条件变量bool stop; // 停止标志
};
构造函数解析
ThreadPool(size_t threads) : stop(false) {for(size_t i = 0; i < threads; ++i) {workers.emplace_back([this] {// 线程工作函数for(;;) {std::function<void()> task;{// 获取队列锁std::unique_lock<std::mutex> lock(this->queue_mutex);// 等待条件满足:停止或任务队列非空this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });// 如果停止且任务为空,线程退出if(this->stop && this->tasks.empty())return;// 获取任务task = std::move(this->tasks.front());this->tasks.pop();}// 执行任务(在锁外执行,避免锁持有时间过长)task();}});}
}
-
stop(false)
- 初始化停止标志为false -
workers.emplace_back
- 创建并启动工作线程 -
for(;;)
- 线程无限循环,等待任务 -
std::unique_lock<std::mutex>
- 获取队列锁 -
条件:condition.wait
- 等待条件变量通知,防止忙等待stop || !tasks.empty()
(停止或有任务) -
检查是否应该退出线程
-
从队列获取任务并移出队列
-
在锁外执行任务
enqueue 方法解析
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {// 获取返回类型using return_type = typename std::result_of<F(Args...)>::type;// 创建packaged_task包装可调用对象auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 获取future以便获取结果std::future<return_type> res = task->get_future();{// 获取队列锁std::unique_lock<std::mutex> lock(queue_mutex);// 如果线程池已停止,抛出异常if(stop)throw std::runtime_error("enqueue on stopped ThreadPool");// 将任务添加到队列tasks.emplace([task](){ (*task)(); });}// 通知一个等待线程有新任务condition.notify_one();return res;
}
-
模板参数:
-
F
- 可调用对象类型 -
Args
- 参数类型包
-
-
返回类型推导:
-
std::future<typename std::result_of<F(Args...)>::type>
-
-
std::packaged_task
- 包装可调用对象,可以获取future -
std::bind
- 绑定参数 -
std::forward
- 完美转发参数 -
task->get_future()
- 获取与任务关联的future -
锁保护下的队列操作
-
condition.notify_one()
- 通知一个等待线程
析构函数解析
~ThreadPool() {{// 获取锁并设置停止标志std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}// 通知所有线程condition.notify_all();// 等待所有线程完成for(std::thread &worker: workers)worker.join();
}
-
设置停止标志
stop = true
-
condition.notify_all()
- 唤醒所有等待线程 -
worker.join()
- 等待所有线程结束
动态调整线程数量
void resize(size_t new_size) {if (new_size < workers.size()) {// 减少线程数量{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread &worker : workers)worker.join();workers.clear();stop = false;for (size_t i = 0; i < new_size; ++i) {workers.emplace_back([this] {// 线程工作函数});}} else if (new_size > workers.size()) {// 增加线程数量for (size_t i = workers.size(); i < new_size; ++i) {workers.emplace_back([this] {// 线程工作函数});}}
}
任务优先级
#include <queue>// 修改任务队列定义
struct Task {std::function<void()> func;int priority;bool operator<(const Task& other) const {return priority < other.priority; // 优先级高的先执行}
};std::priority_queue<Task> tasks;// 修改enqueue方法
template<class F, class... Args>
auto enqueue(int priority, F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {// ... 其他代码不变tasks.emplace(Task{[task](){ (*task)(); }, priority});// ...
}