缓冲式线程池C++简易实现

前言 :

代码也比较短,简单说一下代码结构,是这样的:


SyncQueue.hpp封装了一个大小为MaxTaskCount的同步队列,这是一个模板类,它在线程池中承担了存放任务等待线程组中的线程来执行的角色。最底层是std::list<T>但是我们起名为m_queue,因为我们使用list模拟了队列,这里使用std::vector是不合适的,我们要频繁插入(添任务)删除(取任务),开销太大,具体了解一下std::vector底层。

我们还加了一个m_mtx互斥锁,因为我们要保证对于队列的访问是线程安全的,但是因为使用unique_lock对于m_mtx要修改,所以我们加了mutable关键字(mutable保证我们可以在常方法中修改类的非静态成员)。

        std::condition_variable m_notEmpty;
        std::condition_variable m_notFull;

他俩是条件变量是用来同步队列任务是否为空为满的。

Add()是添加任务调用的底层函数,我们对它做了一个封装,分别适合左值和右值:

        int Put(const T& task)
        {
            return Add(task);
        }
        int Put(T&& task)
        {
            return Add(std::forward<T>(task));
        } 

 同理Take也是两个。

WaitQueueEmptyStop()是后期添加的一个函数,比如在我们添加任务结束后,线程来执行任务,但是此时主线程准备结束,我们调用析构线程池对象的析构函数,它最终会调用这个函数判断任务队列中是否还有任务,如果不空,那么我就弃锁睡眠1秒,循环往复,直到队列为空。


 CachedThreadPool.hpp是线程池的代码,Task是可调用对象的包装器,上述任务队列中放的就是Task,我们添加的也是Task,执行的也是Task。底层封装了上述的任务队列。

我们使用一个

std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;

来存储某个thread的id 和管理它的共享性智能指针,方便我们到了KeepAliveTimes秒删除它,这是也是我们起名为CachedThreadPool的原因,它是一个缓冲型线程池,线程数量是浮动的,受制于两个原子变量的限制,让线程数不至于太少无法执行任务,不至于太多而空闲:

        std::atomic<int> m_idleThreadSize;
        std::atomic<int> m_curThreadSize;

构造函数中我们Start()开两个线程,线程的入口函数是CachedThreadPool::RunInThread(),让他们去检测是否有任务,有就task();反之陷入m_queue.Take(task)。 

析构函数中我们调用StopThreadGroup(),它会先调用任务队列的WaitQueueEmptyStop()确保任务队列为空,然后使用一个range-for结束掉线程组:
            for (auto& x : m_threadgroup)
            {
                if (x.second->joinable())
                {
                    x.second->join();
                }
            }

还有一个重要的成员函数:

        template <class Func, class...  Args>
        void AddTask(Func&& func, Args&&... args)
        {
            auto task = std::make_shared<std::function<void()> >(
                std::bind(std::forward<Func>(func),
                    std::forward<Args>(args)...));

            if (m_queue.Put([task]() { (*task)(); }) != 0)
            {
                cerr << "not add task queue... " << endl;
                (*task)();
            }
            AddnewThread();
        }

这是一个模板函数,用来添加没有返回值的任务到任务队列中,使用了引用性别未定义,可变模板参数,bind,完美转发,lambda表达式,智能指针,这个函数的成型颇具困难。

        template <class Func, class... Args>
        auto submit(Func&& func, Args &&...args)
        {
            using RetType = decltype(func(args...));
            auto task = std::make_shared<std::packaged_task<RetType()>>(
                std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
            std::future<RetType> result = task->get_future();

            if (m_queue.Put([task]()
                { (*task)(); }) != 0)
            {
                cerr << "not add task queue... " << endl;
                (*task)();
            }
            AddnewThread();
            return result;
        }

 这是一个模板函数,用来添加带有返回值的任务到任务队列。


main.cpp中有一个有意思的问题,可以探讨一下:

std::shared_ptr<FILE> pf(fopen("test.txt", "w"));

//std::shared_ptr<FILE> pf(fopen("test.txt", "w"),fileDeleter());

main.cpp是我们的测试文件,我们想把测试结果打印到文件中,方便观察,我们刚开始使用了一个裸的FILE* fp;然后在main函数结束之后fclose(fp); fp = nullptr;

结果我们跑不通,最终发现是线程池对象析构之前的这两行代码会被执行,文件被关闭,我们无法将结果写入文件中。

最后我们使用了shared_ptr<FILE>来管理文件,但是写成了第一行,我们发现往屏幕上写结果正确但是写文件不行,而且往屏幕上写总是在线程池对象完美析构,一切安顿好之后出现异常,这令人困惑,我以为是我们线程的释放还做得有问题,谁曾想是这个指针坏事了,pf对象作为一个全局对象,析构于局部的线程池对象之后,此时它会调用它的默认删除器,也就是delete,这显然是不行的,文件指针应该fclose才对,所以我们写了一个自定义的删除器来改正错误。

struct fileDeleter {
    void operator()(FILE* fp)const {
        if (fp) {
            cout << "file close\n";
            fclose(fp);
        }
    }
}; 

类似错误还有很多,比如之前的:

红框中我们错写为了:m_queue.pop_back();

这个错误虽然看似简单,实则复杂,因为打印出来结果很流畅,但是值不对,后来人肉debug发现这个错误后令我哭笑不得,好家伙,每次拿走一个任务,竟然pop末尾的未被执行的任务..........

多线程代码调试我还不擅长,任重而道远,但我喜欢De这些有趣的bug! 


 代码总体逻辑类似于生产者-消费者模型,这也是OS学习的经典中的经典,线程池基本就是从这里扩展而来,又分化为了各种类别的线程池,定长的,缓冲的,窃取的.................

源代码:

main.cpp 

#define _CRT_SECURE_NO_WARNINGS// C++ STL
#include <thread>
#include <iostream>
using namespace std;
#include"CachedThreadPool.hpp"
struct fileDeleter {void operator()(FILE* fp)const {if (fp) {cout << "file close\n";fclose(fp);}}
};
class Int
{
private:int value;
public:Int(int x = 0) : value(x){cout << "create Int " << value << endl;}~Int(){cout << "destroy Int: " << value << endl;}Int(const Int& it) :value(it.value){cout << "Copy Int " << value << endl;}Int(Int&& it) :value(it.value){cout << "Move create Int : " << value << endl;it.value = -1;}Int& operator=(const Int& it){if (this != &it){value = it.value;}cout << "operator=" << endl;return *this;}Int& operator=(Int&& it){if (this != &it){value = it.value;it.value = -1;}cout << "operator=(Int &&)" << endl;return *this;}Int& operator++(){value += 1;return *this;}Int operator++(int){Int tmp = *this;++* this;return tmp;}bool operator<(const int x) const{return this->value < x;}ostream& operator<<(ostream& out) const {return out << value;}
};
ostream& operator<<(ostream& out, const Int& it)
{return it << out;
}std::mutex mtx;
std::shared_ptr<FILE> pf(fopen("test.txt", "w"),fileDeleter());
void print(Int x)
{std::lock_guard<std::mutex> lock(mtx);//cout << "print x: " << &x << " " << x << endl;fprintf(pf.get(), "print x : %d => &x: %p \n", x, &x);std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
}
void funa(myspace::CachedThreadPool& mypool)
{double dx = 1;for (Int i = 0; i < 100; ++i){mypool.AddTask(print, i);}cout << "funa ... end " << endl;
}int main()
{myspace::CachedThreadPool mypool;std::thread tha(funa, std::ref(mypool));tha.join();return 0;
}

SyncQueue.hpp


// C++ STL
#include <list>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <iostream>
using namespace std;#ifndef SYNC_QUEUE_HPP
#define SYNC_QUEUE_HPP
static const int MaxTaskCount = 2000;
namespace myspace
{template <class T>class SyncQueue{private:std::list<T> m_queue;mutable std::mutex m_mtx;std::condition_variable m_notEmpty;std::condition_variable m_notFull;std::condition_variable m_waitStop;int m_maxSize;int m_waitTime;bool m_needStop;bool IsFull() const{bool full = m_queue.size() >= m_maxSize;if (full){cerr << "m_queue full .. wait .. " << endl;}return full;}bool IsEmpty() const{bool empty = m_queue.empty();if (empty){cerr << "m_queue empty...  wait ... " << endl;}return empty;}template <class F>int Add(F&& task){std::unique_lock<std::mutex> lock(m_mtx);if (!m_notFull.wait_for(lock, std::chrono::seconds(m_waitTime),[this](){ return m_needStop || !IsFull(); })){return 1; // 任务队列已达上限}if (m_needStop){return 2; // 任务队列stop;}m_queue.push_back(std::forward<F>(task));m_notEmpty.notify_all();return 0;}public:SyncQueue(int maxsize = MaxTaskCount, int timeout = 1): m_maxSize(maxsize), m_needStop(false), m_waitTime(timeout){}~SyncQueue(){if (!m_needStop){Stop();}}int Put(const T& task){return Add(task);}int Put(T&& task){return Add(std::forward<T>(task));}int Take(std::list<T>& tlist){std::unique_lock<std::mutex> lock(m_mtx);if (!m_notEmpty.wait_for(lock, std::chrono::seconds(m_waitTime),[this]() {  return m_needStop || !IsEmpty(); })){return 1; // timeout;}if (m_needStop){return 2;}tlist = std::move(m_queue);m_notFull.notify_all();return 0;}int Take(T& task){std::unique_lock<std::mutex> lock(m_mtx);if (!m_notEmpty.wait_for(lock, std::chrono::seconds(m_waitTime),[this]() {  return m_needStop || !IsEmpty(); })){return 1; // timeout;}if (m_needStop){return 2;}task = m_queue.front();m_queue.pop_front();m_notFull.notify_all();return 0;}void WaitQueueEmptyStop(){std::unique_lock<std::mutex> locker(m_mtx);while (!IsEmpty()){m_waitStop.wait_for(locker, std::chrono::seconds(1));}m_needStop = true;m_notFull.notify_all();m_notEmpty.notify_all();}void Stop(){{std::unique_lock<std::mutex> lock(m_mtx);m_needStop = true;}m_notEmpty.notify_all();m_notFull.notify_all();}bool Empty() const{std::unique_lock<std::mutex> lock(m_mtx);return m_queue.empty();}bool Full() const{std::unique_lock<std::mutex> lock(m_mtx);return m_queue.size() >= m_maxSize;}size_t Size() const{std::unique_lock<std::mutex> lock(m_mtx);return m_queue.size();}size_t Count() const{return m_queue.size();}};
}
#endif

CachedThreadPool.hpp

//C API
#include<time.h>
//OWN
#include "SyncQueue.hpp"
// C++ STL
#include <functional>
#include <unordered_map>
#include <map>
#include <future>
#include <atomic>
#include <memory>
#include <thread>
using namespace std;
#ifndef CACHED_THREAD_POOL_HPP
#define CACHED_THREAD_POOL_HPPnamespace myspace
{static const int InitThreadNums = 2;static const time_t KeepAliveTimes = 5;class CachedThreadPool{public:using Task = std::function<void(void)>;private:std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;int m_coreThreadSize;int m_maxThreadSize;std::atomic<int> m_idleThreadSize;std::atomic<int> m_curThreadSize;std::mutex m_mutex;myspace::SyncQueue<Task> m_queue;std::atomic<bool> m_running;std::once_flag m_flag;void RunInThread(){auto tid = std::this_thread::get_id();time_t startTime = time(nullptr);while (m_running){Task task;if (m_queue.Size() == 0){time_t  curnow = time(nullptr);time_t intervalTime = curnow - startTime;if (intervalTime >= KeepAliveTimes && m_curThreadSize > m_coreThreadSize){m_threadgroup.find(tid)->second->detach();m_threadgroup.erase(tid);m_curThreadSize -= 1;m_idleThreadSize -= 1;cerr << "delete idle thread tid: " << tid << endl;cerr << "idle thread num: " << m_idleThreadSize << endl;cerr << "cur thread num: " << m_curThreadSize << endl;return;}}if (!m_queue.Take(task) && m_running){m_idleThreadSize -= 1;task();m_idleThreadSize += 1;startTime = time(nullptr);}}}void Start(int numthreas){m_running = true;m_curThreadSize = numthreas;for (int i = 0; i < numthreas; ++i){auto tha = std::make_shared<std::thread>(&CachedThreadPool::RunInThread, this);std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_idleThreadSize += 1;}}void StopThreadGroup(){m_queue.WaitQueueEmptyStop();m_running = false;for (auto& x : m_threadgroup){if (x.second->joinable()){x.second->join();}}m_threadgroup.clear();}void AddnewThread(){if (m_idleThreadSize <= 0 && m_curThreadSize < m_maxThreadSize){auto tha = std::make_shared<std::thread>(&CachedThreadPool::RunInThread, this);std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_idleThreadSize += 1;m_curThreadSize += 1;cerr << "AddnewThread id: " << tid << endl;cerr << "m_curThreadSize: " << m_curThreadSize << endl;}}public:CachedThreadPool(int initNumThreads = InitThreadNums, int taskQueueSize = MaxTaskCount): m_coreThreadSize(initNumThreads),m_maxThreadSize(std::thread::hardware_concurrency() + 1),m_idleThreadSize(0),m_curThreadSize(0),m_queue(taskQueueSize),m_running(false){Start(m_coreThreadSize);}~CachedThreadPool(){if (m_running){Stop();}}void Stop(){std::call_once(m_flag, [this](){ StopThreadGroup(); });}template <class Func, class...  Args>void AddTask(Func&& func, Args&&... args){auto task = std::make_shared<std::function<void()> >(std::bind(std::forward<Func>(func),std::forward<Args>(args)...));if (m_queue.Put([task]() { (*task)(); }) != 0){cerr << "not add task queue... " << endl;(*task)();}AddnewThread();}template <class Func, class... Args>auto submit(Func&& func, Args &&...args){using RetType = decltype(func(args...));auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));std::future<RetType> result = task->get_future();if (m_queue.Put([task](){ (*task)(); }) != 0){cerr << "not add task queue... " << endl;(*task)();}AddnewThread();return result;}};
}#endif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/15478.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

推荐一款功能强大的光学识别OCR软件:Readiris Dyslexic

Readiris Dyslexic是一款功能强大的光学识别OCR软件&#xff0c;可以扫描任何纸质文档并将其转换为完全可编辑的数字文件(Word&#xff0c;Excel&#xff0c;PDF)&#xff0c;然后用你喜欢的编辑器进行编辑。该软件提供了一种轻松创建&#xff0c;修改和签名PDF的完整解决方法&…

【面试全纪实 | Nginx 04】请回答,你真的精通Nginx吗?

&#x1f5fa;️博客地图 &#x1f4cd;1、location的作用是什么&#xff1f; &#x1f4cd;2、你知道漏桶流算法和令牌桶算法吗&#xff1f; &#x1f4cd;3、Nginx限流怎么做的&#xff1f; &#x1f4cd;4、为什么要做动静分离&#xff1f; &#x1f4cd;5、Nginx怎么做…

如何为你的 SaaS 公司做好国际化发展的准备?

随着 SaaS&#xff08;软件即服务&#xff09;公司的不断发展&#xff0c;确定扩张机会并建立可扩展的流程和策略以支持这些机会变得至关重要。一些公司向上游市场扩张&#xff0c;向企业销售产品&#xff0c;而此前他们主要面向中小企业。一些公司则朝着相反的方向发展&#x…

Towards Reasoning in Large Language Models: A Survey

文章目录 题目摘要引言什么是推理?走向大型语言模型中的推理测量大型语言模型中的推理发现与启示反思、讨论和未来方向 为什么要推理?结论题目 大型语言模型中的推理:一项调查 论文地址:https://arxiv.org/abs/2212.10403 项目地址: https://github.com/jeffhj/LM-reason…

推荐一款硬盘数据清除工具:Macrorit Data Wiper

Macrorit Data Wiper是一款硬盘数据清除工具&#xff0c;用于安全擦除数据、分区和磁盘的一站式工具包。完全擦除系统/引导分区。许多程序文件默认存储在系统磁盘驱动器中。如果您或您的组织想要永久擦除磁盘驱动器以防止未经授权使用您的数据&#xff0c;则此功能是必要的。 为…

第13章 Zabbix分布式监控企业实战

企业服务器对用户提供服务,作为运维工程师最重要的事情就是保证该网站正常稳定的运行,需要实时监控网站、服务器的运行状态,并且有故障及时去处理。 监控网站无需人工时刻去访问WEB网站或者登陆服务器去检查,可以借助开源监控软件例如Zabbix、Cacti、Nagios、Ganglia等来实…

2024IJCAI | MetalISP: 仅用1M参数的RAW到RGB高效映射模型

文章标题是&#xff1a;《MetaISP:Effcient RAW-to-sRGB Mappings with Merely 1M Parameters》 MetaISP收录于2024IJCAI&#xff0c;是新加坡国立大学&#xff08;Xinchao Wang为通讯作者&#xff09;和华为联合研发的新型ai-isp。 原文链接&#xff1a;MetaISP 【1】论文的…

使用 ts-node 运行 ts文件,启动 nodejs项目

最近在写一个nodejs项目&#xff0c;使用 ts-node 启动项目。遇到了一些问题&#xff0c;在此记录一下。 ts-node 是 TypeScript 执行引擎和 Node.js 的 REPL(一个简单的交互式的编程环境)。 它能够直接在 Node.js 上执行 TypeScript&#xff0c;而无需预编译。 这是通过挂接…

《鸿蒙生态:开发者的机遇与挑战》

一、引言 在当今科技飞速发展的时代&#xff0c;操作系统作为连接硬件与软件的核心枢纽&#xff0c;其重要性不言而喻。鸿蒙系统的出现&#xff0c;为开发者带来了新的机遇与挑战。本文将从开发者的角度出发&#xff0c;阐述对鸿蒙生态的认知和了解&#xff0c;分析鸿蒙生态的…

PHP代码审计 - SQL注入

SQL注入 正则搜索(update|select|insert|delete).*?where.*示例一&#xff1a; bluecms源码下载&#xff1a;source-trace/bluecms 以项目打开网站根目录&#xff0c;并以ctrlshiftf打开全局搜索 (update|select|insert|delete).*?where.*并开启正则匹配 最快寻找脆弱点的…

Essential Cell Biology--Fifth Edition--Chapter one (5)

1.1.4 The eukaryotic cell [真核细胞] 真核细胞&#xff0c;一般来说&#xff0c;比细菌和古细菌更大&#xff0c;更复杂。有些是独立的单细胞生物&#xff0c;如变形虫和酵母&#xff08;图1-14&#xff09;&#xff1b;另一些则生活在多细胞集合中。所有更复杂的多细胞生物…

线程-2-线程概念与控制

main 线程常见寄存器&#xff08;CR3 EIP IR MMU TLB&#xff09; CR3是当前进程页表物理内存地址&#xff08;包不能虚拟地址&#xff0c;不然套娃了&#xff09; CPU中有寄存器指向task_struct* current EIP&#xff1a;入口虚拟地址 IR&#xff1a;当前命令地址系统总线&a…

Vulkan 开发(十一):Vulkan 交换链

Vulkan 系列文章&#xff1a; 1. 开篇&#xff0c;Vulkan 概述 2. Vulkan 实例 3. Vulkan 物理设备 4. Vulkan 设备队列 5. Vulkan 逻辑设备 6. Vulkan 内存管理 7. Vulkan 缓存 8. Vulkan 图像 9. Vulkan 图像视图 10. Vulkan 窗口表面&#xff08;Surface&#xff…

【HarmonyOS】鸿蒙系统在租房项目中的项目实战(一)

从今天开始&#xff0c;博主将开设一门新的专栏用来讲解市面上比较热门的技术 “鸿蒙开发”&#xff0c;对于刚接触这项技术的小伙伴在学习鸿蒙开发之前&#xff0c;有必要先了解一下鸿蒙&#xff0c;从你的角度来讲&#xff0c;你认为什么是鸿蒙呢&#xff1f;它出现的意义又是…

百度搜索AI探索版多线程批量生成TXT原创文章软件-可生成3种类型文章

百度搜索AI探索版是百度推出的一款基于大语言模型文心一言的综合搜索产品‌。以下是关于百度搜索AI探索版的详细介绍&#xff1a; ‌产品发布‌&#xff1a;百度搜索AI探索版在百度世界大会上进行了灰度测试&#xff0c;并面向用户开放体验‌。 ‌核心功能‌&#xff1a;与传…

Linux软件包管理与Vim编辑器使用指南

目录 一、Linux软件包管理器yum 1.什么是软件包&#xff1f; 2.什么是软件包管理器&#xff1f; 3.查看软件包 4.安装软件 ​编辑 5.卸载软件 Linux开发工具&#xff1a; 二、Linux编辑器---vim 1.vim的基本概念 (1) 正常/普通模式&#xff08;Normal mode&#xff0…

Android Osmdroid + 天地图 (一)

Osmdroid 天地图 前言正文一、配置build.gradle二、配置AndroidManifest.xml三、获取天地图的API Key① 获取开发版SHA1② 获取发布版SHA1 四、请求权限五、显示地图六、源码 前言 Osmdroid是一款完全开源的地图基本操作SDK&#xff0c;我们可以通过这个SDK去加一些地图API&am…

2024国内AI工具十大推荐丨亲测好用‼️

&#x1f680;探索了市面上数百款AI工具后&#xff0c;我精心挑选了10款在不同场景下超级好用的神器&#xff0c;快来一起看看吧&#xff01;&#x1f31f; 1️⃣豆包 基于云雀模型开发&#xff0c;具备聊天机器人、写作助手、英语学习助手等功能&#xff0c;能够进行多轮对话…

Unity学习---IL2CPP打包时可能遇到的问题

写这篇主要是怕自己之后打包的时候出问题不知道怎么搞&#xff0c;所以记录一下。 问题一&#xff1a;类型裁剪 IL2CPP打包后会自动对Unity工程的dll进行裁剪&#xff0c;将代码中没有引用到的类型裁剪掉。特别是通过反射等方式调用一些类的时候&#xff0c;很容易出问题。 …

多模态大模型(2)--BLIP

大模型如火如荼&#xff0c;研究者们已经不再满足于基本文本的大语言模型&#xff08;LLM, Large Language Model&#xff09;&#xff0c;AI领域的热点正逐步向多模态转移&#xff0c;具备多模态能力的多模态大型语言模型&#xff08;MM&#xff08;Multi-Modal&#xff09;-L…