zoukankan      html  css  js  c++  java
  • 【RabbitMQ】 WorkQueues

    消息分发

    【RabbitMQ】 HelloWorld中我们写了发送/接收消息的程序。这次我们将创建一个Work Queue用来在多个消费者之间分配耗时任务。

    Work Queues(又称为:Task Queues)的主要思想是:尽可能的减少执行资源密集型任务时的等待时间。我们将任务封装为消息并发送到队列,在后台的工作进程将弹出任务并进行作业。当你运行很多worker,任务将在他们之间共享。

    这个概念在WEB应用中尤为有效,因为在一个HTTP请求进行复杂操作是不可能的。

    准备

    在上一节我们发送了一条包含“Hello World”的消息。现在我们将要发送代表复杂任务的字符串。我们没有真实场景的复杂任务,例如调整图片大小或呈现PDF文件,让我们假装自己很忙 - 通过Thread.sleep()。我们将根据字符串中“.”的数量来衡量任务复杂度;每一个“.”增加1秒钟的工作时间。例如:一个“Hello...”将消耗3秒钟。

    稍微修改下上一节中Send.java的代码,让我们可以从命令行参数中输入任意字符作为消息。这个程序将给我们的工作队列安排消息,命名为NewTask.java

    String message = getMessage(argv);
    
    channel.basicPublish("", "hello", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    一些封装方法来帮助我们从命令行参数中得到消息(简单来说就是将所有的命令行参数当做一条完整消息):

    private static String getMessage(String[] strings){
        if (strings.length < 1)
            return "Hello World!";
        return joinStrings(strings, " ");
    }
    
    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }

    老的Recv.java程序也需要一些修改:他需要为消息中的每一个“.”伪造1秒钟的工作时间。称为Worker.java

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
    
        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
        }
      }
    };
    boolean autoAck = true; // acknowledgment is covered below  消息确认,在后面会详细讲解
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

    模拟任务执行:

    private static void doWork(String task) throws InterruptedException {
        for (char ch: task.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }

    循环调度

    使用Task Queue的优点之一就是可以轻松的进行并行工作。如果我们正在构建一个积压的工作,我们可以仅仅通过添加更多的workers来解决。

    首先,同时运行两个worker实例,他们都会从队列中得到消息,但事实上是什么样的呢?让我们看一看:

    在IDEA中运行两次Worker.java,然后他们两个都会处于等待消息状态。运行NewTask.java,并携带命令行参数,可以在Edit Configurations中设置Program arguements。下面为官方教程中的命令行版本:

    shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
    NewTask First message.
    shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
    NewTask Second message..
    shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
    NewTask Third message...
    shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
    NewTask Fourth message....
    shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
    NewTask Fifth message.....

    主要观察两个worker的输出:

    worker1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
    Worker
     [*] Waiting for messages. To exit press CTRL+C
     [x] Received 'First message.'
     [x] Received 'Third message...'
     [x] Received 'Fifth message.....'
    worker2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
    Worker
     [*] Waiting for messages. To exit press CTRL+C
     [x] Received 'Second message..'
     [x] Received 'Fourth message....'

    默认的,RabbitMQ将会按照顺序,以此发送每一条消息到每一个消费者。平均每个消费者是可以获得相同数量的消息的。这种分发消息的方式称为循环。

    消息确认

     完成一个任务需要消耗一定时间,你可能想知道如果一个消费者开始了一个很长的任务,在仅仅完成了一部分的时候,死掉了,将会发生什么。在我们当前的代码中,一旦RabbitMQ分发一条消息给消费者,立即就会将该条消息从内存中删除。这种情况下,如果你杀掉一个worker,我们将会丢失它正在操作的消息。我们也会失去所有分发给他的还未处理的消息。

    但是我们不想丢失任何消息。如果worker死掉,我们期望这个任务被重新分发给另一个worker。

    为了确保消息从来没有丢失,RabbitMQ支持消息确认(acknowledgments)。一个确认是从消费者处发送以告诉RabbitMQ指定的消息收到了,处理完成了,RabbitMQ可以删除它了。

    如果一个消费者宕机(channel关闭,connection关闭,TCP连接丢失等),没有发送ack,RabbitMQ将会知道这条消息没有处理完成,将会重新排队。如果此时存在其它消费者,将会迅速转发给其它消费者。这样你就可以确保消息不会丢失,即使进程偶尔宕机。

    这里不存在消息超时,RabbitMQ在消费者宕机后会重发消息。即使处理数据用了很长很长的时间这也是没有问题的。

    默认的消息确认是被打开的。上面的例子中我们通过autoAck=true明确关闭了它。下面我们打开它,每当处理完一个任务,就发送回一个适当的确认消息。

    channel.basicQos(1); // accept only one unack-ed message at a time (see below)  每次接收一个未处理消息
    
    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
    
        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

    使用现在的代码,我们可以保证即使在操作消息的时候通过CTRL+C关闭了一个消费者,也不会丢失消息。不久后,所有未处理完成的消息都会被重新发送。

    Forgotten acknowledgment

    忘记设置basicAck是很普通的事情,但是结果却很严重。当客户端退出(这可能听起来像随机分发)消息会被重新发送,但是RabbitMQ会吃掉越来越多的内容,因为它不会释放任何没有被确认的消息。

    调试这种错误的使用rabbitmqctl来打印messages_unacknowledged的部分:

    $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
    Listing queues ...
    hello    0       0
    ...done.

    消息持久化

    我们学习了如何在消费者宕机的情况下保证数据不丢失。但是在RabbitMQ服务器宕机的情况下,数据依然是会丢失的。

    当RabbitMQ退出或崩溃,它会忘记所有的队列和消息,除非你告诉它不要。两件事情来确保消息未丢失:我们需要标记队列和消息为持久化的。

    首先,我们需要确保RabbitMQ从来不会丢失队列。因此我们需要声明队列为持久化的:

    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);

    这行代码是没有问题的,但是在我们的环境下是错误的。这是因为我们已经定义了一个叫做hello的非持久化队列。RabbitMQ不允许重新定义已经存在的队列(使用不用参数)。这里有一个快速的方法 - 定义一个不同名字的队列,如task_queue:

    boolean durable = true;
    channel.queueDeclare("task_queue", durable, false, false, null);

    这个queueDeclare需要同时更改生产者和消费者的代码。

    现在我们确保了task_queue在RabbitMQ重启的状态下也不会丢失。现在我们需要去标记我们的消息为持久化的 - 通过设置MessageProperties(实现了BasicProperties)的常量值:PERSISTENT_TEXT_PLAIN。

    import com.rabbitmq.client.MessageProperties;
    
    channel.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());

    注意消息持久化:

    标记了消息为持久化也不能完全保证消息不会丢失。尽管告诉了RabbitMQ将消息保存在磁盘中,RabbitMQ刚刚接收数据,还没有保存的时候,这个时间区间是无法持久化的。同事,RabbitMQ没有对每条消息都进行fsync(2) -- 也许仅仅保存在缓存中并没有真正写入硬盘。持久化并不健壮,但是对于处理简单的任务队列已经足够了。如果你需要更加强健的保证可以使用publisher confirms

    公平分发

    你可能注意到有时候分发还是无法解决我们的某些问题。例如在某种情况下,有两个消费者,当所有的奇数消息非常大,偶数消息很小,一个消费者将会持续不断的工作,另一个消费者基本不工作。但是RabbitMQ并不知道这种情况,依然是依次分发。

    这是因为RabbitMQ在消息进入队列是进行分发。并不探查消息的数量。仅仅是发送第n条消息给第n个消费者。

    为了解决这个问题,我们可以使用basicQos方法,设置参数为prefetchCount = 1。这会告诉RabbitMQ每次只给一个消费者一条消息。或者说,不要在消费者正在处理和确认消息的时候发送新的消息给他们。相反,它将分发消息给下一个不忙的消费者。

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);

    注意队列的大小

    如果所有的消费者都处于繁忙状态,队列会填满。可以添加更多的消费者或者其它方案。

    Putting it all together

    NewTask.java

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    public class NewTask {
    
      private static final String TASK_QUEUE_NAME = "task_queue";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    
        String message = getMessage(argv);
    
        channel.basicPublish("", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    
        channel.close();
        connection.close();
      }
    
      private static String getMessage(String[] strings) {
        if (strings.length < 1)
          return "Hello World!";
        return joinStrings(strings, " ");
      }
    
      private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
          words.append(delimiter).append(strings[i]);
        }
        return words.toString();
      }
    }

    Worker.java

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Worker {
    
      private static final String TASK_QUEUE_NAME = "task_queue";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
    
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        channel.basicQos(1);
    
        final Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
    
            System.out.println(" [x] Received '" + message + "'");
            try {
              doWork(message);
            } finally {
              System.out.println(" [x] Done");
              channel.basicAck(envelope.getDeliveryTag(), false);
            }
          }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
      }
    
      private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
          if (ch == '.') {
            try {
              Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
              Thread.currentThread().interrupt();
            }
          }
        }
      }
    }
  • 相关阅读:
    web.xml文件详解
    SQLSERVER dbo解释
    sqlserver BULK INSERT
    google 基站定位api
    Sqlserver中Select和Set区别
    SQL Server优化50法
    ibatis常用16条SQL
    面向对象 -- 三大特性之继承 补充 抽象类 接口类
    面向对象 -- 三大特性之继承
    面向对象 -- 类的组合
  • 原文地址:https://www.cnblogs.com/shiyu404/p/6251773.html
Copyright © 2011-2022 走看看