项目扩展二:消息拉取功能的实现

项目扩展二:消息拉取功能的实现

  • 一、回顾一下消息推送功能是如何实现的
  • 二、设计消息拉取功能
    • 1.服务器如何处理
    • 2.定义Request和Response
      • 1.定义Request
      • 2.proto文件
  • 三、服务器实现消息拉取
    • 1.业务模块的实现:信道模块
    • 2.消费者管理模块实现O(1)获取消费者
      • 1.目前的情形
      • 2.加一个哈希表?
      • 3.如何做?
      • 4.代码
    • 3.信道模块实现
    • 4.broker服务器注册响应业务函数
  • 四、客户端修改
  • 五、修改消息推送逻辑--设计
    • 1.考量
    • 2.要不要给BOTH消息推送机制呢?
    • 3.设计
  • 六、实现
    • 1.修改消息proto文件
    • 2.修改队列消息管理模块
      • 1.成员的修改
      • 2.recovery恢复历史消息的修改
      • 3.发布消息的修改
      • 3.front的修改
      • 4.clear的修改
    • 3.总体消息管理模块修改
      • 1.PublishMessage加一个参数
      • 2.获取链表队头消息修改
    • 4.虚拟机模块的修改
      • 1.发布消息增加一个参数
        • 1.虚拟机模块
        • 2.虚拟机管理模块
      • 2.推送和拉取消息的修改
        • 1.虚拟机模块
        • 2.虚拟机管理模块
    • 5.proto网络通信协议修改
      • 1.BasicPublishRequest
    • 6.信道模块修改
      • 1.发布消息的修改
      • 2.publishCallback的修改
        • 1.坑点--连锁BUG
        • 2.解决方案
        • 3.实现接口
        • 4.publishCallback的修改
      • 3.拉取消息的修改
    • 7.客户端修改
  • 七、验证
    • 1.消息拉取功能与恢复功能联合测试
      • 1.测试1
        • 1.生产者
        • 2.消费者
        • 3.演示
      • 2.测试2 -- 演示
    • 2.PULL测试
    • 2.BOTH测试

一、回顾一下消息推送功能是如何实现的

这其中一共有三个角色:
在这里插入图片描述

二、设计消息拉取功能

给客户端多提供一个服务:消息拉取服务
其实就是从该消费者订阅的队列当中取出一个消息,推送给该消费者
只不过这个消息拉取是由消费者主动向我们服务器发起请求的

因此,我们要考虑两点:

  1. 服务器如何主动向消费者发送消息
  2. 网络通信协议中Request和Response的定义

1.服务器如何处理

要完成:主动向消费者发送消息这一任务,需要两个模块:消费者管理模块和虚拟机模块

  1. 从消费者管理模块当中取出消费者
  2. 从虚拟机模块当中取出消息

然后调用该消费者的消费处理回调函数,向客户端发送BasicConsumeResponse给客户端它需要的参数,告诉客户端“你可以消费了”

总体而言,并不难,因为我们早已把具体功能模块化,然后需要使用哪些功能,找对应的负责人【模块句柄】即可
这就是面向对象的模块化编程的独特魅力

2.定义Request和Response

Response的话,依然只需要BasicCommonResponse和BasicConsumeResponse即可

BasicCommonResponse是基础响应,对客户端的每条Request都有ACK

因此,我们只需要完成定义Request的任务即可

1.定义Request

首先,req_id和channel_id必然需要。下面我们根据消息拉取这一功能考虑所需参数

首先,消费者拉取消息,因此我们需要找到该消费者,所以需要vhost_name,queue_name和consumer_tag

有了这三个字段,我们就能从消费者管理模块拿取该消费者对应信息
然后我们就可以推送消息了,无需消费者再提供任何信息

2.proto文件

因此,我们往proto文件当中新增一个message消息体

//8. 消息的拉取
message BasicPullRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string queue_name = 4;string consumer_tag = 5;
}

然后编译

protoc --cpp_out=. mq_proto.proto 

三、服务器实现消息拉取

1.业务模块的实现:信道模块

我们的信道是实现具体业务,提供具体服务模块,他内部整合了虚拟机模块和消费者管理模块。
因此我们只需要加一个函数,它的任务就是:

  1. 根据consumer_tag这三个字段 从消费者管理模块当中取出消费者
  2. 从虚拟机模块当中取出消息
  3. 调用该消费者的消费处理回调函数
  4. 如果该消费者有自动确认标志,则进行自动确认

其中,第3解耦且耗时,第4步我们想要它在第三步结束之后才开始,
我们想要解放信道服务线程,因此我们把第4步跟第3步放到一起交给异步工作线程

2.消费者管理模块实现O(1)获取消费者

我们的消费者管理模块还没有实现获取消费者这一接口,而这一接口在增加了消息拉取功能之后,又很常用
且基于之前的数据结构来实现,效率为O(N),所以我们需要改进一下,提高效率

1.目前的情形

std::vector<Consumer::ptr> _consumer_vec;
size_t _seq;

之前为了实现RR轮转的负载均衡,我们通过vector和一个轮询序号实现了队列消费者管理模块

新增,删除,都是O(N)【因为新增和删除消费者的需求频率并不高,所以没什么大碍】,主要是负载均衡select是O(1),所以设计总体来说还可以

但是目前我们想要增加的是根据consumer_tag来O(1)获取消费者,而vector只能O(N),所以不满足需求

2.加一个哈希表?

这里的哈希表的value_type不能是vector的迭代器,因为vector的扩容和删除都有迭代器失效问题
删除导致的迭代器失效问题还好解决,但是扩容导致的迭代器失效问题不好解决,因为vector的扩容被它通过封装屏蔽掉了,倒是也能检测【通过不断检测capcity()】,不过非常不优雅

所以哈希表的value_type搞成Consumer::ptr的话,虽然可以快速查找,但是这样的话,哈希表的用途就不大的
因为哈希表只能让查询操作变为O(1),但是删除还是O(N),因为vector还是要遍历删除的

3.如何做?

哈希表不能跟vector打好配合,所以我们将vector改为list,将RR轮转的负载均衡变为LRU式的负载均衡
每次select时从队头取最近最少使用的消费者,访问之后放到队尾

4.代码

_consumer_list的队头是最近最少使用的,队尾是最近访问的

因此:

  1. select获取消费者之后,将该消费者挪到队尾
  2. get获取消费者之后,将该消费者挪到队尾
  3. 新增消费者,将该消费者放到队头(因为该消费者的负载必为0)
class QueueConsumerManager
{
public:using ptr = std::shared_ptr<QueueConsumerManager>;QueueConsumerManager(const std::string &vhost_name, const std::string &qname): _vhost_name(vhost_name), _qname(qname) {}// 1. 新增消费者[只有想要订阅当前队列消息的消费者才会调用该函数]Consumer::ptr createConsumer(const std::string &consumer_tag, const ConsumerCallback &callback, bool auto_ack){// 1. 加锁,并查找是否有该消费者std::unique_lock<std::mutex> ulock(_mutex);auto iter_map = _consumer_map.find(consumer_tag);if (iter_map != _consumer_map.end()){iter_type iter_list = iter_map->second; // iter_list是链表的迭代器return *iter_list;}// 2. 从队头插入该消费者Consumer::ptr cp = std::make_shared<Consumer>(consumer_tag, callback, _vhost_name, _qname, auto_ack);_consumer_list.push_front(cp);_consumer_map.insert(std::make_pair(consumer_tag, _consumer_list.begin()));return cp;}void removeConsumer(const std::string &consumer_tag){// 加锁并删除该消费者std::unique_lock<std::mutex> ulock(_mutex);auto iter_map = _consumer_map.find(consumer_tag);if (iter_map == _consumer_map.end())return;iter_type iter_list = iter_map->second; // iter_list是链表的迭代器// 删除_consumer_list.erase(iter_list);_consumer_map.erase(iter_map);}Consumer::ptr selectConsumer(){// 0. 加锁并判断是否为空std::unique_lock<std::mutex> ulock(_mutex);if (_consumer_list.empty()){default_warning("获取消费者失败,因为该队列没有消费者,虚拟机名称:%s ,队列名:",_vhost_name.c_str(),_qname.c_str());return Consumer::ptr();}// 1. 拿到队头消费者Consumer::ptr cp = _consumer_list.front();// 2. 将队头消费者移到队尾_consumer_list.splice(_consumer_list.end(), _consumer_list, _consumer_list.begin());// 因为splice是转移节点,不会导致迭代器失效,所以无需更新哈希表return cp;}bool exist(const std::string &consumer_tag){std::unique_lock<std::mutex> ulock(_mutex);return _consumer_map.count(consumer_tag) > 0;}bool empty(){std::unique_lock<std::mutex> ulock(_mutex);return _consumer_map.empty();}void clear(){std::unique_lock<std::mutex> ulock(_mutex);_consumer_map.clear();}// 支持通过消费者tag来获取消费者,这里用哈希表来提高查询效率// 这里的哈希表的value_type不能是vector的迭代器,因为vector的扩容和删除都有迭代器失效问题// 删除导致的迭代器失效问题还好解决,但是扩容导致的迭代器失效问题不好解决,因为vector的扩容被它通过封装屏蔽掉了,倒是也能检测【通过不断检测capcity()】,不过非常不优雅// 所以哈希表的value_type搞成Consumer::ptr的话,虽然可以快速查找,但是这样的话,哈希表的用途就不大的// 因为哈希表只能让查询操作变为O(1),但是删除还是O(N),因为vector还是要遍历的// 那能否就单纯只有一个vector呢?不能,消费者删除需求并不高,所以曾经我们用的vector,但是有了消息拉取功能之后,获取消费者的需求就很高了// 所以不能只有一个vector,必须要有一个哈希表// 而哈希表不能跟vector打好配合,所以我们将vector改为list,将RR轮转的负载均衡变为LRU式的负载均衡// 每次select时从队头取最近最少使用的消费者,访问之后放到队尾Consumer::ptr getConsumer(const std::string &consumer_tag){std::unique_lock<std::mutex> ulock(_mutex);auto iter_map = _consumer_map.find(consumer_tag);if (iter_map != _consumer_map.end()){iter_type iter_list = iter_map->second; // iter_list是链表的迭代器// 将iter_list移动到队尾_consumer_list.splice(_consumer_list.end(), _consumer_list, iter_list);return *iter_list;}return Consumer::ptr();}private:using iter_type = std::list<Consumer::ptr>::iterator;std::string _vhost_name;std::string _qname;std::mutex _mutex;std::list<Consumer::ptr> _consumer_list;std::unordered_map<std::string, iter_type> _consumer_map;
};

然后在总体消费者管理模块当中添加:

Consumer::ptr getConsumer(const std::string vhost_name, const std::string &qname, const std::string &consumer_tag)
{std::ostringstream oss;oss << "获取消费者失败,因为未能找到该队列消费者管理模块,qname = " << qname << "\n";QueueConsumerManager::ptr qcmp = getQueueConsumerManager(vhost_name, qname, oss);if (qcmp.get() == nullptr){return Consumer::ptr();}return qcmp->getConsumer(consumer_tag);
}

3.信道模块实现

  1. 拿到消费者和消息
  2. 封装异步任务,抛入线程池
    异步任务:
    1. 调用该消费者的消费处理回调函数
    2. auto_ack的问题
  3. 返回基础响应

不要忘了:服务器所描述的消费者的消费处理回调函数的功能仅仅是:
向消费者发送基础消费处理响应BasicConsumeResponse

void basicPull(const BasicPullRequestPtr &req)
{// 1. 拿到该消费者Consumer::ptr cp = _consumer_manager_ptr->getConsumer(req->vhost_name(), req->queue_name(), req->consumer_tag());if(cp.get()==nullptr){default_error("拉取消息失败,因为消费者不存在,消费者tag:%s",req->consumer_tag().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 2. 拿到消息MessagePtr mp = _vhost_manager_ptr->basicConsume(req->vhost_name(), req->queue_name());if(mp.get()==nullptr){default_error("拉取消息失败,因为该队列没有待推送消息,队列名:%s",req->queue_name().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 3. 封装异步任务,抛入线程池auto func = [cp, mp, req, this](){// 3. 调用该消费者的消费处理回调函数cp->_callback(cp->_consumer_tag, mp->mutable_valid()->mutable_properities(), mp->valid().body());// 4. auto_ack的问题if (cp->_auto_ack){this->_vhost_manager_ptr->basicAck(req->vhost_name(), req->queue_name(), mp->valid().properities().msg_id());}};_pool_ptr->put(func);// 4. 基础相应basicResponse(req->req_id(),req->channel_id(),true);
}

4.broker服务器注册响应业务函数

_dispatcher.registerMessageCallback<BasicPullRequest>(std::bind(&Server::OnBasicPull,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
// 8. 消息拉取
void OnBasicPull(const muduo::net::TcpConnectionPtr &conn, const BasicPullRequestPtr &req, muduo::Timestamp)
{// 1. 先看有无该连接Connection::ptr myconn = _connection_manager_ptr->getConnecion(conn);if (myconn.get() == nullptr){default_info("确认消息时,没有找到连接对应的Connection对象");return;}// 2. 获取信道Channel::ptr mychannel = myconn->getChannel(req->channel_id());if (mychannel.get() == nullptr){default_info("确认消息失败,因为获取信道失败");return;}mychannel->basicPull(req);
}

四、客户端修改

客户端只需要修改Channel即可,

// 消息拉取
bool BasicPull()
{if (_consumer.get() == nullptr){default_error("消息拉取失败,该信道没有关联消费者");return false;}BasicPullRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(_consumer->_vhost_name);req.set_consumer_tag(_consumer->_consumer_tag);req.set_queue_name(_consumer->_queue_name);// 发送请求_codec->send(_conn, req);std::ostringstream oss;BasicCommonResponsePtr resp = waitResponse(rid);if (resp->ok()){default_info("消息拉取成功: %s",_consumer->_consumer_tag.c_str());}else{default_info("消息拉取失败: %s",_consumer->_consumer_tag.c_str());}return resp->ok();
}

五、修改消息推送逻辑–设计

1.考量

我们之前的消息推送逻辑是:当生产者发布消息之后,我们会立刻找消费者去推送该消息,如果没有消费者,那么就会丢弃该消息
这个逻辑在之前只有消息推送功能时,是正确的,因为不丢弃消息就是浪费资源

而现在我们实现了消息拉取功能,此时这种情况就可以不丢弃消息,而将其存放到队列当中,等待消费者进行拉取

但是推送的消息要不要跟拉取的消息分割一下呢,
让生产者发布消息时选择是推送的,还是拉取的,还是都有

因为一些消息具有实时性,更希望快速被处理,就可以放到待推送当中。而那些对实时性没这么高要求的,就可以放到待确认

2.要不要给BOTH消息推送机制呢?

BOTH的意思是:这个消息既放到待推送链表当中,又放到待拉取链表当中

因为我们的消息推送是放到异步线程池当中去跑的,所以存在拉取快于推送的情况

哪个快听谁的,如果服务器先推送,那么这个消息就按推送走
如果服务器先拉取,那么这个消息就按拉取走

因为我们拉取之后,会将该消息从推送当中删除
推送之后,会将该消息从拉取当中删除,而且操作都因为加了互斥锁而成为了原子操作

所以不会存在同一消息被消费2次的情形

因为消息被消费只有3种情况:

  1. 推送(推送时会将消息从拉取当中删除,所以不怕该消息同时被拉取)
  2. 推送进行之前被拉取(此时推送就找不到消息了,没事)
  3. 推送失败之后被拉取(推送时会从拉取当中删除,推送失败之后会将消息放到拉取当中,所以没事)

因此BOTH是可以的,所以我们给上BOTH

3.设计

如何修改呢?
在这里插入图片描述
消息推送如果失败,也是将其放到待拉取消息链表当中,等待消费者主动拉取

持久化消息恢复之后,将其放到待拉取消息链表当中,等待消费者主动拉取

现在有个问题:
既然持久化的消息在恢复之后是直接放到待拉取消息链表当中的,那么有必要将消息的推送机制一并持久化吗?

没必要

六、实现

1.修改消息proto文件

// 3. 消息的推送机制:推送/拉取/推+拉
enum PublishMechanism
{UNKNOWNMECHANISM = 0;PUSH = 1;PULL = 2;BOTH = 3;
}// 4. 消息的基本属性
message BasicProperities
{string msg_id = 1;DeliveryMode mode = 2;string routing_key = 3;
}message Message 
{message ValidLoad{string body = 1;BasicProperities properities = 2;string valid = 3;// 因为bool的true/false在protobuf当中持久化后的长度不同,因此我们不用bool,而是用"0"代表无效,"1"代表有效}ValidLoad valid = 1;uint64 offset = 2;uint64 len = 3;  PublishMechanism mechanism = 4;
}
protoc --cpp_out=. mq_msg.proto

2.修改队列消息管理模块

1.成员的修改

内部类
struct iter_node
{using iter_type = std::list<MessagePtr>::iterator;iter_type push_iter;iter_type pull_iter;
};std::list<MessagePtr> _waitpush_list;
std::list<MessagePtr> _waitpull_list;
std::unordered_map<MessagePtr,iter_node> _waitpublish_map;

2.recovery恢复历史消息的修改

把别忘了gc恢复的待拉取消息链表存入_waitpublish_map当中!!

void recovery()
{std::unique_lock<std::mutex> ulock(_mutex);// 1. 恢复历史消息,将消息放入待拉取消息链表当中_waitpull_list = _mapper.gc();// 2. 遍历待拉取消息链表,将其中的消息都放入_waitpublish_map当中for (auto iter_list = _waitpull_list.begin(); iter_list != _waitpull_list.end(); ++iter_list){MessagePtr mp = *iter_list;_waitpublish_map[mp].pull_iter = iter_list;_waitpublish_map[mp].push_iter = _waitpush_list.end();}// 3. 将gc后的消息放到持久化哈希表中for (auto &mp : _waitpull_list){_durable_map[mp->valid().properities().msg_id()] = mp;}// 2. 更新持久化消息总数和有效消息总数_total_count = _valid_count = _durable_map.size();
}

3.发布消息的修改

bool publishMessage(const BasicProperities *bp, const std::string &body, DeliveryMode mode, PublishMechanism mechanism)
{// 消息能否持久化取决于队列是否持久化,因此消息的持久化与否不仅要看bp当中的mode,还要看DeliveryMode mode// 只有当DeliveryMode mode是持久化时,才看bp当中的mode,否则一律不持久化// 1. 构建消息智能指针MessagePtr mp = std::make_shared<Message>();mp->mutable_valid()->set_body(body);mp->mutable_valid()->set_valid("1");mp->mutable_valid()->mutable_properities()->set_msg_id(bp->msg_id());mp->mutable_valid()->mutable_properities()->set_routing_key(bp->routing_key());DeliveryMode final_mode = (mode == DURABLE && bp->mode() == DURABLE) ? DURABLE : UNDURABLE;mp->mutable_valid()->mutable_properities()->set_mode(final_mode);mp->set_mechanism(mechanism);// 加锁std::unique_lock<std::mutex> ulock(_mutex);// 2. 看是否需要持久化if (final_mode == DURABLE){if (!_mapper.insert(mp)){default_error("发布消息失败, 因为消息持久化失败, 消息ID: %s",bp->msg_id().c_str());return false;}// 放到持久化哈希表中_durable_map[bp->msg_id()] = mp;_total_count++;_valid_count++;}// 3. 根据消息发布机制来进行消息存储// 1. 放到待推送链表if (mechanism == PUSH){_waitpush_list.push_back(mp);_waitpublish_map[mp].push_iter = std::prev(_waitpush_list.end());_waitpublish_map[mp].pull_iter = _waitpull_list.end();}else if (mechanism == PULL){_waitpull_list.push_back(mp);_waitpublish_map[mp].push_iter = _waitpush_list.end();_waitpublish_map[mp].pull_iter = std::prev(_waitpull_list.end());}else if (mechanism == BOTH){_waitpush_list.push_back(mp);_waitpull_list.push_back(mp);_waitpublish_map[mp].push_iter = std::prev(_waitpush_list.end());_waitpublish_map[mp].pull_iter = std::prev(_waitpull_list.end());}else{default_error("发布消息失败,因为消息的发布机制未知, 消息ID: %s",bp->msg_id().c_str());return false;}return true;
}

3.front的修改

抽离出一个共用函数:

MessagePtr front(std::list<MessagePtr> &main_list, std::list<MessagePtr> &sub_list, bool ispush)
{std::unique_lock<std::mutex> ulock(_mutex);// 0.加锁并判空if (main_list.empty()){return MessagePtr();}// 1.从链表取消息,设置待确认状态MessagePtr mp = main_list.front();main_list.pop_front();_waitack_map[mp->valid().properities().msg_id()] = mp;// 2.在另一个链表当中进行删除auto iter_hash = _waitpublish_map.find(mp);// 假设它是push,那么另一个链表就是pulliter_node::iter_type iter_list = iter_hash->second.pull_iter;if (!ispush){iter_list = iter_hash->second.push_iter;}if (iter_list != sub_list.end()){sub_list.erase(iter_list);}// 3.在哈希表当中删除_waitpublish_map.erase(iter_hash);return mp;
}

从待推送链表取消息:

MessagePtr push_list_front()
{return front(_waitpush_list,_waitpull_list,true);
}

从待拉取链表取消息:

MessagePtr pull_list_front()
{return front(_waitpull_list,_waitpush_list,false);
}

4.clear的修改

// 需要提供销毁该队列所有信息的方法(删除队列时要用)
void clear()
{std::unique_lock<std::mutex> ulock(_mutex);_mapper.removeFile();_waitpush_list.clear();_waitpull_list.clear();_waitpublish_map.clear();_waitack_map.clear();_durable_map.clear();_valid_count = _total_count = 0;
}

3.总体消息管理模块修改

1.PublishMessage加一个参数

bool publishMessage(const std::string &qname, const BasicProperities *bp, const std::string &body, DeliveryMode mode, PublishMechanism mechanism)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("发布消息失败,因为该队列的消息管理模块句柄尚未初始化");return false;}qmmp = iter->second;}return qmmp->publishMessage(bp, body, mode, mechanism);
}

2.获取链表队头消息修改

MessagePtr push_list_front(const std::string &qname)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("获取待推送消息失败,因为该队列的消息管理模块句柄尚未初始化");return MessagePtr();}qmmp = iter->second;}return qmmp->push_list_front();
}MessagePtr pull_list_front(const std::string &qname)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("获取待拉取消息失败,因为该队列的消息管理模块句柄尚未初始化");return MessagePtr();}qmmp = iter->second;}return qmmp->pull_list_front();
}

至此,消息模块就搞完了,下面顺着这个层状结构往上找,去修改虚拟机

4.虚拟机模块的修改

1.发布消息增加一个参数

1.虚拟机模块
bool basicPublish(const std::string &qname, const BasicProperities *bp, const std::string &body,PublishMechanism mechanism)
{// 在这里能够知道队列的持久化方式,因此就能够传递durable了//  1. 查找该队列的ptr,看是否存在,拿到durable//  2. 发布消息MsgQueue::ptr mqp = _mqmp->getMsgQueue(qname);if (mqp.get() == nullptr){default_error("发布消息失败,因为该队列不存在, 队列名: %s",qname.c_str());return false;}return _mmp->publishMessage(qname, bp, body, (mqp->durable) ? DURABLE : UNDURABLE,mechanism);
}
2.虚拟机管理模块
bool basicPublish(const std::string &vname, const std::string &qname, const BasicProperities *bp, const std::string &body, PublishMechanism mechanism)
{std::ostringstream oss;oss << "发布消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->basicPublish(qname, bp, body, mechanism);
}

2.推送和拉取消息的修改

1.虚拟机模块
// 推送[消费]消息
MessagePtr basicConsume(const std::string &qname)
{return _mmp->push_list_front(qname);
}// 拉取消息
MessagePtr basicPull(const std::string &qname)
{return _mmp->pull_list_front(qname);
}
2.虚拟机管理模块
// 推送[消费]消息
MessagePtr basicConsume(const std::string &vname, const std::string &qname)
{std::ostringstream oss;oss << "推送消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MessagePtr();}return vhp->basicConsume(qname);
}// 拉取消息
MessagePtr basicPull(const std::string &vname, const std::string &qname)
{std::ostringstream oss;oss << "拉取消息失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return MessagePtr();}return vhp->basicPull(qname);
}

然后虚拟机模块就搞定了,再往上走,修改信道

5.proto网络通信协议修改

因为信道是网络服务模块,且生产者要提供的参数多了一个,所以我们需要修改一下proto网络通信协议

1.BasicPublishRequest

其实就是给BasicPublishRequest加一个PublishMechanism 字段而已

//7. 消息的发布与确认
message BasicPublishRequest
{string req_id = 1;string channel_id = 2;string vhost_name = 3;string exchange_name = 4;   //需要用户指定:消息发布到哪个交换机上,然后我们给他进行路由匹配,放到对应队列当中string body = 5;BasicProperities properities = 6;PublishMechanism mechanism = 7;// 消息的发布方式
}
protoc --cpp_out=. mq_proto.proto

6.信道模块修改

1.发布消息的修改

  1. 给basicPublish多传一个参数req->mechanism()
  2. 只有当该消息是PUSH的发布机制时,才需要将publishCallback封装为异步任务抛入线程池
void basicPublish(const BasicPublishRequestPtr &req)
{// 1. 先找到该交换机的交换机类型Exchange::ptr ep = _vhost_manager_ptr->getExchange(req->vhost_name(), req->exchange_name());if (ep.get() == nullptr){default_error("发布消息失败,因为交换机不存在\n,交换机名称:%s",req->exchange_name().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 2. 先找到消息发布的交换机  绑定的所有队列MsgQueueBindingMap qmap = _vhost_manager_ptr->getAllBindingsByExchange(req->vhost_name(), req->exchange_name());// 3. 遍历所有队列,进行路由匹配与消息投递for (auto &kv : qmap){Binding::ptr bp = kv.second;BasicProperities *properities = nullptr;::std::string routing_key;if (req->has_properities()){properities = req->mutable_properities();routing_key = properities->routing_key();}if (Router::route(routing_key, bp->binding_key, ep->type)){// 把消息投递到指定队列_vhost_manager_ptr->basicPublish(req->vhost_name(), bp->queue_name, properities, req->body(), req->mechanism());// 判断该消息是否需要推送if (req->mechanism() == PUSH || req->mechanism() == BOTH){// 5. 向线程池添加一个消息消费任务,消费任务交给线程池中的线程去做,解放Channel线程去做更重要的任务auto func = ::std::bind(&Channel::publishCallback, this, req->vhost_name(), bp->queue_name);_pool_ptr->put(func);}}}// 返回响应即可basicResponse(req->req_id(), req->channel_id(), true);
}

2.publishCallback的修改

注意注意:

  • 当publishCallback推送消息找不到消费者时,要将该消息放到待拉取消息链表当中
  • 放过去的时候,将消息推送机制改为PULL
1.坑点–连锁BUG

这里有一个连锁BUG问题:
我们复用basicPublish的时候,会复用到消息管理模块当中的新增消息
如果我们的消息是持久化的,那么就会重复持久化,持久化时又会修改消息结构体当中的offset字段,因此ACK的时候就只能删除持久化消息副本,而无法删除原件

此时就BUG了:那么我们想当然地就会这么想:
那我把DeliveryMode改成UNDURABLE不就行了?

mp->mutable_valid()->mutable_properities()->set_mode(UNDURABLE);

不行的,因为我们ACK的时候,是否需要删除持久化消息是看该消息的DeliveryMode
因此这样的话,ACK时就无法删除该消息了,也是BUG

怎么办呢?

2.解决方案
  1. 从需求解决问题:
    我们想要的其实就是把一个MessagePtr放到待拉取消息链表当中,因此让消息管理模块提供这么一个接口不就行了吗

  2. 从复用方面解决问题:
    我们依然选择持久化,但是在调用basicPublish之前先调用一下basicAck,将原件先ACK了

  3. 从ACK方面方面解决问题:
    修改ACK:是否删除持久化消息不依据DeliveryMode,而是依赖于该消息是否在持久化哈希表当中

下面我们进行选择:

  1. 效率:第2种由于需要复用ACK,也就需要进行IO操作,效率低,所以淘汰第2种
  2. 业务/代码优雅角度:第3种必须依赖于持久化哈希表,而不能依赖于DeliveryMode,不好,而且修改DeliveryMode不利于排查BUG时的调试,不好

因此,我们选择第1种

3.实现接口

QueueMessageManager:

// 提供向待拉取消息链表当中插入数据
void insert_pull(const MessagePtr& mp)
{std::unique_lock<std::mutex> ulock(_mutex);_waitpull_list.push_back(mp);_waitpublish_map[mp].pull_iter=std::prev(_waitpull_list.end());_waitpublish_map[mp].push_iter=_waitpush_list.end();
}

MessageManager:

// 提供向待拉取消息链表当中插入数据
bool insert_pull(const std::string &qname, const MessagePtr &mp)
{QueueMessageManager::ptr qmmp;{std::unique_lock<std::mutex> ulock(_mutex);auto iter = _qmsg_map.find(qname);if (iter == _qmsg_map.end()){default_error("发布消息失败,因为该队列的消息管理模块句柄尚未初始化");return false;}qmmp = iter->second;}qmmp->insert_pull(mp);return true;
}

VirtualHost:

// 将消息放入待拉取消息链表当中
bool insert_pull(const std::string& qname,const MessagePtr& mp)
{return _mmp->insert_pull(qname,mp);
}

VirtualHostManager:

// 将消息放入待拉取消息链表当中
bool insert_pull(const std::string &vname, const std::string &qname, const MessagePtr &mp)
{std::ostringstream oss;oss << "将消息放入待拉取消息链表失败,因为虚拟机不存在, 队列名称: " << qname << ", 虚拟机名称: " << vname << "\n";VirtualHost::ptr vhp = getVirtualHost(vname, oss);if (vhp.get() == nullptr){return false;}return vhp->insert_pull(qname, mp);
}
4.publishCallback的修改
//  推送消息(取出消息,取出消费者,调用对应消费者的消费处理回调函数)
//  不能先取消费者,因为那样会导致 在无消费者的情况下,待推送消息在链表当中堆积的情况
//  而通过先取消息,再取消费者,将消息放到待拉取消息链表当中,等待有消费者拉取
void publishCallback(const ::std::string &vname, const ::std::string &qname)
{// 1.取出消息MessagePtr mp = _vhost_manager_ptr->basicConsume(vname, qname);if (mp.get() == nullptr){default_info("消息的消费失败, 因为消息队列为空,没有消息: %s",qname.c_str());return;}// 2.取出消费者Consumer::ptr cp = _consumer_manager_ptr->selectConsumer(vname, qname);if (cp.get() == nullptr){default_info("该队列中暂无消费者,将该消息放入该队列的待拉取消息链表当中 %s",qname.c_str());if (mp->mechanism() == PUSH || mp->mechanism() == BOTH){// 这里要将该消息重新添加到待拉取消息链表当中_vhost_manager_ptr->insert_pull(vname, qname, mp);}return;}// 3.调用消费者的消费处理回调函数cp->_callback(cp->_consumer_tag, mp->mutable_valid()->mutable_properities(), mp->valid().body());default_info("调用消费者的消费处理回调函数成功 %s",qname.c_str());// 4.如果消费者有自动确认标志,则进行自动确认if (cp->_auto_ack == true){_vhost_manager_ptr->basicAck(vname, qname, mp->valid().properities().msg_id());}
}

3.拉取消息的修改

void basicPull(const BasicPullRequestPtr &req)
{// 1. 拿到该消费者Consumer::ptr cp = _consumer_manager_ptr->getConsumer(req->vhost_name(), req->queue_name(), req->consumer_tag());if(cp.get()==nullptr){default_error("拉取消息失败,因为消费者不存在,消费者tag:%s",req->consumer_tag().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 2. 拿到消息MessagePtr mp = _vhost_manager_ptr->basicPull(req->vhost_name(), req->queue_name());if(mp.get()==nullptr){default_error("拉取消息失败,因为该队列没有待推送消息,队列名:%s",req->queue_name().c_str());basicResponse(req->req_id(), req->channel_id(), false);return;}// 3. 封装异步任务,抛入线程池auto func = [cp, mp, req, this](){// 3. 调用该消费者的消费处理回调函数cp->_callback(cp->_consumer_tag, mp->mutable_valid()->mutable_properities(), mp->valid().body());// 4. auto_ack的问题if (cp->_auto_ack){this->_vhost_manager_ptr->basicAck(req->vhost_name(), req->queue_name(), mp->valid().properities().msg_id());}};_pool_ptr->put(func);// 4. 基础相应basicResponse(req->req_id(),req->channel_id(),true);
}

7.客户端修改

就是给Channel多加一个参数而已

bool BasicPublish(const std::string &vhost_name, const std::string &exchange_name, const BasicProperities *bp, const std::string &body,PublishMechanism mechanism)
{BasicPublishRequest req;std::string rid = UUIDHelper::uuid();req.set_req_id(rid);req.set_channel_id(_channel_id);req.set_vhost_name(vhost_name);req.set_exchange_name(exchange_name);req.set_body(body);req.set_mechanism(mechanism);if (bp != nullptr){req.mutable_properities()->set_msg_id(bp->msg_id());req.mutable_properities()->set_mode(bp->mode());req.mutable_properities()->set_routing_key(bp->routing_key());}// 发送请求_codec->send(_conn, req);BasicCommonResponsePtr resp = waitResponse(rid);if (resp->ok()){default_info("发布消息成功 %s",body.c_str());}else{default_info("发布消息失败 %s",body.c_str());}return resp->ok();
}

七、验证

1.消息拉取功能与恢复功能联合测试

我们的验证方式是:

  1. 先让生产者跑,然后再让消费者跑,消费者能够拉取消息,则成功
  2. 让生产者跑,制造持久化未确认消息,然后服务器重启(恢复历史消息),然后消费者跑,消费者能够拉取消息,则成功

1.测试1

1.生产者

消息的发布机制就给PUSH了

#include "connection.hpp"
using namespace ns_mq;
#include <thread>
#include <vector>
using namespace std;// host1
void publisher1(const Connection::ptr &conn, const std::string &thread_name)
{// 1. 创建信道Channel::ptr cp = conn->getChannel();// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("host1", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host1", "queue1", true, false, false, {});cp->declareMsgQueue("host1", "queue2", true, false, false, {});cp->bind("host1", "exchange1", "queue1", "news.sport.#");cp->bind("host1", "exchange1", "queue2", "news.*.zhangsan");// 3. 发送10条消息BasicProperities bp;bp.set_mode(DURABLE);bp.set_routing_key("news.sport.basketball");for (int i = 0; i < 10; i++){bp.set_msg_id(UUIDHelper::uuid());cp->BasicPublish("host1", "exchange1", &bp, "Hello -" + std::to_string(i), PUSH);}// 4. 关闭信道conn->returnChannel(cp);
}// host2
void publisher2(const Connection::ptr &conn, const std::string &thread_name)
{// 1. 创建信道Channel::ptr cp = conn->getChannel();// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host2", "./host2/resource.db", "./host2/message");cp->declareExchange("host2", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host2", "queue1", true, false, false, {});cp->declareMsgQueue("host2", "queue2", true, false, false, {});cp->bind("host2", "exchange1", "queue1", "news.sport.#");cp->bind("host2", "exchange1", "queue2", "news.*.zhangsan");// 3. 发送10条消息BasicProperities bp;bp.set_mode(DURABLE);bp.set_routing_key("news.sport.basketball");for (int i = 0; i < 10; i++){bp.set_msg_id(UUIDHelper::uuid());cp->BasicPublish("host2", "exchange1", &bp, "Hello -" + std::to_string(i), PUSH);}// 4. 关闭信道conn->returnChannel(cp);
}int main()
{AsyncWorker::ptr worker = std::make_shared<AsyncWorker>();Connection::ptr myconn = std::make_shared<Connection>("127.0.0.1", 8888, worker);vector<thread> thread_v;thread_v.push_back(thread(publisher1, myconn, "thread1"));thread_v.push_back(thread(publisher2, myconn, "thread2"));for (auto &t : thread_v)t.join();return 0;
}
2.消费者

订阅完队列之后,每隔1s拉取一次消息

#include "connection.hpp"
using namespace ns_mq;
#include <thread>
#include <vector>
#include <thread>
using namespace std;// 因为要拿到信道才能进行确认,所以这里需要把Channel::ptr bind过来
void Callback(const Channel::ptr &cp, const std::string &consumer_tag, const BasicProperities *bp, const std::string &body)
{// 1. 消费消息std::string id;if (bp != nullptr){id = bp->msg_id();}std::cout << consumer_tag << " 消费了消息: " << body << ", 消息ID: " << id << "\n";// 2. 确认消息if (bp != nullptr)std::cout << cp->BasicAck(id) << "\n";
}void consumer1(const Connection::ptr &conn, const std::string &thread_name)
{Channel::ptr cp = conn->getChannel();default_debug("consumer1: 信道ID:",cp->cid().c_str());// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host1", "./host1/resource.db", "./host1/message");cp->declareExchange("host1", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host1", "queue1", true, false, false, {});cp->declareMsgQueue("host1", "queue2", true, false, false, {});cp->bind("host1", "exchange1", "queue1", "news.sport.#");cp->bind("host1", "exchange1", "queue2", "news.*.zhangsan");// 3. 创建消费者cp->BasicConsume("host1", "consumer1", "queue1",std::bind(Callback, cp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), false);// 4. 等待消息while (true){cp->BasicPull();std::this_thread::sleep_for(std::chrono::seconds(1));}// 5. 关闭信道conn->returnChannel(cp);
}void consumer2(const Connection::ptr &conn, const std::string &thread_name)
{Channel::ptr cp = conn->getChannel();default_debug("consumer2: 信道ID:",cp->cid().c_str());// 2. 创建虚拟机,交换机,队列,并进行绑定cp->declareVirtualHost("host2", "./host2/resource.db", "./host2/message");cp->declareExchange("host2", "exchange1", TOPIC, true, false, {});cp->declareMsgQueue("host2", "queue1", true, false, false, {});cp->declareMsgQueue("host2", "queue2", true, false, false, {});cp->bind("host2", "exchange1", "queue1", "news.sport.#");cp->bind("host2", "exchange1", "queue2", "news.*.zhangsan");// 3. 创建消费者cp->BasicConsume("host2", "consumer2", "queue1",std::bind(Callback, cp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), false);// 4. 等待消息while (true){cp->BasicPull();std::this_thread::sleep_for(std::chrono::seconds(1));}// 5. 关闭信道conn->returnChannel(cp);
}int main()
{AsyncWorker::ptr worker = std::make_shared<AsyncWorker>();// 1. 创建连接和信道Connection::ptr conn = std::make_shared<Connection>("127.0.0.1", 8888, worker);vector<thread> thread_v;thread_v.push_back(thread(consumer1, conn, "thread1"));thread_v.push_back(thread(consumer2, conn, "thread2"));for (auto &t : thread_v)t.join();return 0;
}
3.演示

先让生产者跑,然后再让消费者跑,消费者能够拉取消息,则成功
在这里插入图片描述
在这里插入图片描述

2.测试2 – 演示

让生产者跑,制造持久化未确认消息,然后服务器重启(恢复历史消息),然后消费者跑,消费者能够拉取消息,则成功
在这里插入图片描述

2.PULL测试

我们的验证方式是:客户端纯拉取,所有消息必须是由拉取进行消费的

因为客户端拉取消息是每1s拉取一次,所以拉取消息会持续10s,如果是推送的话,那么一瞬间就会搞定

先让消费者跑,再让生产者跑
只需要把生产者发布消息时的发布机制改一下即可

cp->BasicPublish("host2", "exchange1", &bp, "Hello -" + std::to_string(i), PULL);

演示:
在这里插入图片描述

2.BOTH测试

我们在publishCallback当中故意让工作线程等上5s,这样就能让拉取快于推送了

因此:

void publishCallback(const ::std::string &vname, const ::std::string &qname)
{std::this_thread::sleep_for(std::chrono::seconds(5));
//模拟5s后异步线程才开始执行该函数,测试BOTH时使用,用来让拉取快于推送

为了保证生产者主线程退出之前异步工作线程能够执行完这些publishCallback

因此我们让生产者结束之后陷入死循环

// 4. 测试BOTH时:等待异步线程执行完publishCallback
while (true)
{std::this_thread::sleep_for(std::chrono::seconds(1000));
}// 5. 关闭信道
conn->returnChannel(cp);

演示:
在这里插入图片描述

在这里插入图片描述
验证成功

本篇博客分了两大点来进行扩展,是为了让大家更有一步步的代入感,不至于一上来就这么突兀
所以代码篇幅较大,希望大家理解

动图比较卡顿是因为帧数太少,因为CSDN不支持上传5MB以上的图片,所以只能减帧压缩体积,抱歉在这里插入图片描述

以上就是项目扩展二:消息拉取功能的实现的全部内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/147303.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

C++迭代器 iterator详解

目录 什么是迭代器 迭代器的类型 迭代器的用法 三种迭代器 范围for 什么是迭代器 它提供了一种访问容器&#xff08;如列表、集合等&#xff09;中元素的方法&#xff0c;而无需暴露容器的内部表示。迭代器使得程序员能够以统一的方式遍历不同的数据结构&#xff0c;而无需…

JVM的基本概念

目录 一、JVM的内存划分 二、JVM的类加载过程 三、JVM的垃圾回收机制&#xff08;GC&#xff09; 四、分代回收 一、JVM的内存划分 一个运行起来的Java进程&#xff0c;就是一个Java虚拟机&#xff0c;就需要从操作系统中申请一大块内存。申请的内存会划分为不同的区域&…

5.工欲善其事,必先利其器!收集金融数据你必须先做这个!

在正式从网络上获取数据并存储到我们的数据库之前&#xff0c;我们还需要做一些准备工作。其中最重要的无疑是把Python环境配置好。 你可以不好好学习Python&#xff0c;毕竟我后边会一步步教大家&#xff0c;也会提供现成的Python脚本。但是你必须得在你的电脑上把Python安装…

基于51单片机无线蓝牙智能家居控制系统设计

文章目录 前言资料获取设计介绍功能介绍设计程序具体实现截图![请添加图片描述](https://i-blog.csdnimg.cn/direct/c25dac9c3044416385d22a655dee5c3d.jpeg)设计获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师&#xff…

LLM安全风险及应对

LLM安全风险主要从四个维度分析&#xff1a;用户输入、训练数据、模型本身以及工具和插件。 风险类别具体风险风险解释应对措施具体举例用户输入相关风险提示注入&#xff08;Prompt Injection&#xff09;攻击者通过设计特定输入&#xff0c;使模型生成恶意或不安全的输出。- …

FLStudio21Mac版flstudio v21.2.1.3430简体中文版下载(含Win/Mac)

给大家介绍了许多FL21版本&#xff0c;今天给大家介绍一款FL Studio21Mac版本&#xff0c;如果是Mac电脑的朋友请千万不要错过&#xff0c;当然我也不会忽略掉Win系统的FL&#xff0c;链接我会放在文章&#xff0c;供大家下载与分享&#xff0c;如果有其他问题&#xff0c;欢迎…

【成神之路】Ambari实战-011-代码生命周期-metainfo加载原理深度剖析

在 Ambari 中&#xff0c;metainfo.xml 是定义服务和组件的关键配置文件。Ambari 通过解析它来加载和管理服务的整个生命周期。今天&#xff0c;我们将深入探索 metainfo.xml 是如何被解析的&#xff0c;并以 Redis 集群服务为例&#xff0c;逐步解读 Ambari 的处理过程。&…

cv中每个patch的关联

在计算机视觉任务中&#xff0c;当图像被划分为多个小块&#xff08;patches&#xff09;时&#xff0c;每个 patch 的关联性可以通过不同的方法来计算。具体取决于使用的模型和任务&#xff0c;以下是一些常见的计算 patch 关联性的方法&#xff1a; 1. Vision Transformer (…

Java : 图书管理系统

图书管理系统的作用&#xff1a; 高效的图书管理 图书管理系统通过自动化管理&#xff0c;实现了图书的采编、编目、流通管理等操作的自动化处理&#xff0c;大大提高了图书管理的效率和准确性。 工作人员可以通过系统快速查找图书信息&#xff0c;实时掌握图书的借还情况&…

【comfyUI工作流】一键生成专属欧美漫画!

现在你不需要在webui上手动设置一堆的参数 来将自己的照片转绘成欧美漫画插画 可以通过我制作的工作流一键完成转绘&#xff0c;更加效率便捷&#xff0c; 而且不需要你懂什么专业的AI绘画知识&#xff0c;会打开工作流&#xff0c;上传图片就可以 工作流特点 真实照片一键…

程序员的AI时代:拥抱变革,塑造未来

你们有没有想过&#xff0c;如果有一天&#xff0c;你的编程工作被一个AI助手取代了&#xff0c;你会怎么办&#xff1f;这不是危言耸听&#xff0c;随着AIGC技术的飞速发展&#xff0c;这样的场景可能真的会出现。但是&#xff0c;别担心&#xff0c;今天我们就来聊聊&#xf…

XSS—xss-labs靶场通关

level 1 JS弹窗函数alert() <script>alert()</script> level 2 闭合绕过 "> <script>alert()</script> <" level 3 onfocus事件在元素获得焦点时触发&#xff0c;最常与 <input>、<select> 和 <a> 标签一起使用…

[Excel VBA办公]如何使用VBA批量删除空行

在处理Excel数据时&#xff0c;空行可能会干扰数据分析和展示。以下是一个VBA代码示例&#xff0c;帮助你批量删除工作表中的空行。 1. 代码说明 此代码将遍历指定工作表&#xff0c;删除所有空行&#xff0c;确保数据整洁。 2. VBA代码 删除sheet1的空行 Sub DeleteEmptyRow…

re题(39)BUUCTF-[FlareOn3]Challenge1

BUUCTF在线评测 (buuoj.cn) 查壳是32位&#xff0c;ida打开&#xff0c;进入main函数&#xff0c;进入sub_401260看看 查看byte_413000存的字符串 _BYTE *__cdecl sub_401260(int a1, unsigned int a2) {int v3; // [espCh] [ebp-24h]int v4; // [esp10h] [ebp-20h]int v5; //…

19 基于51单片机的倒计时音乐播放系统设计

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 五个按键&#xff0c;分别为启动按键&#xff0c;则LCD1602显示倒计时&#xff0c;音乐播放 设置按键&#xff0c;可以设置倒计时的分秒&#xff0c;然后加减按键&#xff0c;还有最后一个暂停音乐…

项目集成sharding-jdbc

目录 项目集成sharding-jdbc 1.业务分析 2.数据库构建 3.分库分表策略 项目配置默认数据源 一&#xff1a;导入sharding-jdbc依赖 二&#xff1a;在application文件中编写配置 三&#xff1a;注释掉主配置文件中配置的数据源 注意&#xff1a;这里添加了spring.main.allow…

芝士AI论文写作|开题报告、论文生成、降重、降AI、答辩PPT

芝士AI&#xff0c;免费论文查重软件,为毕业生提供专业的AI论文生成、强力降重、AIGC降低、论文重复率检测、论文降重、学术查重、学术检测、PPT生成、学术论文观点剽窃检测等一站式服务。免费论文查重_芝士AI&#xff08;PaperZZ&#xff09;论文检测__PaperZZ论文查重 是不是…

Snap 发布新一代 AR 眼镜,有什么特别之处?

Snap 发布新一代 AR 眼镜&#xff0c;有什么特别之处&#xff1f; Snap 简介 新一代的 AR 眼镜特点 Snap 简介 Snap 公司成立于 2010 年&#xff0c;2017 年美国东部时间 3 月 2 日上午 11 时许&#xff0c;在纽交所正式挂牌交易&#xff0c;股票代码为 “SNAP”。其旗下的核…

QT 信号和槽函数

信号和槽函数介绍 conncet(sender, signal, receiver, slot) /* * 1. 信号发出者&#xff1b; * 2. 信号&#xff1b; * 3. 信号接收者&#xff1b; * 4. 接受到信号执行任务&#xff1b; 槽函数 */自定义信号和槽函数 场景 &#xff1a;老师饿了&#xff0c;学生请客&#xf…

使用 KMeans 聚类算法 对鸢尾花数据集进行无监督学习的简单示例

代码功能 主要功能&#xff1a; 加载数据集&#xff1a; 代码使用 load_iris() 函数加载了鸢尾花数据集&#xff08;Iris dataset&#xff09;。这个数据集包含 150 条样本&#xff0c;每条样本有 4 个特征&#xff0c;对应于 3 种不同的鸢尾花。 KMeans 聚类&#xff1a; 使用…