SpringBoot + SSE 实时异步流式推送
前言
在当今数字化时代,实时数据处理对于企业的决策和运营至关重要。许多业务场景需要及时响应数据库中的数据变化,例如电商平台实时更新库存、金融系统实时监控交易数据等。
本文将详细介绍如何通过Debezium
捕获数据库变更事件,并利用Server - Sent Events(SSE)
将这些变更实时推送给前端应用。
技术背景
+----------------+ +----------------+ +----------------+
| MySQL 数据库 | 监听变更 | SpringBoot 服务 | 推送变更 | Web 前端 |
| (Binlog 模式) | ------> | (Debezium CDC) | ------> | (EventSource) |
+----------------+ +----------------+ +----------------+
-
Debezium
是一个开源的分布式平台,它能够监控数据库的变化,并将这些变化以事件流的形式发送出去。它支持多种数据库,如MySQL、PostgreSQL
等,通过模拟数据库的复制协议来实现对数据库变更的实时捕获。 -
Server - Sent Events(SSE)
是一种允许网页自动获取服务器推送更新的技术。它基于HTTP
协议,通过一个单向的连接,服务器可以持续向客户端发送事件流数据,非常适合实时数据推送的场景。
环境准备
MySQL 配置
-- 启用 Binlog(ROW 模式)
SET GLOBAL log_bin = ON;
SET GLOBAL binlog_format = 'ROW'; -- 创建 CDC 用户(需 REPLICATION 权限)
CREATE USER 'cdc_user' IDENTIFIED BY 'cdc_pass';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user';
❝配置要点:确保 Binlog 记录行级变更
引入依赖
<dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>1.6.0.Final</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>1.6.0.Final</version>
</dependency>
核心代码实现
Debezium 监听服务
@Slf4j
@Component
public class BinlogListener {@Autowiredprivate SseService sseService;@PostConstructpublic void start() {Configuration config = Configuration.create().with("name", "mysql-connector-1").with("connector.class", "io.debezium.connector.mysql.MySqlConnector").with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore").with("offset.storage.file.filename", "D:\\usr\\debezium\\mysql-offsets.dat").with("offset.flush.interval.ms", "10000").with("database.server.name", "mysql-connector-1").with("database.hostname", "localhost").with("database.port", "3306").with("database.user", "root").with("database.password", "root").with("database.server.id", "1").with("database.include.list", "scf").with("table.include.list", "scf.user").with("include.schema.changes", "false").with("snapshot.mode", "initial").with("database.history.skip.unparseable.ddl", "true") // 忽略解析错误.with("database.connection.attempts", "5") // 最大重试次数.with("database.connection.backoff.ms", "10000") // 重试间隔 10s.with("database.history", "io.debezium.relational.history.FileDatabaseHistory").with("database.history.file.filename", "D:\\usr\\debezium\\mysql-history.dat").build();EmbeddedEngine engine = EmbeddedEngine.create().using(config).notifying(this::handleEvent).build();Executors.newSingleThreadExecutor().execute(engine::run);}private void handleEvent(SourceRecord record) {Struct value = (Struct) record.value();Struct after = value.getStruct("after");// 转换为 Map 并序列化Map<String, Object> dataMap = new HashMap<>();dataMap.put("id", after.getString("id"));dataMap.put("name", after.getString("name"));dataMap.put("age", after.getInt32("age"));sseService.broadcast(JSON.toJSONString(dataMap));}
}
SSE 推送服务
@Service
public class SseService { private final Set<SseEmitter> emitters = ConcurrentHashMap.newKeySet(); public SseEmitter subscribe() { SseEmitter emitter = new SseEmitter(60_000L); emitter.onCompletion(() -> emitters.remove(emitter)); emitters.add(emitter); return emitter; } public void broadcast(String data) { emitters.forEach(emitter -> { try { emitter.send(SseEmitter.event() .data(data) .id(UUID.randomUUID().toString())); } catch (IOException e) { emitter.completeWithError(e); } }); }
}
控制器层
@RestController
@RequestMapping("/sse")
public class SseController { @Autowired private SseService sseService; @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter stream() { return sseService.subscribe(); }
}
前端实现
<html lang="en">
<head><meta charset="UTF-8"><title>实时数据推送测试</title>
</head>
<body>
<div id="updates"></div>
<script>const eventSource = new EventSource('/sse/stream');eventSource.onmessage = e => {const data = JSON.parse(e.data);document.getElementById('updates').innerHTML +=`<p>用户变更: ID=${data.id}, 姓名=${data.name}</p>`;};eventSource.onerror = e => console.error("SSE 错误:", e);
</script>
</body>
</html>