  • RabbitMQ Work Queues (4)

    Producer

package workqueues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Send1 {

    private static final String QUEUE_NAME = "workqueue";

    public static void main(String[] args) {
        foo();
    }

    private static void foo() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Declare the queue as durable so it survives a RabbitMQ restart
            boolean durable = true;
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
            String dots = "";
            for (int i = 0; i < 10; i++) {
                dots += ".";
                String message = "helloworld" + dots + dots.length();
                // Publish each message as persistent so unprocessed messages
                // are written to disk and survive a broker crash
                channel.basicPublish("", QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println("Send:" + message);
            }
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

    Consumer

package workqueues;

import java.io.IOException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Recv2 {

    private static final String QUEUE_NAME = "workqueue";

    public static void main(String[] args) {
        foo();
    }

    private static void foo() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // Maximum number of unacknowledged messages this consumer may hold at once
            int prefetchCount = 3;
            channel.basicQos(prefetchCount);
            // Disable autoAck: the consumer must explicitly ack each message after
            // processing it; otherwise RabbitMQ considers the message unprocessed
            // and will redeliver it to another consumer
            boolean autoAck = false;
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    doWork(message);
                    // Ack after processing: only acked messages count as handled;
                    // unacked ones are redelivered (second argument: multiple = false)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void doWork(String task) {
        System.out.println("Received task:" + task);
        // Simulate work: each '.' in the task costs half a second
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        System.out.println("task done:" + task);
    }
}

    The key parameters:

      durable: whether the queue is persisted. This guards against message loss when RabbitMQ crashes: with a durable queue (and messages published as persistent), the broker writes unprocessed messages to disk.

      autoAck: whether the client acknowledges messages automatically. In practice we set autoAck to false and have the consumer explicitly call basicAck after it finishes processing a message, telling the broker the message was handled correctly. If a consumer dies while processing a message, the broker never receives the ack and redelivers that message to another consumer on the same queue.

      prefetchCount: the number of unacknowledged messages a consumer may hold at one time. Set to 1, it acts as fair dispatch: each consumer handles only one message at a time and receives the next one only after acking the current one.

              Set to 3, each consumer can have up to 3 messages in flight at once. The test code above uses 3.
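    How the prefetch window shapes dispatch can be sketched without a broker. The following is an illustrative plain-Java simulation, not the RabbitMQ client API (all class and method names here are invented for the example): ten tasks are dispatched to two consumers whose unacked windows are capped at 3, and a full window blocks further deliveries until an ack frees a slot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative simulation of consumer prefetch (hypothetical, not the RabbitMQ API)
public class PrefetchDemo {

    // A consumer with a bounded window of unacknowledged messages
    static class SimConsumer {
        final String name;
        final int prefetch;
        final List<String> unacked = new ArrayList<>();
        final List<String> processed = new ArrayList<>();

        SimConsumer(String name, int prefetch) {
            this.name = name;
            this.prefetch = prefetch;
        }

        boolean hasCapacity() { return unacked.size() < prefetch; }

        // Finish (and ack) the oldest message in the window
        void ackOldest() { processed.add(unacked.remove(0)); }
    }

    // Deliver each message to the first consumer with a free window slot,
    // the way the broker skips consumers whose prefetch window is full
    static void dispatch(Queue<String> queue, List<SimConsumer> consumers) {
        while (!queue.isEmpty()) {
            SimConsumer target = null;
            for (SimConsumer c : consumers) {
                if (c.hasCapacity()) { target = c; break; }
            }
            if (target == null) {
                // All windows full: nothing more can be delivered until someone acks
                consumers.get(0).ackOldest();
                continue;
            }
            target.unacked.add(queue.poll());
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        for (int i = 1; i <= 10; i++) queue.add("task" + i);
        SimConsumer c1 = new SimConsumer("C1", 3);
        SimConsumer c2 = new SimConsumer("C2", 3);
        dispatch(queue, List.of(c1, c2));
        // With prefetch=3, neither consumer ever holds more than 3 unacked messages
        System.out.println("C1 unacked window: " + c1.unacked.size());
        System.out.println("C2 unacked window: " + c2.unacked.size());
    }
}
```

    In the real broker the consumer that acks first receives the next delivery; the simulation fixes the ack order for determinism, but the invariant is the same: no consumer ever holds more than prefetchCount unacked messages.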

    Test results

    Producer

    Send:helloworld.1
    Send:helloworld..2
    Send:helloworld...3
    Send:helloworld....4
    Send:helloworld.....5
    Send:helloworld......6
    Send:helloworld.......7
    Send:helloworld........8
    Send:helloworld.........9
    Send:helloworld..........10

    C1

    Received task:helloworld.1
    task done:helloworld.1
    Received task:helloworld..2
    task done:helloworld..2
    Received task:helloworld...3
    task done:helloworld...3
    Received task:helloworld.......7
    task done:helloworld.......7
    Received task:helloworld........8
    task done:helloworld........8
    Received task:helloworld..........10
    task done:helloworld..........10

    C2

    Received task:helloworld....4
    task done:helloworld....4
    Received task:helloworld.....5
    task done:helloworld.....5
    Received task:helloworld......6
    task done:helloworld......6
    Received task:helloworld.........9
    task done:helloworld.........9

      Note: run the Producer first, then start the Consumer twice (two instances, named C1 and C2).

    Result analysis

      The Producer produced tasks 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

      C1 processed tasks 1, 2, 3, 7, 8, 10

      C2 processed tasks 4, 5, 6, 9

      This matches the setting of at most three unacked messages per consumer at a time. If the C1 process were killed while it was processing task 10, the broker would redeliver task 10 to C2.
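    That last failover claim can also be illustrated without a broker. The sketch below is hypothetical plain Java, not the RabbitMQ API: it mimics the broker requeueing whatever a dead consumer still held unacked and letting the surviving consumer drain it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch of broker-side redelivery (hypothetical, not the RabbitMQ API)
public class RedeliveryDemo {

    // Requeue everything the dead consumer held unacked, then drain the
    // queue into the surviving consumer's processed list
    static List<String> failover(List<String> deadUnacked, List<String> survivorProcessed) {
        Deque<String> queue = new ArrayDeque<>(deadUnacked);
        List<String> result = new ArrayList<>(survivorProcessed);
        while (!queue.isEmpty()) {
            result.add(queue.poll());
        }
        return result;
    }

    public static void main(String[] args) {
        // Scenario from the analysis above: C1 is killed while holding task 10 unacked
        List<String> c1Unacked = List.of("task10");
        List<String> c2Processed = List.of("task4", "task5", "task6", "task9");
        System.out.println("C2 ends up with: " + failover(c1Unacked, c2Processed));
    }
}
```

    The broker does this automatically when a consumer's channel or connection dies; no client code is involved, which is why disabling autoAck is enough to get the at-least-once behavior described above.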

  • Original post: https://www.cnblogs.com/gc65/p/8992916.html