zoukankan      html  css  js  c++  java
  • 【RabbitMQ】RabbitMQ在Windows的安装和简单的使用

    版本说明

    使用当前版本:3.5.4

    安装与启动

    在官网上下载其Server二进制安装包,在Windows上的安装时简单的,与一般软件没什么区别。
    安装前会提示你,还需要安装Erlang,并打开下载页面。把他们都下载安装就ok了。(当然也可先行下载安装)
    安装完,服务默认是启动的。
    Erlang,应该是一个在并发编程方面很厉害的语言吧。

    后期可通过开始菜单启动。

    简单的Java客户端连接

    编码中有些配置需要特别注意配置,比如:

    • 选择什么交换器,各种交换器的分发策略不一样。
    • 是否自动确认消息。如果RabbitMQ服务器收到该消息的确认消息,会认为该消息已经处理OK了,会把它从队列中删除。
    • 队列是否持久、消息是否持久,就是队列和消息在RabbitMQ服务器重启时是否恢复。
    • 队列是否自动删除,就是队列在无监听的情况下是否自动删除。

    引入的客户端包:

    <dependency>
        <groupId>com.rabbitmq</groupId>
    	<artifactId>amqp-client</artifactId>
    	<version>3.5.6</version>
    </dependency>
    

    消费者:

    package com.nicchagil.rabbit.No001MyFirstDemo;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class Customer {
    
        private final static String QUEUE_NAME = "hello world";
    
        public static void main(String[] argv) throws java.io.IOException,
                java.lang.InterruptedException, TimeoutException {
    
            /* 创建连接工厂 */
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            /* 创建连接 */
            Connection connection = factory.newConnection();
            /* 创建信道 */
            Channel channel = connection.createChannel();
    
            // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("Waiting for messages.");
    
            /* 定义消费者 */
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received the message -> " + message);
                }
            };
            
            // 将消费者绑定到队列,并设置自动确认消息(即无需显示确认,如何设置请慎重考虑)
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    
    

    生产者:

    package com.nicchagil.rabbit.No001MyFirstDemo;
    
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer {
    
        private final static String QUEUE_NAME = "hello world";
    
        public static void main(String[] argv) throws java.io.IOException, TimeoutException {
            
            Connection connection = null;
            Channel channel = null;
            try {
            	/* 创建连接工厂 */
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                /* 创建连接 */
                connection = factory.newConnection();
                /* 创建信道 */
                channel = connection.createChannel();
    
                // 声明一个队列:名称、持久性的(重启仍存在此队列)、非私有的、非自动删除的
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                
                String message = "hello world..."; // 需发送的信息
                
                /* 发送消息,使用默认的direct交换器 */
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("Send message -> " + message);
                
            } finally {
                /* 关闭连接、通道 */
                channel.close();
                connection.close();
                System.out.println("Closed the channel and conn.");
            }
    
        }
    
    }
    
    

    如无意外,每运行一次生产者(发送一次消息),消费者都会执行一次业务(接收到一次消息)。
    执行了两次生产者后,日志如下:

    Waiting for messages.
    Received the message -> hello world...
    Received the message -> hello world...
    

    注:

    • MQ服务器收到确认(ack)信息后,会在队列中删除该消息。如果未收到确认消息,则会继续等待(不存在超时的概念),它直到执行该消息的消费者挂了,才把此遗留的消息重新分发给其它的消费者。

    发布与订阅(fanout交换器)

    发布与订阅,类似于广播。
    下面代码演示:两个消费者创建临时队列后绑定一个fanout类型的交换器,然后生产者往该交换器发送消息,消息被广播到两个绑定的队列中,队列将消息发送给各自的消费者,两个消费者接收到消息完成任务。

    消费者A:

    package com.nicchagil.rabbit.No003Fadout;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class FanoutCustomerA {
    
        public static void main(String[] argv) throws java.io.IOException,
                java.lang.InterruptedException, TimeoutException {
    
            /* 创建连接工厂 */
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            /* 创建连接 */
            Connection connection = factory.newConnection();
            /* 创建信道 */
            Channel channel = connection.createChannel();
    
            // 创建一个临时的、私有的、自动删除、随机名称的临时队列
            String queueName = channel.queueDeclare().getQueue();
            System.out.println("queue : " + queueName);
            channel.queueBind(queueName, "amq.fanout", "");
            System.out.println(FanoutCustomerA.class.getName() + ", waiting for messages.");
    
            /* 定义消费者 */
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received the message -> " + message);
                }
            };
            
            // 开始消费(设置自动确认消息)
            channel.basicConsume("", true, consumer);
        }
    }
    
    

    消费者B:

    package com.nicchagil.rabbit.No003Fadout;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class FanoutCustomerB {
    
        public static void main(String[] argv) throws java.io.IOException,
                java.lang.InterruptedException, TimeoutException {
    
            /* 创建连接工厂 */
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            /* 创建连接 */
            Connection connection = factory.newConnection();
            /* 创建信道 */
            Channel channel = connection.createChannel();
    
            // 创建一个临时的、私有的、自动删除、随机名称的临时队列
            String queueName = channel.queueDeclare().getQueue();
            System.out.println("queue : " + queueName);
            channel.queueBind(queueName, "amq.fanout", "");
            System.out.println(FanoutCustomerB.class.getName() + ", waiting for messages.");
    
            /* 定义消费者 */
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received the message -> " + message);
                }
            };
            
            // 开始消费(设置自动确认消息)
            channel.basicConsume("", true, consumer);
        }
    }
    
    
    

    生产者:

    package com.nicchagil.rabbit.No003Fadout;
    
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class FanoutProducer {
    
        public static void main(String[] argv) throws java.io.IOException, TimeoutException {
            
            Connection connection = null;
            Channel channel = null;
            try {
            	/* 创建连接工厂 */
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                /* 创建连接 */
                connection = factory.newConnection();
                /* 创建信道 */
                channel = connection.createChannel();
    
                String message = "hello world..."; // 需发送的信息
                
                /* 发送消息,使用默认的fanout交换器 */
                channel.basicPublish("amq.fanout", "", null, message.getBytes());
                System.out.println("Send message -> " + message);
                
            } finally {
                /* 关闭连接、通道 */
                channel.close();
                connection.close();
                System.out.println("Closed the channel and conn.");
            }
    
        }
    
    }
    
    

    先将两个消费者跑起来,然后运行生产者发送一条消息。
    正常来说,消费者A、消费者B都收到消息并执行。

    消费者A的日志:

    queue : amq.gen-F3EYfr68AHvfZTIJUcN_Ug
    com.nicchagil.rabbit.No003Fadout.FanoutCustomerA, waiting for messages.
    Received the message -> hello world...
    
    

    消费者B的日志:

    queue : amq.gen-AV_XDQtB-LFPK8bDy31PTw
    com.nicchagil.rabbit.No003Fadout.FanoutCustomerB, waiting for messages.
    Received the message -> hello world...
    
    

    管理控制台

    我们可以通过以下命令启用管理控制台:

    rabbitmq-plugins enable rabbitmq_management
    

    然后由此地址(http://localhost:15672)进入,默认端口是15672,默认账户是guest/guest。
    进入后,可以看到Overview、Connections、Channels、Exchanges、Queues、Admin几个页签,此控制台的功能各种强大,不仅可以查看信息,还可以增、删信息,非常棒。

    消费者的异常处理器

    如果消费者方法体中发生异常没被捕捉并处理,如果使用默认的异常处理器,消费者的信道会关闭,不继续执行任务。

    比如以下例子,遇到空字符串则抛出运行时异常:

    package com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class Customer {
    
        private final static String QUEUE_NAME = "hello world";
    
        public static void main(String[] argv) throws java.io.IOException,
                java.lang.InterruptedException, TimeoutException {
    
            /* 创建连接工厂、连接、通道 */
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            // 声明消息队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println("Waiting for messages.");
    
            /* 定义消费者 */
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    
                    if (message == null || message.length() == 0) {
                        throw new RuntimeException("The input str is null or empty...");
                    }
                    
                    System.out.println("Received the message -> " + message);
                }
            };
            
            // 将消费者绑定到队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    然后,我们用这个消费者监听一个队列,且此队列只有这个消费者,用于测试队列是否堵塞,也就是这个消费者是否不继续消费。

    先用生产者发送“hello world...”(正常参数),再发送“”(异常参数),最后发送“hello world...”(正常参数)。
    可见如下日志,消费者发生异常后,没有响应第三个“hello world...”的消息,也可进入控制台,会发现此消息为Ready状态,等待消费。
    原因在于,默认的异常处理器为DefaultExceptionHandler,其继承StrictExceptionHandler,从源码看,遇到异常它会关闭信道。
    日志如下:

    Waiting for messages.
    Received the message -> hello world...
    com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办.Customer$1@630bd3f1 (amq.ctag-QzWI1jxh4h23rOFJM63cBA) method handleDelivery for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1):
    java.lang.RuntimeException: The input str is null or empty...
        at com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办.Customer$1.handleDelivery(Customer.java:40)
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    

    遇到异常,如果需使用别的处理方式,可以设置自定义的异常处理器。
    以下的异常处理器,只是Demo。各方法体中只打印相关信息供查看:

    package com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.ExceptionHandler;
    import com.rabbitmq.client.TopologyRecoveryException;
    
    public class MyExceptionHandler implements ExceptionHandler {
    
        public void handleUnexpectedConnectionDriverException(Connection conn,
                Throwable exception) {
            System.out.println("MyExceptionHandler.handleUnexpectedConnectionDriverException");
        }
    
        public void handleReturnListenerException(Channel channel,
                Throwable exception) {
            System.out.println("MyExceptionHandler.handleReturnListenerException");
        }
    
        public void handleFlowListenerException(Channel channel, Throwable exception) {
            System.out.println("MyExceptionHandler.handleFlowListenerException");
        }
    
        public void handleConfirmListenerException(Channel channel,
                Throwable exception) {
            System.out.println("MyExceptionHandler.handleConfirmListenerException");
        }
    
        public void handleBlockedListenerException(Connection connection,
                Throwable exception) {
            System.out.println("MyExceptionHandler.handleBlockedListenerException");
        }
    
        public void handleConsumerException(Channel channel, Throwable exception,
                Consumer consumer, String consumerTag, String methodName) {
            // 正常渠道应该有专业的LOG框架打印,此处简单处理
            exception.printStackTrace();
            System.out.println("MyExceptionHandler.handleConsumerException");
        }
    
        public void handleConnectionRecoveryException(Connection conn,
                Throwable exception) {
            System.out.println("MyExceptionHandler.handleConnectionRecoveryException");
        }
    
        public void handleChannelRecoveryException(Channel ch, Throwable exception) {
            System.out.println("MyExceptionHandler.handleChannelRecoveryException");
        }
    
        public void handleTopologyRecoveryException(Connection conn, Channel ch,
                TopologyRecoveryException exception) {
            System.out.println("MyExceptionHandler.handleTopologyRecoveryException");
        }
    
    }
    

    设置自定义的异常处理器:

    factory.setExceptionHandler(new MyExceptionHandler());
    

    像上述那样,先传递“hello world...”,再传递“”(空字符串),最后传递“hello world...”。观察如下日志,可见发生异常后,消费者正常响应消息。

    Waiting for messages.
    Received the message -> hello world...
    java.lang.RuntimeException: The input str is null or empty...
        at com.nicchagil.rabbit.No002消费者出现异常发生堵塞怎么办.Customer$1.handleDelivery(Customer.java:41)
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    MyExceptionHandler.handleConsumerException
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Received the message -> hello world...
    

    具体应该根据什么策略进行异常处理,这是个是值得深思的问题,与业务的性质有关。什么情况下消费者应不继续响应请求,什么情况下消费者应继续相应,这个在于业务的性质而定。

    参考的优秀文章

  • 相关阅读:
    20160205
    20151120
    20151023
    20151023
    20140207
    yum工具介绍
    Linux程序包管理
    Linux任务计划、周期性任务执行
    10 压缩和解压缩工具和bash脚本编程
    9 btrfs文件系统
  • 原文地址:https://www.cnblogs.com/nick-huang/p/5791505.html
Copyright © 2011-2022 走看看