当前位置: 首页 > news >正文

SpringBoot + SSE 实时异步流式推送

前言

在当今数字化时代,实时数据处理对于企业的决策和运营至关重要。许多业务场景需要及时响应数据库中的数据变化,例如电商平台实时更新库存、金融系统实时监控交易数据等。

本文将详细介绍如何通过Debezium捕获数据库变更事件,并利用Server - Sent Events(SSE)将这些变更实时推送给前端应用。

技术背景

+----------------+          +----------------+          +----------------+  
|   MySQL 数据库  | 监听变更  |  SpringBoot 服务  |  推送变更  |    Web 前端     |  
|  (Binlog 模式)  | ------>  | (Debezium CDC) | ------>  | (EventSource)  |  
+----------------+          +----------------+          +----------------+  
  • Debezium 是一个开源的分布式平台,它能够监控数据库的变化,并将这些变化以事件流的形式发送出去。它支持多种数据库,如 MySQL、PostgreSQL 等,通过模拟数据库的复制协议来实现对数据库变更的实时捕获。

  • Server - Sent Events(SSE)是一种允许网页自动获取服务器推送更新的技术。它基于 HTTP 协议,通过一个单向的连接,服务器可以持续向客户端发送事件流数据,非常适合实时数据推送的场景。

环境准备

MySQL 配置
-- 启用 Binlog(ROW 模式)  
SET GLOBAL log_bin = ON;  
SET GLOBAL binlog_format = 'ROW';  -- 创建 CDC 用户(需 REPLICATION 权限)  
CREATE USER 'cdc_user' IDENTIFIED BY 'cdc_pass';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user';  

配置要点:确保 Binlog 记录行级变更‌

引入依赖
<dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>1.6.0.Final</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>1.6.0.Final</version>
</dependency>

核心代码实现

Debezium 监听服务
@Slf4j
@Component
public class BinlogListener {@Autowiredprivate SseService sseService;@PostConstructpublic void start() {Configuration config = Configuration.create().with("name", "mysql-connector-1").with("connector.class", "io.debezium.connector.mysql.MySqlConnector").with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore").with("offset.storage.file.filename", "D:\\usr\\debezium\\mysql-offsets.dat").with("offset.flush.interval.ms", "10000").with("database.server.name", "mysql-connector-1").with("database.hostname", "localhost").with("database.port", "3306").with("database.user", "root").with("database.password", "root").with("database.server.id", "1").with("database.include.list", "scf").with("table.include.list", "scf.user").with("include.schema.changes", "false").with("snapshot.mode", "initial").with("database.history.skip.unparseable.ddl", "true") // 忽略解析错误.with("database.connection.attempts", "5") // 最大重试次数.with("database.connection.backoff.ms", "10000") // 重试间隔 10s.with("database.history", "io.debezium.relational.history.FileDatabaseHistory").with("database.history.file.filename", "D:\\usr\\debezium\\mysql-history.dat").build();EmbeddedEngine engine = EmbeddedEngine.create().using(config).notifying(this::handleEvent).build();Executors.newSingleThreadExecutor().execute(engine::run);}private void handleEvent(SourceRecord record) {Struct value = (Struct) record.value();Struct after = value.getStruct("after");// 转换为 Map 并序列化Map<String, Object> dataMap = new HashMap<>();dataMap.put("id", after.getString("id"));dataMap.put("name", after.getString("name"));dataMap.put("age", after.getInt32("age"));sseService.broadcast(JSON.toJSONString(dataMap));}
}
SSE 推送服务
@Service  
public class SseService {  private final Set<SseEmitter> emitters = ConcurrentHashMap.newKeySet();  public SseEmitter subscribe() {  SseEmitter emitter = new SseEmitter(60_000L);  emitter.onCompletion(() -> emitters.remove(emitter));  emitters.add(emitter);  return emitter;  }  public void broadcast(String data) {  emitters.forEach(emitter -> {  try {  emitter.send(SseEmitter.event()  .data(data)  .id(UUID.randomUUID().toString()));  } catch (IOException e) {  emitter.completeWithError(e);  }  });  }  
}  
控制器层
@RestController  
@RequestMapping("/sse")  
public class SseController {  @Autowired  private SseService sseService;  @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)  public SseEmitter stream() {  return sseService.subscribe();  }  
}  

前端实现

<html lang="en">
<head><meta charset="UTF-8"><title>实时数据推送测试</title>
</head>
<body>
<div id="updates"></div>
<script>const eventSource = new EventSource('/sse/stream');eventSource.onmessage = e => {const data = JSON.parse(e.data);document.getElementById('updates').innerHTML +=`<p>用户变更: ID=${data.id}, 姓名=${data.name}</p>`;};eventSource.onerror = e => console.error("SSE 错误:", e);
</script>
</body>
</html> 

测试数据变更

图片

http://www.xdnf.cn/news/176599.html

相关文章:

  • Linux内核中的编译时安全防护:以网络协议栈控制块校验为例
  • mAh 与 Wh:电量单位的深度解析
  • 【Pandas】pandas DataFrame rtruediv
  • 全网直播推介会,九识智能与申通快递达成全面战略合作
  • 20.压敏电阻的特性与使用注意事项
  • RuoYi-Vue项目Docker镜像构建、推送与部署完整流程
  • 云平台+MQTT+C#上位机+单片机通信
  • 在 UniApp 中实现 App 与 H5 页面的跳转及通信
  • lightrag : from lightrag.utils import EmbeddingFunc 报错
  • 04.通过OpenAPI-Swagger规范让Dify玩转Agent
  • 【Redis】set类型
  • JavaEE-多线程实战02
  • AI如何重塑CC防护行业?五大变革与实战策略解析
  • 【创新实训个人博客】multi-agent调研(2)
  • promis(resolve,reject)入门级别
  • 互联网大厂Java面试:从Spring Boot到微服务架构的实践与挑战
  • 智诚科技苏州SOLIDWORKS授权代理商的卓越之选
  • vite.config.ts 的详细配置项说明、完整代码示例及表格总结
  • 代码随想录算法训练营day12(二叉树)
  • javaScript--数据结构和算法
  • 轮转数组(中等)
  • 如何优雅地解决AI生成内容粘贴到Word排版混乱的问题?
  • 从“世界工厂”到“智造之都”:双运放如何改写东莞产业基因?
  • JavaScript 中 undefined 和 not defined 的区别
  • Dev控件RadioGroup 如何设置一排有N个显示或分为几行
  • 使用cesium设置第一视角
  • 第2讲、Tensor高级操作与自动求导详解
  • w~嵌入式C语言~合集6
  • 【计算机哲学故事1-2】输入输出(I/O):你吸收什么,便成为什么
  • APP、游戏、网站被黑客攻击了怎么解决?