zoukankan      html  css  js  c++  java
  • rabbitmq2

    rabbitmq学习记录2

    1.rabbitmq队列持久化

            //声明队列
            boolean durable = true;//需要让queue持久优化
            /**
             * 1.队列名称
             * 2.是否持久化
             */
            channel.queueDeclare(task_queue_name,durable,false,false,null);
    

    2.rabbitmq消息持久化

    MessageProperties.PERSISTENT_TEXT_PLAIN

                //设置生产者发消息为持久化消息(要求保存到磁盘上)
                channel.basicPublish("",task_queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
    

    3.不公平分发

    可以根据设备的速度调节
    消费者

            //设置不公平分发 0 为公平,1为不公平
            channel.basicQos(0);
    

    4.预取值

    可以设定每个消费者做工的量,等于信道的size。如果size满了就会分配给别人

            //大于1为预取值
            channel.basicQos(x);
    

    5.发布确认原理

    1. 设置要求队列必须持久化
    2. 设置要求对了中的消息必须是持久化的
    3. 发布确认,把消息从队列保存到磁盘上

    5.1单个确认发布

        public static void publicMessageIndividually() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //队列的声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName,false,false,false,null);
            //开启发布确认
            channel.confirmSelect();
            //开始时间
            long begin = System.currentTimeMillis();
    
            //批量发消息
            for (int i = 0;i < MESSAGE_COUNT;i++){
                String message = i + "";
                channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));
                //单个消息就马上发布确认
                boolean flag = channel.waitForConfirms();
                if (flag){
                    System.out.println("消息发送成功");
                }
            }
    
            //结束时间
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
        }
    

    5.2批量确认发布

        public static  void publishMessageBatch() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //队列的声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName,false,false,false,null);
            //开启发布确认
            channel.confirmSelect();
            //开始时间
            long begin = System.currentTimeMillis();
    
            //批量确认消息大小
            int batchSize = 100;
    
            //批量发消息 批量确认
            for (int i = 0;i < MESSAGE_COUNT;i++){
                String message = i + "";
                channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));
                if (i % batchSize == 0) {
                    channel.waitForConfirms();
                }
            }
            //结束时间
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
        }
    
    

    5.3异步确认发布

        public static void publicMessageAsync() throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //队列的声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName,false,false,false,null);
            //开启发布确认
            channel.confirmSelect();
    
            /**
             * 线程安全有序的一个哈希表,适用于高并发的情况下
             * 1.轻松的将序号与消息进行关联
             * 2.轻松批量删除条目只要给到序号
             * 3.支持高并发(多线程)
             */
            ConcurrentSkipListMap<Long,String> outstandingConfirms =
                    new ConcurrentSkipListMap<>();
    
            //消息确认成功 回调函数
            /**
             * 1.消息的标记
             * 2.消息的是否批量确认
             */
            ConfirmCallback ackCallback =(deliveryTag,multiple)->{
                // 第二步 删除已经确认的消息剩下的就是未确认消
                if(multiple){
                    ConcurrentNavigableMap<Long,String> confirmed =
                            outstandingConfirms.headMap(deliveryTag);
                    confirmed.clear();
                }else{
                    outstandingConfirms.remove(deliveryTag);
                }
    
                System.out.println("确认的消息:" + deliveryTag);
            };
            //消息确认失败 回调函数
            ConfirmCallback nackCallback =(deliveryTag,multiple)->{
                // 第三步 打印未确认的消息都有哪些
                String message = outstandingConfirms.get(deliveryTag);
                System.out.println("未确认的消息是" +message + "未确认的消息tag:" + deliveryTag);
            };
            //设置监听器,监听哪些消息成功了,哪些消息失败了
            channel.addConfirmListener(ackCallback,nackCallback);//异步通知
    
            //开始时间
            long begin = System.currentTimeMillis();
    
            //批量发消息 异步确认发布
            for (int i = 0;i < MESSAGE_COUNT;i++){
                String message = "消息" + i;
                channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));
                // 异步确认 第一步 记录所有要发送的消息 使用 ConcurrentLinkedQueue
                outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
            }
            //结束时间
            long end = System.currentTimeMillis();
            System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
        }
    

    5.4交换机

    发布订阅模式:让多个消费者得到消息。
    之前是生产者->队列
    改为生产者->交换机->队列
    四大核心六大模式
    四大核心 : 生产者,交换机,队列,消费者
    六大模式 : 简单模式,工作模式,发布订阅模式,路由模式,主题模式,发布确认模式

    5.4.1交换机概念

    生产者只能将消息发送到交换机。不指定的话是默认交换机。

    5.4.2交换机的类型

    直接(direct) 主题(topic) 标题(headers) 扇出(发布订阅模式)(fanout)

    5.4.3无名exchange

    用空串"" 默认交换机,routingkey直接是队列名字。

    6扇出模式实战

    fanout 一发多接受

    public class EmitLog {
        //交换机的名字
        public static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
                System.out.println("发送者发出消息" + message);
            }
        }
    }
    
    public class ReceiveLogs01 {
        //交换机的名称
        public static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            //声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
            //声明一个临时队列
            /**
             * 生成一个临时队列。队列的名称是随机的
             * 当消费者断开与队列的连接时。队列自动删除
             * s2:rountingkey
             */
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName,EXCHANGE_NAME,"");
            System.out.println("等待接收消息,把接收到的消息打印在屏幕上........");
    
            DeliverCallback deliverCallback = (consumerTag,message) -> {
                System.out.println("控制台1打印接收到的消息" + new String(message.getBody()));
            };
    
            channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
        }
    }
    

    7direct模式 路由模式

    rounting key 不一样的话,就是路由模式

    public class DirectLogs {
        //交换机的名字
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");
    
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                /**
                 * s1 :rounting key 的名称
                 */
                channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
                System.out.println("发送者发出消息" + message);
            }
        }
    }
    
    
    public class ReceiveLogsDirect02 {
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception{
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");
    
            //声明一个队列
            channel.queueDeclare("disk",false,false,false,null);
            /**
             * 1. 队列名称
             * 2. 交换机名称
             * 3. rountingkey
             */
            channel.queueBind("disk",EXCHANGE_NAME,"info");
    
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("控制台1打印接收到的消息" + new String(message.getBody()));
            };
    
            channel.basicConsume("disk",true,deliverCallback,consumerTag ->{});
        }
    }
    
  • 相关阅读:
    Redis常见数据类型二:Hash
    Redis常见数据类型一:String
    了解Docker
    微信小程序倒计时秒杀
    笛卡尔积求二维数组所有组合
    git好网站网址搜集
    npm i 报错Can't find Python executable "python2.7", you can set the PYTHON env variable
    css_注意小事项总结_随时更新
    echarts使用时报错cannot read property 'querycomponents' of undefined解决方案
    echarts中获取各个省份地图的链接
  • 原文地址:https://www.cnblogs.com/wenwenjiejie/p/15113243.html
Copyright © 2011-2022 走看看