zoukankan      html  css  js  c++  java
  • 消息中间件系列三:使用RabbitMq原生Java客户端进行消息通信(消费者(接收方)自动确认模式、消费者(接收方)自行确认模式、生产者(发送方)确认模式)

    准备工作:

    1)安装RabbitMQ,参考文章:消息中间件系列二:RabbitMQ入门(基本概念、RabbitMQ的安装和运行)

    2.)分别新建名为OriginalRabbitMQProducer和OriginalRabbitMQConsumer的maven工程

    在pom.xml文件里面引入如下依赖:

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.0.0</version>
        </dependency>

    说明:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的中央仓库查)的版本

    一、消费者(接收方)自动确认模式

     前面有谈到消费者收到的每一条消息都必须进行确认,消息的确认机制分为自动确认和消费者自行确认。下面我们来看一下自动确认的示例:

    示例1:交换器是direct

    1. 在工程OriginalRabbitMQProducer新建一个一个direct的生产者

    package study.demo.normal;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * @Description: 交换器是direct的生产者 路由键完全匹配时,消息才投放到对应队列
     * @author leeSmall
     * @date 2018年9月15日
     *
     */
    public class DirectProducer {
    
        //定义交换器的名字
        private final static String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //设置用户名 这里使用缺省的
            //factory.setUsername(..);
            
            //设置连接断开  这里使用缺省的
            //factory.setPort();
            
            //设置虚拟主机 这里使用缺省的
            //factory.setVirtualHost();
    
    
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
    
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
    
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            //定义一组路由键
            String[]routingKeys = {"error","info","warning"};
    
            //发布消息的交换器上
            for(int i=0;i<3;i++){
                //路由键
                String routingKey = routingKeys[i];
                
                //要发送的消息
                String message = "Hello world_"+(i+1);
    
                /**
                 * 发送消息到交换器上
                 * 参数1:交换器的名字
                 * 参数2:路由键
                 * 参数3:BasicProperties
                 * 参数4:要发送的消息
                 */
                channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
                System.out.println("Sent "+routingKey+":"+message);
    
            }
    
            channel.close();
            connection.close();
    
        }
    
    }

    2. 在工程OriginalRabbitMQConsumer新建一个direct的只消费error日志的消费者

    package study.demo.normal;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * @Description: 交换器是direct 只消费error日志的消费者
     * @author leeSmall
     * @date 2018年9月15日
     *
     */
    public class ConsumerError {
    
        //定义交换器的名字
        private static final String EXCHANGE_NAME = "direct_logs";
        // private static final String EXCHANGE_NAME = "fanout_logs_1";
    
        public static void main(String[] argv) throws IOException, TimeoutException {
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
            
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
            
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            
            //5.声明随机队列
            String queueName = channel.queueDeclare().getQueue();
            
            //6.声明一个只消费错误日志的路由键error
            String routingKey = "error";
            
            //7.队列通过路由键绑定到交换器上
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
            
            System.out.println("Waiting message.......");
    
            //8.设置一个监听器监听消费消息
            Consumer consumerB = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body,"UTF-8");
                    System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
                }
            };
         //9.自动确认:autoAck参数为true
            channel.basicConsume(queueName,true,consumerB);
        }
    
    }

    3. 启动消费者ConsumerError,再启动生产者DirectProducer,查看效果

    启动消费者ConsumerError:

    启动生产者DirectProducer:

     

    查看消费者ConsumerError现在的状况:

     

    可以看到消费者只消费了error级别的消息,这是因为在direct模式下,消费者只定义了路由键routingKey = "error";

     4. 在工程OriginalRabbitMQConsumer新建一个direct的消费所有日志的消费者

    package study.demo.normal;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * @Description: 交换器是direct 消费所有日志的消费者
     * @author leeSmall
     * @date 2018年9月15日
     *
     */
    public class ConsumerAll {
        //定义交换器的名字
        private static final String EXCHANGE_NAME = "direct_logs";
        // private static final String EXCHANGE_NAME = "fanout_logs_1";
    
        public static void main(String[] argv) throws IOException,
                InterruptedException, TimeoutException {
    
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
            
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
            
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            
            
            //5.声明随机队列
            String queueName = channel.queueDeclare().getQueue();
            
            //6.定义一组路由键消费所有日志
            String[]routingKeys = {"error","info","warning"};
            
            //7.队列通过路由键绑定到交换器上
            for(String routingKey:routingKeys){
                //队列和交换器的绑定
                channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
            }
            System.out.println("Waiting message.......");
    
            //8.设置一个监听器监听消费消息
            Consumer consumerA = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body,"UTF-8");
                    System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
                }
            };
             //9.自动确认:autoAck参数为true
             channel.basicConsume(queueName,true,consumerA);
    
        }
    
      }

    5. 启动消费者ConsumerAll,再启动生产者DirectProducer,查看效果

    启动消费者ConsumerAll:

    启动生产者DirectProducer:

     查看消费者ConsumerAll的状态:

     可以看到消费者ConsumerAll消费了生产者DirectProducer产生的所有消息,这是因为在direct模式下,消费者定义了和生产者一样个数的路由键String[]routingKeys = {"error","info","warning"};

    示例2:交换器是fanout

    1. 在工程OriginalRabbitMQProducer新建一个交换器是fanout的生产者

    package study.demo.normal;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * @Description: 交换器是fanout的生产者 可以理解为广播,会把所有消息投放到绑定到这个交换器上的队列上
     * @author leeSmall
     * @date 2018年9月15日
     *
     */
    public class FanoutProducer {
    
        private final static String EXCHANGE_NAME = "fanout_logs_1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //设置用户名 这里使用缺省的
            //factory.setUsername(..);
            
            //设置连接断开  这里使用缺省的
            //factory.setPort();
            
            //设置虚拟主机 这里使用缺省的
            //factory.setVirtualHost();
    
    
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
    
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
    
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
            //定义一组路由键
            String[]routingKeys = {"error","info","warning"};
    
            //发布消息的交换器上
            for(int i=0;i<3;i++){
                //路由键
                String routingKey = routingKeys[i];
                
                //要发送的消息
                String message = "Hello world_"+(i+1);
    
                /**
                 * 发送消息到交换器上
                 * 参数1:交换器的名字
                 * 参数2:路由键
                 * 参数3:BasicProperties
                 * 参数4:要发送的消息
                 */
                channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
                System.out.println("Sent "+routingKey+":"+message);
    
            }
    
            channel.close();
            connection.close();
    
        }
    
    }

    2. 在工程OriginalRabbitMQConsumer新建一个fanout的只消费error日志的消费者

    package study.demo.normal;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * @Description: 交换器是fanout,只消费error日志的消费者
     * @author leeSmall
     * @date 2018年9月15日
     *
     */
    public class ConsumerError {
    
        //定义交换器的名字
        //private static final String EXCHANGE_NAME = "direct_logs";
         private static final String EXCHANGE_NAME = "fanout_logs_1";
    
        public static void main(String[] argv) throws IOException, TimeoutException {
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
            
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
            
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            
            //5.声明随机队列
            String queueName = channel.queueDeclare().getQueue();
            
            //6.声明一个只消费错误日志的路由键error
            String routingKey = "error";
            
            //7.队列通过路由键绑定到交换器上
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
            
            System.out.println("Waiting message.......");
    
            //8.设置一个监听器监听消费消息
            Consumer consumerB = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body,"UTF-8");
                    System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
                }
            };
          //9.自动确认:autoAck参数为true
         channel.basicConsume(queueName,true,consumerA);
    
    
        }
    
    }

    3. 启动消费者ConsumerError,再启动生产者DirectProducer,查看效果

    启动消费者ConsumerError:

    启动生产者FanoutProducer:

     

    查看消费者ConsumerError现在的状况:

     

    可以看到消费者消费了所有级别的消息,这是因为在fanout模式下,虽然消费者只定义了路由键routingKey = "error",但是因为fanut是广播模式,会把所有消息投放到绑定到这个交换器上的队列上

     4. 在工程OriginalRabbitMQConsumer新建一个fanout的消费所有日志的消费者

    package study.demo.normal;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * @Description: 交换器是fanout 消费所有日志的消费者
     * @author leeSmall
     * @date 2018年9月15日
     *
     */
    public class ConsumerAll {
        //定义交换器的名字
        //private static final String EXCHANGE_NAME = "direct_logs";
        private static final String EXCHANGE_NAME = "fanout_logs_1";
    
        public static void main(String[] argv) throws IOException,
                InterruptedException, TimeoutException {
    
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
            
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
            
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            
            
            //5.声明随机队列
            String queueName = channel.queueDeclare().getQueue();
            
            //6.定义一组路由键消费所有日志
            String[]routingKeys = {"error","info","warning"};
            
            //7.队列通过路由键绑定到交换器上
            for(String routingKey:routingKeys){
                //队列和交换器的绑定
                channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
            }
            System.out.println("Waiting message.......");
    
            //8.设置一个监听器监听消费消息
            Consumer consumerA = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body,"UTF-8");
                    System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
                }
            };
           //9.自动确认:autoAck参数为true
           channel.basicConsume(queueName,true,consumerA);
    
    
    
        }
    }

    5. 启动消费者ConsumerAll,再启动生产者DirectProducer,查看效果

    启动消费者ConsumerAll:

    启动生产者FanoutProducer:

     查看消费者ConsumerAll的状态:

     可以看到消费者ConsumerAll消费了生产者DirectProducer产生的所有消息,这是因为在fanout模式下,消费者定义了和生产者一样个数的路由键String[]routingKeys = {"error","info","warning"};

    三、消费者(接收方)自行确认模式

    1. 在工程OriginalRabbitMQProducer新建一个消费者自行确认生产者

    package study.demo.consumerconfirm;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * @Description: 消费者自行确认生产者
     * @author leeSmall
     * @date 2018年9月15日
     *
     */
    public class ConsumerConfirmProducer {
    
        //交换器
        private final static String EXCHANGE_NAME = "direct_cc_confirm_1";
        
        //路由键
        private final static String ROUTE_KEY = "error";
    
        public static void main(String[] args) throws IOException, TimeoutException,
                InterruptedException {
    
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //设置用户名 这里使用缺省的
            //factory.setUsername(..);
            
            //设置连接断开  这里使用缺省的
            //factory.setPort();
            
            //设置虚拟主机 这里使用缺省的
            //factory.setVirtualHost();
    
    
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
    
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
    
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            //发布消息的交换器上
            for(int i=0;i<10;i++){
                //要发送的消息
                String message = "Hello world_"+(i+1);
    
                /**
                 * 发送消息到交换器上
                 * 参数1:交换器的名字
                 * 参数2:路由键
                 * 参数3:BasicProperties
                 * 参数4:要发送的消息
                 */
                channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,message.getBytes());
                System.out.println("Sent "+ROUTE_KEY+":"+message);
    
            }
    
            channel.close();
            connection.close();
    
        }
    
    }

    2. 在工程OriginalRabbitMQConsumer新建一个消费者自行确认消费者

    package study.demo.consumerconfirm;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * @Description: 消费者自行确认消费者
     * @author leeSmall
     * @date 2018年9月15日
     *
     */
    public class ClientConsumerAck {
    
        private static final String EXCHANGE_NAME = "direct_cc_confirm_1";
    
        public static void main(String[] argv) throws IOException, TimeoutException {
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
            
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
            
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            //5.声明队列
            String queueName = "consumer_confirm";
            channel.queueDeclare(queueName,false,false,
                    false,null);
    
            //6.声明一个只消费错误日志的路由键error
            String routingKey = "error";
            
            //7.队列通过路由键绑定到交换器上
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
            System.out.println("Waiting message.......");
    
            //8.设置一个监听器监听消费消息
            Consumer consumerB = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body,"UTF-8");
                    System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
                    //消费者自行确认
                    this.getChannel().basicAck(envelope.getDeliveryTag(),false);
                }
            };
    
            //9.消费者自行确认:autoAck参数为false
            channel.basicConsume(queueName,false,consumerB);
        }
    
    } 

    3. 在工程OriginalRabbitMQConsumer新建一个消费者自行确认休眠不回复ack的消费者

    package study.demo.consumerconfirm;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * @Description: 消费者自行确认休眠不回复ack的消费者
     * @author leeSmall
     * @date 2018年9月15日
     *
     */
    public class ClientConsumerSlowAck {
    
        private static final String EXCHANGE_NAME = "direct_cc_confirm_1";
    
        public static void main(String[] argv) throws IOException, TimeoutException {
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
            
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
            
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            //5.声明队列
            String queueName = "consumer_confirm";
            channel.queueDeclare(queueName,false,false,
                    false,null);
    
            //6.声明一个只消费错误日志的路由键error
            String routingKey = "error";
            
            //7.队列通过路由键绑定到交换器上
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
            System.out.println("Waiting message.......");
    
            //8.设置一个监听器监听消费消息
            Consumer consumerB = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    try {
                        //消费者自行确认时不回复ack,一直休眠
                        Thread.sleep(20000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    String message = new String(body,"UTF-8");
                    System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
                    //this.getChannel().basicAck(envelope.getDeliveryTag(),false);
                }
            };
    
            //9.消费者自行确认:autoAck参数为false
            channel.basicConsume(queueName,false,consumerB);
        }
    
    } 

    4. 分别启动消费者自行确认消费者ClientConsumerAck和消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck,再启动消费者自行确认生产者ConsumerConfirmProducer查看状态

    启动消费者自行确认消费者ClientConsumerAck:

    启动消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck

     

    启动消费者自行确认生产者ConsumerConfirmProducer

     

    查看消费者自行确认消费者ClientConsumerAck的状态:

    查看消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck的状态:

    查看RabbitMQ服务器上的队列情况:

    可以看到队列consumer_confirm里面有5条消息未消费,这是因为消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck收到了这5条消息,但是没有向RabbitMQ服务器发送确认消息,RabbitMQMQ认为这5条消息还没有被消费就一直存在队列里面

    下面停掉ClientConsumerSlowAck,查看ClientConsumerAck和RabbitMQ服务器里面队列consumer_confirm的状态

     

    可以看到停掉ClientConsumerSlowAck以后,之前的5条消息被ClientConsumerAck消费了

    5. 在工程OriginalRabbitMQConsumer 新建一个消费者自行确认拒绝消息的消费者

    package study.demo.consumerconfirm;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * @Description: 消费者自行确认拒绝消息的消费者
     * @author leeSmall
     * @date 2018年9月15日
     *
     */
    public class ClientConsumerReject {
    
        private static final String EXCHANGE_NAME = "direct_cc_confirm_1";
    
        public static void main(String[] argv) throws IOException, TimeoutException {
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
            
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
            
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            //5.声明队列
            String queueName = "consumer_confirm";
            channel.queueDeclare(queueName,false,false,
                    false,null);
    
            //6.声明一个只消费错误日志的路由键error
            String routingKey = "error";
            
            //7.队列通过路由键绑定到交换器上
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
            System.out.println("Waiting message.......");
    
            //8.设置一个监听器监听消费消息
            Consumer consumerB = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    //消费者自行拒绝消息 参数requeue=true,让RabbitMQ服务器重新分发消息,requeue=false让RabbitMQ服务器移除消息
                    this.getChannel().basicReject(envelope.getDeliveryTag(),true);
                    System.out.println("Reject:"+envelope.getRoutingKey()
                            +":"+new String(body,"UTF-8"));
                }
            };
    
            //9.消费者自行确认:autoAck参数为false
            channel.basicConsume(queueName,false,consumerB);
        }
    
    } 

     6. 分别启动消费者自行确认消费者ClientConsumerAck和消费者自行确认拒绝消息的消费者ClientConsumerReject,再启动消费者自行确认生产者ConsumerConfirmProducer查看状态

     启动消费者自行确认消费者ClientConsumerAck:

     启动消费者自行确认拒绝消息的消费者ClientConsumerReject:

     

    启动消费者自行确认生产者ConsumerConfirmProducer

     

     查看消费者自行确认消费者ClientConsumerAck的状态:

    查看消费者自行确认拒绝消息的消费者ClientConsumerReject的状态:

    可以看到消息都被ClientConsumerAck消费了,这是因为消费者ClientConsumerReject拒绝了所有消息,这里要注意

    this.getChannel().basicReject(envelope.getDeliveryTag(),true);

    这段代码的basicReject的第二个参数requeue,参数requeue=true,让RabbitMQ服务器重新分发消息,requeue=false让RabbitMQ服务器移除消息

     requeue=false时,RabbitMQ服务器会删掉被ClientConsumerReject拒绝的消息,消费者ClientConsumerAck就不能消费所有消息了

    四、生产者(发送方)确认模式

     为什么要有个发送方确认模式?

    生产者不知道消息是否真正到达RabbitMq,也就是说发布操作不返回任何消息给生产者。
    AMQP协议层面为我们提供的事务机制解决了这个问题,但是事务机制本身也会带来问题:
    1)严重的性能问题
    2)使生产者应用程序产生同步
    RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,该模式比事务更轻量,性能影响几乎可以忽略不计。

    1. 在OriginalRabbitMQProducer工程新建一个生产者(发送方)确认同步模式的类

    package study.demo.producerconfirm;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 
     * @Description: 生产者(发送方)确认同步模式
     * @author leeSmall
     * @date 2018年9月16日
     *
     */
    public class ProducerConfirm {
    
        private final static String EXCHANGE_NAME = "producer_confirm";
        private final static String ROUTE_KEY = "error";
    
        public static void main(String[] args) throws IOException, TimeoutException,
                InterruptedException {
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //设置用户名 这里使用缺省的
            //factory.setUsername(..);
            
            //设置连接断开  这里使用缺省的
            //factory.setPort();
            
            //设置虚拟主机 这里使用缺省的
            //factory.setVirtualHost();
    
    
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
    
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
            //将信道设置为发送方确认
            channel.confirmSelect();
    
            //发布消息的交换器上
            for(int i=0;i<2;i++){
                String msg = "Hello "+(i+1);
                channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes());
                //等待RabbitMQ返回消息确认消息已送达RabbitMQ服务器
                if (channel.waitForConfirms()){
                    System.out.println("发送方同步确认: "+ROUTE_KEY+":"+msg);
                }
            }
    
    
            // 关闭频道和连接
            channel.close();
            connection.close();
        }
    
    }

    2. 在OriginalRabbitMQProducer工程新建一个生产者(发送方)确认异步模式的类

    package study.demo.producerconfirm;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    
    
    /**
     * 
     * @Description: 生产者(发送方)确认异步模式
     * @author leeSmall
     * @date 2018年9月16日
     *
     */
    public class ProducerConfirmAsync {
    
        private final static String EXCHANGE_NAME = "producer_confirm";
    
        public static void main(String[] args) throws IOException, TimeoutException,
                InterruptedException {
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //设置用户名 这里使用缺省的
            //factory.setUsername(..);
            
            //设置连接断开  这里使用缺省的
            //factory.setPort();
            
            //设置虚拟主机 这里使用缺省的
            //factory.setVirtualHost();
    
    
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
    
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
    
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            //将信道设置为发送方确认
            channel.confirmSelect();
    
            //信道被关闭监听器 用于RabbitMQ服务器断线重连
            //channel.addShutdownListener();
    
            /**
             * 生产者异步确认监听
             * 参数deliveryTag代表了当前channel唯一的投递
             * 参数multiple:false
             * 
             */
            channel.addConfirmListener(new ConfirmListener() {
                //RabbitMQ服务器确认收到消息
                public void handleAck(long deliveryTag, boolean multiple)
                        throws IOException {
                    System.out.println("RabbitMQ服务器确认收到消息Ack deliveryTag="+deliveryTag
                            +"multiple:"+multiple);
                }
    
                //RabbitMQ服务器由于自己内部出现故障没有收到消息
                public void handleNack(long deliveryTag, boolean multiple)
                        throws IOException {
                    System.out.println("RabbitMQ服务没有收到消息Ack deliveryTag="+deliveryTag
                            +"multiple:"+multiple);
                }
            });
    
            //生产者异步返回监听 这里和发布消息时的mandatory参数有关
            //参数mandatory:mandatory=true,投递消息时无法找到一个合适的队列,把消息返回给生产者,mandatory=false 丢弃消息(缺省)
            channel.addReturnListener(new ReturnListener() {
                public void handleReturn(int replyCode, String replyText,
                                         String exchange, String routingKey,
                                         AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body);
                    System.out.println("replyText:"+replyText);
                    System.out.println("exchange:"+exchange);
                    System.out.println("routingKey:"+routingKey);
                    System.out.println("msg:"+msg);
                }
            });
    
    
            //声明一组路由键
            String[] routingKeys={"error","info","warning"};
            //发送消息到交换器上
            for(int i=0;i<3;i++){
                String routingKey = routingKeys[i%3];
                // 发送的消息
                String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis());
                
                //通过路由键把消息发送到交换器上
                //参数mandatory: mandatory=true,投递消息时无法找到一个合适的队列,把消息返回给生产者,
                //              mandatory=false 丢弃消息(缺省)
                channel.basicPublish(EXCHANGE_NAME, routingKey, false,
                        null, message.getBytes());
                System.out.println("----------------------------------------------------");
                System.out.println(" Sent Message: [" + routingKey +"]:'"+ message + "'");
                //sleep一下让程序不快速结束 可以看到RabbitMQ服务器的响应
                Thread.sleep(1000);
            }
    
            // 关闭信道和连接
            channel.close();
            connection.close();
        }
    
    
    }

    3. 在OriginalRabbitMQConsumer工程新建一个发送方确认消费者

    package study.demo.producerconfirm;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * @Description: 发送方确认消费者
     * @author leeSmall
     * @date 2018年9月16日
     *
     */
    public class ProducerConfirmConsumer {
    
        private static final String EXCHANGE_NAME = "producer_confirm";
    
        public static void main(String[] argv) throws IOException, TimeoutException {
            //1.创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置要连接的RabbitMQ服务器的地址
            factory.setHost("127.0.0.1");
            
            //2.通过连接工厂创建一个连接
            Connection connection = factory.newConnection();
            
            //3.通过连接创建一个信道 信道是用来传送数据的
            Channel channel = connection.createChannel();
            
            //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            //5.声明队列
            String queueName = "producer_confirm";
            channel.queueDeclare(queueName,false,false,
                    false,null);
    
            //6.声明一个只消费错误日志的路由键error
            String routingKey = "error";
            
            //7.队列通过路由键绑定到交换器上
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
            System.out.println("Waiting message.......");
    
            // 8.创建队列消费者 设置一个监听器监听消费消息 
            final Consumer consumerB = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println( "Received ["+ envelope.getRoutingKey() + "] "+message);
                }
            };
            //9.消费者自动确认:autoAck参数为true
            channel.basicConsume(queueName, true, consumerB);
        }
    
    }

    4. 启动发送方确认消费者ProducerConfirmConsumer,再分别启动生产者(发送方)确认同步模式的类ProducerConfirm和生产者(发送方)确认异步模式ProducerConfirmAsync

    启动发送方确认消费者ProducerConfirmConsumer:

     启动生产者(发送方)确认同步模式的类ProducerConfirm:

     

    查看发送方确认消费者ProducerConfirmConsumer的状态:

     启动生产者(发送方)确认异步模式ProducerConfirmAsync:

     

    查看发送方确认消费者ProducerConfirmConsumer的状态:

     

    示例代码获取地址

  • 相关阅读:
    hdu7047 /2021“MINIEYE杯”中国大学生算法设计超级联赛(7) 1004 Link with Balls
    hdu7115 Vertex Deletion
    CF1552B B. Running for Gold
    hdu7055 /2021“MINIEYE杯”中国大学生算法设计超级联赛(7) 1012 Yiwen with Sqc
    hdu7050 /2021“MINIEYE杯”中国大学生算法设计超级联赛(7) 1007 Link with Limit
    CF 1560E E. Polycarp and String Transformation(思维)
    CF 1542C C. Strange Function
    STL 算法 std::distance
    STL 算法 std::accumulate
    std::get<C++11多线程库~线程管理>(08):转移线程所有权(2)
  • 原文地址:https://www.cnblogs.com/leeSmall/p/9653219.html
Copyright © 2011-2022 走看看