zoukankan      html  css  js  c++  java
  • rabbit工作队列模式

    工作队列比简单队列在消费者这边多了一个方法。

    channel.basicQos(1);公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)

    生产者:

     1 package com.kf.queueDemo.fairQueue;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.kf.utils.RabbitConnectionUtils;
     7 import com.rabbitmq.client.Channel;
     8 import com.rabbitmq.client.Connection;
     9 /**
    10  * 公平模式发送消息
    11  * @author kf
    12  *
    13  */
    14 public class FairProducer {
    15     
    16     //队列名称
    17     private static String QUEUENAME = "SIMPLEQUEUE";
    18     
    19     public static void main(String[] args) throws IOException, TimeoutException{
    20         Connection connection = RabbitConnectionUtils.getConnection();
    21         
    22         //创建通道
    23         Channel channel = connection.createChannel();
    24         
    25         //通道里放入队列
    26         /**
    27          * 第一个参数是  队列名称
    28          * 第二个参数指 要不要持久化
    29          */
    30         channel.queueDeclare(QUEUENAME, false, false, false, null);
    31         
    32 /*        //消息体
    33         String mes = "demo_message汉字";
    34         
    35         //发送消息
    36         *//**
    37          * 参数为  exchange, routingKey, props, body
    38          * exchange   交换机
    39          * routingKey 路由键
    40          * 
    41          * body 消息体
    42          *//*
    43         channel.basicPublish("", QUEUENAME, null, mes.getBytes());*/
    44         
    45         /**
    46          * 集群环境下,多个消费者情况下。消费者默认采用均摊
    47          */
    48         for(int i=1; i<11; i++){
    49             String mes = "demo_message汉字"+i;
    50             System.out.println("发送消息"+mes);
    51             channel.basicPublish("", QUEUENAME, null, mes.getBytes());
    52         }
    53         
    54         
    55 //        System.out.println("发送消息"+mes);
    56         
    57         channel.close();
    58         connection.close();
    59     }
    60 
    61 }

    消费者1:

     1 package com.kf.queueDemo.fairQueue;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.kf.utils.RabbitConnectionUtils;
     7 import com.rabbitmq.client.Channel;
     8 import com.rabbitmq.client.Connection;
     9 import com.rabbitmq.client.DefaultConsumer;
    10 import com.rabbitmq.client.Envelope;
    11 import com.rabbitmq.client.AMQP.BasicProperties;
    12 /**
    13  * 公平模式消费者
    14  * @author kf
    15  *
    16  */
    17 public class FairConsumer1 {
    18     //队列名称
    19     private static String QUEUENAME = "SIMPLEQUEUE";
    20     
    21     public static void main(String[] args) throws IOException, TimeoutException{
    22         System.out.println("01开始接收消息");
    23         Connection connection = RabbitConnectionUtils.getConnection();
    24         
    25         //创建通道
    26         final Channel channel = connection.createChannel();
    27         
    28         //通道里放入队列
    29         /**
    30          * 第一个参数是  队列名称
    31          * 第二个参数指 要不要持久化
    32          */
    33         channel.queueDeclare(QUEUENAME, false, false, false, null);
    34         
    35         //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)
    36         channel.basicQos(1);
    37         
    38         DefaultConsumer consumer = new DefaultConsumer(channel){
    39             //监听队列
    40                 @Override
    41                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
    42                         byte[] body) throws IOException {try {
    43                             Thread.sleep(500);
    44                         } catch (Exception e) {
    45                         }finally {
    46                             System.out.println("------------进入监听---------");
    47                             String s = new String(body, "utf-8");
    48                             System.out.println("获取到的消息是:"+s);
    49                             //手动应答。
    50                             /**
    51                              * 当  channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时  是手动应答模式
    52                              */
    53                             channel.basicAck(envelope.getDeliveryTag(), false);
    54                         }}
    55         };
    56         
    57         //设置应答模式
    58         /**
    59          * 参数:  对列名,是否自动签收,监听的类
    60          */
    61         System.out.println("获取消息的方法之前");
    62         channel.basicConsume(QUEUENAME, false, consumer);
    63         System.out.println("获取消息的方法之后");
    64         
    65     }
    66 
    67 
    68 }

    消费者2:

    package com.kf.queueDemo.fairQueue;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    /**
     * 公平模式消费者
     * @author kf
     *
     */
    public class FairConsumer2 {
        //队列名称
        private static String QUEUENAME = "SIMPLEQUEUE";
        
        public static void main(String[] args) throws IOException, TimeoutException{
            System.out.println("02开始接收消息");
            Connection connection = RabbitConnectionUtils.getConnection();
            
            //创建通道
            final Channel channel = connection.createChannel();
            
            //通道里放入队列
            /**
             * 第一个参数是  队列名称
             * 第二个参数指 要不要持久化
             */
            channel.queueDeclare(QUEUENAME, false, false, false, null);
            
            //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)
                    channel.basicQos(1);
            
            DefaultConsumer consumer = new DefaultConsumer(channel){
                //监听队列
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        try {
                            Thread.sleep(1000);
                        } catch (Exception e) {
                        }finally {
                            System.out.println("------------进入监听---------");
                            String s = new String(body, "utf-8");
                            System.out.println("获取到的消息是:"+s);
                            //手动应答。
                            /**
                             * 当  channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时  是手动应答模式
                             */
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                        
                    }
            };
            
            //设置应答模式
            /**
             * 参数:  对列名,是否自动签收,监听的类
             */
            System.out.println("获取消息的方法之前");
            channel.basicConsume(QUEUENAME, false, consumer);
            System.out.println("获取消息的方法之后");
            
        }
    
    
    }
  • 相关阅读:
    HTML DOM教程 49JavaScript Number 对象
    jquery技巧总结
    eclipse常用快捷键汇总
    一个Hibernate 的简单教程
    HTML DOM教程 51JavaScript match() 方法
    Java开源项目Hibernate包作用详解
    Build system 英文说明 Andrlid.mk说明
    android通过 哪些变量 来决定 哪些应用 会被编译进系统
    android“设置”里的版本号
    android设置中PreferenceActivity的 小结
  • 原文地址:https://www.cnblogs.com/fuguang/p/10660554.html
Copyright © 2011-2022 走看看