zoukankan      html  css  js  c++  java
  • RabbitMQ(2)---高级使用

    1.ack和限流

    ack也就是消息确认签收,分为自动签收和手动签收。之前的交换机demo中:channel.basicConsume(queueName,true, consumer);  第二个参数就是自动签收,如果我们要手动签收则需要改成false,再去消息处理中手动签收即可

    当我们消息队列中已经积压了大量消息的时候。这个时候消费者才启动,,如果是自动签收的话,就会导致大量消息涌入,可能回到服务刚启动就宕机。这个时候就可以限制消息数量,使用手动签收。处理完这一批,再处理下一批。

    使用手动签收,我们还可以在拿到消息,进行不同的业务处理,比如如果消息信息有问题,那就不签收,移除当前队列,或者放到其他地方去处理之类的

    RabbitMQUtils类的代码在上一节中:RabbitMQ(1)---基本概念及简单demo

    ack:手动签收消息:

    package com.nijunyang.rabbitmq.ack;
    
    import com.nijunyang.rabbitmq.util.RabbitMQUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    import java.util.UUID;
    
    /**
     * Description:
     * Created by nijunyang on 2020/6/7 13:07
     */
    public class AckProducer {
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            String message = "hello rabbitMQ." + new Random().nextInt(100);
    
            String exchangeName = "ack.exchange";
            String routingKey = "ack.key";
    
    
            Map<String,Object> heads = new HashMap<>();
            heads.put("userName", "zhangsan");
    
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)//消息持久化
                    .contentEncoding("UTF-8")
                    .correlationId(UUID.randomUUID().toString())
                    .headers(heads)//存放头信息
                    .build();
    
            channel.basicPublish(exchangeName, routingKey, basicProperties, message.getBytes("utf-8"));
            channel.basicPublish(exchangeName, routingKey, null, message.getBytes("utf-8"));
            RabbitMQUtils.close(channel, connection);
        }
    }
    package com.nijunyang.rabbitmq.ack;
    
    import com.nijunyang.rabbitmq.util.RabbitMQUtils;
    import com.rabbitmq.client.*;
    import org.springframework.util.StringUtils;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Description:
     * Created by nijunyang on 2020/6/7 13:07
     */
    public class AckConsumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            String exchangeName = "ack.exchange";
            String exchangeType = "direct";
            String routingKey = "ack.key";
            String queueName = "ack.queue";
    
            channel.exchangeDeclare(exchangeName, exchangeType);
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
    
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    try {
                        Object userName = properties.getHeaders().get("userName");
                        if (!StringUtils.isEmpty(userName)) {
                            //用发送时候放的 头信息模拟业务问题
                            String message = new String(body, "UTF-8");
                            System.out.println(message);
                            //手动签收消息
                            channel.basicAck(envelope.getDeliveryTag(),false);
                        }
                        else {
                            throw new RuntimeException();
                        }
                    }catch (Exception e) {
                        //requeue参数 true 重回队列,,false不重回队列, 或者做其他处理
                        channel.basicNack(envelope.getDeliveryTag(),false,false);
                    }
                }
            };
    
            //autoAck参数 true:开启自动签收,false:关闭自动签收功能
            channel.basicConsume(queueName,false, consumer);
    
    
        }
    }

    限流的话只需要多一个限制:channel.basicQos(0,5,false);  每次只会处理5条消息,签收完了,在处理后面的

    package com.nijunyang.rabbitmq.limit;
    
    import com.nijunyang.rabbitmq.util.RabbitMQUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    import java.util.UUID;
    
    /**
     * Description:
     * Created by nijunyang on 2020/6/7 13:07
     */
    public class LimitProducer {
    
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
    
    
            String exchangeName = "limit.exchange";
            String routingKey = "limit.key";
    
    
            for (int i = 0; i < 20; i++) {
                String message = "limit rabbitMQ." + i;
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes("utf-8"));
            }
    
            RabbitMQUtils.close(channel, connection);
        }
    }
    package com.nijunyang.rabbitmq.limit;
    
    import com.nijunyang.rabbitmq.util.RabbitMQUtils;
    import com.rabbitmq.client.*;
    import org.springframework.util.StringUtils;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Description:
     * Created by nijunyang on 2020/6/7 13:07
     */
    public class LimitConsumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            String exchangeName = "limit.exchange";
            String exchangeType = "direct";
            String routingKey = "limit.key";
            String queueName = "limit.queue";
    
            channel.exchangeDeclare(exchangeName, exchangeType);
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            /**
             * 限流设置:
             * prefetchSize:每条消息大小的设置
             * prefetchCount:标识每次推送多少条消息
             * global:false标识channel级别的  true:标识消费的级别的
             */
            channel.basicQos(0,5,false);
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(message);
                    //手动签收消息, 否则就会一直阻塞了
                    channel.basicAck(envelope.getDeliveryTag(), false);
    
                }
            };
    
            //autoAck参数 true:开启自动签收,false:关闭自动签收功能
            channel.basicConsume(queueName,false, consumer);
    
    
        }
    }

    2.消息投递确认,开启这个模式之后 消息投递了之后 不能关闭连接,因为监听是绑定在channel上面的

    开启消息投递确认模式,,在消息发送者上面绑定一个监听,消息投递成功或者失败回调对应方法。

    package com.nijunyang.rabbitmq.confirm;
    
    import com.nijunyang.rabbitmq.util.RabbitMQUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Description:
     * Created by nijunyang on 2020/6/7 19:53
     */
    public class ConfirmProducer {
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
    
            //设置消息投递模式(确认模式)
            channel.confirmSelect();
            /**
             * 消息确认监听绑定
             */
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("消息投递成功");
                }
    
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("消息投递失败");
                }
            });
    
            String exchangeName = "confirm.exchange";
            String routingKey = "confirm.key";
    
            for (int i = 0; i < 20; i++) {
                String message = "limit rabbitMQ." + i;
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes("utf-8"));
            }
            //设置了消息投递确认就不能关闭channel和连接了
    //        RabbitMQUtils.close(channel, connection);
        }
    }

    3.不可达消息处理:有些消息发送之后,由于设置的原因,不能正常的路由到队列上面。

    和消息投递确认差不多,只不过是在生产者的channel上面绑定一个ReturnListener(channel.addReturnListener(new RetrunListener())),然后投递消息的时候使用这个方法channel.basicPublish(exchangeName,routingKey,true,null, message.getBytes()),相比之前的投递方式多了一个布尔类型的mandatory参数。如果true那么就会调用的绑定的ReturnListener,实现的方法,如果是false那么就会直接删除这个消息。

    4.死信队列。专门用来接收没有消费的消息的队列。消息发送到正常队列上面但是没有被消费,就会被转发到死信队列上面。所以说死信队列是和一个正常队列绑定的。消息变成死信的几种情况:1.消息被拒绝(basicNack   basicReject)并且重回队里设置的false,2.消息设置了过期时间,时间到了也没有被消费,3.队列已经达到最大长度,后面进来的消息直接转到死信队列。死信队列也是一个正常的交换机和队列。

    package com.nijunyang.rabbitmq.deadqueue;
    
    import com.nijunyang.rabbitmq.util.RabbitMQUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    
    /**
     * Description:
     * Created by nijunyang on 2020/6/7 20:48
     */
    public class DeadQueueProducer {
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
    
            String exchangeName = "normal.exchange";
            String routingKey = "normal.key";
    
            //设置消息的过期时间10s
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .expiration("10000")
                    .build();
            for (int i = 0; i < 20; i++) {
                String message = "dead rabbitMQ." + i;
                channel.basicPublish(exchangeName, routingKey, basicProperties, message.getBytes("utf-8"));
            }
            RabbitMQUtils.close(channel, connection);
        }
    }
    package com.nijunyang.rabbitmq.deadqueue;
    
    import com.nijunyang.rabbitmq.util.RabbitMQUtils;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Description:
     * Created by nijunyang on 2020/6/7 20:51
     */
    public class DeadQueueConsumer {
        public static void main(String[] args) throws Exception{
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
    
            //声明正常的队列
            String normalExchangeName = "normal.exchange";
            String exchangeType = "direct";
            String normalQueueName = "normal.queue";
            String routingKey = "normal.key";
    
            channel.exchangeDeclare(normalExchangeName, exchangeType);
    
            //申明死信队列
            String deadExchangeName = "dead.exchange";
            String deadExchangeType = "topic";
            String deadQueueName = "dead.queue";
    
            Map<String, Object> queueArgs = new HashMap<>();
            //正常队列上绑定死信队列信息
            queueArgs.put("x-dead-letter-exchange", deadExchangeName);
            queueArgs.put("x-max-length", 4); //队列的最大长度
            channel.queueDeclare(normalQueueName,true,false,false, queueArgs);
            channel.queueBind(normalQueueName, normalExchangeName, routingKey);
    
            //声明死信队列
            channel.exchangeDeclare(deadExchangeName, deadExchangeType);
            channel.queueDeclare(deadQueueName,true,false,false,null);
            channel.queueBind(deadQueueName, deadExchangeName,"#");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(message);
                    channel.basicNack(envelope.getDeliveryTag(),false,false); //拒签
                }
            };
            channel.basicConsume(normalQueueName, false, consumer);
    
        }
    }

    可有看到所有的消息最后都转到了 死信队列中去了。这个模式还可以用于延迟队列。只需要设置正常队列消息的过期时间,然后转到死信队列,,消费者监听消费死信队列,就可以实现延时队列了。

    5.单播消费模式,首先我们要明确一点消费者最终都是从队列中拿到消息消费的,我们将多个消费者都绑定到同一个队列上面去,这个时候,队列消息只会被一个消费者消费,不会重复让每个消费者都消费。

    6.多播消费模式,和单播消费差不多,这个时候我们需要申明多个队列绑定同一个交换机,这样交换机的信息就会发到多个队列上面,这样通过同一个交换机将同一条消息发送到不同的队列上面去了,也就实现了让不同的消费者消费了同一条消息了。

    package com.nijunyang.rabbitmq.deadqueue;

    import com.nijunyang.rabbitmq.util.RabbitMQUtils;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    /**
    * Description:
    * Created by nijunyang on 2020/6/7 20:51
    */
    public class DeadQueueConsumer {
    public static void main(String[] args) throws Exception{
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();

    //声明正常的队列
    String normalExchangeName = "normal.exchange";
    String exchangeType = "direct";
    String normalQueueName = "normal.queue";
    String routingKey = "normal.key";

    channel.exchangeDeclare(normalExchangeName, exchangeType);

    //申明死信队列
    String deadExchangeName = "dead.exchange";
    String deadExchangeType = "topic";
    String deadQueueName = "dead.queue";

    Map<String, Object> queueArgs = new HashMap<>();
    //正常队列上绑定死信队列信息
    queueArgs.put("x-dead-letter-exchange", deadExchangeName);
    queueArgs.put("x-max-length", 4); //队列的最大长度
    channel.queueDeclare(normalQueueName,true,false,false, queueArgs);
    channel.queueBind(normalQueueName, normalExchangeName, routingKey);

    //声明死信队列
    channel.exchangeDeclare(deadExchangeName, deadExchangeType);
    channel.queueDeclare(deadQueueName,true,false,false,null);
    channel.queueBind(deadQueueName, deadExchangeName,"#");

    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(message);
    channel.basicNack(envelope.getDeliveryTag(),false,false); //拒签
    }
    };
    channel.basicConsume(normalQueueName, false, consumer);

    }
    }
  • 相关阅读:
    leetcode整理(一)
    day02 整理
    python从入门到放弃之守护进程
    python从入门到放弃之进程
    基于hashlib下的文件校验
    hashlib(hmac)进阶之client跟server交互
    hashlib以及hmac的日常应用
    python从入门到放弃
    6.redis 的持久化有哪几种方式?不同的持久化机制都有什么优缺点?持久化机制具体底层是如何实现的?
    5.如何保证 redis 的高并发和高可用?redis 的主从复制原理能介绍一下么?redis 的哨兵原理能介绍一下么?
  • 原文地址:https://www.cnblogs.com/nijunyang/p/13062220.html
Copyright © 2011-2022 走看看