zoukankan      html  css  js  c++  java
  • RabbitMQ 队列的基础使用及安装

    什么是RabbitMQ就是一个队列

    优点

    1.提高系统响应速度

    我们发送请求然后直接通过Rabbit获得响应不用看我们数据是否在数据库中查到了:如签票软件会你点击过后直接给你一个正在抢票的界面

    2.提高系统稳定性

    如果服务挂了不关紧要,如上面所述

    3.服务异步调用

    也如1.所述我们的响应和我们的反应没有了直接关系

    4.服务解耦

    差不多了

    5.保证顺序FIFO

    队列特征

    6.消除峰值

    把一次100行请求分为一次5个请求

    安装

    RabbitMQErlang语言开发

    RabbitMQ的下载地址:http://www.rabbitmq.com/download.html

    Erlang下载

    地址:http://erlang.org/download/otp

    安装可视化界面

    安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到Rabbitsbin目录,使用cmd执行命令: rabbitmq-plugins.bat enable rabbitmq_management  安装成功后重新RabbitMQ

    进入浏览器,输入:http://localhost:15672  ,初始账号和密码:guest/guest

    如果你的C盘下:如我的中文路径

    则需要修改参考如下

    解决方案:https://blog.csdn.net/leoma2012/article/details/97636859 

    工作原理

    Broker:消息队列服务进程,此进程包括两个部分:ExchangeQueue

    Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。

    Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

    Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ

    Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

    1.1.1. 消息发布接收流程:

    1.发送消息

    1、生产者和Broker建立TCP连接。

    2、生产者和Broker建立通道。

    3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

    4、Exchange将消息转发到指定的Queue(队列)

    2.接收消息

    1、消费者和Broker建立TCP连接

    2、消费者和Broker建立通道

    3、消费者监听指定的Queue(队列)

    4、当有消息到达QueueBroker默认将消息推送给消费者。

    5、消费者接收到消息。

     

     单模型

    发送端操作流程

    1)创建连接

    2)创建通道

    3)声明队列

    4)发送消息

    接收端

    1)创建连接

    2)创建通道

    3)声明队列

    4)监听队列

    5)接收消息

    6ack回复

    发送者

    public class Sender {
        public static final String QUERY_HELLO_WORD="hello_word";
        //创建连接对象
        //通过连接对象获得通道
        //通过通道创建队列
        //发送消息
        public static void main(String[] args) throws Exception {
    
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //3.使用通道创建(声明)队列 ,参数如下:
            //String queue,:队列名字
            // boolean durable:持久化队列
            // boolean exclusive: 队列是否独占此连接
            // boolean autoDelete:用完之后自动删除队列
            // Map<String, Object> arguments :其他的参数
            channel.queueDeclare(QUERY_HELLO_WORD,true,false,true,null);
            /**
             * 消息发布方法
             *String exchange, :交换机名字  "" 使用默认
             *String routingKey, :队列的路由key(对列的名字)
             *AMQP.BasicProperties props,:其他属性
             *byte[] body :发送的消息的内容
             */
            String mes = "hellowrod";
            channel.basicPublish("",QUERY_HELLO_WORD,null,mes.getBytes());
            System.out.println("发送消息。。。。");
        }
    }
    View Code

    接收者

    public class Consutom {
        //创建连接对象
        //通过连接对象获得通道
        //接受消息
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //定义消费方法
            DefaultConsumer consumer  = new DefaultConsumer(channel){
                /**
                 * 消费者接收消息调用此方法
                 * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                (收到消息失败后是否需要重新发送)
                 * @param properties
                 * @param body
                 */
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    System.out.println("收到消息。。。。。。。。。。。。");
                    System.out.println("consumerTagd的内容"+consumerTag);
                    System.out.println("getExchange的内容"+envelope.getExchange());
                    System.out.println("getDeliveryTag的内容"+envelope.getDeliveryTag());
                    System.out.println("getRoutingKey的内容"+envelope.getRoutingKey());
                    System.out.println("properties"+properties);
                    System.out.println("consumerTagd的内容"+new String(body));
                }
            };
            /**
             * 监听队列String queue, boolean autoAck,Consumer callback
             * 参数明细
             * 1、队列名称
             * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
             为false则需要手动回复
             * 3、消费消息的方法,消费者接收到消息后调用此方法
             */
            channel.basicConsume(Sender.QUERY_HELLO_WORD,true,consumer);
        }
    }
    View Code

     就是两个消费者一个队列

    send

    package cn.jiedada._02_word_quary;
    
    
    import cn.jiedada.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Sender {
        public static final String QUERY_WORK_QUARY="query_work_quary";
        //创建连接对象
        //通过连接对象获得通道
        //通过通道创建队列
        //发送消息
        public static void main(String[] args) throws Exception {
    
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            /**
             * 声明队列,如果Rabbit中没有此队列将自动创建
             * param1:队列名称
             * param2:是否持久化
             * param3:队列是否独占此连接
             * param4:队列不再使用时是否自动删除此队列
             * param5:队列参数
             */
            channel.queueDeclare(QUERY_WORK_QUARY,true,false,true,null);
            /**
             * 消息发布方法
             * param1:Exchange的名称,如果没有指定,则使用Default Exchange
             * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
             * param3:消息包含的属性
             * param4:消息体
             */
            String mes = "杰大大真的帅";
            channel.basicPublish("",QUERY_WORK_QUARY,null,mes.getBytes());
            System.out.println("发送消息。。。。");
        }
    }
    View Code

    接收者在复制一份就可以了

    package cn.jiedada._02_word_quary;
    
    import cn.jiedada.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consutom2 {
        //创建连接对象
        //通过连接对象获得通道
        //接受消息
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
            //定义消费方法
            DefaultConsumer consumer  = new DefaultConsumer(channel){
                /**
                 * 消费者接收消息调用此方法
                 * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                (收到消息失败后是否需要重新发送)
                 * @param properties
                 * @param body
                 */
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    System.out.println("consutem2+收到了");
                    System.out.println("consumerTagd的内容"+consumerTag);
                    System.out.println("getExchange的内容"+envelope.getExchange());
                    System.out.println("getDeliveryTag的内容"+envelope.getDeliveryTag());
                    System.out.println("getRoutingKey的内容"+envelope.getRoutingKey());
                    System.out.println("properties"+properties);
                    System.out.println("consumerTagd的内容"+new String(body));
                    System.out.println(".............................");
                    //手动签收消息
                    //long deliveryTag, :交货的ID
                    //boolean multiple:false,签收当前则一个消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            /**
             * 监听队列String queue, boolean autoAck,Consumer callback
             * 参数明细
             * 1、队列名称
             * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
             为false则需要手动回复
             * 3、消费消息的方法,消费者接收到消息后调用此方法
             */
            channel.basicConsume(Sender.QUERY_WORK_QUARY,false,consumer);
        }
    }
    View Code

    订阅模型基本逻辑都这样

    这里我们在发送者中创建交换机

    然后接收者创建队列来接收

    只不过创建的交换机不一样罢了分为Faout,direct,topic三种

     send

    package cn.jiedada._03_fanout;
    
    
    import cn.jiedada.utils.ConnectionUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Sender {
            public static final String NAME_EXCHANGE_FANOUT = "name_exchange_fanout";
            //1.使用工具创建连接对象
            //2.使用连接创建通道对象
            //3.使用通道创建交换机Fanout类型
            //4.准备消息内容
            //5.发送消息到MQ不指定队列
            public static void main(String[] args) throws Exception {
                //1.使用工具创建连接对象
                Connection connection = ConnectionUtil.getConnection();
    
                //2.使用连接创建通道对象
                Channel channel = connection.createChannel();
    
                //3.3.使用通道创建交换机[变化] :广播
                channel.exchangeDeclare(NAME_EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT);
    
                //4.准备消息内容
                String messgae = "is a fanout message over!!!";
    
                //5.发送消息到MQ
                //String exchange, :交换机名字  "" 使用默认
                //String routingKey, :队列的路由key(对列的名字)
                //AMQP.BasicProperties props,:其他属性
                //byte[] body :发送的消息的内容
                channel.basicPublish(NAME_EXCHANGE_FANOUT,"",null,messgae.getBytes());
    
                System.out.println("Sender消息发送完毕...");
            }
    }
    View Code

    接收者

    package cn.jiedada._03_fanout;
    
    import cn.jiedada.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consutom {
        public static final String NAME_QUEUE_FANOUT_C1 = "name_queue_fanout_c1";
        //1.使用工具创建连接对象
        //2.使用连接创建通道对象
        //3.使用通道创建队列【变化】
        //4.把队列绑定到交换机【变化,routingkey不写】
        //5.准备消息处理回调
        //6.监听队列,消费消息
        public static void main(String[] args) throws Exception {
            //1.使用工具创建连接对象
            Connection connection = ConnectionUtil.getConnection();
            //2.使用连接创建通道对象
            final Channel channel = connection.createChannel();
            //同时消费一个消息
            channel.basicQos(1);
            //3.使用通道创建队列【变化】
            channel.queueDeclare(
                    NAME_QUEUE_FANOUT_C1,
                    true,
                    false,
                    false,
                    null );
            //把队列绑定到交换机【变化】
            //String queue,队列名
            //String exchange,绑定的交换机名
            //String routingKey,队列的routingkey,广播模式,不写
            channel.queueBind(
                    NAME_QUEUE_FANOUT_C1,
                    Sender.NAME_EXCHANGE_FANOUT,
                    "");
    
            //5.准备消息处理回调
            Consumer callback = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Receiver1-1:收到消息.....");
                    //消费者ID
                    System.out.println("consumerTag:"+consumerTag);
                    //交换机
                    System.out.println("getExchange:"+envelope.getExchange());
                    System.out.println("getRoutingKey:"+envelope.getRoutingKey());
                    //收货ID(消息ID)
                    System.out.println("getDeliveryTag:"+envelope.getDeliveryTag());
                    System.out.println("消息内容:"+new String(body));
                    System.out.println("Receiver1-1:消息完毕.............");
                    //手动签收消息
                    //long deliveryTag, :交货的ID
                    //boolean multiple:false,签收当前则一个消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
    
            //4.监听队列,消费消息
            //String queue:消费的队列的名字
            //boolean autoAck:自动签收消息,队列把消息推送给消费者之后,自动删除消息,
            // 可能会出问题:如果消费失败,消息也会被删除,会出现消息丢失,手动签收可以解决
    
            //Consumer callback :拿到消息之后的处理器的回调
            //channel.basicConsume(Sender.NAME_QUEUE_HELLOWORLD,true,callback);
            channel.basicConsume(NAME_QUEUE_FANOUT_C1,false,callback);
        }
    }
    View Code

    重要的三个持久化,一般来说需要设置

    交换机持久化

    队列持久化

    消息持久化

  • 相关阅读:
    "AWT-EventQueue-0" java.lang.UnsatisfiedLinkError: no freetype in java.library.path
    Cookie的简单使用
    转发与重定向
    JSON 在Ajax数据交换中的简单运用
    Word/Excel/PPT 2016商务办公从入门到精通
    高性能MySQL(第3版)
    精通CFD工程仿真与案例实战 FLUENT GAMBIT ICEM CFD Tecplot 第2版
    HTML、CSS和JavaScript入门经典(第2版)
    C#图解教程(第4版)
    巧学巧用DreamweaverCS6、FlashCS6、FireworksCS6网站制作
  • 原文地址:https://www.cnblogs.com/xiaoruirui/p/13679315.html
Copyright © 2011-2022 走看看