基于springboot2.5.7
废话不多说,直接上干货:
@Slf4j
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
@IntegrationComponentScan(basePackages = {"扫描包路径","扫描包路径"})
public class MqttAutoConfig {@Autowiredprivate MqttProperties mqttProperties;@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate DisruptorProperties disruptorProperties;@RefreshScope@Bean(value = "mqttParallelQueueHandler",initMethod = "start",destroyMethod = "shutDown")@ConditionalOnProperty(prefix = "custom-config.mqtt", name = "disruptor", havingValue = "true")public ParallelQueueHandler mqttParallelQueueHandler(){log.info("初始化Disruptor...");return new ParallelQueueHandler.Builder<DisruptorEventData>().setDisruptorProperties(disruptorProperties).setWaitStrategy(new BlockingWaitStrategy()).setListener(new MQTTMsgListener()).build();}@Bean@ConditionalOnProperty(prefix = "custom-config.mqtt", value = {"username","password", "host-url"})public MqttConnectOptions getReceiverMqttConnectOptionsForSub(){MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName(mqttProperties.getUsername());mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());List<String> hostList = Arrays.asList(mqttProperties.getHostUrl().trim().split(","));String[] serverURIs = new String[hostList.size()];hostList.toArray(serverURIs);mqttConnectOptions.setServerURIs(serverURIs);mqttConnectOptions.setKeepAliveInterval(2);mqttConnectOptions.setAutomaticReconnect(true);return mqttConnectOptions;}/*** MQTT 连接工厂* @return MqttPahoClientFactory*/@Bean@ConditionalOnMissingBeanpublic MqttPahoClientFactory receiverMqttClientFactoryForSub(MqttConnectOptions mqttConnectOptions) {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(mqttConnectOptions);log.info("【MQTT】-初始化连接工厂...");return factory;}/*** 出站通道* @return MessageChannel*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** MQTT 消息发送处理器* @return MessageHandler*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId()+"out", factory);messageHandler.setDefaultQos(1);//开启异步messageHandler.setAsync(true);messageHandler.setDefaultTopic("test");return messageHandler;}/*** 此处可以使用其他消息通道* Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。** @return MessageChannel*/@Beanpublic MessageChannel mqttInBoundChannel() {return new DirectChannel();}/*** 适配器, 多个topic共用一个adapter* 客户端作为消费者,订阅主题,消费消息*/@Bean@ConditionalOnMissingBeanpublic MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttPahoClientFactory factory) {List<String> topics = mqttProperties.getSubscribeTopics();String[] topicArray = new String[topics.size()];for (int i = 0; i < topics.size(); i++) {topicArray[i] = "$queue/"+ topics.get(i);}log.info("【MQTT】-订阅TOPIC:{}", Arrays.toString(topicArray));MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId(), factory, topicArray);adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());adapter.setConverter(new DefaultPahoMessageConverter());adapter.setRecoveryInterval(10000);adapter.setQos(1);adapter.setOutputChannel(mqttInBoundChannel());return adapter;}@Autowired(required = false)@Qualifier("mqttParallelQueueHandler")private ParallelQueueHandler<DisruptorEventData> parallelQueueHandler;/*** mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。* @return MessageHandler*/@Bean@RefreshScope@ServiceActivator(inputChannel = "mqttInBoundChannel")public MessageHandler mqttMessageHandler() {// 获取配置中的设备品牌MqttProperties.DeviceBrand deviceBrand = mqttProperties.getDeviceBrand();boolean disruptor = mqttProperties.isDisruptor();// 获取所有实现了 CustomMqttMessageHandler 接口的 Beanreturn message -> {log.info("【MQTT】-收到MQTT消息,Topic: {}, Payload: {}",message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),message.getPayload());String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);Map<String, CustomMqttMessageReceiverHandler> handlers = applicationContext.getBeansOfType(CustomMqttMessageReceiverHandler.class);boolean handled = false;if (Objects.nonNull(deviceBrand)){CustomMqttMessageReceiverHandler handler = handlers.get(deviceBrand.getServiceName());if (Objects.nonNull(handler)){if (handler.supportsTopic(topic)) {handled = this.run(handler,message,topic,disruptor);}}else {log.error("【MQTT】-未找到设备品牌消息接收处理器,deviceBrand->{}",deviceBrand);}}else {for (CustomMqttMessageReceiverHandler handler : handlers.values()) {if (handler.supportsTopic(topic)) {handled = this.run(handler,message,topic,disruptor);}}}if (!handled) {log.warn("【MQTT】-未找到匹配的处理器来处理Topic {} 的消息", topic);}};}@Bean@ConditionalOnProperty(prefix = "custom-config.mqtt", value = {"username","password", "host-url"})public MqttMessageSender mqttMessageSender(){return new MqttMessageSender();}private boolean run(CustomMqttMessageReceiverHandler handler,Message<?> message,String topic,boolean disruptor){try {String traceId = MDC.get("traceId");if (!StringUtils.hasText(traceId)){traceId = UUID.randomUUID().toString().replaceAll("-", "");MDC.put("traceId",traceId);}if (disruptor && Objects.nonNull(parallelQueueHandler)){log.info("【MQTT】-使用Disruptor处理...");DisruptorEventData data = new DisruptorEventData();Map<String,Object> map = new HashMap<>();map.put("data",message);map.put("handler",handler);map.put("traceId",traceId);data.setMessage(map);parallelQueueHandler.add(data);}else {handler.handleMessage(message);}return true;} catch (Exception e) {log.error("【MQTT】-Handler {} 处理Topic {} 的消息时出错", handler.getClass().getSimpleName(), topic, e);return false;}finally {MDC.clear();}}
由于涉及隐私,其余代码可以留言