zoukankan      html  css  js  c++  java
  • RabbitMQ

    这次我们试着实现这样一个小程序:


    嗯,就是任务队列(task queue)。
    不是将任务集中在一堆并一直等到所有任务一并完成为止,而是将每一个任务封装为一个消息,并将其发送到队列,后台的workers就从队列中分担工作。
    web应用尤其喜欢这种处理方式,比如面对一个请求时我们有一大堆复杂逻辑需要处理,而我们却不需要立即响应处理结果,那就放到后面慢慢弄。
    (PS:另外也有直接对任务进行持久化,然后用scheduler什么的去定时处理。无论如何,没有银弹。)


    对于复杂的任务,我们可以用Thread.sleep模拟一下。
    比如provider每发一个"hello...",worker读到消息后开始数点,每读到一个"."就睡一会儿。

    provider也简单模拟一下,一次塞个20个消息到队列:

    public static void main(String[] argv) throws java.io.IOException {
     
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
     
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
     
        String message = "Hello...";
     
        for (int i = 0; i < 20; i++) {
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN, message.concat(i+1+"").getBytes());
            System.out.println(" [x] Sent '" + message + (i + 1) + "'  "
                    + (i + 1) + " times");
        }
     
        channel.close();
        connection.close();
    }

    有一个需要注意的地方,就是consumer揽了活后没干完就死掉了。
    我需要其他还活着的consumer替死者完成工作。
    RabbitMQ支持消息应答,如果worder没有做出应答却死掉了,provider则会将消息重新发给其他活着的consumer。
    但这个和timeout无关,只有在worker的connection断掉时才会重新发送。


    如果调用了没有autoAck参数的basicConsume,消息应答默认是启用的,也就是autoAck=false。

    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

    当autoAck==false时需要我们显示调用channel.basicAck方法将接收的消息ack一下。
    如果接收了消息却不显示调用应答方法,就不能再接收新的消息,这就造成了浪费。
    另外,如果设置了autoAck就不要显示进行应答,否则会来一个com.rabbitmq.client.ShutdownSignalException。

    consumer死了有其他人处理后事,那整个server死掉了怎么办?
    为了让消息不丢失,我们需要将队列和消息标记为durable。

    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);

    好了,这样即使重启RabbitMQ服务也不会丢失队列。

    但这并不保证消息不会丢失,为了保证这一点,我们在provider发布消息时加了essageProperties.PERSISTENT_TEXT_PLAIN:

    channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN, message.concat(i+1+"").getBytes());

    虽然这种方式并不完美,我们还需要做其他的一些工作,但暂时先到这里。

    最后一个问题是,如何做到给consumer公平分配任务。
    如果没有做这个处理,会出现这样一种情况。
    举个例子:provider发送了20个消息,随即启动的consumer_1把这20个消息全都独占了。
    在consumer_1工作期间又有consumer_2被启动,但此时consumer_2没有任何任务。
    此时provider又发送了20个消息,这时consumer_2会得到10个任务。

    我们可以使用channel.basicQos(int prefetchCount)方法限制预获取的数量,比如prefetchCount==1就是返回应答后可以再获得1个消息。

    好了,consumer代码如下:

    public class Worker {
        private static final String TASK_QUEUE_NAME = "task_queue";
     
        public static void main(String[] argv) throws java.io.IOException,
                java.lang.InterruptedException {
     
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
     
            channel.basicQos(1);
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            boolean autoAck = false;
            channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
     
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
     
                System.out.println(" [x] Received '" + message + "'");
                doWork(message);
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
     
        private static void doWork(String task) throws InterruptedException {
            for (char ch : task.toCharArray()) {
                if (ch == '.')
                    Thread.sleep(1000);
            }
        }
     
    }
  • 相关阅读:
    A1023 Have Fun with Numbers (20分)(大整数四则运算)
    A1096 Consecutive Factors (20分)(质数分解)
    A1078 Hashing (25分)(哈希表、平方探测法)
    A1015 Reversible Primes (20分)(素数判断,进制转换)
    A1081 Rational Sum (20分)
    A1088 Rational Arithmetic (20分)
    A1049 Counting Ones (30分)
    A1008 Elevator (20分)
    A1059 Prime Factors (25分)
    A1155 Heap Paths (30分)
  • 原文地址:https://www.cnblogs.com/kavlez/p/4100076.html
Copyright © 2011-2022 走看看