zoukankan      html  css  js  c++  java
  • RabbitMQ简单应用の公平分发(fair dipatch)

    公平分发(fair dipatch)和轮询分发其实基本一致,只是每次分发的机制变了,由原来的平均分配到现在每次只处理一条消息

    1.MQ连接工厂类Connection

     1 package com.mmr.rabbitmq.util;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.ConnectionFactory;
     7 
     8 public class ConnectionUtils {
     9     /**
    10      * @desc 获取Mq 的链接
    11      * @author zp
    12      * @throws IOException 
    13      * @date 2018-7-19
    14      */
    15     public static  Connection getConnection() throws IOException {
    16         // 1.定义一个链接工厂
    17         ConnectionFactory factroy = new ConnectionFactory();
    18         
    19         // 2.设置服务地址
    20         factroy.setHost("127.0.0.1");
    21         
    22         // 3.设置端口号
    23         factroy.setPort(5672);
    24         
    25         // 4.vhost  设置数据库
    26         factroy.setVirtualHost("vhtest");
    27         
    28         // 5.设置用户名
    29         factroy.setUsername("jerry");
    30         
    31         // 6. 设置密码
    32         factroy.setPassword("123456");
    33         
    34         // 7.返回链接
    35         return factroy.newConnection();
    36     }
    37 }
    View Code

    2.消息生产者Send,这里的变化是声明了“每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息” channel.basicQos(intnum);

     1 package com.mmr.rabbitmq.workfair;
     2 
     3 import java.io.IOException;
     4 
     5 import com.mmr.rabbitmq.util.ConnectionUtils;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 
     9 public class Send {
    10     
    11     /*
    12      *         |--C1
    13      * P-------|--C2
    14      *         |--C3
    15      * 
    16      * */
    17     private static final String QUEUE_NAME="test_work_queue";
    18     public static void main(String[] args) throws IOException, InterruptedException{
    19         // 获取链接
    20         Connection connection = ConnectionUtils.getConnection();
    21         
    22         // 获取通道
    23         Channel channel = connection.createChannel();
    24         // 声明队列
    25         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    26         /*
    27          * 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
    28          * 
    29          * 限制发送给同一个消费者只能发送一条
    30          * */
    31         int prefetchCount =1;
    32         channel.basicQos(prefetchCount);
    33         
    34         
    35         for (int i = 0; i < 50; i++) {
    36             String msg = "hello "+i;
    37             channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    38             System.out.println("send msg 的第"+i+"条");
    39             Thread.sleep(i*20);
    40         }
    41         channel.close();
    42         connection.close();
    43     }
    44 }
    View Code

    3.消息处理者(消费者)Recv1 Recv2,这里的区别在于:

    (1)每次只处理1条消息channel.basicQos(1);

    (2)并且在消息处理完之后会手动返回回执单 channel.basicAck(envelope.getDeliveryTag(), false);

    (3)最后将之前的自动应答true改为false boolean autoAck = false;

     1 package com.mmr.rabbitmq.workfair;
     2 
     3 import java.io.IOException;
     4 
     5 import com.mmr.rabbitmq.util.ConnectionUtils;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.Consumer;
     9 import com.rabbitmq.client.DefaultConsumer;
    10 import com.rabbitmq.client.Envelope;
    11 import com.rabbitmq.client.AMQP.BasicProperties;
    12 
    13 public class Recv1 {
    14     private static final String QUEUE_NAME="test_work_queue";
    15     public static void main(String[] args) throws IOException{
    16         // 获取链接
    17         Connection connection = ConnectionUtils.getConnection();
    18         
    19         //获取频道
    20         
    21         final Channel channel = connection.createChannel();
    22         
    23         // 声明队列
    24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    25         
    26         // 关闭自动应答
    27         channel.basicQos(1); // 保证每次只被分发一个
    28         
    29         // 定义一个消费者
    30         Consumer consumer = new DefaultConsumer(channel){
    31             // 一旦有消息 就会触发这个方法  消息到达 
    32             @Override
    33             public void handleDelivery(String consumerTag, Envelope envelope,
    34                     BasicProperties properties, byte[] body) throws IOException {
    35                 // TODO Auto-generated method stub
    36                 // 拿消息
    37                 String msg = new String(body,"utf-8");
    38                 
    39                 //搭出来
    40                 System.out.println("[1]Recv msg:"+msg);
    41                 try {
    42                     Thread.sleep(2000);
    43                 } catch (Exception e) {
    44                     // TODO: handle exception
    45                     e.printStackTrace();
    46                 }finally{
    47                     System.out.println("[1] done");
    48                     // 手动回执
    49                     channel.basicAck(envelope.getDeliveryTag(), false);
    50                 }
    51             }
    52         };
    53         // boolean autoAck = true; // 自动应答改为false
    54         boolean autoAck = false;
    55         channel.basicConsume(QUEUE_NAME, autoAck,consumer);
    56         
    57     }
    58 }
    View Code
     1 package com.mmr.rabbitmq.workfair;
     2 
     3 import java.io.IOException;
     4 
     5 import com.mmr.rabbitmq.util.ConnectionUtils;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.Consumer;
     9 import com.rabbitmq.client.DefaultConsumer;
    10 import com.rabbitmq.client.Envelope;
    11 import com.rabbitmq.client.AMQP.BasicProperties;
    12 
    13 public class Recv2 {
    14     private static final String QUEUE_NAME="test_work_queue";
    15     public static void main(String[] args) throws IOException{
    16         // 获取链接
    17         Connection connection = ConnectionUtils.getConnection();
    18         
    19         //获取频道
    20         
    21         final Channel channel = connection.createChannel();
    22         
    23         // 声明队列
    24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    25         
    26         // 保证每次只接收一条消息
    27         channel.basicQos(1);
    28         
    29         // 定义一个消费者
    30         Consumer consumer = new DefaultConsumer(channel){
    31             // 一旦有消息 就会触发这个方法  消息到达 
    32             @Override
    33             public void handleDelivery(String consumerTag, Envelope envelope,
    34                     BasicProperties properties, byte[] body) throws IOException {
    35                 // TODO Auto-generated method stub
    36                 // 拿消息
    37                 String msg = new String(body,"utf-8");
    38                 
    39                 //搭出来
    40                 System.out.println("[2]Recv msg:"+msg);
    41                 try {
    42                     Thread.sleep(1000);
    43                 } catch (Exception e) {
    44                     // TODO: handle exception
    45                     e.printStackTrace();
    46                 }finally{
    47                     System.out.println("[2] done");
    48                     // 手动回执
    49                     channel.basicAck(envelope.getDeliveryTag(), false);
    50                 }
    51             }
    52         };
    53         // boolean autoAck = true; // 自动应答改为false
    54         boolean autoAck = false;
    55         channel.basicConsume(QUEUE_NAME, autoAck,consumer);
    56         
    57     }
    58 }
    View Code
  • 相关阅读:
    sql语句游标的写法
    oracle的安装与plsql的环境配置
    oracle中创建表时添加注释
    jsp中Java代码中怎么获取jsp页面元素
    sql模糊查询
    jQuery循环给某个ID赋值
    Codeforces Round #671 (Div. 2)
    TYVJ1935 导弹防御塔
    Educational Codeforces Round 95 (Rated for Div. 2)
    Codeforces Round #670 (Div. 2)
  • 原文地址:https://www.cnblogs.com/pengpengzhang/p/9335817.html
Copyright © 2011-2022 走看看