zoukankan      html  css  js  c++  java
  • 几种实现延时任务的方式(三)

    上篇文章介绍了使用Redis来实现延时任务,这是一个比较好的方案,但是这种方式是把Redis作为消息队列去使用,而Redis作为消息队列还是有一些缺点的:

    1. Redis本身没有提供监控、管理界面,需要自己去实现。我们无法方便的知道现在队列的情况,比如是否有积压,消费情况是如何的,生产情况又是如何的。
    2. 消息可能被重复消费,如果是幂等性操作也没什么,但是如果非幂等性操作,就需要其他的解决方案来解决这个问题。
    3. Redis本身没有ACK机制,消息没有那么可靠,当然这个缺点在这个案例中,并不是那么明显,因为我们可以在该执行的都执行成功了,才去删除数据。
      ...

    当然最根本的问题是Redis本身就不是为了队列而生的,它是为了存储而生的,所以它缺少一些队列才有的功能也是“情理之中”的。不过,Redis5引进了Stream,据说 这也是一个功能很强大的队列,但是我还没去看。这里就不说了。

    在本节中,我将用RabbitMQ来实现延时任务。

    关于RabbitMQ的安装,我就不做介绍了,网上都有,而且没有什么难度。

    在使用方面,RabbitMQ比Redis难很多,毕竟使用的比较少,而且不少公司都对MQ进行了封装,使其更好用,但是同时也隐藏了MQ在使用方面的不少细节。

    从基本没有接触过RabbitMQ,到要使用RabbitMQ来完成延时任务,也是一个"跳跃性"的任务。我们应该先了解RabbitMQ一些基础概念,基本使用 等等。仅仅靠一两句话是远远不够的。本文的主题在于“使用RabbitMQ来完成延时任务”。所以在这里我默认大家都有一定的RabbitMQ使用经验了。

    好了,让我们开始吧。

    首先,让我们引进两个名词:

    1. TTL、死信:
      Time To Live,这个名词也说不上是一个新名词,Redis中也有,就是 存活时间,也就是我们经常说的过期时间了,放在MQ里面,特指 消息的存活时间。消息超过了存活时间,就认为这个消息“死”了,称之为“死信”。
    2. Dead Letter Exchange
      死信交换器。创建死信交换器和创建其他交换器没什么区别,只是我们需要告诉队列,死信需要被推送到死信交换器上。

    对于生产者来说,需要创建一个Connection连接,接着在Connection中创建一个Channel,通过Channel申明两个交换器,一个是 用来接收订单数据的交换器,一个是用来接收超时订单数据的交换机,然后申明两个队列,一个是订单数据队列,并且需要告诉这个队列,如果有消息超时了,需要转发到 “用来接收超时订单数据的交换机”,还要申明一个超时订单数据队列。然后把 “用来接收订单数据的交换器”和“订单数据队列”进行绑定,把“用来接收超时订单数据的交换机”和“超时订单数据队列”进行绑定。前置准备工作才算完成,下面就是通过Channel往 “用来接收订单数据的交换器”推数据了。

    为了帮助大家更好的理解,我简单的画了一张图:
    image.png
    希望大家看了文字之后,再对照图片,可以有所理解。

    对于生产者来说,就比较简单了,前置工作就是创建Connection连接,再创建Channel,然后通过Channel,消费 “超时订单数据队列” 就OK了。

    下面我直接放出代码:
    需要在pom中引入依赖:

            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.5.0</version>
            </dependency>
    
    public class Main {
        static ConnectionFactory connectionFactory;
    
        static Connection connection;
    
        static {
            connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            try {
                connection = connectionFactory.newConnection();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
        }
    
    
        public static void main(String[] args) throws Exception {
            producer();
    
            Thread thread = new Thread(() -> {
                try {
                    consume();
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            });
            thread.start();
        }
    
        private static void producer() throws Exception {
    
            Channel channel = connection.createChannel();//创建一个channel,不管是生产数据,还是消费数据,都是通过channel去操作的
    
            channel.exchangeDeclare("orderExchange", "direct", true);//定义一个交换机,路由类型为direct,所有的订单会塞给此交换机
            channel.exchangeDeclare("orderDelayExchange", "direct", true);//定义一个交换机,路由类型为direct,延迟的订单会塞给此交换机
    
            HashMap<String, Object> arguments = new HashMap<String, Object>();
            arguments.put("x-dead-letter-exchange", "orderDelayExchange");//申明死信交换机是名称为orderDelayExchange的交换机
            channel.queueDeclare("order_queue", true, false, false,
                    arguments);//定义一个名称为order_queue的队列,绑定上面定义的参数,这样就告诉rabbit此队列延迟的消息,发送给orderDelayExchange交换机
    
            channel.queueDeclare("order_delay_queue", true, false, false,
                    null);//定义一个名称为order_delay_queue的队列
    
            channel.queueBind("order_queue", "orderExchange",
                    "delay");//order_queue和orderExchange绑定,路由为delay。路由也为delay的消息会通过orderExchange进入到order_queue队列
            channel.queueBind("order_delay_queue", "orderDelayExchange",
                    "delay");//order_delay_queue和orderDelayExchange绑定
    
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            builder.expiration("15000");//设置消息TTL(消息生存时间)
            builder.deliveryMode(2);//设置消息持久化
            AMQP.BasicProperties properties = builder.build();
    
            Thread productThread = new Thread(() -> {
                for (int i = 0; i < 20; i++) {
                    String order = "order" + i;
    
                    try {
                        channel.basicPublish("orderExchange", "delay",
                                properties, order.getBytes());//通过channel,向orderExchange交换机发送路由为delay的消息,这样就可以进入到order_queue队列
                        String str = "现在时间是" + new Date().toString() + "  " + order + "  的消息产生了";
                        System.out.println(str);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    channel.close();
                } catch (Exception ex) {
    
                }
            });
            productThread.start();
    
        }
    
        private static void consume() throws Exception {
            Channel channel = connection.createChannel();//创建一个channel,不管是生产数据,还是消费数据,都是通过channel去操作的
            //消费名称为order_delay_queue的队列,且关闭自动应答,需要手动应答
            channel.basicConsume("order_delay_queue", false, new DefaultConsumer(channel) {
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();//消息的标记,应答的时候需要传入这个参数
                    String str = "现在时间是" + new Date().toString() + "  " + new String(body) + "  的消息消费了";
                    System.out.println(str);
                    channel.basicAck(deliveryTag, false);//手动应答,代表这个消息处理完成了
                }
            });
        }
    }
    
    

    下面我们运行一下:
    image.png

    代码注释写的还是比较清晰的,希望大家可以看懂吧。

    这一节,我没有像上两节一样,讲的那么细,因为如果从RabbitMQ的基础讲起,可能需要三四章的内容来做铺垫,这就脱离主题了。如果有机会的话,我会再花一个系列去介绍RabbitMQ。

    好了,实现延时任务系列到这里就结束了,当然我这里只是抛砖引玉,大家肯定还有不少更好的实现方式。

  • 相关阅读:
    Javascript的作用域、作用域链以及闭包
    C#当中的泛型和java中的对比
    MongoDB的主从复制和副本集
    MongoDB启动配置等
    JavaScript学习系列1 基础-变量
    Umbraco项目发布错误 --More than one type want to be a model for content type authorize
    项目中gulp使用发生的错误及解决
    JavaScript学习系列2一JavaScript中的变量作用域
    ASP.NET MVC中的ActionFilter介绍学习
    Razor中的 内容标记块语法
  • 原文地址:https://www.cnblogs.com/CodeBear/p/10056810.html
Copyright © 2011-2022 走看看