第五章、SpringBoot与消息通信(三)
第五章、SpringBoot与消息通信(三)
Spring Boot 程序中集成 ActiveMQ
当 classpath 上存在 ActiveMQ 时,Spring Boot 会自动配置一个ConnectionFactory。我们可以通过使用 spring-boot-starter-activemq 这个启动依赖实现引入连接 ActiveMQ 实例的依赖并实现自动配置。此外,在 Spring Boot 程序中,可以通过使用spring.activemq.*属性在程序的全局配置文件,如 application.properties 文件中配置ActiveMQ 属性。
SpringBoot 集成 ActiveMQ 实现消息通信
- Spring Boot 实现点对点模型消息传送
- Spring Boot 实现发布/订阅模型消息传送
Spring Boot 实现点对点模型消息传送
在点对点模型中,消息由一个生产者(Producer)传送到一个使用者 ( Consumer ) 。 消 息 的 生 产 者 称 为 发 送 者 ( Sender ) , 消 息 的 使 用 者 称 为 接 收 者(Receiver),消息传送的目的地为队列(Queue)。消息由发送者,也就是生产者,发送到队列目的地,然后传送给注册了队列的其中一个接收者,也就是使用者。任意多的生产者可以发送消息到同一个队列,且每条消息确保被传送并由一个使用者使用。如果没有任何注册使用者来使用消息,则队列保留消息直到有使用者注册和使用消息。
ActiveMQ 实现点对点模型消息传送,需要执行以下步骤:
- 启动 ActiveMQ 服务
- 创建客户端程序
- 添加 ActiveMQ 启动依赖
- 配置 ActiveMQ 属性
- 编写消息发送程序
- 编写消息接收程序
- 测试发送和接收消息
1.启动 ActiveMQ 服务
使用 ActiveMQ 作为消息代理,提供消息服务,需要先启动 ActiveMQ 服务。可以通过运行 cmd 命令窗口,切换到 ActiveMQ 的 bin 目录,通过 activemq start 命令启动 ActiveMQ 服务。
启动 ActiveMQ 服务后,可通过 url:http://127.0.0.1:8161/ 访问 ActiveMQ 的 Web 控制台,以跟踪目的地的创建、消息的发送和接收等。我们在前面的内容中详细探讨了 ActiveMQ 的下载、安装和启动,此处不再过多赘述。
2.创建客户端程序
JMS 客户端是指发送和接收消息的客户端程序。为了实现消息的发送和接收,也为了方便更好地理解,我们可以创建不同的 Spring Boot 应用程序,分别用于发送消息和接收消息。
3.添加 ActiveMQ 启动依赖
Spring Boot 提供 spring-boot-starter-activemq 这个启动依赖实现在 Spring Boot 程序中快速集成和自动配置置。
添加 ActiveMQ 启动依赖的代码为:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
4.配置 ActiveMQ 属性
在 Spring Boot 程序中,可以通过使用 spring.activemq.*属性在程序的全局配置文件,如application.properties 文件中配置 ActiveMQ 属性。
例如,可以使用以下代码配置 ActiveMQ 和 JMS 的常用属性:
server.port=8081
server.servlet.context-path=/activity511
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.jms.pub-sub-domain=false
spring.activemq.packages.trust-all=true
关于上述代码中使用的属性,说明如下:
- spring.activemq.broker-url:指定要连接的 ActiveMQ 代理的 URL
- spring.activemq.user:指定消息代理的登录用户名,也就是连接到 ActiveMQ 代理的用户名,默认用户名为 admin,也可通过修改 ActiveMQ 的配置文件添加用户。
- spring.activemq.password:指定消息代理的登录密码,也就是连接到 ActiveMQ 代理的密码,也可通过修改 ActiveMQ 的配置文件配置用户对应的密码。
- spring.jms.pub-sub-domain:指定是否使用主题(Topic)作为默认目的地,默认值为 false。如果设置为 true,表示使用发布/订阅模型,否则,表示使用点对点模型。
- spring.activemq.packages.trust-all:指定是否信任所有包,这样任何包下的的任何对象都可以作为消息被发送出去。默认情况下,是不能发送对象消息的,需要将发送的对象所在的包添加为信任的包,才能将对象作为消息发送出去,也可通过 spring.activemq.packages.trusted 属性指定一个或多个受信任的包。
5.编写消息发送程序
集成 ActiveMQ 后,我们就可以着手编写消息发送程序和消息接收程序了。Spring 框架提供JmsTemplate 等类,简化了使用 JMS API 实现消息通信的编码,这样开发人员不需要去直接操作ConnectionFactory 、 Connection 、 Session 等 组 件 , 而 且 可 以 直 接 在 需 要 的 地 方 注 入JmsTemplate 组件,调用其方法发送消息即可。
消息中的对象
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {private int id;private String realName;
}
我们可以使用类似如下的代码在一个客户端程序中编写消息发送程序:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;@Component
public class MessProductService {private int count_1,count_2;@Autowiredprivate JmsTemplate jmsTemplate;public void sendMessage_1(){count_1++;jmsTemplate.convertAndSend("queue_1","队列1文本消息:"+count_1);}public void sendMessage_2(){count_2++;jmsTemplate.convertAndSend("queue_2",new User(count_2,"queue2-张三"+count_2));}
}
6.编写消息接收程序
JMS 客户端可以同步或异步方式使用消息,可以通过调用 JmsTemplate 类的 receive()方法实现同步接收消息。如果希望以异步的方式接收消息,可以通过注册消息监听器实现,接下来我们来了解如何使用以下方式接收消息:
- 调用 JmsTemplate 类的 receive()方法实现同步接收消息如果希望使用同步方式实现消息接收,可以通过调用 JmsTemplate 类的 receive()方法接收消息。该方法有以下几种形态:
- public Message receive() throws JmsException从默认目的地接收消息
- public Message receive(Destination destination) throws JmsException{}从传入的 Destination 对象指定的目的地接收消息
- public Message receive(String destinationName) throws JmsException {}从指定名称的目的地接收消息
我们可以在需要的地方调用 JmsTemplate 的 receive()方法接收和使用消息,示例代码如下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;@Component
public class MessConsumerService {@Autowiredprivate JmsTemplate jmsTemplate;public void receiveTextMessage() throws JMSException {TextMessage msg = (TextMessage) jmsTemplate.receive("queue_1");System.out.println(msg.getText());}public void receiveObejctMessage() throws JMSException {ObjectMessage msg = (ObjectMessage) jmsTemplate.receive("queue_2");User user= (User) msg.getObject();System.out.println(user);}
}
- 测试
生产者
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import javax.jms.JMSException;
@SpringBootTest
class JmsDemoApplicationTests {@Autowiredprivate MessProductService productService;@Testvoid testProductService(){productService.sendMessage_1();productService.sendMessage_2();}
}
消费者
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import javax.jms.JMSException;@SpringBootTest
class JmsDemoApplicationTests_2 {@Autowiredprivate MessConsumerService consumerService;@Testvoid testConsumerService() throws JMSException {consumerService.receiveTextMessage();consumerService.receiveObejctMessage();}
}
- 添加使用@JmsListener 注解修饰的方法监听消息接收
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;@Component
public class MessListenerService {@JmsListener(destination = "queue_2")public void receiveTextMsg(String message){System.out.println("消费者1-收到文本消息:"+message);}@JmsListener(destination = "queue_2")public void receiveTextMsg_2(String message){System.out.println("消费者2-收到文本消息:"+message);}
}
- 测试
@Autowired
private MessProductService productService;@Test
void testProductService(){productService.sendMessage_1();productService.sendMessage_2();
}
7.测试发送和接收消息
Spring Boot 实现发布/订阅模型消息传送
在前面的内容中,我们了解到:在发布/订阅模型中,消息从生产者(Producer)传送到任意数量的使用者(Consumer)。消息的生产者称为发布者(Publisher),消息的使用者称为订阅者(Subscriber),消息传送的目的地为主题(Topic)。消息由发布者,也就是生产者,发布到主题目的地,然后传送给所有已订阅该主题的活跃订阅者,也就是使用者。任意数量的生产者可以将消息发布到主题目的地,并且每条消息都可以传送给任意数量的使用者。如果没有已注册的订阅者,则主题目的地不保存消息,除非它对不活动的订阅者具有持久的订阅。
使用 Spring Boot 集成 ActiveMQ 实现发布/订阅模型消息传送,需要执行的步骤和实现点对点模型消息传送类似,只是在配置 ActiveMQ 和 JMS 属性时,需要将 spring.jms.pub-sub-domain 属性的值设置为 true,表示使用主题(Topic)作为消息目的地,也就是使用发布/订阅模型。
示例代码如下:
spring.jms.pub-sub-domain=true
注册主题
@SpringBootApplication
public class JmsDemoApplication {public static void main(String[] args) {SpringApplication.run(JmsDemoApplication.class, args);}@Beanpublic Topic topic(){return new ActiveMQTopic("niit");}
}
消息发布者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;import javax.jms.Topic;@Component
public class MessPublisher {@Autowiredprivate Topic topic;@Autowiredprivate JmsTemplate jmsTemplate;public void sendTopic(String msg) {System.out.println("发送Topic消息内容 :" + msg);jmsTemplate.convertAndSend(this.topic, msg);}}
消息订阅者
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;@Component
public class MessListenerService {@JmsListener(destination = "niit")public void receiveTopic_1(String text){System.out.println("订阅者1接收到消息:"+text);}@JmsListener(destination = "niit")public void receiveTopic_2(String text){System.out.println("订阅者2接收到消息:"+text);}
}
测试
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
class JmsDemoApplicationTests {@Autowiredprivate MessPublisher publisher;@Testvoid testSentTopic(){publisher.sendTopic("Spring Boot 开课了!");}
}