zoukankan      html  css  js  c++  java
  • RabbitMq 之简单队列

    
    

    简单队列类似于我们的生产者,消费者,

    
    

    一个生产者,对应一个消费者.

    
    

    直接上代码:



    package
    com.j1.rabbitmq.simple; import com.j1.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 /** * 如果队列存在,则返回,如果不存在,则创建 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
    package com.j1.rabbitmq.simple;
    
    import com.j1.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String QUEUE_NAME = "test_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
    
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 消息内容
            String message = "Hello World! 1111";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }

    生产者生产一条信息,会被消费者进行消费,

    但是上面有一个问题,只能是一对一的关系,在应用中很少用的到,

    下面我们来讨论一下,一个生产者,多个消费者的情况

    package com.j1.rabbitmq.work;
    
    import com.j1.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            for (int i = 0; i < 100; i++) {
                // 消息内容
                String message = "" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
    
                Thread.sleep(i * 10);
            }
    
            channel.close();
            connection.close();
        }
    }
    package com.j1.rabbitmq.work;
    
    import com.j1.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 同一时刻服务器只会发一条消息给消费者,每一次服务器只会向客户端发送一条
            //channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                //休眠
                Thread.sleep(10);
                // 返回确认状态
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    package com.j1.rabbitmq.work;
    
    import com.j1.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 同一时刻服务器只会发一条消息给消费者,每一次服务器只会向客户端发送一条
          //  channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成状态
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                // 休眠1秒
                Thread.sleep(1000);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }

    上面标红的地方要特别注意一下,

    生产者在生产消息时,消费者的拿到的数据是一样的,这样就很不合理,因为能者多劳嘛,为了解决这一个问题,引入一行代码:

      channel.basicQos(1);

     

     

     

    上面的生产者生产了100条信息,A消费者拿到48条信息,B拿到52条数据,说明A,B拿的信息不是同一条,

    就上面的问题我们有一种解决方案:

     

    package com.j1.rabbitmq.ps;
    
    import com.j1.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明exchange交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }

     

    package com.j1.rabbitmq.ps;
    
    import com.j1.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Recv {
    
        private final static String QUEUE_NAME = "test_queue_exchange";
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                Thread.sleep(10);
                // 返回消息消费状态
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    package com.j1.rabbitmq.ps;
    
    import com.j1.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "test_queue_exchange2";
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
    
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                Thread.sleep(10);
    
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }

    其实总结一下:

    一个生产者,多个消费者,每个消费者获取到的消息是一样的。

    1、生产者发送消息到交换机。(生产者可以向队列或者交换机发送消息)

    2、消费者监听消息队列,消费者永远只能从队列中获取消息。

    3、队列绑定到交换机,获取消息。

    就是将队列绑定在交换机上,监听队列,获取生产者者信息.

    一个交换机可以对应好几个队列,

    这种模式将用于我们的搜索,缓存服务,后面会在实际应用中看到

     

     

     

     

     

     

     

  • 相关阅读:
    谷歌机器学习
    Pycharm使用conda安装的环境
    HAN模型理解2
    HAN模型理解1
    RCNN
    深度CNN
    多通道CNN
    TextCNN
    词向量2
    词向量1.md
  • 原文地址:https://www.cnblogs.com/wangchuanfu/p/7272608.html
Copyright © 2011-2022 走看看