美文网首页
Redis 延迟任务队列

Redis 延迟任务队列

作者: 非典型程序员 | 来源:发表于2017-08-06 16:11 被阅读5301次

使用 Redis 的列表结构可以实现执行一种任务的FIFO队列,也可以实现通过调用不同回调函数的来执行多重不同的任务队列,乃至可以是实现简单的优先级队列,当然也可以实现延时队列。

延时队列的基本实现有3类:

  • 在任务信息中包含任务的执行时间,工作进程发现任务时间未到,短暂的等待之后,将任务重新推入队列里面。
  • 使用一个任务列表记录所需要的执行的任务,并在每次进行 while循环的时候,扫描检查列表并执行已经到期的任务。
  • 把所所有需要执行的任务都添加到有序集合里面,并将任务执行的时间设置分值。再是有个一个额外的进程来查询有序集合里面是否有可以执行的任务,如果有,将任务从有序集合里面移除,并将任务推进适当的任务队列。

无论是短暂的等待,还是将任务从入队列,都是已经很好资源的事情,多以通常不会采用第一种方法。如果在本地维护一个任务列表,可以能会导致任务丢失,除非对任务进行持久化。其次,通过不断的扫描别表,查找合适的任务,每次都需要循环遍历,也是件浪费资源的事情,所以第二种方法也不可取。最后,采用有序结合保存任务、执行时间作为排序的依据是最简单最直接的做法。采用执行时间排序,不需要每次遍历整个队列,只需要判断队首的元素是否到了可执行时间即可。其次,只需要一个工作进程。再者,可以使用 “分布式锁”机制将任务从有序集合中个移动到任务队列。这样处理,语义简单,逻辑清晰。

Redis 的有序集合天生就适合做这件事。

127.0.0.1:6379> zadd task_set 1 task1
(integer) 1
127.0.0.1:6379> zadd task_set 2 task2
(integer) 1
127.0.0.1:6379> zadd task_set 3 task3
(integer) 1
127.0.0.1:6379> zadd task_set 4 task4
(integer) 1
127.0.0.1:6379> ZRANGE task_set 0 10 WITHSCORES
1) "task1"
2) "1"
3) "task2"
4) "2"
5) "task3"
6) "3"
7) "task4"
8) "4"

Java 模拟代码:

package me.touch.redis;

import java.util.Set;
import java.util.UUID;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Tuple;

/**
 * 延时队列
 * @author Knight-Ran
 *
 */
public class delayQueue {
    private Jedis jedis;
    private JedisPool pool;
    private static final String QUEUE_NAME = "deplay_queue";
    
    @Before
    public void setUp() {
        pool = new JedisPool(new JedisPoolConfig(), "localhost");
        jedis = pool.getResource();
    }

    @After
    public void after() {
        jedis.close();
        pool.destroy();
    }
    
    
    // 模拟任务处理队列
    public static void addToTaskQue(String taskInfo){
        System.out.println(taskInfo+"已经从延时队列中转至队列"+ "当前时间:"+ System.currentTimeMillis() );
        System.out.println();
    }
        
    public void addToDeplayQueue(Task task){
        System.out.println(task.toString()+ "已经加入延时队列");
        jedis.zadd(QUEUE_NAME, task.getTime(), task.toString());
    }
    
    public void transferFromDelayQueue() throws InterruptedException{
        while(true){
            Set<Tuple> item = jedis.zrangeWithScores(QUEUE_NAME, 0, 0);
            if(item != null && !item.isEmpty()){
                Tuple tuple = item.iterator().next();
                if(System.currentTimeMillis() >= tuple.getScore()){
                    // TODO 获取锁
                    jedis.zrem(QUEUE_NAME, tuple.getElement()); // 从延时队列中移除
                    addToTaskQue(tuple.getElement()); //任务推入延时队列,因为这里只是延时
                    // TODO 释放锁
                }
            }
            
            Thread.sleep(100);
            
        }
    }
    
    @Test
    public void test() throws InterruptedException{
         long now = System.currentTimeMillis();
         Task task = new Task(UUID.randomUUID().toString(), now+10*1000, 10*1000+"后执行");
         addToDeplayQueue(task);
         task = new Task(UUID.randomUUID().toString(), now+20*1000, 20*1000+"后执行");
         addToDeplayQueue(task);
         task = new Task(UUID.randomUUID().toString(), now+30*1000, 30*1000+"后执行");
         addToDeplayQueue(task);
         task = new Task(UUID.randomUUID().toString(), now+40*1000, 40*1000+"后执行");
         transferFromDelayQueue();
         
    }
    
    static class Task{
        // 任务id
        private String id ;
        // 任务执行时间
        private long time;
        // 描述
        private String desc;
        
        public Task(String id, long time, String desc){
            this.id = id ;
            this.time = time;
            this.desc = desc;
        }
        
        public String getId() {
            return id;
        }
        public long getTime() {
            return time;
        }
        public String getDesc() {
            return desc;
        }

        @Override
        public String toString() {
            return "Task [id=" + id + ", time=" + time + ", desc=" + desc + "]";
        }
    }
}

测试结果:

Task [id=441a900e-a4a5-44cc-bddc-117bb3f00130, time=1502006961460, desc=10000后执行]已经加入延时队列
Task [id=9982a932-3c29-4e3c-a940-5c3beb5b55c2, time=1502006971460, desc=20000后执行]已经加入延时队列
Task [id=adfdfdff-b8b0-440d-b85e-06c3432b0094, time=1502006981460, desc=30000后执行]已经加入延时队列
Task [id=441a900e-a4a5-44cc-bddc-117bb3f00130, time=1502006961460, desc=10000后执行]已经从延时队列中转至队列当前时间:1502006961481
Task [id=9982a932-3c29-4e3c-a940-5c3beb5b55c2, time=1502006971460, desc=20000后执行]已经从延时队列中转至队列当前时间:1502006971518
Task [id=adfdfdff-b8b0-440d-b85e-06c3432b0094, time=1502006981460, desc=30000后执行]已经从延时队列中转至队列当前时间:1502006981538

相关文章

  • Laravel源码分析 - Redis延迟队列(一)

    延迟队列,首先它是一个队列。然后其任务可以延迟被执行。实现队列,以及延迟队列的方式有很多种,基于Redis的方式也...

  • Redis 延迟任务队列

    使用 Redis 的列表结构可以实现执行一种任务的FIFO队列,也可以实现通过调用不同回调函数的来执行多重不同的任...

  • Redis 延迟任务队列

    背景 在业务发展过程中,会出现一些需要延时处理的场景,比如: a.订单下单之后超过30分钟用户未支付,需要取消订单...

  • 通过redis的有序集合[zset] 实现延迟队列

    php使用redis的有序集合zset实现延迟队列 我们通过redis的有序集合zset来实现简单的延迟队列,将消...

  • Delayer 基于 Redis 的延迟消息队列中间件

    Delayer 基于 Redis 的延迟消息队列中间件,采用 Golang 开发。 参考 有赞延迟队列设计 中的部...

  • Redis延迟队列

    不同的微服务之间做异步通迅时通常会使用Kafka,它非常适用于对消费次序或时间没有强一致性需要的场景。如果消息需要...

  • Redis入门(5) - 消息通知

    使用列表实现任务队列 优先级队列 按照规则订阅 Redis也可以作为任务队列。任务队列顾名思义,就是“传递任务的队...

  • horizon - 队列监控

    Horizon 为 Laravel Redis 队列提供一个仪表板,用于查看和管理 Redis 队列任务执行的情况...

  • GCD使用基础 三

    dispatch_after dispatch_after延迟将任务提交到队列中,注意不是事先将任务添加到队列中然...

  • Redis实现延迟消息队列

    消息队列是应用中常用的一个技术点,通常我们可以借助消息队列中间件来实现,但是并不是所有的情况下,都需要使用到MQ。...

网友评论

      本文标题:Redis 延迟任务队列

      本文链接:https://www.haomeiwen.com/subject/vcskottx.html