zoukankan      html  css  js  c++  java
  • RabbitMQ简单应用の轮训分发

    MQ连接工厂还是之前的那个Connection

     1 package com.mmr.rabbitmq.util;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.ConnectionFactory;
     7 
     8 public class ConnectionUtils {
     9     /**
    10      * @desc 获取Mq 的链接
    11      * @author zp
    12      * @throws IOException 
    13      * @date 2018-7-19
    14      */
    15     public static  Connection getConnection() throws IOException {
    16         // 1.定义一个链接工厂
    17         ConnectionFactory factroy = new ConnectionFactory();
    18         
    19         // 2.设置服务地址
    20         factroy.setHost("127.0.0.1");
    21         
    22         // 3.设置端口号
    23         factroy.setPort(5672);
    24         
    25         // 4.vhost  设置数据库
    26         factroy.setVirtualHost("vhtest");
    27         
    28         // 5.设置用户名
    29         factroy.setUsername("jerry");
    30         
    31         // 6. 设置密码
    32         factroy.setPassword("123456");
    33         
    34         // 7.返回链接
    35         return factroy.newConnection();
    36     }
    37 }
    View Code

    消息生产者类的定义Send

     1 package com.mmr.rabbitmq.work;
     2 
     3 import java.io.IOException;
     4 
     5 import com.mmr.rabbitmq.util.ConnectionUtils;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 
     9 public class Send {
    10     
    11     /*
    12      *         |--C1
    13      * P-------|--C2
    14      *         |--C3
    15      * 
    16      * */
    17     private static final String QUEUE_NAME="test_work_queue";
    18     public static void main(String[] args) throws IOException, InterruptedException{
    19         // 获取链接
    20         Connection connection = ConnectionUtils.getConnection();
    21         
    22         // 获取通道
    23         Channel channel = connection.createChannel();
    24         // 声明队列
    25         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    26         
    27         for (int i = 0; i < 50; i++) {
    28             String msg = "hello "+i;
    29             channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    30             System.out.println("send msg 的第"+i+"条");
    31             Thread.sleep(i*20);
    32         }
    33         channel.close();
    34         connection.close();
    35     }
    36 }
    View Code

    消息消费者累的定义 Recv1  Recv2

     1 package com.mmr.rabbitmq.work;
     2 
     3 import java.io.IOException;
     4 
     5 import com.mmr.rabbitmq.util.ConnectionUtils;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.Consumer;
     9 import com.rabbitmq.client.DefaultConsumer;
    10 import com.rabbitmq.client.Envelope;
    11 import com.rabbitmq.client.AMQP.BasicProperties;
    12 
    13 public class Recv1 {
    14     private static final String QUEUE_NAME="test_work_queue";
    15     public static void main(String[] args) throws IOException{
    16         // 获取链接
    17         Connection connection = ConnectionUtils.getConnection();
    18         
    19         //获取频道
    20         
    21         Channel channel = connection.createChannel();
    22         
    23         // 声明队列
    24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    25         
    26         // 定义一个消费者
    27         Consumer consumer = new DefaultConsumer(channel){
    28             // 一旦有消息 就会触发这个方法  消息到达 
    29             @Override
    30             public void handleDelivery(String consumerTag, Envelope envelope,
    31                     BasicProperties properties, byte[] body) throws IOException {
    32                 // TODO Auto-generated method stub
    33                 // 拿消息
    34                 String msg = new String(body,"utf-8");
    35                 
    36                 //搭出来
    37                 System.out.println("[1]Recv msg:"+msg);
    38                 try {
    39                     Thread.sleep(2000);
    40                 } catch (Exception e) {
    41                     // TODO: handle exception
    42                     e.printStackTrace();
    43                 }finally{
    44                     System.out.println("[1] done");
    45                 }
    46             }
    47         };
    48         boolean autoAck = true;
    49         channel.basicConsume(QUEUE_NAME, autoAck,consumer);
    50         
    51     }
    52 }
    View Code
     1 package com.mmr.rabbitmq.work;
     2 
     3 import java.io.IOException;
     4 
     5 import com.mmr.rabbitmq.util.ConnectionUtils;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.Consumer;
     9 import com.rabbitmq.client.DefaultConsumer;
    10 import com.rabbitmq.client.Envelope;
    11 import com.rabbitmq.client.AMQP.BasicProperties;
    12 
    13 public class Recv2 {
    14     private static final String QUEUE_NAME="test_work_queue";
    15     public static void main(String[] args) throws IOException{
    16         // 获取链接
    17         Connection connection = ConnectionUtils.getConnection();
    18         
    19         //获取频道
    20         
    21         Channel channel = connection.createChannel();
    22         
    23         // 声明队列
    24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    25         
    26         // 定义一个消费者
    27         Consumer consumer = new DefaultConsumer(channel){
    28             // 一旦有消息 就会触发这个方法  消息到达 
    29             @Override
    30             public void handleDelivery(String consumerTag, Envelope envelope,
    31                     BasicProperties properties, byte[] body) throws IOException {
    32                 // TODO Auto-generated method stub
    33                 // 拿消息
    34                 String msg = new String(body,"utf-8");
    35                 
    36                 //搭出来
    37                 System.out.println("[2]Recv msg:"+msg);
    38                 try {
    39                     Thread.sleep(1000);
    40                 } catch (Exception e) {
    41                     // TODO: handle exception
    42                     e.printStackTrace();
    43                 }finally{
    44                     System.out.println("[2] done");
    45                 }
    46             }
    47         };
    48         boolean autoAck = true;
    49         channel.basicConsume(QUEUE_NAME, autoAck,consumer);
    50         
    51     }
    52 }
    View Code

    1.首先我们运行Recv1  Recv2 对消息进行监听

    2.其次我们运行Send,开始生产消息。

    3.最后得到的结果是:消费者1(都是偶数)和消费者2(都是奇数)处理消息是一样的

    为什么会出现这种现象呢?

    ----这种方式叫做轮训分发(round-robin)结果就是不管谁忙谁闲,都不会多给一个消息,任务就是你一个我一个。

  • 相关阅读:
    写个简单的搜索引擎
    C++中的三种继承关系
    《深度探索C++对象模型》调用虚函数
    一次数据库优化的对话
    读后感:你的灯亮着吗
    Linux Shell 截取字符串
    一次关于知识储备的思考
    哈夫曼树与哈夫曼编码
    二叉查找树
    jar中没有注清单属性
  • 原文地址:https://www.cnblogs.com/pengpengzhang/p/9335613.html
Copyright © 2011-2022 走看看