Java实现一个延时队列

文章目录

  • 前言
  • 正文
    • 一、基本概念
      • 1.1 延时队列的特点
      • 1.2 常见的实现方式
    • 二、Java原生的内存型延时队列
      • 2.1 定义延时元素DelayedElement
      • 2.2 定义延时队列管理器DelayedQueueManager
      • 2.3 消费元素
      • 2.4 调试
      • 2.5 调试结果
      • 2.6 精髓之 DelayQueue.poll()
    • 三、基于Redisson的延时队列
      • 3.1 定义延时队列管理器
      • 3.2 调试
      • 3.3 调试结果

前言

业务中经常会出现各种涉及到定时,延迟执行的需求任务。

有一种队列专门处理这种情况。那就是延时队列。

本文提供两种实现方式:

  1. java原生的内存型延时队列;
  2. redisson 的内置延时队列;

正文

一、基本概念

延时队列(Delay Queue)是一种特殊的消息队列,用于处理需要在将来某个时间点执行的任务。

与普通的队列不同,延时队列中的消息在指定的时间之前是不可见的,只有当消息的延时时间到达后,消息才会被消费。

1.1 延时队列的特点

  • 延时性:消息在进入队列后并不会立即被消费,而是需要等待一段时间后才能被消费。
  • 有序性:消息按照延时时间的先后顺序被消费。
  • 可靠性:通常需要保证消息不丢失,即使在系统故障的情况下也能恢复。(这里对于内存型的延时队列不太适合,一旦内存释放就会丢失消息)

1.2 常见的实现方式

  • 数据库:使用数据库的定时任务或触发器。
  • 消息队列:使用支持延时消息的消息队列,如 RabbitMQ、Kafka、RocketMQ 等。
  • 内存队列:使用内存中的数据结构,如 Java 中的 DelayQueue。
  • Redis:使用 Redis 的 sorted set 或 Redisson 的 RDelayedQueue。

二、Java原生的内存型延时队列

使用 Java 的 DelayQueue

  • 生产者:将任务封装成 Delayed 接口的实现类,添加到 DelayQueue 中。
  • 消费者:使用 take 或 poll 方法从 DelayQueue 中取出任务进行处理。

2.1 定义延时元素DelayedElement

package com.pine.common.util.delayqueue;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;/*** 延迟元素** @author fengjinsong*/
public class DelayedElement implements Delayed {/*** 延迟时间(单位:毫秒)*/private final AtomicLong delayTime;/*** 到期时间*/private final AtomicLong expire;/*** 任务数据*/private final Object data;/*** 执行次数*/private final AtomicInteger executionFrequency;public DelayedElement(long delayTime, Object data) {this.delayTime = new AtomicLong(delayTime);this.expire = new AtomicLong(System.currentTimeMillis() + delayTime);this.data = data;this.executionFrequency = new AtomicInteger(0);}public Object getData() {return this.data;}public AtomicInteger getExecutionFrequency() {return executionFrequency;}public void setExecutionFrequency() {this.executionFrequency.incrementAndGet();}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.expire.longValue() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}/*** 重置延迟时间*/public void resetDelay(long delayTime) {this.delayTime.set(delayTime);this.expire.set(System.currentTimeMillis() + this.delayTime.longValue());}/*** 重置延迟时间*/public void resetDelay() {resetDelay(this.delayTime.longValue());}@Overridepublic String toString() {return "DelayedElement{" +"delayTime=" + delayTime +", expire=" + expire +", data=" + data +", executionFrequency=" + executionFrequency +'}';}
}

2.2 定义延时队列管理器DelayedQueueManager

package com.pine.common.util.delayqueue;import java.util.List;
import java.util.concurrent.DelayQueue;/*** 延时队列管理器** @author fengjinsong*/
public class DelayedQueueManager {private DelayedQueueManager() {}/*** 延时队列*/private static final DelayQueue<DelayedElement> DELAY_QUEUE = new DelayQueue<>();/*** 添加元素** @param element 元素*/public static void addElement(DelayedElement element) {DELAY_QUEUE.add(element);}public static void addElement(List<DelayedElement> elements) {DELAY_QUEUE.addAll(elements);}/*** 获取元素,并从队列中移除该元素** @return 元素*/public static DelayedElement pollElement() {return DELAY_QUEUE.poll();}
}

2.3 消费元素

package com.pine.common.util.delayqueue;import java.time.LocalDateTime;public class DelayedElementConsumer implements Runnable {private final static int[] FREQUENCY_SEQUENCE = new int[]{1, 2, 3, 6, 12, 24, 48, 96, 192, 384, 768};@Overridepublic void run() {boolean hasDelayedElement = true;while (hasDelayedElement) {// 获取元素DelayedElement element = DelayedQueueManager.pollElement();try {if (element != null) {System.out.println(LocalDateTime.now() + "消费了延迟元素:" + element);if (element.getData().toString().contains("3")) {throw new RuntimeException("模拟报错");}} else {hasDelayedElement = false;}} catch (Exception e) {retry(element);}}}private void retry(DelayedElement element) {element.setExecutionFrequency();System.out.println("执行出错:" + element);//出错3次后,不再重试if (element.getExecutionFrequency().intValue() > 3) {System.out.println("出错3次后,不再重试");} else {element.resetDelay(FREQUENCY_SEQUENCE[element.getExecutionFrequency().intValue() + 3] * 1000);// 重试DelayedQueueManager.addElement(element);}}}

2.4 调试

package com.pine.common.redis.delayqueue;import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;public class Client {public static void main(String[] args) {// 模拟生产数据RedissonDelayedQueueManager.offer("hello22", 3000);RedissonDelayedQueueManager.offer("hello33", 5000);// 模拟消费数据System.out.println(LocalDateTime.now() + "开始消费数据");while (true) {Object object = RedissonDelayedQueueManager.poll(10, TimeUnit.SECONDS);if (object != null) {System.out.println("-----------------------" + LocalDateTime.now() + ":" + object);}}}
}

2.5 调试结果

2024-11-06T16:57:39.342358开始消费数据
-----------------------2024-11-06T16:57:42.285383:hello22
-----------------------2024-11-06T16:57:44.378298:hello33

可以观察到 hello22 延时了3秒;hello33延时了5秒;

2.6 精髓之 DelayQueue.poll()

检索并删除此队列的头部,如果此队列没有延迟过期的元素,则返回null。

public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();return (first == null || first.getDelay(NANOSECONDS) > 0)? null: q.poll();} finally {lock.unlock();}}

三、基于Redisson的延时队列

使用 Redisson 的 RDelayedQueue

  • 生产者:使用 RDelayedQueue 的 offer 方法将任务添加到队列中,指定延时时间。
  • 消费者:使用 RQueue 的 poll 方法从队列中取出任务进行处理。

3.1 定义延时队列管理器

package com.pine.common.redis.delayqueue;import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;import java.io.IOException;
import java.util.concurrent.TimeUnit;public class RedissonDelayedQueueManager {private static final String QUEUE_NAME = "delay_queue";private static final RedissonClient REDISSON_CLIENT;static {try {String content = """singleServerConfig:address: "redis://10.189.64.136:8379"""";Config config = Config.fromYAML(content);REDISSON_CLIENT = Redisson.create(config);} catch (IOException e) {throw new RuntimeException(e);}}/*** 获取延迟队列* <p>* 本方法通过Redisson客户端创建一个阻塞队列,并基于该阻塞队列创建一个延迟队列* 延迟队列用于处理需要延迟执行的任务,例如任务重试机制、任务调度等场景** @param <T> 队列中元素的类型* @return 返回一个延迟队列实例,用于后续的操作和管理*/private static <T> RDelayedQueue<T> getDelayedQueue() {// 创建一个阻塞队列,这是后续创建延迟队列的基础RBlockingQueue<T> queue = REDISSON_CLIENT.getBlockingQueue(QUEUE_NAME);// 基于阻塞队列创建延迟队列并返回return REDISSON_CLIENT.getDelayedQueue(queue);}/*** 向延迟队列中添加元素,并设置延迟时间** @param task     要添加的元素* @param delayTime 延迟时间,单位为毫秒* @param <T>      元素类型*/public static <T> void offer(T task, long delayTime) {RDelayedQueue<T> delayedQueue = getDelayedQueue();delayedQueue.offer(task, delayTime, TimeUnit.MILLISECONDS);}/*** 从延迟队列中获取元素,并设置超时时间** @param timeout 超时时间,单位为毫秒* @param unit    超时时间单位* @param <T>     元素类型* @return 返回获取到的元素,如果没有获取到元素则返回null*/public static <T> T poll(long timeout, TimeUnit unit) {RBlockingQueue<T> queue = REDISSON_CLIENT.getBlockingQueue(QUEUE_NAME);try {return queue.poll(timeout, unit);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}

3.2 调试

package com.pine.common.redis.delayqueue;import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;public class Client {public static void main(String[] args) {// 模拟生产数据RedissonDelayedQueueManager.offer("hello22", 3000);RedissonDelayedQueueManager.offer("hello33", 5000);// 模拟消费数据System.out.println(LocalDateTime.now() + "开始消费数据");while (true) {Object object = RedissonDelayedQueueManager.poll(10, TimeUnit.SECONDS);if (object != null) {System.out.println("-----------------------" + LocalDateTime.now() + ":" + object);}}}
}

3.3 调试结果

2024-11-06T17:05:31.630768开始消费数据
-----------------------2024-11-06T17:05:34.548032:hello22
-----------------------2024-11-06T17:05:36.732607:hello33

可以观察到 hello22 延时了3秒;hello33延时了5秒;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/5559.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

越权访问漏洞

V2Board Admin.php 越权访问漏洞 ## 漏洞描述 V2board面板 Admin.php 存在越权访问漏洞&#xff0c;由于部分鉴权代码于v1.6.1版本进行了修改&#xff0c;鉴权方式变为从Redis中获取缓存判定是否存在可以调用… V2Board Admin.php 越权访问漏洞 漏洞描述 V2board面板 Admin.ph…

安装和运行开发微信小程序

下载HBuilder uniapp官网 uni-app官网 微信开发者工具 安装 微信小程序 微信小程序 官网 微信小程序 配置 运行 注意&#xff1a;运行前需要开启服务端口 如果运行看不到效果&#xff0c;设置下基础库选别的版本 配置

小檗碱和卤代苄基异喹啉生物碱的代谢工程合成-文献精读79

De novo biosynthesis of berberine and halogenated benzylisoquinoline alkaloids in Saccharomyces cerevisiae 在 酿酒酵母&#xff08;Saccharomyces cerevisiae&#xff09;中从头合成小檗碱和卤代苄基异喹啉生物碱 小檗碱的酵母代谢工程生物合成-文献精读78 苄基异喹…

鸿蒙开发案例:七巧板

【1】引言&#xff08;完整代码在最后面&#xff09; 本文介绍的拖动七巧板游戏是一个简单的益智游戏&#xff0c;用户可以通过拖动和旋转不同形状的七巧板块来完成拼图任务。整个游戏使用鸿蒙Next框架开发&#xff0c;利用其强大的UI构建能力和数据响应机制&#xff0c;实现了…

【TS】九天学会TS语法——1.TypeScript 是什么

今天学习的是TypeScript 基础&#xff0c;目标是了解 TypeScript 的基本概念&#xff0c;安装 TypeScript&#xff0c;编写第一个 TypeScript 程序。 TypeScript 简介安装 TypeScriptTypeScript 编译过程编写第一个 TypeScript 程序 随着前端开发的不断发展&#xff0c;TypeScr…

Docker学习—Docker的安装与使用

Docker安装 1.卸载旧版 首先如果系统中已经存在旧的Docker&#xff0c;则先卸载&#xff1a; yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine2.配置Docker的yum库 首先…

69.ov5640摄像头HDMI灰度显示

&#xff08;1&#xff09;理论学习 灰度像素&#xff1a;在 RGB 颜色模型下&#xff0c;图像中每个像素颜色的 R、G、B 三种基色的分量值相等的像素。由灰度像素组成的灰度图像只能表现256中颜色&#xff08;或亮度&#xff09;&#xff0c;通常把灰度图像中像素的亮度称为灰…

Star Tower:开启数据存储新纪元

在科技飞速发展的当今时代&#xff0c;数据如同璀璨的星辰&#xff0c;闪耀着无尽的价值。而数据存储系统&#xff0c;则是承载这些星辰的浩瀚宇宙。Star Tower 以其卓越的性能和创新的理念&#xff0c;开启了数据存储的新纪元。 Star Tower 的分布式存储架构&#xff0c;是一…

基于SSM的企业管理系统(源码+lw+调试+技术指导)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。这里根据疫情当下&#xff0c;你想解决的问…

鸿蒙应用App测试-通用测试

注意&#xff1a;大家记得学完通用测试记得再学鸿蒙专项测试 鸿蒙应用App测试-专项测试&#xff08;DevEco Testing&#xff09;-CSDN博客 注意&#xff1a;博主有个鸿蒙专栏&#xff0c;里面从上到下有关于鸿蒙next的教学文档&#xff0c;大家感兴趣可以学习下 如果大家觉得…

掌握Qt调试技术

文章目录 前言一、Qt调试的基本概念二、Qt调试工具三、Qt调试实践四、Q调试技巧五、总结前言 在软件开发中,调试是一个至关重要的环节。Qt作为一个广泛使用的跨平台C++图形用户界面应用程序开发框架,其调试技术也显得尤为重要。本文将深入探讨Qt调试技术,帮助读者更好地掌握…

Qt中时间戳转化为时间

QT中时间和时间戳互相转化_currentsecssinceepoch-CSDN博客 qDebug()<<QDateTime::currentMSecsSinceEpoch(); 1730838034770 时间戳(Unix timestamp)转换工具 - 在线工具 (tool.lu) [static] qint64 QDateTime::currentMSecsSinceEpoch() Returns the number of milli…

势不可挡 创新引领 | 生信科技SOLIDWORKS 2025新品发布会·苏州站精彩回顾

2024年11月01日&#xff0c;由生信科技举办的SOLIDWORKS 2025新产品发布会在江苏苏州圆满落幕。现场邀请到制造业的专家学者们一同感受SOLIDWORKS 2025最新功能&#xff0c;探索制造业数字化转型之路。 在苏州站活动开场&#xff0c;达索系统专业客户事业部华东区渠道经理马腾飞…

ArcGIS006:ArcMap常用操作151-200例动图演示

摘要&#xff1a;本文介绍了ArcMap邻域分析、栅格表面分析、水文分析、区域分析、提取分析等工具箱中的工具功能。包括计算图层间点、线、面要素间的距离、位置和角度&#xff0c;创建缓冲区&#xff0c;添加计算信息到属性表&#xff0c;分割面要素&#xff0c;编号&#xff0…

关于马达驱动芯片AT6237的开发指南(兼容DRV8837)

一、芯片引脚介绍 1.芯片引脚 二、系统结构图 三、功能描述 逻辑功能

利用AI提升SEO效果的关键词策略

内容概要 在当今数字化时代&#xff0c;人工智能&#xff08;AI&#xff09;正逐步成为提升搜索引擎优化&#xff08;SEO&#xff09;效果的重要工具。SEO的核心在于关键词的选择与运用&#xff0c;而AI技术则为我们提供了一种智能化的分析和优化方案。通过对大量数据进行挖掘…

了解数据库设计中的反规范化

反规范化是指通过增加冗余数据来提高数据库的读取效率。也就是说,反规范化通过在表中增加冗余字段来减少数据库中的表连接,以提高查询速度。规范化和反规范化是关系型数据库设计中的两个重要方面,它们分别代表了数据组织方式上的两个不同方向。规范化是为了减少数据冗余和提…

【系统集成项目管理工程师教程】第10章 启动过程组

启动过程组包含制定项目章程和识别干系人两个过程&#xff0c;是项目的起始阶段&#xff0c;旨在协调各方期望&#xff0c;明确项目范围、目标与干系人&#xff0c;确保项目符合组织战略&#xff0c;为项目成功奠定基础&#xff0c;在项目管理中起着至关重要的引领作用。 10.…

斑马打印机如何与工业系统(如MES、ERP、数据库等)自动化通讯?

摘要&#xff1a;本文将介绍OPC Router与斑马&#xff08;Zebra&#xff09;打印机相结合的优势&#xff0c;探讨其在工业4.0和物联网领域的应用&#xff0c;以及如何通过简单配置实现数据传输和智能监控。 在工业生产过程中&#xff0c;标签打印环节至关重要。斑马&#xff08…

二开CS—上线流量特征shellcode生成修改模板修改反编译打包

前言 免杀几乎讲的差不多了&#xff0c;今天讲个CS的二次开发。我们原生态的CS特征肯定都是被提取完的了&#xff0c;包括它的流量特征&#xff0c;而我们要做的就是把它的流量特征给打乱&#xff0c;还可以修改生成的后门&#xff0c;使其生成即免杀。 实验环境 CS4.4&…