Web实时消息推送

Web实时消息推送

消息推送

  • web端消息推送
  • 移动端推送

消息推送:推Push 和 拉Pull

消息推送常见方案

短轮询

轮询:由浏览器向服务器发出HTTP请求,服务器实时返回未读消息数据给客户端,浏览器再做渲染显示。

JS请求

setInterval(() => {// 方法请求messageCount().then((res) => {if (res.code === 200) {this.messageCount = res.data;}});
}, 1000);
长轮询

长轮询是短轮询的改进版本,减少对服务器资源浪费的同时,保证消息的相对实时性。长轮询在中间件中应用的很广泛,比如 Nacos 和 Apollo 配置中心,消息队列 Kafka、RocketMQ 中都有用到长轮询。

因为一个 ID 可能会被多个长轮询请求监听,所以我采用了 Guava 包提供的Multimap结构存放长轮询,一个 key 可以对应多个 value。一旦监听到 key 发生变化,对应的所有长轮询都会响应。前端得到非请求超时的状态码,知晓数据变更,主动查询未读消息数接口,更新页面数据。

@Controller
@RequestMapping("/polling")
public class PollingController {// 存放监听某个Id的长轮询集合// 线程同步结构public static Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedMultimap(HashMultimap.create());/*** 设置监听*/@GetMapping(path = "watch/{id}")@ResponseBodypublic DeferredResult<String> watch(@PathVariable String id) {// 延迟对象设置超时时间DeferredResult<String> deferredResult = new DeferredResult<>(TIME_OUT);// 异步请求完成时移除 key,防止内存溢出deferredResult.onCompletion(() -> {watchRequests.remove(id, deferredResult);});// 注册长轮询请求watchRequests.put(id, deferredResult);return deferredResult;}/*** 变更数据*/@GetMapping(path = "publish/{id}")@ResponseBodypublic String publish(@PathVariable String id) {// 数据变更 取出监听ID的所有长轮询请求,并一一响应处理if (watchRequests.containsKey(id)) {Collection<DeferredResult<String>> deferredResults = watchRequests.get(id);for (DeferredResult<String> deferredResult : deferredResults) {deferredResult.setResult("我更新了" + new Date());}}return "success";}

当请求超过设置的超时时间,会抛出AsyncRequestTimeoutException异常,这里直接用@ControllerAdvice全局捕获统一返回即可,前端获取约定好的状态码后再次发起长轮询请求,如此往复调用。

@ControllerAdvice
public class AsyncRequestTimeoutHandler {@ResponseStatus(HttpStatus.NOT_MODIFIED)@ResponseBody@ExceptionHandler(AsyncRequestTimeoutException.class)public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {System.out.println("异步请求超时");return "304";}
}

SSE

除了可以用WebSocket这种耳熟能详的机制外,还有一种服务器发送事件(Server-Sent Events),简称 SSE。一种服务器端到客户端(浏览器)的单向消息推送

ChatGPT 就是采用的 SSE。对于需要长时间等待响应的对话场景,ChatGPT 采用了一种巧妙的策略:它会将已经计算出的数据“推送”给用户,并利用 SSE 技术在计算过程中持续返回数据。这样做的好处是可以避免用户因等待时间过长而选择关闭页面。

SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息,在有数据变更时从服务器流式传输到客户端。

整体的实现思路有点类似于在线视频播放,视频流会连续不断的推送到浏览器,你也可以理解成,客户端在完成一次用时很长(网络不畅)的下载。

SSE交互
SSE 与 WebSocket 作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息,但还是有些许不同:

  • SSE 是基于 HTTP 协议的,它们不需要特殊的协议或服务器实现即可工作;WebSocket 需单独服务器来处理协议。
  • SSE 单向通信,只能由服务端向客户端单向通信;WebSocket 全双工通信,即通信的双方可以同时发送和接受信息。
  • SSE 实现简单开发成本低,无需引入其他组件;WebSocket 传输数据需做二次解析,开发门槛高一些。
  • SSE 默认支持断线重连;WebSocket 则需要自己实现。
  • SSE 只能传送文本消息,二进制数据需要经过编码后传送;WebSocket 默认支持传送二进制数据。

前端只需进行一次 HTTP 请求,带上唯一 ID,打开事件流,监听服务端推送的事件就可以了

<script>let source = null;let userId = 7777if (window.EventSource) {// 建立连接source = new EventSource('http://localhost:7777/sse/sub/'+userId);setMessageInnerHTML("连接用户=" + userId);/*** 连接一旦建立,就会触发open事件* 另一种写法:source.onopen = function (event) {}*/source.addEventListener('open', function (e) {setMessageInnerHTML("建立连接。。。");}, false);/*** 客户端收到服务器发来的数据* 另一种写法:source.onmessage = function (event) {}*/source.addEventListener('message', function (e) {setMessageInnerHTML(e.data);});} else {setMessageInnerHTML("你的浏览器不支持SSE");}
</script>

服务端的实现更简单,创建一个SseEmitter对象放入sseEmitterMap进行管理

private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();/*** 创建连接*/
public static SseEmitter connect(String userId) {try {// 设置超时时间,0表示不过期。默认30秒SseEmitter sseEmitter = new SseEmitter(0L);// 注册回调sseEmitter.onCompletion(completionCallBack(userId));sseEmitter.onError(errorCallBack(userId));sseEmitter.onTimeout(timeoutCallBack(userId));sseEmitterMap.put(userId, sseEmitter);count.getAndIncrement();return sseEmitter;} catch (Exception e) {log.info("创建新的sse连接异常,当前用户:{}", userId);}return null;
}/*** 给指定用户发送消息*/
public static void sendMessage(String userId, String message) {if (sseEmitterMap.containsKey(userId)) {try {sseEmitterMap.get(userId).send(message);} catch (IOException e) {log.error("用户[{}]推送异常:{}", userId, e.getMessage());removeUser(userId);}}
}

WebSocket

这是一种在 TCP 连接上进行全双工通信的协议,建立客户端和服务器之间的通信渠道。浏览器和服务器仅需一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

工作流程:

  1. 客户端向服务器发送一个 HTTP 请求,请求头中包含 Upgrade: websocketSec-WebSocket-Key 等字段,表示要求升级协议为 WebSocket;
  2. 服务器收到这个请求后,会进行升级协议的操作,如果支持 WebSocket,它将回复一个 HTTP 101 状态码,响应头中包含 ,Connection: UpgradeSec-WebSocket-Accept: xxx 等字段、表示成功升级到 WebSocket 协议。
  3. 客户端和服务器之间建立了一个 WebSocket 连接,进行双向的数据传输。数据以帧(frames)的形式进行传送,而不是传统的 HTTP 请求和响应。WebSocket 的每条消息可能会被切分成多个数据帧(最小单位)。发送端会将消息切割成多个帧发送给接收端,接收端接收消息帧,并将关联的帧重新组装成完整的消息。
  4. 客户端或服务器可以主动发送一个关闭帧,表示要断开连接。另一方收到后,也会回复一个关闭帧,然后双方关闭 TCP 连接。

另外,建立 WebSocket 连接之后,通过心跳机制来保持 WebSocket 连接的稳定性和活跃性。

SpringBoot工程应用WebSocket实现

<!-- 引入websocket -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

服务端使用@ServerEndpoint注解标注当前类为一个 WebSocket 服务器,客户端可以通过ws://localhost:7777/webSocket/10086来连接到 WebSocket 服务器端。

@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {//与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();// 用来存在线连接数private static final Map<String, Session> sessionPool = new HashMap<String, Session>();/*** 链接成功调用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam(value = "userId") String userId) {try {this.session = session;webSockets.add(this);sessionPool.put(userId, session);log.info("websocket消息: 有新的连接,总数为:" + webSockets.size());} catch (Exception e) {}}/*** 收到客户端消息后调用的方法*/@OnMessagepublic void onMessage(String message) {log.info("websocket消息: 收到客户端消息:" + message);}/*** 此为单点消息*/public void sendOneMessage(String userId, String message) {Session session = sessionPool.get(userId);if (session != null && session.isOpen()) {try {log.info("websocket:单点消息:" + message);session.getAsyncRemote().sendText(message);} catch (Exception e) {e.printStackTrace();}}}
}

服务端还需要注入ServerEndpointerExporter,这个 Bean 就会自动注册使用了@ServerEndpoint注解的 WebSocket 服务器。

@Configuration
public class WebSocketConfiguration {/*** 用于注册使用了 @ServerEndpoint 注解的 WebSocket 服务器*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}

前端初始化打开 WebSocket 连接,并监听连接状态,接收服务端数据或向服务端发送数据。

<script>var ws = new WebSocket('ws://localhost:7777/webSocket/10086');// 获取连接状态console.log('ws连接状态:' + ws.readyState);//监听是否连接成功ws.onopen = function () {console.log('ws连接状态:' + ws.readyState);//连接成功则发送一个数据ws.send('test1');}// 接听服务器发回的信息并处理展示ws.onmessage = function (data) {console.log('接收到来自服务器的消息:');console.log(data);//完成通信后关闭WebSocket连接ws.close();}// 监听连接关闭事件ws.onclose = function () {// 监听整个过程中websocket的状态console.log('ws连接状态:' + ws.readyState);}// 监听并处理error事件ws.onerror = function (error) {console.log(error);}function sendMessage() {var content = $("#message").val();$.ajax({url: '/socket/publish?userId=10086&message=' + content,type: 'GET',data: { "id": "7777", "content": content },success: function (data) {console.log(data)}})}
</script>

MTQQ协议

概念:基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网中的一个标准传输协议。

该协议将消息的发布者(publisher)与订阅者(subscriber)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务。

TCP 协议位于传输层,MQTT 协议位于应用层,MQTT 协议构建于 TCP/IP 协议上,也就是说只要支持 TCP/IP 协议栈的地方,都可以使用 MQTT 协议。

为什么要用 MQTT 协议?

为什么不是其它协议,比如更为熟悉的 HTTP 协议呢?

  • 首先 HTTP 协议是一种同步协议,客户端请求后需要等待服务器的响应。而在物联网(IOT)环境中,设备会很受制于环境的影响,比如带宽低、网络延迟高、网络通信不稳定等,显然异步消息协议更为适合 IOT 应用程序。
  • HTTP 是单向的,如果要获取消息客户端必须发起连接,而在物联网(IOT)应用程序中,设备或传感器往往都是客户端,这意味着它们无法被动地接收来自网络的命令。
  • 通常需要将一条命令或者消息,发送到网络上的所有设备上。HTTP 要实现这样的功能不但很困难,而且成本极高。

我也没想到 springboot + rabbitmq 做智能家居,会这么简单

未读消息(小红点),前端 与 RabbitMQ 实时消息推送实践,贼简单~

总结

介绍优点缺点
SSE一种服务器端到客户端(浏览器)的单向消息推送。简单、易实现,功能丰富不支持双向通信
WebSocket除了最初建立连接时用 HTTP 协议,其他时候都是直接基于 TCP 协议进行通信的,可以实现客户端和服务端的全双工通信。性能高、开销小实现相对复杂一些
MQTT基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息。成熟稳定,轻量级实现相对复杂一些

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/11089.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

ReactPress数据库表结构设计全面分析

ReactPress Github项目地址&#xff1a;https://github.com/fecommunity/reactpress 欢迎Star。 ReactPress是一个基于React框架开发的开源发布平台和内容管理系统&#xff08;CMS&#xff09;。它不仅支持用户在支持React和MySQL数据库的服务器上搭建自己的博客和网站&#…

无线振动传感器的安装方法

lora无线温振一体传感器即传感器的采集时间&#xff0c;采集方式完全有主机通过命令实现。其主要特点是&#xff1a;传感器平时处在低功耗状态、传感器可以随时响应远程主机控制命令、传感器可采集特征值或者原始加速度数据 lora 技术&#xff0c;提高了传输速率多振动&#xf…

GESP4级考试语法知识(暴力枚举(三))

古堡算式代码&#xff1a; # include <stdio.h> int main(){int a, b, c, d, e;int x;int left, right;left a * 10000 b * 1000 c * 100 d * 10 e * 1;right e * 10000 d * 1000 c * 100 b * 10 a * 1;for(a 0; a < 9; a){for(b 0; b < 9; b){for(c …

CSS学习

目录 一、CSS概述 二、CSS的三种引入方式 (一)直接使用style标签编辑样式(调试样式代码时使用) (二)使用link标签引入CSS文件(上线时使用) (三)内嵌式(尽量不用&#xff0c;后期维护麻烦) 三、CSS常用选择器 (一)标签选择器(通过html标签选择元素) (二)class选择器(通…

数字IC实践项目(10)—基于System Verilog的DDR4 Model/Tb 及基础Verification IP的设计与验证(付费项目)

数字IC实践项目&#xff08;10&#xff09;—基于System Verilog的DDR4 Model/Tb 及基础Verification IP的设计与验证&#xff08;付费项目&#xff09; 前言项目框图1&#xff09;DDR4 Verification IP2&#xff09;DDR4 JEDEC Model & Tb 项目文件1&#xff09;DDR4 Veri…

NLP论文速读|ScPO:自我一致性的偏好优化(Self-Consistency Preference Optimization)

论文速读|Self-Consistency Preference Optimization 论文信息&#xff1a; 简介&#xff1a; 这篇论文试图解决的问题是如何在没有人类标注数据的情况下&#xff0c;提高大型语言模型&#xff08;LLMs&#xff09;在复杂推理任务上的性能。现有的自我对齐技术往往因为难以分配…

Java定时任务

业务场景&#xff1a; 系统凌晨1点数据备份。用户下单半小时未支付订单&#xff0c;需要自动取消订单。每10min动态抓取某网站的数据。博客定时发送文章。每晚定时计算用户当日收益情况并推送给用户最新的数据。 分布式定时任务 Redis Redis过期事件监听。Redisson内置延时…

Data Grouping 数据分组

Goto Data Grid 数据网格 Data Grouping 数据分组 分组功能将具有相同列值的行合并到相同的数据组中。它受 Grid View 和 Banded Grid View 支持。 Apply Grouping 应用分组 数据分组最初在 Data Grid 中启用&#xff08;默认设置&#xff09;。要按列对数据进行分组&#…

对于大根堆的计算时间复杂度的过程

目录 第一步 第二步 第三步 第四步 第一步 首先进行假设 第二步 然后求解出每一层的节点个数这一层节点需要调整的所在高度 第三步 接着每一层节点需要调整的次数 &#xff08;每一层的节点个数 * 这一层节点需要调整的所在高度&#xff09;再全部相加起来 利用*2T&…

ANNOVAR下载

1.官网 https://annovar.openbioinformatics.org/en/latest/user-guide/startup/ 都填英文 要不然会报错 tar -xzvf annovar.latest.tar.gztree . ├── annotate_variation.pl ├── coding_change.pl ├── convert2annovar.pl ├── example │ ├── ex1.avinput…

【电子通识】TINA-TI中怎么用分段线性源做周期性波形

在文章【电子通识】TINA-TI 如何产生动态电流波形?中我们讲到我们可以用piecewise linear分段性线源做一个动态脉冲。 但是这个动态脉冲只能保持一定的时间,那么如何做成周期性的动态脉冲呢? 我们使用以下关键字,来完成周期性动态负载创建 Repeat Forever ....周期…

Llamaindex RAG 实践

大模型支持的最强大的应用程序之一是复杂的问答聊天机器人。这些应用程序可以回答有关特定源信息的问题。这些应用程序使用一种称为检索增强生成 &#xff08;RAG&#xff09; 的技术。 1. 什么是RAG&#xff1f; 当你需要给模型注入新的知识时&#xff0c;有两种方法&#xf…

外包干了2个月,技术明显退步

回望过去&#xff0c;我是一名普通的本科生&#xff0c;于2019年通过校招有幸加入了南京某知名软件公司。那时的我&#xff0c;满怀着对未来的憧憬和热情&#xff0c;投入到了功能测试的岗位中。日复一日&#xff0c;年复一年&#xff0c;转眼间&#xff0c;我已经在这个岗位上…

Sigrity SPEED2000 Power Ground Noise Simulation模式如何进行信号时域仿真操作指导(一)-单个信号

Sigrity SPEED2000 Power Ground Noise Simulation模式如何进行信号时域仿真操作指导(一)-单个信号 Power Ground Noise Simulation模式除了可以对电源进行时域仿真外&#xff0c;同样支持对信号进行时域仿真&#xff0c;以下图为例进行说明 2D视图 3D view 本例中观测信号D2从…

String模拟实现【C++】【STL】

String模拟实现【C】【STL】 构造函数拷贝构造赋值重载析构函数<<赋值重载插入函数reserveappend函数push_back函数 earse函数完整代码string.hstring.cpp STL中有两个属性capacity和size&#xff0c;capacity是真正STL容器的真正内存大小&#xff0c;size是STL容器中数据…

前端CSS3 渐变详解

文章目录 CSS3 渐变详解一、引言二、CSS3 渐变基础1、线性渐变1.1、基本线性渐变1.2、改变渐变方向 2、径向渐变2.1、基本径向渐变2.2、设置径向渐变的中心 三、高级渐变技巧1、重复渐变1.1、重复线性渐变1.2、重复径向渐变 四、总结 CSS3 渐变详解 一、引言 在现代网页设计中…

Ubuntu系统安装minicom软件连接交换机

安装minicom&#xff1a; 电脑主机串口线连接上交换机的console口。开打乌班图系统终端&#xff0c;输入sudo -i切换为root用户 方法一&#xff1a; 输入 sudo apt-get install minicom 命令&#xff0c;安装minicom软件。 minicom 必须带有安装包的完整路径 文件名称 后…

异星工厂_1

经验 首次体验异星工厂这款游戏&#xff0c;得出了以下经验。 1. 基地的构建顺序&#xff1a;煤&#xff0c;电&#xff0c;原料&#xff0c;传送流&#xff0c;组装器&#xff0c;防御武器&#xff0c;其他 2. 永远不要让采集&#xff08;生产者&#xff09;停止&#xff0…