redisson 延迟队列实现任务过期监听

一、需求:

任务超过一个小时以后,如果还为待执行状态,则自动转为结束状态。

二、实现:

  1. 创建延迟队列的监听任务RedisDelayedQueueListener,消费延迟队列;
  2. 创建新增延迟队列的类,用于创建延迟队列;
  3. 整体初始化,把监听任务与spring绑定,扫描各个监听延迟队列的实现类,并开启单独线程,监听任务;
  4. 创建延迟任务。

三、实现步骤:

1.引入redisson依赖,这里直接引入springboot整合好的依赖,如果引用原生的依赖,需要自己配置redissonClient Bean。

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.10.5</version>
</dependency>

2.创建延时队列监听接口,定义延时队列到期事件处理方法,消费延时队列

/*** redis 队列事件监听,需要实现这个方法* @param <T>*/
public interface RedisDelayedQueueListener<T> {/*** 执行方法* @param t*/void invoke(T t);
}

3.具体的延时队列消费实现

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 订单支付过期监听*/
@Component
@Slf4j
public class OrderPayExpirationListener implements RedisDelayedQueueListener<String>{@Autowiredprivate ITOrderService orderService;@Overridepublic void invoke(String orderId) {log.info("===" + orderId + ===");//查询到订单,判断为未支付,修改订单状态TOrder order = orderService.lambdaQuery().eq(TOrder::getOrderId, orderId).one();if (order.getOrderStatus() == 1) { //订单未支付TOrder tOrder = new TOrder();tOrder.setOrderId(orderId);tOrder.setOrderStatus(0); //更新订单为取消状态orderService.updateById(tOrder);}}
}

4.初始化,把监听任务与spring绑定

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Map;/*** redis 延时队列初始化*/
@Component
@Slf4j
public class RedisDelayedQueueInit implements ApplicationContextAware {@Autowiredprivate RedissonClient redissonClient;/*** 获取应用上下文并获取相应的接口实现类* @param applicationContext* @throws BeansException*/@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {String listenerName = taskEventListenerEntry.getValue().getClass().getName();startThread(listenerName, taskEventListenerEntry.getValue());}}/*** 启动线程获取队列* @param queueName 队列名称* @param redisDelayedQueueListener 任务回调监听*/private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);//由于此线程需要常驻,可以新建线程,不用交给线程池管理Thread thread = new Thread(() -> {log.info("启动监听队列线程" + queueName);while (true) {try {T t = blockingFairQueue.take();log.info("监听队列线程{},获取到值:{}", queueName, JSON.toJSONString(t));redisDelayedQueueListener.invoke(t);} catch (Exception e) {log.info("监听队列线程错误,", e);}}});thread.setName(queueName);thread.start();}
}

5.创建延时任务

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;/*** Redis 延时队列*/
@Component
@Slf4j
public class RedisDelayedQueue {@Autowiredprivate RedissonClient redissonClient;/*** 添加对象进延时队列* @param putInData 添加数据* @param delay     延时时间* @param timeUnit  时间单位* @param queueName 队列名称* @param <T>*/private <T> void addQueue(T putInData,long delay, TimeUnit timeUnit, String queueName){log.info("添加延迟队列,监听名称:{},时间:{},时间单位:{},内容:{}" , queueName, delay, timeUnit,putInData);RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(putInData, delay, timeUnit);}/*** 添加队列-秒** @param t     DTO传输类* @param delay 时间数量* @param <T>   泛型*/public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.SECONDS, clazz.getName());}/*** 添加队列-分** @param t     DTO传输类* @param delay 时间数量* @param <T>   泛型*/public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.MINUTES, clazz.getName());}/*** 添加队列-时** @param t     DTO传输类* @param delay 时间数量* @param <T>   泛型*/public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.HOURS, clazz.getName());}/*** 添加队列-天** @param t     DTO传输类* @param delay 时间数量* @param <T>   泛型*/public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {addQueue(t, delay, TimeUnit.DAYS, clazz.getName());}
}

6.此时只需要再下单成功的方法里面新增以下逻辑即可

@Autowiredprivate RedisDelayedQueue redisDelayedQueue;//将订单id放入延时队列,配置过期监听的处理类
redisDelayedQueue.addQueueHours(id,2, OrderPayExpirationListener.class);

以上参考:https://www.cnblogs.com/huaixiaonian/p/16978606.html

四、我的优化

4.1 此场景中,ApplicationContextAware存在的问题

介绍ApplicationContextAware和ApplicationRunner的区别

  • ApplicationContextAware:在Bean初始化过程中initializeBean()函数中;(项目没启动完成)
  • ApplicationRunner:在所有bean都初始化完成后调用,在AfterFinish中执行;

因此ApplicationContextAware初始化会有两个问题:

  1. 未完全启动完成就监听,可能会导致消费队列的相关类未全部加载完成,导致在启动完成前这段时间,消息消费异常;
  2. 代码里是新建线程异步消费,当有系统启动异常时,线程还在启动着,会不断打印log.info(“监听队列线程错误,”, e);

4.1.2 优化一:ApplicationRunner替代ApplicationContextAware

@Slf4j
@Component
public class RedisDelayedQueueInitRunner implements ApplicationRunner {.......
}
@Overridepublic void run(ApplicationArguments args) {String listenerName = String.format("XX", redisDelayedQueueListener.getClass().getSimpleName());startThread(listenerName, redisDelayedQueueListener);}

4.2 上次关闭的时候的消息到期了,不会马上发送

上次关闭的时候的消息到期了,不会马上发送,要等新消息来,才会消费。

原因:因为是在添加消息的时候才初始化管道的:
在这里插入图片描述
解决方法:这个地方吧管道开启就可以了
在这里插入图片描述
这个是在启动的时候去执行 要在invoke 方法里面捕获,防止启动失败了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/147432.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

ACM MM24 | Hi3D: 3D生成领域再突破!新视角生成和高分辨率生成双SOTA(复旦智象等)

文章链接&#xff1a;https://arxiv.org/pdf/2409.07452 Github 链接&#xff1a;https://github.com/yanghb22-fdu/Hi3D-Official 亮点直击 本文提出了高分辨率图像到3D模型&#xff08;Hi3D&#xff09;&#xff0c;这是一种基于视频扩散的新范式&#xff0c;将单个图像重新定…

Codeforces Round 974 (Div. 3) G. Milky Days

题目 题解 #include<bits/stdc.h> using namespace std; #define int long long #define ll long long #define ld long double #define pb push_back #define fi first #define se second #define pii pair<int, int> #define lson p << 1 #define rson p …

Mapper核心配置文件

文章目录 environment 数据库环境typeAlias 起别名 environment 数据库环境 typeAlias 起别名

PMBOK® 第六版 估算活动持续时间

目录 读后感—PMBOK第六版 目录 在项目管理中&#xff0c;尤其是在软件开发这样的复杂项目中&#xff0c;工作内容是多种多样的。从需求分析、设计、编码到测试和部署&#xff0c;每个阶段都有其独特的挑战和不确定性。 没有人能独自完成所有估算工作并做到绝对精准。估算涉及…

股指期货交割方式是什么?

说起股指期货&#xff0c;这可是个高大上的金融玩意儿。咱们平时买卖股票&#xff0c;那是看准了哪只股就下手&#xff0c;赚了就卖&#xff0c;赔了就扛&#xff0c;挺直接的。但股指期货呢&#xff0c;它玩的是未来的预期&#xff0c;就像是你跟人打赌明天天气好不好&#xf…

开源推理库介绍:ZML,Distributed Llama,EXO | LeetTalk Daily

“LeetTalk Daily”&#xff0c;每日科技前沿&#xff0c;由LeetTools AI精心筛选&#xff0c;为您带来最新鲜、最具洞察力的科技新闻。 开源推理库的出现为机器学习模型的部署、监控和扩展提供了强大的支持。我们介绍三个重要的开源推理库&#xff1a;ZML、Distributed Llama …

机器人速度雅可比矩阵求解(2自由度平面关节机器人)

关节速度和末端速度空间的映射需要计算雅可比矩阵的逆矩阵,在博途PLC里如何计算一个方阵的逆矩阵,大家可以参考下面这篇文章: 博途PLC矩阵求逆 矩阵求逆 博图SCL_博图矩阵运算-CSDN博客文章浏览阅读839次。本文介绍如何用C语言实现矩阵求逆的过程,详细解析了相关代码,适…

Spring实战——入门讲解

​ 博客主页: 南来_北往 系列专栏&#xff1a;Spring Boot实战 Spring介绍 Spring实战的入门讲解主要涵盖了Spring框架的基本概念、核心功能以及应用场景。以下是关于Spring实战入门的具体介绍&#xff1a; Spring框架概述&#xff1a;Spring是一个轻量级的Java开发框架…

【有啥问啥】探索累计推理(Cumulative Reasoning, CR)——大型语言模型中的复杂推理新框架

探索累计推理&#xff08;Cumulative Reasoning, CR&#xff09;——大型语言模型中的复杂推理新框架 引言 随着人工智能&#xff08;AI&#xff09;的快速发展&#xff0c;大型语言模型&#xff08;LLMs&#xff09;在自然语言处理上的表现令人瞩目。然而&#xff0c;LLMs在…

【HTTPS】—— HTTPS协议原理详解

目录 &#xff08;一&#xff09;Https是什么 1.1 什么是加密 1.2 为什么要加密 1.3 常见的加密方式 1.4 数据摘要 && 数据指纹 &#xff08;二&#xff09;Https工作过程研究 方案一&#xff1a;只使用对称秘钥 方案二&#xff1a;只使用非对称秘钥 方案三&a…

14年数据结构

第一题 解析&#xff1a; 求时间复杂度就是看程序执行了多少次。 假设最外层执行了k次&#xff0c;我们看终止条件是kn&#xff0c;则&#xff1a; 有, 内层是一个j1到jn的循环&#xff0c;显然执行了n次。 总的时间复杂度是内层外层 答案选C。 第二题 解析&#xff1a; 一步一…

如何用ChatGPT制作一款手机游戏应用

有没有想过自己做一款手机游戏&#xff0c;并生成apk手机应用呢&#xff1f;有了人工智能&#xff0c;这一切就成为可能。今天&#xff0c;我们就使用ChatGPT来创建一个简单的井字棋游戏&#xff08;Tic-Tac-Toe&#xff09;&#xff0c;其实这个过程非常轻松且高效。 通过Cha…

【Linux】常用指令【更详细,带实操】

Linux全套讲解系列&#xff0c;参考视频-B站韩顺平&#xff0c;本文的讲解更为详细 目录 一、文件目录指令 1、cd【change directory】指令 ​ 2、mkdir【make dir..】指令​ 3、cp【copy】指令 ​ 4、rm【remove】指令 5、mv【move】指令 6、cat指令和more指令 7、less和…

【Python】Maya:为人类打造的 Python 日期时间库

不知道少了什么&#xff0c;总感觉没有以前快乐。 在编程中处理日期和时间总是一个挑战&#xff0c;尤其是当涉及到时间和时区的转换时。Maya 是一个由 Kenneth Reitz 开发的 Python 库&#xff0c;旨在简化日期时间的处理&#xff0c;使其对人类开发者更加友好。本文将介绍 M…

【二等奖论文】2024年华为杯研究生数学建模F题成品论文(后续会更新)

您的点赞收藏是我继续更新的最大动力! 一定要点击如下的卡片&#xff0c;那是获取资料的入口&#xff01; 点击链接获取【2024华为杯研赛资料汇总】&#xff1a; https://qm.qq.com/q/alQjz21npu https://qm.qq.com/q/alQjz21npu X射线脉冲星光子到达时间建模 摘要 脉冲星是…

2024年最新前端工程师 TypeScript 基础知识点详细教程(更新中)

1. TypeScript 概述 TypeScript 是由微软开发的、基于 JavaScript 的一种强类型编程语言。它是在 JavaScript 的基础上添加了静态类型检查、面向对象编程等功能的超集&#xff0c;最终会被编译为纯 JavaScript 代码。由于其扩展了 JavaScript 的功能&#xff0c;TypeScript 特…

【Linux 21】线程安全

文章目录 &#x1f308; 一、线程互斥⭐ 1. 线程间互斥的相关概念&#x1f319; 1.1 临界资源和临界区&#x1f319; 1.2 互斥和原子性 ⭐ 2. 互斥量 mutex⭐ 3. 互斥量接口&#x1f319; 3.1 初始化互斥量&#x1f319; 3.2 销毁互斥量&#x1f319; 3.3 互斥量上锁&#x1f3…

Mysql删库跑路,如何恢复数据?

问题 删库跑路&#xff0c;数据还能恢复吗&#xff1f; 我们经常听说某某被领导训斥了&#xff0c;对领导心生痛恨&#xff0c;然后登录 Mysql 删库跑路。对于闲聊中经常听说过的一个段子&#xff0c;在现实生活中是否真的发生过&#xff0c;如果发生了&#xff0c;我们该如何解…

解决RabbitMQ设置x-max-length队列最大长度后不进入死信队列

解决RabbitMQ设置x-max-length队列最大长度后不进入死信队列 问题发现问题解决方法一&#xff1a;只监听死信队列&#xff0c;在死信队列里面处理业务逻辑方法二&#xff1a;修改预取值 问题发现 最近再学习RabbitMQ过程中&#xff0c;看到关于死信队列内容&#xff1a; 来自队…

Docker 容器技术:颠覆传统,重塑软件世界的新势力

一、Docker简介 什么是docker Docker 是一种开源的容器化平台&#xff0c;它可以让开发者将应用程序及其所有的依赖项打包成一个标准化的容器&#xff0c;从而实现快速部署、可移植性和一致性。 从功能角度来看&#xff0c;Docker 主要有以下几个重要特点&#xff1a; 轻量…