zoukankan      html  css  js  c++  java
  • RabbitMQ

    1、RabbitMQ简介

    2、RabbitMQ安装及使用

    3、RabbitMQ快速入门

    4、交换机

    RabbitMQ简介

    各大主流中间件对比

    ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线,并且它一

    个完全支持 J M S 规范的消息中间件。

    其丰富的 API 、多种集群构建模式使得他成为业界老牌消息中间件,在中

    小型企业中应用广泛!

    MQ 衡量指标:服务性能、数据存储、集群架构

    RocketMQ是阿里开源的消息中间件,目前也已经孵化为Apache顶级项目,

    它是纯java开发,具有高吞吐量、高可用性、适合大规模分布式系统 应用的特点。

    RocketMQ思路起源于Kafka,它对消息的可靠传输及事务 性做了优化,

    目前在阿里集团被广泛应用于交易、充值、流计算、消息推 送、日志流式处理、binglog分发等场景

     

    RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议 来实现。

    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布 /订阅)、可靠性、安全。AMQP协议更多用在企业系统内,

    对数据_致 性、稳定性和可靠性要求很髙的场景,对性能和吞吐量的要求还在其次。

    结论:

    activiMq老牌消息中间件,api全面,但是吞吐量不大

    Kafaka吞吐量大,但是数据无法保证不丢失,主要面向大数据

    rokectMQ:吞吐量大,保证数据不丢失,并且支持分布式事物,但是商业版需要收费

    rabbitMQ:吞吐量大,数据不易丢失

     

    初识RabbitMQ

    RabbitMQ是—个开源的消息代理和队列服务器,用来通过普通协议 在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写 的,并且RabbitMQ是基于AMQP协议的。

     

    哪些大厂在用RabbitMQ,为什幺?

    滴滴、美团、头条、去哪儿、艺龙......

    开源、性能优秀,稳定性保障

    提供可靠性消息投递模式(confirm)、返回模式(return )

    与SpringAMQP完美的整合、API丰富

    集群模式丰富,表达式配置,HA模式,镜像队列模型

    保证数据不丟失的前提做到高可靠性、可用性

      

    RabbitMQ高性能的原因?

    Erlang语言最初在于交换机领域的架构模式,这样使得 RabbitMQ在Broker之间进行数据交互的性能是非常优秀的

    Erlang的优点:Erlang有着和原生Socket—样的延迟

    什么是AMQP高级消息队列协议?

    AMQP定义:

    是具有现代特征的二进制协议;

    是一个提供统一消息服务的应用层标准高级消息队列协议;

    是应用层协议的一个开放标准,为面向消息的中间件设计;

     

    AMQP核心概念(重点)

    Server:又称Broker,接受客户端的连接,实现AMQP实体服务

    Connection:连接:应用程序与Broker的网络连接

    Channel:网络通道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道;客户端可建立多个Channel,每个Channel代表一个会话任务;

    Message:消息,服务器与应用程序之间传递的数据,由Properties和Body组成。Properties可以对消息进行装饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容;

    Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由;一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue;

    Exchange:交换机,交换消息,根据路由键转发消息到绑定的队列;

    Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key;

    Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息

    Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

    RabbitMQ安装及使用

    Docker安装方式

    注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面

     

       #1.查询镜像

       docker search rabbitmq:management

     

       #2.获取镜像

       docker pull rabbitmq:management

      

       #3.运行镜像

       ##方式一:默认guest用户,密码也是guest

       docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

     

       ##方式二:设置用户名和密码

       docker run -d

         --name my-rabbitmq

         -p 5672:5672 -p 15672:15672

         -v /data:/var/lib/rabbitmq

         --hostname my-rabbitmq-host

         -e RABBITMQ_DEFAULT_VHOST=my_vhost

         -e RABBITMQ_DEFAULT_USER=admin

         -e RABBITMQ_DEFAULT_PASS=admin

         --restart=always

         rabbitmq:management 

    参数说明:

       -d:后台运行容器

       -name:指定容器名

       -p:指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号)

       -v:映射目录或文件,启动了一个数据卷容器,数据卷路径为:/var/lib/rabbitmq,再将此数据卷映射到住宿主机的/data目录

       --hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)

       -e:指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)

       --restart=always:当Docker重启时,容器能自动启动   

       rabbitmq:management:镜像名

       

       注1:RABBITMQ_DEFAULT_VHOST=my_vhost,my_vhost名字请记好,在之后的编程中要用到,

            如果启动时没指定,默认值为/

     

       #4.进入RabbitMQ管理平台进行相关操作

        

       注1:容器启动后,可以通过docker logs 窗口ID/容器名字 查看日志

            docker logs my-rabbitmq    

       注2:停止并删除所有容器

            docker stop $(docker ps -aq) && docker rm $(docker ps -aq)

    常用操作命令

    命令行与管控台-基础操作

    rabbitmqctl stop_app:关闭应用

    rabbitmqctl start_app:启动应用

    rabbitmqctl status:节点状态

    rabbitmqctl add_user username password:添加用户

    rabbitmqctl list_users:列出所有用户

    rabbitmqctl delete_user username:删除用户

    rabbitmqctl clear_permissions -p vhostpath username:清除用户权限

    rabbitmqctl list_user_permissions username:列出用户权限

    rabbitmqctl change_password username newpassword:修改密码

    rabbitmqctl set_permissions -p vhostpath username “.*” “.*” “.*”

    rabbitmqctl add_vhost vhostpath:创建虚拟主机

    rabbitmqctl list_vhosts:列出所有虚拟主机

    rabbitmqctl list_permissions -p vhostpath:列出虚拟主机上所有权限

    rabbitmqctl delete_vhost vhostpath:删除虚拟主机

    rabbitmqctl list_queues:查看所有队列信息

    rabbitmqctl -p vhostpath purge_queue blue:清除队列里的消息

     

    命令行与管控台-高级操作

    rabbitmqctl reset:移除所有数据,要在rabbitmqctl stop_app之后使用

    rabbitmqctl join_cluster <clustermode> [--ram]:组成集群命令

    rabbitmqctl cluster_status:查看集群状态

    rabbitmqctl change_cluster_node_type disc | ram:修改集群节点的存储形式

    rabbitmqctl forget_cluster_node {--offline} 忘记节点 (摘除节点)

    rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2...] (修改节点名称)

    RabbitMQ快速入门

    极速入门-消息生产与消费

    ConnectionFactory:获取连接工厂

    Connection:一个链接

    Channel:数据通信通道,课发送和接收消息

    Queue:具体的消息存储队列

    Producer & Consumer:生产和消费者

     

    创建一个springboot项目: rabbitmq-api

    导入pom依赖:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5</version>
    </dependency>

    消费端代码:

    package com.psy.rabbitmqapi.quick;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    import java.io.IOException;
    
    
    public class Consumer {
        public static void main(String[] args) throws Exception {
    
            //1 创建一个ConnectionFactory, 并进行配置
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.254.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            //4 声明(创建)一个队列
            String queueName = "test001";
    //        参数:队列名称、持久化与否、独占与否、无消息队列是否自动删除、消息参数
    //        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
            channel.queueDeclare(queueName, true, false, false, null);
    
            //5 创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    
            //6 设置Channel
    //         参数:队列名称、自动签收、消费者回调
    //        basicConsume(String queue, boolean autoAck, Consumer callback)
            channel.basicConsume(queueName, true, queueingConsumer);
    
            while(true){
                //7 获取消息(Delivery:传送)
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.err.println("消费端: " + msg);
                //Envelope envelope = delivery.getEnvelope();
            }
    
        }
    }

    生产端:

    package com.psy.rabbitmqapi.quick;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    
    public class Procuder {
        public static void main(String[] args) throws Exception {
            //1 创建一个ConnectionFactory, 并进行配置
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.254.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            //4 通过Channel发送数据
            for(int i=0; i < 5; i++){
                String msg = "Hello RabbitMQ!";
                //1 exchange   2 routingKey
                channel.basicPublish("", "test001", null, msg.getBytes());
            }
    
            //5 记得要关闭相关的连接
            channel.close();
            connection.close();
        }
    }

    交换机

    交换机属性:

    Name:交换机名称

    Type:交换机类型 direct、topic、fanout、headers

    Durability:是否需要持久化,true为持久化

    Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange

    Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False

    Arguments:扩展参数,用于扩展AMQP协议,定制化使用

    直流交换机

    直连交换机Direct Exchange(完全匹配路由key)

    所有发送到Direct Exchange的消息会被转发到RouteKey中指定的Queue

    注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃;

     

     消费端代码:

    package com.psy.rabbitmqapi.exchange.direct;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    
    public class Consumer4DirectExchange {
        public static void main(String[] args) throws Exception {
    
    
            ConnectionFactory connectionFactory = new ConnectionFactory() ;
    
            connectionFactory.setHost("192.168.254.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test.direct";
    
            //表示声明了一个交换机
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            //表示声明了一个队列
            channel.queueDeclare(queueName, false, false, false, null);
            //建立一个绑定关系:
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);
            //循环获取消息
            while(true){
                //获取消息,如果没有消息,这一步将会一直阻塞
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息:" + msg);
            }
        }
    }

    生产端:

    package com.psy.rabbitmqapi.exchange.direct;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    
    public class Producer4DirectExchange {
        public static void main(String[] args) throws Exception {
    
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.254.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_direct_exchange";
            String routingKey = "test.direct";
    //        String routingKey = "test.direct111"; //收不到
            //5 发送
    
            String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
            channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    
        }
    }

    主题交换机

    主题交换机Topic Exchange(匹配路由规则的交换机)

     

    所有发送到Topic Exchange的消息被转发到所有关系RouteKey中指定Topic的Queue上;

     

    Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic;

     

    注意:可以使用通配符进行模糊匹配

    符号:“#” 匹配一个或者多个词

    符号:“*” 匹配不多不少一个词

    列如:

    “log.#” 能够匹配到 “log.info.oa”

    “log.*” 能够匹配到 “log.err”

     

    消费端代码:

    package com.psy.rabbitmqapi.exchange.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    
    public class Consumer4TopicExchange {
        public static void main(String[] args) throws Exception {
    
    
            ConnectionFactory connectionFactory = new ConnectionFactory() ;
    
            connectionFactory.setHost("192.168.254.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
            String routingKey = "user.#";
    //        String routingKey = "user.*";
            // 1 声明交换机
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 2 声明队列
            channel.queueDeclare(queueName, false, false, false, null);
            // 3 建立交换机和队列的绑定关系:
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);
            //循环获取消息
            while(true){
                //获取消息,如果没有消息,这一步将会一直阻塞
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息:" + msg);
            }
        }
    }

    生产端:

    package com.psy.rabbitmqapi.exchange.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    
    public class Producer4TopicExchange {
        public static void main(String[] args) throws Exception {
    
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.254.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_topic_exchange";
            String routingKey1 = "user.save";
            String routingKey2 = "user.update";
            String routingKey3 = "user.delete.abc";
            //5 发送
    
            String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
            channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
            channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
            channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
            channel.close();
            connection.close();
        }
    }

    输出交换机

    输出交换机Fanout Exchange(不做路由)

    不处理路由键,只需要简单的将队列绑定到交换机上;

    发送到交换机的消息都会被转发到与该交换机绑定的所有队列上;

    Fanout交换机转发消息是最快的

     消费端代码:

    package com.psy.rabbitmqapi.exchange.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    
    public class Consumer4FanoutExchange {
        public static void main(String[] args) throws Exception {
    
            ConnectionFactory connectionFactory = new ConnectionFactory() ;
    
            connectionFactory.setHost("192.168.254.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            String routingKey = "";    //不设置路由键
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);
            //循环获取消息
            while(true){
                //获取消息,如果没有消息,这一步将会一直阻塞
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息:" + msg);
            }
        }
    }

    生产端:

    package com.psy.rabbitmqapi.exchange.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    
    public class Producer4FanoutExchange {
        public static void main(String[] args) throws Exception {
    
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.254.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_fanout_exchange";
            //5 发送
            for(int i = 0; i < 10; i ++) {
                String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
                channel.basicPublish(exchangeName, "", null , msg.getBytes());
            }
            channel.close();
            connection.close();
        }
    }
  • 相关阅读:
    some tips
    ORA00847: MEMORY_TARGET/MEMORY_MAX_TARGET and LOCK_SGA cannot be set together
    Chapter 01Overview of Oracle 9i Database Perfomrmance Tuning
    Chapter 02Diagnostic and Tuning Tools
    变量与常用符号
    Chapter 18Tuning the Operating System
    标准输入输出
    Trace files
    DBADeveloped Tools
    Chapter 03Database Configuration and IO Issues
  • 原文地址:https://www.cnblogs.com/psyu/p/11988354.html
Copyright © 2011-2022 走看看