I/O复用模型(EPOLL)
模型思想:向内核注册需要监听的文件描述符,操作系统负责保存监视对象文件描述符,当有事件发生时,epoll_wait仅返回有事件发生的文件描述符数组
优点:
1.无需编写以监视状态为目的的针对所有文件描述符的循环语句
2.调用epoll_wait时无需每次传递监视对象信息
条件触发:只要输入缓冲区有数据,就会触发epoll_wait()
#include <iostream>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#define BUF_SIZE 100
#define EPOLL_SIZE 50
void error_handling(char *buf);
int main(int argc, char *argv[])
{int serv_sock, clnt_sock;struct sockaddr_in serv_adr, clnt_adr;char buf[BUF_SIZE];struct epoll_event ep_events[EPOLL_SIZE];struct epoll_event event;int epfd, event_cnt;serv_sock = socket(PF_INET, SOCK_STREAM, 0);memset(&serv_adr, 0, sizeof(serv_adr));serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);serv_adr.sin_port = htons(atoi(argv[1]));serv_adr.sin_family = AF_INET;if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) != 0){error_handling("bind() error");return -1;}if (listen(serv_sock, 5) != 0){error_handling("listen() error");return -2;}epfd = epoll_create(EPOLL_SIZE);event.events = EPOLLIN;event.data.fd = serv_sock;epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);while (1){event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);if (event_cnt == -1){error_handling("epoll_wait error!");break;}for (int i = 0; i < event_cnt; i++){if (ep_events[i].data.fd == serv_sock) // 通信事件{socklen_t clnt_addrLen = sizeof(clnt_adr);clnt_sock = accept(serv_sock, (sockaddr *)&clnt_adr, &clnt_addrLen);printf("connected client:%d\n", clnt_sock);event.events = EPOLLIN;event.data.fd = clnt_sock;epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);}else // 通信事件{int len = read(ep_events[i].data.fd, buf, BUF_SIZE);if (len == 0) // 连接关闭{epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);close(ep_events[i].data.fd);}puts(buf);write(ep_events[i].data.fd, buf, strlen(buf));}}}close(serv_sock);close(epfd);return 0;
}
void error_handling(char *buf)
{fputs(buf, stderr);fputc('\n', stderr);exit(1);
}
边缘触发:只会触发一次epoll_wait(非阻塞,忙轮询)
#include <iostream>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#define BUF_SIZE 2
#define EPOLL_SIZE 50
void error_handling(char *buf);
void setnonnlockingmode(int fd);
char buf[BUF_SIZE];
int main(int argc, char *argv[])
{int serv_sock, clnt_sock;struct sockaddr_in serv_adr, clnt_adr;struct epoll_event ep_events[EPOLL_SIZE];struct epoll_event event;int epfd, event_cnt;serv_sock = socket(PF_INET, SOCK_STREAM, 0);memset(&serv_adr, 0, sizeof(serv_adr));serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);serv_adr.sin_port = htons(atoi(argv[1]));serv_adr.sin_family = AF_INET;if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) != 0){error_handling("bind() error");return -1;}if (listen(serv_sock, 5) != 0){error_handling("listen() error");return -2;}epfd = epoll_create(EPOLL_SIZE);event.events = EPOLLIN;event.data.fd = serv_sock;setnonnlockingmode(serv_sock);epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);while (1){event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);printf("epoll_wait\n");if (event_cnt == -1){error_handling("epoll_wait error!");break;}for (int i = 0; i < event_cnt; i++){if (ep_events[i].data.fd == serv_sock) // 通信事件{socklen_t clnt_addrLen = sizeof(clnt_adr);clnt_sock = accept(serv_sock, (sockaddr *)&clnt_adr, &clnt_addrLen);printf("connected client:%d\n", clnt_sock);event.events = EPOLLIN | EPOLLET;event.data.fd = clnt_sock;setnonnlockingmode(clnt_sock);epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);}else // 通信事件{while (1){int len = read(ep_events[i].data.fd, buf, BUF_SIZE);if (len == 0) // 连接关闭{epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);close(ep_events[i].data.fd);printf("closed client:%d\n", ep_events[i].data.fd);break;}else if (len < 0){if (errno == EAGAIN) // 没有数据可读break;}else{write(ep_events[i].data.fd, buf, len);}}}}}close(serv_sock);close(epfd);return 0;
}
void setnonnlockingmode(int fd)
{int flag = fcntl(fd, F_GETFL, 0);fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
void error_handling(char *buf)
{fputs(buf, stderr);fputc('\n', stderr);exit(1);
}
I/O复用模型(select)
模型思想:通过位数组(fd_set),向内核注册需要监视的文件描述符,当内核监听到位数组文件描述符有事件发生时,select返回,通知用户程序有事件发生,并返回内核标记了发生事件的文件描述符的位数组,可以通过遍历位数组判断哪些文件描述符发生了事件
特点:select可以跨平台
缺点:
1.每次调用select函数时向操作系统传递监视信息(所有需要监听的文件描述符都需要拷贝)
2.调用select函数后常见的针对所有文件描述符的循环语句
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <string.h>
#include <unistd.h>
#define BUF_SIZE 1024
void error_handling(char *buf);
int main(int argc, char *argv[])
{int serv_sock, clnt_sock;struct sockaddr_in serv_adr, clnt_adr;struct timeval timeout;fd_set reads, cpy_reads; // 位数组int fd_max, fd_num;char buf[BUF_SIZE];serv_sock = socket(PF_INET, SOCK_STREAM, 0);memset(&serv_adr, 0, sizeof(serv_adr));serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);serv_adr.sin_port = htons(atoi(argv[1]));serv_adr.sin_family = AF_INET;if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) != 0){error_handling("bind() error");return -1;}if (listen(serv_sock, 5) != 0){error_handling("listen() error");return -2;}FD_ZERO(&reads); // 清空位数组FD_SET(serv_sock, &reads);fd_max = serv_sock;while (1){cpy_reads = reads;timeout.tv_sec = 5;timeout.tv_usec = 5000;if ((fd_num = select(fd_max + 1, &cpy_reads, 0, 0, &timeout)) == -1){break;}if (fd_num == 0) // 超时返回{printf("超时...继续等待连接......\n");continue;}for (int i = 0; i < fd_max + 1; i++){if (FD_ISSET(i, &cpy_reads)) // i文件描述符是有事件发生{if (i == serv_sock) // 连接事件{socklen_t str_len = sizeof(clnt_adr);clnt_sock = accept(serv_sock, (sockaddr *)&clnt_sock, &str_len);printf("connected client:%d", clnt_sock);FD_SET(clnt_sock, &reads);if (fd_max < clnt_sock)fd_max = clnt_sock;}else{ // 通信事件int str_len = read(i, buf, BUF_SIZE);if (str_len == 0) // 客户端断开连接{FD_CLR(i, &reads);close(i);printf("closed client:%d\n", i);}else{write(i, buf, str_len);}}}}}close(serv_sock);return 0;
}
void error_handling(char *buf)
{fputs(buf, stderr);fputc('\n', stderr);exit(1);
}
多进程并发服务器
模型思想:父进程负责连接,子进程负责通信
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>
#define BUF_SIZE 30
void error_handling(char *buf);
void read_childproc(int sig);
int main(int argc, char *argv[])
{int serv_sock, clnt_sock;struct sockaddr_in serv_adr, cln_adr;pid_t pid;char buf[BUF_SIZE];// 注册信号struct sigaction act;act.sa_handler = read_childproc;sigemptyset(&atc.sa_mask);act.sa_flags = 0;sigaction(SIGCHLD, &act, 0);// 初始化网络serv_sock = socket(PF_INET, SOCK_STREAM, 0);memset(&serv_adr, 0, sizeof(serv_adr));serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);serv_adr.sin_port = htons(atoi(argv[1]));serv_adr.sin_family = AF_INET;if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) != 0){error_handling("bind() error");return -1;}if (listen(serv_sock, 5) != 0){error_handling("listen() error");return -2;}while (1){socklen_t clnt_adr_len = sizeof(cln_adr);clnt_sock = accept(serv_sock, (struct sockaddr *)&cln_adr, &clnt_adr_len);if (clnt_sock == -1)continue;elseputs("new client connected...");pid = fork();if (pid == -1){close(clnt_sock);continue;}if (pid == 0) // 子进程(读写){close(serv_sock);int str_len;while ((str_len == read(clnt_sock, buf, BUF_SIZE)) != 0){write(clnt_sock, buf, strlen(buf));}close(clnt_sock);puts("clint disconnected...");return 0;}else{close(clnt_sock);}}close(serv_sock);return 0;
}// 回收子进程信号响应函数
void read_childproc(int sig)
{pid_t pid;int status;pid = waitpid(-1, &status, WNOHANG);printf("removed proc id:%d\n", pid);
}
多线程并发服务器
模型思想:主线程用于连接,开辟线程用于通信
多线程聊天服务器:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <unistd.h>
#include <pthread.h>
#define MAX_CLNT 100
#define BUF_SIZE 256
void error_handling(char *buf);
void read_childproc(int sig);
void send_msg(char *msg, int len);
void *handle_cnt(void *);
int clnt_cnt = 0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutex;
int main(int argc, char *argv[])
{int serv_sock, clnt_sock;struct sockaddr_in serv_adr, cln_adr;char buf[BUF_SIZE];pthread_t tid;pthread_mutex_init(&mutex, NULL);// 初始化网络serv_sock = socket(PF_INET, SOCK_STREAM, 0);memset(&serv_adr, 0, sizeof(serv_adr));serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);serv_adr.sin_port = htons(atoi(argv[1]));serv_adr.sin_family = AF_INET;if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) != 0){error_handling("bind() error");return -1;}if (listen(serv_sock, 5) != 0){error_handling("listen() error");return -2;}while (1){socklen_t clnt_adr_len = sizeof(cln_adr);clnt_sock = accept(serv_sock, (struct sockaddr *)&cln_adr, &clnt_adr_len);pthread_mutex_lock(&mutex);clnt_socks[clnt_cnt++] = clnt_sock;pthread_mutex_unlock(&mutex);// 开启线程用于通信pthread_create(&tid, NULL, handle_cnt, (void *)&clnt_sock);pthread_detach(tid);printf("Connected client IP:%s \n", inet_ntoa(cln_adr.sin_addr));}close(serv_sock);return 0;
}
void *handle_cnt(void *arg)
{int clnt_sock = *((int *)arg);int str_len = 0;char msg[BUF_SIZE];while ((str_len = read(clnt_sock, msg, BUF_SIZE)) != 0){puts(msg);send_msg(msg, str_len);}// 客户端断开连接pthread_mutex_lock(&mutex);for (int i = 0; i < clnt_cnt; i++){if (clnt_sock == clnt_socks[i]){while (i++ < clnt_cnt - 1){clnt_socks[i] = clnt_socks[i + 1];}break;}}clnt_cnt--;pthread_mutex_unlock(&mutex);close(clnt_sock);return NULL;
}void send_msg(char *msg, int len)
{pthread_mutex_lock(&mutex);for (int i = 0; i < clnt_cnt; i++){write(clnt_socks[i], msg, len);printf("message to :%d\n", clnt_socks[i]);}pthread_mutex_unlock(&mutex);
}void error_handling(char *buf)
{fputs(buf, stderr);fputc('\n', stderr);exit(1);
}
IOCP模型(Windows)
模型思想:创建完成端口对象,将连接套接字注册到完成端口对象中,向操作系统投递异步请求,开辟线程监听完成端口有已完成的I/O操作,并通知程序进行处理
注意:投递一个I/O请求,完成端口才会响应一次I/O操作
#pragma comment(lib, "ws2_32.lib")
#include<stdio.h>
#include<stdlib.h>
#include<WinSock2.h>
#include<process.h>
#include<iostream>
#define BUF_SIZE 100
#define READ 3
#define WRITE 5
//客户端地址信息结构体
typedef struct
{SOCKET hClntSock;SOCKADDR_IN clntAdr;
}PER_HANDLE_DATA,*LPPER_HANDLE_DATA;
typedef struct
{OVERLAPPED overlapped;//传递overlapped地址相当于传递整个结构体首地址WSABUF wsaBuf;char buffer[BUF_SIZE];//缓冲区int rwMode; //READ or WRITE IOCP不区分输入还是输出完成 只通知I/O完成状态
}PER_IO_DATA,*LPPER_IO_DATA;unsigned __stdcall EchoThreadMain(void* arg);//接收完成I/O的线程
void PrintWinsockError(const char* apiName);int main(int argc, char* argv[])
{WSADATA wsaData;if (WSAStartup(MAKEWORD(2, 2), &wsaData)!=0){PrintWinsockError("WSAStartup");return -1;}HANDLE hComPort;//完成端口LPPER_IO_DATA ioInfo;LPPER_HANDLE_DATA handleInfo;SOCKET hServSock;//服务器套接字SOCKADDR_IN servAdr;//服务器地址信息//创建完成端口 向CP对象分配此电脑CPU核数的线程hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);//创建CPU核数个线程(接收已完成的I/O操作结果)SYSTEM_INFO sysInfo;GetSystemInfo(&sysInfo);//获取系统信息for (int i = 0; i < sysInfo.dwNumberOfProcessors; i++){_beginthreadex(NULL, 0,EchoThreadMain, (LPVOID)hComPort,0,NULL);}//创建连接套接字(重叠结构非阻塞)hServSock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);memset(&servAdr, 0, sizeof(servAdr));servAdr.sin_addr.s_addr = htonl(INADDR_ANY);servAdr.sin_port = htons(atoi(argv[1]));servAdr.sin_family = PF_INET;bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr));listen(hServSock, 5);while (1){SOCKET hClntSock;SOCKADDR_IN clntAdr;int addrLen = sizeof(clntAdr);hClntSock = accept(hServSock, (SOCKADDR*)&clntAdr, &addrLen);//等待连接//创建客户端地址信息结构体handleInfo = new PER_HANDLE_DATA;handleInfo->hClntSock = hClntSock;memcpy(&(handleInfo->clntAdr), &clntAdr, addrLen);//连接完成端口和已连接客户端套接字CreateIoCompletionPort((HANDLE)hClntSock, hComPort, (ULONG_PTR)handleInfo,0);//创建I/O操作需要的结构体 缓冲区 overlappedioInfo = new PER_IO_DATA;memset(&ioInfo->overlapped, 0, sizeof(OVERLAPPED));ioInfo->wsaBuf.buf = ioInfo->buffer;ioInfo->wsaBuf.len = BUF_SIZE;ioInfo->rwMode = READ;//投递异步Recv请求DWORD recvBytes, flags = 0;WSARecv(hClntSock, &ioInfo->wsaBuf, 1, &recvBytes, &flags, &ioInfo->overlapped, NULL);}return 0;
}
//线程函数
unsigned __stdcall EchoThreadMain(void* pComPort)
{HANDLE hComPort = (HANDLE)pComPort;SOCKET sock;DWORD bytesTrans;LPPER_HANDLE_DATA handleInfo;LPPER_IO_DATA ioInfo;DWORD flags = 0;while (1){//监听是否有I/O操作完成GetQueuedCompletionStatus(hComPort, &bytesTrans, (PULONG_PTR)&handleInfo,(LPOVERLAPPED*)&ioInfo, INFINITE);sock = handleInfo->hClntSock;if (ioInfo->rwMode == READ)//read 完成{puts("message received!");if (bytesTrans == 0)//传输EOF时{closesocket(sock);free(handleInfo);free(ioInfo);continue;}//投递异步Send请求(回声)memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));ioInfo->wsaBuf.len = bytesTrans;ioInfo->rwMode = WRITE;WSASend(sock, &ioInfo->wsaBuf, 1, NULL, 0, &ioInfo->overlapped, NULL);//投递异步Recv请求ioInfo = new PER_IO_DATA;memset(&ioInfo->overlapped, 0, sizeof(WSAOVERLAPPED));ioInfo->wsaBuf.buf = ioInfo->buffer;ioInfo->wsaBuf.len = BUF_SIZE;ioInfo->rwMode = READ;WSARecv(sock, &ioInfo->wsaBuf, 1, NULL, &flags, &ioInfo->overlapped, NULL);}else{puts("message sent!");free(ioInfo);}}return 0;
}
void PrintWinsockError(const char* apiName) {int errorCode = WSAGetLastError();LPVOID lpMsgBuf;DWORD bufLen = FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER |FORMAT_MESSAGE_FROM_SYSTEM |FORMAT_MESSAGE_IGNORE_INSERTS,NULL,errorCode,MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),(LPTSTR)&lpMsgBuf,0, NULL);if (bufLen) {std::cerr << apiName << " failed with error: " << lpMsgBuf << std::endl;LocalFree(lpMsgBuf);}else {std::cerr << apiName << " failed with unknown error code: " << errorCode << std::endl;}
}
异步重叠I/O模型(Windows)
模型思想:向操作系统投递异步I/O请求,当有I/O操作完成时,调取相应响应函数(Completion Routine函数),并通过 overlapped 传递已连接的客户端信息 特点:每个客户端都需要一个overlapped结构,投递一次I/O请求,操作系统仅通知一次
注意:该模型操作系统会异步 处理 I/O操作,处理完成后操作系统再将处理结果返回给程序
缺点:重复调用非阻塞模式的accept函数和进入alertable wait状态为目的WleepEx函数严重影响性能
#include<iostream>
#include<stdio.h>
#include<WinSock2.h>
#define BUF_SIZE 1024
void CALLBACK ReadCompRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);//读完成接收响应函数
void CALLBACK WriteCompRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);//写完成接收响应函数
void PrintWinsockError(const char* apiName);//错误打印函数
/
typedef struct {SOCKET hClntSock;//套接字句柄char buf[BUF_SIZE];//缓冲区WSABUF wsaBuf;
}PER_IO_DATA,*LPPER_IO_DATA;int main(int argc, char* argv[])
{WSADATA wsaData;if (WSAStartup(MAKEWORD(2,2),&wsaData)!=0){PrintWinsockError("WSAStartup");return -1;}SOCKET hLisnSock, hRecvSock;SOCKADDR_IN lisnAdr, recvAdr;LPWSAOVERLAPPED lpOvLp;DWORD recvBytes;LPPER_IO_DATA hbInfo;DWORD mode = 1, flagInfo = 0;//创建非阻塞连接套接字hLisnSock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);ioctlsocket(hLisnSock, FIONBIO, &mode);//设置非阻塞memset(&lisnAdr, 0, sizeof(lisnAdr));lisnAdr.sin_family = PF_INET;lisnAdr.sin_addr.s_addr = htonl(INADDR_ANY);lisnAdr.sin_port = htons(atoi(argv[1]));if (bind(hLisnSock, (SOCKADDR*)&lisnAdr, sizeof(lisnAdr))==SOCKET_ERROR){PrintWinsockError("bind");return -2;}if (listen(hLisnSock,5) == SOCKET_ERROR){PrintWinsockError("listen");return -2;}int RecvAdr_len = sizeof(recvAdr);while (1){SleepEx(100, TRUE);//使线程处于alertable wait状态(等待接收操作系统消息状态)hRecvSock = accept(hLisnSock, (SOCKADDR*)&recvAdr, &RecvAdr_len);//非阻塞接收连接if (hRecvSock == INVALID_SOCKET){if (WSAGetLastError() == WSAEWOULDBLOCK)//非阻塞没有客户端连接{continue;//继续监听连接}else{PrintWinsockError("accept");}}printf("Client connected...\n");lpOvLp = new WSAOVERLAPPED;//创建一个重叠结构memset(lpOvLp, 0, sizeof(WSAOVERLAPPED));//储存客户端信息hbInfo = new PER_IO_DATA;hbInfo->hClntSock = hRecvSock;hbInfo->wsaBuf.buf = hbInfo->buf;hbInfo->wsaBuf.len = BUF_SIZE;lpOvLp->hEvent = (HANDLE)hbInfo;//利用hEvent成员传递客户端信息//投递异步Recv请求WSARecv(hRecvSock, &hbInfo->wsaBuf, 1, &recvBytes, &flagInfo, lpOvLp, ReadCompRoutine);}closesocket(hRecvSock);closesocket(hLisnSock);WSACleanup();return 0;
}//异步数据接收完成操作系统通知函数
void CALLBACK ReadCompRoutine(DWORD dwError,DWORD szRecvRytes,LPWSAOVERLAPPED lpOverlapped,DWORD flags)
{LPPER_IO_DATA hbInfo = (LPPER_IO_DATA)(lpOverlapped->hEvent);//传递过来的客户端信息SOCKET hSock = hbInfo->hClntSock;LPWSABUF bufInfo = &(hbInfo->wsaBuf);//操作系统将接收完成的数据已经写入DWORD sentBytes;if (szRecvRytes == 0)//客户端关闭,没有收到数据{closesocket(hSock);free(lpOverlapped->hEvent);free(lpOverlapped);printf("Client disconnected...");}else{puts(bufInfo->buf);bufInfo->len = szRecvRytes;//投递异步Send请求WSASend(hSock, bufInfo, 1, &sentBytes, 0, lpOverlapped, WriteCompRoutine);}
}
//异步数据发送完成操作系统通知函数
void CALLBACK WriteCompRoutine(DWORD dwError, DWORD szRecvRytes, LPWSAOVERLAPPED lpOverlapped, DWORD flags)
{LPPER_IO_DATA hbInfo = (LPPER_IO_DATA)(lpOverlapped->hEvent);SOCKET hSock = hbInfo->hClntSock;LPWSABUF bufInfo = &(hbInfo->wsaBuf);DWORD recvBytes;DWORD flagInfo = 0;//投递Recv异步请求WSARecv(hSock, bufInfo, 1, &recvBytes, &flagInfo, lpOverlapped, ReadCompRoutine);
}void PrintWinsockError(const char* apiName) {int errorCode = WSAGetLastError();LPVOID lpMsgBuf;DWORD bufLen = FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER |FORMAT_MESSAGE_FROM_SYSTEM |FORMAT_MESSAGE_IGNORE_INSERTS,NULL,errorCode,MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),(LPTSTR)&lpMsgBuf,0, NULL);if (bufLen) {std::cerr << apiName << " failed with error: " << lpMsgBuf << std::endl;LocalFree(lpMsgBuf);}else {std::cerr << apiName << " failed with unknown error code: " << errorCode << std::endl;}
}
异步通知I/O模型(Windows)
模型思想:将套接字和内核事件对象绑定,通过内核事件对象的状态变化(no-signaled→signaled),判定对应套接字是否有I/O操作,并通知程序有I/O操作需要处理 特点:一个套接字需要对应创建一个内核对象
注意:该模型只是异步 通知 有I/O操作需要处理,但操作系统不异步处理I/O操作
缺点:仅异步通知有I/O操作,不异步处理I/O操作
#include<string.h>
#include<stdio.h>
#include<WinSock2.h>
#define BUF_SIZE 100
void CompressSockets(SOCKET hSockArr[], int idx, int total);
void CompressEvent(WSAEVENT hEventArr[], int idx, int total);
char msg[BUF_SIZE];//缓冲区
int main(int argc,char* argv[])
{WSADATA wsaData;//初始化网络if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0){printf("WSAStartup error!\n");return -1;}SOCKET hServSock, hClentSock;//服务器客户端套接字SOCKADDR_IN servAdr, clntAdr;//服务器客户端地址SOCKET hSockArr[WSA_MAXIMUM_WAIT_EVENTS];//套接字数组WSAEVENT hEventArr[WSA_MAXIMUM_WAIT_EVENTS];//句柄数组int numOfClntSock = 0;//套接字数量WSAEVENT newEvent;//事件对象//初始化服务器套接字hServSock = socket(PF_INET, SOCK_STREAM, 0);memset(&servAdr, 0, sizeof(servAdr));servAdr.sin_family = AF_INET;servAdr.sin_addr.s_addr = htonl(INADDR_ANY);servAdr.sin_port = ntohs(atoi(argv[1]));if (bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR){printf("bind error!\n");return -2;}if (listen(hServSock, 5) == SOCKET_ERROR){printf("listen error!\n");return -3;}newEvent = WSACreateEvent();//创建一个事件对象if (WSAEventSelect(hServSock, newEvent, FD_ACCEPT) == SOCKET_ERROR)//连接事件对象的套接字 套接字发生FD_ACCEPT事件,newEvent内核对象改变为signaled状态{printf("listen error!\n");return -4;}hSockArr[numOfClntSock] = hServSock;//加入套接字hEventArr[numOfClntSock] = newEvent;//加入句柄numOfClntSock++;//开始监听事件while (1){int posInfo, StartIdx;//StartIdx = posInfo-WSA_WAIT_EVENT_0; 转变为signaled状态的事件对象最小句柄索引posInfo = WSAWaitForMultipleEvents(numOfClntSock, hEventArr, FALSE, WSA_INFINITE, FALSE);//等待一个事件发生StartIdx = posInfo - WSA_WAIT_EVENT_0; //转变为signaled状态的事件对象的最小句柄的索引for (int i = StartIdx; i < numOfClntSock; i++){int sigEventIdx = WSAWaitForMultipleEvents(1, &hEventArr[i], TRUE, 0, FALSE);//遍历其他每个事件对象(非阻塞)if (sigEventIdx == WSA_WAIT_FAILED || sigEventIdx == WSA_WAIT_TIMEOUT)//i对应内核对象没有发生事件{continue;}else//i内核对象发生了事件{WSANETWORKEVENTS netEvents;//保存事件类型和错误信息sigEventIdx = i;WSAEnumNetworkEvents(hSockArr[sigEventIdx], hEventArr[sigEventIdx], &netEvents);//区分事件类型if (netEvents.lNetworkEvents & FD_ACCEPT)//连接事件{if (netEvents.iErrorCode[FD_ACCEPT_BIT])//错误{printf("Accept Error!\n");return -5;}int clntAdrLen = sizeof(clntAdr);//客户端地址长度hClentSock = accept(hSockArr[sigEventIdx], (SOCKADDR*)&clntAdr, &clntAdrLen);//接收连接newEvent = WSACreateEvent();//创建新内核事件对象WSAEventSelect(hClentSock, newEvent, FD_READ | FD_CLOSE);//连接事件对象的套接hEventArr[numOfClntSock] = newEvent;hSockArr[numOfClntSock] = hClentSock;numOfClntSock++;printf("connected new client...");}if (netEvents.lNetworkEvents & FD_READ)//通信事件{if (netEvents.iErrorCode[FD_READ_BIT] != 0){printf("read error");return -6;}int strLen = recv(hSockArr[sigEventIdx], msg, sizeof(msg), 0);send(hSockArr[sigEventIdx], msg, strLen, 0);}if (netEvents.lNetworkEvents & FD_CLOSE)//断开连接事件{if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0){printf("Close Error");break;}//关闭套接字和事件对象WSACloseEvent(hEventArr[sigEventIdx]);closesocket(hSockArr[sigEventIdx]);//调整套接字数组和事件数组平衡numOfClntSock--;CompressEvent(hEventArr, sigEventIdx, numOfClntSock);CompressSockets(hSockArr, sigEventIdx, numOfClntSock);}}}}WSACleanup();return 0;
}void CompressSockets(SOCKET hSockArr[], int idx, int total)
{for (int i=idx; i < total; i++){hSockArr[i] = hSockArr[i + 1];}
}
void CompressEvent(WSAEVENT hEventArr[], int idx, int total)
{for (int i=idx; i < total; i++){hEventArr[i] = hEventArr[i + 1];}
}