Redis系列(十四)应用之延时队列

Posted1 by 呼延十 on April 16, 2020 Hot:

本文代码: DelayQueue

延迟队列,想必大家都不陌生,顾名思义,它是一个带有延迟功能的队列。那么到底为什么需要延迟,怎么延迟呢?考虑一下下面的应用场景。

  • 订单三十分钟未支付,就自动作废。
  • 新用户注册之后的一天三天等时间点发送推广邮件。
  • 淘宝京东等的订单完成后 5 天未评价,自动好评。

在这些场景下,比较粗暴的办法就是定时扫表,然后拿到相应的信息,去做业务操作。

或者可以使用延时队列,在触发的时候生产信息及触发时间到队列中,在另外一个进程/线程轮询队列,按照当前时间进行弹出,不断的消费即可实现定时执行任务。

Redis 的有序列表数据类型,可以说是作为延时队列极其优秀的一个载体,因此被很多公司采用。今天就实现一个基本的延时队列,暴露对应的方法出来。

为什么叫基本的延时队列呢,因为本文是侧重于 Redis 的封装的,所以对于消息队列注重的很多特性没有实现,比如消息的 ACK, 以及失败重试等

目录

设计

延迟队列如果设计的足够通用及”豪华版”, 是可以单独作为一个中间件服务的,独立于业务运行,提供对应的接口出来即可。但是本文不实现服务级别的延迟队列,仅在后文简单介绍一下(因为本文是 Redis 系列,而不是延迟队列系列).

本文对 Redis 进行简单封装,提供一个DelayQueue类出来使用。

作为一个延迟队列,那么它需要有以下的功能:

  • 放入任务
  • 取出任务(去做)
  • 删除任务(不做了)
  • 计数功能

对应于 Redis 怎么实现呢?Sorted Set帮你搞定。

我们将序列化后的任务信息作为 member, 任务触发时间作为 score. 放入Sorted Set即可。

之后不断弹出分值最小的元素,就是下一个要执行的任务。

功能 命令
放入任务 ZADD 命令
取出任务(去做) ZREVRANGEBYSCORE 命令 + ZREM 命令
删除任务(不做了) ZREM 命令
计数功能 ZCOUNT 命令

Java 实现代码

package com.huyan.collection;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.*;

import java.util.*;

/**
 * Author: huyanshi
 * Date:   2020/01/22.
 * Brief:  redis 实现的延迟队列 实现
 */
@Slf4j
public class DelayQueue {

    /**
     * 延迟队列的 key
     */
    private final String key;

    /**
     * Jedispool
     */
    private final JedisPool jedisPool;

    /**
     * constructor
     *
     * @param key  key
     * @param host host
     */
    public DelayQueue(String key, String host) {
        this.key = key;
        this.jedisPool = new JedisPool(host);
    }

    /**
     * constructor
     *
     * @param key       key
     * @param jedisPool jedispool
     */
    public DelayQueue(String key, JedisPool jedisPool) {
        this.key = key;
        this.jedisPool = jedisPool;
    }

    /**
     * 获取当前延迟队列中元素的数量
     *
     * @return 数量
     */
    public long getDelaySize() {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.zcount(key, 0, Long.MAX_VALUE);
        }
    }

    /**
     * 向延迟队列中添加一个元素
     *
     * @param expireTs 元素的执行时间
     * @param member   元素的信息体。
     */
    public void putDelay(int expireTs, String member) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.zadd(key, expireTs, member);
        }
    }

    /**
     * 删除元素
     *
     * @param members 元素名的集合
     */
    public void delDelay(String... members) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.zrem(key, members);
        }
    }

    /**
     * 批量添加元素
     *
     * @param items 待添加的所有元素
     */
    public void putDelay(List<Item> items) {
        try (Jedis jedis = jedisPool.getResource()) {
            Pipeline pipeline = jedis.pipelined();
            List<Response<Long>> resp = new ArrayList<>(items.size());
            for (Item item : items) {
                resp.add(pipeline.zadd(key, item.expireTs, item.value));
            }
            pipeline.sync();
            int err = 0;
            for (Response<Long> r : resp) {
                Long reply = r.get();
                if (reply == null) {
                    err += 1;
                }
            }
            if (err > 0) {
                log.warn("put delays err: {}", err);
            }
        }
    }

    /**
     * 弹出当前要执行的任务
     *
     * @return 当前要执行的任务
     */
    public Set<Tuple> popNowExpires() {
        int now = (int) (System.currentTimeMillis() / 1000);
        return popRangeExpires(now);
    }

    /**
     * 弹出某个时间前执行的任务
     *
     * @return 当前要执行的任务
     */
    public Set<Tuple> popRangeExpires(int expireTs) {
        Set<Tuple> values = rangeExpires(expireTs);
        // del
        if (values.size() > 0) {
            delDelay(values.stream().map(Tuple::getElement).toArray(String[]::new));
        }
        return values;
    }

    /**
     * 查看某个时间以前的任务
     *
     * @param expireTs 执行时间
     * @return 任务集合
     */
    public Set<Tuple> rangeExpires(int expireTs) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.zrevrangeByScoreWithScores(key, expireTs, 0);
        }
    }

    /**
     * 根据过期时间批量移除元素
     *
     * @param start 开始时间
     * @param end   结束时间
     */
    public void remove(int start, int end) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.zremrangeByRank(key, start, end);
        }
    }

    /**
     * 延迟队列中放的 Item
     */
    public static class Item {
        public final String value;
        public final int expireTs;

        public Item(String value, int expireTs) {
            this.value = value;
            this.expireTs = expireTs;
        }

        @Override
        public String toString() {
            return value + ":" + expireTs;
        }
    }
}

代码比较简单,这里就不多说了,上面的功能,对应的 API 为:

功能 命令 API
放入任务 ZADD 命令 putDelay
取出任务(去做) ZREVRANGEBYSCORE 命令 + ZREM 命令 popNowExpires
删除任务(不做了) ZREM 命令 delDelay
计数功能 ZCOUNT 命令 getDelaySize

同时,为了方便多个值一起操作,提供了一些批量操作的 API.

Java 代码测试

首先我们要测试可用性。

    @Test
    public void deleyQueueTest() {
        int oneHourLater = (int) (System.currentTimeMillis() / 1000 + 3600);
        queue.putDelay(oneHourLater, "test_1");
        Assert.assertEquals(1, queue.getDelaySize());
        int twoHourLater = (int) (System.currentTimeMillis() / 1000 + 7200);
        queue.putDelay(twoHourLater, "test_2");
        Assert.assertEquals(2, queue.getDelaySize());
        queue.popNowExpires();
        Assert.assertEquals(2, queue.getDelaySize());
        queue.rangeExpires(oneHourLater + 100);
        Assert.assertEquals(2, queue.getDelaySize());
        queue.delDelay("test_2");
        Assert.assertEquals(1, queue.getDelaySize());
        queue.popRangeExpires(oneHourLater + 100);
        Assert.assertEquals(0, queue.getDelaySize());
    }

可以看到,实现是没有问题的。从上面的测试代码大概可以看出这个消息队列的使用方式了,这里我还是提供一个简单的生产消费代码出来:

生产者:

    @Test
    public void delayQueueProducer() {

        // 单个生产
        int now = (int) (System.currentTimeMillis() / 1000);
        queue.putDelay(now, "your_message_body");

        // 批量生产
        List<DelayQueue.Item> items = new ArrayList<>();
        items.add(new DelayQueue.Item("your_message_body", now));
        queue.putDelay(items);
    }

消费者:

    @Test
    public void delayQueueConsumer() throws InterruptedException {

        // 轮询消费当前应该执行的任务,或者调用 popRangeExpires 消费某个时间之前的所有任务
        int now = (int) (System.currentTimeMillis() / 1000);
        while (true) {
            Set<Tuple> tuples = queue.popNowExpires();
            // 为空休眠一秒
            if (CollectionUtils.isEmpty(tuples)) {
                Thread.sleep(1000);
                continue;
            }
            // 处理业务逻辑
            System.out.println("do something");
        }
    }

服务化

经常用延时队列的读者可能从上面的代码里发现了一个问题,那就是还是有公用逻辑的,比如在消费者端的这个循环。

        while (true) {
            Set<Tuple> tuples = queue.popNowExpires();
            // 为空休眠一秒
            if (CollectionUtils.isEmpty(tuples)) {
                Thread.sleep(1000);
                continue;
            }

这个循环其实也可以放在延时队列内部,但是因为我们只是封装了一个类,而不是一个服务,所以提供这种轮询不方便。

想要更加通用化,那么封装一个类就已经没有用了,需要将 延时队列 做成中间件,也就是服务化。

基本原理就是:

启动一个服务,内部负责维护延时队列,负责轮询延时队列,之后将多个业务方的定时任务进行分发,然后由业务方消费到进行逻辑处理。

当然,如果用到延时队列的地方不多,或者说不是提供给多个业务方/业务组来使用,是没有必要搞这么大阵势的.

对于服务化的延时队列,其核心对 Redis 的使用和本文也基本一致,只是会额外添加许多其他功能,比如支持多个业务方,支持任务分发,支持任务 ACK 以及失败重试等。

这些添加的内容,都不是本文的重点,因此本文不做讲解了。仅推荐一些学习内容。

有赞的一篇关于 延时队列服务的文章,讲解的不错,同时网上也有根据这篇文章的思路实现的具体代码,因此在这里作为学习资料推荐给大家。

有赞延迟队列设计

上文的 go 语言实现

上文的 java 语言实现

代码我大概看了一眼,不错而且挺简单明了的。十分不错的入门学习内容。

参考文章

https://tech.youzan.com/queuing_delay/


完。


联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。 也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。


以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 < 呼延十 >——>呼延十