原文:https://cloud.tencent.com/developer/article/1358266
1、什么是延时任务
延时任务,顾名思义,就是延迟一段时间后才执行的任务。延时任务的使用还是很广泛的。关于延时任务的实现方式,我知道的就不少 3 种,今天就讲下如何用 redis 实现延时任务。
2、延时任务的特点
在介绍具体方案之前,我们不妨先想一下要实现一个延时系统,有哪些内容是必须存储下来的(这里的存储不一定是指持久化,也可以是放在内存中,取决于延时任务的重要程度)。
首先要存储的就是任务的描述。假如你要处理的延时任务是延时发布资讯,那么你至少要存储资讯的id吧。另外,如果你有多种任务类型,比如:延时推送消息、延时清洗数据等等,那么你还需要存储任务的类型。以上总总,都归属于任务描述。
除此之外,你还必须存储任务执行的时间点吧,一般来说就是时间戳。此外,我们还需要根据任务的执行时间进行排序,因为延时任务队列里的任务可能会有很多,只有到了时间点的任务才应该被执行,所以必须支持对任务执行时间进行排序。
3、使用 Redis 实现延时任务
以上就是一个延迟任务系统必须具备的要素了。回到 Redis,有什么数据结构可以既存储任务描述,又能存储任务执行时间,还能根据任务执行时间进行排序呢?想来想去,似乎只有 Sorted Set 。我们可以把任务的描述序列化成字符串,放在 Sorted Set 的 value 中,然后把任务的执行时间戳作为 score,利用 Sorted Set 天然的排序特性,执行时刻越早的会排在越前面。
这样一来,我们只要开一个或多个定时线程,每隔一段时间去查一下这个 Sorted Set 中 score 小于或等于当前时间戳的元素(这可以通过 zrangebyscore 命令实现),然后再执行元素对应的任务即可。当然,执行完任务后,还要将元素从 Sorted Set 中删除,避免任务重复执行。
如果是多个线程去轮询这个 Sorted Set,还有考虑并发问题,假如说一个任务到期了,也被多个线程拿到了,这个时候必须保证只有一个线程能执行这个任务,这可以通过 zrem 命令来实现,只有删除成功了,才能执行任务,这样就能保证任务不被多个任务重复执行了。
接下来看代码。首先看下项目结构:
一共 4 个类:Constants 类定义了 Redis key 相关的常量。DelayTaskConsumer 是延时任务的消费者,这个类负责从 Redis 拉取到期的任务,并封装了任务消费的逻辑。DelayTaskProducer 则是延时任务的生产者,主要用于将延时任务放到 Redis 中。RedisClient 则是 Redis 客户端的工具类。
最主要的类就是 DelayTaskConsumer 和 DelayTaskProducer 了。
我们先来看下生产者 DelayTaskProducer:
代码很简单,就是将任务描述(为了方便,这里只存储资讯的 id)和任务执行的时间戳放到 Redis 的 Sorted Set 中。
接下来是延时任务的消费者 DelayTaskConsumer:
public class DelayTaskConsumer {
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
public void start(){
scheduledExecutorService.scheduleWithFixedDelay(new DelayTaskHandler(),1,1, TimeUnit.SECONDS);
}
public static class DelayTaskHandler implements Runnable{
@Override
public void run() {
Jedis client = RedisClient.getClient();
try {
Set<String> ids = client.zrangeByScore(Constants.DELAY_TASK_QUEUE, 0, System.currentTimeMillis(),
0, 1);
if(ids==null||ids.isEmpty()){
return;
}
for(String id:ids){
Long count = client.zrem(Constants.DELAY_TASK_QUEUE, id);
if(count!=null&&count==1){
System.out.println(MessageFormat.format("发布资讯。id - {0} , timeStamp - {1} , " +
"threadName - {2}",id,System.currentTimeMillis(),Thread.currentThread().getName()));
}
}
}finally {
client.close();
}
}
}
}
首先看 start 方法。在这个方法里面我们利用 Java 的ScheduledExecutorService 开了一个调度线程池,这个线程池会每隔 1 秒钟调度 DelayTaskHandler 中的 run 方法。
DelayTaskHandler 类就是具体的调度逻辑了。主要有 2 个步骤,一个是从 Redis Sorted Set 中拉取到期的延时任务,另一个是执行到期的延时任务。拉取到期的延时任务是通过 zrangeByScore 命令实现的,处理多线程并发问题是通过 zrem 命令实现的。代码不复杂,这里就不多做解释了。
接下来测试一下:
我们首先生产了 4 个延时任务,执行时间分别是程序开始运行后的 5 秒、10 秒、15 秒、20 秒,然后启动了 10 个消费者去消费延时任务。运行效果如下:
可以看到,任务确实能够在相应的时间点左右被执行,不过有少许时间误差,这个是因为我们拉取到期任务是通过定时任务拉取而不是实时推送的,而且拉取任务时有一部分网络开销,再者,我们的任务处理逻辑是同步处理的,需要上一次的任务处理完,才能拉取下一批任务,这些因素都会造成延时任务的执行时间产生偏差。
4、总结
以上就是通过 Redis 实现延时任务的思路了。这里提供的只是一个最简单的版本,实际上还有很多地方可以优化。比如,我们可以把任务的处理逻辑再放到单独的线程池中去执行,这样的话任务消费者只需要负责任务的调度就可以了,好处就是可以减少任务执行时间偏差。还有就是,这里为了方便,任务的描述存储的只是任务的 id,如果有多种不同类型的任务,像前面说的发送资讯任务和推送消息任务,那么就要通过额外存储任务的类型来进行区分,或者使用不同的 Sorted Set 来存放延时任务了。
除此之外,上面的例子每次拉取延时任务时,只拉取一个,如果说某一个时刻要处理的任务数非常多,那么会有一部分任务延迟比较严重,这里可以优化成每次拉取不止一个到期的任务,比如说 10 个,然后再逐个进行处理,这样的话可以极大地提升调度效率,因为如果是使用上面的方法,拉取 10 个任务需要 10 次调度,每次间隔 1 秒,总共需要 10 秒才能把 10 个任务拉取完,如果改成一次拉取 10 个,只需要 1 次就能完成了,效率提升还是挺大的。
最后一个需要考虑的地方是,上面的代码并没有对任务执行失败的情况进行处理,也就是说如果某个任务执行失败了,那么连重试的机会都没有。因此,在生产环境使用时,还需要考虑任务处理失败的情况。有一个简单的方法是在任务处理时捕获异常,当在处理过程中出现异常时,就将该任务再放回 Redis Sorted 中,或者由当前线程再重试处理。
那么使用 Redis 实现延时任务有什么优缺点呢?优点就是可以满足吞吐量。缺点则是存在任务丢失的风险(当 Redis 实例挂了的时候)。因此,如果对性能要求比较高,同时又能容忍少数情况下任务的丢失,那么可以使用这种方式来实现。