项目第六弹:虚拟机管理模块、路由匹配模块
- 一、虚拟机管理模块的设计
- 1.什么是虚拟机?
- 2.借助MySQL来理解一下
- 3.如何整合?【埋下伏笔】
- 二、RabbitMQ为何要有虚拟机
- 1.从业务角度来讲
- 2.步步探索
- 1.优点
- 2.结合业务适用场景和需求
- 3.发掘真正的原因
- 4.反向验证
- 1.物理隔离
- 2.逻辑隔离
- 3.根本原因
- 三、虚拟机的模块设计
- 1.虚拟机持久化模块设计
- 1.RabbitMQ是如何做的
- 2.如何设计
- 1.如何保证持久化的交换机、队列、绑定信息、消息恢复之后仍然隶属于对应的虚拟机
- 2.如何组织管理实现虚拟机的“默认持久化”
- 3.设计
- 2.虚拟机类
- 3.虚拟机管理模块
- 四、代码环节
- 1.虚拟机模块
- 0.分析是否需要加锁
- 1.构造
- 1.有一个问题,怎么解决?
- 2.构造
- 2.声明/删除交换机
- 3.声明/删除队列
- 4.绑定/解绑队列
- 5.发布/确认消息
- 6.消费消息
- 7.其他方法
- 8.get接口
- 9.clear
- 9.完整代码
- 2.虚拟机持久化管理模块
- 3.虚拟机管理模块
- 4.测试
- 五、路由匹配模块
- 1.routing_key和binding_key的格式介绍
- 2.动规分析
- 1.状态定义与表示
- 2.状态转移方程
- 1.纯单词
- 2.*
- 3.#
- 1.#匹配0个单词
- 2.#匹配1个单词
- 3.#匹配1个单词后继续匹配
- 3.dp数组初始化
- 4.填表顺序
- 5.返回值
- 3.动规代码
- 4.模块整体代码
- 1.路由匹配代码
- 2.判断routing_key合法性
- 3.判断binding_key合法性
- 4.完整代码
- 5.测试
关于虚拟机用户管理方面的功能,我们基础版本暂且先不实现
一、虚拟机管理模块的设计
1.什么是虚拟机?
虚拟机是RabbitMQ当中,整合交换机管理模块,队列管理模块,绑定信息管理模块的一个逻辑集合,是一个中间层
只不过,在我们的项目当中,因为消息是依附于队列才能存在的,所以消息附属于队列,而队列又附属于虚拟机
所以我们的消息管理模块也被纳入了虚拟机管理模块当中,但是AMQP并未规定消息管理模块直接附属于虚拟机
【我们的项目是这样组织的】
回顾一下AMQP协议:
2.借助MySQL来理解一下
我们可以借助MySQL的知识来理解:
先看一段SQL
mysql> create database host1;
Query OK, 1 row affected (0.02 sec)mysql> use host1;
Database changedmysql> create table if not exists exchange_table (-> name varchar(32) primary key,-> type int,-> durable int,-> auto_delete int,-> args varchar(512));
Query OK, 0 rows affected (0.07 sec)mysql> create table if not exists queue_table(-> name varchar(32) primary key,-> durable int,-> exclusive int,-> auto_delete int,-> args varchar(512));
Query OK, 0 rows affected (0.06 sec)mysql> create table if not exists binding_table(-> exchange_name varchar(32),-> queue_name varchar(32),-> binding_key varchar(32),-> durable int);
Query OK, 0 rows affected (0.05 sec)mysql> select table_name from information_schema.tables where table_schema='host1';
+----------------+
| TABLE_NAME |
+----------------+
| binding_table |
| exchange_table |
| queue_table |
+----------------+
3 rows in set (0.00 sec)mysql> insert into exchange_table values ('exchange1',1,1,0,'新闻交换机=wzs');
Query OK, 1 row affected (0.02 sec)mysql> select * from exchange_table;
+-----------+------+---------+-------------+---------------------+
| name | type | durable | auto_delete | args |
+-----------+------+---------+-------------+---------------------+
| exchange1 | 1 | 1 | 0 | 新闻交换机=wzs |
+-----------+------+---------+-------------+---------------------+
1 row in set (0.01 sec)
新闻交换机=wzs:这个交换机是新闻交换机,是wzs创建的
上面没啥难的,就是SQL语句而已
下面我们要说明的是:
数据库《----》虚拟机
交换机表《----》交换机管理模块
交换机表当中的数据《----》交换机
其实这一点我们在使用SQLite3数据库的时候就已经渗透/埋下伏笔了,因此理解起来并不难
因此:
我们将MessageManager和QueueMessageManager结合在一起来看
便能抽离出一个消息表来:
因此“消息表”:
指定目录【在应用层控制该字段的唯一性,就一个if而已】
队列名
有效载荷:属性字段:消息ID投递模式routing_key消息主体有效标记位
偏移量和长度mysql> create table if not exists message_table(-> base_dir varchar(32) default './queue_message',-> queue_name varchar(32),-> msg_id varchar(32) primary key,-> mode int,-> routing_key varchar(32),-> body varchar(512),-> valid int,-> offset int,-> len int);
Query OK, 0 rows affected (0.05 sec)mysql> desc message_table;
+-------------+--------------+------+-----+-----------------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+-----------------+-------+
| base_dir | varchar(32) | YES | | ./queue_message | |
| queue_name | varchar(32) | YES | | NULL | |
| msg_id | varchar(32) | NO | PRI | NULL | |
| mode | int | YES | | NULL | |
| routing_key | varchar(32) | YES | | NULL | |
| body | varchar(512) | YES | | NULL | |
| valid | int | YES | | NULL | |
| offset | int | YES | | NULL | |
| len | int | YES | | NULL | |
+-------------+--------------+------+-----+-----------------+-------+
9 rows in set (0.00 sec)当然,也可以在创建表之后显式修改表结构,添加默认值
alter table message_table modify column base_dir varchar(32) default './queue_message';
在应用层写SQL插入时不插入base_dir即可
它们是关联的,所以我们可以借助表的关系来理解如何整合
3.如何整合?【埋下伏笔】
二、RabbitMQ为何要有虚拟机
1.从业务角度来讲
我们的消息队列服务器的本质其实就是一个生产者消费者模型,其业务处理的核心就是我们的虚拟机
所以才需要有虚拟机
但是这么来理解,太肤浅了吧。我们需要从代码和业务出发,但它们只是为了让我们更好的去理解我们的业务而已
但是核心落脚点一定不是业务处理本身,要想知道为什么
最好的方式是先从优劣点出发,然后结合业务的适用场景和需求来审视这些优点
从中发掘出真正的原因
然后反向回溯验证,能够成功完全的说服自己,大概率就是准确的
2.步步探索
1.优点
如果大家了解过Hadoop或者其他分布式框架,或者云平台,云服务器【它们都是用的云计算技术,而云计算技术用到了分布式框架】的话:
劣势呢?
可以说是没有劣势,最起码对于写代码来说更加方便清晰明了
只不过要想了解它设计背后的意义和价值,是有一定学习成本在的
【不过理解一下它背后的原因和意义,虽然学习成本高,但是非常值得的,甚至是一件好事】
2.结合业务适用场景和需求
- 多租户应用:如果业务需要支持多个独立的用户或租户,且这些租户之间的消息传递需要隔离,那么RabbitMQ的虚拟机特性将非常有用。
- 高安全性要求:对于需要严格控制消息传递权限和访问控制的业务,RabbitMQ的安全特性能够提供强有力的支持。
- 分布式系统和云平台:在分布式系统或云平台环境中,RabbitMQ的可扩展性、灵活性和高可用性特性将大大简化消息传递的复杂性,并提升系统的整体性能。
3.发掘真正的原因
- 特别是在大数据时代,分布式系统和云平台急需一个进行模块间消息传递/异步调用的一个消息中间件
- 来在保证模块间高内聚,低耦合的情况下建立起多个模块之间的紧密联系
这就是消息中间件MQ的最大应用场景与需求,也是虚拟主机诞生的原因
4.反向验证
假设不用虚拟机,能有其他好方法来解决这一问题吗?
1.物理隔离
创建多个RabbitMQ,“稳,准,狠”、但是成本较高,且管理复杂
2.逻辑隔离
-
命名空间/前缀约定:在应用层实现,为不同应用或者团队的消息队列,交换机使用不同命名的前缀或者后缀
但是这种方法需要额外的管理和约定才能保证命名不冲突,且不具备Virtual Host那样的强制性 -
访问控制列表:【权限表】
通过精细的访问控制列表来限制不同用户或应用对RabbitMQ资源的访问。
这可以在一定程度上实现资源的隔离,但相比Virtual Host来说,管理复杂度和出错风险更高
3.根本原因
最重要的是,这些都是RabbitMQ的使用者要动脑子去做的事情,这是用户啊,顾客就是上帝啊
因此RabbitMQ自己在内部加了一层“Virtual Host”,告诉用户,不同的Virtual Host是资源隔离的,这不就OK了吗
所以反向验证成功,只能用Virtual Host
三、虚拟机的模块设计
1.虚拟机持久化模块设计
写到这里的时候,不知道大家是否会有这种想法:
既然交换机,队列,绑定信息,他们都是虚拟机资源的一部分,都被虚拟机整合了
那么虚拟机按理来说也需要持久化字段吧,至少肯定不能不持久化啊
因此:
- 难道虚拟机不持久化,交换机,队列,绑定信息,消息…都没法持久化了?
总感觉用起来体验不是很好
那不妨看一下RabbitMQ是如何做的
1.RabbitMQ是如何做的
在RabbitMQ当中,虚拟机默认就是持久化的,这里的“持久化”是指虚拟机在RabbitMQ服务重启后能够恢复其状态
并没有给虚拟机安排持久化字段
为何这样呢?
是为了提升用户体验,方便使用,使得用户更容易创建并管理其内部资源
因此,我们也这么做
2.如何设计
1.如何保证持久化的交换机、队列、绑定信息、消息恢复之后仍然隶属于对应的虚拟机
消息是依附于队列而存在的,它的消息文件名就已经保证了他一直属于对应的那个队列
到时候创建虚拟机时传入的数据库和消息目录名就是该虚拟机的持久化资源
而且外部访问交换机,队列等等操作都必须经过虚拟机才能完成,这样的话内存级资源也是隔离的
这样就完美实现资源隔离了
2.如何组织管理实现虚拟机的“默认持久化”
给虚拟机创建一张表:
字段:
虚拟机名称(主键索引,不允许重复)
数据库名称(唯一键索引)
消息文件目录名(唯一键索引)
mysql> create table if not exists vhost_table(-> name varchar(32) primary key,-> dbfile varchar(32) unique key not null,-> basedir varchar(32) unique key not null);
Query OK, 0 rows affected (0.07 sec)mysql> desc vhost_table;
+---------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+-------+
| name | varchar(32) | NO | PRI | NULL | |
| dbfile | varchar(32) | NO | UNI | NULL | |
| basedir | varchar(32) | NO | UNI | NULL | |
+---------+-------------+------+-----+---------+-------+
3 rows in set (0.00 sec)
为了组织这些虚拟机,我们把该虚拟机表放到一个单独的数据库当中:main.db,服务启动时自动恢复main.db当中vhost_table当中的所有虚拟机
3.设计
成员:
- SqliteHelper
接口:
- 创建/删除表
- 新增/删除数据
- 查询所有虚拟机【恢复历史数据时要用】
2.虚拟机类
我们之前说了:
虚拟机就是对交换机管理模块,队列管理模块,绑定信息管理模块,消息管理模块的整合
因此成员:
private:std::string _name;//虚拟机名称std::string _dbfile;//数据库名称std::string _basedir;//消息文件目录名ExchangeManager::ptr _emp;//交换机管理模块句柄MsgQueueManager::ptr _mqmp;//队列管理模块句柄BindingManager::ptr _bmp;//绑定信息管理模块句柄MessageManager::ptr _mmp;//消息管理模块句柄
接口:
1、声明/删除交换机
2、声明/删除队列
3、绑定/解绑队列
4、发布/确认消息
5、推送【消费】消息
6、获取指定交换机的所有绑定信息【消息推送时要用】
7、获取所有队列【以后要用,到时候在说原因】
8、清空虚拟机所有资源【析构函数不要调用它哦,否则画蛇添足】
9、获取指定交换机/队列【查】
10、判断指定交换机/队列是否存在【GTest】
11、判断某个绑定信息是否存在【GTest】
3.虚拟机管理模块
成员依旧是:
- <虚拟机名称,虚拟机::ptr> 的一个哈希表
- 虚拟机持久化管理模块句柄
- 互斥锁
接口:增删查
- 创建虚拟机
- 删除虚拟机
- 获取指定虚拟机::ptr
- 把虚拟机提供的接口全都拿过来封一下
四、代码环节
理论与实践相结合,下面开始写代码
1.虚拟机模块
0.分析是否需要加锁
因为那四个模块的管理句柄
ExchangeManager::ptr _emp;
MsgQueueManager::ptr _mqmp;
BindingManager::ptr _bmp;
MessageManager::ptr _mmp;
当中都加锁了,而且这里的临界资源就只有那四个模块的管理句柄
所以这里无需加锁
1.构造
1.有一个问题,怎么解决?
我们传入的dbfile和basedir可能都还没有创建,原本是想在构造函数体内部执行的,但是初始化列表时要创建交换机数据管理模块…
他们内部会恢复对应数据库的历史数据,因此我们创建dbfile和basedir必须要在他们【交换机…】构造函数执行之前
也就是说:
创建文件和目录必须要在初始化列表当中【交换机…】构造之前执行,但是无法直接写到初始化列表当中
此时利用initHelper就可以完美解决这一问题
class VirtualHost
{
private: struct initHelper{initHelper(const std::string &dbfile,const std::string& basedir){// 创建dbfile和basedirFileHelper::createDir(FileHelper::parentDir(dbfile));FileHelper::createDir(FileHelper::parentDir(basedir));FileHelper::createFile(dbfile);}};//xxxxxxxxxxxx
private://类的成员变量按照声明顺序进行初始化initHelper _init_helper;std::string _name;std::string _dbfile;std::string _basedir;ExchangeManager::ptr _emp;MsgQueueManager::ptr _mqmp;BindingManager::ptr _bmp;MessageManager::ptr _mmp;
};
这种方法的一个好处是,它允许我们将前置操作封装在一个单独的类中,从而使主类的构造函数保持简洁
2.构造
构造的时候别忘了初始化所有队列的消息管理句柄
[其实就是恢复所有队列的历史消息]
class VirtualHost
{
public:using ptr = std::shared_ptr<VirtualHost>;VirtualHost(const std::string &name, const std::string &dbfile, const std::string &basedir):_init_helper(dbfile,basedir), _name(name), _dbfile(dbfile), _basedir(basedir), _emp(std::make_shared<ExchangeManager>(_dbfile)), _mqmp(std::make_shared<MsgQueueManager>(_dbfile)), _bmp(std::make_shared<BindingManager>(_dbfile)), _mmp(std::make_shared<MessageManager>(_basedir)){// 初始化所有队列的消息管理句柄MsgQueueMap mqm = _mqmp->getAllMsgQueue();for (auto &kv : mqm){_mmp->initQueueMessageManager(kv.first);}}
private:std::string _name;std::string _dbfile;std::string _basedir;ExchangeManager::ptr _emp;MsgQueueManager::ptr _mqmp;BindingManager::ptr _bmp;MessageManager::ptr _mmp;
};
2.声明/删除交换机
声明交换机直接复用即可
删除交换机之前要删除该交换机的所有绑定信息
// 声明/删除交换机
bool declareExchange(const std::string &ename, ExchangeType etype, bool edurable, bool eauto_delete,const google::protobuf::Map<std::string, std::string> &eargs)
{return _emp->declareExchange(ename, etype, edurable, eauto_delete, eargs);
}bool eraseExchange(const std::string &ename)
{// 删除交换机之前要删除该交换机的所有绑定信息哦_bmp->removeExchangeBindings(ename);return _emp->eraseExchange(ename);
}
3.声明/删除队列
声明队列时初始化该队列的消息管理模块句柄
删除队列时:
- 删除该队列的消息管理模块句柄
- 删除该队列的所有绑定信息
// 声明/删除队列
bool declareMsgQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs)
{// 初始化该队列的消息管理模块句柄_mmp->initQueueMessageManager(qname);return _mqmp->declareMsgQueue(qname, qdurable, qexclusive, qauto_delete, qargs);
}bool eraseMsgQueue(const std::string &qname)
{// 删除该队列的所有绑定信息if (!_bmp->removeMsgQueueBindings(qname)){default_error("删除队列失败,因为删除队列的所有绑定信息失败, 队列名: %s",qname.c_str());return false;}// 删除该队列的消息管理模块句柄_mmp->destroyQueueMessageManager(qname);return _mqmp->eraseMsgQueue(qname);
}
4.绑定/解绑队列
绑定队列时,要先查找交换机和队列是否存在
并根据他们的持久化方式来决定绑定信息的持久化方式[联系其低耦合模块之间的桥梁(虚拟机整合的意义)]
// 绑定/解绑队列
bool bind(const std::string &ename, const std::string &qname, const std::string &binding_key)
{// 因为不知道该交换机,队列是否存在// 也不知道他们是否持久化,所以先查找他们Exchange::ptr ep = _emp->getExchange(ename);if (ep.get() == nullptr){default_error("绑定交换机与队列失败,因为该交换机不存在, 交换机名称: %s",ename.c_str());return false;}MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("绑定交换机与队列失败,因为该队列不存在, 队列名称: %s",qname.c_str());return false;}return _bmp->bind(ename, qname, binding_key, ep->durable && mqp->durable);
}bool unBind(const std::string& ename,const std::string& qname)
{return _bmp->unBind(ename,qname);
}
5.发布/确认消息
发布消息时要先拿到对应的队列,然后根据队列的持久化方式来决定消息是否持久化
确认消息时直接复用即可
// 发布/确认消息
bool basicPublish(const std::string &qname, const BasicProperities *bp, const std::string &body)
{// 在这里能够知道队列的持久化方式,因此就能够传递durable了// 1. 查找该队列的ptr,看是否存在,拿到durable// 2. 发布消息MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("发布消息失败,因为该队列不存在, 队列名: %s",qname.c_str());return false;}return _mmp->publishMessage(qname, bp, body, (mqp->durable) ? DURABLE : UNDURABLE);
}bool basicAck(const std::string& qname,const std::string& msg_id)
{return _mmp->ackMessage(qname,msg_id);
}
6.消费消息
// 推送[消费]消息
MessagePtr basicConsume(const std::string& qname)
{return _mmp->front(qname);
}
直接复用即可
7.其他方法
后面全是复用就行的,直接给出了
// 获取指定交换机的所有绑定信息
MsgQueueBindingMap getAllBindingsByExchange(const std::string& ename)
{return _bmp->getExchangeBindings(ename);
}// 获取所有队列
MsgQueueMap getAllMsgQueue()
{return _mqmp->getAllMsgQueue();
}// 获取指定交换机/队列
Exchange::ptr getExchange(const std::string& ename)
{return _emp->getExchange(ename);
}MsgQueue::ptr getMsgQueue(const std::string& qname)
{return _mqmp->getMsgQueue(qname);
}// 判断指定交换机/队列是否存在
bool existsExchange(const std::string& ename)
{return _emp->exists(ename);
}bool existMsgQueue(const std::string& qname)
{return _mqmp->exists(qname);
}VirtualHostMap getAllVirtualHost()
{std::unique_lock<std::mutex> ulock(_mutex);return _vhmap;
}
8.get接口
这些get接口在虚拟机持久化管理模块那里要用
std::string get_name() const
{return _name;
}std::string get_dbfile() const
{return _dbfile;
}std::string get_basedir() const
{return _basedir;
}
9.clear
清除该虚拟机的所有资源
void clear()
{_emp->clear();_mqmp->clear();_bmp->clear();_mmp->clear();//删除数据库文件FileHelper::removeFile(_dbfile);//删除目录FileHelper::removeDir(_basedir);_dbfile.clear();_basedir.clear();_name.clear();
}
9.完整代码
class VirtualHost
{
private: struct initHelper{initHelper(const std::string &dbfile,const std::string& basedir){// 创建dbfile和basedirFileHelper::createDir(FileHelper::parentDir(dbfile));FileHelper::createDir(FileHelper::parentDir(basedir));FileHelper::createFile(dbfile);}};public:using ptr = std::shared_ptr<VirtualHost>;VirtualHost(const std::string &name, const std::string &dbfile, const std::string &basedir):_init_helper(dbfile,basedir), _name(name), _dbfile(dbfile), _basedir(basedir), _emp(std::make_shared<ExchangeManager>(_dbfile)), _mqmp(std::make_shared<MsgQueueManager>(_dbfile)), _bmp(std::make_shared<BindingManager>(_dbfile)), _mmp(std::make_shared<MessageManager>(_basedir)){// 初始化所有队列的消息管理句柄MsgQueueMap mqm = _mqmp->getAllMsgQueue();for (auto &kv : mqm){_mmp->initQueueMessageManager(kv.first);}}// 声明/删除交换机bool declareExchange(const std::string &ename, ExchangeType etype, bool edurable, bool eauto_delete,const google::protobuf::Map<std::string, std::string> &eargs){return _emp->declareExchange(ename, etype, edurable, eauto_delete, eargs);}bool eraseExchange(const std::string &ename){// 删除交换机之前要删除该交换机的所有绑定信息哦_bmp->removeExchangeBindings(ename);return _emp->eraseExchange(ename);}// 声明/删除队列bool declareMsgQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs){// 初始化该队列的消息管理模块举句柄_mmp->initQueueMessageManager(qname);return _mqmp->declareMsgQueue(qname, qdurable, qexclusive, qauto_delete, qargs);}bool eraseMsgQueue(const std::string &qname){// 删除该队列的所有绑定信息if (!_bmp->removeMsgQueueBindings(qname)){default_error("删除队列失败,因为删除队列的所有绑定信息失败, 队列名: %s",qname.c_str());return false;}// 删除该队列的消息管理模块句柄_mmp->destroyQueueMessageManager(qname);return _mqmp->eraseMsgQueue(qname);}// 绑定/解绑队列bool bind(const std::string &ename, const std::string &qname, const std::string &binding_key){// 因为不知道该交换机,队列是否存在// 也不知道他们是否持久化,所以先查找他们Exchange::ptr ep = _emp->getExchange(ename);if (ep.get() == nullptr){default_error("绑定交换机与队列失败,因为该交换机不存在, 交换机名称: %s",ename.c_str());return false;}MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("绑定交换机与队列失败,因为该队列不存在, 队列名称: %s",qname.c_str());return false;}return _bmp->bind(ename, qname, binding_key, ep->durable && mqp->durable);}bool unBind(const std::string& ename,const std::string& qname){return _bmp->unBind(ename,qname);}// 发布/确认消息bool basicPublish(const std::string &qname, const BasicProperities *bp, const std::string &body){// 在这里能够知道队列的持久化方式,因此就能够传递durable了// 1. 查找该队列的ptr,看是否存在,拿到durable// 2. 发布消息MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("发布消息失败,因为该队列不存在, 队列名: %s",qname.c_str());return false;}return _mmp->publishMessage(qname, bp, body, (mqp->durable) ? DURABLE : UNDURABLE);}bool basicAck(const std::string& qname,const std::string& msg_id){return _mmp->ackMessage(qname,msg_id);}// 推送[消费]消息MessagePtr basicConsume(const std::string& qname){return _mmp->front(qname);}// 获取指定交换机的所有绑定信息MsgQueueBindingMap getAllBindingsByExchange(const std::string& ename){return _bmp->getExchangeBindings(ename);}// 获取所有队列MsgQueueMap getAllMsgQueue(){return _mqmp->getAllMsgQueue();}// 获取指定交换机/队列Exchange::ptr getExchange(const std::string& ename){return _emp->getExchange(ename);}MsgQueue::ptr getMsgQueue(const std::string& qname){return _mqmp->getMsgQueue(qname);}// 判断指定交换机/队列是否存在bool existsExchange(const std::string& ename){return _emp->exists(ename);}bool existMsgQueue(const std::string& qname){return _mqmp->exists(qname);}std::string get_name() const{return _name;}std::string get_dbfile() const{return _dbfile;}std::string get_basedir() const{return _basedir;}void clear(){_emp->clear();_mqmp->clear();_bmp->clear();_mmp->clear();//删除数据库文件FileHelper::removeFile(_dbfile);//删除目录FileHelper::removeDir(_basedir);_dbfile.clear();_basedir.clear();_name.clear();}private:std::string _name;std::string _dbfile;std::string _basedir;ExchangeManager::ptr _emp;MsgQueueManager::ptr _mqmp;BindingManager::ptr _bmp;MessageManager::ptr _mmp;
};
2.虚拟机持久化管理模块
唯一需要注意的就是SQLite3数据库当中创建表指定唯一键索引时
不是用unique key,而是一个unique关键字即可
using VirtualHostMap = std::unordered_map<std::string, VirtualHost::ptr>;class VirtualHostMapper
{
public:// 注意:这个dbfile是存放虚拟机表的那个专属数据库文件,而不是对应虚拟机整合的那些资源所属的数据库文件VirtualHostMapper(const std::string &dbfile): _helper(dbfile){if (!_helper.open()){default_fatal("虚拟机持久化管理模块创建失败, 因为数据库打开失败, 数据库: %s",dbfile.c_str());abort();}if (!createTable()){default_fatal("虚拟机持久化管理模块创建失败, 因为表创建失败");abort();}}bool createTable(){static std::string create_sql = "create table if not exists vhost_table(\name varchar(32) primary key,\dbfile varchar(32) unique not null,\basedir varchar(32) unique not null);";if (!_helper.exec(create_sql, nullptr, nullptr)){default_fatal("虚拟机表创建失败");return false;}return true;}bool dropTable(){static std::string drop_sql = "drop table if exists vhost_table;";if (!_helper.exec(drop_sql, nullptr, nullptr)){default_fatal("虚拟机表删除失败");return false;}return true;}bool insert(const VirtualHost::ptr &vhp){std::ostringstream insert_sql;insert_sql << "insert into vhost_table values(";insert_sql << "'" << vhp->get_name() << "',";insert_sql << "'" << vhp->get_dbfile() << "',";insert_sql << "'" << vhp->get_basedir() << "');";if (!_helper.exec(insert_sql.str(), nullptr, nullptr)){default_error("虚拟机插入数据失败");return false;}return true;}bool erase(const std::string &vname){std::ostringstream delete_sql;delete_sql << "delete from vhost_table if name=";delete_sql << "'" << vname << "';";if (!_helper.exec(delete_sql.str(), nullptr, nullptr)){default_error("虚拟机删除数据失败");return false;}return true;}VirtualHostMap recovery(){static std::string select_sql = "select * from vhost_table;";VirtualHostMap vhmap;if (!_helper.exec(select_sql, VirtualHostMapper::selectCallback, &vhmap)){default_error("虚拟机恢复/查询数据失败");return VirtualHostMap();}return vhmap;}private:static int selectCallback(void *args, int column, char **rows, char **fields){VirtualHostMap *vhptr = static_cast<VirtualHostMap *>(args);VirtualHost::ptr vhp = std::make_shared<VirtualHost>(rows[0], rows[1], rows[2]);vhptr->insert(std::make_pair(vhp->get_name(), vhp));return 0;}SqliteHelper _helper;
};
3.虚拟机管理模块
总体消息管理模块和队列消息管理模块之间的关系 跟
虚拟机管理模块和虚拟机模块之间的关系 是一样的
接口的复用也是一样的
只不过我们在查找对应的虚拟机时,可以复用一下getVirtualHost这个函数
因为对应虚拟机模块下的四个临界资源句柄都已加锁,所以拿到VirtualHost之后操作就无需加锁了
因此才可以复用
而声明/删除虚拟机则无法复用getVirtualHost,原因跟交换机模块当中声明/删除交换机无法复用exists函数一样
两个原子操作组合在一起就不原子了
const std::string vhost_dbfile = "main.db";class VirtualHostManager
{
public:using ptr=std::shared_ptr<VirtualHostManager>;VirtualHostManager(const std::string &dbfile = vhost_dbfile): _mapper(dbfile){_vhmap = _mapper.recovery();}// 声明/删除虚拟机的操作不能复用查找虚拟机这个函数,因为:// 要么死锁,要么就两个原子操作了,而两个原子操作放到一起就不原子了// 声明/删除虚拟机bool declareVirtualHost(const std::string &vname, const std::string dbfile, const std::string &basedir){// 1. 加锁,查找std::unique_lock<std::mutex> ulock(_mutex);if (_vhmap.count(vname))return true;// 2. 构造虚拟机对象VirtualHost::ptr vhp = std::make_shared<VirtualHost>(vname, dbfile, basedir);if (!_mapper.insert(vhp)){std::ostringstream oss;oss << "声明虚拟机失败,因为虚拟机持久化失败, 虚拟机名称: " << vname << "\n" return false;}_vhmap.insert(std::make_pair(vname, vhp));return true;}bool eraseVirtualHost(const std::string &vname){VirtualHost::ptr vhp;{// 1. 加锁,查找std::unique_lock<std::mutex> ulock(_mutex);auto iter = _vhmap.find(vname);if (iter == _vhmap.end()){return true;}// 2. 保存vhp = iter->second;// 3. 持久化删除if (!_mapper.erase(vname)){default_error("删除虚拟机失败,因为持久化删除失败, 虚拟机名称: %s",vname.c_str());return false;}// 4. 从哈希表中删除_vhmap.erase(vname);}// 4. 清空所有资源vhp->clear();return true;}// 声明/删除交换机bool declareExchange(const std::string &vname, const std::string &ename, ExchangeType etype, bool edurable, bool eauto_delete,const google::protobuf::Map<std::string, std::string> &eargs){std::ostringstream oss;oss << "声明交换机失败,因为虚拟机不存在, 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->declareExchange(ename, etype, edurable, eauto_delete, eargs);}bool eraseExchange(const std::string &vname, const std::string &ename){std::ostringstream oss;oss << "删除交换机失败,因为虚拟机不存在, 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->eraseExchange(ename);}// 声明/删除队列bool declareMsgQueue(const std::string &vname, const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs){std::ostringstream oss;oss << "声明队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->declareMsgQueue(qname, qdurable, qexclusive, qauto_delete, qargs);}bool eraseMsgQueue(const std::string &vname, const std::string &qname){std::ostringstream oss;oss << "删除队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->eraseMsgQueue(qname);}// 绑定/解绑队列bool bind(const std::string &vname, const std::string &ename, const std::string &qname, const std::string &binding_key){std::ostringstream oss;oss << "绑定队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->bind(ename, qname, binding_key);}bool unBind(const std::string &vname, const std::string &ename, const std::string &qname){std::ostringstream oss;oss << "解绑队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->unBind(ename, qname);}// 发布/确认消息bool basicPublish(const std::string &vname, const std::string &qname, const BasicProperities *bp, const std::string &body){std::ostringstream oss;oss << "发布消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->basicPublish(qname, bp, body);}bool basicAck(const std::string &vname, const std::string &qname, const std::string &msg_id){std::ostringstream oss;oss << "确认消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->basicAck(qname, msg_id);}// 推送[消费]消息MessagePtr basicConsume(const std::string &vname, const std::string &qname){std::ostringstream oss;oss << "确认消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MessagePtr();}return vhp->basicConsume(qname);}// 获取指定交换机的所有绑定信息MsgQueueBindingMap getAllBindingsByExchange(const std::string &vname, const std::string &ename){std::ostringstream oss;oss << "获取指定交换机的所有绑定信息失败,因为虚拟机不存在, 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MsgQueueBindingMap();}return vhp->getAllBindingsByExchange(ename);}// 获取所有队列MsgQueueMap getAllMsgQueue(const std::string &vname){std::ostringstream oss;oss << "获取所有队列失败,因为虚拟机不存在 , 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MsgQueueMap();}return vhp->getAllMsgQueue();}// 获取指定交换机/队列Exchange::ptr getExchange(const std::string &vname, const std::string &ename){std::ostringstream oss;oss << "获取指定交换机失败,因为虚拟机不存在, 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return Exchange::ptr();}return vhp->getExchange(ename);}MsgQueue::ptr getMsgQueue(const std::string &vname, const std::string &qname){std::ostringstream oss;oss << "获取指定队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MsgQueue::ptr();}return vhp->getMsgQueue(qname);}// 判断指定交换机/队列是否存在bool existsExchange(const std::string &vname, const std::string &ename){std::ostringstream oss;oss << "获取指定队列失败,因为虚拟机不存在, 交换机名称: " << ename << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->existsExchange(ename);}bool existMsgQueue(const std::string &vname, const std::string &qname){std::ostringstream oss;oss << "获取指定队列失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->existMsgQueue(qname);}//清空/销毁所有虚拟机void clear(){std::unique_lock<std::mutex> ulock(_mutex);for (auto &kv : _vhmap){kv.second->clear();}_mapper.dropTable();_vhmap.clear();}VirtualHost::ptr getVirtualHost(const std::string &vname, const std::ostringstream &oss){// 加锁,查找std::unique_lock<std::mutex> ulock(_mutex);auto iter = _vhmap.find(vname);if (iter == _vhmap.end()){default_error("%s",oss.str().c_str());return VirtualHost::ptr();}return iter->second;}VirtualHostMap getAllVirtualHost(){std::unique_lock<std::mutex> ulock(_mutex);return _vhmap;}private:std::mutex _mutex;VirtualHostMap _vhmap;VirtualHostMapper _mapper;
};
4.测试
#include "../mqserver/virtual_host.hpp"#include <gtest/gtest.h>
#include <vector>
using namespace ns_mq;VirtualHostManager::ptr vhmp;std::vector<std::string> uuid_vec = {UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid(), UUIDHelper::uuid()};class VirtualHostTest : public testing::Environment
{
public:virtual void SetUp(){vhmp = std::make_shared<VirtualHostManager>(); // 使用那个main.db即可}virtual void TearDown(){vhmp->clear();}
};TEST(vhost_test, recovery_test)
{ASSERT_NE(vhmp->getExchange("vhost1","exchange1").get(),nullptr);ASSERT_NE(vhmp->getExchange("vhost1","exchange2").get(),nullptr);ASSERT_NE(vhmp->getMsgQueue("vhost1","queue1").get(),nullptr);
}TEST(vhost_test, insert_test)
{vhmp->declareVirtualHost("vhost1", "host1.db", "./host1");vhmp->declareVirtualHost("vhost2", "host2.db", "./host2");vhmp->declareVirtualHost("vhost3", "host3.db", "./host3");vhmp->declareVirtualHost("vhost4", "host4.db", "./host4");ASSERT_EQ(vhmp->exists("vhost1"), true);ASSERT_EQ(vhmp->exists("vhost2"), true);ASSERT_EQ(vhmp->exists("vhost3"), true);ASSERT_EQ(vhmp->exists("vhost4"), true);
}TEST(vhost_test, exchange_test)
{vhmp->declareExchange("vhost1", "exchange1", DIRECT, true, false, {});vhmp->declareExchange("vhost1", "exchange2", TOPIC, true, false, {});vhmp->declareExchange("vhost1", "exchange3", FANOUT, true, false, {});ASSERT_NE(vhmp->getExchange("vhost1", "exchange1").get(), nullptr);ASSERT_NE(vhmp->getExchange("vhost1", "exchange2").get(), nullptr);ASSERT_NE(vhmp->getExchange("vhost1", "exchange3").get(), nullptr);vhmp->eraseExchange("vhost1", "exchange3");ASSERT_NE(vhmp->getExchange("vhost1", "exchange1").get(), nullptr);ASSERT_NE(vhmp->getExchange("vhost1", "exchange2").get(), nullptr);ASSERT_EQ(vhmp->getExchange("vhost1", "exchange3").get(), nullptr);
}TEST(vhost_test, queue_test)
{vhmp->declareMsgQueue("vhost1", "queue1", true, false, false, {});vhmp->declareMsgQueue("vhost1", "queue2", true, false, false, {});vhmp->declareMsgQueue("vhost1", "queue3", true, false, false, {});ASSERT_NE(vhmp->getMsgQueue("vhost1", "queue1").get(), nullptr);ASSERT_NE(vhmp->getMsgQueue("vhost1", "queue2").get(), nullptr);ASSERT_NE(vhmp->getMsgQueue("vhost1", "queue3").get(), nullptr);vhmp->eraseMsgQueue("vhost1", "queue3");ASSERT_NE(vhmp->getMsgQueue("vhost1", "queue1").get(), nullptr);ASSERT_NE(vhmp->getMsgQueue("vhost1", "queue2").get(), nullptr);ASSERT_EQ(vhmp->getMsgQueue("vhost1", "queue3").get(), nullptr);
}TEST(vhost_test, bind_test)
{vhmp->bind("vhost1", "exchange1", "queue1", "news.music.pop");vhmp->bind("vhost1", "exchange1", "queue2", "news.music.pop");vhmp->bind("vhost1", "exchange2", "queue1", "news.music.pop");vhmp->bind("vhost1", "exchange2", "queue2", "news.music.pop");MsgQueueBindingMap emap = vhmp->getAllBindingsByExchange("vhost1", "exchange1");ASSERT_EQ(emap.size(), 2);vhmp->eraseMsgQueue("vhost1", "queue2");emap = vhmp->getAllBindingsByExchange("vhost1", "exchange1");ASSERT_EQ(emap.size(), 1);
}TEST(vhost_test, message_test)
{
//1. 发布消息测试BasicProperities bp;bp.set_msg_id(uuid_vec[0]);bp.set_mode(DURABLE);bp.set_routing_key("news.music.#");vhmp->basicPublish("vhost1", "queue1", &bp, std::string("Hello-1"));bp.set_msg_id(uuid_vec[1]);vhmp->basicPublish("vhost1", "queue1", &bp, std::string("Hello-2"));bp.set_msg_id(uuid_vec[2]);vhmp->basicPublish("vhost1", "queue1", &bp, std::string("Hello-3"));bp.set_msg_id(uuid_vec[3]);vhmp->basicPublish("vhost1", "queue1", &bp, std::string("Hello-4"));bp.set_msg_id(uuid_vec[4]);vhmp->basicPublish("vhost1", "queue1", &bp, std::string("Hello-5"));//2. 消费消息测试MessagePtr mp= vhmp->basicConsume("vhost1","queue1");ASSERT_NE(mp.get(),nullptr);ASSERT_EQ(mp->valid().body(),std::string("Hello-1"));mp= vhmp->basicConsume("vhost1","queue1");ASSERT_NE(mp.get(),nullptr);ASSERT_EQ(mp->valid().body(),std::string("Hello-2"));mp= vhmp->basicConsume("vhost1","queue1");ASSERT_NE(mp.get(),nullptr);ASSERT_EQ(mp->valid().body(),std::string("Hello-3"));mp= vhmp->basicConsume("vhost1","queue1");ASSERT_NE(mp.get(),nullptr);ASSERT_EQ(mp->valid().body(),std::string("Hello-4"));//3. 确认消息测试ASSERT_EQ(vhmp->basicAck("vhost1","queue1",uuid_vec[0]),true);ASSERT_EQ(vhmp->basicAck("vhost1","queue1",uuid_vec[1]),true);ASSERT_EQ(vhmp->basicAck("vhost1","queue1",uuid_vec[2]),true);}int main(int argc, char *argv[])
{testing::AddGlobalTestEnvironment(new VirtualHostTest);testing::InitGoogleTest(&argc, argv);return RUN_ALL_TESTS();
}
五、路由匹配模块
现在我们离具体业务的实现就差一个模块了,就是如何通过交换机类型,binding_key,routing_key来进行消息往队列的投递
这个任务是一个单独的解耦任务,因此我们提取为路由匹配模块
这个模块没有成员,类似于一个工具类,只不过它是独属于我们的项目的
其实就是一个通配符匹配问题,【模式串和匹配串的匹配问题】,经典的一个动态规划问题
1.routing_key和binding_key的格式介绍
2.动规分析
假设我们要匹配的两个字符串是这样的:
routing_key: aaa.ddd.aaa.bbb.eee.ddd
binding_key: aaa.#.bbb.*.ddd
1.状态定义与表示
我们可以很明显的感受到,整个动规的状态取决于两个变量:
i,j【routing_key的前i个单词能否跟binding_key的前j个单词相匹配】
如果写成递归式的函数签名:
bool _ismatch(const std::string& routing_key,const std::string& binding_key,int i,int j);
因此我们的表格是二维的:
dp[i][j]:routing_key的前i个单词能否跟binding_key的前j个单词相匹配
2.状态转移方程
因为binding_key当中的单词分为三种:
纯单词,*,#
所以我们需要分三类进行讨论:
1.纯单词
纯单词就简单了:
若routing_key的第i个单词和binding_key的第j个单词相匹配,则:
routing_key的前i个单词是否跟binding_key的前j个单词相匹配就等价于:
routing_key的前i-1个单词是否跟binding_key的前j-1个单词相匹配
//返回routing_key的第i个单词是否跟binding_key的第j个单词相匹配
bool _ismatch(const std::string& routing_key,const std::string& binding_key,int i,int j);if(_ismatch(routing_key,binding_key,i,j))
{dp[i][j]=dp[i-1][j-1];
}
else dp[i][j]=false;
2.*
因为*匹配任意一个单词,所以只要跟*进行匹配,则必定成功,因此这种情况比第一种殊途同归,因此在_ismatch函数当中匹配时直接if特判一下即可,所以代码不用改
if(_ismatch(routing_key,binding_key,i,j))
{dp[i][j]=dp[i-1][j-1];
}
else dp[i][j]=false;
3.#
因为#可以匹配0个或者任意多个单词:
而匹配n个单词==匹配1个单词+匹配n-1个单词【这个继续匹配即可】
【因此匹配数是可递归拆分的】,所以我们只需要分为三种情况进行讨论即可:
- 匹配0个单词
- 匹配1个单词
- 匹配1个单词后继续匹配【即:匹配多个单词】
【用到了类似于完全背包的拆解方法】
1.#匹配0个单词
dp[i][j]:routing_key的前i个单词能否跟binding_key的前j个单词相匹配而如今#匹配0个单词,所以routing_key的前i个单词就要跟binding_key的前j个单词相匹配
dp[i][j]=dp[i][j-1] : 继承自上方
2.#匹配1个单词
跟*一样
dp[i][j]=dp[i-1][j-1] :继承自左上方
3.#匹配1个单词后继续匹配
dp[i][j]:routing_key的前i个单词能否跟binding_key的前j个单词相匹配而如今#匹配1个单词后继续匹配,所以routing_key的前i-1个单词就要跟binding_key的前j个单词相匹配
dp[i][j]=dp[i-1][j] : 继承自左方
3.dp数组初始化
dp[i][j]:routing_key的前i个单词能否跟binding_key的前j个单词相匹配
所以:
- dp数组先整体初始化为false:
vector<vector<bool>> dp(行数+1,vector<bool>(列数+1,false));
- dp[0][0]:两者都为空,true
- 第一行dp[0][j]当中:routing_key为空,而binding_key不为空
【只要binding_key连续单词为#,则匹配是必定成功的】
for(int j=1;j<=列数;j++)
{if(binding_key的第j个单词 != "#")break;else dp[0][j]=true;
}
- 第一列dp[i][0]当中:routing_key不为空,而binding_key为空
【一定是false】
4.填表顺序
因为每个格子都只会依赖于左,上和左上,因此从上往下,从左往右填表
for(int i=1;i<=行数;i++)
{for(int j=1;j<=列数;j++){//书写状态转移方程}
}
5.返回值
dp[行数][列数]:整个routing_key能否跟整个binding_key相匹配
3.动规代码
但是我们还需要能够拿到routing_key和binding_key的第x个单词
如果每次都现求,那代码既不优雅,效率也低
因此可以用StringHelper::split
【注意:存在下标偏移:即:routing_key的第i个单词其实是routing_vec[i-1]】
该分析的都分析完了,下面直接上代码:
// routing_key : 匹配串 binding_key:模式串
static bool ismatch(const std::string &routing_key, const std::string &binding_key)
{// 1. 分割字符串,将每个单词放到vec当中std::vector<std::string> routing_vec, binding_vec;StringHelper::split(routing_key, ".", &routing_vec);StringHelper::split(binding_key, ".", &binding_vec);// 2. 动规开始// 2.1. dp数组定义int row = routing_vec.size(), col = binding_vec.size();std::vector<std::vector<bool>> dp(row + 1, std::vector<bool>(col + 1, false));// 2.2. dp数组初始化dp[0][0] = true;for (int j = 1; j <= col; j++){if (binding_vec[j - 1] != "#")break;dp[0][j] = true;}// 2.3 填表+状态转移for (int i = 1; i <= row; i++){for (int j = 1; j <= col; j++){// 第1种或者第2种情况if (binding_vec[j - 1] == "*" || routing_vec[i - 1] == binding_vec[j - 1]){dp[i][j] = dp[i - 1][j - 1];}// 第3种情况else if (binding_vec[j - 1] == "#"){// dp[i][j-1] : 匹配0个// dp[i-1][j-1] : 匹配1个// dp[i-1][j] : 匹配多个dp[i][j] = dp[i][j - 1] || dp[i - 1][j - 1] || dp[i - 1][j];}}}// 2.4 返回值return dp[row][col];
}
4.模块整体代码
1.路由匹配代码
我们需要注意的是:
上面动规的路由匹配仅仅使用于交换机类型是TOPIC【主题交换】时
而其余的DIRECT【直接交换】和FANOUT【广播交换】并不适用,因此路由匹配模块并未完全完成,其实也很简单:
static bool route(const std::string &routing_key, const std::string &binding_key, ns_proto::ExchangeType type)
{if (type == ns_proto::ExchangeType::DIRECT){return routing_key == binding_key;}else if (type == ns_proto::ExchangeType::FANOUT){return true;}else if (type == ns_proto::ExchangeType::TOPIC){return ismatch(routing_key, binding_key);}else if (type == ns_proto::ExchangeType::UNKNOWNTYPE){default_fatal("路由匹配时,交换机类型为:UNKNOWNTYPE");return false;}else{default_fatal("路由匹配时,交换机出现未知类型");// 出现未知类型return false;}
}
2.判断routing_key合法性
routing_key只能由数字,字母,下划线,点组成:
// routing_key只能由数字,字母,下划线,点组成
static bool check_routing_key(const std::string &routing_key)
{for (auto &ch : routing_key){if ((ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch == '_') || (ch=='.'))continue;elsereturn false;}return true;
}
3.判断binding_key合法性
// binding_key:
// 1. 只能由数字,字母,下划线,点,*,#组成
// 2. 通配符必须自成单词
// 3. *通配符两边不能有任何通配符
static bool check_binding_key(const std::string &binding_key)
{// 0. 只能由数字,字母,下划线,点,*,#组成for (auto &ch : binding_key){if ((ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch == '_') || (ch == '*') || (ch == '#') || (ch=='.'))continue;elsereturn false;}// 1. 因为要涉及到对单词进行操作,所以先分割单词std::vector<std::string> vec;StringHelper::split(binding_key, ".", &vec);// 2. 通配符必须自成单词for (auto &str : vec){if (str.size() > 1 && (str.find("*") != std::string::npos || str.find("#") != std::string::npos)){return false;}}// 3. *通配符两边不能有任何通配符for (int i = 1; i < vec.size(); i++){if (vec[i] == "#" && vec[i - 1] == "#")return false;else if (vec[i] == "#" && vec[i - 1] == "*")return false;else if (vec[i] == "*" && vec[i - 1] == "#")return false;}return true;
}
4.完整代码
using namespace ns_helper;
class Router
{
public:// routing_key只能由数字,字母,下划线,点组成static bool check_routing_key(const std::string &routing_key){for (auto &ch : routing_key){if ((ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch == '_') || (ch=='.'))continue;elsereturn false;}return true;}// binding_key:// 1. 只能由数字,字母,下划线,点,*,#组成// 2. 通配符必须自成单词// 3. *通配符两边不能有任何通配符static bool check_binding_key(const std::string &binding_key){// 0. 只能由数字,字母,下划线,点,*,#组成for (auto &ch : binding_key){if ((ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch == '_') || (ch == '*') || (ch == '#') || (ch=='.'))continue;elsereturn false;}// 1. 因为要涉及到对单词进行操作,所以先分割单词std::vector<std::string> vec;StringHelper::split(binding_key, ".", &vec);// 2. 通配符必须自成单词for (auto &str : vec){if (str.size() > 1 && (str.find("*") != std::string::npos || str.find("#") != std::string::npos)){return false;}}// 3. *通配符两边不能有任何通配符for (int i = 1; i < vec.size(); i++){if (vec[i] == "#" && vec[i - 1] == "#")return false;else if (vec[i] == "#" && vec[i - 1] == "*")return false;else if (vec[i] == "*" && vec[i - 1] == "#")return false;}return true;}static bool route(const std::string &routing_key, const std::string &binding_key, ns_proto::ExchangeType type){if (!check_routing_key(routing_key)){default_warning("路由匹配时:routing_key不合法,routing_key:%s",routing_key.c_str());return false;}if (!check_binding_key(binding_key)){default_warning("路由匹配时:binding_key不合法,binding_key:%s",binding_key.c_str());return false;}if (type == ns_proto::ExchangeType::DIRECT){return routing_key == binding_key;}else if (type == ns_proto::ExchangeType::FANOUT){return true;}else if (type == ns_proto::ExchangeType::TOPIC){return ismatch(routing_key, binding_key);}else if (type == ns_proto::ExchangeType::UNKNOWNTYPE){default_fatal("路由匹配时,交换机类型为:UNKNOWNTYPE");return false;}else{default_fatal("路由匹配时,交换机出现未知类型");// 出现未知类型return false;}}private:// routing_key : 匹配串 binding_key:模式串static bool ismatch(const std::string &routing_key, const std::string &binding_key){// 1. 分割字符串,将每个单词放到vec当中std::vector<std::string> routing_vec, binding_vec;StringHelper::split(routing_key, ".", &routing_vec);StringHelper::split(binding_key, ".", &binding_vec);// 2. 动规开始// 2.1. dp数组定义int row = routing_vec.size(), col = binding_vec.size();std::vector<std::vector<bool>> dp(row + 1, std::vector<bool>(col + 1, false));// 2.2. dp数组初始化dp[0][0] = true;for (int j = 1; j <= col; j++){if (binding_vec[j - 1] != "#")break;dp[0][j] = true;}// 2.3 填表+状态转移for (int i = 1; i <= row; i++){for (int j = 1; j <= col; j++){// 第1种或者第2种情况if (binding_vec[j - 1] == "*" || routing_vec[i - 1] == binding_vec[j - 1]){dp[i][j] = dp[i - 1][j - 1];}// 第3种情况else if (binding_vec[j - 1] == "#"){// dp[i][j-1] : 匹配0个// dp[i-1][j-1] : 匹配1个// dp[i-1][j] : 匹配多个dp[i][j] = dp[i][j - 1] || dp[i - 1][j - 1] || dp[i - 1][j];}}}// 2.4 返回值return dp[row][col];}
};
5.测试
#include "../mqserver/routing.hpp"
#include <gtest/gtest.h>using namespace ns_mq;class RouteTest : public testing::Environment
{
public:virtual void SetUp() {}virtual void TearDown() {}
};TEST(route_test, legal_test)
{ASSERT_EQ(Router::check_routing_key("news...music.^%"), false);ASSERT_EQ(Router::check_routing_key("news...music/@$."), false);ASSERT_EQ(Router::check_binding_key("news...music.#"), true);ASSERT_EQ(Router::check_binding_key("news...music.a#"), false);ASSERT_EQ(Router::check_binding_key("news.#.#"), false);ASSERT_EQ(Router::check_binding_key("news.*.#"), false);ASSERT_EQ(Router::check_binding_key("news.*.*"), true);
}TEST(route_test, route)
{// [测试⽤例]// binding key routing key result// aaa aaa true// aaa.bbb aaa.bbb true// aaa.bbb aaa.bbb.ccc false// aaa.bbb aaa.ccc false// aaa.#.bbb aaa.bbb.ccc false// aaa.bbb.# aaa.ccc.bbb false// #.bbb.ccc aaa.bbb.ccc.ddd false// aaa.bbb.ccc aaa.bbb.ccc true// aaa.* aaa.bbb true// aaa.*.bbb aaa.bbb.ccc false// *.aaa.bbb aaa.bbb false// # aaa.bbb.ccc true// aaa.# aaa.bbb true// aaa.# aaa.bbb.ccc true// aaa.#.ccc aaa.ccc true// aaa.#.ccc aaa.bbb.ccc true// aaa.#.ccc aaa.aaa.bbb.ccc true// #.ccc ccc true// #.ccc aaa.bbb.ccc true// aaa.#.ccc.ccc aaa.bbb.ccc.ccc.ccc true// aaa.#.bbb.*.bbb aaa.ddd.ccc.bbb.eee.bbb truestd::vector<std::string> binding_v = {"aaa","aaa.bbb","aaa.bbb","aaa.bbb","aaa.#.bbb","aaa.bbb.#","#.bbb.ccc","aaa.bbb.ccc","aaa.*","aaa.*.bbb","*.aaa.bbb","#","aaa.#","aaa.#","aaa.#.ccc","aaa.#.ccc","aaa.#.ccc","#.ccc","#.ccc","aaa.#.ccc.ccc","aaa.#.bbb.*.bbb"};std::vector<std::string> routing_v = {"aaa","aaa.bbb","aaa.bbb.ccc","aaa.ccc","aaa.bbb.ccc","aaa.ccc.bbb","aaa.bbb.ccc.ddd","aaa.bbb.ccc","aaa.bbb","aaa.bbb.ccc","aaa.bbb","aaa.bbb.ccc","aaa.bbb","aaa.bbb.ccc","aaa.ccc","aaa.bbb.ccc","aaa.aaa.bbb.ccc","ccc","aaa.bbb.ccc","aaa.bbb.ccc.ccc.ccc","aaa.ddd.ccc.bbb.eee.bbb"};std::vector<bool> ret_v = {true,true,false,false,false,false,false,true,true,false,false,true,true,true,true,true,true,true,true,true,true};for (int i = 0; i < ret_v.size(); i++){ASSERT_EQ(Router::route(routing_v[i], binding_v[i], ns_proto::ExchangeType::TOPIC), ret_v[i]);}
}int main(int argc, char *argv[])
{testing::AddGlobalTestEnvironment(new RouteTest);testing::InitGoogleTest(&argc, argv);return RUN_ALL_TESTS();
}
以上就是项目第六弹:虚拟机管理模块、路由匹配模块的全部内容