spring-boot 整合 redisson 实现延时队列(文末有彩蛋)

应用场景

通常在一些需要经历一段时间或者到达某个指定时间节点才会执行的功能,比如以下这些场景:

  • 订单超时提醒
  • 收货自动确认
  • 会议提醒
  • 代办事项提醒

为什么使用延时队列

对于数据量小且实时性要求不高的需求来说,最简单的方法就是定时扫描数据库。

但是,当数量达到数百万、上千万级别且时,定时扫库就显得非常低效且消耗资源,

甚至有些时间间隔小实时性要求高的情况,上一次扫描还没结束,下一次就又开始了,

这时候如果使用延时队列就会比较合适

延时队列的几种方式:

  • Quartz 定时任务实现扫库
  • DelayQueue JDK中提供了一组实现延迟队列的API
  • Redis sorted set
  • Redis 过期键监听回调
  • RabbitMQ 死信队列
  • RabbitMQ 基于插件实现延迟队列
  • Wheel 时间轮训算法

Redisson 实现延时队列

顾名思义 Redis son 就是 Redis 的儿子,举个栗子先:

1.引入 pom

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>${lastest.version}</version>
</dependency>

2.封装一个 RedissonQueue 类

@Service
public class RedissonQueue {public static final String QUEUE = "delayQueue";// 默认超时时间,30秒public static final Integer DEFAULT_TIMEOUT = 30;@Resourceprivate RedissonClient redissonClient;// 加入任务并设置到期时间public void offer(String taskId, Integer timeout) {RDelayedQueue<String> delayedQueue = delayedQueue();delayedQueue.offer(taskId, Objects.isNull(timeout) ? DEFAULT_TIMEOUT : timeout, TimeUnit.SECONDS);}// 移除任务public void remove(String taskId) {RDelayedQueue<String> delayedQueue = delayedQueue();delayedQueue.removeIf(messageId -> messageId.equals(taskId));}// 任务列表public RDelayedQueue<String> delayedQueue() {RBlockingDeque<String> blockingDeque = blockingDeque();return redissonClient.getDelayedQueue(blockingDeque);}public RBlockingDeque<String> blockingDeque() {return redissonClient.getBlockingDeque(QUEUE);}public boolean isShutdown() {return redissonClient.isShutdown();}public void shutdown() {redissonClient.shutdown();}}

3.交给 Spring 管理

@Slf4j
@Service
public class RedissonService implements ApplicationRunner {@Resourceprivate RedissonQueue redissonQueue;@Resource(name = "threadPoolTaskExecutor")private ThreadPoolTaskExecutor executor;@Overridepublic void run(ApplicationArguments args) {RBlockingDeque<String> blockingDeque = redissonQueue.blockingDeque();executor.execute(() -> {while (true) {if (redissonQueue.isShutdown()) {return;} else {String messageId = null;try {messageId = blockingDeque.take();} catch (InterruptedException e) {log.warn("RedissonConsumer error:{}", e.getMessage());}if (!Objects.isNull(messageId) && !messageId.isEmpty()) {log.warn("timeout messageId : {}", messageId);}}}});}// 初始化,启动服务就执行一次@PostConstructpublic void init() {redissonQueue.delayedQueue();}@PreDestroypublic void shutdown() {redissonQueue.shutdown();}}

4.测试接口

@Operation(summary = "添加任务", description = "添加任务")
@PostMapping
public ResponseEntity<?> add(@RequestParam(value = "taskId", required = false) String taskId,@RequestParam(value = "timeout", required = false) Integer timeout) {taskId = StringUtils.isEmpty(taskId) ? String.valueOf(snowflake.nextId()) : taskId;redissonQueue.offer(taskId, timeout);return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}@Operation(summary = "移除任务", description = "移除任务")
@DeleteMapping(value = "/{taskId}")
public ResponseEntity<?> remove(@PathVariable("taskId") String taskId) {redissonQueue.remove(taskId);return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}

5.测试结果

添加10个任务

在这里插入图片描述

删除第1个任务

在这里插入图片描述

可以看到第一个任务删除后没有被执行(没有设置到期时间,默认为30秒到期)

在这里插入图片描述

实现原理

  • redisson_delay_queue_timeout:delayQueue,sorted set 数据类型,存放所有延迟任务,按延迟任务的到期时间戳(提交任务时间戳 +
    延迟时间)排序,所以列表最前面第一个元素就是整个延迟队列中最早被执行的任务。
  • redisson_delay_queue:delayQueue,list 数据类型,也是存放所有任务。
  • delayQueue,list 数据类型,被称为目标队列,这个里面存放的任务都是已经到延迟时间的,可以被消费者获取的任务,所以上面示例中
    RBlockingQueue 的 take 方法是从此目标队列中获取任务的。
  • redisson_delay_queue_channel:delayQueue,是一个 channel,用来通知客户端开启一个延迟任务
  • 生产者提交任务时将任务放到 redisson_delay_queue_timeout:delayQueue 中,提交任务的时间戳+延迟时间
  • 客户端会有一个延迟任务,这个延迟任务会向 Redis Server 发送一段 lua 脚本,Redis 执行 lua 脚本中的命令,此操作是原子性的

lua 脚本主要干两件事

  • 将到了延迟时间的任务从 redisson_delay_queue_timeout:delayQueue 中移除,存到 delayQueue 这个目标队列
  • 获取到 redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务的到期时间戳,发布到 redisson_delay_queue_channel:
    delayQueue channel 中

当客户端监听到 redisson_delay_queue_channel:delayQueue 这个 channel 的消息时,会再次提交一个客户端延迟任务,延迟时间就是消息(最早到期时间任务的到期时间戳)当前时间戳
这个时间其实也就是 redisson_delay_queue_channel:delayQueue 中最早到期时间的任务的剩余的延迟时间。
一旦时间来到最早到期时间任务的到期时间戳,redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务已经到期,客户端的延迟任务也同时到期,
于是开始执行 lua 脚本操作,及时将到期任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期任务的到期时间戳到 channel
中,
如此循环运行下去,保证 redisson_delay_queue_timeout:delayQueue 中到期数据能及时放到目标队列中。
这里存在一个特殊情况,需要项目启动时就执行一次延时队列。因为由于没有客户端延迟任务的执行,
可能会出现 redisson_delay_queue_timeout:delayQueue 队列中有到期但是没有被放到目标队列的可能,启动就执行一次是为了保证到期的数据能被及时放到目标队列中。

结论

  • Redisson 方案理论上没有延迟,但当消息数量剧增,消费者消费缓慢这种情况下,可能会导致延迟任务消费的延迟。

  • 消息丢失问题 Redisson 方案最大程度上减轻消息丢失的可能性,因为所有任务都是存在 list 和 sorted set 两种数据类型中,Redis
    有持久化机制。除非整个 redis 集群宕机,可能丢失一小部分数据。

  • 广播任务问题,是不会出现的,因为每个客户端都是从同一个目标队列中获取任务。

Redisson 这种实现方案是比较合适且靠谱的,一般中小型项目建议用 Redisson 实现延迟队列,规模较大的项目直接上 MQ。

整合DEMO仓库地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1483844.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【IEEE出版】第四届能源工程与电力系统国际学术会议(EEPS 2024)

第四届能源工程与电力系统国际学术会议&#xff08;EEPS 2024&#xff09; 2024 4th International Conference on Energy Engineering and Power Systems 重要信息 大会官网&#xff1a;www.iceeps.com 大会时间&#xff1a;2024年8月9-11日 大会…

S7-1200PLC使用西门子报文 111 和 FB38002(Easy_SINA_Pos)实现V90 PN总线伺服定位控制

1、博途1200/1500 PLC V90 PN通信 博途1200/1500PLC V90 PN通信控制 (FB284功能块)_fb284功能块文档说明-CSDN博客文章浏览阅读7k次。先简单说下如何获取FB284,一般有2种方法,Startdrive软件可以操作大部分西门子的驱动器,建议安装调试方便,缺点就是软件太大。_fb284功能…

安装anaconda后jupyter notebook打不开 闪退

首先&#xff0c;通过清华源安装了最新的anaconda&#xff08;安装在了D盘&#xff09; 尝试打开jupyter&#xff0c;发现小黑框1s后自己关了&#xff0c;根本不打开浏览器 之后尝试按照这个做了一遍https://blog.csdn.net/gary101818/article/details/123560304还是不行。。…

Jmeter性能测试(九)

一、Jmeter性能测试需要特别注意的地方 1、参数化 2、请求参数 3、BeanShell 预处理程序更新jmeter请求参数 4、接口中不可重复的随机数处理 5、线程组设置 6、总结 二、参数化 1、参数化配置,多个参数用英文逗号隔开 2、wallet参数化文件,不要写表头,多个参数用英文逗号…

二叉树的前、中、后序遍历(递归法、迭代法)leetcode144/94/145

leetcode144、二叉树的前序遍历 给你二叉树的根节点 root &#xff0c;返回它节点值的 前序 遍历。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,2,3] 示例 2&#xff1a; 输入&#xff1a;root [] 输出&#xff1a;[] 示例 3&#xff1a;…

鸿蒙开发入门——ArkTS语法简介(万字简介)

ArkTS 作为鸿蒙开发的编程语言&#xff0c;我们先来一图看看这个语言&#xff0c;我们可以看到ArkTS是在TS&#xff08;TypeScript&#xff09;的基础上改造的&#xff0c;而TS又是在JS&#xff08;JavaSript&#xff09;上改造的&#xff0c;一句话总结就是ArkTS是TS的超集&a…

新版本 idea 创建不了 spring boot 2 【没有jkd8选项】

创建新项目 将地址换成如下 https://start.aliyun.com/

HackQuest介绍 web3 学习平台

HackQuest 官网地址&#xff1a; https://www.hackquest.io/zh HackQuest是一个专注于Web3技术教育的在线学习平台&#xff0c;旨在帮助全球开发者掌握区块链、加密货币和去中心化应用&#xff08;DApps&#xff09;领域的最新技能。该平台汇聚了超过14000名活跃开发者&#…

C学习(数据结构)-->单链表习题

目录 一、环形链表 题一&#xff1a;环形链表 思路&#xff1a; 思考一&#xff1a;为什么&#xff1f; 思考二&#xff1a;快指针一次走3步、4步、......n步&#xff0c;能否相遇 step1&#xff1a; step2&#xff1a; 代码&#xff1a; 题二&#xff1a; 环形链表 I…

区块链技术在溯源领域的应用

区块链技术具有去中心化、不可篡改、可追溯等特点&#xff0c;使其在溯源领域具有广阔的应用前景。具体而言&#xff0c;区块链技术可以应用于以下几个方面。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1. 产品溯源 产品溯源是指…

Nginx的核心功能

1. Nginx的核心功能 1.1 nginx反向代理功能 正向代理 代理的为客户端&#xff0c;对于服务器不知道真实客户的信息。例如&#xff1a;翻墙软件 反向代理服务器 代理的为服务器端。对于客户来说不知道服务器的信息。例如&#xff1a;nginx 项目部署图 web项目部署的虚拟机和Ng…

【linux】服务器安装NVIDIA驱动

【linux】服务器安装NVIDIA驱动 【创作不易&#xff0c;求点赞关注收藏】&#x1f600; 文章目录 【linux】服务器安装NVIDIA驱动一、关闭系统自带驱动nouveau二、下载英伟达驱动三、安装英伟达驱动1、禁用X服务器和相关进程2、在TTY终端安装驱动3、验证是否安装成功4、重新启…

HarmonyOS根据官网写案列~ArkTs从简单地页面开始

Entry Component struct Index {State message: string 快速入门;build() {Column() {Text(this.message).fontSize(24).fontWeight(700).width(100%).textAlign(TextAlign.Start).padding({ left: 16 }).fontFamily(HarmonyHeiTi-Bold).lineHeight(33)Scroll() {Column() {Ba…

Spring循环依赖与三级缓存

Spring循环依赖是指两个或多个Bean相互依赖&#xff0c;导致Spring无法在不部分实例化这些Bean的情况下完成它们的创建。在Spring框架中&#xff0c;为了解决循环依赖问题&#xff0c;Spring使用了三级缓存机制。 假设BeanA依赖BeanB&#xff0c;BeanB依赖BeanA&#xff0c;Spr…

Nginx详解(超级详细)

目录 Nginx简介 1. 为什么使用Nginx 2. 安装Nginx Nginx的核心功能 1. Nginx反向代理功能 2. Nginx的负载均衡 3 Nginx动静分离 Nginx简介 Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件&#xff08;IMAP/POP3&#xff09;代理服务器&#xff0c;在BSD-like 协…

2-35 基于matlab的四足液压机器人设计程序

基于matlab的四足液压机器人设计程序&#xff0c;界面化例程&#xff0c;输入液压机器人结构参数&#xff0c;输出液压缸的行程、推力和速度。程序已调通&#xff0c;可直接运行。 2-35 四足液压机器人 液压机器人结构参数 - 小红书 (xiaohongshu.com)

Postman、Apifox、Apipost用哪个?

Postman、Apifox、Apipost都是流行的API接口管理工具&#xff0c;它们各自具有不同的特点和优势&#xff0c;因此哪个更好用取决于具体的使用场景和需求。以下是对这三个工具的比较分析&#xff1a; 一、Postman 特点与优势&#xff1a; 支持多种请求方式&#xff1a;包括GE…

基于Python+Django+MySQL的心理咨询预约系统

心理咨询预约系统 DjangoMySQL 基于PythonDjangoMySQL的心理咨询预约系统 项目主要依赖Django3.2&#xff0c;MySQL 支持随机验证码生成与登录验证 简介 基于PythonDjangoMySQL的心理咨询预约系统通过连接数据库获取数据&#xff0c;登录新增随机数字验证码验证。具体可以看…

VisualRules-Web案例展示(一)

VisualRules单机版以其卓越的功能深受用户喜爱。现在&#xff0c;我们进一步推出了VisualRules-Web在线版本&#xff0c;让您无需安装任何软件&#xff0c;即可在任何浏览器中轻松体验VisualRules的强大功能。无论是数据分析、规则管理还是自动化决策&#xff0c;VisualRules-W…

GESP CCF C++ 四级认证真题 2024年6月

第 1 题 下列代码中&#xff0c;输出结果是&#xff08; &#xff09; A. 12 24 24 12 B. 24 12 12 24 C. 12 12 24 24 D. 24 24 12 12 第 2 题 下面函数不能正常执行的是&#xff08;&#xff09; A. B. C. D. 第 3 题 下面程序…