介绍
延迟队列,顾名思义它是一种带有延迟功能的消息队列。 那么,是在什么场景下我才需要这样的队列呢?
很多时候我们会有延时处理一个任务的需求,比如说:
2个小时后给用户发送短信。
15分钟后关闭网络连接。
2分钟后再次尝试回调。
下面我们来分别探讨一下几种实现方案:
1、Java中的DelayQueue
Java中的DelayQueue位于java.util.concurrent包下,本质是由PriorityQueue和BlockingQueue实现的阻塞优先级队列。
2、使用Redis实现
前文我们看到,可以通过优先级队列来实现延迟队列的功能。Redis提供了很多数据结构,其中的zset是一种有序的数据结构;我们可以通过Redis中的zset来实现一个延迟队列。
基本的方法就是使用时间戳作为元素的score存入zset。
redis> ZADD delayqueue <future_timestamp> "messsage" 获取所有已经“就绪”的message,并且删除message。 redis> MULTI redis> ZRANGEBYSCORE delayqueue 0 <current_timestamp> redis> ZREMRANGEBYSCORE delayqueue 0 <current_timestamp> redis> EXEC
但是这个方案也有一些问题:
Redis事务虽然保证了一致性和隔离性,但是并没有提供回滚功能。消息处理失败是不能被恢复的,如果处理某条消息的线程崩溃或机器宕机,这条未被处理不能被自动的再次处理。
也有考虑过将分为TODO和Doing两条队列:
先从TODO队列中取出任务,放入Doing中,再开始处理;如果停留在Doing队列总过久,则重新放入TODO队列。但是由于Redis的事务特性,并不能做到完全可靠;并且检查Doing超时的逻辑也略复杂。那么有没有一个成熟的消息队列可以支持延迟投递消息的功能呢?答案当然是有的,本文的标题就是使用RabbitMQ实现DelayQueue。
3、使用AWS上的SQS的延时队列
import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.sns.AmazonSNS; import com.amazonaws.services.sns.AmazonSNSClientBuilder; import com.amazonaws.services.sns.model.PublishResult; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; import com.amazonaws.util.StringUtils; SendMessageRequest sendMessageRequest = new SendMessageRequest(queueUrl, message); sendMessageRequest.setDelaySeconds(delaySeconds); SendMessageResult result = sqs.sendMessage(sendMessageRequest);
4、使用RabbitMQ实现
这是RabbitMQ众多隐藏的强大特性中的一个,可以轻松的降低代码的复杂度,实现DelayQueue的功能。
我们需要两个队列,一个用来做主队列,真正的投递消息;另一个用来延迟处理消息。
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare("MAIN_QUEUE", true, false, false, null); channel.queueBind("MAIN_QUEUE", "amq.direct", "MAIN_QUEUE"); HashMap<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "amq.direct"); arguments.put("x-dead-letter-routing-key", "MAIN_QUEUE"); channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments);
放入延迟消息:
public boolean send(String message, long delay) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration(String.valueOf(delay)); //璁剧疆娑堟伅TTL builder.deliveryMode(2); //璁剧疆娑堟伅鎸佷箙鍖� AMQP.BasicProperties properties = builder.build(); //璁剧疆娑堟伅TTL try { logger.info("=====>start send rabbit mq message:{},delay:{}",message,delay); channel.basicPublish(exchangeName,routingKey,properties,message.getBytes()); logger.info("=====>end send rabbit mq message:{},delay:{}",message,delay); return true; } catch (IOException e) { logger.error("<=====rabbitmq producer send message error",e); return false; } }
而关键点,就在于 x-dead-letter-exchange 和 x-dead-letter-routing-key 两个参数上。这两个参数说明了:消息过期后的处理方式 --> 投递到我们指定的MAIN_QUEUE;然后我们只需要在MAIN_QUEUE中等待消息投递即可。
RabbitMQ本身提供了消息持久化和没有收到ACK的重投递功能,这样我们就可以实现一个高可靠的分布式延迟消息队列了。
PS:上面讲述的RabbitMQ定时任务方案有问题,RabbitMQ TTL文档 中写道:
Caveats
While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired. Hence resources used by such expired messages will not be freed, and they will be counted in queue statistics (e.g. the number of messages in the queue).
per-queue TTL不会有问题,因为快要过期的消息总是在队列的前边;但是如果使用per-message TTL的话,过期的消息有可能会在未过期的消息后边,直到前边的消息过期或者被消费。因为RabbitMQ保证过期的消息一定不会被消费者消费,但是不能保证消息过期就会从队列中移除。
5、ActiveMQ
ActiveMQ from version 5.4 has an optional persistent scheduler built into the ActiveMQ message broker.
可以支持定时、延迟投递、重复投递和Cron调度。
在配置文件中,启用<broker ... schedulerSupport="true"> 选项后即可使用。
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); long delay = 30 * 1000; long period = 10 * 1000; int repeat = 9; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); producer.send(message); MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); producer.send(message);
ActiveMQ配置项介绍:
Property type Description
AMQ_SCHEDULED_DELAY false The time in milliseconds that a message will wait before
being scheduled to be delivered by the broker
AMQ_SCHEDULED_DELAY false 消息延迟发送的延迟时间(单位毫秒)
AMQ_SCHEDULED_PERIOD false The time in milliseconds after the start time to wait before
scheduling the message again
AMQ_SCHEDULED_PERIOD false 代理启动后,发送消息之前的等待时间(单位毫秒).
AMQ_SCHEDULED_REPEAT false The number of times to repeat scheduling a message for delivery
AMQ_SCHEDULED_REPEAT false 调度消息发送的重复次数
AMQ_SCHEDULED_CRON String Use a cron entry to set the schedule
AMQ_SCHEDULED_CRON String 使用一个cron实体设置消息发送调度
文章引自:http://zhangyp.net/rabbitmq-delayqueue/