项目第五弹:队列消息管理模块

项目第五弹:队列消息管理模块

  • 一、消息如何组织并管理
    • 1.消息结构体
    • 2.消息持久化管理模块设计
      • 1.数据消息文件名
      • 2.临时消息文件名
      • 3.对外接口与包含成员
  • 二、自定义应用层协议解决文件读写的粘包问题
    • 1.Length-Value协议
  • 三、队列消息管理模块设计
    • 1.待确认消息哈希表
    • 2.待推送消息链表
      • 1.为何不选哈希表
      • 2.vector与list
      • 3.queue和list
      • 4.deque和list
    • 3. 持久化消息哈希表
    • 4.对外接口与包含成员
  • 四、总体消息管理模块设计
  • 五、消息持久化管理模块实现
    • 1.构造函数
    • 2.删除文件
    • 3.插入消息
      • 1.有效载荷序列化
      • 2.拿到文件大小和有效载荷长度
      • 3.把len写入文件,写sizeof(size_t)个字节
      • 4.把有效载荷写入文件
      • 5.设置mp的offset和len这两个字段
      • 6._insert完整代码
    • 4.删除消息
      • 1.将有效标记位置为无效
      • 2.序列化
      • 3.拿到偏移量并检查长度是否相等
      • 4.将修改后的mp写到offset位置,写len个长度
      • 5.erase完整代码
    • 5.gc垃圾回收
      • 1.加载有效信息
        • 1.循环框架的构建
        • 2.读取长度
        • 3.读取有效载荷
        • 4.反序列化拿到有效载荷
        • 5.看是否有效,进行写入
        • 读取数据文件完整代码
      • 2.删除数据文件,重命名临时文件
      • 3.完整代码
    • 6.完整代码
    • 7.是否需要加锁???
  • 六、队列消息管理模块实现
    • 1.成员与构造函数
    • 2.恢复历史消息
    • 3.发布消息
      • 1.函数签名
      • 2.步骤
        • 0.要不要遍历消息链表
        • 1.构建消息智能指针
        • 2.持久化
        • 3.放到待推送链表当中
      • 3.完整代码
    • 4.确认消息
      • 1.函数签名
      • 2.查找是否存在
        • 1.在待确认哈希表当中查找
        • 2.持久化
        • 3.从待确认消息中哈希表当中删除
        • 3.完整代码
    • 5.推送消息
    • 6.销毁队列相关消息
    • 7.用于测试的一堆get接口
    • 8.check_and_gc
      • 1.要不要加锁
      • 2.框架
      • 3.步骤
      • 4.一个问题
      • 5.调用_mapper的gc,拿到包含有效消息的MessagePtr的list
      • 6.更新持久化消息总数和持久化有效消息总数
      • 7.更新持久化哈希表
    • 9.完整代码
  • 七、总体消息管理模块实现
    • 1.介绍
    • 2.加锁问题
    • 3.完整代码
  • 八、测试代码

首先要说明的是:
我们的基础版本的项目是只实现了消息推送的功能,暂不支持消息拉取。
消息拉取功能,我们留到扩展版本在实现

一、消息如何组织并管理

消息是依附于队列而存在的,因此我们的消息管理模块是以队列为单位进行管理的

消息也要持久化,不过它只是为了备份,无需查询操作,而新增和删除的操作较为频繁,且消息数量较多,因此我们不将消息存到数据库当中,而是存到普通二进制文件当中

1.消息结构体

我们的消息结构体之前在proto文件当中已经编写完毕:

// 3. 消息的基本属性
message BasicProperities
{string msg_id = 1;DeliveryMode mode = 2;string routing_key = 3;
}// 4. 消息结构体
//为了便于管理消息:
//      1. 有效载荷(持久化在文件当中的)
//              属性
//              消息内容
//      2. 管理字段
//              是否有效
//              偏移量
//              消息长度
message Message 
{message ValidLoad{string body = 1;BasicProperities properities = 2;string valid = 3;// 因为bool的true/false在protobuf当中持久化后的长度不同,因此我们不用bool,而是用"0"代表无效,"1"代表有效}ValidLoad valid = 1;uint64 offset = 2;uint64 len = 3;  
}

2.消息持久化管理模块设计

1.数据消息文件名

因为消息是以队列为单位进行管理的,所以我们想要将队列名和消息文件名产生关联(方便管理)

所以我们规定:

消息文件名 = 队列名.data

2.临时消息文件名

我们之前说过,我们往文件当中写入的是消息的有效载荷:

message ValidLoad
{string body = 1;BasicProperities properities = 2;string valid = 3;// 因为bool的true/false在protobuf当中持久化后的长度不同,因此我们不用bool,而是用"0"代表无效,"1"代表有效
}

其中valid字段用于我们消息的伪删除。
如果只采用伪删除:消息会越来越多,消息文件会越来越大

首先我们想到的是:
能不能就像闭散列哈希表那样,把DELETE的位置占掉,放上待插入数据
这个方法在我们文件版本伪删除法当中不现实,
因为每个消息的长度大概率都不一样,覆盖式写入会产生空隙位置,会使得文件读写产生隐患,不符合代码健壮性的原则
而且这么来写,效率太低,代码也较为复杂

那怎么办呢?
我们可以借助Java当中垃圾回收器(Garbage Collection)的思想
设置一个水位线(标准):当消息文件中消息总数超过2000,且无效消息超过一半时,进行垃圾回收

进行垃圾回收时其实就是依次读取数据消息文件当中的有效消息,并写入临时文件当中,然后把消息文件删除,将临时文件重命名为数据消息文件名

临时文件名:队列名.temp

3.对外接口与包含成员

对外接口:

  • 创建/删除消息文件
  • 新增消息
  • 删除消息
  • 垃圾回收(消息恢复)

包含成员:

  • 数据消息文件名
  • 临时消息文件名
  • 队列名

目前就只有一个问题了:文件是面向字节流的,因此会有数据包粘包问题,所以我们要自定义应用层协议来解决文件读写的粘包问题:

二、自定义应用层协议解决文件读写的粘包问题

1.Length-Value协议

8字节的消息长度+消息本身

我们的消息长度就用我们常用的size_t类型了,32位平台下是4字节
64位平台下是8字节

我们用的Linux版本大多都是编程默认就是在64位平台的
在这里插入图片描述

三、队列消息管理模块设计

1.待确认消息哈希表

我们要解决的问题是:

我们的消息在内存当中采用什么数据结构来存储?

这就要对消息进行分类了:
因为我们的消息队列是支持消息推送与消息确认的,因此消息就分为:

  1. 待推送消息(推送消息时使用)
  2. 待确认消息(确认消息时使用)

又因为我们需要随时检查并在符合条件的情况下进行gc,而gc时会改变持久化消息在文件当中的偏移量

因此gc之后,内存级的待确认消息当中的消息体的偏移量就“野”了,变成了“野”偏移量了

因此我们就要在gc的时候随时更新待确认消息当中的偏移量

因此我们就要能够根据某个字段来快速找到对应的消息结构体
所以待确认消息要用哈希表来存储 <消息ID,消息结构体::ptr>

using MessagePtr=std::shared_ptr<Message>;
std::unordered_map<std::string,MessagePtr> waitack_map;

2.待推送消息链表

对于待推送消息,我们选择list,而不是vector,queue,unordered_map

1.为何不选哈希表

因为我们的待推送消息链表无需进行基于key值的随机访问,所以用哈希表的意义不大

2.vector与list

  1. 因为我们作为消息队列,要确保消息推送时的有序性,因此没有随机访问需求,vector最大的优势发挥不出来
  2. 因为消息推送是有序的:需要进行“头删尾插”或者“头插尾删”,也就是在容器两端进行操作,而vector的头插和头删虽说可以利用insert和erase来完成,但是效率很低,而list效率较高

因此在vector和list当中,我们选择list

3.queue和list

  1. 因为我们在服务重启之后,进行gc时,要把待推送消息链表当中的数据都插入到持久化消息哈希表当中,因此需要遍历待推送消息
  2. 而queue是不支持在不删除元素的情况下进行遍历的,尽管其两端操作效率更高【因为底层是deque】,但是没法遍历,没办法,只能选list

因此在queue和list当中,我们选择list

4.deque和list

deque的两端操作效率比list高【因为内存分配快,缓存命中率高】,遍历效率也高于list【缓存命中率高】

但是:
它们性能上的差异并不大,而且list更加灵活,我们的项目较为复杂,采用list能有更好的扩展性,当然,如果大家想用deque也可以的

std::list<MessagePtr> waitpush_list;

3. 持久化消息哈希表

服务重启时,我们需要读取持久化消息,此时消息文件当中的数据有些是无效的,既然都要读取,那何不顺便gc一下呢?

而gc之后加载到内存当中消息放在哪里呢?

  1. 待推送消息链表??
    可以这么搞,这样的话持久化消息可以被扩展为能够被消费者主动拉取的消息,但是消息确认时,它不太适合快速查找来删除该持久化消息
  2. 待确认哈希表?
    不合适,因为消息都还没有被推送,那何谈待确认,不符合这个哈希表的任务,尽管实现上可以,但是代码不优雅

因此我们就再搞一个持久化消息哈希表<消息ID,消息结构体::ptr>
有了它之后,gc的时候直接改这个持久化消息哈希表即可,而无需改待确认消息哈希表了,因为各司其职,见名知义,代码更优雅

而且value都是智能指针,只要其资源被改了,那么所有的智能指针所访问到的也就都是修改后的新版本

std::unordered_map<std::string,MessagePtr> durable_map;

4.对外接口与包含成员

对外接口:

  • 发布消息(增)
  • 确认消息(删)
  • 垃圾回收(消息恢复)
  • 获取队首消息(进行消息推送)
  • 销毁该队列所有消息(删除队列时要用)

包含成员:

  • 待确认消息哈希表
  • 待推送消息哈希表
  • 持久化消息哈希表
  • 持久化消息总数
  • 持久化有效消息总数
  • 互斥锁
  • 队列名
  • 消息持久化管理模块句柄

因为我们的这个队列消息管理模块可能会同时被多线程访问,因此需要加上互斥锁保证线程安全

四、总体消息管理模块设计

其实就是把队列消息管理模块组织一下,存到一个哈希表当中而已

对外接口:

  • 初始化队列消息管理结构
  • 销毁队列消息管理结构
  • 发布消息
  • 确认消息
  • 获取队首消息

包含成员:

  • 互斥锁
  • unordered_map<队列名,队列消息管理模块::ptr>
  • 消息文件所在目录

五、消息持久化管理模块实现

1.构造函数

构造函数其实就是初始化:

  1. _datafile
  2. _tmpfile
    然后创建消息文件目录,最后创建这两个文件
MessageMapper(const std::string &basedir, const std::string &qname)
{if (!FileHelper::createDir(basedir)){default_fatal("消息持久化管理模块句柄初始化失败,因为创建消息文件目录失败, 目录名: %s",basedir.c_str());abort();}std::string dir = basedir;if (dir.back() != '/')dir.push_back('/');_datafile = dir + qname + data_suffix;_tmpfile = dir + qname + tmp_suffix;if (!createFile()){default_fatal("消息持久化管理模块句柄初始化失败,因为创建数据和临时文件失败, 数据文件名: %s , 临时文件名: %s",_datafile.c_str(),_tmpfile.c_str());abort();}
}

2.删除文件

注意: 析构函数直接用编译器默认生成的即可,而无需调用removeFile
否则就是画蛇添足,好不容易持久化好了,析构的时候又删除了,典型的画蛇添足

bool removeFile()
{if (!FileHelper::removeFile(_datafile)){default_fatal("删除数据文件失败, 数据文件名: %s",_datafile.c_str());return false;}if (!FileHelper::removeFile(_tmpfile)){default_fatal("删除临时文件失败, 临时文件名: %s",_datafile.c_str());return false;}return true;
}

3.插入消息

因为gc的时候我们要读取数据文件,将其中的有效信息写到临时文件当中

也就是说我们既有向数据文件当中写入消息的需求,也有向临时文件当中写入消息的需求

因此我们单拎出一个函数来,说白了,函数签名就是这个:

bool _insert(const std::string &filename, MessagePtr &mp);

下面我们就要想:插入一个消息一共分为几步:

  1. 把mp当中的有效载荷序列化拿到一个string load
  2. 拿到文件总大小(filesz),拿到load的长度len(类型:size_t)
  3. 把长度写入文件,大小是sizeof(size_t)
  4. 把load写入文件
  5. 设置mp的offset和len这两个字段
    【注意】:消息的offset指向的是有效载荷的起始位置,而不是消息长度的起始位置
    在这里插入图片描述
    就想把大象放到冰箱一样,一步一步来就行

1.有效载荷序列化

// 1.序列化有效载荷,拿到消息长度
std::string load = mp->valid().SerializeAsString();

2.拿到文件大小和有效载荷长度

// 2.拿到文件大小(偏移量)和有效载荷长度
size_t offset = FileHelper::size(filename);
size_t len = load.size();

3.把len写入文件,写sizeof(size_t)个字节

这是我们的write函数,第二个参数类型是const char*
而不要搞成string类型,因为我们还要写入长度(size_t)类型,主要是要保证其所占字节数恒为sizeof(size_t)啊static bool write(const std::string &filename,const char* str, size_t offset, size_t len);
// 3.把len写入文件,写sizeof(size_t)个字节
if (!FileHelper::write(filename, reinterpret_cast<const char *>(&len), offset, sizeof(size_t)))
{default_fatal("消息的长度写入文件失败, 消息ID: %s, 文件名: %s",mp->valid().properities().msg_id().c_str(),filename.c_str());return false;
}

4.把有效载荷写入文件

static bool write(const std::string &filename, const std::string &str);
// 4.把有效载荷写入文件
if (!FileHelper::write(filename, load))
{default_fatal("消息的有效载荷写入文件失败, 消息ID:  %s, 文件名: %s",mp->valid().properities().msg_id().c_str(),filename.c_str());return false;
}

5.设置mp的offset和len这两个字段

mp->set_len(len);
mp->set_offset(offset + sizeof(size_t)); // 注意:偏移量是有效载荷的起始位置,而不是长度的起始位置!!

6._insert完整代码

bool _insert(const std::string &filename, MessagePtr &mp)
{// 1.序列化有效载荷,拿到消息长度std::string load = mp->valid().SerializeAsString();size_t len = load.size();// 2.拿到文件大小(偏移量)size_t offset = FileHelper::size(filename);// 3.把len写入文件,写sizeof(size_t)个字节if (!FileHelper::write(filename, reinterpret_cast<const char *>(&len), offset, sizeof(size_t))){default_fatal("消息的长度写入文件失败, 消息ID: %s, 文件名: ",mp->valid().properities().msg_id().c_str(),filename.c_str());return false;}// 4.把有效载荷写入文件if (!FileHelper::write(filename, load)){default_fatal("消息的有效载荷写入文件失败, 消息ID: %s, 文件名: %s",mp->valid().properities().msg_id().c_str(),filename.c_str());return false;}// 5.设置mp的offset和len这两个字段mp->set_len(len);mp->set_offset(offset + sizeof(size_t)); // 注意:偏移量是有效载荷的起始位置,而不是长度的起始位置!!return true;
}

4.删除消息

因为我们采用的是伪删除,所以删除消息时要给我【消息持久化管理模块】这个MessagePtr
bool erase(MessagePtr &mp);

同样的,步骤是:

  1. 将有效标记位置为无效
  2. 将有效载荷序列化为 string data
  3. 拿到偏移量并检查长度是否相等
  4. 将修改后的mp写到offset位置,写len个长度

1.将有效标记位置为无效

// 1. 将有效标志置为无效
mp->mutable_valid()->set_valid("0");

2.序列化

// 2. 序列化
std::string data = mp->valid().SerializeAsString();

3.拿到偏移量并检查长度是否相等

// 3. 拿到偏移量和检查长度是否相等
size_t offset = mp->offset();
if (mp->len() != data.size())
{default_info("删除持久化数据失败,因为修改后的有效载荷跟文件当中的数据长度不同,len:%d ,mp->len():%d",data.size(),mp->len());return false;
}

4.将修改后的mp写到offset位置,写len个长度

// 4. 写入数据文件
if (!FileHelper::write(_datafile, data.c_str(), offset, mp->len()))
{default_info("删除持久化数据失败,消息ID:%s , 文件名:%s",mp->valid().properities().msg_id().c_str(),_datafile.c_str());return false;
}

5.erase完整代码

bool erase(MessagePtr &mp)
{// 1. 将有效标志置为无效mp->mutable_valid()->set_valid("0");// 2. 序列化std::string data = mp->valid().SerializeAsString();// 3. 拿到偏移量和检查长度是否相等size_t offset = mp->offset();if (mp->len() != data.size()){default_info("删除持久化数据失败,因为修改后的有效载荷跟文件当中的数据长度不同,len:%d ,mp->len():%d",data.size(),mp->len());return false;}// 4. 写入数据文件if (!FileHelper::write(_datafile, data.c_str(), offset, mp->len())){default_info("删除持久化数据失败,消息ID:%s , 文件名:%s",mp->valid().properities().msg_id().c_str(),_datafile.c_str());return false;}return true;
}

5.gc垃圾回收

std::list<MessagePtr> gc();

步骤:

  1. 读取数据文件,将有效消息写入临时文件【加载有效信息】
  2. 删除数据文件
  3. 重命名临时文件

1.加载有效信息

步骤:

  1. 拿到文件总大小,并用offset来依次读取,直到offset>=文件总大小
  2. 先读取8字节长度,拿到len(offset+=sizeof(size_t))
  3. 读取len个字节的数据(offset+=len)
  4. 定义MessagePtr mp,反序列化拿到有效载荷
  5. 看mp是否有效,若有效,则进入下一步,否则continue,继续下一轮循环
  6. 复用_insert,将mp写入临时文件
  7. 将mp添加到需要返回的list当中
1.循环框架的构建
size_t offset = 0, sz = FileHelper::size(_datafile);
while (offset < sz)
{// 先读取长度offset += sizeof(size_t);// 再读取len个字节的有效数据offset += len;// 看是否有效,若有效则写入文件
}
2.读取长度

第二个参数是char*

static bool read(const std::string &filename, char* return_str, size_t offset, size_t len);
// 先读取长度
size_t len = 0;
if (!FileHelper::read(_datafile, reinterpret_cast<char *>(&len), offset, sizeof(size_t)))
{default_info("垃圾回收失败,因为读取数据文件长度失败,文件名:%s",_datafile.c_str());return msg_list;
}
offset += sizeof(size_t);
3.读取有效载荷

这里用string来读取
有两种方式读取:

对于string来说,有两种方法可以拿到内部char*类型的数组地址1. &body[0](body[0]就是数组首元素【char类型】),&一下就变成数据首元素的地址【char*】类型(是安全的)2. const_cast<char*>(body.c_str()):
body.c_str()const char*类型,然后用const_cast进行强转,去除const修饰【不推荐,强转是下下策,能不用就别用】
// 再读取len个字节的有效数据
std::string body(len, '\0');
if (!FileHelper::read(_datafile, &body[0], offset, len))
{default_info("垃圾回收失败,因为读取数据文件有效载荷失败,文件名:%s",_datafile.c_str());return msg_list;
}
offset += len;
4.反序列化拿到有效载荷
MessagePtr mp = std::make_shared<Message>();
mp->mutable_valid()->ParseFromString(body);
5.看是否有效,进行写入
// 若为有效,则写入临时文件,并插入到list当中
if (mp->valid().valid() == "1")
{// 写入临时文件,_insert函数内部会填充该智能指针的offset和len字段if (!_insert(_tmpfile, mp)){default_info("垃圾回收失败,因为写入临时文件失败,临时文件名:%s",_tmpfile.c_str());return msg_list;}// 放到链表当中msg_list.push_back(mp);
}
读取数据文件完整代码
std::list<MessagePtr> msg_list;
// 依次读取_datafile,将有效载荷进行序列化
size_t offset = 0, sz = FileHelper::size(_datafile);
while (offset < sz)
{// 先读取长度size_t len = 0;if (!FileHelper::read(_datafile, reinterpret_cast<char *>(&len), offset, sizeof(size_t))){default_info("垃圾回收失败,因为读取数据文件长度失败,文件名:%s",_datafile.c_str());return msg_list;}offset += sizeof(size_t);// 再读取len个字节的有效数据std::string body(len, '\0');if (!FileHelper::read(_datafile, &body[0], offset, len)) // 推荐,类型安全// if (!FileHelper::read(_datafile, const_cast<char*>(body.c_str()), offset, len))// 不推荐,强转是下下策,能不用就别用{default_info("垃圾回收失败,因为读取数据文件有效载荷失败,文件名:%s",_datafile.c_str());return msg_list;}offset += len;// 反序列化拿到有效载荷MessagePtr mp = std::make_shared<Message>();mp->mutable_valid()->ParseFromString(body);// 若为有效,则写入临时文件,并插入到list当中if (mp->valid().valid() == "1"){// 写入临时文件,_insert函数内部会填充该智能指针的offset和len字段if (!_insert(_tmpfile, mp)){default_info("垃圾回收失败,因为写入临时文件失败,临时文件名:%s",_tmpfile.c_str());return msg_list;}// 放到链表当中msg_list.push_back(mp);}
}

2.删除数据文件,重命名临时文件

// 删除数据文件,重命名临时文件
if (!FileHelper::removeFile(_datafile))
{default_info("垃圾回收失败,因为删除数据文件失败 %s",_datafile.c_str());return msg_list;
}
if (!FileHelper::rename(_tmpfile, _datafile))
{default_info("垃圾回收失败,因为重命名临时文件失败 %s",_tmpfile.c_str());return msg_list;
}

3.完整代码

std::list<MessagePtr> gc()
{std::list<MessagePtr> msg_list;// 依次读取_datafile,将有效载荷进行序列化size_t offset = 0, sz = FileHelper::size(_datafile);while (offset < sz){// 先读取长度size_t len = 0;if (!FileHelper::read(_datafile, reinterpret_cast<char *>(&len), offset, sizeof(size_t))){default_info("垃圾回收失败,因为读取数据文件长度失败,文件名:%s",_datafile.c_str());return msg_list;}offset += sizeof(size_t);// 再读取len个字节的有效数据std::string body(len, '\0');if (!FileHelper::read(_datafile, &body[0], offset, len)) // 推荐,类型安全// if (!FileHelper::read(_datafile, const_cast<char*>(body.c_str()), offset, len))// 不推荐,强转是下下策,能不用就别用{default_info("垃圾回收失败,因为读取数据文件有效载荷失败,文件名:%s",_datafile.c_str());return msg_list;}offset += len;// 反序列化拿到有效载荷MessagePtr mp = std::make_shared<Message>();mp->mutable_valid()->ParseFromString(body);// 若为有效,则写入临时文件,并插入到list当中if (mp->valid().valid() == "1"){// 写入临时文件,_insert函数内部会填充该智能指针的offset和len字段if (!_insert(_tmpfile, mp)){default_info("垃圾回收失败,因为写入临时文件失败,临时文件名:%s",_tmpfile.c_str());return msg_list;}// 放到链表当中msg_list.push_back(mp);}}// 删除数据文件,重命名临时文件if (!FileHelper::removeFile(_datafile)){default_info("垃圾回收失败,因为删除数据文件失败 %s",_datafile.c_str());return msg_list;}if (!FileHelper::rename(_tmpfile, _datafile)){default_info("垃圾回收失败,因为重命名临时文件失败 %s",_tmpfile.c_str());return msg_list;}// 最后返回list即可return msg_list;
}

6.完整代码

using MessagePtr = std::shared_ptr<Message>;const std::string data_suffix = ".data";
const std::string tmp_suffix = ".tmp";class MessageMapper
{
public:MessageMapper(const std::string &basedir, const std::string &qname){if (!FileHelper::createDir(basedir)){default_info("消息持久化管理模块句柄初始化失败,因为创建消息文件目录失败, 目录名: %s",basedir.c_str());abort();}std::string dir = basedir;if (dir.back() != '/')dir.push_back('/');_datafile = dir + qname + data_suffix;_tmpfile = dir + qname + tmp_suffix;if (!createFile()){default_info("消息持久化管理模块句柄初始化失败,因为创建数据和临时文件失败, 数据文件名: %s , 临时文件名: %s",_datafile.c_str(),_tmpfile.c_str());abort();}}// 注意:消息持久化管理类不能在析构的时候删除文件(这就是画蛇添足,好不容易持久化好了,析构又把它删了,那不白持久化了)bool createFile(){if (!FileHelper::createFile(_datafile)){default_info("创建数据文件失败, 数据文件名: %s",_datafile.c_str());return false;}if (!FileHelper::createFile(_tmpfile)){default_info("创建临时文件失败, 临时文件名: %s",_tmpfile.c_str());return false;}return true;}bool removeFile(){if (!FileHelper::removeFile(_datafile)){default_info("删除数据文件失败, 数据文件名: %s",_datafile.c_str());return false;}if (!FileHelper::removeFile(_tmpfile)){default_info("删除临时文件失败, 临时文件名: %s",_tmpfile.c_str());return false;}return true;}bool insert(MessagePtr &mp){// 因为gc的时候是要向临时文件写入数据的,因此把insert单独提出来return _insert(_datafile, mp);}bool erase(MessagePtr &mp){// 1. 将有效标志置为无效mp->mutable_valid()->set_valid("0");// 2. 序列化std::string data = mp->valid().SerializeAsString();// 3. 拿到偏移量和检查长度是否相等size_t offset = mp->offset();if (mp->len() != data.size()){default_info("删除持久化数据失败,因为修改后的有效载荷跟文件当中的数据长度不同,len:%d ,mp->len():%d",data.size(),mp->len());return false;}// 4. 写入数据文件if (!FileHelper::write(_datafile, data.c_str(), offset, mp->len())){default_info("删除持久化数据失败,消息ID:%s , 文件名:%s",mp->valid().properities().msg_id().c_str(),_datafile.c_str());return false;}return true;}std::list<MessagePtr> gc(){std::list<MessagePtr> msg_list;// 依次读取_datafile,将有效载荷进行序列化size_t offset = 0, sz = FileHelper::size(_datafile);while (offset < sz){// 先读取长度size_t len = 0;if (!FileHelper::read(_datafile, reinterpret_cast<char *>(&len), offset, sizeof(size_t))){default_info("垃圾回收失败,因为读取数据文件长度失败,文件名:%s",_datafile.c_str());return msg_list;}offset += sizeof(size_t);// 再读取len个字节的有效数据std::string body(len, '\0');if (!FileHelper::read(_datafile, &body[0], offset, len)) // 推荐,类型安全// if (!FileHelper::read(_datafile, const_cast<char*>(body.c_str()), offset, len))// 不推荐,强转是下下策,能不用就别用{default_info("垃圾回收失败,因为读取数据文件有效载荷失败,文件名:%s",_datafile.c_str());return msg_list;}offset += len;// 反序列化拿到有效载荷MessagePtr mp = std::make_shared<Message>();mp->mutable_valid()->ParseFromString(body);// 若为有效,则写入临时文件,并插入到list当中if (mp->valid().valid() == "1"){// 写入临时文件,_insert函数内部会填充该智能指针的offset和len字段if (!_insert(_tmpfile, mp)){default_info("垃圾回收失败,因为写入临时文件失败,临时文件名:%s",_tmpfile.c_str());return msg_list;}// 放到链表当中msg_list.push_back(mp);}}// 删除数据文件,重命名临时文件if (!FileHelper::removeFile(_datafile)){default_info("垃圾回收失败,因为删除数据文件失败 %s",_datafile.c_str());return msg_list;}if (!FileHelper::rename(_tmpfile, _datafile)){default_info("垃圾回收失败,因为重命名临时文件失败 %s",_tmpfile.c_str());return msg_list;}// 最后返回list即可return msg_list;}private:bool _insert(const std::string &filename, MessagePtr &mp){// 1.序列化有效载荷,拿到消息长度std::string load = mp->valid().SerializeAsString();size_t len = load.size();// 2.拿到文件大小(偏移量)size_t offset = FileHelper::size(filename);// 3.把len写入文件,写sizeof(size_t)个字节if (!FileHelper::write(filename, reinterpret_cast<const char *>(&len), offset, sizeof(size_t))){default_info("消息的长度写入文件失败, 消息ID: %s , 文件名: %s",mp->valid().properities().msg_id().c_str(),filename.c_str());return false;}// 4.把有效载荷写入文件if (!FileHelper::write(filename, load)){default_info("消息的有效载荷写入文件失败, 消息ID: %s , 文件名: %s",mp->valid().properities().msg_id().c_str(),filename.c_str());return false;}// 5.设置mp的offset和len这两个字段mp->set_len(len);mp->set_offset(offset + sizeof(size_t)); // 注意:偏移量是有效载荷的起始位置,而不是长度的起始位置!!return true;}std::string _datafile;std::string _tmpfile;
};

7.是否需要加锁???

在这里插入图片描述
因此消息持久化管理模块无需加锁

六、队列消息管理模块实现

1.成员与构造函数

public:QueueMessageManager(const std::string &basedir, const std::string &qname)
: _qname(qname), _mapper(basedir, qname), _total_count(0), _valid_count(0) {}private:std::string _qname;
std::mutex _mutex;
MessageMapper _mapper;
std::list<MessagePtr> _waitpush_list;
std::unordered_map<std::string, MessagePtr> _waitack_map;
std::unordered_map<std::string, MessagePtr> _durable_map;
size_t _total_count;
size_t _valid_count;

这里没有在构造的时候就恢复历史消息,是为了加快对象的构造过程
以便减少总体消息管理模块的锁竞争

这句话是怎么个意思呢?
写个伪代码,大家就一清二楚了

总体消息管理模块当中的初始化队列消息管理模块的函数
void initQueueMessageManager(const std::string &qname)
{QueueMessageManager::ptr qmmp;{// 0. 加锁std::unique_lock<std::mutex> ulock(_mutex);// 1. 查找是否存在if (_qmsg_map.count(qname))return;// 2. 直接插入qmmp = std::make_shared<QueueMessageManager>(_basedir, qname);_qmsg_map.insert(std::make_pair(qname, qmmp));}// 为了降低锁冲突,因此把recovery单提出来搞qmmp->recovery();
}
private:
std::mutex _mutex;
std::unordered_map<std::string, QueueMessageManager::ptr> _qmsg_map;
std::string _basedir;

也就是说,因为 总体消息管理模块中的哈希表需要保证线程安全,
因此对象构造依旧属于临界区代码
所以为了让临界区代码执行的快一些(降低锁冲突),又因为消息持久化管理模块本来就加锁了,所以我们可以放心大胆的把recovery单拎出去

2.恢复历史消息

  1. 加锁
  2. 用gc返回的值初始化待推送消息链表
  3. 将gc返回的值放到持久化哈希表中
  4. 更新持久化消息总数和有效消息总数
void recovery()
{std::unique_lock<std::mutex> ulock(_mutex);// 1. 恢复历史消息_waitpush_list = _mapper.gc();// 2. 将gc后的消息放到持久化哈希表中for (auto &mp : _waitpush_list){_durable_map[mp->valid().properities().msg_id()] = mp;}// 3. 更新持久化消息总数和有效消息总数_total_count = _valid_count = _durable_map.size();
}

3.发布消息

1.函数签名

因为消息是依附于队列而存在的,因此如果该队列是非持久化队列,那么该消息即使持久化了,也没有价值,因为无法被消费

因此发布消息时我们需要知道对应队列是持久化的
现在有两种选择:

  1. 包含队列文件,去查对应队列是否持久化了【不好,因为会增大耦合度】
  2. 加一个参数,表示对应的队列是否持久化了
// 3. 消息的基本属性
message BasicProperities
{string msg_id = 1;DeliveryMode mode = 2;string routing_key = 3;
}
message Message 
{message ValidLoad{string body = 1;BasicProperities properities = 2;string valid = 3;}ValidLoad valid = 1;uint64 offset = 2;uint64 len = 3;  
}

下面我们来看,消息结构体当中,那些是需要用户传入的?
BasicProperities properities和string body是需要用户传入的
而其余的valid,offset和len都无需用户传入

因此函数签名是:

bool publishMessage(const BasicProperities *bp, const std::string &body, DeliveryMode mode);

2.步骤

  1. 构建消息智能指针
  2. 看是否需要持久化
  3. 如果需要,则进行第四步,否则跳到第五步
  4. 在文件中插入该消息并且放到持久化消息哈希表中,更新total_count和valid_count
  5. 放到待推送消息链表当中
0.要不要遍历消息链表

我们常规来说,一想到的肯定是遍历啊。
可是有问题:

首先,我们要想:为何我们要遍历?

是为了防止推送重复消息,可是我们msg_id是UUID啊,出现重复的率几乎为0,因此如果真的出现了重复
那么我们有理由认为,这是调用者要求我们重新推送,所以我们不做检查,也就是不遍历消息链表

1.构建消息智能指针
// 消息能否持久化取决于队列是否持久化,因此消息的持久化与否不仅要看bp当中的mode,还要看DeliveryMode mode
// 只有当DeliveryMode mode是持久化时,才看bp当中的mode,否则一律不持久化// 1. 构建消息智能指针
MessagePtr mp = std::make_shared<Message>();
mp->mutable_valid()->set_body(body);
mp->mutable_valid()->set_valid("1");mp->mutable_valid()->mutable_properities()->set_msg_id(bp->msg_id());
mp->mutable_valid()->mutable_properities()->set_routing_key(bp->routing_key());
DeliveryMode final_mode = (mode == DURABLE && bp->mode() == DURABLE) ? DURABLE : UNDURABLE;
mp->mutable_valid()->mutable_properities()->set_mode(final_mode);
2.持久化

在持久化这里,_mapper的操作和那三个数据结构,还有两个整形变量
,他们都需要受到加锁保护,因此下面就需要加锁了

而上面构造mp,因为线程独享栈空间,局部变量是线程安全的,所以上面无需加锁(降低锁冲突)

// 加锁
std::unique_lock<std::mutex> ulock(_mutex);// 2. 看是否需要持久化
if (final_mode == DURABLE)
{if (!_mapper.insert(mp)){default_info("发布消息失败, 因为消息持久化失败, 消息ID: %s",bp->msg_id().c_str());return false;}// 放到持久化哈希表中_durable_map[bp->msg_id()] = mp;_total_count++;_valid_count++;
}
3.放到待推送链表当中
// 3. 放到待推送链表当中
_waitpush_list.push_back(mp);

3.完整代码

bool publishMessage(const BasicProperities *bp, const std::string &body, DeliveryMode mode)
{// 消息能否持久化取决于队列是否持久化,因此消息的持久化与否不仅要看bp当中的mode,还要看DeliveryMode mode// 只有当DeliveryMode mode是持久化时,才看bp当中的mode,否则一律不持久化// 1. 构建消息智能指针MessagePtr mp = std::make_shared<Message>();mp->mutable_valid()->set_body(body);mp->mutable_valid()->set_valid("1");mp->mutable_valid()->mutable_properities()->set_msg_id(bp->msg_id());mp->mutable_valid()->mutable_properities()->set_routing_key(bp->routing_key());DeliveryMode final_mode = (mode == DURABLE && bp->mode() == DURABLE) ? DURABLE : UNDURABLE;mp->mutable_valid()->mutable_properities()->set_mode(final_mode);// 加锁std::unique_lock<std::mutex> ulock(_mutex);// 2. 看是否需要持久化if (final_mode == DURABLE){if (!_mapper.insert(mp)){default_info("发布消息失败, 因为消息持久化失败, 消息ID: %s",bp->msg_id().c_str());return false;}// 放到持久化哈希表中_durable_map[bp->msg_id()] = mp;_total_count++;_valid_count++;}// 3. 放到待推送链表当中_waitpush_list.push_back(mp);return true;
}

4.确认消息

1.函数签名

删除消息只需要用消息ID在待确认哈希表当中就能查到对应的MessagePtr,就能够拿到该消息的所有信息
因此,函数签名是:

bool ackMessage(const std::string &msg_id);

2.查找是否存在

跟发布消息不同,确认消息这里需要查找是否存在。
因为不存在就代表对应消息早已被确认过了,这样反而能提高效率
毕竟,本来就要根据msg_id才能拿到MessagePtr

步骤:

  1. 在待确认哈希表当中查找
  2. 看是否持久化
  3. 若持久化,则进行第四步,否则跳到第五步
  4. 在文件中删除该消息,从持久化哈希表当中删除,更新valid_count【删除后进行gc检测并执行】
  5. 在待确认哈希表当中删除
1.在待确认哈希表当中查找
// 1. 在待确认哈希表当中查找
std::unique_lock<std::mutex> ulock(_mutex);
auto iter = _waitack_map.find(msg_id);
if (iter == _waitack_map.end())return true;
2.持久化
// 2. 看是否持久化
MessagePtr mp = iter->second;
if (mp->valid().properities().mode() == DURABLE)
{if (!_mapper.erase(mp)){default_info("消息确认失败,因为消息持久化删除失败,消息ID:%s",msg_id.c_str());return false;}// 从持久化哈希表当中删除_durable_map.erase(msg_id);_valid_count--;// 每次删除时看看是否需要gccheck_and_gc();
}
3.从待确认消息中哈希表当中删除
// 3. 从待确认哈希表当中删除
_waitack_map.erase(msg_id);
3.完整代码
bool ackMessage(const std::string &msg_id)
{// 1. 在待确认哈希表当中查找std::unique_lock<std::mutex> ulock(_mutex);auto iter = _waitack_map.find(msg_id);if (iter == _waitack_map.end())return true;// 2. 看是否持久化MessagePtr mp = iter->second;if (mp->valid().properities().mode() == DURABLE){if (!_mapper.erase(mp)){default_info("消息确认失败,因为消息持久化删除失败,消息ID:%s",msg_id.c_str());return false;}// 从持久化哈希表当中删除_durable_map.erase(msg_id);_valid_count--;// 每次删除时看看是否需要gccheck_and_gc();}// 3. 从待确认哈希表当中删除_waitack_map.erase(msg_id);return true;
}

5.推送消息

其实就是取出待推送链表的队头元素,放到待确认哈希表当中
并返回该MessagePtr

MessagePtr front()
{std::unique_lock<std::mutex> ulock(_mutex);if (_waitpush_list.empty()){return MessagePtr();}MessagePtr mp = _waitpush_list.front();_waitpush_list.pop_front();_waitack_map[mp->valid().properities().msg_id()] = mp;return mp;
}

6.销毁队列相关消息

既包括持久化消息文件,也包括内存级消息数据结构

// 需要提供销毁该队列所有信息的方法(删除队列时要用)
void clear()
{std::unique_lock<std::mutex> ulock(_mutex);_mapper.removeFile();_waitpush_list.clear();_waitack_map.clear();_durable_map.clear();_valid_count = _total_count = 0;
}

7.用于测试的一堆get接口

// 不能修饰为const成员函数,因为需要申请释放锁
size_t waitpush_count()
{std::unique_lock<std::mutex> ulock(_mutex);return _waitpush_list.size();
}size_t waitack_count()
{std::unique_lock<std::mutex> ulock(_mutex);return _waitack_map.size();
}size_t total_count()
{std::unique_lock<std::mutex> ulock(_mutex);return _total_count;
}size_t valid_count()
{std::unique_lock<std::mutex> ulock(_mutex);return _valid_count;
}

8.check_and_gc

1.要不要加锁

这里不能加锁,因为:

  1. 我们这个check_and_gc函数不允许被外界主动调用,是一个private成员函数
  2. 只有ackMessage函数才会调用我们这个check_and_gc函数,而ackMessage当中已经加了锁,因此如果check_and_gc也加锁的话,那么就死锁了
  3. 其实大家可以把check_and_gc当作就是ackMessage当中的代码,只不过因为功能相对独立,因此提取出来封装为单独一个函数,实现解耦

那我可以加两个锁吗?
这里也加一个锁?
这就是典型的墨守成规,访问同一个临界资源,用两个锁来保护
这是互斥锁的非常错误的使用方法

check_and_gc自己搞一个锁,没有任何意义

2.框架

bool check()
{return _total_count > 2000 && _valid_count * 2 < _total_count;
}// 为了提高代码的健壮性和可读性,并提高容错机制,所以虽然由于shared_ptr的共享特性,我们无需更新(早已隐式更新)
// 但是我们依然选择显示更新
void check_and_gc()
{if (!check())return;//开始gc// ...
}

3.步骤

  1. 调用_mapper的gc,拿到包含有效消息的MessagePtr的list
  2. 用这个list来更新持久化哈希表
  3. 更新持久化消息总数和持久化有效消息总数

4.一个问题

细心的小伙伴可能已经发现了一个问题:
我们为每个消息所对应的Message对象从头到尾只创建了一个实例
利用智能指针实现同一个实例对象的共享

而消息的offset和len字段早已在消息持久化管理模块的_insert函数当中更新了

因此我们的持久化哈希表当中的MessagePtr看似没变,实际上已经改变了,但是这个比较细节,算是一个"隐藏"特性
【shared_ptr共享所管理的资源,均保持可见性】

但是如果不更新的话,代码的可读性和健壮性以及容错机制都不太好,所以我们在这里采取显式更新

5.调用_mapper的gc,拿到包含有效消息的MessagePtr的list

std::list<MessagePtr> valid_list = _mapper.gc();

6.更新持久化消息总数和持久化有效消息总数

_valid_count = _total_count = valid_list.size();

7.更新持久化哈希表

就是遍历有效消息链表,然后拿到msg_id去哈希表当中查找并更新
在这里还可以对那些持久化的有效消息,但是没有在持久化哈希表当中的数据进行处理
处理方式就是将其重新放到待推送链表当中进行推送

for (auto &mp : valid_list)
{std::string msg_id = mp->valid().properities().msg_id();// 1.在持久化哈希表中查找该消息auto iter = _durable_map.find(msg_id);if (iter == _durable_map.end()){std::ostringstream oss;oss << "有个持久化的消息没有在_durable_map当中存储... 即:代码有问题,消息ID" << msg_id << "\n";// 将其重新放到待推送链表当中,进行推送_waitpush_list.push_back(mp);// 插入到持久化哈希表中_durable_map.insert(std::make_pair(msg_id, mp));continue;}// 这里是更新偏移量和长度,[其实这里也不需要更新,因为智能指针共享所管理的资源,均保持可见性]// 对应的offset和len早已在QueueMessageManager的gc时调用的_insert当中更新了// 但是为了代码的健壮性和可读性(我们不能要求每一个看我们这个代码的人都去想智能指针管理资源的共享性和可见性,因此这里额外更新一下// 也是顺手的事, 也是为了找是否有哪些消息不在持久化哈希表中,提高代码容错性)iter->second->set_offset(mp->offset());iter->second->set_len(mp->len());
}
// 为了提高代码的健壮性和可读性,并提高容错机制,所以虽然由于shared_ptr的共享特性,我们无需更新(早已隐式更新)
// 但是我们依然选择显式更新
void check_and_gc()
{if (!check())return;std::list<MessagePtr> valid_list = _mapper.gc();// 按照valid_list当中的MessagePtr来更新持久化哈希表for (auto &mp : valid_list){std::string msg_id = mp->valid().properities().msg_id();// 1.在持久化哈希表中查找该消息auto iter = _durable_map.find(msg_id);if (iter == _durable_map.end()){std::ostringstream oss;oss << "有个持久化的消息没有在_durable_map当中存储... 即:代码有问题,消息ID" << msg_id << "\n";// 将其重新放到待推送链表当中,进行推送_waitpush_list.push_back(mp);// 插入到持久化哈希表中_durable_map.insert(std::make_pair(msg_id, mp));continue;}// 这里是更新偏移量和长度,[其实这里也不需要更新,因为智能指针共享所管理的资源,均保持可见性]// 对应的offset和len早已在QueueMessageManager的gc时调用的_insert当中更新了// 但是为了代码的健壮性和可读性(我们不能要求每一个看我们这个代码的人都去想智能指针管理资源的共享性和可见性,因此这里额外更新一下// 也是顺手的事, 也是为了找是否有哪些消息不在持久化哈希表中,提高代码容错性)iter->second->set_offset(mp->offset());iter->second->set_len(mp->len());}_valid_count = _total_count = valid_list.size();
}

9.完整代码

class QueueMessageManager
{
public:using ptr = std::shared_ptr<QueueMessageManager>;QueueMessageManager(const std::string &basedir, const std::string &qname): _qname(qname), _mapper(basedir, qname), _total_count(0), _valid_count(0) {}void recovery(){std::unique_lock<std::mutex> ulock(_mutex);// 1. 恢复历史消息_waitpush_list = _mapper.gc();// 2. 将gc后的消息放到持久化哈希表中for (auto &mp : _waitpush_list){_durable_map[mp->valid().properities().msg_id()] = mp;}// 3. 更新持久化消息总数和有效消息总数_total_count = _valid_count = _durable_map.size();}bool publishMessage(const BasicProperities *bp, const std::string &body, DeliveryMode mode){// 消息能否持久化取决于队列是否持久化,因此消息的持久化与否不仅要看bp当中的mode,还要看DeliveryMode mode// 只有当DeliveryMode mode是持久化时,才看bp当中的mode,否则一律不持久化// 1. 构建消息智能指针MessagePtr mp = std::make_shared<Message>();mp->mutable_valid()->set_body(body);mp->mutable_valid()->set_valid("1");mp->mutable_valid()->mutable_properities()->set_msg_id(bp->msg_id());mp->mutable_valid()->mutable_properities()->set_routing_key(bp->routing_key());DeliveryMode final_mode = (mode == DURABLE && bp->mode() == DURABLE) ? DURABLE : UNDURABLE;mp->mutable_valid()->mutable_properities()->set_mode(final_mode);// 加锁std::unique_lock<std::mutex> ulock(_mutex);// 2. 看是否需要持久化if (final_mode == DURABLE){if (!_mapper.insert(mp)){default_info("发布消息失败, 因为消息持久化失败, 消息ID: %s",bp->msg_id().c_str());return false;}// 放到持久化哈希表中_durable_map[bp->msg_id()] = mp;_total_count++;_valid_count++;}// 3. 放到待推送链表当中_waitpush_list.push_back(mp);return true;}bool ackMessage(const std::string &msg_id){// 1. 在待确认哈希表当中查找std::unique_lock<std::mutex> ulock(_mutex);auto iter = _waitack_map.find(msg_id);if (iter == _waitack_map.end())return true;// 2. 看是否持久化MessagePtr mp = iter->second;if (mp->valid().properities().mode() == DURABLE){if (!_mapper.erase(mp)){default_info("消息确认失败,因为消息持久化删除失败,消息ID:%s",msg_id.c_str());return false;}// 从持久化哈希表当中删除_durable_map.erase(msg_id);_valid_count--;// 每次删除时看看是否需要gccheck_and_gc();}// 3. 从待确认哈希表当中删除_waitack_map.erase(msg_id);return true;}MessagePtr front(){std::unique_lock<std::mutex> ulock(_mutex);if (_waitpush_list.empty()){return MessagePtr();}MessagePtr mp = _waitpush_list.front();_waitpush_list.pop_front();_waitack_map[mp->valid().properities().msg_id()] = mp;return mp;}// 需要提供销毁该队列所有信息的方法(删除队列时要用)void clear(){std::unique_lock<std::mutex> ulock(_mutex);_mapper.removeFile();_waitpush_list.clear();_waitack_map.clear();_durable_map.clear();_valid_count = _total_count = 0;}// 不能修饰为const成员函数,因为需要申请释放锁size_t waitpush_count(){std::unique_lock<std::mutex> ulock(_mutex);return _waitpush_list.size();}size_t waitack_count(){std::unique_lock<std::mutex> ulock(_mutex);return _waitack_map.size();}size_t total_count(){std::unique_lock<std::mutex> ulock(_mutex);return _total_count;}size_t valid_count(){std::unique_lock<std::mutex> ulock(_mutex);return _valid_count;}private:bool check(){return _total_count > 2000 && _valid_count * 2 < _total_count;}// 为了提高代码的健壮性和可读性,并提高容错机制,所以虽然由于shared_ptr的共享特性,我们无需更新(早已隐式更新)// 但是我们依然选择显示更新void check_and_gc(){if (!check())return;std::list<MessagePtr> valid_list = _mapper.gc();// 按照valid_list当中的MessagePtr来更新持久化哈希表for (auto &mp : valid_list){std::string msg_id = mp->valid().properities().msg_id();// 1.在持久化哈希表中查找该消息auto iter = _durable_map.find(msg_id);if (iter == _durable_map.end()){std::ostringstream oss;oss << "有个持久化的消息没有在_durable_map当中存储... 即:代码有问题,消息ID" << msg_id << "\n";// 将其重新放到待推送链表当中,进行推送_waitpush_list.push_back(mp);// 插入到持久化哈希表中_durable_map.insert(std::make_pair(msg_id, mp));continue;}// 这里是更新偏移量和长度,[其实这里也不需要更新,因为智能指针共享所管理的资源,均保持可见性]// 对应的offset和len早已在QueueMessageManager的gc时调用的_insert当中更新了// 但是为了代码的健壮性和可读性(我们不能要求每一个看我们这个代码的人都去想智能指针管理资源的共享性和可见性,因此这里额外更新一下// 也是顺手的事, 也是为了找是否有哪些消息不在持久化哈希表中,提高代码容错性)iter->second->set_offset(mp->offset());iter->second->set_len(mp->len());}_valid_count = _total_count = valid_list.size();}std::string _qname;std::mutex _mutex;MessageMapper _mapper;std::list<MessagePtr> _waitpush_list;std::unordered_map<std::string, MessagePtr> _waitack_map;std::unordered_map<std::string, MessagePtr> _durable_map;size_t _total_count;size_t _valid_count;
};

七、总体消息管理模块实现

1.介绍

总体消息管理模块负责管理所有队列的消息
也就是管理所有的队列消息管理模块句柄

因此,它的成员:互斥锁,哈希表,basedir

private:
std::mutex _mutex;
std::unordered_map<std::string, QueueMessageManager::ptr> _qmsg_map;
std::string _basedir;

接口:

  1. 初始化和销毁队列消息管理模块(增,删)
  2. 队列消息管理模块所提供的所有接口(全都额外加一个queue_name来指定对那个队列进行什么操作 – 类似于查+操作)
  3. 唯一需要注意的是:只有查操作才需要互斥锁,是为了保证哈希表的线程安全。
    而操作则无需加锁,因为所有的队列消息管理模块本来就都需要加锁

2.加锁问题

那能不能我总体消息管理模块这里对操作部分也加锁,然后队列消息模块当中不加锁呢?

画一下图:
在这里插入图片描述
综上:

队列消息管理模块必须加锁,因为它必存在被多线程同时访问的可能

了解了上面的设计之后,代码很easy了,直接给出了

3.完整代码

class MessageManager
{
public:using ptr = std::shared_ptr<MessageManager>;MessageManager(const std::string &basedir): _basedir(basedir) {}void initQueueMessageManager(const std::string &qname){QueueMessageManager::ptr qmmp;{// 0. 加锁std::unique_lock<std::mutex> ulock(_mutex);// 1. 查找是否存在if (_qmsg_map.count(qname))return;// 2. 直接插入qmmp = std::make_shared<QueueMessageManager>(_basedir, qname);_qmsg_map.insert(std::make_pair(qname, qmmp));}// 为了降低锁冲突,因此把recovery单提出来搞qmmp->recovery();}void destroyQueueMessageManager(const std::string &qname){// 这里必须是对象,如果是指针或者引用就野了QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){return;}qmmp = iter->second;_qmsg_map.erase(iter);}// 为了降低锁冲突,因此把clear单提出来搞qmmp->clear();}bool publishMessage(const std::string &qname, const BasicProperities *bp, const std::string &body, DeliveryMode mode){QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_info("发布消息失败,因为该队列的消息管理模块句柄尚未初始化");return false;}qmmp = iter->second;}return qmmp->publishMessage(bp, body, mode);}bool ackMessage(const std::string &qname, const std::string &msg_id){QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_info("确认消息失败,因为该队列的消息管理模块句柄尚未初始化");return false;}qmmp = iter->second;}return qmmp->ackMessage(msg_id);}MessagePtr front(const std::string &qname){QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("获取消息失败,因为该队列的消息管理模块句柄尚未初始化");return MessagePtr();}qmmp = iter->second;}return qmmp->front();}size_t waitpush_count(const std::string &qname){QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("获取待推送消息数量失败,因为该队列的消息管理模块句柄尚未初始化");return 0;}qmmp = iter->second;}return qmmp->waitpush_count();}size_t waitack_count(const std::string &qname){QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("获取待确认消息数量失败,因为该队列的消息管理模块句柄尚未初始化");return 0;}qmmp = iter->second;}return qmmp->waitack_count();}size_t total_count(const std::string &qname){QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("获取持久化消息总数失败,因为该队列的消息管理模块句柄尚未初始化");return 0;}qmmp = iter->second;}return qmmp->total_count();}size_t valid_count(const std::string &qname){QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("获取持久化有效消息数量失败,因为该队列的消息管理模块句柄尚未初始化");return 0;}qmmp = iter->second;}return qmmp->valid_count();}private:std::mutex _mutex;std::unordered_map<std::string, QueueMessageManager::ptr> _qmsg_map;std::string _basedir;
};

八、测试代码

#include <gtest/gtest.h>
#include "../mqserver/message.hpp"using namespace ns_mq;MessageManager::ptr mmp;std::vector<std::string> uuid_vec = {UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid()};class MessageTest : public testing::Environment
{
public:virtual void SetUp(){mmp = std::make_shared<MessageManager>("./queue_message");mmp->initQueueMessageManager("queue1");}virtual void TearDown(){//mmp->destroyQueueMessageManager("queue1");}
};// TEST(message_test, recovery_test)
// {
//     ASSERT_EQ(mmp->waitpush_count("queue1"), 3);
//     ASSERT_EQ(mmp->waitack_count("queue1"), 0);
//     ASSERT_EQ(mmp->total_count("queue1"), 3);
//     ASSERT_EQ(mmp->valid_count("queue1"), 3);
// }TEST(message_test, publish_test)
{BasicProperities bp;bp.set_msg_id(uuid_vec[0]);bp.set_mode(DURABLE);bp.set_routing_key("news.music.#");ASSERT_EQ(mmp->publishMessage("queue1", &bp, "hello-0", DURABLE), true);bp.set_msg_id(uuid_vec[1]);ASSERT_EQ(mmp->publishMessage("queue1", &bp, "hello-1", DURABLE), true);bp.set_msg_id(uuid_vec[2]);ASSERT_EQ(mmp->publishMessage("queue1", &bp, "hello-2", DURABLE), true);bp.set_msg_id(uuid_vec[3]);ASSERT_EQ(mmp->publishMessage("queue1", &bp, "hello-3", DURABLE), true);bp.set_msg_id(uuid_vec[4]);ASSERT_EQ(mmp->publishMessage("queue1", &bp, "hello-4", DURABLE), true);ASSERT_EQ(mmp->waitpush_count("queue1"), 5);ASSERT_EQ(mmp->waitack_count("queue1"), 0);ASSERT_EQ(mmp->total_count("queue1"), 5);ASSERT_EQ(mmp->valid_count("queue1"), 5);
}TEST(message_test, get_test)
{MessagePtr mp = mmp->front("queue1");ASSERT_NE(mp.get(), nullptr);ASSERT_EQ(mp->valid().body(), std::string("hello-0"));mp = mmp->front("queue1");ASSERT_NE(mp.get(), nullptr);ASSERT_EQ(mp->valid().body(), std::string("hello-1"));mp = mmp->front("queue1");ASSERT_NE(mp.get(), nullptr);ASSERT_EQ(mp->valid().body(), std::string("hello-2"));ASSERT_EQ(mmp->waitpush_count("queue1"), 2);ASSERT_EQ(mmp->waitack_count("queue1"), 3);ASSERT_EQ(mmp->total_count("queue1"), 5);ASSERT_EQ(mmp->valid_count("queue1"), 5);
}TEST(message_test, ack_test)
{ASSERT_EQ(mmp->ackMessage("queue1", uuid_vec[0]), true);ASSERT_EQ(mmp->ackMessage("queue1", uuid_vec[1]), true);ASSERT_EQ(mmp->waitpush_count("queue1"), 2);ASSERT_EQ(mmp->waitack_count("queue1"), 1);ASSERT_EQ(mmp->total_count("queue1"), 5);ASSERT_EQ(mmp->valid_count("queue1"), 3);
}int main(int argc, char *argv[])
{testing::AddGlobalTestEnvironment(new MessageTest);testing::InitGoogleTest(&argc, argv);return RUN_ALL_TESTS();
}

以上就是项目第五弹:队列消息管理模块的全部内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/146105.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

[数据结构]动态顺序表的实现与应用

文章目录 一、引言二、动态顺序表的基本概念三、动态顺序表的实现1、结构体定义2、初始化3、销毁4、扩容5、缩容5、打印6、增删查改 四、分析动态顺序表1、存储方式2、优点3、缺点 五、总结1、练习题2、源代码 一、引言 想象一下&#xff0c;你有一个箱子&#xff08;静态顺序…

【医学半监督】对比互补掩蔽的自监督预训练半监督心脏图像分割

SELF-SUPERVISED PRE-TRAINING BASED ON CONTRASTIVE COMPLEMENTARY MASKING FOR SEMI-SUPERVISED CARDIAC IMAGE SEGMENTATION 2024 IEEE International Symposium on Biomedical Imaging (ISBI) 摘要: 心脏结构分割对心脏病诊断非常重要,而使用大量注释的深度学习在这项任…

Buck变换器闭环控制,simulink仿真模型(适合初学者学习)

Buck变换器&#xff0c;又称为降压斩波器&#xff0c;是一种常见的DC-DC转换器&#xff0c;广泛应用于电源管理领域。它通过开关元件&#xff08;通常是MOSFET或BJT&#xff09;的导通与截止&#xff0c;改变输入电压到负载的平均电压&#xff0c;从而实现电压的降低。在实际应…

harbor私有镜像仓库,搭建及管理

私有镜像仓库 docker-distribution docker的镜像仓库&#xff0c;默认端口号5000 做个仓库&#xff0c;把镜像放里头&#xff0c;用什么服务&#xff0c;起什么容器 vmware公司在docker私有仓库的基础上做了一个web页面&#xff0c;是harbor docker可以把仓库的镜像下载到本地&…

tauri嵌入自定义目录/文件,并在代码中读取文件内容的操作流程

可以看官方文档&#xff1a;Embedding Additional Files | Tauri Apps 在绑定了文件之后&#xff0c;可以在js中访问嵌入的文件或者在rust中读取嵌入的文件内容&#xff0c;详细的配置操作如下。 在src-tauri中创建自定义文件夹或文件&#xff0c;并在在tauri.conf.json中配置…

Java多线程Thread及其原理深度解析

文章目录 1. 实现多线程的方式2. Thread 部分源码2.1. native 方法注册2.2. Thread 中的成员变量2.3. Thread 构造方法与初始化2.4. Thread 线程状态与操作系统状态2.4. start() 与 run() 方法2.5. sleep() 方法2.6. join() 方法2.7. interrupt() 方法 本文参考&#xff1a; 线…

Spring自定义参数解析器

在这篇文章中&#xff0c;我们认识了参数解析器和消息转换器&#xff0c;今天我们来自定义一个参数解析器。 自定义参数解析器 实现HandlerMethodArgumentResolver的类&#xff0c;并注册到Spring容器。 Component&#xff0f;&#xff0f;注册到Spring public class UserAr…

Java集合必知必会:热门面试题汇编与核心源码(ArrayList、HashMap)剖析

写在前面 &#x1f525;我把后端Java面试题做了一个汇总&#xff0c;有兴趣大家可以看看&#xff01;这里&#x1f449; ⭐️在无数次的复习巩固中&#xff0c;我逐渐意识到一个问题&#xff1a;面对同样的面试题目&#xff0c;不同的资料来源往往给出了五花八门的解释&#…

【Linux进程控制】自主Shell

目录 自主shell实现 获取基本变量 实现命令行 获取用户命令字符串 命令行字符串分割 内建命令CD() chdir getcwd putenv 检查是否为内建命令 检查是否为重定向 执行命令 主函数设置 测试用例 项目代码 自主shell实现 根据之前学的内容&#xff0c;我们已经可以模…

【学习笔记】SSL/TLS安全机制之CAA

1、概念界定 CAA全称Certificate Authority Authorization&#xff0c;即证书颁发机构授权&#xff0c;每个CA都能给任何网站签发证书。 2、CAA要解决的问题 例如&#xff0c;蓝色网站有一张橙色CA颁发的证书&#xff0c;我们也知道还有许多其他的CA&#xff1b;中间人可以说服…

网址链接能做成二维码吗?在线网址二维码生成的操作技巧

现在用二维码能够展示很多的内容&#xff0c;将内容放入二维码后&#xff0c;通过扫码的方式获取内容会更加的方便快捷&#xff0c;简化获取内容的流程。比如在分享网上内容时&#xff0c;可以将链接生成二维码的方式来让用户扫码访问网页&#xff0c;那么网址转二维码具体该怎…

【BetterBench博士】2024年中国研究生数学建模竞赛 E题:高速公路应急车道紧急启用模型 问题分析

2024年中国研究生数学建模竞赛 E题&#xff1a;高速公路应急车道紧急启用模型 问题分析 更新进展 【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析 【BetterBench博士】2024年中国研究生数学建模竞赛 E题&#xff1a;高速公路应急车道紧急启用…

【垃圾识别系统】Python+卷积神经网络算法+人工智能+深度学习+计算机毕设项目选题+TensorFlow+图像识别

一、介绍 垃圾识别分类系统。本系统采用Python作为主要编程语言&#xff0c;通过收集了5种常见的垃圾数据集&#xff08;‘塑料’, ‘玻璃’, ‘纸张’, ‘纸板’, ‘金属’&#xff09;&#xff0c;然后基于TensorFlow搭建卷积神经网络算法模型&#xff0c;通过对图像数据集进…

Qt窗口——对话框

文章目录 对话框自定义对话框对话框分类消息对话框QMessageBox使用示例自定义按钮快速构造对话框 颜色对话框QColorDialog文件对话框QFileDialog字体对话框QFontDialog输入对话框QInputDialog 对话框 对话框可以理解成一个弹窗&#xff0c;用于短期任务或者简洁的用户交互 Qt…

2024华为杯研赛D题分析

2024华为杯研究生数学建模D题分析如下&#xff0c;完整版本在文末名片

【HTTP】请求“报头”,Referer 和 Cookie

Referer 描述了当前这个页面是从哪里来的&#xff08;从哪个页面跳转过来的&#xff09; 浏览器中&#xff0c;直接输入 URL/点击收藏夹打开的网页&#xff0c;此时是没有 referer。当你在 sogou 页面进行搜索时&#xff0c;新进入的网页就会有 referer 有一个非常典型的用…

扎克伯格的未来愿景:用智能眼镜引领数字社交新时代

Meta Connect 2024大会前夕&#xff0c;创始人马克扎克伯格的90分钟播客访谈&#xff0c;为我们描绘了Meta未来的蓝图。这场访谈&#xff0c;不仅是大会的热身&#xff0c;更是对科技未来的一次深刻洞察。 人工智能 - Ai工具集 - 未来办公人的智能办公生活导航网 扎克伯格的未…

nacos适配人大金仓的数据库

前言 在微服务架构中&#xff0c;服务发现和配置管理是关键组件。Nacos作为一个动态服务发现和配置管理平台&#xff0c;支持多种数据库作为其后端存储。本文将探讨如何在Nacos中适配人大金仓数据库&#xff0c;以及在此过程中的最佳实践。 Nacos简介 Nacos&#xff08;Nami…

二分查找算法(1) _二分查找_模板

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 二分查找算法(1) _二分查找模板 收录于专栏【经典算法练习】 本专栏旨在分享学习算法的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 1. 二…

Redis——持久化策略

Redis持久化 Redis的读写操作都是在内存上&#xff0c;所以Redis性能高。 但是当重启的时候&#xff0c;或者因为特殊情况导致Redis崩了&#xff0c;就可能导致数据的丢失。 所以Redis采取了持久化的机制&#xff0c;重启的时候利用之间持久化的文件实现数据的恢复。 Redis提…