文章目录
- 一、创建SpringBoot项目
- 二、创建核心类
- 创建 Exchange类
- 创建 MSGQueue类
- 创建 Binding类
- 创建Message类
一、创建SpringBoot项目
在项目中添加这四个依赖!
二、创建核心类
交换机 :Exchange
队列 :Queue
绑定关系: Binding
消息 :Message
这些核心类都存在于 BrokerServer 中.
先创建出服务器与客户端的包.
再在服务器中创建 core包,用来存放这些核心类.
创建 Exchange类
首先考虑,咱们在此处共实现了三种交换机类型,所以咱们可以创建一个枚举类来表示交换机类型.
/*** 表示交换机类型*/
public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type) {this.type = type;}public int getType() {return type;}
}
咱们再考虑,Exchange类中有哪些属性?
- 1.name,当作交换机的唯一身份标识
- 2.ExchangeType,表示交换机类型
- 3.durable,表示这个交换机是否需要持久化存储
- 4.autoDelete,表示该交换机在无人使用后,是否会自动删除
- 5.arguments,表示后续的一些拓展功能
/*** 表示一个交换机* 交换机的使用者是生产者*/
@Data
public class Exchange {// 此处使用 name 作为交换机的身份标识,(唯一的)private String name;// 交换机类型,DIRECT,FANOUT,TOPICprivate ExchangeType type = ExchangeType.DIRECT;// 该交换机是否要持久化存储,true表示要,false表示不要private boolean durable = false;// 如果当前交换机,没人使用了,就会自动删除// 这个属性暂时放在这(后续代码中没有实现,RabbitMQ中实现了)private boolean autoDelete = false;// arguments 表示的是创建交换机时指定的一些额外参数// 这个属性也暂时放在这(后续代码中没有实现,RabbitMQ中实现了)// 为了把这个 arguments 存到数据库中,需要将 arguments 转换为 json 格式的字符串private Map<String,Object> arguments = new HashMap<>();// 这里的 get set 用于与数据库交互使用public String getArguments() {ObjectMapper objectMapper = new ObjectMapper();try {// 将 arguments 按照 JSON 格式 转换成 字符串return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}// 如果代码抛出异常,返回一个空的 json 字符串return "{}";}public void setArguments(String arguments) {ObjectMapper objectMapper = new ObjectMapper();try {// 将库中的 arguments 按照 JSON 格式解析,转换成 Map 对象this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public void setArguments(Map<String,Object> arguments) {this.arguments = arguments;}// 这里的 get set ,用来更方便的获取/设置 arguments 中的键值对// 这一组 getter setter 是在Java内部代码使用的(比如测试的时候)public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key,Object value) {arguments.put(key, value);}
}
创建 MSGQueue类
MSGQueue类中有哪些属性?
与Exchange类大差不差.
直接贴代码
/*** 表示一个存储消息的队列* MSG =》Message* 消息队列的使用者是消费者*/
@Data
public class MSGQueue {// 表示队列的身份标识private String name;// 表示队列是否持久化private boolean durable = false;// true -> 这个队列只能被一个消费者使用,false -> 大家都能使用这个队列// 后续代码不实现相关功能private boolean exclusive = false;// true -> 没人使用后,自动删除,false -> 没人使用,不自动删除private boolean autoDelete = false;// 表示扩展参数,后续代码没有实现private Map<String,Object> arguments = new HashMap<>();public String getArguments() {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}public void setArguments(String arguments) {ObjectMapper objectMapper = new ObjectMapper();try {this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public void setArguments(Map<String,Object> arguments) {this.arguments = arguments;}public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key,Object value) {arguments.put(key, value);}
}
创建 Binding类
/*** 表示队列和交换机之间的绑定关系*/
@Data
public class Binding {private String exchangeName;private String queueName;// 主题交换机的匹配keyprivate String bindingKey;
}
创建Message类
Message类,大致可以分为三个部分.
- 消息自身的属性
- 消息的正文
- 消息的持久化存储所需属性
我们新建一个 BasicProperties 类来表示 消息的属性.
/*** 这个类表示消息的属性*/
@Data // 实现 Serializable 接口是为了后续的序列化操作
public class BasicProperties implements Serializable {// 消息的唯一身份标识private String messageId;// 如果当前交换机是 DIRECT,此时 routingKey 表示要转发的队列名// 如果当前交换机是 FANOUT,此时 routingKey 无意义// 如果当前交换机是 TOPIC,此时 routingKey 就要和bindingKey进行匹配,匹配成功才转发给对应的消息队列private String routingKey;// 这个属性表示消息是否要持久化,1表示不持久化,2 表示持久化private int deliverMode = 1;
}
持久化存储会在下面讲到,莫慌.
/*** 这个类表示一个消息*/
@Data // 实现 Serializable 接口是为了后续的序列化操作
public class Message implements Serializable {// 消息的属性private BasicProperties basicProperties = new BasicProperties();// 消息的正文private byte[] body;// 相当于消息的版本号,主要针对 Message 类有改动后,再去反序列化之前旧的 message时,可能会出现错误// 因此引入消息版本号,如果版本号不匹配,就不允许反序列化直接报错,来告知程序猿,后续代码中并未实现该功能private static final long serialVersionUid = 1L;// 下面的属性是持久化存储需要的属性// 消息存储到文件中,使用一下两个偏移量来确定消息在文件中的位置 [offsetBeg,offsetEnd)// 这两个属性不需要 序列化 存储到文件中,存储到文件中后位置就固定了,// 这两个属性的作用是让 内存 中的 message 能顺利找到 文件 中的 message// 被 transient 修饰的属性,不会被 标准库 的 序列化方式 序列化private transient long offsetBeg = 0; // 消息数据的开头举例文件开头的位置偏移(字节)private transient long offsetEnd = 0; // 消息数据的结尾举例文件开头的位置偏移(字节)// 使用这个属性表示该消息在文件中是否是有效信息(逻辑删除)// 0x1表示有效,0x0表示无效private byte isValid = 0x1;// 创建工厂方法,让工厂方法封装 new Message 对象的过程// 该方法创建的 Message 对象,会自动生成唯一的MessageIdpublic static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body) {Message message = new Message();if (basicProperties != null) {message.setBasicProperties(basicProperties);}message.basicProperties.setRoutingKey(routingKey);// 此处生成的 MessageId 以 M- 作为前缀message.setMessageId("M-" + UUID.randomUUID());message.setBody(body);// 此处先将 message的核心部分 basicProperties 与 body设置了// 而 offsetBeg,offsetEnd,isValid,这些属性是持久化时才设置的return message;}// 直接获取消息idpublic String getMessageId() {return basicProperties.getMessageId();}// 直接更改消息idpublic void setMessageId(String messageId) {basicProperties.setMessageId(messageId);}// 直接获取 消息的keypublic String getRoutingKey() {return basicProperties.getRoutingKey();}// 直接更改 消息的keypublic void setRoutingKey(String routingKey) {basicProperties.setRoutingKey(routingKey);}// 直接获取 消息的是否持久化存储字段public int getDeliverMode() {return basicProperties.getDeliverMode();}// 直接修改 消息的是否持久化存储字段public void setDeliverMode(int mode) {basicProperties.setDeliverMode(mode);}
}
这些核心类就都建好了,下篇文章就来考虑他们的持久化存储与内存存储!