开发者

Redisson延时队列RedissonDelayed的具体使用

开发者 https://www.devze.com 2024-08-10 09:09 出处:网络 作者: bacawa
目录一、案例场景二、技术选型三、编码实现1、引入依赖2、创建配置类3、持续监听线程4、编写controller进行测试调用四、原理一、案例场景
目录
  • 一、案例场景
  • 二、技术选型
  • 三、编码实现
    • 1、引入依赖
    • 2、创建配置类
    • 3、持续监听线程
    • 4、编写controller进行测试调用
  • 四、原理

    一、案例场景

    定时调度基本是每个项目都会遇到的业务场景,一般地,都会通过任务调度工具执行定时任务完成,定时任务有两点缺陷:

    • 定时任务执行频度限制,实际执行的时间可能会晚于理想的设定时间,例如,如果要通过定时任务实现在下单后15分钟仍未支付则取消订单的功能,假设定时任务的执行频度为每分钟执行一次,对于有些订单而言,其实际取消时间是介于15-16分钟之间,不够精确;
    • 定时任务执行需要时间,定时任务的执行也需要时间,如果业务场景的数据量较大,执行一次定时任务需要足够长的时间,进一步放大了缺点一。

    二、技术选型

    Redis实现延时队列有两种实现方式:

    • key失效监听回调;

      key失效监听存在两个问题:① Redis的pubsub不会被持久化,服务器宕机就会被丢弃,这点就很致命,因为谁也无法保证redis服务一直不宕机;②没有高级特性,没有ack机制skCsagitvs,可靠性不高。

    • zset分数存时间戳。

      zset的实现是,轮询队列头部来获取超期的时间戳,实现延时效果,可靠性更高,并且数据会被持久化,这就很好的规避了key失效监听回调的问题,如果redis服务崩溃,还是有丢失数据的可能。

    Redisson的RDelayedQueue是一个封装好的zset实现的延时队列,最终选择了这个方案。其实还有一些优秀的方案可供选择,例如rocketmq、pulsar等拥有定时投递功能的消息队列;我这边优先考虑在不引入新的中间键的情况下使用RDelayedQueue技术进行实现。

    注意:在不方便获得专业消息队列时可以考虑使用redissondelayqueue等基于redis的延时队列方案,但要为redis崩溃等情况设计补偿保护机制。

    三、编码实现

    1、引入依赖

                <!--redisson-->
                <dependency>
                    <groupId>org.redisson</groupId>
                    <artifactId>redisson-spring-boot-starter</artifactId>
                    <version>3.20.0</version>
                </dependency>
                <dependency>
                    <groupId>org.redisson</groupId>
                    <artifactId>redisson-spring-data-27</artifactId>
                    <version>3.20.0</version>
                </dependency>
    

    2、创建配置类

    import com.geovis.common.redis.utils.RedisUtils;
    import org.redisson.api.RblockingQueue;
    import org.redisson.api.RDelayedQueue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @date 2023/8/30 15:05
     */
    @Configuration
    public class RedissonQueueConfig {
    
        private final String queueName = "orderQueue";
    
        @Bean
        public RBlockingQueue<String> blockingQueue() {
            return RedisUtils.getClient().getBlockingQueue(queueName);
        }
        @Bean
        public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockQueue) {
            return RedisUtils.getClient().getDelayedQueue(blockQueue);
        }
    }
    

    其中RedisUtils.getClientjs()是为了获取RedissonClient 对象,这里我使用Redis工具类直接获取,我把工具类也简单展示出来吧。

    import org.redisson.api.*;
    /**
    *Redis工具类
    */
    public class RedisUtils {
    
        private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
    
        /**
         * 获取客户端实例
         */
        public static RedissonClient getClient() {
            return CLIENT;
        }
    }
    

    3、持续监听线程

    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingQueue;
    import org.springframework.stereotype.Component;
    
    import Javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    
    /**
     * @date 2023/8/30 15:09
     */
    @Slf4j
    @Component
    public class OrderTask {
    
        @Resource
        private RBlockingQueue<Object> blockingQueue;
    
        @PostConstruct
        public void take() {
            new Thread(() -> {
                while (true) {
                 http://www.devze.com   try {
        www.devze.com            	log.info(blockingQueue.take().toString());  //将到期的数据取出来,如果一直没有到期数据,就一直等待。
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    
    }
    
    

    4、编写controller进行测试调用

    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RDelayedQueue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.validation.annotation.Validated;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 测试接口类
     * @date 2023/8php/30 16:56
     */
    @Validated
    @RequiredArgsConstructor
    @RestController
    @RequestMapping("/forest")
    @Slf4j
    public class ForestController {
    
    
        @Autowired
        private RDelayedQueue delayedQueue;
        
    
        @GetMapping(value = "/offerAsync")
        public void offerAsync() {
        	//20秒后到期,在监听现成哪里可以打印出  1234567890
            delayedQueue.offerAsync("1234567890", 20, TimeUnit.SECONDS);   
        }
    }
    
    

    到这里基本就完成了Demo编码,具体要根据业务修改对应的代码,本demo亲测没有问题。

    四、原理

    用户传进来的延迟时间必须大于0,小于0抛出异常代码结束。将用户传进来的时间转换为毫秒,并加上系统当前时间,计算出来的就是过期时间。到了过期时间消费者就可以把该任务取出来消费了。

    Redisson延时队列RedissonDelayed的具体使用

    结合上图所示,首先创建了一个Redisson实现的阻塞队列RBlockingQueue的实例blockingQueue,然后又使用该阻塞队列blockingQueue创建了一个延时队列RDelayedQueue的实例delayedQueue。延时消息添加后并不是立即进入到阻塞队列blockingQueue中,而是到达了设定的延时时间之后才会从延时队列delayedQueue进入到阻塞队列blockingQueue;因此,延时消息的添加由延时队列delayedQueue完成,而延时队列的消费则由阻塞队列blockingQueue完成。注意,这里如果直接对延时队列delayedQueue进行监听,则延时消息刚加入时就会被消费,达不到延时的效果。

    相比于Redisson官网文档延时队列中给出的代码示例,这里被包装队列使用阻塞队列RBlockingQueue的好处是blockingQueue.take()会一直阻塞直至队列内有可消费延时消息,避免无意义的循环占用CPU。

    到此这篇关于Redisson延时队列RedissonDelayed的具体使用的文章就介绍到这了,更多相关Redisson延时队列RedissonDelayed内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消