在上一节中,我们讲了三种方式来实现延时任务,其实,将三种方式结合起来用,对于一些中小型公司已经足够了,但是在中大型互联网公司还是远远不够的。
想必大家对Redis起码有一个初步的概念:基于内存的非关系型数据库。在平时的业务开发中,Redis经常会被用做缓存,来提高网站的性能,减少数据库的访问,所以一想到Redis,脑海中第一个浮现出来的就是缓存。
没办法,对于搬砖业务开发,所使用到的Redis基本也仅仅局限于缓存/代替Session了。但是Redis的应用场景很多很多。下面我就用Redis来实现延时任务。
我在这里假设大家已经对Redis有了一个基本的了解,并且使用过Redis,所以我不再过多的讲述Redis的基本知识了。
开门见山,我们会使用到Redis的ZSet数据结构。
ZSet可以理解为Set的升级版本,Set是无序的,ZSet是有序的,而且会实时排序,所以ZSet也被称为 Sort Set,我是比较倾向于后面的称呼的,因为从字面上的意思就可以知道这个Set拥有一个特点:排序。
ZSet有两个元素:member ,score,ZSet会根据score进行排序,member就是成员的意思。
要想完成这个需求,我们第一个想到的应该就是怎么把数据推送给Redis,第二个想到的是怎么把Redis数据给读取出来。
我所用的是Jedis
通过IDEA的自动提示功能,我们很容易找到zadd这方法,看起来推送数据给Redis应该是这个方法:
让我们试一下吧。
private static JedisPoolConfig jedisPoolConfig = null;
private static JedisPool jedisPool;
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
static {
jedisPoolConfig = new JedisPoolConfig();
jedisPool = new JedisPool(jedisPoolConfig, "", 0, 1500, "", 0);
}
private static void zadd(String key, String member, double score) {
Jedis jedis = jedisPool.getResource();
jedis.zadd(key, score, member);
jedis.close();
}
private static Set<Tuple> zrangeWithScores(String key, int start, int end) {
Jedis jedis = jedisPool.getResource();
Set<Tuple> set = jedis.zrangeWithScores(key, start, end);
jedis.close();
return set;
}
private static void zrem(String key, String member) {
Jedis jedis = jedisPool.getResource();
jedis.zrem(key, member);
jedis.close();
}
在这里我把ip和端口号,密码,DBNum都给隐藏了 ,大家应该可以看懂。
我并没有用Spring或者是Spring Boot来管理Redis,因为如果加上这些东西,还需要有Spring或者Spring Boot的知识,而且这里我仅仅是想演示下用Redis实现延迟任务的一个思路而已,基于此原因,大家也别纠结 没有分不同的类,没有异常处理 等等。
我们在main方法调用下zadd方法:
zadd("haha","order1",2018005130);
zadd("haha","order2",2017005130);
zadd("haha","order3",2016005130);
zadd("haha","order4",2021115130);
执行完成后,打开Redis管理工具,找到这个key,看一下结果:
score小的排前面,这也符合orderTime小的,越先过期。但是 score参数类型是double,所以我们不能把Date型的orderTime直接放进去了,但是我们可以把Date经过转换,再放进去。
数据已经放进去了,怎么拿出来呢?这个方法比较难找,只能借助于搜索引擎了,最后确定了方法:
不管是参数,还是返回类型,都有点奇怪,难怪找不到。
我们调用下这个方法试一下。 根据参数名称,猜测start是开始,end是结束,我们要读取第一个数据,应该是传0,1,或者是1,2。但是 其实应该是传0,0。。。真让人沮丧。。返回的数据也很奇怪,看不懂啊:
这个先放一下,我们发现 这个数据虽然已经被读取出来了,但是Redis并么有把这个数据删掉。
让我们想想,这个ZSet并不像DelayQueue一样,这个没有延迟的功能。我们把数据推到Redis,下一秒就可以读出来。所以我们需要先把数据读取出来,然后判断这个订单有没有超时,如果没有超时的话,Sleep一段时间,再次判断是否超时。超时了,则修改数据库状态,如果还是没超时,继续Sleep,再判断。所以上面读取方法没有删除数据,是符合我们要求的。
我们需要找到删除的方法,最后确定了方法:
这个方法比较简单,就是一个key,一个可变的member。
我们调用下这个方法:
zrem("haha", "order3");
再看下Redis的管理工具:
order3 被删除了。
接下来的问题就是读取数据方法那个奇怪的返回值怎么使用了,也没有什么好的办法,就是在运行的时候,ALT+F8 调出窗口,各种尝试呗。
下面,直接把最终代码贴出来吧:
private static final int expireTime = 15000;
//其他略,上面有
public static void main(String[] args) {
Thread productThread = new Thread(() -> {
for (int i = 0; i < 15; i++) {
try {
Thread.sleep(1200);
} catch (InterruptedException e) {
e.printStackTrace();
}
produce(i);
}
});
productThread.start();
Thread consumThread = new Thread(() -> {
consum();
});
consumThread.start();
}
private static void produce(int orderId) {
Date date = new Date();
String dateStr = simpleDateFormat.format(date);
System.out.printf("现在时间是%s,订单%s加入队列%n", dateStr, orderId);
zadd("order", String.valueOf(orderId), date.getTime());
}
private static void consum() {
while (true) {
Set<Tuple> set = zrangeWithScores("order", 0, 0);
for (Tuple item : set) {
Date date = new Date();
if (date.getTime() - item.getScore() > expireTime) {
String dateStr = simpleDateFormat.format(date);
System.out.printf("现在时间是%s,订单%s已过期%n", dateStr, item.getElement());
zrem("order", item.getElement());
} else {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
最后让我们执行一下:
没有问题。
在这里有两个细节需要说明下:
-
如果取出来的订单没有超时,会睡300毫秒,然后继续判断。这个数值直接影响着订单过期的延迟情况,如果把数值设置的很大,比如20秒,显而易见,订单真正过期的时间远远不止15秒了。如果把数据设置的很小,那么服务器压力也会增大。
-
取出数据——》删除数据 会有一定的时间差,如果开几个线程同时消费,或者部署几个应用同时消费,会有重复执行的情况,如果仅仅是修改下订单的状态,没什么问题,因为这是一个幂等性操作,不管执行几次,都是同样的结果,但是如何还有其他的非幂等性操作,比如 用户信誉-10,就要用其他的手段来避免 重复执行了,这里就不展开了。
这种实现方式其实也比较简单,因为大家都或多或少的使用过Redis,只要知道这几个方法,很容易就可以实现。
但是相比上一节的三个方法来说,这个方法就高端很多了,支持多线程消费,支持多应用(部署)消费,应用服务器宕机也没事。
好了,到这里,延时任务的第四种方式——使用Redis 就讲完了。