zoukankan      html  css  js  c++  java
  • RabbitMQ(二)

      在学习RabbitMQ之前,我们先简单了解几个概念。

      RabbitMQ是什么:

        RabbitMQ 是一个消息代理。主要的原理就是通过接受和转发消息。

          RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,消息中间件主要用于组件之间的解耦。服务器端用Erlang语言编写,

        支持多种客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。

      RabbitMQ优势是什么:

        1、高并发  2、负载均衡   3、消息持久  4、耦合低   5、响应速度快

      几个术语介绍:

        Connection:是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。

        ConnectionFactory:Connection的制造工厂。

        Channel:是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、

               定义 Exchange、绑定Queue与Exchange、发布消息等。

        生产者:生产过程就像发送过程,发送消息的程序就是一个生产者,我们使用“P”来描述它。

               

        消费者:消费过程与接收相似,一个消费者通常是一个等着接受消息的程序,我们使用"C"来描述。

            

        Queue(队列):是RabbitMQ的内部对象,用于存储消息,用下图表示

            

              生成者生产消息,发送消息到队列,消费者可以从队列获取消息并消费。

            

              多个消费者订阅同一个队列,消息平摊:

                            

        Exchange(交换机):用下图表示  

           

          实际情况:生产者投递消息永远只会先经过交换机,交换机根据交换机类型或规则路由到队列。

            

          如果没有路由匹配到队列,则消息会丢失。

       Routing key:用于绑定交换机与队列间的key。

          生产者发送消息给交换机时,一般都会指定routing key,用于绑定交换机与队列。 

           这还跟交换机类型(Exchange Type)有关:四种类型,fanout,direct,topic,headers

         Binding:交换机与队列绑定,指向消息流向

            

      使用流程:

         1客户端连接到消息队列服务器,打开一个channel。

           2客户端声明一个exchange,并设置相关属性。

               3客户端声明一个queue,并设置相关属性。

               4客户端使用routing key,在exchange和queue之间建立好绑定关系。

             5客户端投递消息到exchange。exchange接收到消息后,就根据消息的key和已经设置的binding,

           进行消息路由,将消息投递到一个或多个队列里。

        

      下面我们就正式进入RabbitMQ的学习了。我们将带着下面的问题进行RabbitMQ系列的学习:

        1:如果消费者连接中断,这期间我们应该怎么办

        2:如何做到负载均衡

        3:如何有效的将数据发送到相关的接收者?就是怎么样过滤

        4:如何保证消费者收到完整正确的数据

        5:如何让优先级高的接收者先收到数据

    项目开始

      1:创建一个Maven项目,然后引入jar包

     <dependency>
         <groupId>com.rabbitmq</groupId>
         <artifactId>amqp-client</artifactId>
         <version>3.6.5</version>
    </dependency>

       2:创建一个消息生产者类,Producer

    package com.mq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 
     * <p>消息生产者</p>
     */
    public class Producer {
        public final static String QUEUE_NAME = "rabbitMQ.test";
        
        public static void main(String[] args) throws IOException, TimeoutException{
            //创建RabbitMQ的连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            
            //设置RabbitMQ相关信息
            factory.setHost("localhost");//创建一个新连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            
            //创建一个队列
            //第一个参数:表示队列名称
            //第二个参数:是否持久化,true表示是,队列将在服务器重启时生存
            //第三个参数:是否是独占队列,创建者可以使用的私有队列,断开后自动删除
            //第四个参数:当所有消费者客户端连接断开时是否自动删除队列
            //第五个参数:队列的其他参数
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            String message = "Hello RabbitMQ";
            //发送消息到队列中
            //第一个参数:交换机的名称
            //第二个参数:队列映射的路由key
            //第三个参数:消息的其他属性
            //第四个参数:发送信息的主体
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("Producer Send:"+message);
            
            //关闭通知和连接
            channel.close();
            connection.close();
        }
    }

      3:创建一个Customer消费者类

    package com.mq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    import com.rabbitmq.client.ShutdownSignalException;
    
    public class Customer {
         private final static String QUEUE_NAME = "rabbitMQ.test";
         
         public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
             
            //设置RabbitMQ的地址
            factory.setHost("localhost");
            //创建一个新连接
            Connection connection = factory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            
            //声明要关注的队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println("Customer Waiting Received message");
            
            /*
            //接收方式一:
                //DefaultCoustomer类实现了Consumer接口,通过传入一个频道
                //告诉服务器我们需要哪个频道的消息,如果频道到有消息,就会执行回调函数handleDeliver
                //envelope主要存放生产者相关的信息,比如交换机,路由key等,body是消息实体
                Consumer consumer = new DefaultConsumer(channel){
                    
                    @Override
                    public void handleDelivery(String consumerTag,
                                                Envelope envelope,
                                                AMQP.BasicProperties properties,
                                                byte[] body) throws UnsupportedEncodingException{
                        String message = new String(body,"UTF-8");
                        System.out.println("Customer Received:" + message);
                        
                    }
                };
                
                //自动回复队列应答  ----RabbitMQ中的消息确认机制
                channel.basicConsume(QUEUE_NAME, true,consumer);
            */
            
            //接收方式二:
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //自动回复队列应答  ----RabbitMQ中的消息确认机制
            //注册一个新消息到达的回调,第二个参数表示ack,默认是false,表示字段回复,不是自动回复则需要basicAck来手动回复
            //显示的告诉MQ已经取得了消息,否则MQ会将该消息重新分配给另外一个绑定在该队列上的消费者
            channel.basicConsume(QUEUE_NAME, true,consumer);
            while(true){
                //线程阻塞,挂起知道队列传输消息到来
                Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("Customer Received2:" + message);
            }
        }
    }

      4:在测试之前必须记得先运行sbin下面的rabbitmq-server.bat。

      之后各自运行Producer和Customer,没有顺序之分,结果Producer和Customer控制台内容为:

      

      接收方式一:

      

      接收方式二:

      

      注:

        如果运行的时候保报错:com.rabbitmq.client.ShutdownSignalException  

                   或com.rabbitmq.client.PossibleAuthenticationFailureException

        你可以尝试一下几种操作:

          一: rabbitmq服务通道是持久通道,该queue 已经存在, 而且通道属性跟最近修改后的属性不一致

                 而导致无法更新queue。你可以在客户端中清除队列缓存并删除。可参看: 

                     https://www.oschina.net/question/190643_155714?sort=time

          二: 一些权限的配置,可参看

              http://www.linuxidc.com/Linux/2014-10/107917.htm

    注:由于本教程中rabbitmq是在本机安装,使用的是默认端口(5672)。 
    如果你的例子运行中的主机、端口不同,请进行必要设置,否则可能无法运行。

    作者:哀&RT
    出处:博客园哀&RT的技术博客--http://www.cnblogs.com/Tony-Anne/
    您的支持是对博主最大的鼓励,感谢您的认真阅读。
    本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    什么样的代码称得上是好代码?
    九年程序人生 总结分享
    Docker入门 第一课 --.Net Core 使用Docker全程记录
    阿里云 Windows Server 2012 r2 部署asp.net mvc网站 平坑之旅
    Visual studio 2015 Community 安装过程中遇到问题的终极解决
    Activiti6.0 spring5 工作流引擎 java SSM流程审批 项目框架
    java 进销存 库存管理 销售报表 商户管理 springmvc SSM crm 项目
    Leetcode名企之路
    24. 两两交换链表中的节点
    21. 合并两个有序链表
  • 原文地址:https://www.cnblogs.com/Tony-Anne/p/6428775.html
Copyright © 2011-2022 走看看