Redis系列(十四)应用之延时队列

本文代码: DelayQueue

延迟队列,想必大家都不陌生,顾名思义,它是一个带有延迟功能的队列。那么到底为什么需要延迟,怎么延迟呢?考虑一下下面的应用场景。

  • 订单三十分钟未支付,就自动作废。
  • 新用户注册之后的一天三天等时间点发送推广邮件。
  • 淘宝京东等的订单完成后 5 天未评价,自动好评。

在这些场景下,比较粗暴的办法就是定时扫表,然后拿到相应的信息,去做业务操作。

或者可以使用延时队列,在触发的时候生产信息及触发时间到队列中,在另外一个进程/线程轮询队列,按照当前时间进行弹出,不断的消费即可实现定时执行任务。

Redis 的有序列表数据类型,可以说是作为延时队列极其优秀的一个载体,因此被很多公司采用。今天就实现一个基本的延时队列,暴露对应的方法出来。

为什么叫基本的延时队列呢,因为本文是侧重于 Redis 的封装的,所以对于消息队列注重的很多特性没有实现,比如消息的 ACK, 以及失败重试等

目录

设计

延迟队列如果设计的足够通用及”豪华版”, 是可以单独作为一个中间件服务的,独立于业务运行,提供对应的接口出来即可。但是本文不实现服务级别的延迟队列,仅在后文简单介绍一下(因为本文是 Redis 系列,而不是延迟队列系列).

本文对 Redis 进行简单封装,提供一个DelayQueue类出来使用。

作为一个延迟队列,那么它需要有以下的功能:

  • 放入任务
  • 取出任务(去做)
  • 删除任务(不做了)
  • 计数功能

对应于 Redis 怎么实现呢?Sorted Set帮你搞定。

我们将序列化后的任务信息作为 member, 任务触发时间作为 score. 放入Sorted Set即可。

之后不断弹出分值最小的元素,就是下一个要执行的任务。

功能 命令
放入任务 ZADD 命令
取出任务(去做) ZREVRANGEBYSCORE 命令 + ZREM 命令
删除任务(不做了) ZREM 命令
计数功能 ZCOUNT 命令

Java 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package com.huyan.collection;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.*;

import java.util.*;

/**
* Author: huyanshi
* Date: 2020/01/22.
* Brief: redis 实现的延迟队列 实现
*/
@Slf4j
public class DelayQueue {

/**
* 延迟队列的 key
*/
private final String key;

/**
* Jedispool
*/
private final JedisPool jedisPool;

/**
* constructor
*
* @param key key
* @param host host
*/
public DelayQueue(String key, String host) {
this.key = key;
this.jedisPool = new JedisPool(host);
}

/**
* constructor
*
* @param key key
* @param jedisPool jedispool
*/
public DelayQueue(String key, JedisPool jedisPool) {
this.key = key;
this.jedisPool = jedisPool;
}

/**
* 获取当前延迟队列中元素的数量
*
* @return 数量
*/
public long getDelaySize() {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.zcount(key, 0, Long.MAX_VALUE);
}
}

/**
* 向延迟队列中添加一个元素
*
* @param expireTs 元素的执行时间
* @param member 元素的信息体。
*/
public void putDelay(int expireTs, String member) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.zadd(key, expireTs, member);
}
}

/**
* 删除元素
*
* @param members 元素名的集合
*/
public void delDelay(String... members) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.zrem(key, members);
}
}

/**
* 批量添加元素
*
* @param items 待添加的所有元素
*/
public void putDelay(List<Item> items) {
try (Jedis jedis = jedisPool.getResource()) {
Pipeline pipeline = jedis.pipelined();
List<Response<Long>> resp = new ArrayList<>(items.size());
for (Item item : items) {
resp.add(pipeline.zadd(key, item.expireTs, item.value));
}
pipeline.sync();
int err = 0;
for (Response<Long> r : resp) {
Long reply = r.get();
if (reply == null) {
err += 1;
}
}
if (err > 0) {
log.warn("put delays err: {}", err);
}
}
}

/**
* 弹出当前要执行的任务
*
* @return 当前要执行的任务
*/
public Set<Tuple> popNowExpires() {
int now = (int) (System.currentTimeMillis() / 1000);
return popRangeExpires(now);
}

/**
* 弹出某个时间前执行的任务
*
* @return 当前要执行的任务
*/
public Set<Tuple> popRangeExpires(int expireTs) {
Set<Tuple> values = rangeExpires(expireTs);
// del
if (values.size() > 0) {
delDelay(values.stream().map(Tuple::getElement).toArray(String[]::new));
}
return values;
}

/**
* 查看某个时间以前的任务
*
* @param expireTs 执行时间
* @return 任务集合
*/
public Set<Tuple> rangeExpires(int expireTs) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.zrevrangeByScoreWithScores(key, expireTs, 0);
}
}

/**
* 根据过期时间批量移除元素
*
* @param start 开始时间
* @param end 结束时间
*/
public void remove(int start, int end) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.zremrangeByRank(key, start, end);
}
}

/**
* 延迟队列中放的 Item
*/
public static class Item {
public final String value;
public final int expireTs;

public Item(String value, int expireTs) {
this.value = value;
this.expireTs = expireTs;
}

@Override
public String toString() {
return value + ":" + expireTs;
}
}
}

代码比较简单,这里就不多说了,上面的功能,对应的 API 为:

功能 | 命令 | API
— | —
放入任务 | ZADD 命令 | putDelay
取出任务(去做) | ZREVRANGEBYSCORE 命令 + ZREM 命令 | popNowExpires
删除任务(不做了) | ZREM 命令 | delDelay
计数功能 | ZCOUNT 命令 | getDelaySize

同时,为了方便多个值一起操作,提供了一些批量操作的 API.

Java 代码测试

首先我们要测试可用性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void deleyQueueTest() {
int oneHourLater = (int) (System.currentTimeMillis() / 1000 + 3600);
queue.putDelay(oneHourLater, "test_1");
Assert.assertEquals(1, queue.getDelaySize());
int twoHourLater = (int) (System.currentTimeMillis() / 1000 + 7200);
queue.putDelay(twoHourLater, "test_2");
Assert.assertEquals(2, queue.getDelaySize());
queue.popNowExpires();
Assert.assertEquals(2, queue.getDelaySize());
queue.rangeExpires(oneHourLater + 100);
Assert.assertEquals(2, queue.getDelaySize());
queue.delDelay("test_2");
Assert.assertEquals(1, queue.getDelaySize());
queue.popRangeExpires(oneHourLater + 100);
Assert.assertEquals(0, queue.getDelaySize());
}

可以看到,实现是没有问题的。从上面的测试代码大概可以看出这个消息队列的使用方式了,这里我还是提供一个简单的生产消费代码出来:

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void delayQueueProducer() {

// 单个生产
int now = (int) (System.currentTimeMillis() / 1000);
queue.putDelay(now, "your_message_body");

// 批量生产
List<DelayQueue.Item> items = new ArrayList<>();
items.add(new DelayQueue.Item("your_message_body", now));
queue.putDelay(items);
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void delayQueueConsumer() throws InterruptedException {

// 轮询消费当前应该执行的任务,或者调用 popRangeExpires 消费某个时间之前的所有任务
int now = (int) (System.currentTimeMillis() / 1000);
while (true) {
Set<Tuple> tuples = queue.popNowExpires();
// 为空休眠一秒
if (CollectionUtils.isEmpty(tuples)) {
Thread.sleep(1000);
continue;
}
// 处理业务逻辑
System.out.println("do something");
}
}

服务化

经常用延时队列的读者可能从上面的代码里发现了一个问题,那就是还是有公用逻辑的,比如在消费者端的这个循环。

1
2
3
4
5
6
7
while (true) {
Set<Tuple> tuples = queue.popNowExpires();
// 为空休眠一秒
if (CollectionUtils.isEmpty(tuples)) {
Thread.sleep(1000);
continue;
}

这个循环其实也可以放在延时队列内部,但是因为我们只是封装了一个类,而不是一个服务,所以提供这种轮询不方便。

想要更加通用化,那么封装一个类就已经没有用了,需要将 延时队列 做成中间件,也就是服务化。

基本原理就是:

启动一个服务,内部负责维护延时队列,负责轮询延时队列,之后将多个业务方的定时任务进行分发,然后由业务方消费到进行逻辑处理。

当然,如果用到延时队列的地方不多,或者说不是提供给多个业务方/业务组来使用,是没有必要搞这么大阵势的.

对于服务化的延时队列,其核心对 Redis 的使用和本文也基本一致,只是会额外添加许多其他功能,比如支持多个业务方,支持任务分发,支持任务 ACK 以及失败重试等。

这些添加的内容,都不是本文的重点,因此本文不做讲解了。仅推荐一些学习内容。

有赞的一篇关于 延时队列服务的文章,讲解的不错,同时网上也有根据这篇文章的思路实现的具体代码,因此在这里作为学习资料推荐给大家。

有赞延迟队列设计

上文的 go 语言实现

上文的 java 语言实现

代码我大概看了一眼,不错而且挺简单明了的。十分不错的入门学习内容。

参考文章

https://tech.youzan.com/queuing_delay/


完。


联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。
也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。


以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 < 呼延十 >——>呼延十