当前位置: 首页 > news >正文

C++ 简单线程池实现

实现代码 

#include <vector>
#include <thread>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>class ThreadPool {
public:ThreadPool(size_t threads) : stop(false) {for(size_t i = 0; i < threads; ++i) {workers.emplace_back([this] {for(;;) {std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });if(this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});}}template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);if(stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;}~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for(std::thread &worker: workers)worker.join();}private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex queue_mutex;std::condition_variable condition;bool stop;
};

使用示例

#include <iostream>
#include <chrono>int main() {ThreadPool pool(4);// 提交多个任务到线程池std::vector<std::future<int>> results;for(int i = 0; i < 8; ++i) {results.emplace_back(pool.enqueue([i] {std::cout << "hello " << i << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "world " << i << std::endl;return i*i;}));}// 获取结果for(auto && result: results)std::cout << result.get() << ' ';std::cout << std::endl;return 0;
}

解析

ThreadPool 类定义

class ThreadPool {
public:ThreadPool(size_t threads);  // 构造函数,指定线程数量~ThreadPool();               // 析构函数template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;private:std::vector<std::thread> workers;       // 工作线程集合std::queue<std::function<void()>> tasks; // 任务队列std::mutex queue_mutex;                 // 任务队列互斥锁std::condition_variable condition;      // 条件变量bool stop;                              // 停止标志
};

构造函数解析

ThreadPool(size_t threads) : stop(false) {for(size_t i = 0; i < threads; ++i) {workers.emplace_back([this] {// 线程工作函数for(;;) {std::function<void()> task;{// 获取队列锁std::unique_lock<std::mutex> lock(this->queue_mutex);// 等待条件满足:停止或任务队列非空this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); });// 如果停止且任务为空,线程退出if(this->stop && this->tasks.empty())return;// 获取任务task = std::move(this->tasks.front());this->tasks.pop();}// 执行任务(在锁外执行,避免锁持有时间过长)task();}});}
}
  1. stop(false) - 初始化停止标志为false

  2. workers.emplace_back - 创建并启动工作线程

  3. for(;;) - 线程无限循环,等待任务

  4. std::unique_lock<std::mutex> - 获取队列锁

  5. condition.wait - 等待条件变量通知,防止忙等待

       条件:stop || !tasks.empty()(停止或有任务)
  6. 检查是否应该退出线程

  7. 从队列获取任务并移出队列

  8. 在锁外执行任务

enqueue 方法解析

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {// 获取返回类型using return_type = typename std::result_of<F(Args...)>::type;// 创建packaged_task包装可调用对象auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 获取future以便获取结果std::future<return_type> res = task->get_future();{// 获取队列锁std::unique_lock<std::mutex> lock(queue_mutex);// 如果线程池已停止,抛出异常if(stop)throw std::runtime_error("enqueue on stopped ThreadPool");// 将任务添加到队列tasks.emplace([task](){ (*task)(); });}// 通知一个等待线程有新任务condition.notify_one();return res;
}
  1. 模板参数:

    • F - 可调用对象类型

    • Args - 参数类型包

  2. 返回类型推导:

    • std::future<typename std::result_of<F(Args...)>::type>

  3. std::packaged_task - 包装可调用对象,可以获取future

  4. std::bind - 绑定参数

  5. std::forward - 完美转发参数

  6. task->get_future() - 获取与任务关联的future

  7. 锁保护下的队列操作

  8. condition.notify_one() - 通知一个等待线程

析构函数解析

~ThreadPool() {{// 获取锁并设置停止标志std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}// 通知所有线程condition.notify_all();// 等待所有线程完成for(std::thread &worker: workers)worker.join();
}
  1. 设置停止标志stop = true

  2. condition.notify_all() - 唤醒所有等待线程

  3. worker.join() - 等待所有线程结束

动态调整线程数量

void resize(size_t new_size) {if (new_size < workers.size()) {// 减少线程数量{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread &worker : workers)worker.join();workers.clear();stop = false;for (size_t i = 0; i < new_size; ++i) {workers.emplace_back([this] {// 线程工作函数});}} else if (new_size > workers.size()) {// 增加线程数量for (size_t i = workers.size(); i < new_size; ++i) {workers.emplace_back([this] {// 线程工作函数});}}
}

 任务优先级

#include <queue>// 修改任务队列定义
struct Task {std::function<void()> func;int priority;bool operator<(const Task& other) const {return priority < other.priority; // 优先级高的先执行}
};std::priority_queue<Task> tasks;// 修改enqueue方法
template<class F, class... Args>
auto enqueue(int priority, F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {// ... 其他代码不变tasks.emplace(Task{[task](){ (*task)(); }, priority});// ...
}
http://www.xdnf.cn/news/218287.html

相关文章:

  • 线程数据同步的三种方式
  • Qwen多模态系列论文
  • C语言中的POSIX线程与多线程编程:从入门到实践
  • Java SE(5)——数组
  • Java基础学习内容大纲
  • 【Qt】Qt换肤,使用QResource动态加载资源文件
  • AI时代来临将带来文科复兴
  • 预留库存的实现
  • 清晰易懂的跨域请求知识——拿捏
  • 前端与后端开发详解:从概念到就业技能指南
  • 解锁健康密码,开启养生新旅程
  • 空间权重矩阵
  • 体育直播源码NBA足球直播M29模板赛事直播源码
  • Linux日志系统大揭秘-系统故障排查安全审计
  • openssl_error_string() 不要依赖错误信息作为逻辑判断
  • JVM | CMS垃圾收集器详解
  • OVP UVP与UVLO对比
  • 2025年DDoS攻击防御全解析:应对超大流量的实战策略
  • Springboot使用jwt实现登录认证
  • Gitea windows服务注册,服务启动、停止、重启脚本
  • JavaScript面试问题
  • 研读论文——电子科技大学《通过专家混合实现多类型上下文感知的对话推荐系统》
  • antd-vue表单实现一个临时校验效果
  • DeepSeek+Dify之六通过API调用工作流
  • 头歌java课程实验(Java中的IO流操作)
  • python脚本下载ERA5数据详细规范和教程
  • Mysql中索引的知识
  • c#栈及其应用
  • 生物信息学常用软件InSequence,3大核心功能,简易好上手
  • PyTorch 深度学习实战(23):多任务强化学习(Multi-Task RL)之扩展