✔ ★Java项目——设计一个消息队列(五)【虚拟主机设计】

虚拟主机设计

  • 创建 VirtualHost
    • 实现构造⽅法和 getter
    • 创建交换机
    • 删除交换机
    • 创建队列
    • 删除队列
    • 创建绑定
    • 删除绑定
    • 发布消息 ★
    • 路由规则
      • 1) 实现 route ⽅法
      • 2) 实现 checkRoutingKeyValid
      • 3) 实现 checkBindingKeyValid
      • 4) 实现 routeTopic
      • 5) 匹配规则测试⽤例
      • 6) 测试 Router
    • 订阅消息
      • 1) 添加⼀个订阅者
      • 2) 创建订阅者管理管理类
      • 3) 添加令牌接⼝
      • 4) 实现添加订阅者
      • 给 MsgQueue 添加⼀个订阅者列表
      • 5) 实现扫描线程
      • 6) 实现消费消息
    • ⼩结
    • 消息确认
    • 测试 VirtualHost

在这里插入图片描述

在这里插入图片描述

创建 VirtualHost

创建 mqserver.VirtualHost
在这里插入图片描述
其中 Router ⽤来定义转发规则, ConsumerManager ⽤来实现消息消费. 这两个内容后续再介绍

实现构造⽅法和 getter

在这里插入图片描述

创建交换机

• 此处的 autoDelete, arguments 其实并没有使⽤. 只是先预留出来. (RabbitMQ 是⽀持的) .
• 约定, 交换机/队列的名字, 都加上 VirtualHostName 作为前缀. 这样不同 VirtualHost 中就可以存在
同名的交换机或者队列了.
• exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回. 因此不叫做 “exchangeCreate”.
• 先写硬盘, 后写内存. 因为写硬盘失败概率更⼤. 如果硬盘写失败了, 也就不必写内存了.

在这里插入图片描述

删除交换机

在这里插入图片描述

创建队列

在这里插入图片描述

删除队列

在这里插入图片描述

创建绑定

在这里插入图片描述

删除绑定

在这里插入图片描述

发布消息 ★

• 发布消息其实是把消息发送给指定的 Exchange, 再根据 Exchange 和 Queue 的 Binding 关系, 转发到对应队列中.
• 发送消息需要指定 routingKey, 这个值的作⽤和 ExchangeType 是相关的.
◦ Direct: routingKey 就是对应队列的名字. 此时不需要 binding 关系, 也不需要 bindingKey, 就可以直接转发消息.
◦ Fanout: routingKey 不起作⽤, bindingKey 也不起作⽤. 此时消息会转发给绑定到该交换机上的所有队列中.
◦ Topic: routingKey 是⼀个特定的字符串, 会和 bindingKey 进⾏匹配. 如果匹配成功, 则发到对应的队列中. 具体规则后续介绍.
• BasicProperties 是消息的元信息. body 是消息本体.

    // 发送消息到指定的交换机/队列中.public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机的名字exchangeName = virtualHostName + exchangeName;// 2. 检查 routingKey 是否合法.if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);}// 3. 查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 4. 判定交换机的类型if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.String queueName = virtualHostName + routingKey;// 5. 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 6. 查找该队列名对应的对象MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}// 7. 队列存在, 直接给队列中写入消息sendMessage(queue, message);} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {// 1) 获取到绑定对象, 判定对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null) {// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());continue;}// 2) 构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 3) 判定这个消息是否能转发给该队列.//    如果是 fanout, 所有绑定的队列都要转发的.//    如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.if (!router.route(exchange.getType(), binding, message)) {continue;}// 4) 真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败!");e.printStackTrace();return false;}}private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上.int deliverMode = message.getDeliverMode();// deliverMode 为 1 , 不持久化. deliverMode 为 2 表示持久化.if (deliverMode == 2) {diskDataCenter.sendMessage(queue, message);}// 写入内存memoryDataCenter.sendMessage(queue, message);// 此处还需要补充一个逻辑, 通知消费者可以消费消息了.consumerManager.notifyConsume(queue.getName());}

路由规则

1) 实现 route ⽅法

    public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {// 根据不同的 exchangeType 使用不同的判定转发规则.if (exchangeType == ExchangeType.FANOUT) {// 如果是 FANOUT 类型, 则该交换机上绑定的所有队列都需要转发return true;} else if (exchangeType == ExchangeType.TOPIC) {// 如果是 TOPIC 主题交换机, 规则就要更复杂一些.return routeTopic(binding, message);} else {// 其他情况是不应该存在的.throw new MqException("[Router] 交换机类型非法! exchangeType=" + exchangeType);}}

2) 实现 checkRoutingKeyValid

    public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为(俺)约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb    => 非法// 2. aaa.#.*.bbb    => 非法// 3. aaa.*.#.bbb    => 非法// 4. aaa.*.*.bbb    => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}

3) 实现 checkBindingKeyValid

    // bindingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为(俺)约定的).// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~// 1. aaa.#.#.bbb    => 非法// 2. aaa.#.*.bbb    => 非法// 3. aaa.*.#.bbb    => 非法// 4. aaa.*.*.bbb    => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}

4) 实现 routeTopic

    // [测试用例]// binding key          routing key         result// aaa                  aaa                 true// aaa.bbb              aaa.bbb             true// aaa.bbb              aaa.bbb.ccc         false// aaa.bbb              aaa.ccc             false// aaa.bbb.ccc          aaa.bbb.ccc         true// aaa.*                aaa.bbb             true// aaa.*.bbb            aaa.bbb.ccc         false// *.aaa.bbb            aaa.bbb             false// #                    aaa.bbb.ccc         true// aaa.#                aaa.bbb             true// aaa.#                aaa.bbb.ccc         true// aaa.#.ccc            aaa.ccc             true// aaa.#.ccc            aaa.bbb.ccc         true// aaa.#.ccc            aaa.aaa.bbb.ccc     true// #.ccc                ccc                 true// #.ccc                aaa.bbb.ccc         trueprivate boolean routeTopic(Binding binding, Message message) {// 先把这两个 key 进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 引入两个下标, 指向上述两个数组. 初始情况下都为 0int bindingIndex = 0;int routingIndex = 0;// 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 forwhile (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {if (bindingTokens[bindingIndex].equals("*")) {// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!bindingIndex++;routingIndex++;continue;} else if (bindingTokens[bindingIndex].equals("#")) {// 如果遇到 #, 需要先看看有没有下一个位置.bindingIndex++;if (bindingIndex == bindingTokens.length) {// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!return true;}// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex == -1) {// 没找到匹配的结果. 匹配失败return false;}// 找到的匹配的情况, 继续往后匹配.bindingIndex++;routingIndex++;} else {// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex++;routingIndex++;}}// [情况五] 判定是否是双方同时到达末尾// 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失败的.if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if (routingTokens[i].equals(bindingToken)) {return i;}}return -1;}

5) 匹配规则测试⽤例

// [测试用例]
// binding key          routing key         result
// aaa                  aaa                 true
// aaa.bbb              aaa.bbb             true
// aaa.bbb              aaa.bbb.ccc         false
// aaa.bbb              aaa.ccc             false
// aaa.bbb.ccc          aaa.bbb.ccc         true
// aaa.*                aaa.bbb             true
// aaa.*.bbb            aaa.bbb.ccc         false
// *.aaa.bbb            aaa.bbb             false
// #                    aaa.bbb.ccc         true
// aaa.#                aaa.bbb             true
// aaa.#                aaa.bbb.ccc         true
// aaa.#.ccc            aaa.ccc             true
// aaa.#.ccc            aaa.bbb.ccc         true
// aaa.#.ccc            aaa.aaa.bbb.ccc     true
// #.ccc                ccc                 true
// #.ccc                aaa.bbb.ccc         true

6) 测试 Router

package com.example.mq;import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.ExchangeType;
import com.example.mq.mqserver.core.Message;
import com.example.mq.mqserver.core.Router;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class RouterTests {private Router router = new Router();private Binding binding = null;private Message message = null;@BeforeEachpublic void setUp() {binding = new Binding();message = new Message();}@AfterEachpublic void tearDown() {binding = null;message = null;}// [测试用例]// binding key          routing key         result// aaa                  aaa                 true// aaa.bbb              aaa.bbb             true// aaa.bbb              aaa.bbb.ccc         false// aaa.bbb              aaa.ccc             false// aaa.bbb.ccc          aaa.bbb.ccc         true// aaa.*                aaa.bbb             true// aaa.*.bbb            aaa.bbb.ccc         false// *.aaa.bbb            aaa.bbb             false// #                    aaa.bbb.ccc         true// aaa.#                aaa.bbb             true// aaa.#                aaa.bbb.ccc         true// aaa.#.ccc            aaa.ccc             true// aaa.#.ccc            aaa.bbb.ccc         true// aaa.#.ccc            aaa.aaa.bbb.ccc     true// #.ccc                ccc                 true// #.ccc                aaa.bbb.ccc         true@Testpublic void test1() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test2() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test3() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test4() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test5() throws MqException {binding.setBindingKey("aaa.bbb.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test6() throws MqException {binding.setBindingKey("aaa.*");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test7() throws MqException {binding.setBindingKey("aaa.*.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test8() throws MqException {binding.setBindingKey("*.aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test9() throws MqException {binding.setBindingKey("#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test10() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test11() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test12() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test13() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test14() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test15() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}@Testpublic void test16() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));}
}

订阅消息

1) 添加⼀个订阅者

    // 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);e.printStackTrace();return false;}}

Consumer 相当于⼀个回调函数. 放到 common.Consumer 中

@FunctionalInterface
public interface Consumer {// Delivery 的意思是 "投递", 这个方法预期是在每次服务器收到消息之后, 来调用.// 通过这个方法把消息推送给对应的消费者.// (注意! 这里的方法名和参数, 也都是参考 RabbitMQ 展开的)void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}

2) 创建订阅者管理管理类

在这里插入图片描述
• parent ⽤来记录虚拟主机.
• 使⽤⼀个阻塞队列⽤来触发消息消费. 称为令牌队列. 每次有消息过来了, 都往队列中放⼀个令牌(也就是队列名), 然后消费者再去消费对应队列的消息.
• 使⽤⼀个线程池⽤来执⾏消息回调.

这样令牌队列的设定避免搞出来太多线程. 否则就需要给每个队列都安排⼀个单独的线程了, 如果队列很多则开销就⽐较⼤了

3) 添加令牌接⼝

    // 这个方法的调用时机就是发送消息的时候.public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}

4) 实现添加订阅者

• 新来订阅者的时候, 需要先消费掉之前积压的消息.
• consumeMessage 真正的消息消费操作, ⼀会再实现.

    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}

创建 ConsumerEnv , 这个类表⽰⼀个订阅者的执⾏环境

public class ConsumerEnv {private String consumerTag;private String queueName;private boolean autoAck;// 通过这个回调来处理收到的消息.private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}

给 MsgQueue 添加⼀个订阅者列表

在这里插入图片描述
此处的 chooseConsumer 是实现⼀个轮询效果. 如果⼀个队列有多个订阅者, 将会按照轮询的⽅式轮
流拿到消息

5) 实现扫描线程

在 ConsumerManager 中创建⼀个线程, 不停的尝试扫描令牌队列. 如果拿到了令牌, 就真正触发消费消息操作

    public ConsumerManager(VirtualHost p) {parent = p;scannerThread = new Thread(() -> {while (true) {try {// 1. 拿到令牌String queueName = tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if (queue == null) {throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.scannerThread.setDaemon(true);scannerThread.start();}

6) 实现消费消息

    private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog = queue.chooseConsumer();if (luckyDog == null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if (message == null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workerPool.submit(() -> {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.//    如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 1) 删除硬盘上的消息if (message.getDeliverMode() == 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (Exception e) {e.printStackTrace();}});}

注意: ⼀个队列可能有 N 个消费者, 此处应该按照轮询的⽅式挑⼀个消费者进⾏消费.

⼩结

⼀. 消费消息的两种典型情况

  1. 订阅者已经存在了, 才发送消息
    这种直接获取队列的订阅者, 从中按照轮询的⽅式挑⼀个消费者来调⽤回调即可.
  2. 消息先发送到队列了, 订阅者还没到.
    此时当订阅者到达, 就快速把指定队列中的消息全都消费掉.
    ⼆. 关于消息不丢失的论证
    每个消息在从内存队列中出队列时, 都会先进⼊ 待确认 中.
    • 如果 autoAck 为 true
    消息被消费完毕后(执⾏完消息回调之后), 再执⾏清除⼯作.
    分别清除硬盘数据, 待确认队列, 消息中⼼.
    • 如果 autoAck 为 false
    在回调内部, 进⾏清除⼯作.
    分别清除硬盘数据, 待确认队列, 消息中⼼.
  3. 执⾏消息回调的时候抛出异常
    此时消息仍然处在待确认队列中.
    此时可以⽤⼀个线程扫描待确认队列, 如果发现队列中的消息超时未确认, 则放⼊死信队列.
    死信队列咱们此处暂不实现.
  4. 执⾏消息回调的时候服务器宕机
    内存所有数据都没了, 但是消息在硬盘上仍然存在. 会在服务下次启动的时候, 加载回内存. 重新被消费到.

消息确认

在这里插入图片描述

测试 VirtualHost

package com.example.mq;import com.example.mq.common.Consumer;
import com.example.mq.mqserver.VirtualHost;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;@SpringBootTest
public class VirtualHostTests {private VirtualHost virtualHost = null;@BeforeEachpublic void setUp() {MqApplication.context = SpringApplication.run(MqApplication.class);virtualHost = new VirtualHost("default");}@AfterEachpublic void tearDown() throws IOException {MqApplication.context.close();virtualHost = null;// 把硬盘的目录删除掉File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);}@Testpublic void testExchangeDeclare() {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);}@Testpublic void testExchangeDelete() {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDelete("testExchange");Assertions.assertTrue(ok);}@Testpublic void testQueueDeclare() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);}@Testpublic void testQueueDelete() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDelete("testQueue");Assertions.assertTrue(ok);}@Testpublic void testQueueBind() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);}@Testpublic void testQueueUnbind() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");Assertions.assertTrue(ok);ok = virtualHost.queueUnbind("testQueue", "testExchange");Assertions.assertTrue(ok);}@Testpublic void testBasicPublish() {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);}// 先订阅队列, 后发送消息@Testpublic void testBasicConsume1() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {try {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);} catch (Error e) {// 断言如果失败, 抛出的是 Error, 而不是 Exception!e.printStackTrace();System.out.println("error");}}});Assertions.assertTrue(ok);Thread.sleep(500);// 再发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);}// 先发送消息, 后订阅队列.@Testpublic void testBasicConsume2() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeFanout() throws InterruptedException {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue1", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue1", "testExchange", "");Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue2", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue2", "testExchange", "");Assertions.assertTrue(ok);// 往交换机中发布一个消息ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 两个消费者订阅上述的两个队列.ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicConsumeTopic() throws InterruptedException {boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueDeclare("testQueue", false, false, false, null);Assertions.assertTrue(ok);ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");Assertions.assertTrue(ok);ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());Assertions.assertTrue(ok);ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println("consumerTag=" + consumerTag);System.out.println("messageId=" + basicProperties.getMessageId());Assertions.assertArrayEquals("hello".getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}@Testpublic void testBasicAck() throws InterruptedException {boolean ok = virtualHost.queueDeclare("testQueue", true,false, false, null);Assertions.assertTrue(ok);ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok = virtualHost.basicPublish("testExchange", "testQueue", null,"hello".getBytes());Assertions.assertTrue(ok);// 再订阅队列 [要改的地方, 把 autoAck 改成 false]ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println("messageId=" + basicProperties.getMessageId());System.out.println("body=" + new String(body, 0, body.length));Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals("hello".getBytes(), body);// [要改的地方, 新增手动调用 basicAck]boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());Assertions.assertTrue(ok);}});Assertions.assertTrue(ok);Thread.sleep(500);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1410870.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

动态规划-子序列问题1

文章目录 1. 最长递增子序列&#xff08;300&#xff09;2. 摆动序列&#xff08;376&#xff09;3. 最长递增子序列的个数&#xff08;673&#xff09;4. 最长数对链&#xff08;646&#xff09; 1. 最长递增子序列&#xff08;300&#xff09; 题目描述&#xff1a; 状态表…

38.基础乐理-其余调号说明

目前只写了自然大调&#xff0c;还有其它的调式没有写&#xff0c;大调中还有 和声大调 与 旋律大调&#xff0c;除了大调&#xff0c;还有小调式、五声调式、中古调式等还有很多很多&#xff0c;这些东西是需要对于调号、拍号&#xff0c;对于五线谱、对于音程和弦都有一定程度…

OS考研chapter3内存管理

目录 一、基础知识点补充 1.内存、内存地址概念与联系 2.按byte编址 vs 按字编码 二、进程运行的基本原理 1.指令的工作原理 2.逻辑地址 vs 物理地址 3.从写程序到程序运行 &#xff08;1&#xff09;编辑源代码 &#xff08;2&#xff09;编译 &#xff08;3&#xf…

JZ69跳台阶

&#x1f600;前言 青蛙跳台阶是一个经典的问题&#xff0c;它描述了一只青蛙每次可以跳上1级台阶或者2级台阶&#xff0c;问跳上一个n级的台阶有多少种跳法。这个问题看似简单&#xff0c;实则蕴含了一定的数学思维和递推关系。在本文中&#xff0c;我们将通过分析问题的特性和…

合并两个有序数组(详解)

合并两个有序数组&#xff08;详解&#xff09; 合并两个有序数组 题目&#xff1a; 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&#xff0c;…

空闲缓冲区(empty) 和 非空缓冲区(full) 的的概念和区别【操作系统 生产者——消费者进程】

首先&#xff0c;我们得有个环境——通常是个缓冲池&#xff0c;这个池子里可以塞很多缓冲区&#xff0c;它们是用来存放数据的。生产者就是那个不停造东西的家伙&#xff0c;而消费者则是等着用这些东西的人。 1. 空闲缓冲区&#xff08;empty&#xff09;&#xff1a; 这玩意…

顺序表经典算法

顺序表经典算法 1.移除元素 题目&#xff1a; 给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。 不要使用额外的数组空间&#xff0c;你必须仅使用 O(1) 额外空间并 原地 修改输入数组。 元素的…

Linux编辑器调试器 gcc/g++ gdb 编译过程及使用讲解

这恋爱呀 我有两不谈 第一异性不谈 因为我们性别不一样 我知道的她不知道相处起来太累 第二同性不谈 因为我们性别一样 我知道的他也知道相处起来太无聊了 –❀–❀–❀–❀–❀–❀–❀–❀–❀–❀–❀–❀–❀–❀–❀–❀–❀–❀–❀-正文开始-❀–❀–❀–❀–❀–❀–…

雅思(IELTS)优秀小作文分享

IELTS优秀小作文分享 柱状图 本篇范文个人评分是8分或者8.5分&#xff0c;属于能找到的最优质的范文了 题目如下: The two sets of bar charts illustrate the amount of time that teenagers (boys, girls, and all) in the UK spend chatting online and playing game c…

DHCPv4_CLIENT_ALLOCATING_01: 在其本地物理子网上广播DHCPDISCOVER消息

测试目的&#xff1a; 确保客户端能够在其本地物理子网上广播DHCPDISCOVER消息。 描述&#xff1a; 该测试用例旨在验证DHCP客户端是否能够正确地在其本地物理子网上广播DHCPDISCOVER消息&#xff0c;以便进行IP地址的自动分配。 测试拓扑&#xff1a; 测试步骤&#xff1a…

汇报进度26届cpp,目前来说之后的规划,暑假打算回家10天就留校沉淀了

汇报一下进度吧&#xff0c;26双非菜鸡&#xff0c;cpper. 但目前学了一些go &#xff0c;辅修吧&#xff0c;距离发的上个动态已经过去3个月了&#xff0c;真的觉得找实习时间来不及&#xff0c;现在leetcode 100多道题&#xff0c;前几天蓝桥杯整了个省二&#xff0c;把OS和…

利用大语言模型(KIMI)构建智能产品的控制信息模型

数字化的核心是数字化建模&#xff0c;为一个事物构建数字模型是一项十分复杂的工作。不同的应用场景&#xff0c;对事物的关注重点的不同的。例如&#xff0c;对于一个智能传感器而言&#xff0c;从商业的角度看&#xff0c;产品的信息模型中应该包括产品的类型&#xff0c;名…

Linux蛋疼笔记之无法安装软件

Ubuntu在安装软件时&#xff0c;一直安装失败&#xff0c;提示&#xff1a; dpkg: error processing package gconf2-common (--configre):installed gconf2-common package post-installation script susprocess returned error exit status 10 Error wre encountered while …

数字滤波器设计笔记1

系统结构 1.先利用matlab的simulink和FDA进行滤波器建模设计&#xff0c;通过仿真后&#xff0c;确定模型达到相应的性能要求&#xff0c;再利用verilog进行电路设计。最后使用modelsim进行功能验证。其中testbench的输入数据&#xff0c;利用matlab模型的输入数据。 2.Matlab…

基于openwrt和libssh2实现ssh的远程登录

libssh2的支持 openwrt本身带有libssh和libssh2两种三方库。我们需要修改编译使其参与编译./scripts/feeds update -a ./scripts/feeds install -a执行make menuconfig操作&#xff0c;勾选你想要的三方库 ssh服务器的连接 这里需要注意的主要就是makefile里面记得添加ssh2…

刷代码随想录有感(52):用数组最大值及其两侧构造最大二叉树

题干&#xff1a; 代码&#xff1a; class Solution { public:TreeNode* constructMaximumBinaryTree(vector<int>& nums) {if(nums.size() 1)return new TreeNode(nums[0]);int maxvalue INT_MIN;int maxindex;for(int i 0; i < nums.size(); i){if(nums[i] …

[C++][数据结构]红黑树的介绍和模拟实现

前言 之前我们简单学习了一下搜索树和平衡搜索树&#xff0c;今天我们来学习一下红黑树 简介 概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或Black。 通过对任何一条从根到叶子的路径上各个结点着…

地图产业的困局与破局:高精地图“上车”难 轻量化渐成主流方案 | 最新快讯

《科创板日报》5月3日讯&#xff08;编辑 邱思雨&#xff09; 近期&#xff0c;特斯拉与百度的“绯闻”成为智驾、地图行业的焦点。 有媒体消息称&#xff0c;特斯拉将与百度地图独家深度定制车道级高辅地图。《科创板日报》记者也获悉&#xff0c;自5月1日起&#xff0c;百度…

pygame鼠标绘制

pygame鼠标绘制 Pygame鼠标绘制效果代码 Pygame Pygame是一个开源的Python库&#xff0c;专为电子游戏开发而设计。它建立在SDL&#xff08;Simple DirectMedia Layer&#xff09;的基础上&#xff0c;允许开发者使用Python这种高级语言来实时开发电子游戏&#xff0c;而无需被…

ESXi8 中FreeBSD启动失败记录

一天突然发现ESXi8 中的FreeBSD启动失败&#xff0c;且自动挂载了FreeBSD的安装光盘&#xff0c;进入安装步骤。 惊了一身冷汗。 到虚拟主机设置里&#xff0c;发现引导选项里面&#xff0c;选择应当用来引导虚拟机的固件为EFI&#xff0c;原来是前段时间手残修改了&#xff0…