zoukankan      html  css  js  c++  java
  • RabbitMQ(二):Java 操作队列

    1. 简单模式

    模型:

    • P:消息的生产者
    • 队列:rabbitmq
    • C:消息的消费者

    获取 MQ 连接

    public static Connection getConnection() throws IOException, TimeoutException {
            // 定义一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 设置服务地址
            factory.setHost("127.0.0.1");
            // AMQP 5672
            factory.setPort(5672);
            // vhost
            factory.setVirtualHost("/vhost_ljf");
            // 用户名
            factory.setUsername("ljf");
            // 密码
            factory.setPassword("123456");
            return factory.newConnection();
        }
    

    生产者生产消息

    public class Send {
        private static final String QUEUE_NAME = "test_simple_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取一个连接
            Connection connection = ConnectionUtils.getConnection();
            // 从连接中获取一个通道
            Channel channel = connection.createChannel();
            // 创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            String msg = "hello simple!";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("--send msg: " + msg);
            channel.close();
            connection.close();
        }
    }
    

    消费者接收消息

    public class Recv {
        private static final String QUEUE_NAME = "test_simple_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取连接
            Connection connection = ConnectionUtils.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取到达的消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("recv: " + msg);
                }
            };
            // 监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    简单队列的不足

    耦合性高,生产者一一对应消费者(如果我想有多个消费者消费队列中的消息,这时候就不行了);

    队列名变更,这时候得同时变更。

    2. 工作队列模式(Work Queue)

    模型

    为什么会出现工作队列?

    simple 队列是一一对应的,而且我们实际开发,生产者发送消息是毫不费力的,而消费者一般是要跟业务相结合的,消费者接收到消息之后就需要处理,可能需要花费时间,这时候队列就会积压了很多消息。

    生产者

    /**
     *                 |----C1
     *   P----Queue----|
     *                 |----C2
     */
    public class Send {
        private static final String QUEUE_NAME = "test_work_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{
            // 获取连接
            Connection connection = ConnectionUtils.getConnection();
    
            // 获取 channel
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            for (int i = 0; i < 50; i++) {
                String msg = "hello" + i;
                System.out.println("[WQ] send: " + msg);
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                Thread.sleep(i*20);
            }
            channel.close();
            connection.close();
        }
    }
    

    消费者

    • 消费者1
    public class Recv1 {
        private static final String QUEUE_NAME = "test_work_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取连接
            Connection connection = ConnectionUtils.getConnection();
            // 获取 channel
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义一个消费者
            Consumer consumer = new DefaultConsumer(channel) {
                // 消息到达 触发方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[1] Recv msg: " + msg);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done.");
                    }
                }
            };
            boolean autoAck = true;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    
    • 消费者2
    public class Recv2 {
        private static final String QUEUE_NAME = "test_work_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取连接
            Connection connection = ConnectionUtils.getConnection();
            // 获取 channel
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义一个消费者
            Consumer consumer = new DefaultConsumer(channel) {
                // 消息到达 触发方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[2] Recv msg: " + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done.");
                    }
                }
            };
            boolean autoAck = true;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    

    现象

    先运行消费者1和消费者2,再运行生产者

    消费者1 和 消费者2 处理的消息数量是一样多的。

    消费者1:偶数

    消费者2:奇数

    这种方式叫做轮询分发(round-robin),结果就是不管谁忙谁清闲,都不会多给一个消息。

    3. 公平分发(fair dipatch)

    生产者

    public class Send {
        private static final String QUEUE_NAME = "test_work_queue";
        public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{
            // 获取连接
            Connection connection = ConnectionUtils.getConnection();
            // 获取 channel
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            /**
             * 每个消费者:发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
             */
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            for (int i = 0; i < 50; i++) {
                String msg = "hello" + i;
                System.out.println("[WQ] send: " + msg);
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                Thread.sleep(i*5);
            }
            channel.close();
            connection.close();
        }
    }
    

    消费者

    • 消费者1
    public class Recv1 {
        private static final String QUEUE_NAME = "test_work_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取连接
            Connection connection = ConnectionUtils.getConnection();
            // 获取 channel
            final Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);  // 保证一次只发送一个
            // 定义一个消费者
            Consumer consumer = new DefaultConsumer(channel) {
                // 消息到达 触发方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[1] Recv msg: " + msg);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done.");
                        // 手动回执
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            boolean autoAck = false; // 自动应答 false
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    
    • 消费者2
    public class Recv2 {
        private static final String QUEUE_NAME = "test_work_queue";
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取连接
            Connection connection = ConnectionUtils.getConnection();
            // 获取 channel
            final Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);  // 保证一次只发送一个
            // 定义一个消费者
            Consumer consumer = new DefaultConsumer(channel) {
                // 消息到达 触发方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[2] Recv msg: " + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done.");
                        // 手动回执
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            boolean autoAck = false; // 自动应答 false
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    

    现象

    消费者2 处理的消息比 消费者1 多,能者多劳。

    4. 消息应答与消息持久化

    消息应答

    boolean autoAck = false; // 自动应答 false
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    
    • boolean autoAck = true;(自动确认模式)

    一旦 rabbitmq 将消息分发给消费者,就会从内存中删除;

    这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息。

    • boolean autoAck = false;(手动模式)

    如果一个消费者挂掉,就会交付给其他消费者;

    rabbitmq 支持消息应答,消费者发送一个消息应答,告诉 rabbitmq 这个消息我已经处理完成,可以删掉,然后 rabbitmq 就删除内存中的消息。

    消息应答默认是打开的,即为 false;

    如果 rabbitmq 挂了,消息任然会丢失。

    消息持久化

    // 声明队列
    boolean durable = true;
    channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
    

    注意:rabbitmq 不允许重新定义(不同参数)一个已存在的队列

  • 相关阅读:
    Red5/FMS视频直播带宽计算
    基于NPOI开源框架写的ExcelHelper
    Using C# 4.0 and dynamic to parse JSON
    跟我学MVVM模式开发
    supermap使用代码示例(GIS)
    使用OpenXML将Excel内容读取到DataTable中
    ADO 数据类型转换表
    I don't like Regex...
    将Datatable转Excel少于4笔时汉字乱码4/26
    记录宝宝成长脚印3/31
  • 原文地址:https://www.cnblogs.com/dear_diary/p/10567792.html
Copyright © 2011-2022 走看看