基于主从Reactor模型实现高并发服务器

目录

  • 1. 项目简介
    • 1.1 环境介绍
    • 1.2 项目定位
    • 1.3 功能模块整体划分
  • 2. Reactor简介
    • 2.1 Reactor模型分析
    • 2.2 多Reactor多线程分析:多I/O多路复用+线程池(业务处理)
  • 3. 日志宏的编写
  • 4. Server模块
    • 4.1 Buffer模块
      • 4.1.1 Buffer的功能
      • 4.1.2 Buffer的实现思想
      • 4.1.3 Buffer的实现
    • 4.2 Socket模块
      • 4.2.1 Socket的功能
      • 4.2.2 Socket的实现
    • 4.3 TimeQueue模块
      • 4.3.1 TimeQueue的功能
      • 4.3.2 时间轮的思想
      • 4.3.3 定时器任务TimerTask类
      • 4.3.4 TimerWheel 类
    • 4.4 Any模块
      • 4.4.1 Any的功能
      • 4.4.2 Any的实现
    • 4.5 Channel模块
      • 4.5.1 Channel的功能
      • 4.5.2 Channel的实现
    • 4.6 Acceptor模块
      • 4.6.1 Acceptor的功能
      • 4.6.2 Acceptor的实现
    • 4.7 Poller模块
      • 4.7.1 Poller的功能
      • 4.7.2 Poller的实现
    • 4.8 EventLoop模块
      • 4.8.1 EventLoop的功能
      • 4.8.2 EventLoop的实现
    • 4.9 Connection模块
      • 4.9.1 Connection的功能
      • 4.9.2 Connection的实现
    • 4.10 LoopThreadPool模块
      • 4.10.1 LoopThreadPool的功能
      • 4.10.2 LoopThread的功能
      • 4.10.3 LoopThread的实现
      • 4.10.4 LoopThreadPool的实现
    • 4.11 TcpServer模块
      • 4.11.1 TcpServer模块的功能
      • 4.11.2 TcpServer模块的实现
  • 5. Http协议模块
    • 5.1 Util模块
    • 5.2 HttpRequest模块
    • 5.3 HttpResponse模块
    • 5.4 HttpContext模块
    • 5.5 HttpServer模块
  • 6. 对服务器整体测试
    • 6.1 长连接测试
    • 6.2 超时连接测试
    • 6.3 错误请求测试
    • 6.4 业务处理超时测试
    • 6.5 同时多条请求测试
    • 6.6 大文件传输测试

1. 项目简介

1.1 环境介绍

  1. 服务器部署:Linux-Centos – 2核2G的腾讯云服务器。
  2. 项目用到的技术栈:C++、多线程、Epoll等。

1.2 项目定位

  1. 主(从)Reactor模型服务器,主Reactor线程只负责监听描述符,获取新建连接。这样就保证了新连接的获取比较高效,提高了服务器的并发性能。主Reactor获取到新连接后分发给子Reactor进行通信事件监控。
  2. 子(从)Reactor线程监控各自文件描述符下的读写事件,进行数据读写以及业务处理。
  3. One Thread One Loop的思想就是把所有的操作都放到线程中进行,一个线程对应一个EventLoop。

1.3 功能模块整体划分

项目实现目标:带有协议支持的Reactor模型高性能服务器。模块划分如下:

  1. Server模块:实现Reactor模型的TCP服务器。

  1. 协议模块:对于自主实现的Reactor模型服务器提供应用层协议支持,项目中支持的Http协议。

2. Reactor简介

2.1 Reactor模型分析

在高性能的I/O设计中,Reactor模型用于同步I/O。

优点:

  1. 响应快,不必为单个同步时间所阻塞(虽然Reactor本身依然是同步的);
  2. 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销。
    • 可扩展性:可以方便地通过增加Reactor实例个数来充分利用CPU资源。
    • 可复用性,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性。
  3. Rector模型基于事件驱动,特别适合处理海量的I/O。

2.2 多Reactor多线程分析:多I/O多路复用+线程池(业务处理)

  1. 在主Reactor中处理新连接请求事件,有新连接到来则分发到⼦Reactor中监控。
  2. 在⼦Reactor中进⾏客⼾端通信监控,有事件触发,则接收数据分发给Worker线程池。
  3. Worker线程池分配独⽴的线程进⾏具体的业务处理。⼯作线程处理完毕后,将响应交给⼦Reactor线程进⾏数据响应。
  4. 优点:充分的利用了CPU多核资源,主从Reactor各自完成各自的任务。

3. 日志宏的编写

在实现各个模块之前先将日志实现方便打印找到程序问题所在:

#pragma once
#include <iostream>
#include <string>
#include <ctime>#define NORMAL 1
#define WARNING 2
#define DEBUG 3
#define FATAL 4
#define LOG(LEVEL, MESSAGE) log(#LEVEL, MESSAGE, __FILE__, __LINE__)void log(std::string level, std::string message, std::string file, int line)
{time_t now = time(0);char* currentTime = ctime(&now);time_t nowtime;time(&nowtime);                //获取1970年1月1日0点0分0秒到现在经过的秒数tm* p = localtime(&nowtime);   //将秒数转换为本地时间,年从1900算起,需要+1900,月为0-11,所以要+1std::cout << "[" << level << " ";printf("%04d:%02d:%02d %02d:%02d:%02d", p->tm_year + 1900, p->tm_mon + 1, p->tm_mday,p->tm_hour,p->tm_min,p->tm_sec);std::cout << " " << file << " : " << line << "]" << " " << message << std::endl;
}

4. Server模块

该模块主要实现TCP服务器,该模块包含了:Buffer模块、TimeQueue模块、Any模块、Socket模块、Acceptor模块、Poller模块、Channel模块、Connection模块、LoopThreadPool模块、EventLoop模块、TcpServer模块。

以下将上述模块的基本功能实现。

4.1 Buffer模块

4.1.1 Buffer的功能

4.1.2 Buffer的实现思想

Buffer实现思想如下:

  1. 想要实现缓冲区首先要有一块内存空间,使用vector,vector的底层使用的就是一个线性的内存空间。
  2. 一个读偏移记录当前读取数据的位置。一个写偏移记录当前的写入位置。
  3. 写入数据:从写偏移的位置开始写入,如果后续空间足够直接写,反之就扩容:这里的扩容比较特殊,可以从整体空闲空间(当数据被读取,读偏移会向后移动,前面的空间是空闲的状态)和写偏移后的空闲空间两种情况考虑,如果整体空间足够,将现有数据移动到起始位置。如果不够,扩容,从当前写位置开始扩容足够的大小。数据写入成功后,写偏移记得向后偏移。

  1. 读取数据:从当前读偏移开始读取,前提是有数据可读。可读数据的大小–写偏移和读偏移之间的数据。

4.1.3 Buffer的实现

(1)文字描述Buffer类接口的具体设计:

(2)Buffer类的设计接口函数:

#include <vector>
#include <cstdint>#define BUFFER_DEFAULT_SIZE 1024    // Buffer 默认起始大小
class Buffer
{
public:Buffer():_reader_idx(0),_writer_idx(0),_buffer(BUFFER_DEFAULT_SIZE) {}// 获取当前写入起始地址void *WirtePosition();// 获取当前读取起始地址void *ReadPosition();// 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间uint64_t TailIdleSize();// 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间uint64_t HeadIdleSize();// 获取可读数据大小uint16_t ReadAbleSize();// 将读偏移向后移动void MoveReadOffset(uint64_t len);// 将写偏移向后移动void MoveWriteOffset(uint64_t len);// 确保可写空间足够(整体空闲空间够了就移动数据。否则就扩容)void EnsureWriteSpace(uint64_t len);// 写入数据void Write(void *data, uint64_t len);// 读取数据void Read(void *buf, uint64_t len);// 清空缓冲区void Clear();private:std::vector<char> _buffer; // 使用vector进行内存空间管理uint64_t _reader_idx;      // 读偏移uint64_t _writer_idx;      // 写偏移
};

(3)Buffer类的接口函数实现:

#pragma once
#include <iostream>
#include <vector>
#include <cassert>
#include <string>
#include <cstring>using std::cout;
using std::endl;#define BUFFER_DEFAULT_SIZE 1024class Buffer
{
public:Buffer():_reader_index(0),_writer_index(0),_buffer(BUFFER_DEFAULT_SIZE){}char* Begin(){return &(*_buffer.begin());}// 获取当前写入起始地址, _buffer的空间起始地址,加上写偏移量char* WritePosition(){return Begin() + _writer_index;}// 获取当前读取起始地址char* ReadPosition(){return Begin() + _reader_index;}// 获取缓冲区末尾空闲空间大小-->写偏移之后的空闲空间, 总体空间大小减去写偏移uint64_t TailIdleSize(){return _buffer.size() - _writer_index;}// 获取缓冲区起始空闲空间大小-->读偏移之前的空闲空间uint64_t HeadIdleSize(){return _reader_index;}// 获取可读数据大小 = 写偏移 - 读偏移uint64_t ReadAbleSize(){return _writer_index - _reader_index;}// 将读偏移向后移动void MoveReadOffset(uint64_t len){// 向后移动的大小,必须小于可读数据大小assert(len <= ReadAbleSize());_reader_index += len;}// 将写偏移向后移动void MoveWriteOffset(uint64_t len){// 向后移动的大小,必须小于当前后边的空闲空间大小assert(len <= TailIdleSize());_writer_index += len;}// 确保可写空间足够(整体空闲空间够了就移动数据,否则就扩容)void EnsureWriteSpace(uint64_t len){// 如果末尾空闲空间大小足够,直接返回if(TailIdleSize() >= len){return;}// 末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够, 够了就将数据移动到起始位置if(len <= TailIdleSize() + HeadIdleSize()){uint64_t rsz = ReadAbleSize();  //把当前数据大小先保存起来std::copy(ReadPosition(), ReadPosition() + rsz, Begin());  //把可读数据拷贝到起始位置_reader_index = 0;     //将读偏移归0_writer_index = rsz;   //将写位置置为可读数据大小, 因为当前的可读数据大小就是写偏移量}else{// 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可_buffer.resize(_writer_index + 2 * len);}}//写入数据void Write(const void* data, uint64_t len){//1. 保证有足够空间,2. 拷贝数据进去if(len == 0){return;}EnsureWriteSpace(len);const char* dt = (const char*)data;std::copy(dt, dt + len, WritePosition());}void WriteAndPush(const void* data, uint64_t len){Write(data, len);MoveWriteOffset(len);}void WriteString(const std::string& data){Write(data.c_str(), data.size());}void WriteStringAndPush(const std::string& data){WriteString(data);MoveWriteOffset(data.size());}void WriteBuffer(Buffer& data){Write(data.ReadPosition(), data.ReadAbleSize());}void WriteBufferAndPush(Buffer& data){WriteBuffer(data);MoveWriteOffset(data.ReadAbleSize());}//读取数据void Read(void* buff, uint64_t len){assert(len <= ReadAbleSize());std::copy(ReadPosition(), ReadPosition() + len, (char*)buff);}void ReadAndPop(void* buff, uint64_t len){Read(buff, len);MoveReadOffset(len);}std::string ReadString(uint64_t len){assert(len <= ReadAbleSize());std::string str;str.resize(len);Read(&str[0], len);return str;}std::string ReadStringAndPop(uint64_t len){std::string str = ReadString(len);MoveReadOffset(len);return str;}//寻找换行字符char* FindCRLF(){char* res = (char*)memchr(ReadPosition(), '\n', ReadAbleSize());return res;}//通常获取一行数据,这种情况针对是std::string GetLine(){char* pos = FindCRLF();if(pos == nullptr){return "";}// +1是为了把换行字符也取出来return ReadString(pos - ReadPosition() + 1);}std::string GetLineAndPop(){std::string str = GetLine();MoveReadOffset(str.size());return str;}//清空缓冲区void Clear(){//只需要将偏移量归0即可_reader_index = 0;_writer_index = 0;}private:std::vector<char> _buffer;uint64_t _reader_index;uint64_t _writer_index;
};

4.2 Socket模块

4.2.1 Socket的功能

4.2.2 Socket的实现

(1)Socket类的设计接口函数:

// 套接字类
#define MAX_LISTEN 1024
class Sock
{
public:Sock();Sock(int fd);~Sock();// 创建套接字bool Socket();// 绑定地址信息bool Bind(const std::string &ip, uint64_t port);// 开始监听bool Listen(int backlog = MAX_LISTEN);// 向服务器发起连接bool Connect(const std::string &ip, uint64_t port);// 获取新连接int Accept();// 接收数据ssize_t Recv(void* buf, size_t len, int flag = 0);  // 0 阻塞// 发送数据ssize_t Send(void* buf, size_t len, int flag = 0);// 关闭套接字void Close();// 创建一个服务器连接bool CreateServer(uint64_t port, const std::string &ip = "0.0.0.0"); // 接收全部// 创建一个客户端连接bool CreateClient(uint64_t port, const std::string &ip);// 设置套接字选项 -- 开启地址端口重用void ReuseAddress();// 设置套接字阻塞属性 -- 设置为非阻塞void NonBlock();private:int _sockfd;
};  

(2)Socket类接口函数实现:

#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include "log.hpp"const int backlog = 10;class Sock
{
public:Sock():_sockfd(-1){}Sock(int fd):_sockfd(fd) {}//创建套接字bool Socket(){_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if(_sockfd < 0){LOG(FATAL, "socker error");return false;}return true;}//绑定地址信息bool Bind(const uint16_t& port){sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;  //INADDR_ANY相当于"0.0.0.0"int n = bind(_sockfd, (sockaddr*)&local, sizeof(local));if(n < 0){LOG(FATAL, "bind error");return false;}return true;}//开始监听bool Listen(){int n = listen(_sockfd, backlog);if(n < 0){LOG(FATAL, "listen error");return false;}return true;}//获取新连接int Accept(std::string& clientip, uint16_t& clientport){sockaddr_in peer;socklen_t len = 0;int newfd = accept(_sockfd, (sockaddr*)&peer, &len);if(newfd < 0){LOG(FATAL, "accept error");return -1;}char buffer[64];inet_ntop(AF_INET, &peer.sin_addr, buffer, sizeof(buffer));clientip = buffer;clientport = ntohs(peer.sin_port);return newfd;}//向服务器发起连接int Connect(const std::string& serverip, const uint16_t serverport){sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);server.sin_addr.s_addr = inet_addr(serverip.c_str());int n = connect(_sockfd, (sockaddr*)&server, sizeof(server));if(n < 0){LOG(FATAL, "connect error");return -1;}return 0;}//接收数据ssize_t Recv(void* buf, size_t len, int flag = 0){ssize_t ret = recv(_sockfd, buf, len, flag);if(ret <= 0){if(errno == EAGAIN || errno == EINTR){return 0;}LOG(FATAL, "recv error");return -1;}return ret;  //实际接收的数据长度}ssize_t NonBlockRecv(void* buf, size_t len) {return Recv(buf, len, MSG_DONTWAIT);  // MSG_DONTWAIT 表示当前接收为非阻塞。}//发送数据ssize_t Send(void* buf, int len, int flag = 0){ssize_t ret = send(_sockfd, buf, len, flag);if(ret < 0){if(errno == EAGAIN || errno == EINTR){return 0;}LOG(FATAL, "send error");return -1;}return ret;  //实际发送的数据长度}ssize_t NonBlockSend(void* buf, size_t len){if(len == 0){return 0;}return Send(buf, len, MSG_DONTWAIT);  // MSG_DONTWAIT 表示当前发送为非阻塞。}void Close(){close(_sockfd);_sockfd = -1;}int Getsock(){return _sockfd;}//创建一个服务端连接bool CreatServer(uint16_t port, const std::string& ip = "0.0.0.0", bool block_flag = false){if(Socket() == false){return false;}if(block_flag){NonBlock();}if(Bind(port) == false){return false;}if(Listen() == false){return false;}ReuseAddress();return true;}//创建一个客户端连接bool CreateClient(uint16_t port, const std::string& ip) {//1. 创建套接字,2.指向连接服务器if(Socket() == false){return false;}if(Connect(ip, port) == false){return false;}return true;}// 设置套接字选项---开启地址端口重用void ReuseAddress(){// int setsockopt(int fd, int leve, int optname, void *val, int vallen)int val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int));val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int));}// 设置套接字阻塞属性-- 设置为非阻塞void NonBlock(){// int fcntl(int fd, int cmd, ... /* arg */ );int flag = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}~Sock(){Close();}private:int _sockfd;
};

4.3 TimeQueue模块

4.3.1 TimeQueue的功能

模块介绍:实现固定时间,执行定时任务的模块 — 定时任务管理器。向该模块添加一个任务,任务将在固定时间后被执行,同时也可以对定时任务进行刷新,延迟该任务执行,当然也可以通过接口取消定时任务。

时间轮是一种 实现延迟功能(定时器)的巧妙算法。如果一个系统存在大量的任务调度,时间轮可以高效的利用线程资源来进行批量化调度。把大批量的调度任务全部都绑定时间轮上,通过时间轮进行所有任务的管理,触发以及运行。能够高效地管理各种延时任务,周期任务,通知任务等。

4.3.2 时间轮的思想

  • 如上图所示,时间轮的实现通过定义数组模拟,并且有一个秒针指向数组的起始位置,这个指针向后走,走到哪里代表哪里的任务要被执行,假设我们要一个任务5秒后执行,只需要将任务添加到_second_hand + 5的位置,秒针每秒向后走一步,5秒后秒针指向对应的位置,定时任务执行。
  • 需要注意的是,在同一时间,可能会有大批量的定时任务。因此我们只需要在数组对应的位置下拉一个数组即可。这样就可以在同一时刻添加多个任务了。

4.3.3 定时器任务TimerTask类

(1)类的具体功能:

(2)具体实现如下:

using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
public:TimerTask(uint64_t id, uint32_t delay, const TaskFunc& cb):_id(id),_timeout(delay),_task_cb(cb),_canceled(false){}void Cancel(){ _canceled = true; }void SetRelease(const ReleaseFunc &cb) { _release = cb; }uint32_t DelayTime() { return _timeout; }~TimerTask(){if(_canceled == false){_task_cb();}_release();}private:uint64_t _id;         // 定时器任务对象IDuint32_t _timeout;    // 定时任务的超时时间bool _canceled;       // false-表示没有被取消, true-表示被取消TaskFunc _task_cb;    // 定时器对象要执行的定时任务ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息
};

4.3.4 TimerWheel 类

(1)类的具体功能:

  • 该模块主要是对Connection对象的生命周期进行管理,对非活跃连接进行超时后的释放。
  • 该模块内部包含有一个timerfd。以下是timefd相关函数的认识:

  • 该模块内部含有一个Channel对象:实现对timerfd的事件就绪回调处理。

(2)具体实现如下:

class TimerWheel 
{
public:TimerWheel(EventLoop* loop):_capacity(60),_tick(0),_wheel(_capacity),_loop(loop),_timerfd(CreateTimerfd()),_timer_channel(new Channel(_loop, _timerfd)){_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));_timer_channel->EnableRead();   //启动读事件监控}void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb);//刷新/延迟定时任务void TimerRefresh(uint64_t id);void TimerCancel(uint64_t id);//这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,在对应的EventLoop线程内执行bool HasTimer(uint64_t id){auto it = _timers.find(id);if(it == _timers.end()){return false;}return true;}private:void RemoveTimer(uint64_t id){auto it = _timers.find(id);if (it != _timers.end()){_timers.erase(it);}}int ReadTimefd(){uint64_t times;//有可能因为其他描述符的事件处理花费事件比较长,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次//read读取到的数据times就是从上一次read之后超时的次数int ret = read(_timerfd, &times, 8);if (ret < 0){LOG(FATAL, "read timer error!");abort();}return times;}//这个函数应该每秒钟被执行一次,相当于秒针向后走了一步void RunTimerTask(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear();  //清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉}void OnTime(){// 根据实际超时的次数,执行对应的超时任务int times = ReadTimefd();for (int i = 0; i < times; i++){RunTimerTask();}}static int CreateTimerfd(){int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if(timerfd < 0){LOG(FATAL, "timerfd_create error");abort();}itimerspec itime;itime.it_value.tv_sec = 1;itime.it_value.tv_nsec = 0; // 第一次超时时间为1s后itime.it_interval.tv_sec = 1;itime.it_interval.tv_nsec = 0; // 第一次超时后,每次超时的间隔时timerfd_settime(timerfd, 0, &itime, nullptr);return timerfd;}void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) {PtrTask pt(new TimerTask(id, delay, cb));pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);_timers[id] = WeakTask(pt);}void TimerRefreshInLoop(uint64_t id){// 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中auto it = _timers.find(id);if(it == _timers.end()){return; // 没找着定时任务,没法刷新,没法延迟}PtrTask pt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptrint delay = pt->DelayTime();int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}void TimerCancelInLoop(uint64_t id){auto it = _timers.find(id);if(it == _timers.end()){return; // 没找着定时任务,没法刷新,没法延迟}PtrTask pt = it->second.lock();if(pt){pt->Cancel();}}private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;int _tick; //当前的秒针,走到哪里释放哪里,释放哪里,就相当于执行哪里的任务int _capacity;  //表盘最大数量---其实就是最大延迟时间std::vector<std::vector<PtrTask>> _wheel;std::unordered_map<uint64_t, WeakTask> _timers;EventLoop* _loop;int _timerfd;  //定时器描述符--可读事件回调就是读取计数器,执行定时任务std::unique_ptr<Channel> _timer_channel;
};

4.4 Any模块

4.4.1 Any的功能

  • Connection模块中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议有很多,这个协议接收解析上下⽂就不能有明显的协议倾向,它可以是任意协议的上下⽂信息,因此就需要⼀个通⽤的类型来保存各种不同的数据结构。
  • Any内部设计⼀个模板容器holder类,可以保存各种类型数据。因为在Any类中⽆法定义这个holder对象或指针,因为Any也不知道这个类要保存什么类型的数据,因此⽆法传递类型参数。所以,定义⼀个基类placehoder,让holder继承于placeholde,⽽Any类保存⽗类指针即可。当需要保存数据时,则new⼀个带有模板参数的⼦类holder对象出来保存数据,然后让Any类中的⽗类指针,指向这个⼦类对象就搞定了。

4.4.2 Any的实现

具体实现如下:

class Any
{
public:Any() :_content(nullptr) {}template <class T>Any(const T& val) :_content(new placeholder<T>(val)) {}Any(const Any& other) :_content(other._content ? other._content->clone() : nullptr) {}~Any() { delete _content; }Any &swap(Any &other){std::swap(_content, other._content);return *this;}// 返回子类对象保存的数据的指针template <class T>T *get(){// 想要获取的数据类型,必须和保存的数据类型一致assert(typeid(T) == _content->type());return &((placeholder<T> *)_content)->_val;}// 赋值运算符的重载函数template <class T>Any &operator=(const T &val){// 为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候,原先保存的数据也就被释放Any(val).swap(*this);return *this;}Any &operator=(const Any &other){Any(other).swap(*this);return *this;}private:class holder{public:virtual ~holder() {}virtual const std::type_info &type() = 0;virtual holder *clone() = 0;};template <class T>class placeholder : public holder{public:placeholder(const T &val) : _val(val) {}// 获取子类对象保存的数据类型virtual const std::type_info &type() { return typeid(T); }// 针对当前的对象自身,克隆出一个新的子类对象virtual holder *clone() { return new placeholder(_val); }public:T _val;};holder *_content;
};

4.5 Channel模块

4.5.1 Channel的功能


模块介绍:该模块的主要功能是对每一个描述符上的IO事件进行管理,实现对描述符可读,可写,错误等事件的管理操作。以及,Poller模块对描述符进行IO事件监控 的事件就绪后,根据事件,回调不同的函数。

4.5.2 Channel的实现

(1)Channel类的设计接口函数:

class Channel
{
public:Channel();void SetReadCallback(const EventCallback &cb);void SetWriteCallback(const EventCallback &cb);void SetErrorCallback(const EventCallback &cb);void SetCloseCallback(const EventCallback &cb);void SetEventCallback(const EventCallback &cb);bool ReadAble();     // 当前是否监控了可读bool WriteAble();    // 当前是否监控了可写void EnableRead();   // 启动读事件监控void EnableWrite();  // 启动写事件监控void DisableRead();  // 关闭读事件监控void DisableWrite(); // 关闭写事件监控void Remove();       // 移除监控void HandleEvent();  // 事件处理,一旦触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定private:int _fd;EventLoop* _loop;uint32_t _events;  // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件using EventCallback = std::function<void()>;EventCallback _read_callback;  // 可读事件被触发的回调函数EventCallback _write_callback; // 可写事件被触发的回调函数EventCallback _error_callback; // 错误事件被触发的回调函数EventCallback _close_callback; // 连接断开事件被触发的回调函数EventCallback _event_callback; // 任意事件被触发的回调函数
};

(2)Channel类接口函数实现:

class Channel
{
public:using EventCallback = std::function<void()>;Channel(EventLoop* loop, int fd):_fd(fd),_events(0),_revents(0),_loop(loop){}int GetFd(){return _fd;}//获取想要监控的事件uint32_t GetEvents() { return _events;}//设置实际就绪的事件void SetREvents(uint32_t events) { _revents = events;}void SetReadCallback(const EventCallback& cb){ _read_callback = cb; }void SetWriteCallback(const EventCallback& cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback& cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback& cb) { _close_callback = cb; }void SetEventCallback(const EventCallback& cb) {_event_callback = cb; }//当前是否监控了可读bool ReadAble(){return (_events & EPOLLIN);}//当前是否监控了可写bool WriteAble(){return (_events & EPOLLOUT);}//启动读事件监控void EnableRead() { _events |= EPOLLIN;Update();}//启动写事件监控void EnableWrite() {_events |= EPOLLOUT;Update();}//关闭读事件监控void DisableRead(){_events &= ~EPOLLIN;Update();}//关闭写事件监控void DisableWrite(){_events &= ~EPOLLOUT;Update();}//关闭所有事件监控void DisableAll() { _events = 0;Update();}//移除监控void Remove();//添加或修改监控void Update();//事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定void HandleEvent(){if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){/*不管任何事件,都调用的回调函数*/if(_read_callback){_read_callback();}}/*有可能会释放连接的操作事件,一次只处理一个*/if(_revents & EPOLLOUT){if(_write_callback){_write_callback();}}else if(_revents & EPOLLERR){if(_error_callback){_error_callback();}}else if(_revents & EPOLLHUP){if(_close_callback){_close_callback();}}if(_event_callback){_event_callback();}}private:int _fd;uint32_t _events;  // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件EventLoop* _loop;   EventCallback _read_callback;  // 可读事件被触发的回调函数EventCallback _write_callback; // 可写事件被触发的回调函数EventCallback _error_callback; // 错误事件被触发的回调函数EventCallback _close_callback; // 连接断开事件被触发的回调函数EventCallback _event_callback; // 任意事件被触发的回调函数
};

Channel模块需要和EventLoop模块一起才可以进行测试,因此Channel类的代码只进行了编译测试,测试发现并没有语法上的错误,后面有什么逻辑的错误只能等后续完善该类的时候再来进行测试了。

4.6 Acceptor模块

4.6.1 Acceptor的功能

(1)模块介绍对Socket和Channel模块的整体封装,实现对一个监听套接字的整体管理。
=

  • 该模块中包含一个Socket对象,实现监听套接字的操作。
  • 该模块中包含一个Channel对象,实现监听套接字IO事件就绪的处理。

(2)Accept模块处理流程:

  1. 向Channel提供可读事件的IO事件处理回调函数 — 获取新连接。
  2. 为新连接构建一个Connection对象,通过该对象设置各种回调。

4.6.2 Acceptor的实现

(1)Acceptor类的设计接口函数:

// 监听套接字管理类
class Acceptor
{
public:Acceptor();void SetAcceptCallback();private:/*监听套接字的读事件回调处理函数 -- 获取新连接,调用_accept_callback函数进行新连接管理*/void HandleRead();//启动服务器int CreateServer(int port)Sock _socket;   // 用于创建监听套接字EventLoop *_loop; // 用于对监听套接字进行事件监控Channel _channel; // 用于对监控套接字进行事件管理using AcceptCallback = std::function<void(int)>;AcceptCallback _accept_callback;
};

(2)Acceptor类接口函数实现:

class Acceptor
{
public:using AcceptCallback = std::function<void(int)>;//不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动//否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏Acceptor(EventLoop* loop, int port):_socket(CreateServer(port)),_loop(loop),_channel(_loop, _socket.Getsock()){_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));}void SetAcceptCallback(const AcceptCallback& cb) { _accept_callback = cb; }void Listen(){_channel.EnableRead();}private://监听套接字的读事件回调处理函数---获取新连接,调用_accept_callback函数进行新连接处理void HandleRead(){std::string clientip;uint16_t clientport;int newfd = _socket.Accept(clientip, clientport);if (newfd < 0) {return;}if (_accept_callback){_accept_callback(newfd);}}int CreateServer(int port){bool ret = _socket.CreatServer(port);assert(ret == true);return _socket.Getsock();}Sock _socket;     // 用于创建监听套接字EventLoop* _loop; // 用于对监听套接字进行事件监控Channel _channel; // 用于对监听套接字进行事件管理AcceptCallback _accept_callback;
};

4.7 Poller模块

4.7.1 Poller的功能


模块介绍对epoll进行封装,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能。

4.7.2 Poller的实现

(1)文字描述Poller类接口的具体设计:


(2)Poller类的设计接口函数:

//Poller描述符监控类
#define MAX_EPOLLEVENTS
class Poller
{
public:Poller();// 添加或修改监控事件void UpdateEvent(Channel *channel);// 移除监控void RemoveEvent(Channel *channel);// 开始监控, 返回活跃连接void Poll(std::vector<Channel*> *active);private:// 对epoll的直接操作void Update(Channel *channel, int op);bool HasChannel(Channel *Channel);private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel *> _channels;
};

(3)Poller类接口函数实现:

#define MAX_EPOLLEVENTS 1024
class Poller
{
public:Poller(){_epfd = epoll_create(MAX_EPOLLEVENTS);if(_epfd < 0){LOG(FATAL, "epoll_create error");abort();  //退出程序}}//添加或修改监控事件void UpdateEvent(Channel* channel){bool ret = HasChannel(channel);if(ret == false){//不存在则添加_channels.insert(std::make_pair(channel->GetFd(), channel));return Update(channel, EPOLL_CTL_ADD);}Update(channel, EPOLL_CTL_MOD);}//移除监控void RemoveEvent(Channel* channel){auto it = _channels.find(channel->GetFd());if(it != _channels.end()){_channels.erase(it);}Update(channel, EPOLL_CTL_DEL);}//开始监控,返回活跃连接void Poll(std::vector<Channel*>* active){// int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout)int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);if(nfds < 0){if (errno == EINTR){return;}LOG(FATAL, "epoll_wait error");abort();}for(int i = 0; i < nfds; i++){auto it = _channels.find(_evs[i].data.fd);assert(it != _channels.end());it->second->SetREvents(_evs[i].events);  //设置实际就绪的事件active->push_back(it->second);}}private://对epoll的直接操作void Update(Channel* channel, int op) {// int epoll_ctl(int epfd, int op,  int fd,  struct epoll_event *ev);int fd = channel->GetFd();epoll_event ev;ev.data.fd = fd;ev.events = channel->GetEvents();int ret = epoll_ctl(_epfd, op, fd, &ev);if(ret < 0){LOG(FATAL, "epoll_ctl error");}}// 判断一个Channel是否已经添加了事件监控bool HasChannel(Channel* channel){auto it = _channels.find(channel->GetFd());if (it == _channels.end()){return false;}return true;}int _epfd;std::unordered_map<int, Channel*> _channels;epoll_event _evs[MAX_EPOLLEVENTS];
};

4.8 EventLoop模块

4.8.1 EventLoop的功能

模块介绍EventLoop模块对Poller,TimerQueue,Socket模块进行了封装。也是Reactor模型模块。

4.8.2 EventLoop的实现

(1)文字描述EventLoop类接口的具体设计:

  • EventLoop模块必须是一个对象对应一个线程,线程内部运行EventLoop的启动函数。
  • EventLoop模块为了保证整个服务器的线程安全问题,因此要求使用者对于Connection的所有操作一定要在其对应的EventLoop线程内完成。
  • EventLoop模块保证自己内部所监控的所有描述符都要是活跃连接,非活跃连接就要及时的释放避免资源浪费。
  • EventLoop模块内部包含一个eventfd:内核提供的事件fd,专门用于事件通知。

  • EventLoop模块内部含有一个Poller对象,用于进行描述符的IO事件管理。
  • EventLoop模块内部包含有一个TimeQueue对象,用于进行定时任务的管理。
  • EventLoop模块中包含一个任务队列,组件使用者要对Connection进行的所有操作,都加入到任务队列中并由EventLoop模块进行管理,并在EventLoop对应的线程中进行执行。
  • 每一个Connection对象都会绑定到一个EventLoop上,这样一来对连接的所有操作就能保证在一个线程中进行。
  • 通过Poller模块对当前模块管理内的所有描述符进行IO事件监控,当有描述符事件就绪后,通过描述符对应的Channel进行事件的处理。
  • 所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作进行顺序执行。
  • epoll的事件监控,有可能会因为没有事件到来而持续阻塞。导致任务队列中的任务不能得到及时的处理。对此的处理方式是创建一个eventfd,添加到Poller的事件监控中,每当向任务队列添加任务的时候,通过向eventdf写入数据来唤醒epoll的阻塞。

(2)EventLoop类的设计接口函数:

// EventLoop事件监控处理类
class EventLoop
{
public:using Functor = std::function<void()>;EventLoop();void RunInLoop(const Functor &cb); // 判断将要执行的任务是否处于当前线程中,如果是则执行,否则压入队列void QueueInLoop(const Functor &cb); // 将操作压入任务池bool IsInLoop(); // 用于判断当前线程是否是EventLoop对应的线程void UpdateEvent(Channel *channel); // 添加/修改描述符的事件监控void RemoveEvent(Channel *channel); // 移除描述符的监控void Start(); // 三步走--事件监控-》就绪事件处理-》执行任务void RunAllTask();private:std::thread::id _thread_id; //线程IDint _event_fd;  //eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel;Poller _poller;std::vector<Functor> _tasks;std::mutex _mutex;  //实现任务池操作的线程安全TimerWheel _timer_wheel;  //定时器模块
};

(3)EventLoop类接口函数实现:

class EventLoop
{
public:using Functor = std::function<void()>;//执行任务池中的所有任务void RunAllTask(){std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}for(auto& f : functor){f();}return;}//获取event_fdstatic int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd < 0){LOG(FATAL, "eventfd error");abort();}return efd;}void ReadEventfd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if(ret < 0){//EINTR -- 被信号打断;   EAGAIN -- 表示无数据可读if(errno == EINTR || errno == EAGAIN){return;}LOG(FATAL, "read error");abort();}return;}void WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if(ret < 0){if(errno == EINTR) {return;}LOG(FATAL, "read error");abort();}return;}public:EventLoop():_thread_id(std::this_thread::get_id()),_event_fd(CreateEventFd()),_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){//给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));//启动eventfd的读事件监控_event_channel->EnableRead();}//用于判断当前线程是否是EventLoop对应的线程;bool IsInLoop(){return (_thread_id == std::this_thread::get_id());}void AssertInLoop(){assert(_thread_id == std::this_thread::get_id());}//判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。void RunInLoop(const Functor& cb){if(IsInLoop()){return cb();}return QueueInLoop(cb);}//将操作压入任务池void QueueInLoop(const Functor& cb){{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(std::move(cb));}//唤醒有可能因为没有事件就绪,而导致的epoll阻塞;//其实就是给eventfd写入一个数据,eventfd就会触发可读事件WeakUpEventFd();}//添加/修改描述符的事件监控void UpDataEvent(Channel* channel){return _poller.UpdateEvent(channel);}//移除描述符的监控void RemoveEvent(Channel* channel){return _poller.RemoveEvent(channel);}//三步走--事件监控-》就绪事件处理-》执行任务void Start(){while(1){// 1. 事件监控,std::vector<Channel *> actives;_poller.Poll(&actives);// 2. 事件处理。for (auto &channel : actives){channel->HandleEvent();}// 3. 执行任务RunAllTask();}}void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb) {return _timer_wheel.TimerAdd(id, delay, cb);}void TimerRefresh(uint64_t id){return _timer_wheel.TimerRefresh(id);}void TimerCancel(uint64_t id){return _timer_wheel.TimerCancel(id);}bool HasTimer(uint64_t id){return _timer_wheel.HasTimer(id);}private:std::thread::id _thread_id; //线程IDint _event_fd;  //eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel;Poller _poller;std::vector<Functor> _tasks;std::mutex _mutex;  //实现任务池操作的线程安全TimerWheel _timer_wheel;  //定时器模块
};

4.9 Connection模块

4.9.1 Connection的功能

模块介绍该模块是一个对Buffer/Socket/Channel模块的整体封装,实现了对套接字的整体管理。每一个进行数据通信的套接字(accept获取到的新连接)都会构造一个Connetction对象进行管理。

4.9.2 Connection的实现

(1)模块分析:

  1. 该模块内部包含由组件使用者提供的回调函数:连接建立完成回调,事件回调,新数据回调,关闭回调。
  2. 该模块包含两个组件使用者提供的接口,数据发送接口和连接关闭接口。
  3. 该模块中包含两个用户态缓冲区:用户态接收缓冲区和用户态发送缓冲区。
  4. 该模块中包含一个Socket对象,完成描述符面向系统的IO操作。
  5. 该模块中包含一个Channel对象,完成描述符IO事件就绪的处理。

(2)该模块的处理流程:

  1. 向Channel提供可读,可写,错误等不同事件的IO事件回调函数,将Channel和对应的描述符添加到Poller事件监控中。
  2. 当描述符在Poller模块中就绪了IO可读事件后,调用该描述符对应Channel中保存的读事件处理函数,进行数据读取,读取的过程本质上是将socket接收缓冲区中的数据 读到Connection管理的用户态接收数据中。
  3. 业务处理完毕后,通过Connection提供的数据发送接口,将数据写入到Connection的发送缓冲区中。
  4. 启动描述符在Poll模块中的IO事件监控,事件就绪后,调用Channel中保存的写事件处理函数,将发送缓冲区中的数据通过Sockert进行数据的真正发送。

(3)Connection类的设计接口函数:

// DISCONECTED -- 连接关闭状态  CONNECTING -- 连接建立成功-待处理状态
// CONNECTED -- 连接建立完成,各种设置已完成,可以通信状态    DISCONNECTING -- 待关闭状态
typedef enum
{DISCONECTED,CONNECTING,CONNECTED,DISCONNECTING
} ConnStatu;using PtrConnection = std::shared_ptr<Connection>;
class Connection
{
public:Connection(EventLoop *loop, uint64_t conn_id, int sockfd);~Connection();int Fd();                            // 获取管理的文件描述符int Id();                            // 获取连接IDbool Connected();                    // 是否处于CONNECTED状态void SetContext(const Any &context); // 设置上下文--连接建立完成时进行调用Any *GetContext();                   // 获取上下文,返回的是指针void SetConnectedCallback(const ConnectedCallback &cb);void SetMessageCallback(const MessageCallback &cb);void SetClosedCallback(const ClosedCallback &cb);void SetAnyEventCallback(const AnyEventCallback &cb);void Established();                  // 连接建立就绪后,进行channel回调设置,启动读监控,调用_connect_callbackvoid Send(char *data, size_t len);   // 发送数据,将数据发送到发送缓冲区,启动写事件监控void Shutdown();                     // 提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理void EnableInactiveRelease(int sec); // 启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务void CancelInactiveRelease();        // 取消非活跃销毁// 切换协议--重置上下文以及阶段性处理函数void Upgrade(const Context, const ConnectedCallback &conn,const ClosedCallback &closed, const AnyEventCallback &event);
private:/*五个channel的事件回调函数*/void HandleRead();        // 描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callbackvoid HandleWrite();       // 描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送void HandleClose();       // 描述符触发挂断事件void HandleError();       // 描述符触发出错事件void HandleEvent();       // 描述符触发任意事件void EstablishedInLoop(); // 连接获取之后,所处的状态要进行各种设置(给channel设置事件回调,启动读监控)void ReleaseInLoop();     // 这个接口才是实际的释放接口void SendInLoop(char *data, size_t len);void ShutdownInLoop();void EnableInactiveReleaseInLoop(int sec);void CancelInactiveReleaseInLoop();void UpgradeInLoop(const Context &context,const ConnectedCallback &conn,const MessageCallback &msg,const ClosedCallback &closed,const AnyEventCallback &event);
private:uint64_t _conn_id; // 连接的唯一ID,便于连接的管理和查找// uint64_t _timer_id; // 定时器ID,必须是唯一的,这块是为了简化操作使用conn_id作为定时器int _sockfd;                   // 连接关联的文件描述符bool _enable_inactive_release; // 连接是否启动非活跃的判断标志,默认为falseEventLoop *_loop;              // 连接所关联的一个EventLoopConnStatu _statu;              // 连接状态Socket _socket;                // 套接字操作管理Channel _channel;              // 连接的事件管理Buffer _in_buffer;             // 输入缓冲区--存放从socket中读取到的数据Buffer _out_buffer;            // 输出缓冲区--存放要发送给对端的数据Any _context;                  // 请求的接收处理上下文/*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*//*换句话来说,这几个回调都是组件使用者使用的*/using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closef_callback;AnyEventCallback _event_callback;/*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*//*就应该从管理的地方移除掉自己的信息*/ClosedCallback _server_closed_callback;
};

(4)Connection类接口函数实现:

class Connection;using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{
public://这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)//换句话说,这几个回调都是组件使用者使用的using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer*)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;Connection(EventLoop* loop, uint64_t con_id, int sockfd):_con_id(con_id),_sockfd(sockfd),_enable_inactive_release(false),_statu(CONNECTING),_loop(loop),_socket(_sockfd),_channel(loop, _sockfd){_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));}//获取管理的文件描述符int GetFd(){return _sockfd;}//获取连接IDint GetConnfd(){return _con_id;}//是否处于CONNECTED状态bool GetConnStatu(){return _statu == CONNECTED;}//设置上下文--连接建立完成时进行调用void SetContext(const Any& context){_context = context;}//获取上下文,返回的是指针Any* GetContext(){return &_context;}void SetConnectedCallback(const ConnectedCallback& cb){_connected_callback = cb;}void SetMessageCallback(const MessageCallback& cb){_message_callback = cb;}void SetClosedCallback(const ClosedCallback& cb){_closed_callback = cb;}void SetAnyEventCallback(const AnyEventCallback& cb){_event_callback = cb;}void SetSrvClosedCallback(const ClosedCallback& cb){_server_closed_callback = cb; }//连接建立就绪后,进行channel回调设置,启动读监控,调用_connected_callbackvoid Established() {_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));}//发送数据,将数据放到发送缓冲区,启动写事件监控void Send(const char* data, size_t len){//外界传入的data,可能是个临时的空间,我们现在只是把发送操作压入了任务池,有可能并没有被立即执行//因此有可能执行的时候,data指向的空间有可能已经被释放了。Buffer buf;buf.WriteAndPush(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));}//提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理void Shutdown(){_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));}void Release(){_loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));}//启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务void EnableInactiveRelease(int sec){_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));}//取消非活跃销毁void CancelInactiiveRelease(){_loop->RunInLoop(std::bind(&Connection::CancelInactiiveReleaseInLoop, this));}//切换协议---重置上下文以及阶段性回调处理函数 -- 而是这个接口必须在EventLoop线程中立即执行//防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了。void Upgrade(const Any& context,const ConnectedCallback& conn,const MessageCallback& msg,const ClosedCallback& closed,const AnyEventCallback& event){_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}~Connection(){LOG(NORMAL, "Release Connention");}private://描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callbackvoid HandleRead(){//1. 接收socket的数据,放到缓冲区char buf[65536];ssize_t ret = _socket.NonBlockRecv(buf, 65535);if(ret < 0){// 出错了,不能直接关闭连接LOG(WARNING, "HandleRead error");return ShutdownInLoop();}//这里的等于0表示的是没有读取到数据,而并不是连接断开了,连接断开返回的是-1//将数据放入输入缓冲区,写入之后顺便将写偏移向后移动_in_buffer.WriteAndPush(buf, ret);//2. 调用message_callback进行业务处理if(_in_buffer.ReadAbleSize() > 0){//shared_from_this--从当前对象自身获取自身的shared_ptr管理对象return _message_callback(shared_from_this(), &_in_buffer);}}//描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送void HandleWrite(){ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if(ret < 0){// 发送错误就该关闭连接了,if(_in_buffer.ReadAbleSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}return Release(); // 这时候就是实际的关闭释放操作了。}_out_buffer.MoveReadOffset(ret);  //千万不要忘了,将读偏移向后移动if (_out_buffer.ReadAbleSize() == 0){_channel.DisableWrite(); // 没有数据待发送了,关闭写事件监控// 如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放if(_statu == DISCONNECTING){return Release();}}}//描述符触发挂断事件void HandleClose(){if(_in_buffer.ReadAbleSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}Release();}//描述符触发出错事件void HandleError(){return HandleClose();}//描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务;  2. 调用组件使用者的任意事件回调void HandleEvent(){if(_enable_inactive_release == true){_loop->TimerRefresh(_con_id);}if(_event_callback){_event_callback(shared_from_this());}}//连接获取之后,所处的状态下要进行各种设置(启动读监控,调用回调函数)void EstablishedInLoop(){// 1. 修改连接状态;  2. 启动读事件监控;  3. 调用回调函数assert(_statu == CONNECTING);  //当前的状态必须一定是上层的半连接状态_statu = CONNECTED;  //当前函数执行完毕,则连接进入已完成连接状态// 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁_channel.EnableRead();if(_connected_callback){_connected_callback(shared_from_this());}}//这个接口才是实际的释放接口void ReleaseInLoop(){//1. 修改连接状态,将其置为DISCONNECTED_statu = DISCONNECTED;//2. 移除连接的事件监控_channel.Remove();//3. 关闭描述符_socket.Close();//4. 如果当前定时器队列中还有定时销毁任务,则取消任务if(_loop->HasTimer(_con_id)){CancelInactiiveReleaseInLoop();}//5. 调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户的回调函数if(_closed_callback){_closed_callback(shared_from_this());}//移除服务器内部管理的连接信息if (_server_closed_callback){_server_closed_callback(shared_from_this());}}void SendInLoop(Buffer buf){if(_statu == DISCONNECTED){return;}_out_buffer.WriteBufferAndPush(buf);if(_channel.WriteAble() == false){_channel.EnableWrite();}}//这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送void ShutdownInLoop(){_statu = DISCONNECTING;// 设置连接为半关闭状态if(_in_buffer.ReadAbleSize() > 0){if(_message_callback){_message_callback(shared_from_this(), &_in_buffer);}}//要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭if(_out_buffer.ReadAbleSize() > 0){if(_channel.WriteAble() == false){_channel.EnableWrite();}}if(_out_buffer.ReadAbleSize() == 0){Release();}}//启动非活跃连接超时释放规则void EnableInactiveReleaseInLoop(int sec){//1. 将判断标志 _enable_inactive_release 置为true_enable_inactive_release = true;//2. 如果当前定时销毁任务已经存在,那就刷新延迟一下即可if(_loop->HasTimer(_con_id)){_loop->TimerRefresh(_con_id);}//3. 如果不存在定时销毁任务,则新增_loop->TimerAdd(_con_id, sec, std::bind(&Connection::Release, this));}void CancelInactiiveReleaseInLoop(){_enable_inactive_release = false;if(_loop->HasTimer(_con_id)){_loop->TimerCancel(_con_id);}}void UpgradeInLoop(const Any& context,const ConnectedCallback& conn,const MessageCallback& msg,const ClosedCallback& closed,const AnyEventCallback& event){_context = context;_connected_callback = conn;_message_callback = msg;_closed_callback = closed;_event_callback = event;}private:uint64_t _con_id;  // 连接的唯一ID,便于连接的管理和查找//uint64_t _timer_id;   //定时器ID,必须是唯一的,这块为了简化操作使用conn_id作为定时器IDint _sockfd;     // 连接关联的文件描述符bool _enable_inactive_release;  // 连接是否启动非活跃销毁的判断标志,默认为falseEventLoop* _loop;ConnStatu _statu;    // 连接状态Sock _socket;        // 套接字操作管理Channel _channel;    // 连接的事件管理Buffer _in_buffer;   // 输入缓冲区---存放从socket中读取到的数据Buffer _out_buffer;  // 输出缓冲区---存放要发送给对端的数据Any _context;        // 请求的接收处理上下文ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;//组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭//就应该从管理的地方移除掉自己的信息ClosedCallback _server_closed_callback;
};

4.10 LoopThreadPool模块

4.10.1 LoopThreadPool的功能

  • LoopThread模块的功能就是将EventLoop模块与thread整合到一起。
  • EventLoop模块实例化的对象,在构造的时候会初始化_thread_id。在后续的操作中,通过当前线程ID和EventLoop模块中的_thread_id进行一个比较,相同就表示在同一个线程,不同就表示当前运行的线程并不是EventLoop线程。因此,我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象。
  • LoopThreadPool模块的功能主要是对所有的LoopThread进行管理及分配。
  • 在服务器中,主Reactor负责新连接的获取,从属线程负责新连接的事件监控及处理,因此当前的线程池,游有可能从属线程数量为0。也就是实现单Reactor服务器,一个线程既负责获取连接,也负责连接的处理。该模块就是对0个或者多个LoopThread对象进行管理。

关于线程分配,当主线程获取了一个新连接,需要将新连接挂到从属线程上进行事件监控及处理。假设有0个从属线程,则直接分配给主线程的EventLoop进行处理。假设有多个从属线程,采用轮转的思想,进行线程的分配(将对应线程的EventLoop获取到,设置给对应的Connection)。

在实现LoopThreadPool类之前需要实现LoopThread类。

4.10.2 LoopThread的功能

EventLoop模块在实例化对象的时候,必须在线程内部,EventLoop在实例化对象时会设置自己的thread_id,如果我们先创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id,重新给EventLoop进行设置存在问题:那就是在构造EventLoop对象,到设置新的thread_id期间将是不可控的。

因此我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象构造一个新的模块:LoopThread。该模块的功能是将EventLoop与thread整合到一起:

  1. 创建线程
  2. 在线程中实例化EventLoop对象

LoopThread功能:可以向外部返回所实例化的EventLoop。

4.10.3 LoopThread的实现

具体实现如下:

class LoopThread
{
public:/*创建线程,设定线程入口函数*/LoopThread():_loop(nullptr),_thread(std::thread(&LoopThread::ThreadEntry, this)){}/*返回当前线程关联的EventLoop对象指针*/EventLoop* GetLoop(){EventLoop* loop = nullptr;{std::unique_lock<std::mutex> lock(_mutex); // 加锁_cond.wait(lock, [&](){return _loop != nullptr; });  // loop为NULL就一直阻塞loop = _loop;}return loop;}private:/*实例化 EventLoop 对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能*/void ThreadEntry() {EventLoop loop;{std::unique_lock<std::mutex> lock(_mutex);//加锁_loop = &loop;_cond.notify_all();}loop.Start();}/*用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loop*/std::mutex _mutex;       // 互斥锁std::condition_variable _cond; // 条件变量EventLoop* _loop;        // EventLoop指针变量,这个对象需要在线程内实例化std::thread _thread;     // EventLoop对应的线程
};

4.10.4 LoopThreadPool的实现

LoopThreadPool类接口函数实现:

class LoopThreadPool
{
public:LoopThreadPool(EventLoop* baseloop):_thread_count(0),_next_idx(0),_baseloop(baseloop){}void SetThreadCount(int count) { _thread_count = count; }void Create(){if(_thread_count > 0){_threads.resize(_thread_count);_loops.resize(_thread_count);for(int i = 0; i < _thread_count; i++){_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}}EventLoop* NextLoop(){if(_thread_count == 0){return _baseloop;}_next_idx = (_next_idx + 1) % _thread_count;return _loops[_next_idx];}private:int _thread_count;   //从属线程的个数int _next_idx;EventLoop* _baseloop;  //主EventLoopstd::vector<LoopThread*> _threads;   //保存所有LoopThread对象std::vector<EventLoop*> _loops;  //从属线程大于0则从_loops种进行线程EventLoop分配
};

4.11 TcpServer模块

4.11.1 TcpServer模块的功能

TcpServer模块功能:对所有模块的整合,通过TcpServer模块实例化的对象,可以非常简单的完成一个服务器的搭建。

(1)TcpServer模块的管理:

  1. Acceptor对象,创建一个监听套接字。
  2. EventLoop对象, baseloop对象, 实现对监听套接字的事件监控。
  3. std::unordered_ map <uint64_ t, PtrConnection> conns, 实现对所有新建连接的管理。
  4. LoopThreadPool对象,创建loop线程池, 对新建连接进行事件监控及处理。

(2)TcpServer模块的主要功能:

  1. 设置从属线程池数量。
  2. 启动服务器。
  3. 设置各种回调函数(连接建立完成,消息,关闭,任意), 用户设置给TcpServer, TcpServer设置给获取的新连接。
  4. 是否启动非活跃连接超时销毁功能。
  5. 添加定时任务功能。

(3)TcpServer模块的工作流程:

  1. 在TcpServer中实例化一个Acceptor对象, 以及一个EventLoop对象(baseloop)。
  2. 将Acceptor挂到baseloop上进行事件监控。
  3. 一旦Acceptor对象就绪了可读事件,则执行读事件回调函数获取新建连接。
  4. 对新连接创建一个Connection进行管理。
  5. 对连接对应的Connection设置功能回调(连接完成回调,消息回调,关闭回调,任意事件回调)。
  6. 启动Connection的非活跃连接的超时销毁规则。
  7. 将新连接对应的Connection挂到LoopThreadPool中的从属线程对应的Eventloop中进行事件监控。
  8. 一旦Connection对应的连接就绪了可读事件,则这时候执行读事件回调函数,读取数据,读取完毕后调用TcpServer设置的消息回调。

4.11.2 TcpServer模块的实现

(1)TcpServer类的设计接口函数:

// TcpServer服务器管理模块(即全部模块的整合)
class TcpServer
{
private:uint64_t _next_id;                                  // 这是一个自动增长的连接IDint _port;int _timeout;                                       // 这是非活跃连接的统计时间--多长时间无通信就是非活跃连接bool _enable_inactive_release;                      // 是否启动非活跃连接超时销毁的判断标志EventLoop _baseloop;                                // 这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor;                                 // 这是监听套接字的管理对象LoopThreadPool _pool;                               // 这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns; // 保管所有连接对应的share_ptr对象,这里面的对象被删除,就意味这某一个连接被删除using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;
private:void NewConnection(int fd); // 为新连接构造一个Connection进行管理void RemoveConnection(); // 从管理Connection的_conns移除连接信息
public:TcpServer();void SetThreadCount(int count);void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout);void RunAfter(const Functor &task, int delay);  // 用于添加一个定时任务void Start();
};

(2)TcpServer类的设计接口函数实现:

class TcpServer
{
private:uint64_t _next_id; //这是一个自动增长的连接ID,int _port;int _timeout;bool _enable_inactive_release;EventLoop _base_loop;Acceptor _acceptor;LoopThreadPool _pool;   //这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns; //保存管理所有连接对应的shared_ptr对象using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:void NewConnection(int fd){_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if(_enable_inactive_release){conn->EnableInactiveRelease(_timeout); //启动非活跃超时销毁}conn->Established();//就绪初始化_conns.insert(std::make_pair(_next_id, conn));}void RunAfterInLoop(const Functor &task, int delay){_next_id++;_base_loop.TimerAdd(_next_id, delay, task);}void RemoveConnectionInLoop(const PtrConnection& conn){int id = conn->Getid();auto iter = _conns.find(id);if(iter != _conns.end()){_conns.erase(id);}}void RemoveConnection(const PtrConnection& conn){_base_loop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}public:TcpServer(int port):_port(port),_next_id(0),_enable_inactive_release(false),_acceptor(&_base_loop, port),_pool(&_base_loop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();   //将监听套接字挂到baseloop上}void SetThreadCount(int count) { return _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; }//用于添加一个定时任务void RunAfter(const Functor &task, int delay) {_base_loop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}void Start() { _pool.Create();  _base_loop.Start(); }};

5. Http协议模块

5.1 Util模块

(1)该模块为工具模块,主要提供Http协议模块所用到的一些工具函数,比如Url编码解码,文件读写。

(2)具体功能实现如下:

#pragma once
#include "../server.hpp"
#include <fstream>
#include <ostream>
#include <sys/stat.h>std::unordered_map<int, std::string> _statu_msg = {{100, "Continue"},{101, "Switching Protocol"},{102, "Processing"},{103, "Early Hints"},{200, "OK"},{201, "Created"},{202, "Accepted"},{203, "Non-Authoritative Information"},{204, "No Content"},{205, "Reset Content"},{206, "Partial Content"},{207, "Multi-Status"},{208, "Already Reported"},{226, "IM Used"},{300, "Multiple Choice"},{301, "Moved Permanently"},{302, "Found"},{303, "See Other"},{304, "Not Modified"},{305, "Use Proxy"},{306, "unused"},{307, "Temporary Redirect"},{308, "Permanent Redirect"},{400, "Bad Request"},{401, "Unauthorized"},{402, "Payment Required"},{403, "Forbidden"},{404, "Not Found"},{405, "Method Not Allowed"},{406, "Not Acceptable"},{407, "Proxy Authentication Required"},{408, "Request Timeout"},{409, "Conflict"},{410, "Gone"},{411, "Length Required"},{412, "Precondition Failed"},{413, "Payload Too Large"},{414, "URI Too Long"},{415, "Unsupported Media Type"},{416, "Range Not Satisfiable"},{417, "Expectation Failed"},{418, "I'm a teapot"},{421, "Misdirected Request"},{422, "Unprocessable Entity"},{423, "Locked"},{424, "Failed Dependency"},{425, "Too Early"},{426, "Upgrade Required"},{428, "Precondition Required"},{429, "Too Many Requests"},{431, "Request Header Fields Too Large"},{451, "Unavailable For Legal Reasons"},{501, "Not Implemented"},{502, "Bad Gateway"},{503, "Service Unavailable"},{504, "Gateway Timeout"},{505, "HTTP Version Not Supported"},{506, "Variant Also Negotiates"},{507, "Insufficient Storage"},{508, "Loop Detected"},{510, "Not Extended"},{511, "Network Authentication Required"}
};std::unordered_map<std::string, std::string> _mime_msg = {{".aac", "audio/aac"},{".abw", "application/x-abiword"},{".arc", "application/x-freearc"},{".avi", "video/x-msvideo"},{".azw", "application/vnd.amazon.ebook"},{".bin", "application/octet-stream"},{".bmp", "image/bmp"},{".bz", "application/x-bzip"},{".bz2", "application/x-bzip2"},{".csh", "application/x-csh"},{".css", "text/css"},{".csv", "text/csv"},{".doc", "application/msword"},{".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},{".eot", "application/vnd.ms-fontobject"},{".epub", "application/epub+zip"},{".gif", "image/gif"},{".htm", "text/html"},{".html", "text/html"},{".ico", "image/vnd.microsoft.icon"},{".ics", "text/calendar"},{".jar", "application/java-archive"},{".jpeg", "image/jpeg"},{".jpg", "image/jpeg"},{".js", "text/javascript"},{".json", "application/json"},{".jsonld", "application/ld+json"},{".mid", "audio/midi"},{".midi", "audio/x-midi"},{".mjs", "text/javascript"},{".mp3", "audio/mpeg"},{".mpeg", "video/mpeg"},{".mpkg", "application/vnd.apple.installer+xml"},{".odp", "application/vnd.oasis.opendocument.presentation"},{".ods", "application/vnd.oasis.opendocument.spreadsheet"},{".odt", "application/vnd.oasis.opendocument.text"},{".oga", "audio/ogg"},{".ogv", "video/ogg"},{".ogx", "application/ogg"},{".otf", "font/otf"},{".png", "image/png"},{".pdf", "application/pdf"},{".ppt", "application/vnd.ms-powerpoint"},{".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"},{".rar", "application/x-rar-compressed"},{".rtf", "application/rtf"},{".sh", "application/x-sh"},{".svg", "image/svg+xml"},{".swf", "application/x-shockwave-flash"},{".tar", "application/x-tar"},{".tif", "image/tiff"},{".tiff", "image/tiff"},{".ttf", "font/ttf"},{".txt", "text/plain"},{".vsd", "application/vnd.visio"},{".wav", "audio/wav"},{".weba", "audio/webm"},{".webm", "video/webm"},{".webp", "image/webp"},{".woff", "font/woff"},{".woff2", "font/woff2"},{".xhtml", "application/xhtml+xml"},{".xls", "application/vnd.ms-excel"},{".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},{".xml", "application/xml"},{".xul", "application/vnd.mozilla.xul+xml"},{".zip", "application/zip"},{".3gp", "video/3gpp"},{".3g2", "video/3gpp2"},{".7z", "application/x-7z-compressed"}
};class Util
{
public:// 字符串分割函数,将src字符串按照sep字符进行分割,得到的各个字串放到arry中,最终返回字串的数量static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string> *arry){size_t offset = 0;// 有10个字符,offset是查找的起始位置,范围应该是0~9,offset==10就代表已经越界了while (offset < src.size()){// 在src字符串偏移量offset处,开始向后查找sep字符/字串,返回查找到的位置size_t pos = src.find(sep, offset);if (pos == std::string::npos) // 没有找到特定的字符{// 将剩余的部分当作一个字串,放入arry中if (pos == src.size()){break;}arry->push_back(src.substr(offset));return arry->size();}if (pos == offset){offset = pos + sep.size();continue; // 当前字串是一个空的,没有内容}arry->push_back(src.substr(offset, pos - offset));offset = pos + sep.size();}return arry->size();}// 读取文件的所有内容,将读取的内容放到一个Buf中static bool ReadFile(const std::string &filename, std::string *buf){std::ifstream out(filename, std::ios::binary);if (out.is_open() == false){LOG(FATAL, "Open ReadFile error");return false;}size_t out_size = 0;out.seekg(0, out.end);  // 跳转读写位置到末尾out_size = out.tellg(); // 获取当前读写位置相对于起始位置的偏移量,从末尾偏移刚好就是文件大小out.seekg(0, out.beg);  // 跳转到起始位置buf->resize(out_size);  // 开辟文件大小的空间out.read(&(*buf)[0], out_size);if (out.good() == false){printf("Read %s File error!!", filename.c_str());out.close();return false;}out.close();return true;}// 向文件写入数据static bool WriteFile(const std::string& filename, std::string& buf){std::ofstream in(filename, std::ios::binary);if (in.is_open() == false){LOG(FATAL, "Open WriteFile error");return false;}in.write(buf.c_str(), buf.size());if (in.good() == false){LOG(FATAL, "Write File error");in.close();return false;}in.close();return true;}// URL编码,避免URL中资源路径与查询字符串中的特殊字符与HTTP请求中特殊字符产生歧义// 编码格式:将特殊字符的ascii值,转换为两个16进制字符,前缀%   C++ -> C%2B%2B// 不编码的特殊字符: RFC3986文档规定 . - _ ~ 字母,数字属于绝对不编码字符// RFC3986文档规定,编码格式 %HH// W3C标准中规定,查询字符串中的空格,需要编码为+, 解码则是+转空格static std::string UrlEncode(const std::string &url, bool convert_space_to_plus){std::string res;for (auto &ch : url){if (ch == '.' || ch == '-' || ch == '_' || ch == '~' || isalnum(ch)){res += ch;continue;}if (ch == ' ' && convert_space_to_plus == true){res += '+';continue;}// 剩下的字符都是需要编码成为 %HH 格式char tmp[4] = {0};// snprintf 与 printf比较类似,都是格式化字符串,只不过一个是打印,一个是放到一块空间中snprintf(tmp, 4, "%%%02X", ch);res += tmp;}return res;}static char HEXTOI(char c){if (c >= '0' && c <= '9'){return c - '0';}else if (c >= 'a' && c <= 'z'){return c - 'a' + 10;}else if (c >= 'A' && c <= 'Z'){return c - 'A' + 10;}return -1;}// Url解码static std::string UrlDecode(const std::string &url, bool convert_plus_to_space){std::string res;for (int i = 0; i < url.size(); i++){if (url[i] == '+' && convert_plus_to_space == true){res += ' ';continue;}if (url[i] == '%' && (i + 2) < url.size()){char v1 = HEXTOI(url[i + 1]);char v2 = HEXTOI(url[i + 2]);char v = v1 * 16 + v2;res += v;i += 2;continue;}res += url[i];}return res;}// 响应状态码的描述信息获取static std::string StatuDesc(int statu){auto it = _statu_msg.find(statu);if (it != _statu_msg.end()){return it->second;}return "Unknow";}// 根据文件后缀名获取文件mimestatic std::string ExtMime(const std::string& filename){// a.b.txt  先获取文件扩展名size_t pos = filename.find_last_of('.');if(pos == std::string::npos){return "application/octet-stream";}// 根据扩展名,获取mimestd::string ext = filename.substr(pos);auto it = _mime_msg.find(ext);if(it == _mime_msg.end()){return "application/octet-stream";}return it->second;}// 判断一个文件是否是一个目录static bool IsDirectory(const std::string &filename){struct stat st;int ret = stat(filename.c_str(), &st);if(ret < 0){return false;}return S_ISDIR(st.st_mode);}// 判断一个文件是否是一个普通文件static bool IsRegular(const std::string &filename){struct stat st;int ret = stat(filename.c_str(), &st);if(ret < 0){return false;}return S_ISREG(st.st_mode);}//http请求的资源路径有效性判断// /index.html  --- 前边的/叫做相对根目录  映射的是某个服务器上的子目录// 想表达的意思就是,客户端只能请求相对根目录中的资源,其他地方的资源都不予理会// /../login, 这个路径中的..会让路径的查找跑到相对根目录之外,这是不合理的,不安全的static bool ValidPath(const std::string &path){//思想:按照/进行路径分割,根据有多少子目录,计算目录深度,有多少层,深度不能小于0std::vector<std::string> subdir;Split(path, "/", &subdir);int level = 0;for(auto& dir : subdir){if(dir == ".."){level--; // 任意一层走出相对根目录,就认为有问题if(level < 0){return false;}continue;}level++;}return true;}
};

5.2 HttpRequest模块

(1)这个模块是Http请求数据模块,用于保存Http请求数据被解析后的各项请求元素信息。具体功能如下图:


(2)具体实现如下:

#pragma once
#include <iostream>
#include <regex>
#include <unordered_map>class HttpRequest
{
public:std::string _method;      //请求方法std::string _path;        //资源路径std::string _version;     //协议版本std::string _body;        //请求正文std::smatch _matches;     //资源路径的正则提取数据std::unordered_map<std::string, std::string> _headers;  //头部字段std::unordered_map<std::string, std::string> _params;   //查询字符串public:HttpRequest():_version("HTTP/1.1"){}void ReSet(){_method.clear();_path.clear();_version = "HTTP/1.1";_body.clear();std::smatch match;_matches.swap(match);_headers.clear();_params.clear();}//插入头部字段bool SetHeader(const std::string& key, const std::string& val){_headers.insert(std::make_pair(key, val));}bool HasHeader(const std::string& key) const{auto iter = _headers.find(key);if(iter == _headers.end()){return false;}return true;}std::string GetHeader(const std::string& key) const{auto iter = _headers.find(key);if(iter == _headers.end()){return "";}return iter->second;}//插入查询字符串void SetParam(const std::string& key, const std::string& val) {_params.insert(std::make_pair(key, val));}bool HasParam(const std::string& key) const{auto it = _params.find(key);if (it == _params.end()){return false;}return true;}// 获取指定的查询字符串std::string GetParam(const std::string& key) const{auto it = _params.find(key);if (it == _params.end()){return "";}return it->second;}//获取正文长度size_t ContentLength() const{bool ret = HasHeader("Content-Length");if(ret == false){return 0;}std::string clen = GetHeader("Content-Length");return std::stol(clen);}// 判断是否是短链接bool Close() const{// 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive"){return false;}return true;}
};

5.3 HttpResponse模块

(1)这个模块是Http响应数据块,用于业务处理后设置并保存Http响应数据的各项元素信息,最终会被按照Http协议响应格式组织成为响应信息发送给客户端。具体功能如下图:


(2)具体实现如下:

#pragma once
#include <iostream>
#include <unordered_map>class HttpResponse
{
public:int _statu;bool _redirect_flag;std::string _body;std::string _redirect_url;std::unordered_map<std::string, std::string> _headers;public:HttpResponse():_redirect_flag(false),_statu(200){}HttpResponse(int statu):_redirect_flag(false),_statu(statu) {} void Reset(){_statu = 200;_redirect_flag = false;_body.clear();_redirect_url.clear();_headers.clear();}// 插入头部字段void SetHeader(const std::string& key, const std::string& val){_headers.insert(std::make_pair(key, val));}// 判断是否存在指定头部字段bool HasHeader(const std::string& key){auto it = _headers.find(key);if (it == _headers.end()){return false;}return true;}// 获取指定头部字段的值std::string GetHeader(const std::string& key){auto it = _headers.find(key);if (it == _headers.end()){return "";}return it->second;}void SetContent(const std::string& body, const std::string& type = "text.html"){_body = body;SetHeader("Content-Type", type);}void SetRedirect(const std::string& url, int statu = 302){_statu = statu;_redirect_flag = true;_redirect_url = url;}// 判断是否是短链接bool Close(){// 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive"){return false;}return true;}
};

5.4 HttpContext模块

(1)该模块是一个Http请求接收的上下文模块,主要是为了防止在一次接收的数据中,不是一个完整的Http请求,需要在下次接收到新数据后继续根据上下文进行解析,最终得到一个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要一个上下文来进行控制接收和处理节奏。

(2)正则库的简单概述:
注意在正则表达式的相关函数要在gcc版本较高的版本下才能正常运行,否则会出现编译成功,但是运行失败的情况。

(3)具体实现如下:

#pragma once
#include <iostream>
#include "HttpRequest.hpp"
#include "Util.hpp"
#include "../Buffer.hpp"typedef enum {RECV_HTTP_ERROR,RECV_HTTP_LINE,RECV_HTTP_HEAD,RECV_HTTP_BODY,RECV_HTTP_OVER
}HttpRecvStatu;#define MAX_LINE 8192class HttpContext
{
private:int _resp_statu; //响应状态码HttpRecvStatu _recv_statu; //当前接收及解析的阶段状态HttpRequest _request;  //已经解析得到的请求信息private:bool ParseHttpLine(const std::string& line) {std::smatch matches;std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase);bool ret = std::regex_match(line, matches, e);if(ret == false){_recv_statu = RECV_HTTP_ERROR;_resp_statu = 400; // BAD REQUESTreturn false;}//0 : GET /baidu/login?user=xiaoming&pass=123123 HTTP/1.1//1 : GET//2 : /bitejiuyeke/login//3 : user=xiaoming&pass=123123//4 : HTTP/1.1//请求方法的获取_request._method = matches[1];std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);//资源路径的获取,需要进行URL解码操作,但是不需要+转空格_request._path = Util::UrlEncode(matches[2], false);//协议版本的获取_request._version = matches[4];//查询字符串的获取与处理std::vector<std::string> query_string_arry;std::string query_string = matches[3];Util::Split(query_string, "&", &query_string_arry);//针对各个字串,以 = 符号进行分割,得到key 和val, 得到之后也需要进行URL解码for(auto& ch : query_string_arry){size_t pos = ch.find("=");if(pos == std::string::npos){_recv_statu = RECV_HTTP_ERROR;_resp_statu = 400;   //BAD REQUESTreturn false;}std::string key = Util::UrlDecode(ch.substr(0, pos), true);  std::string val = Util::UrlDecode(ch.substr(pos + 1), true);_request.SetParam(key, val);}return true;}bool RecvHttpLine(Buffer* buf){if (_recv_statu != RECV_HTTP_LINE){return false;}//1. 获取一行数据,带有末尾的换行std::string line = buf->GetLineAndPop();//2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大if(line.size() == 0){// 缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的if(buf->ReadAbleSize() > MAX_LINE){_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414; // URI TOO LONGreturn false;}// 缓冲区中数据不足一行,但是也不多,就等等新数据的到来return true;}if(line.size() > MAX_LINE){_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414;//URI TOO LONGreturn false;}bool ret = ParseHttpLine(line);if(ret == false) {return false;}//首行处理完毕,进入头部获取阶段_recv_statu = RECV_HTTP_HEAD;return true;}bool RecvHttpHead(Buffer* buf){if(_recv_statu != RECV_HTTP_HEAD){return false;}while(1){std::string line = buf->GetLineAndPop();if(line.size() == 0){// 缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的if(buf->ReadAbleSize() > MAX_LINE){_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414; // URI TOO LONGreturn false;}// 缓冲区中数据不足一行,但是也不多,就等等新数据的到来return true;}if(line.size() > MAX_LINE){_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414;//URI TOO LONGreturn false;}if(line == "\n" || line == "\r\n") {break;}bool ret = ParseHttpHead(line);if(ret == false) {return false;}}//头部处理完毕,进入正文获取阶段_recv_statu = RECV_HTTP_BODY;return true;}bool ParseHttpHead(std::string& line){if(line.back() == '\n') line.pop_back();if(line.back() == '\r') line.pop_back();auto pos = line.find(":");if(pos == std::string::npos){_recv_statu = RECV_HTTP_ERROR;_resp_statu = 400; //return false;}std::string key = line.substr(0, pos);std::string val = line.substr(pos + 2);_request.SetHeader(key, val);return true;}bool RecvHttpBody(Buffer* buf){if(_recv_statu != RECV_HTTP_BODY){return false;}//1. 获取正文长度size_t content_length = _request.ContentLength();if(content_length == 0) {//没有正文,则请求接收解析完毕_recv_statu = RECV_HTTP_OVER;return true;}//2. 当前已经接收了多少正文,其实就是往  _request._body 中放了多少数据了size_t real_len = content_length - _request._body.size();//实际还需要接收的正文长度//3. 接收正文放到body中,但是也要考虑当前缓冲区中的数据,是否是全部的正文//  3.1 缓冲区中数据,包含了当前请求的所有正文,则取出所需的数据if(buf->ReadAbleSize() >= real_len){_request._body.append(buf->ReadPosition(), real_len);buf->MoveRead(real_len);_recv_statu = RECV_HTTP_OVER;return true;}//  3.2 缓冲区中数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来_request._body.append(buf->ReadPosition(), buf->ReadAbleSize());buf->MoveRead(buf->ReadAbleSize());return true;}public:HttpContext():_resp_statu(200),_recv_statu(RECV_HTTP_LINE){}void ReSet(){_resp_statu = 200;_recv_statu = RECV_HTTP_LINE;_request.ReSet();}int RespStatu() { return _resp_statu; }HttpRecvStatu RecvStatu() { return _recv_statu; }HttpRequest& Request() { return _request; }//接收并解析HTTP请求void RecvHttpRequest(Buffer* buf){//不同的状态,做不同的事情,但是这里不要break//因为处理完请求行后,应该立即处理头部,而不是退出等新数据switch (_recv_statu){case RECV_HTTP_LINE: RecvHttpLine(buf);case RECV_HTTP_HEAD: RecvHttpHead(buf);case RECV_HTTP_BODY: RecvHttpBody(buf);}}
};

5.5 HttpServer模块

(1)主要目标:最终给组件使用者提供的Http服务器模块,用于以简单的接口实现Http服务器的搭建。

  1. HttpServer模块内部包含有⼀个TcpServer对象:TcpServer对象实现服务器的搭建 。
  2. HttpServer模块内部包含有两个提供给TcpServer对象的接口:连接建⽴成功设置上下⽂接口,数据处理接口。
  3. HttpServer模块内部包含有⼀个hash-map表存储请求与处理函数的映射表:组件使⽤者向HttpServer设置哪些请求应该使⽤哪些函数进⾏处理,等TcpServer收到对应的请求就会使⽤对应的函数进⾏处理。

(2)具体实现如下:

#pragma once
#include "HttpRequest.hpp"
#include "HttpResponse.hpp"
#include "../server.hpp"
#include "HttpContext.hpp"class HttpServer
{
private:using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;using Handlers = std::vector<std::pair<std::regex, Handler>>;Handlers _get_route;Handlers _post_route;Handlers _put_route;Handlers _delete_route;std::string _basedir; // 静态资源根目录TcpServer _server;private:void ErrorHandler(const HttpRequest &req, HttpResponse *rsp){// 1. 组织一个错误展示页面std::string body;body += "<html>";body += "<head>";body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";body += "</head>";body += "<body>";body += "<h1>";body += std::to_string(rsp->_statu);body += " ";body += Util::StatuDesc(rsp->_statu);body += "</h1>";body += "</body>";body += "</html>";// 2. 将页面数据,当作响应正文,放入rsp中rsp->SetContent(body, "text/html");}// 将HttpResponse中的要素按照http协议格式进行组织,发送void WriteReponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp){// 1. 先完善头部字段if (req.Close() == true){rsp.SetHeader("Connection", "close");}else{rsp.SetHeader("Connection", "keep-alive");}if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false){rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));}if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false){rsp.SetHeader("Content-Type", "application/octet-stream");}if (rsp._redirect_flag == true){rsp.SetHeader("Location", rsp._redirect_url);}// 2. 将rsp中的要素,按照http协议格式进行组织std::stringstream rsp_str;rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu) << "\r\n";for (auto &head : rsp._headers){rsp_str << head.first << ": " << head.second << "\r\n";}rsp_str << "\r\n";rsp_str << rsp._body;// 3. 发送数据conn->Send(rsp_str.str().c_str(), rsp_str.str().size());}bool IsFileHandler(const HttpRequest &req){// 1. 必须设置了静态资源根目录if (_basedir.empty()){return false;}// 2. 请求方法,必须是GET / HEAD请求方法if (req._method != "GET" && req._method != "HEAD"){return false;}// 3. 请求的资源路径必须是一个合法路径if (Util::ValidPath(req._path) == false) {return false;}std::string req_path = _basedir + req._path; // 为了避免直接修改请求的资源路径,因此定义一个临时对象if (req._path.back() == '/'){req_path += "index.html";}if (Util::IsRegular(req_path) == false){return false;}return true;}//静态资源的请求处理 --- 将静态资源文件的数据读取出来,放到rsp的_body中, 并设置mimevoid FileHandler(const HttpRequest &req, HttpResponse *rsp){std::string req_path = _basedir + req._path;if (req._path.back() == '/')  {req_path += "index.html";}bool ret = Util::ReadFile(req_path, &rsp->_body);if (ret == false){return;}std::string mime = Util::ExtMime(req_path);rsp->SetHeader("Content-Type", mime);}//功能性请求的分类处理void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers){//在对应请求方法的路由表中,查找是否含有对应资源请求的处理函数,有则调用,没有则发挥404//思想:路由表存储的时键值对 -- 正则表达式 & 处理函数//使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就使用对应函数进行处理//  /numbers/(\d+)       /numbers/12345for(auto& handler : handlers){const std::regex &re = handler.first;const Handler &functor = handler.second;bool ret = std::regex_match(req._path, req._matches, re);if (ret == false){continue;}return functor(req, rsp); // 传入请求信息,和空的rsp,执行处理函数}rsp->_statu = 404;}void Route(HttpRequest &req, HttpResponse *rsp) {//1. 对请求进行分辨,是一个静态资源请求,还是一个功能性请求//   静态资源请求,则进行静态资源的处理//   功能性请求,则需要通过几个请求路由表来确定是否有处理函数//   既不是静态资源请求,也没有设置对应的功能性请求处理函数,就返回405if (IsFileHandler(req) == true) {//是一个静态资源请求, 则进行静态资源请求的处理return FileHandler(req, rsp);}if(req._method == "GET" || req._method == "HEAD"){return Dispatcher(req, rsp, _get_route);}else if(req._method == "POST"){return Dispatcher(req, rsp, _post_route);}else if(req._method == "PUT"){return Dispatcher(req, rsp, _put_route);}else if(req._method == "DELETE"){return Dispatcher(req, rsp, _delete_route);}rsp->_statu = 405;// Method Not Allowed}// 设置上下文void OnConnected(const PtrConnection &conn){conn->SetContext(HttpContext());lg(Info, "NEW CONNECTION %p", conn.get());}//缓冲区数据解析+处理void OnMessage(const PtrConnection &conn, Buffer *buffer) {while(buffer->ReadAbleSize() > 0){// 1. 获取上下文HttpContext *context = conn->GetContext()->get<HttpContext>();// 2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象//   1. 如果缓冲区的数据解析出错,就直接回复出错响应//   2. 如果解析正常,且请求已经获取完毕,才开始去进行处理context->RecvHttpRequest(buffer);HttpRequest &req = context->Request();HttpResponse rsp(context->RespStatu());if(context->RespStatu() >= 400){//进行错误响应,关闭连接ErrorHandler(req, &rsp);//填充一个错误显示页面数据到rsp中WriteReponse(conn, req, rsp);//组织响应发送给客户端context->ReSet();buffer->MoveRead(buffer->ReadAbleSize());//出错了就把缓冲区数据清空conn->Shutdown();//关闭连接return;}if (context->RecvStatu() != RECV_HTTP_OVER){// 当前请求还没有接收完整,则退出,等新数据到来再重新继续处理return;}// 3. 请求路由 + 业务处理Route(req, &rsp);// 4. 对HttpResponse进行组织发送WriteReponse(conn, req, rsp);// 5. 重置上下文context->ReSet();// 6. 根据长短连接判断是否关闭连接或者继续处理if (rsp.Close() == true){conn->Shutdown(); // 短链接则直接关闭v}}}public:HttpServer(int port, int timeout = DEFALT_TIMEOUT):_server(port){_server.EnableInactiveRelease(timeout);_server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));_server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));}void SetBaseDir(const std::string &path){assert(Util::IsDirectory(path) == true);_basedir = path;}/*设置/添加,请求(请求的正则表达)与处理函数的映射关系*/void Get(const std::string &pattern, const Handler &handler){_get_route.push_back(std::make_pair(std::regex(pattern), handler));}void Post(const std::string &pattern, const Handler &handler){_post_route.push_back(std::make_pair(std::regex(pattern), handler));}void Put(const std::string &pattern, const Handler &handler){_put_route.push_back(std::make_pair(std::regex(pattern), handler));}void Delete(const std::string &pattern, const Handler &handler){_delete_route.push_back(std::make_pair(std::regex(pattern), handler));}void SetThreadCount(int count){_server.SetThreadCount(count);}void Listen(){_server.Start();}
};

6. 对服务器整体测试

服务器运行代码:

#include "http.hpp"#define WWWROOT "./wwwroot/"std::string RequestStr(const HttpRequest &req) {std::stringstream ss;ss << req._method << " " << req._path << " " << req._version << "\r\n";for (auto &it : req._params) {ss << it.first << ": " << it.second << "\r\n";}for (auto &it : req._headers) {ss << it.first << ": " << it.second << "\r\n";}ss << "\r\n";ss << req._body;return ss.str();
}void Hello(const HttpRequest &req, HttpResponse *rsp) 
{rsp->SetContent(RequestStr(req), "text/plain");
}void Login(const HttpRequest &req, HttpResponse *rsp) 
{rsp->SetContent(RequestStr(req), "text/plain");
}
void PutFile(const HttpRequest &req, HttpResponse *rsp) 
{std::string pathname = WWWROOT + req._path;Util::WriteFile(pathname, req._body);
}void DelFile(const HttpRequest &req, HttpResponse *rsp) 
{rsp->SetContent(RequestStr(req), "text/plain");
}
int main()
{HttpServer server(8085);server.SetThreadCount(3);server.SetBaseDir(WWWROOT);//设置静态资源根目录,告诉服务器有静态资源请求到来,需要到哪里去找资源文件server.Get("/hello", Hello);server.Post("/login", Login);server.Put("/1234.txt", PutFile);server.Delete("/1234.txt", DelFile);server.Listen();return 0;
}

6.1 长连接测试

(1)客户端代码:

/*长连接测试1:创建一个客户端持续给服务器发送数据,直到超过超时时间看看是否正常*/
#include "../source/server.hpp"int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";while(1) {assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG("[%s]", buf);sleep(3);}cli_sock.Close();return 0;
}

(2)运行结果:

6.2 超时连接测试

(1)客户端代码:

/*超时连接测试1:创建一个客户端,给服务器发送一次数据后,不动了,查看服务器是否会正常的超时关闭连接*/#include "../source/server.hpp"int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";while(1) {assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG("[%s]", buf);sleep(15);}cli_sock.Close();return 0;
}

(2)运行结果:

6.3 错误请求测试

(1)客户端代码:

/*给服务器发送一个数据,告诉服务器要发送1024字节的数据,但是实际发送的数据不足1024,查看服务器处理结果*/
/*1. 如果数据只发送一次,服务器将得不到完整请求,就不会进行业务处理,客户端也就得不到响应,最终超时关闭连接2. 连着给服务器发送了多次 小的请求,  服务器会将后边的请求当作前边请求的正文进行处理,而后便处理的时候有可能就会因为处理错误而关闭连接
*/#include "../source/server.hpp"int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nbitejiuyeke";while(1) {assert(cli_sock.Send(req.c_str(), req.size()) != -1);assert(cli_sock.Send(req.c_str(), req.size()) != -1);assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG("[%s]", buf);sleep(3);}cli_sock.Close();return 0;
}

(2)运行结果:

6.4 业务处理超时测试

(1)客户端代码:

/* 业务处理超时,查看服务器的处理情况当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间)1. 在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放假设现在  12345描述符就绪了, 在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度1. 如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度2. 如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误)因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务池中,等到事件处理完了执行任务池中的任务的时候,再去释放
*/#include "../source/server.hpp"int main()
{signal(SIGCHLD, SIG_IGN);for (int i = 0; i < 10; i++) {pid_t pid = fork();if (pid < 0) {DBG_LOG("FORK ERROR");return -1;}else if (pid == 0) {Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";while(1) {assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG("[%s]", buf);}cli_sock.Close();exit(0);}}while(1) sleep(1);return 0;
}

(2)服务端修改代码:

(3)运行结果:

6.5 同时多条请求测试

(1)客户端代码:

/*一次性给服务器发送多条数据,然后查看服务器的处理结果*/
/*每一条请求都应该得到正常处理*/#include "../source/server.hpp"int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";while(1) {assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG("[%s]", buf);sleep(3);}cli_sock.Close();return 0;
}

(2)运行结果:

6.6 大文件传输测试

(1)客户端代码:

/*大文件传输测试,给服务器上传一个大文件,服务器将文件保存下来,观察处理结果*/
/*上传的文件,和服务器保存的文件一致
*/
#include "../source/http/http.hpp"int main()
{Socket cli_sock;cli_sock.CreateClient(8080, "127.0.0.1");std::string req = "PUT /1234.txt HTTP/1.1\r\nConnection: keep-alive\r\n";std::string body;Util::ReadFile("./hello.txt", &body);req += "Content-Length: " + std::to_string(body.size()) + "\r\n\r\n";assert(cli_sock.Send(req.c_str(), req.size()) != -1);assert(cli_sock.Send(body.c_str(), body.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG("[%s]", buf);sleep(3);cli_sock.Close();return 0;
}

(2)运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/146916.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

AI健身之俯卧撑计数和姿态矫正-角度估计

在本项目中&#xff0c;实现了Yolov7-Pose用于人体姿态估计。以下是如何在Windows 11操作系统上设置和运行该项目的详细步骤。 环境准备 首先&#xff0c;确保您的计算机已经安装了Anaconda。Anaconda是一个开源的Python发行版本&#xff0c;它包含了conda、Python以及众多科…

Python基于TensorFlow实现时间序列循环神经网络回归模型(LSTM时间序列回归算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 随着信息技术的发展和传感器设备的广泛应用&#xff0c;时间序列数据的产生量急剧增加。无论是股市价格…

Windows本地连接远程服务器并创建新用户详细记录

前提可知&#xff1a; &#xff08;1&#xff09;服务器IP地址&#xff1a;x.x.x.x &#xff08;2&#xff09;服务器名称&#xff1a;root&#xff08;一般默认为root&#xff0c;当然也有别的名称&#xff09; &#xff08;3&#xff09;服务器登陆密码&#xff1a;**** 一、…

优化下载性能:使用Python多线程与异步并发提升下载效率

文章目录 📖 介绍 📖🏡 演示环境 🏡📒 文章内容 📒📝 普通请求下载📝 使用多线程加速下载📝 使用异步编程加速下载📝 总结 📝⚓️ 相关链接 ⚓️📖 介绍 📖 你是否因为下载速度慢而感到焦虑?特别是在下载大型文件时,等待进度条慢慢移动的感觉真的很…

西圣、吉玛仕、绿联电容笔好不好用?热门平替电容笔超真实测评!

电容笔在数字化学习与办公环境中扮演着举足轻重的角色&#xff0c;它不仅是绘写的基本工具&#xff0c;更是提高创造效率的重要手段。随着平替电容笔的市场不断扩大&#xff0c;涌现了很多品牌&#xff0c;使得很多消费者不知道如何选择。此外&#xff0c;还有掺杂了一些性能不…

浅谈Spring Cloud:OpenFeign

RestTemplate 方式调用存在的问题&#xff1a; String url "http://userservice/user/" order.getUserId(); User user restTemplate.getForObject(url, User.class); 这是通过URL地址来访问的。但是&#xff1a; 代码可读性差&#xff0c;编程体验不统一参数复…

CSGHub开源版本v0.9.0更新

CSGHub开源版本v0.9.0更新现已发布&#xff01; 00 重大更新&#x1f50a;&#x1f50a;&#x1f50a; golang 重写 Rails 服务端API git server增加gitaly的支持&#xff0c;且新版本默认使用 gitaly 本地运行应用空间、推理、微调不再需要域名 01 代码仓库&#xff08;模型…

在线骑行网站设计与实现

摘 要 传统办法管理信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff0c;在计算机上安装在线骑行网站软件来发挥其高效地信息处理的作用&#xff0c…

灾备技术演进之路 | 虚拟化无代理备份只能挂载验证和容灾吗?只能无代理恢复吗?且看科力锐升级方案

灾备技术演进之路系列 虚拟化备份技术演进 摆脱束缚&#xff0c;加速前行 无代理备份仅能挂载/恢复验证吗&#xff1f; ——科力锐极简验证演练无代理备份来了 无代理备份无法应对平台级故障吗&#xff1f; ——科力锐应急接管无代理备份来了 无代理备份仅能同平台挂载吗&a…

Java反序列化利用链篇 | URLDNS链

文章目录 URLDNS链调用链分析Payload编写 系列篇其他文章&#xff0c;推荐顺序观看~ Java反序列化利用链篇 | JdbcRowSetImpl利用链分析Java反序列化利用链篇 | CC1链_全网最菜的分析思路Java反序列化利用链篇 | CC1链的第二种方式-LazyMap版调用链Java反序列化利用链篇 | URLD…

thinkphp 做分布式服务+读写分离+分库分表(分区)(后续接着写)

thinkphp 做分布式服务读写分离分库分表&#xff08;分区&#xff09; 引言 thinkphp* 大道至简一、分库分表分表php 分库分表hash算法0、分表的方法&#xff08;thinkphp&#xff09;1、ThinkPHP6 业务分表之一&#xff1a;UID 发号器2、ThinkPHP6 业务分表之二&#xff1a;用…

【数据结构与算法 | 灵神题单 | 二叉搜索树篇】力扣653

1. 力扣653&#xff1a;两数之和IV - 输入二叉搜索树 1.1 题目&#xff1a; 给定一个二叉搜索树 root 和一个目标结果 k&#xff0c;如果二叉搜索树中存在两个元素且它们的和等于给定的目标结果&#xff0c;则返回 true。 示例 1&#xff1a; 输入: root [5,3,6,2,4,null,7…

伊犁云计算22-1 raid 5 linux 配置

&#xff11;  添加四块&#xff53;&#xff41;&#xff54;&#xff41; 硬盘  &#xff12;  设置启动项为原来&#xff53;&#xff43;&#xff53;&#xff49; 的硬盘 &#xff13;  四块盘都是  &#xff46;&#xff44;   &#xff4c;&#xff49;&…

用 HTML + JavaScript DIY 一个渐进式延迟法定退休年龄测算器

为减轻社会和个人因退休年龄变化带来的冲击&#xff0c;近日&#xff0c;全国人民代表大会常务委员会正式发布了关于实施渐进式延迟法定退休年龄的重要决定。 根据该决定&#xff0c;我国将同步启动对男、女职工法定退休年龄的延迟计划。这一调整将采取渐进式的方式进行&#…

概率论与数理统计(2)

第一节博客已经整理了求导的公式&#xff0c;一些常用的概念。链接如下&#xff1a;高等数学基础&#xff08;1&#xff09;-CSDN博客。 第二节博客整理了微积分的公式及其相关概念。链接如下&#xff1a;高等数学基础&#xff08;2&#xff09;——微积分-CSDN博客 第三节博客…

Java:Clonable 接口和拷贝

一 Clonable 接口 在 Java SE 中&#xff0c;Cloneable 是一个标记接口&#xff08;Marker Interface&#xff09;&#xff0c;它位于 java.lang 包中。这个接口的主要目的是标识实现该接口的类能够被合法地克隆&#xff08;即可以调用 Object 类中的 clone() 方法&#xff09…

重生之我们在ES顶端相遇第14 章 - ES 节点类型

文章目录 前言Coordinating nodeMaster-eligible nodeData nodeCoordinating only nodeRemote-eligible nodeMachine learning node 前言 通过前面的学习&#xff0c;我们已经初步的掌握了 ES 的大部分用法。 后面的篇章会介绍 ES 集群相关的内容。 本文着重介绍 ES 节点类型&…

vue3-05-Element-plus中表单校验:校验对象中的对象的属性,校验对象中的数组中的对象的属性,校验嵌套对象

目录 一、校验对象中的普通属性二、校验对象中对象的属性三、校验对象中的数组中的对象的属性 这两天写vue3项目&#xff0c;用了element-plus库&#xff0c;到了表单规则验证的环节&#xff0c;我发现我只会校验对象中的普通属性&#xff0c;如果校验嵌套对象&#xff0c;我就…

Java笔试面试题AI答之设计模式(2)

文章目录 6. 什么是单例模式&#xff0c;以及他解决的问题&#xff0c;应用的环境 &#xff1f;解决的问题应用的环境实现方式 7. 什么是工厂模式&#xff0c;以及他解决的问题&#xff0c;应用的环境 &#xff1f;工厂模式简述工厂模式解决的问题工厂模式的应用环境工厂模式的…

React组件如何暴露自身的方法

一、研究背景 最近遇到一个如何暴露React组件自身方法的问题。在某些时候&#xff0c;我们需要调用某个组件内部的方法以实现某个功能&#xff0c;因此我们需要了解如何暴露组件内部API的方法。 二、实践过程 本文主要介绍React组件暴露子组件API的方法&#xff0c;以下是实…