zoukankan      html  css  js  c++  java
  • Work Queues(点对多)

    Work Queues(点对多)

    多个消费者在同一个消息队列中获取消息的情况。在有些应用当中,消费端接收到消息任务需要长时间的处理,如果等上一个消息处理完成以后再取下一个数据进行处理的话,势必会有一些延迟。在消息队列中的数据也会不断增多,延迟将越来越大。当然对于一个消费进程来说,在某些情况下可以起多个线程来处理,而在这里将介绍另一种处理方式,多个消费进程的情况。而RabbitMQ在这方面进行了很好的处理和封装,使客户程序可以很方便的使用。

    默认的,RabbitMQ会顺序的把消息发送到下一个Consumer,这种发送消息的方式称为循环发送(round-robin)

     1 package com.rabbitmq.www.work_queues;
     2 
     3 
     4 
     5 import com.rabbitmq.client.Channel;
     6 import com.rabbitmq.client.Connection;
     7 import com.rabbitmq.client.ConnectionFactory;
     8 import com.rabbitmq.client.MessageProperties;
     9 
    10 public class NewTask {
    11 
    12   private static final String TASK_QUEUE_NAME = "task_queue";
    13   private final static String HOST_ADDR = "172.18.112.102";
    14 
    15   public static void main(String[] argv) throws Exception {
    16       
    17       
    18     ConnectionFactory factory = new ConnectionFactory();
    19     factory.setHost(HOST_ADDR);
    20     Connection connection = factory.newConnection();
    21     Channel channel = connection.createChannel();
    22     
    23     //durable 设置true,queue持久化,server重启,此queue不丢失
    24     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    25     for(int i=0;i<=10;i++){
    26         String message = "hello world";
    27         message=message+i;
    28         //BasicProperties设置MessageProperties.PERSISTENT_TEXT_PLAIN,信息持久化
    29         channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
    30         System.out.println(" [x] Sent '" + message);
    31     }
    32 
    33     channel.close();
    34     connection.close();
    35   }
    36 
    37 }
    package com.rabbitmq.www.work_queues;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Worker {
    
      private static final String TASK_QUEUE_NAME = "task_queue";
      private final static String HOST_ADDR = "172.18.112.102";
    
      public static void main(String[] argv) throws Exception {
          
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDR);
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
      //durable 设置true,queue持久化,server重启,此queue不丢失
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //一次只接受一条信息
        channel.basicQos(1);
    
        final Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
    
            System.out.println(" [x] Received '" + message + "'");
            try {
              doWork(message);
            } finally {
              System.out.println(" [x] Done");
              //向服务器发送应答
              channel.basicAck(envelope.getDeliveryTag(), false);
            }
          }
        };
        //autoAck 设置false,消费端挂掉,信息不会丢失,server会re-queue
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
      }
    
      private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
          if (ch == '.') {
            try {
              Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
              Thread.currentThread().interrupt();
            }
          }
        }
      }
    }
  • 相关阅读:
    如何解决"应用程序无法启动,因为应用程序的并行配置不正确"问题
    C/C++时间函数使用方法
    vim: C++文件之间快速切换(含视频)
    HOWTO install commonlisp on ubuntu
    TagSoup home page
    Quicklisp beta
    What is Lispbox?
    猫人女王
    Lisp in a box 安装指南 JAAN的专栏 博客频道 CSDN.NET
    Ubuntu 12.04 改造指南 | Ubuntusoft
  • 原文地址:https://www.cnblogs.com/woms/p/7040853.html
Copyright © 2011-2022 走看看