基于 Windows I/O 完成端口(IOCP)的多线程任务队列系统小case
这段代码实现了一个基于 Windows I/O 完成端口(IOCP)的多线程任务队列系统。它通过 IOCP 将任务分发到线程池中执行,并通过一个线程安全的队列(std::list<std::string>
)来管理任务数据。以下是对代码的详细讲解:
1. 宏定义和枚举
#define IOCP_LIST_EMPTR 0
#define IOCP_LIST_PUSH 1
#define IOCP_LIST_POP 2enum {IocpListEmpty,IocpListPush,IocpListPop
};
-
宏定义和枚举用于定义操作类型:
-
IOCP_LIST_EMPTR
和IocpListEmpty
:表示清空队列的操作。 -
IOCP_LIST_PUSH
和IocpListPush
:表示向队列中添加数据的操作。 -
IOCP_LIST_POP
和IocpListPop
:表示从队列中取出数据的操作。
-
2. 数据结构 IOCP_PARAM
typedef struct IocpParam
{int nOperator; // 操作类型std::string strData; // 数据_beginthread_proc_type cbFunc; // 回调函数IocpParam(int op, const char* sData, _beginthread_proc_type cb = NULL) {nOperator = op;strData = sData;cbFunc = cb;}IocpParam() {nOperator = -1;}
} IOCP_PARAM;
-
IOCP_PARAM
是一个结构体,用于封装任务信息:-
nOperator
:指定操作类型(如IocpListPush
或IocpListPop
)。 -
strData
:存储任务相关的数据。 -
cbFunc
:回调函数指针,用于在任务完成后调用。
-
3. 线程函数 threadQueueEntry
void threadQueueEntry(HANDLE hIOCP) {std::list<std::string> lstString; // 线程安全的队列DWORD dwTransferred = 0;ULONG_PTR CompletionKey = 0;OVERLAPPED* poverlapped = NULL;while (GetQueuedCompletionStatus(hIOCP, &dwTransferred, &CompletionKey, &poverlapped, INFINITE)) {if ((dwTransferred == 0) && (CompletionKey == NULL)) {printf("thread is prepare to exit!\r\n");break;}IOCP_PARAM* pParam = (IOCP_PARAM*)CompletionKey;if (pParam->nOperator == IocpListPush) {lstString.push_back(pParam->strData);}else if (pParam->nOperator == IocpListPop) {std::string* pStr = NULL;if (lstString.size() > 0) {pStr = new std::string(lstString.front());lstString.pop_front();}if (pParam->cbFunc) {pParam->cbFunc(pStr);}}else if (pParam->nOperator == IocpListEmpty) {lstString.clear();}delete pParam;}_endthread();
}
-
这是线程的入口函数,负责处理从完成端口(IOCP)接收到的任务。
-
使用
GetQueuedCompletionStatus
函数从完成端口获取任务:-
如果
dwTransferred
为 0 且CompletionKey
为NULL
,表示线程准备退出。 -
否则,根据
CompletionKey
获取任务参数pParam
。
-
-
根据
pParam->nOperator
的值执行不同的操作:-
IocpListPush
:将数据添加到队列lstString
中。 -
IocpListPop
:从队列中取出数据,并调用回调函数cbFunc
。 -
IocpListEmpty
:清空队列。
-
-
最后释放任务参数
pParam
。
4. 回调函数 func
void func(void* arg) {std::string* pstr = (std::string*)arg;if (pstr != NULL) {printf("pop from list :%s\r\n", pstr->c_str());delete pstr;}else {printf("list is empty,no data!\r\n");}
}
-
这是一个简单的回调函数,用于处理从队列中取出的数据:
-
如果
pstr
不为空,打印数据并释放内存。 -
如果为空,打印队列为空的消息。
-
5. 主函数 main
int main()
{if (!CTool::Init()) return 1;printf("press any key to exit...\r\n");HANDLE hIOCP = INVALID_HANDLE_VALUE; // I/O 完成端口hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); // 创建完成端口HANDLE hThread = (HANDLE)_beginthread(threadQueueEntry, 0, hIOCP); // 创建线程DWORD tick = GetTickCount64();while (_kbhit() != 0) { // 检测按键if (GetTickCount64() - tick > 1300) {PostQueuedCompletionStatus(hIOCP, sizeof(IOCP_PARAM), (ULONG_PTR)new IOCP_PARAM(IocpListPop, "hello world"), NULL);}if (GetTickCount64() - tick > 2000) {PostQueuedCompletionStatus(hIOCP, sizeof(IOCP_PARAM), (ULONG_PTR)new IOCP_PARAM(IocpListPush, "hello world"), NULL);tick = GetTickCount64();}Sleep(1);}if (hIOCP != NULL) {PostQueuedCompletionStatus(hIOCP, 0, NULL, NULL); // 唤醒完成端口WaitForSingleObject(hIOCP, INFINITE);}CloseHandle(hIOCP);printf("exit done!\r\n");::exit(0);
}
-
主函数的主要逻辑:
-
初始化:调用
CTool::Init()
进行初始化(假设这是一个工具类)。 -
创建完成端口:使用
CreateIoCompletionPort
创建一个完成端口,指定线程池大小为 1。 -
创建线程:通过
_beginthread
创建一个线程,线程函数为threadQueueEntry
。 -
任务调度:
-
使用
PostQueuedCompletionStatus
向完成端口提交任务。 -
每隔 1300 毫秒提交一个
IocpListPop
任务。 -
每隔 2000 毫秒提交一个
IocpListPush
任务。
-
-
退出逻辑:
-
按下任意键后,通过
PostQueuedCompletionStatus
提交一个退出信号(dwTransferred
为 0,CompletionKey
为NULL
)。 -
等待完成端口线程退出。
-
关闭完成端口句柄。
-
-
总结
这段代码通过 Windows 的 I/O 完成端口(IOCP)实现了一个简单的任务队列系统。它利用 IOCP 的多线程能力,将任务分发到线程池中执行,并通过一个线程安全的队列管理任务数据。代码的核心在于:
-
使用
CreateIoCompletionPort
创建完成端口。 -
使用
PostQueuedCompletionStatus
提交任务。 -
在线程中通过
GetQueuedCompletionStatus
获取任务并处理。
这种设计可以高效地处理并发任务,适用于需要高性能 I/O 操作的场景。