项目第六弹:虚拟机管理模块、路由匹配模块

项目第六弹:虚拟机管理模块、路由匹配模块

  • 一、虚拟机管理模块的设计
    • 1.什么是虚拟机?
    • 2.借助MySQL来理解一下
    • 3.如何整合?【埋下伏笔】
  • 二、RabbitMQ为何要有虚拟机
    • 1.从业务角度来讲
    • 2.步步探索
      • 1.优点
      • 2.结合业务适用场景和需求
      • 3.发掘真正的原因
      • 4.反向验证
        • 1.物理隔离
        • 2.逻辑隔离
        • 3.根本原因
  • 三、虚拟机的模块设计
    • 1.虚拟机持久化模块设计
      • 1.RabbitMQ是如何做的
      • 2.如何设计
        • 1.如何保证持久化的交换机、队列、绑定信息、消息恢复之后仍然隶属于对应的虚拟机
        • 2.如何组织管理实现虚拟机的“默认持久化”
        • 3.设计
    • 2.虚拟机类
    • 3.虚拟机管理模块
  • 四、代码环节
    • 1.虚拟机模块
      • 0.分析是否需要加锁
      • 1.构造
        • 1.有一个问题,怎么解决?
        • 2.构造
      • 2.声明/删除交换机
      • 3.声明/删除队列
      • 4.绑定/解绑队列
      • 5.发布/确认消息
      • 6.消费消息
      • 7.其他方法
      • 8.get接口
      • 9.clear
      • 9.完整代码
    • 2.虚拟机持久化管理模块
    • 3.虚拟机管理模块
    • 4.测试
  • 五、路由匹配模块
    • 1.routing_key和binding_key的格式介绍
    • 2.动规分析
      • 1.状态定义与表示
      • 2.状态转移方程
        • 1.纯单词
        • 2.*
        • 3.#
          • 1.#匹配0个单词
          • 2.#匹配1个单词
          • 3.#匹配1个单词后继续匹配
      • 3.dp数组初始化
      • 4.填表顺序
      • 5.返回值
    • 3.动规代码
    • 4.模块整体代码
      • 1.路由匹配代码
      • 2.判断routing_key合法性
      • 3.判断binding_key合法性
      • 4.完整代码
    • 5.测试

关于虚拟机用户管理方面的功能,我们基础版本暂且先不实现

一、虚拟机管理模块的设计

1.什么是虚拟机?

虚拟机是RabbitMQ当中,整合交换机管理模块,队列管理模块,绑定信息管理模块的一个逻辑集合,是一个中间层

只不过,在我们的项目当中,因为消息是依附于队列才能存在的,所以消息附属于队列,而队列又附属于虚拟机

所以我们的消息管理模块也被纳入了虚拟机管理模块当中,但是AMQP并未规定消息管理模块直接附属于虚拟机
【我们的项目是这样组织的】
在这里插入图片描述
回顾一下AMQP协议:
在这里插入图片描述

2.借助MySQL来理解一下

我们可以借助MySQL的知识来理解:
先看一段SQL

mysql> create database host1;
Query OK, 1 row affected (0.02 sec)mysql> use host1;
Database changedmysql> create table if not exists exchange_table (-> name varchar(32) primary key,-> type int,-> durable int,-> auto_delete int,-> args varchar(512));
Query OK, 0 rows affected (0.07 sec)mysql> create table if not exists queue_table(-> name varchar(32) primary key,-> durable int,-> exclusive int,-> auto_delete int,-> args varchar(512));
Query OK, 0 rows affected (0.06 sec)mysql> create table if not exists binding_table(-> exchange_name varchar(32),-> queue_name varchar(32),-> binding_key varchar(32),-> durable int);
Query OK, 0 rows affected (0.05 sec)mysql> select table_name from information_schema.tables where table_schema='host1';
+----------------+
| TABLE_NAME     |
+----------------+
| binding_table  |
| exchange_table |
| queue_table    |
+----------------+
3 rows in set (0.00 sec)mysql> insert into exchange_table values ('exchange1',1,1,0,'新闻交换机=wzs');
Query OK, 1 row affected (0.02 sec)mysql> select * from exchange_table;
+-----------+------+---------+-------------+---------------------+
| name      | type | durable | auto_delete | args                |
+-----------+------+---------+-------------+---------------------+
| exchange1 |    1 |       1 |           0 | 新闻交换机=wzs      |
+-----------+------+---------+-------------+---------------------+
1 row in set (0.01 sec)

新闻交换机=wzs:这个交换机是新闻交换机,是wzs创建的

上面没啥难的,就是SQL语句而已
下面我们要说明的是:
数据库《----》虚拟机
交换机表《----》交换机管理模块
交换机表当中的数据《----》交换机

其实这一点我们在使用SQLite3数据库的时候就已经渗透/埋下伏笔了,因此理解起来并不难
在这里插入图片描述
因此:
在这里插入图片描述我们将MessageManager和QueueMessageManager结合在一起来看
便能抽离出一个消息表来:
在这里插入图片描述
因此“消息表”:

指定目录【在应用层控制该字段的唯一性,就一个if而已】
队列名
有效载荷:属性字段:消息ID投递模式routing_key消息主体有效标记位
偏移量和长度mysql> create table if not exists message_table(-> base_dir varchar(32) default './queue_message',-> queue_name varchar(32),-> msg_id varchar(32) primary key,-> mode int,-> routing_key varchar(32),-> body varchar(512),-> valid int,-> offset int,-> len int);
Query OK, 0 rows affected (0.05 sec)mysql> desc message_table;
+-------------+--------------+------+-----+-----------------+-------+
| Field       | Type         | Null | Key | Default         | Extra |
+-------------+--------------+------+-----+-----------------+-------+
| base_dir    | varchar(32)  | YES  |     | ./queue_message |       |
| queue_name  | varchar(32)  | YES  |     | NULL            |       |
| msg_id      | varchar(32)  | NO   | PRI | NULL            |       |
| mode        | int          | YES  |     | NULL            |       |
| routing_key | varchar(32)  | YES  |     | NULL            |       |
| body        | varchar(512) | YES  |     | NULL            |       |
| valid       | int          | YES  |     | NULL            |       |
| offset      | int          | YES  |     | NULL            |       |
| len         | int          | YES  |     | NULL            |       |
+-------------+--------------+------+-----+-----------------+-------+
9 rows in set (0.00 sec)当然,也可以在创建表之后显式修改表结构,添加默认值 
alter table message_table modify column base_dir varchar(32) default './queue_message';
在应用层写SQL插入时不插入base_dir即可

它们是关联的,所以我们可以借助表的关系来理解如何整合

3.如何整合?【埋下伏笔】

在这里插入图片描述

二、RabbitMQ为何要有虚拟机

1.从业务角度来讲

我们的消息队列服务器的本质其实就是一个生产者消费者模型,其业务处理的核心就是我们的虚拟机

所以才需要有虚拟机
但是这么来理解,太肤浅了吧。我们需要从代码和业务出发,但它们只是为了让我们更好的去理解我们的业务而已

但是核心落脚点一定不是业务处理本身,要想知道为什么
最好的方式是先从优劣点出发,然后结合业务的适用场景和需求来审视这些优点

从中发掘出真正的原因
然后反向回溯验证,能够成功完全的说服自己,大概率就是准确的

2.步步探索

1.优点

如果大家了解过Hadoop或者其他分布式框架,或者云平台,云服务器【它们都是用的云计算技术,而云计算技术用到了分布式框架】的话:
在这里插入图片描述
劣势呢?
可以说是没有劣势,最起码对于写代码来说更加方便清晰明了
只不过要想了解它设计背后的意义和价值,是有一定学习成本在的
【不过理解一下它背后的原因和意义,虽然学习成本高,但是非常值得的,甚至是一件好事】

2.结合业务适用场景和需求

  • 多租户应用:如果业务需要支持多个独立的用户或租户,且这些租户之间的消息传递需要隔离,那么RabbitMQ的虚拟机特性将非常有用。
  • 高安全性要求:对于需要严格控制消息传递权限和访问控制的业务,RabbitMQ的安全特性能够提供强有力的支持。
  • 分布式系统和云平台:在分布式系统或云平台环境中,RabbitMQ的可扩展性、灵活性和高可用性特性将大大简化消息传递的复杂性,并提升系统的整体性能。

3.发掘真正的原因

  • 特别是在大数据时代,分布式系统和云平台急需一个进行模块间消息传递/异步调用的一个消息中间件
  • 来在保证模块间高内聚,低耦合的情况下建立起多个模块之间的紧密联系

这就是消息中间件MQ的最大应用场景与需求,也是虚拟主机诞生的原因

4.反向验证

假设不用虚拟机,能有其他好方法来解决这一问题吗?

1.物理隔离

创建多个RabbitMQ,“稳,准,狠”、但是成本较高,且管理复杂

2.逻辑隔离
  1. 命名空间/前缀约定:在应用层实现,为不同应用或者团队的消息队列,交换机使用不同命名的前缀或者后缀
    但是这种方法需要额外的管理和约定才能保证命名不冲突,且不具备Virtual Host那样的强制性

  2. 访问控制列表:【权限表】
    通过精细的访问控制列表来限制不同用户或应用对RabbitMQ资源的访问。
    这可以在一定程度上实现资源的隔离,但相比Virtual Host来说,管理复杂度和出错风险更高

3.根本原因

最重要的是,这些都是RabbitMQ的使用者要动脑子去做的事情,这是用户啊,顾客就是上帝啊
因此RabbitMQ自己在内部加了一层“Virtual Host”,告诉用户,不同的Virtual Host是资源隔离的,这不就OK了吗

所以反向验证成功,只能用Virtual Host

三、虚拟机的模块设计

1.虚拟机持久化模块设计

写到这里的时候,不知道大家是否会有这种想法:
既然交换机,队列,绑定信息,他们都是虚拟机资源的一部分,都被虚拟机整合了

那么虚拟机按理来说也需要持久化字段吧,至少肯定不能不持久化啊
因此:

  • 难道虚拟机不持久化,交换机,队列,绑定信息,消息…都没法持久化了?

总感觉用起来体验不是很好
那不妨看一下RabbitMQ是如何做的

1.RabbitMQ是如何做的

在RabbitMQ当中,虚拟机默认就是持久化的,这里的“持久化”是指虚拟机在RabbitMQ服务重启后能够恢复其状态

并没有给虚拟机安排持久化字段

为何这样呢?
是为了提升用户体验,方便使用,使得用户更容易创建并管理其内部资源

因此,我们也这么做

2.如何设计

1.如何保证持久化的交换机、队列、绑定信息、消息恢复之后仍然隶属于对应的虚拟机

消息是依附于队列而存在的,它的消息文件名就已经保证了他一直属于对应的那个队列
在这里插入图片描述
到时候创建虚拟机时传入的数据库消息目录名就是该虚拟机的持久化资源

而且外部访问交换机,队列等等操作都必须经过虚拟机才能完成,这样的话内存级资源也是隔离的

这样就完美实现资源隔离了

2.如何组织管理实现虚拟机的“默认持久化”

给虚拟机创建一张表:
字段:
虚拟机名称(主键索引,不允许重复)
数据库名称(唯一键索引)
消息文件目录名(唯一键索引)

mysql> create table if not exists vhost_table(-> name varchar(32) primary key,-> dbfile varchar(32) unique key not null,-> basedir varchar(32) unique key not null);
Query OK, 0 rows affected (0.07 sec)mysql> desc vhost_table;
+---------+-------------+------+-----+---------+-------+
| Field   | Type        | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+-------+
| name    | varchar(32) | NO   | PRI | NULL    |       |
| dbfile  | varchar(32) | NO   | UNI | NULL    |       |
| basedir | varchar(32) | NO   | UNI | NULL    |       |
+---------+-------------+------+-----+---------+-------+
3 rows in set (0.00 sec)

为了组织这些虚拟机,我们把该虚拟机表放到一个单独的数据库当中:main.db,服务启动时自动恢复main.db当中vhost_table当中的所有虚拟机

3.设计

成员:

  • SqliteHelper

接口:

  • 创建/删除表
  • 新增/删除数据
  • 查询所有虚拟机【恢复历史数据时要用】

2.虚拟机类

我们之前说了:
虚拟机就是对交换机管理模块,队列管理模块,绑定信息管理模块,消息管理模块的整合

因此成员:

private:std::string _name;//虚拟机名称std::string _dbfile;//数据库名称std::string _basedir;//消息文件目录名ExchangeManager::ptr _emp;//交换机管理模块句柄MsgQueueManager::ptr _mqmp;//队列管理模块句柄BindingManager::ptr _bmp;//绑定信息管理模块句柄MessageManager::ptr _mmp;//消息管理模块句柄

接口:
1、声明/删除交换机
2、声明/删除队列
3、绑定/解绑队列
4、发布/确认消息
5、推送【消费】消息
6、获取指定交换机的所有绑定信息【消息推送时要用】
7、获取所有队列【以后要用,到时候在说原因】
8、清空虚拟机所有资源【析构函数不要调用它哦,否则画蛇添足】
9、获取指定交换机/队列【查】
10、判断指定交换机/队列是否存在【GTest】
11、判断某个绑定信息是否存在【GTest】

3.虚拟机管理模块

成员依旧是:

  1. <虚拟机名称,虚拟机::ptr> 的一个哈希表
  2. 虚拟机持久化管理模块句柄
  3. 互斥锁

接口:增删查

  1. 创建虚拟机
  2. 删除虚拟机
  3. 获取指定虚拟机::ptr
  4. 把虚拟机提供的接口全都拿过来封一下

四、代码环节

理论与实践相结合,下面开始写代码

1.虚拟机模块

0.分析是否需要加锁

因为那四个模块的管理句柄

ExchangeManager::ptr _emp;
MsgQueueManager::ptr _mqmp;
BindingManager::ptr _bmp;
MessageManager::ptr _mmp;

当中都加锁了,而且这里的临界资源就只有那四个模块的管理句柄
所以这里无需加锁

1.构造

1.有一个问题,怎么解决?

我们传入的dbfile和basedir可能都还没有创建,原本是想在构造函数体内部执行的,但是初始化列表时要创建交换机数据管理模块…

他们内部会恢复对应数据库的历史数据,因此我们创建dbfile和basedir必须要在他们【交换机…】构造函数执行之前

也就是说:
创建文件和目录必须要在初始化列表当中【交换机…】构造之前执行,但是无法直接写到初始化列表当中

此时利用initHelper就可以完美解决这一问题

class VirtualHost
{
private:  struct initHelper{initHelper(const std::string &dbfile,const std::string& basedir){// 创建dbfile和basedirFileHelper::createDir(FileHelper::parentDir(dbfile));FileHelper::createDir(FileHelper::parentDir(basedir));FileHelper::createFile(dbfile);}};//xxxxxxxxxxxx
private://类的成员变量按照声明顺序进行初始化initHelper _init_helper;std::string _name;std::string _dbfile;std::string _basedir;ExchangeManager::ptr _emp;MsgQueueManager::ptr _mqmp;BindingManager::ptr _bmp;MessageManager::ptr _mmp;
};

这种方法的一个好处是,它允许我们将前置操作封装在一个单独的类中,从而使主类的构造函数保持简洁

2.构造

构造的时候别忘了初始化所有队列的消息管理句柄
[其实就是恢复所有队列的历史消息]

class VirtualHost
{
public:using ptr = std::shared_ptr<VirtualHost>;VirtualHost(const std::string &name, const std::string &dbfile, const std::string &basedir):_init_helper(dbfile,basedir), _name(name), _dbfile(dbfile), _basedir(basedir), _emp(std::make_shared<ExchangeManager>(_dbfile)), _mqmp(std::make_shared<MsgQueueManager>(_dbfile)), _bmp(std::make_shared<BindingManager>(_dbfile)), _mmp(std::make_shared<MessageManager>(_basedir)){// 初始化所有队列的消息管理句柄MsgQueueMap mqm = _mqmp->getAllMsgQueue();for (auto &kv : mqm){_mmp->initQueueMessageManager(kv.first);}}
private:std::string _name;std::string _dbfile;std::string _basedir;ExchangeManager::ptr _emp;MsgQueueManager::ptr _mqmp;BindingManager::ptr _bmp;MessageManager::ptr _mmp;
};

2.声明/删除交换机

声明交换机直接复用即可
删除交换机之前要删除该交换机的所有绑定信息

// 声明/删除交换机
bool declareExchange(const std::string &ename, ExchangeType etype, bool edurable, bool eauto_delete,const google::protobuf::Map<std::string, std::string> &eargs)
{return _emp->declareExchange(ename, etype, edurable, eauto_delete, eargs);
}bool eraseExchange(const std::string &ename)
{// 删除交换机之前要删除该交换机的所有绑定信息哦_bmp->removeExchangeBindings(ename);return _emp->eraseExchange(ename);
}

3.声明/删除队列

声明队列时初始化该队列的消息管理模块句柄
删除队列时:

  1. 删除该队列的消息管理模块句柄
  2. 删除该队列的所有绑定信息
// 声明/删除队列
bool declareMsgQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs)
{// 初始化该队列的消息管理模块句柄_mmp->initQueueMessageManager(qname);return _mqmp->declareMsgQueue(qname, qdurable, qexclusive, qauto_delete, qargs);
}bool eraseMsgQueue(const std::string &qname)
{// 删除该队列的所有绑定信息if (!_bmp->removeMsgQueueBindings(qname)){default_error("删除队列失败,因为删除队列的所有绑定信息失败, 队列名: %s",qname.c_str());return false;}// 删除该队列的消息管理模块句柄_mmp->destroyQueueMessageManager(qname);return _mqmp->eraseMsgQueue(qname);
}

4.绑定/解绑队列

绑定队列时,要先查找交换机和队列是否存在
并根据他们的持久化方式来决定绑定信息的持久化方式[联系其低耦合模块之间的桥梁(虚拟机整合的意义)]

// 绑定/解绑队列
bool bind(const std::string &ename, const std::string &qname, const std::string &binding_key)
{// 因为不知道该交换机,队列是否存在// 也不知道他们是否持久化,所以先查找他们Exchange::ptr ep = _emp->getExchange(ename);if (ep.get() == nullptr){default_error("绑定交换机与队列失败,因为该交换机不存在, 交换机名称: %s",ename.c_str());return false;}MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("绑定交换机与队列失败,因为该队列不存在, 队列名称: %s",qname.c_str());return false;}return _bmp->bind(ename, qname, binding_key, ep->durable && mqp->durable);
}bool unBind(const std::string& ename,const std::string& qname)
{return _bmp->unBind(ename,qname);
}

5.发布/确认消息

发布消息时要先拿到对应的队列,然后根据队列的持久化方式来决定消息是否持久化

确认消息时直接复用即可

// 发布/确认消息
bool basicPublish(const std::string &qname, const BasicProperities *bp, const std::string &body)
{// 在这里能够知道队列的持久化方式,因此就能够传递durable了//  1. 查找该队列的ptr,看是否存在,拿到durable//  2. 发布消息MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("发布消息失败,因为该队列不存在, 队列名: %s",qname.c_str());return false;}return _mmp->publishMessage(qname, bp, body, (mqp->durable) ? DURABLE : UNDURABLE);
}bool basicAck(const std::string& qname,const std::string& msg_id)
{return _mmp->ackMessage(qname,msg_id);
}

6.消费消息

// 推送[消费]消息
MessagePtr basicConsume(const std::string& qname)
{return _mmp->front(qname);
}

直接复用即可

7.其他方法

后面全是复用就行的,直接给出了

// 获取指定交换机的所有绑定信息
MsgQueueBindingMap getAllBindingsByExchange(const std::string& ename)
{return _bmp->getExchangeBindings(ename);
}// 获取所有队列
MsgQueueMap getAllMsgQueue()
{return _mqmp->getAllMsgQueue();
}// 获取指定交换机/队列
Exchange::ptr getExchange(const std::string& ename)
{return _emp->getExchange(ename);
}MsgQueue::ptr getMsgQueue(const std::string& qname)
{return _mqmp->getMsgQueue(qname);
}// 判断指定交换机/队列是否存在
bool existsExchange(const std::string& ename)
{return _emp->exists(ename);
}bool existMsgQueue(const std::string& qname)
{return _mqmp->exists(qname);
}VirtualHostMap getAllVirtualHost()
{std::unique_lock<std::mutex> ulock(_mutex);return _vhmap;
}

8.get接口

这些get接口在虚拟机持久化管理模块那里要用

std::string get_name() const
{return _name;
}std::string get_dbfile() const
{return _dbfile;
}std::string get_basedir() const
{return _basedir;
}

9.clear

清除该虚拟机的所有资源

void clear()
{_emp->clear();_mqmp->clear();_bmp->clear();_mmp->clear();//删除数据库文件FileHelper::removeFile(_dbfile);//删除目录FileHelper::removeDir(_basedir);_dbfile.clear();_basedir.clear();_name.clear();
}

9.完整代码

class VirtualHost
{
private:  struct initHelper{initHelper(const std::string &dbfile,const std::string& basedir){// 创建dbfile和basedirFileHelper::createDir(FileHelper::parentDir(dbfile));FileHelper::createDir(FileHelper::parentDir(basedir));FileHelper::createFile(dbfile);}};public:using ptr = std::shared_ptr<VirtualHost>;VirtualHost(const std::string &name, const std::string &dbfile, const std::string &basedir):_init_helper(dbfile,basedir), _name(name), _dbfile(dbfile), _basedir(basedir), _emp(std::make_shared<ExchangeManager>(_dbfile)), _mqmp(std::make_shared<MsgQueueManager>(_dbfile)), _bmp(std::make_shared<BindingManager>(_dbfile)), _mmp(std::make_shared<MessageManager>(_basedir)){// 初始化所有队列的消息管理句柄MsgQueueMap mqm = _mqmp->getAllMsgQueue();for (auto &kv : mqm){_mmp->initQueueMessageManager(kv.first);}}// 声明/删除交换机bool declareExchange(const std::string &ename, ExchangeType etype, bool edurable, bool eauto_delete,const google::protobuf::Map<std::string, std::string> &eargs){return _emp->declareExchange(ename, etype, edurable, eauto_delete, eargs);}bool eraseExchange(const std::string &ename){// 删除交换机之前要删除该交换机的所有绑定信息哦_bmp->removeExchangeBindings(ename);return _emp->eraseExchange(ename);}// 声明/删除队列bool declareMsgQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs){// 初始化该队列的消息管理模块举句柄_mmp->initQueueMessageManager(qname);return _mqmp->declareMsgQueue(qname, qdurable, qexclusive, qauto_delete, qargs);}bool eraseMsgQueue(const std::string &qname){// 删除该队列的所有绑定信息if (!_bmp->removeMsgQueueBindings(qname)){default_error("删除队列失败,因为删除队列的所有绑定信息失败, 队列名: %s",qname.c_str());return false;}// 删除该队列的消息管理模块句柄_mmp->destroyQueueMessageManager(qname);return _mqmp->eraseMsgQueue(qname);}// 绑定/解绑队列bool bind(const std::string &ename, const std::string &qname, const std::string &binding_key){// 因为不知道该交换机,队列是否存在// 也不知道他们是否持久化,所以先查找他们Exchange::ptr ep = _emp->getExchange(ename);if (ep.get() == nullptr){default_error("绑定交换机与队列失败,因为该交换机不存在, 交换机名称: %s",ename.c_str());return false;}MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("绑定交换机与队列失败,因为该队列不存在, 队列名称: %s",qname.c_str());return false;}return _bmp->bind(ename, qname, binding_key, ep->durable && mqp->durable);}bool unBind(const std::string& ename,const std::string& qname){return _bmp->unBind(ename,qname);}// 发布/确认消息bool basicPublish(const std::string &qname, const BasicProperities *bp, const std::string &body){// 在这里能够知道队列的持久化方式,因此就能够传递durable了//  1. 查找该队列的ptr,看是否存在,拿到durable//  2. 发布消息MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("发布消息失败,因为该队列不存在, 队列名: %s",qname.c_str());return false;}return _mmp->publishMessage(qname, bp, body, (mqp->durable) ? DURABLE : UNDURABLE);}bool basicAck(const std::string& qname,const std::string& msg_id){return _mmp->ackMessage(qname,msg_id);}// 推送[消费]消息MessagePtr basicConsume(const std::string& qname){return _mmp->front(qname);}// 获取指定交换机的所有绑定信息MsgQueueBindingMap getAllBindingsByExchange(const std::string& ename){return _bmp->getExchangeBindings(ename);}// 获取所有队列MsgQueueMap getAllMsgQueue(){return _mqmp->getAllMsgQueue();}// 获取指定交换机/队列Exchange::ptr getExchange(const std::string& ename){return _emp->getExchange(ename);}MsgQueue::ptr getMsgQueue(const std::string& qname){return _mqmp->getMsgQueue(qname);}// 判断指定交换机/队列是否存在bool existsExchange(const std::string& ename){return _emp->exists(ename);}bool existMsgQueue(const std::string& qname){return _mqmp->exists(qname);}std::string get_name() const{return _name;}std::string get_dbfile() const{return _dbfile;}std::string get_basedir() const{return _basedir;}void clear(){_emp->clear();_mqmp->clear();_bmp->clear();_mmp->clear();//删除数据库文件FileHelper::removeFile(_dbfile);//删除目录FileHelper::removeDir(_basedir);_dbfile.clear();_basedir.clear();_name.clear();}private:std::string _name;std::string _dbfile;std::string _basedir;ExchangeManager::ptr _emp;MsgQueueManager::ptr _mqmp;BindingManager::ptr _bmp;MessageManager::ptr _mmp;
};

2.虚拟机持久化管理模块

唯一需要注意的就是SQLite3数据库当中创建表指定唯一键索引时
不是用unique key,而是一个unique关键字即可

using VirtualHostMap = std::unordered_map<std::string, VirtualHost::ptr>;class VirtualHostMapper
{
public:// 注意:这个dbfile是存放虚拟机表的那个专属数据库文件,而不是对应虚拟机整合的那些资源所属的数据库文件VirtualHostMapper(const std::string &dbfile): _helper(dbfile){if (!_helper.open()){default_fatal("虚拟机持久化管理模块创建失败, 因为数据库打开失败, 数据库: %s",dbfile.c_str());abort();}if (!createTable()){default_fatal("虚拟机持久化管理模块创建失败, 因为表创建失败");abort();}}bool createTable(){static std::string create_sql = "create table if not exists vhost_table(\name varchar(32) primary key,\dbfile varchar(32) unique not null,\basedir varchar(32) unique not null);";if (!_helper.exec(create_sql, nullptr, nullptr)){default_fatal("虚拟机表创建失败");return false;}return true;}bool dropTable(){static std::string drop_sql = "drop table if exists vhost_table;";if (!_helper.exec(drop_sql, nullptr, nullptr)){default_fatal("虚拟机表删除失败");return false;}return true;}bool insert(const VirtualHost::ptr &vhp){std::ostringstream insert_sql;insert_sql << "insert into vhost_table values(";insert_sql << "'" << vhp->get_name() << "',";insert_sql << "'" << vhp->get_dbfile() << "',";insert_sql << "'" << vhp->get_basedir() << "');";if (!_helper.exec(insert_sql.str(), nullptr, nullptr)){default_error("虚拟机插入数据失败");return false;}return true;}bool erase(const std::string &vname){std::ostringstream delete_sql;delete_sql << "delete from vhost_table if name=";delete_sql << "'" << vname << "';";if (!_helper.exec(delete_sql.str(), nullptr, nullptr)){default_error("虚拟机删除数据失败");return false;}return true;}VirtualHostMap recovery(){static std::string select_sql = "select * from vhost_table;";VirtualHostMap vhmap;if (!_helper.exec(select_sql, VirtualHostMapper::selectCallback, &vhmap)){default_error("虚拟机恢复/查询数据失败");return VirtualHostMap();}return vhmap;}private:static int selectCallback(void *args, int column, char **rows, char **fields){VirtualHostMap *vhptr = static_cast<VirtualHostMap *>(args);VirtualHost::ptr vhp = std::make_shared<VirtualHost>(rows[0], rows[1], rows[2]);vhptr->insert(std::make_pair(vhp->get_name(), vhp));return 0;}SqliteHelper _helper;
};

3.虚拟机管理模块

总体消息管理模块和队列消息管理模块之间的关系 跟
虚拟机管理模块和虚拟机模块之间的关系 是一样的
接口的复用也是一样的

只不过我们在查找对应的虚拟机时,可以复用一下getVirtualHost这个函数
因为对应虚拟机模块下的四个临界资源句柄都已加锁,所以拿到VirtualHost之后操作就无需加锁了
因此才可以复用
在这里插入图片描述
而声明/删除虚拟机则无法复用getVirtualHost,原因跟交换机模块当中声明/删除交换机无法复用exists函数一样

两个原子操作组合在一起就不原子了

const std::string vhost_dbfile = "main.db";class VirtualHostManager
{
public:using ptr=std::shared_ptr<VirtualHostManager>;VirtualHostManager(const std::string &dbfile = vhost_dbfile): _mapper(dbfile){_vhmap = _mapper.recovery();}// 声明/删除虚拟机的操作不能复用查找虚拟机这个函数,因为:// 要么死锁,要么就两个原子操作了,而两个原子操作放到一起就不原子了// 声明/删除虚拟机bool declareVirtualHost(const std::string &vname, const std::string dbfile, const std::string &basedir){// 1. 加锁,查找std::unique_lock<std::mutex> ulock(_mutex);if (_vhmap.count(vname))return true;// 2. 构造虚拟机对象VirtualHost::ptr vhp = std::make_shared<VirtualHost>(vname, dbfile, basedir);if (!_mapper.insert(vhp)){std::ostringstream oss;oss << "声明虚拟机失败,因为虚拟机持久化失败, 虚拟机名称: " << vname << "\n" return false;}_vhmap.insert(std::make_pair(vname, vhp));return true;}bool eraseVirtualHost(const std::string &vname){VirtualHost::ptr vhp;{// 1. 加锁,查找std::unique_lock<std::mutex> ulock(_mutex);auto iter = _vhmap.find(vname);if (iter == _vhmap.end()){return true;}// 2. 保存vhp = iter->second;// 3. 持久化删除if (!_mapper.erase(vname)){default_error("删除虚拟机失败,因为持久化删除失败, 虚拟机名称: %s",vname.c_str());return false;}// 4. 从哈希表中删除_vhmap.erase(vname);}// 4. 清空所有资源vhp->clear();return true;}// 声明/删除交换机bool declareExchange(const std::string &vname, const std::string &ename, ExchangeType etype, bool edurable, bool eauto_delete,const google::protobuf::Map<std::string, std::string> &eargs){std::ostringstream oss;oss << "声明交换机失败,因为虚拟机不存在, 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->declareExchange(ename, etype, edurable, eauto_delete, eargs);}bool eraseExchange(const std::string &vname, const std::string &ename){std::ostringstream oss;oss << "删除交换机失败,因为虚拟机不存在, 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->eraseExchange(ename);}// 声明/删除队列bool declareMsgQueue(const std::string &vname, const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs){std::ostringstream oss;oss << "声明队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->declareMsgQueue(qname, qdurable, qexclusive, qauto_delete, qargs);}bool eraseMsgQueue(const std::string &vname, const std::string &qname){std::ostringstream oss;oss << "删除队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->eraseMsgQueue(qname);}// 绑定/解绑队列bool bind(const std::string &vname, const std::string &ename, const std::string &qname, const std::string &binding_key){std::ostringstream oss;oss << "绑定队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->bind(ename, qname, binding_key);}bool unBind(const std::string &vname, const std::string &ename, const std::string &qname){std::ostringstream oss;oss << "解绑队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->unBind(ename, qname);}// 发布/确认消息bool basicPublish(const std::string &vname, const std::string &qname, const BasicProperities *bp, const std::string &body){std::ostringstream oss;oss << "发布消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->basicPublish(qname, bp, body);}bool basicAck(const std::string &vname, const std::string &qname, const std::string &msg_id){std::ostringstream oss;oss << "确认消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->basicAck(qname, msg_id);}// 推送[消费]消息MessagePtr basicConsume(const std::string &vname, const std::string &qname){std::ostringstream oss;oss << "确认消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MessagePtr();}return vhp->basicConsume(qname);}// 获取指定交换机的所有绑定信息MsgQueueBindingMap getAllBindingsByExchange(const std::string &vname, const std::string &ename){std::ostringstream oss;oss << "获取指定交换机的所有绑定信息失败,因为虚拟机不存在, 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MsgQueueBindingMap();}return vhp->getAllBindingsByExchange(ename);}// 获取所有队列MsgQueueMap getAllMsgQueue(const std::string &vname){std::ostringstream oss;oss << "获取所有队列失败,因为虚拟机不存在 , 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MsgQueueMap();}return vhp->getAllMsgQueue();}// 获取指定交换机/队列Exchange::ptr getExchange(const std::string &vname, const std::string &ename){std::ostringstream oss;oss << "获取指定交换机失败,因为虚拟机不存在, 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return Exchange::ptr();}return vhp->getExchange(ename);}MsgQueue::ptr getMsgQueue(const std::string &vname, const std::string &qname){std::ostringstream oss;oss << "获取指定队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MsgQueue::ptr();}return vhp->getMsgQueue(qname);}// 判断指定交换机/队列是否存在bool existsExchange(const std::string &vname, const std::string &ename){std::ostringstream oss;oss << "获取指定队列失败,因为虚拟机不存在, 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->existsExchange(ename);}bool existMsgQueue(const std::string &vname, const std::string &qname){std::ostringstream oss;oss << "获取指定队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->existMsgQueue(qname);}//清空/销毁所有虚拟机void clear(){std::unique_lock<std::mutex> ulock(_mutex);for (auto &kv : _vhmap){kv.second->clear();}_mapper.dropTable();_vhmap.clear();}VirtualHost::ptr getVirtualHost(const std::string &vname, const std::ostringstream &oss){// 加锁,查找std::unique_lock<std::mutex> ulock(_mutex);auto iter = _vhmap.find(vname);if (iter == _vhmap.end()){default_error("%s",oss.str().c_str());return VirtualHost::ptr();}return iter->second;}VirtualHostMap getAllVirtualHost(){std::unique_lock<std::mutex> ulock(_mutex);return _vhmap;}private:std::mutex _mutex;VirtualHostMap _vhmap;VirtualHostMapper _mapper;
};

4.测试

#include "../mqserver/virtual_host.hpp"#include <gtest/gtest.h>
#include <vector>
using namespace ns_mq;VirtualHostManager::ptr vhmp;std::vector<std::string> uuid_vec = {UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid()};class VirtualHostTest : public testing::Environment
{
public:virtual void SetUp(){vhmp = std::make_shared<VirtualHostManager>(); // 使用那个main.db即可}virtual void TearDown(){vhmp->clear();}
};TEST(vhost_test, recovery_test)
{ASSERT_NE(vhmp->getExchange("vhost1","exchange1").get(),nullptr);ASSERT_NE(vhmp->getExchange("vhost1","exchange2").get(),nullptr);ASSERT_NE(vhmp->getMsgQueue("vhost1","queue1").get(),nullptr);
}TEST(vhost_test, insert_test)
{vhmp->declareVirtualHost("vhost1", "host1.db", "./host1");vhmp->declareVirtualHost("vhost2", "host2.db", "./host2");vhmp->declareVirtualHost("vhost3", "host3.db", "./host3");vhmp->declareVirtualHost("vhost4", "host4.db", "./host4");ASSERT_EQ(vhmp->exists("vhost1"), true);ASSERT_EQ(vhmp->exists("vhost2"), true);ASSERT_EQ(vhmp->exists("vhost3"), true);ASSERT_EQ(vhmp->exists("vhost4"), true);
}TEST(vhost_test, exchange_test)
{vhmp->declareExchange("vhost1", "exchange1", DIRECT, true, false, {});vhmp->declareExchange("vhost1", "exchange2", TOPIC, true, false, {});vhmp->declareExchange("vhost1", "exchange3", FANOUT, true, false, {});ASSERT_NE(vhmp->getExchange("vhost1", "exchange1").get(), nullptr);ASSERT_NE(vhmp->getExchange("vhost1", "exchange2").get(), nullptr);ASSERT_NE(vhmp->getExchange("vhost1", "exchange3").get(), nullptr);vhmp->eraseExchange("vhost1", "exchange3");ASSERT_NE(vhmp->getExchange("vhost1", "exchange1").get(), nullptr);ASSERT_NE(vhmp->getExchange("vhost1", "exchange2").get(), nullptr);ASSERT_EQ(vhmp->getExchange("vhost1", "exchange3").get(), nullptr);
}TEST(vhost_test, queue_test)
{vhmp->declareMsgQueue("vhost1", "queue1", true, false, false, {});vhmp->declareMsgQueue("vhost1", "queue2", true, false, false, {});vhmp->declareMsgQueue("vhost1", "queue3", true, false, false, {});ASSERT_NE(vhmp->getMsgQueue("vhost1", "queue1").get(), nullptr);ASSERT_NE(vhmp->getMsgQueue("vhost1", "queue2").get(), nullptr);ASSERT_NE(vhmp->getMsgQueue("vhost1", "queue3").get(), nullptr);vhmp->eraseMsgQueue("vhost1", "queue3");ASSERT_NE(vhmp->getMsgQueue("vhost1", "queue1").get(), nullptr);ASSERT_NE(vhmp->getMsgQueue("vhost1", "queue2").get(), nullptr);ASSERT_EQ(vhmp->getMsgQueue("vhost1", "queue3").get(), nullptr);
}TEST(vhost_test, bind_test)
{vhmp->bind("vhost1", "exchange1", "queue1", "news.music.pop");vhmp->bind("vhost1", "exchange1", "queue2", "news.music.pop");vhmp->bind("vhost1", "exchange2", "queue1", "news.music.pop");vhmp->bind("vhost1", "exchange2", "queue2", "news.music.pop");MsgQueueBindingMap emap = vhmp->getAllBindingsByExchange("vhost1", "exchange1");ASSERT_EQ(emap.size(), 2);vhmp->eraseMsgQueue("vhost1", "queue2");emap = vhmp->getAllBindingsByExchange("vhost1", "exchange1");ASSERT_EQ(emap.size(), 1);
}TEST(vhost_test, message_test)
{
//1. 发布消息测试BasicProperities bp;bp.set_msg_id(uuid_vec[0]);bp.set_mode(DURABLE);bp.set_routing_key("news.music.#");vhmp->basicPublish("vhost1", "queue1", &bp, std::string("Hello-1"));bp.set_msg_id(uuid_vec[1]);vhmp->basicPublish("vhost1", "queue1", &bp, std::string("Hello-2"));bp.set_msg_id(uuid_vec[2]);vhmp->basicPublish("vhost1", "queue1", &bp, std::string("Hello-3"));bp.set_msg_id(uuid_vec[3]);vhmp->basicPublish("vhost1", "queue1", &bp, std::string("Hello-4"));bp.set_msg_id(uuid_vec[4]);vhmp->basicPublish("vhost1", "queue1", &bp, std::string("Hello-5"));//2. 消费消息测试MessagePtr mp= vhmp->basicConsume("vhost1","queue1");ASSERT_NE(mp.get(),nullptr);ASSERT_EQ(mp->valid().body(),std::string("Hello-1"));mp= vhmp->basicConsume("vhost1","queue1");ASSERT_NE(mp.get(),nullptr);ASSERT_EQ(mp->valid().body(),std::string("Hello-2"));mp= vhmp->basicConsume("vhost1","queue1");ASSERT_NE(mp.get(),nullptr);ASSERT_EQ(mp->valid().body(),std::string("Hello-3"));mp= vhmp->basicConsume("vhost1","queue1");ASSERT_NE(mp.get(),nullptr);ASSERT_EQ(mp->valid().body(),std::string("Hello-4"));//3. 确认消息测试ASSERT_EQ(vhmp->basicAck("vhost1","queue1",uuid_vec[0]),true);ASSERT_EQ(vhmp->basicAck("vhost1","queue1",uuid_vec[1]),true);ASSERT_EQ(vhmp->basicAck("vhost1","queue1",uuid_vec[2]),true);}int main(int argc, char *argv[])
{testing::AddGlobalTestEnvironment(new VirtualHostTest);testing::InitGoogleTest(&argc, argv);return RUN_ALL_TESTS();
}

五、路由匹配模块

现在我们离具体业务的实现就差一个模块了,就是如何通过交换机类型,binding_key,routing_key来进行消息往队列的投递

这个任务是一个单独的解耦任务,因此我们提取为路由匹配模块

这个模块没有成员,类似于一个工具类,只不过它是独属于我们的项目的

其实就是一个通配符匹配问题,【模式串和匹配串的匹配问题】,经典的一个动态规划问题

1.routing_key和binding_key的格式介绍

在这里插入图片描述

2.动规分析

假设我们要匹配的两个字符串是这样的:
routing_key: aaa.ddd.aaa.bbb.eee.ddd
binding_key: aaa.#.bbb.*.ddd

1.状态定义与表示

我们可以很明显的感受到,整个动规的状态取决于两个变量:
i,j【routing_key的前i个单词能否跟binding_key的前j个单词相匹配】
如果写成递归式的函数签名:

bool _ismatch(const std::string& routing_key,const std::string& binding_key,int i,int j);

因此我们的表格是二维的:
dp[i][j]:routing_key的前i个单词能否跟binding_key的前j个单词相匹配

2.状态转移方程

因为binding_key当中的单词分为三种:
纯单词,*,#
所以我们需要分三类进行讨论:

1.纯单词

纯单词就简单了:
若routing_key的第i个单词和binding_key的第j个单词相匹配,则:
routing_key的前i个单词是否跟binding_key的前j个单词相匹配就等价于:
routing_key的前i-1个单词是否跟binding_key的前j-1个单词相匹配

//返回routing_key的第i个单词是否跟binding_key的第j个单词相匹配
bool _ismatch(const std::string& routing_key,const std::string& binding_key,int i,int j);if(_ismatch(routing_key,binding_key,i,j))
{dp[i][j]=dp[i-1][j-1];
}
else dp[i][j]=false;
2.*

因为*匹配任意一个单词,所以只要跟*进行匹配,则必定成功,因此这种情况比第一种殊途同归,因此在_ismatch函数当中匹配时直接if特判一下即可,所以代码不用改

if(_ismatch(routing_key,binding_key,i,j))
{dp[i][j]=dp[i-1][j-1];
}
else dp[i][j]=false;
3.#

因为#可以匹配0个或者任意多个单词:
而匹配n个单词==匹配1个单词+匹配n-1个单词【这个继续匹配即可】
【因此匹配数是可递归拆分的】,所以我们只需要分为三种情况进行讨论即可:

  1. 匹配0个单词
  2. 匹配1个单词
  3. 匹配1个单词后继续匹配【即:匹配多个单词】
    【用到了类似于完全背包的拆解方法】
1.#匹配0个单词
dp[i][j]:routing_key的前i个单词能否跟binding_key的前j个单词相匹配而如今#匹配0个单词,所以routing_key的前i个单词就要跟binding_key的前j个单词相匹配
dp[i][j]=dp[i][j-1] : 继承自上方
2.#匹配1个单词

跟*一样

dp[i][j]=dp[i-1][j-1] :继承自左上方
3.#匹配1个单词后继续匹配
dp[i][j]:routing_key的前i个单词能否跟binding_key的前j个单词相匹配而如今#匹配1个单词后继续匹配,所以routing_key的前i-1个单词就要跟binding_key的前j个单词相匹配
dp[i][j]=dp[i-1][j] : 继承自左方

3.dp数组初始化

dp[i][j]:routing_key的前i个单词能否跟binding_key的前j个单词相匹配
所以:

  1. dp数组先整体初始化为false:
vector<vector<bool>> dp(行数+1,vector<bool>(列数+1false));
  1. dp[0][0]:两者都为空,true
  2. 第一行dp[0][j]当中:routing_key为空,而binding_key不为空
    【只要binding_key连续单词为#,则匹配是必定成功的】
for(int j=1;j<=列数;j++)
{if(binding_key的第j个单词 != "#")break;else dp[0][j]=true;
}
  1. 第一列dp[i][0]当中:routing_key不为空,而binding_key为空
    【一定是false】

4.填表顺序

因为每个格子都只会依赖于左,上和左上,因此从上往下,从左往右填表

for(int i=1;i<=行数;i++)
{for(int j=1;j<=列数;j++){//书写状态转移方程}
}

5.返回值

dp[行数][列数]:整个routing_key能否跟整个binding_key相匹配

3.动规代码

但是我们还需要能够拿到routing_key和binding_key的第x个单词
如果每次都现求,那代码既不优雅,效率也低
因此可以用StringHelper::split
【注意:存在下标偏移:即:routing_key的第i个单词其实是routing_vec[i-1]】
该分析的都分析完了,下面直接上代码:

// routing_key : 匹配串   binding_key:模式串
static bool ismatch(const std::string &routing_key, const std::string &binding_key)
{// 1. 分割字符串,将每个单词放到vec当中std::vector<std::string> routing_vec, binding_vec;StringHelper::split(routing_key, ".", &routing_vec);StringHelper::split(binding_key, ".", &binding_vec);// 2. 动规开始// 2.1. dp数组定义int row = routing_vec.size(), col = binding_vec.size();std::vector<std::vector<bool>> dp(row + 1, std::vector<bool>(col + 1, false));// 2.2. dp数组初始化dp[0][0] = true;for (int j = 1; j <= col; j++){if (binding_vec[j - 1] != "#")break;dp[0][j] = true;}// 2.3 填表+状态转移for (int i = 1; i <= row; i++){for (int j = 1; j <= col; j++){// 第1种或者第2种情况if (binding_vec[j - 1] == "*" || routing_vec[i - 1] == binding_vec[j - 1]){dp[i][j] = dp[i - 1][j - 1];}// 第3种情况else if (binding_vec[j - 1] == "#"){// dp[i][j-1]    :   匹配0个// dp[i-1][j-1]  :   匹配1个// dp[i-1][j]    :   匹配多个dp[i][j] = dp[i][j - 1] || dp[i - 1][j - 1] || dp[i - 1][j];}}}// 2.4 返回值return dp[row][col];
}

4.模块整体代码

1.路由匹配代码

我们需要注意的是:
上面动规的路由匹配仅仅使用于交换机类型是TOPIC【主题交换】时
而其余的DIRECT【直接交换】和FANOUT【广播交换】并不适用,因此路由匹配模块并未完全完成,其实也很简单:

static bool route(const std::string &routing_key, const std::string &binding_key, ns_proto::ExchangeType type)
{if (type == ns_proto::ExchangeType::DIRECT){return routing_key == binding_key;}else if (type == ns_proto::ExchangeType::FANOUT){return true;}else if (type == ns_proto::ExchangeType::TOPIC){return ismatch(routing_key, binding_key);}else if (type == ns_proto::ExchangeType::UNKNOWNTYPE){default_fatal("路由匹配时,交换机类型为:UNKNOWNTYPE");return false;}else{default_fatal("路由匹配时,交换机出现未知类型");// 出现未知类型return false;}
}

2.判断routing_key合法性

routing_key只能由数字,字母,下划线,点组成:

// routing_key只能由数字,字母,下划线,点组成
static bool check_routing_key(const std::string &routing_key)
{for (auto &ch : routing_key){if ((ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch == '_') || (ch=='.'))continue;elsereturn false;}return true;
}

3.判断binding_key合法性

// binding_key:
//  1. 只能由数字,字母,下划线,点,*,#组成
//  2. 通配符必须自成单词
//  3. *通配符两边不能有任何通配符
static bool check_binding_key(const std::string &binding_key)
{//  0. 只能由数字,字母,下划线,点,*,#组成for (auto &ch : binding_key){if ((ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch == '_') || (ch == '*') || (ch == '#') || (ch=='.'))continue;elsereturn false;}// 1. 因为要涉及到对单词进行操作,所以先分割单词std::vector<std::string> vec;StringHelper::split(binding_key, ".", &vec);//  2. 通配符必须自成单词for (auto &str : vec){if (str.size() > 1 && (str.find("*") != std::string::npos || str.find("#") != std::string::npos)){return false;}}//  3. *通配符两边不能有任何通配符for (int i = 1; i < vec.size(); i++){if (vec[i] == "#" && vec[i - 1] == "#")return false;else if (vec[i] == "#" && vec[i - 1] == "*")return false;else if (vec[i] == "*" && vec[i - 1] == "#")return false;}return true;
}

4.完整代码

using namespace ns_helper;
class Router
{
public:// routing_key只能由数字,字母,下划线,点组成static bool check_routing_key(const std::string &routing_key){for (auto &ch : routing_key){if ((ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch == '_') || (ch=='.'))continue;elsereturn false;}return true;}// binding_key://  1. 只能由数字,字母,下划线,点,*,#组成//  2. 通配符必须自成单词//  3. *通配符两边不能有任何通配符static bool check_binding_key(const std::string &binding_key){//  0. 只能由数字,字母,下划线,点,*,#组成for (auto &ch : binding_key){if ((ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch == '_') || (ch == '*') || (ch == '#') || (ch=='.'))continue;elsereturn false;}// 1. 因为要涉及到对单词进行操作,所以先分割单词std::vector<std::string> vec;StringHelper::split(binding_key, ".", &vec);//  2. 通配符必须自成单词for (auto &str : vec){if (str.size() > 1 && (str.find("*") != std::string::npos || str.find("#") != std::string::npos)){return false;}}//  3. *通配符两边不能有任何通配符for (int i = 1; i < vec.size(); i++){if (vec[i] == "#" && vec[i - 1] == "#")return false;else if (vec[i] == "#" && vec[i - 1] == "*")return false;else if (vec[i] == "*" && vec[i - 1] == "#")return false;}return true;}static bool route(const std::string &routing_key, const std::string &binding_key, ns_proto::ExchangeType type){if (!check_routing_key(routing_key)){default_warning("路由匹配时:routing_key不合法,routing_key:%s",routing_key.c_str());return false;}if (!check_binding_key(binding_key)){default_warning("路由匹配时:binding_key不合法,binding_key:%s",binding_key.c_str());return false;}if (type == ns_proto::ExchangeType::DIRECT){return routing_key == binding_key;}else if (type == ns_proto::ExchangeType::FANOUT){return true;}else if (type == ns_proto::ExchangeType::TOPIC){return ismatch(routing_key, binding_key);}else if (type == ns_proto::ExchangeType::UNKNOWNTYPE){default_fatal("路由匹配时,交换机类型为:UNKNOWNTYPE");return false;}else{default_fatal("路由匹配时,交换机出现未知类型");// 出现未知类型return false;}}private:// routing_key : 匹配串   binding_key:模式串static bool ismatch(const std::string &routing_key, const std::string &binding_key){// 1. 分割字符串,将每个单词放到vec当中std::vector<std::string> routing_vec, binding_vec;StringHelper::split(routing_key, ".", &routing_vec);StringHelper::split(binding_key, ".", &binding_vec);// 2. 动规开始// 2.1. dp数组定义int row = routing_vec.size(), col = binding_vec.size();std::vector<std::vector<bool>> dp(row + 1, std::vector<bool>(col + 1, false));// 2.2. dp数组初始化dp[0][0] = true;for (int j = 1; j <= col; j++){if (binding_vec[j - 1] != "#")break;dp[0][j] = true;}// 2.3 填表+状态转移for (int i = 1; i <= row; i++){for (int j = 1; j <= col; j++){// 第1种或者第2种情况if (binding_vec[j - 1] == "*" || routing_vec[i - 1] == binding_vec[j - 1]){dp[i][j] = dp[i - 1][j - 1];}// 第3种情况else if (binding_vec[j - 1] == "#"){// dp[i][j-1]    :   匹配0个// dp[i-1][j-1]  :   匹配1个// dp[i-1][j]    :   匹配多个dp[i][j] = dp[i][j - 1] || dp[i - 1][j - 1] || dp[i - 1][j];}}}// 2.4 返回值return dp[row][col];}
};

5.测试

#include "../mqserver/routing.hpp"
#include <gtest/gtest.h>using namespace ns_mq;class RouteTest : public testing::Environment
{
public:virtual void SetUp() {}virtual void TearDown() {}
};TEST(route_test, legal_test)
{ASSERT_EQ(Router::check_routing_key("news...music.^%"), false);ASSERT_EQ(Router::check_routing_key("news...music/@$."), false);ASSERT_EQ(Router::check_binding_key("news...music.#"), true);ASSERT_EQ(Router::check_binding_key("news...music.a#"), false);ASSERT_EQ(Router::check_binding_key("news.#.#"), false);ASSERT_EQ(Router::check_binding_key("news.*.#"), false);ASSERT_EQ(Router::check_binding_key("news.*.*"), true);
}TEST(route_test, route)
{// [测试⽤例]// binding key         routing key              result// aaa                  aaa                     true// aaa.bbb              aaa.bbb                 true// aaa.bbb              aaa.bbb.ccc             false// aaa.bbb              aaa.ccc                 false// aaa.#.bbb            aaa.bbb.ccc             false// aaa.bbb.#            aaa.ccc.bbb             false// #.bbb.ccc            aaa.bbb.ccc.ddd         false// aaa.bbb.ccc          aaa.bbb.ccc             true// aaa.*                aaa.bbb                 true// aaa.*.bbb            aaa.bbb.ccc             false// *.aaa.bbb            aaa.bbb                 false// #                    aaa.bbb.ccc             true// aaa.#                aaa.bbb                 true// aaa.#                aaa.bbb.ccc             true// aaa.#.ccc            aaa.ccc                 true// aaa.#.ccc            aaa.bbb.ccc             true// aaa.#.ccc            aaa.aaa.bbb.ccc         true// #.ccc                ccc                     true// #.ccc                aaa.bbb.ccc             true// aaa.#.ccc.ccc        aaa.bbb.ccc.ccc.ccc     true// aaa.#.bbb.*.bbb      aaa.ddd.ccc.bbb.eee.bbb truestd::vector<std::string> binding_v = {"aaa","aaa.bbb","aaa.bbb","aaa.bbb","aaa.#.bbb","aaa.bbb.#","#.bbb.ccc","aaa.bbb.ccc","aaa.*","aaa.*.bbb","*.aaa.bbb","#","aaa.#","aaa.#","aaa.#.ccc","aaa.#.ccc","aaa.#.ccc","#.ccc","#.ccc","aaa.#.ccc.ccc","aaa.#.bbb.*.bbb"};std::vector<std::string> routing_v = {"aaa","aaa.bbb","aaa.bbb.ccc","aaa.ccc","aaa.bbb.ccc","aaa.ccc.bbb","aaa.bbb.ccc.ddd","aaa.bbb.ccc","aaa.bbb","aaa.bbb.ccc","aaa.bbb","aaa.bbb.ccc","aaa.bbb","aaa.bbb.ccc","aaa.ccc","aaa.bbb.ccc","aaa.aaa.bbb.ccc","ccc","aaa.bbb.ccc","aaa.bbb.ccc.ccc.ccc","aaa.ddd.ccc.bbb.eee.bbb"};std::vector<bool> ret_v = {true,true,false,false,false,false,false,true,true,false,false,true,true,true,true,true,true,true,true,true,true};for (int i = 0; i < ret_v.size(); i++){ASSERT_EQ(Router::route(routing_v[i], binding_v[i], ns_proto::ExchangeType::TOPIC), ret_v[i]);}
}int main(int argc, char *argv[])
{testing::AddGlobalTestEnvironment(new RouteTest);testing::InitGoogleTest(&argc, argv);return RUN_ALL_TESTS();
}

以上就是项目第六弹:虚拟机管理模块、路由匹配模块的全部内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1541732.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

NoSql数据库Redis知识点

数据库的分类 关系型数据库 &#xff0c;是建立在关系模型基础上的数据库&#xff0c;其借助于集合代数等数学概念和方法来处理数据库 中的数据主流的 MySQL 、 Oracle 、 MS SQL Server 和 DB2 都属于这类传统数据库。 NoSQL 数据库 &#xff0c;全称为 Not Only SQL &a…

Python学习——【4.1】数据容器:list列表

文章目录 【4.1】数据容器&#xff1a;list列表一、数据容器入门二、数据容器&#xff1a;list 列表&#xff08;一&#xff09;列表的定义&#xff08;二&#xff09;列表的下标索引&#xff08;三&#xff09;列表的常用操作&#xff08;1&#xff09;列表的查询功能&#xf…

RAG+Agent人工智能平台:RAGflow实现GraphRA知识库问答,打造极致多模态问答与AI编排流体验

1.RAGflow简介 全面优化的 RAG 工作流可以支持从个人应用乃至超大型企业的各类生态系统。大语言模型 LLM 以及向量模型均支持配置。基于多路召回、融合重排序。提供易用的 API&#xff0c;可以轻松集成到各类企业系统。支持丰富的文件类型&#xff0c;包括 Word 文档、PPT、exc…

Vue3入门 - ElementPlus中左侧菜单和Tabs菜单组合联动效果

在Vue3中&#xff0c;ElementPlus是使用比较广泛的UI组件库&#xff0c;提供了丰富的界面元素支持项目开发需求。在后台管理系统中&#xff0c;左侧或顶部的菜单栏通常包含多个子菜单项&#xff0c;通过菜单的展开和收缩功能&#xff0c;用户可以方便地查看或隐藏不需要的菜单项…

数字世界中的浪漫:揭秘会跳动的爱心

在编程的世界里&#xff0c;复杂的技术可以与艺术产生美妙的碰撞。无论是通过代码实现动态效果&#xff0c;还是用算法绘制图案&#xff0c;程序员都可以成为数字艺术的创作者。而今天&#xff0c;我们将通过 Python 的强大 GUI 工具库 Tkinter&#xff0c;用简单的代码生成一颗…

U-Boot顶层Makefile详解

直接参考【正点原子】I.MX6U嵌入式Linux驱动开发指南V1.81 本文仅作为个人笔记使用&#xff0c;方便进一步记录自己的实践总结。 上一章我们详细的讲解了 uboot 的使用方法&#xff0c;其实就是各种命令的使用&#xff0c;学会 uboot 使用以后就可以尝试移植 uboot 到自己的开发…

linux操作系统的基本命令

1.linux下的文件系统 在linux操作目录下没有像window操作系统下盘符的概念,只有一个根目录/,所有文件目录都在它的下面 linux的目录结构: 在Linux系统中: 文件都从跟目录开始的,用/表示文件名称区分大小写路径都是以/俩进行分隔(windown用\分隔)以.开头的文件为隐藏文件 Li…

鸿蒙开发之ArkUI 界面篇 十七 购物综合案例

layoutWeight:子元素与兄弟元素主轴方向按照权重进行分配,参数是联合类型&#xff0c;数字或者是字符串&#xff0c;在指定的空间占多少份额&#xff0c;数字越大&#xff0c;表示在空间中占用的份额越多&#xff0c;如果父容器的子组件没有别的指定&#xff0c;剩下的空间全部…

10分钟一条童装走秀爆款Ai视频,小白轻松上手,新蓝海赛道,竞争小机会多!

今天我要给大家带来一个超级有趣的项目——童装走秀AI视频制作。 这不仅是一个充满创意的项目&#xff0c;而且操作简单&#xff0c;即使是视频制作的新手也能轻松上手。 更重要的是&#xff0c;这个项目竞争小&#xff0c;变现机会多&#xff0c;是进入新蓝海赛道的绝佳机会…

C++类之set与get理解

在类中&#xff0c;我们尝尝将一些变量设置为private或者protect里面&#xff0c;而我们经常会遇到在主函数&#xff08;main.cpp&#xff09;使用到这些private变量&#xff0c;而往往我们会下意识地在主函数直接调用在private里面的变量&#xff0c;但现实比较残酷&#xff0…

【linux】nice命令

Linux中的nice命令是一个强大的工具&#xff0c;用于调整进程的优先级&#xff0c;进而影响它们在CPU上的资源分配和执行顺序。以下是关于nice命令的详细解释&#xff0c;包括其用途、语法、参数、示例以及使用建议。 一、用途 nice命令主要用于控制进程在CPU上的调度优先级&…

K8S介绍+集群部署

Kubernetes介绍 官网&#xff1a;https://kubernetes.io/ 一、应用部署方式演变 1、传统部署&#xff1a;互联网早期&#xff0c;会直接将应用程序部署在物理机上 优点&#xff1a;简单&#xff0c;不需要其他技术的参与 缺点&#xff1a;不能为应用程序定义资源使用边界&a…

【论文笔记】BEVNeXt: Reviving Dense BEV Frameworks for 3D Object Detection

原文链接&#xff1a;https://arxiv.org/pdf/2312.01696 简介&#xff1a;最近&#xff0c;在摄像头3D目标检测任务中&#xff0c;基于查询的Transformer解码器正在超越传统密集BEV方法。但密集BEV框架有着更好的深度估计和目标定位能力&#xff0c;能全面精确地描绘3D场景。本…

初始网络编程(下)

所属专栏&#xff1a;Java学习 1. TCP 的简单示例 同时&#xff0c;由于 TCP 是面向字节流的传输&#xff0c;所以说传输的基本单位是字节&#xff0c;接受发送都是使用的字节流 方法签名 方法说明 Socket accept() 开始监听指定端口&#xff08;创建时绑定的端口&…

Navicat导入Sql文件至Mysql数据库,事务失效

Mysql 版本&#xff1a;8.0.39 Navicat 版本&#xff1a;17.x、16.x 结论&#xff1a; Navicat 导入sql文件&#xff0c;事务不会生效&#xff0c;无论怎么设置 mysql.exe 导入sql文件&#xff0c;事务生效 测试 准备一张表 name约束不能为空&#xff0c;用于测试事务失败…

SpringBoot 整合 Caffeine 实现本地缓存

目录 1、Caffeine 简介1.1、Caffeine 简介1.2、对比 Guava cache 的性能主要优化项1.3、常见的缓存淘汰算法1.4、SpringBoot 集成 Caffeine 两种方式 2、SpringBoot 集成 Caffeine 方式一2.1、缓存加载策略2.1.1、手动加载2.1.2、自动加载【Loading Cache】2.1.3、异步加载【As…

7、论等保的必要性

数据来源&#xff1a;7.论等保的必要性_哔哩哔哩_bilibili 等级保护必要性 降低信息安全风险 等级保护旨在降低信息安全风险&#xff0c;提高信息系统的安全防护能力。 风险发现与整改 开展等级保护的最重要原因是通过测评工作&#xff0c;发现单位系统内外部的安全风险和脆弱…

Linux启动流程,0,1,2进程,init进程,idle进程,内核态到用户态的kernel_execve(一)

&#xff1f;是&#xff0c;如果定义了&#xff0c;就按Makefile的&#xff0c;如果如下make编译时&#xff0c;就按如下 linux内核入口 进程0在用户空间看不到&#xff0c;因为他是内核进程 进程2就是守护进程&#xff0c;维护内涵运转的 一生二&#xff0c;二生三&#xff…

Navicate 链接Oracle 提示 Oracle Library is not loaded ,账号密码都正确地址端口也对

Navicate 链接Oracle 提示 Oracle Library is not loaded ,账号密码都正确地址端口也对的问题 解决办法 出现 Oracle Library is not loaded 错误提示&#xff0c;通常是因为 Navicat 无法找到或加载 Oracle 客户端库&#xff08;OCI.dll&#xff09;。要解决这个问题&#x…

IntelliJ IDEA 2024.1.4 (Ultimate Edition)找不到Add Framework Support解决方法

目录 背景: 解决方法&#xff1a; 步骤1: 步骤2&#xff1a; 步骤3&#xff1a; 创建Web项目的完整流程&#xff1a; 步骤1: 步骤2: 步骤3&#xff1a; 步骤4&#xff1a; Web优点: 背景: 我的IDE版本是IntelliJ IDEA 2024.1.4 (Ultimate Edition)&#xff0c;当我…