本文代码: DelayQueue
延迟队列,想必大家都不陌生,顾名思义,它是一个带有延迟功能的队列。那么到底为什么需要延迟,怎么延迟呢?考虑一下下面的应用场景。
- 订单三十分钟未支付,就自动作废。
- 新用户注册之后的一天三天等时间点发送推广邮件。
- 淘宝京东等的订单完成后 5 天未评价,自动好评。
在这些场景下,比较粗暴的办法就是定时扫表,然后拿到相应的信息,去做业务操作。
或者可以使用延时队列,在触发的时候生产信息及触发时间到队列中,在另外一个进程/线程轮询队列,按照当前时间进行弹出,不断的消费即可实现定时执行任务。
Redis 的有序列表数据类型,可以说是作为延时队列极其优秀的一个载体,因此被很多公司采用。今天就实现一个基本的延时队列,暴露对应的方法出来。
为什么叫基本的延时队列呢,因为本文是侧重于 Redis 的封装的,所以对于消息队列注重的很多特性没有实现,比如消息的 ACK, 以及失败重试等
目录
设计
延迟队列如果设计的足够通用及”豪华版”, 是可以单独作为一个中间件服务的,独立于业务运行,提供对应的接口出来即可。但是本文不实现服务级别的延迟队列,仅在后文简单介绍一下(因为本文是 Redis 系列,而不是延迟队列系列).
本文对 Redis 进行简单封装,提供一个DelayQueue
类出来使用。
作为一个延迟队列,那么它需要有以下的功能:
- 放入任务
- 取出任务(去做)
- 删除任务(不做了)
- 计数功能
对应于 Redis 怎么实现呢?Sorted Set帮你搞定。
我们将序列化后的任务信息作为 member, 任务触发时间作为 score. 放入Sorted Set即可。
之后不断弹出分值最小的元素,就是下一个要执行的任务。
功能 |
命令 |
放入任务 |
ZADD 命令 |
取出任务(去做) |
ZREVRANGEBYSCORE 命令 + ZREM 命令 |
删除任务(不做了) |
ZREM 命令 |
计数功能 |
ZCOUNT 命令 |
Java 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
| package com.huyan.collection;
import lombok.extern.slf4j.Slf4j; import redis.clients.jedis.*;
import java.util.*;
@Slf4j public class DelayQueue {
private final String key;
private final JedisPool jedisPool;
public DelayQueue(String key, String host) { this.key = key; this.jedisPool = new JedisPool(host); }
public DelayQueue(String key, JedisPool jedisPool) { this.key = key; this.jedisPool = jedisPool; }
public long getDelaySize() { try (Jedis jedis = jedisPool.getResource()) { return jedis.zcount(key, 0, Long.MAX_VALUE); } }
public void putDelay(int expireTs, String member) { try (Jedis jedis = jedisPool.getResource()) { jedis.zadd(key, expireTs, member); } }
public void delDelay(String... members) { try (Jedis jedis = jedisPool.getResource()) { jedis.zrem(key, members); } }
public void putDelay(List<Item> items) { try (Jedis jedis = jedisPool.getResource()) { Pipeline pipeline = jedis.pipelined(); List<Response<Long>> resp = new ArrayList<>(items.size()); for (Item item : items) { resp.add(pipeline.zadd(key, item.expireTs, item.value)); } pipeline.sync(); int err = 0; for (Response<Long> r : resp) { Long reply = r.get(); if (reply == null) { err += 1; } } if (err > 0) { log.warn("put delays err: {}", err); } } }
public Set<Tuple> popNowExpires() { int now = (int) (System.currentTimeMillis() / 1000); return popRangeExpires(now); }
public Set<Tuple> popRangeExpires(int expireTs) { Set<Tuple> values = rangeExpires(expireTs); if (values.size() > 0) { delDelay(values.stream().map(Tuple::getElement).toArray(String[]::new)); } return values; }
public Set<Tuple> rangeExpires(int expireTs) { try (Jedis jedis = jedisPool.getResource()) { return jedis.zrevrangeByScoreWithScores(key, expireTs, 0); } }
public void remove(int start, int end) { try (Jedis jedis = jedisPool.getResource()) { jedis.zremrangeByRank(key, start, end); } }
public static class Item { public final String value; public final int expireTs;
public Item(String value, int expireTs) { this.value = value; this.expireTs = expireTs; }
@Override public String toString() { return value + ":" + expireTs; } } }
|
代码比较简单,这里就不多说了,上面的功能,对应的 API 为:
功能 | 命令 | API
— | —
放入任务 | ZADD 命令 | putDelay
取出任务(去做) | ZREVRANGEBYSCORE 命令 + ZREM 命令 | popNowExpires
删除任务(不做了) | ZREM 命令 | delDelay
计数功能 | ZCOUNT 命令 | getDelaySize
同时,为了方便多个值一起操作,提供了一些批量操作的 API.
Java 代码测试
首先我们要测试可用性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Test public void deleyQueueTest() { int oneHourLater = (int) (System.currentTimeMillis() / 1000 + 3600); queue.putDelay(oneHourLater, "test_1"); Assert.assertEquals(1, queue.getDelaySize()); int twoHourLater = (int) (System.currentTimeMillis() / 1000 + 7200); queue.putDelay(twoHourLater, "test_2"); Assert.assertEquals(2, queue.getDelaySize()); queue.popNowExpires(); Assert.assertEquals(2, queue.getDelaySize()); queue.rangeExpires(oneHourLater + 100); Assert.assertEquals(2, queue.getDelaySize()); queue.delDelay("test_2"); Assert.assertEquals(1, queue.getDelaySize()); queue.popRangeExpires(oneHourLater + 100); Assert.assertEquals(0, queue.getDelaySize()); }
|
可以看到,实现是没有问题的。从上面的测试代码大概可以看出这个消息队列的使用方式了,这里我还是提供一个简单的生产消费代码出来:
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Test public void delayQueueProducer() {
int now = (int) (System.currentTimeMillis() / 1000); queue.putDelay(now, "your_message_body");
List<DelayQueue.Item> items = new ArrayList<>(); items.add(new DelayQueue.Item("your_message_body", now)); queue.putDelay(items); }
|
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Test public void delayQueueConsumer() throws InterruptedException {
int now = (int) (System.currentTimeMillis() / 1000); while (true) { Set<Tuple> tuples = queue.popNowExpires(); if (CollectionUtils.isEmpty(tuples)) { Thread.sleep(1000); continue; } System.out.println("do something"); } }
|
服务化
经常用延时队列的读者可能从上面的代码里发现了一个问题,那就是还是有公用逻辑的,比如在消费者端的这个循环。
1 2 3 4 5 6 7
| while (true) { Set<Tuple> tuples = queue.popNowExpires(); if (CollectionUtils.isEmpty(tuples)) { Thread.sleep(1000); continue; }
|
这个循环其实也可以放在延时队列内部,但是因为我们只是封装了一个类,而不是一个服务,所以提供这种轮询不方便。
想要更加通用化,那么封装一个类就已经没有用了,需要将 延时队列
做成中间件,也就是服务化。
基本原理就是:
启动一个服务,内部负责维护延时队列,负责轮询延时队列,之后将多个业务方的定时任务进行分发,然后由业务方消费到进行逻辑处理。
当然,如果用到延时队列的地方不多,或者说不是提供给多个业务方/业务组来使用,是没有必要搞这么大阵势的.
对于服务化的延时队列,其核心对 Redis 的使用和本文也基本一致,只是会额外添加许多其他功能,比如支持多个业务方,支持任务分发,支持任务 ACK 以及失败重试等。
这些添加的内容,都不是本文的重点,因此本文不做讲解了。仅推荐一些学习内容。
有赞的一篇关于 延时队列服务的文章,讲解的不错,同时网上也有根据这篇文章的思路实现的具体代码,因此在这里作为学习资料推荐给大家。
有赞延迟队列设计
上文的 go 语言实现
上文的 java 语言实现
代码我大概看了一眼,不错而且挺简单明了的。十分不错的入门学习内容。
参考文章
https://tech.youzan.com/queuing_delay/
完。
联系我
最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。
以上皆为个人所思所得,如有错误欢迎评论区指正。
欢迎转载,烦请署名并保留原文链接。
联系邮箱:huyanshi2580@gmail.com
更多学习笔记见个人博客或关注微信公众号 < 呼延十 >——>呼延十