zoukankan      html  css  js  c++  java
  • 第一节 RabbitMQ入门及安装

    RabbitMQ消息中间件系列教程

    第一节 RabbitMQ入门及安装

    1、RabbitMQ基本概念

    https://www.rabbitmq.com

    RabbitMQ是一个开源的消息代理和队列服务器,通过普通的协议(Amqp协议)来完成不同应用之间的数据共享(消费生产和消费者可以跨语言平台),RabbitMQ是通过elang语言来开发的基于amqp协议。

    AMQP的核心概念

    Advanced message queue protocol高级消息队列协议,amqp 是一个应用层协议的规范(定义了很多规范),可以有很多不同的消息中间件产品(需要 遵循该规范)。

    • 1)server :又称为broker,接受客户端连接,实现amqp实体服务
    • 2)Connection: 连接,应用程序与brokder建立网络连接
    • 3)channel:网络通道,几乎所有的操作都是在channel中进行的,是进行消息对象的通道,客户端可以建立 多个通道,每一个channel表示一个会话任务
    • 4)Message: 服务器和应用程序之间传递数据的载体,有properties(消息属性,用来修饰消息,比如消息的优 先级,延时投递)和Body(消息体)
    • 5)virtual host(虚拟主机): 是一个逻辑概念,最上层的消息路由,一个虚拟主机中可以包含多个exhange 和 queue 但是一个虚拟主机中不能有名称相同的exchange 和queue
    • 6)exchange 交换机: 消息直接投递到交换机上,然后交换机根据消息的路由key 来路由到对应绑定的队列上
    • 7)baingding: 绑定exchange 与queue的虚拟连接,bingding中可以包含route_key
    • 8)route_key 路由key ,他的作用是在交换机上通过route_key来把消息路由到哪个队列上
    • 9)queue:队列,用于来保存消息的载体,有消费者监听,然后消费消息

    交互机和队列是有一个绑定的关系

    2、RabbitMQ的整体架构模型

    3、RabbitMQ的消息是如何流转的

    4、RabbitMQ的安装和使用

    • 安装RabbitMQ所需要的依赖包
    yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gccc++ kernel-devel m4 ncurses-devel tk tc xz
    
    • 下载安装包
    wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm 
    wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm 
    wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm 
    

    链接:https://pan.baidu.com/s/1_yJgzHsEENN4HRfcvVH79A 提取码:fir5

    • 安装服务命令
    #第一步:安装erlang语言环境 
    rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm 
    #第二步:安装socat加解密软件 
    rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm 
    #第三步:最后安装rabbitmq 
    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm 
    
    • 修改集群用户与连接心跳检测

    注意修改vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app文件,修改:loopback_users 中的 <<"guest">>,只保留guest(不修改只能通过localhost访问)

    • 修改本机系统文件

    修改 vim /etc/rabbitmq/rabbitmq-env.conf 添加NODENAME=rabbit

    修改 vim /etc/hostname

    修改本地 vim /etc/hosts文件

    • 验证服务器是可用的
    rabbitmq-server start &
    

    执行管控台插件:(不然不能在浏览器方法)

    rabbitmq-plugins enable rabbitmq_management
    

    *具体安装3.6.5的文档:https://www.cnblogs.com/sky-cheng/p/10709104.html

    *卸载文档:https://www.cnblogs.com/kingsonfu/p/11023967.html

    5、RabbitMQ延时插件安装

    插件地址:https://www.rabbitmq.com/community-plugins.html

    延时插件地址 :https://dl.bintray.com/rabbitmq/communityplugins/3.6.x/rabbitmq_delayed_message_exchange/

    下载延时插件:

    wget https://dl.bintray.com/rabbitmq/communityplugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-201712153.6.x.zip 
    

    解压延时插件:

    unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip 
    

    把延时插件拷贝到指定目录下:

    cp rabbitmq_delayed_message_exchange-20171215-3.6.x.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.5/plugins 
    

    启动延时插件:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    

    6、RabbitMQ操作命令

    • 起停服务命令
    启动服务 rabbitmqctl start_app(启动rabbitmq节点  保证需要erlang虚拟机节点起来才能执行) 
    停止服务 rabbitmqctl stop_app(停止rabbtimq节点,但是不会停止erlang节点)   rabbitmqctl stop(都 会停止) 
    查看服务状态 rabbtimqctl status
    
    • 用户操作命令
    查看所有用户列表:  rabbitmq list_users
    添加用户 rabbitmqctl add_user smlz smlz
    设置rabbitmq用户的角色  rabbitmqctl set_user_tags smlz administrator
    为用户设置权限: rabbitmqctl set_permissions  -p / smlz ".*" ".*" ".*" 
    rabbitmqctl set_permissions  -p <虚拟机> <用户名> ".*" ".*" ".*"
    列出用户权限: rabbitmqctl list_user_permissions smlz
    清除用户权限 rabbitmqctl clear_permissions -p <虚拟机> <用户名>
    删除用户 rabbitmqctl delete_user root 
    修改密码  rabbitmqctl change_password 用户名  新密码
    
    • 虚拟主机操作
    rabbitmqctl add_vhost /cloudmall 增加一个虚拟主机 
    rabbitmqctl list_vhosts;   查看所有的虚拟主机 
    rabbitmqctl list_permissions -p /cloudmall  查看虚拟主机的权限 
    rabbitmqctl delete_vhost /cloudmall  删除虚拟主机
    
    • 操作队列命令
    rabbitmqctl list_queues 查询所有队列 
    rabbitmqctl -p vhostpath purge_queue blue 清除队列消息
    
    • 高级命令
    rabbitmqctl reset 移除所有数据   该命令需要在 rabbitmqctl stop_app命令之后才执行(也就是说 在服 务停止后) 
    rabbitmqctl join_cluster  <cluster_node> [--ram] 组成集群命令 
    rabbitmqctl cluster_status  查看集群状态 
    rabbitmqctl change_cluster_node_type dist|ram   修改集群节点存储数据模式 
    rabbitmqctl forget_cluster_node [--offline]忘记节点 (摘除节点) 
    rabbitmqctc rename_cluster_node oldnode1 newnode1 oldnode2 newnode2 修改节点名称 
    

    7、消费者 生产者模型(使用java连接mq)

    • 生产者
    public class RabbitmqProducter {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1:创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
    
            //2设置连接工厂的属性
            connectionFactory.setHost("192.168.159.8");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("tuling");
            connectionFactory.setUsername("smlz");
            connectionFactory.setPassword("smlz");
    
            //3:通过连接工厂创建连接对象
            Connection connection = connectionFactory.newConnection();
    
            //4:通过连接创建channel
            Channel channel = connection.createChannel();
    
            //5:通过channel发送消息
            for(int i=0;i<5;i++) {
                String message = "hello--"+i;
                /**
                 * 老师以前讲过说我们的消息会发送的exchange上,
                 * 但是在这里我们没有指定交换机?那我们的消息发送到哪里了????
                 * The default exchange is implicitly bound to every queue, with a routing key equal to the queue name.
                 * It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
                 * 说明:加入我们消息发送的时候没有指定具体的交换机的话,那么就会发送到rabbimtq指定默认的交换机上,
                 * 那么该交换机就会去根据routing_key 查找对应的queueName 然后发送的该队列上.
                 *
                 */
                channel.basicPublish("","tuling-queue-01",null,message.getBytes());
            }
    
            //6:关闭连接
            channel.close();
            connection.close();
        }
    }
    
    
    • 消费者
    public class RabbitmqConsumer {
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
            //创建连接工厂
            ConnectionFactory connectionFactory  = new ConnectionFactory();
            connectionFactory.setHost("192.168.159.8");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("tuling");
            connectionFactory.setUsername("smlz");
            connectionFactory.setPassword("smlz");
    
            //创建连接
            Connection connection = connectionFactory.newConnection();
    
            //创建一个channel
            Channel channel = connection.createChannel();
    
            //声明队列
            String queueName = "tuling-queue-01";
            /**
             * queue:队列的名称
             * durable:是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,
             * 保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
             * exclusive:当连接关闭时connection.close()该队列是否会自动删除;
             * 二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,
             * 没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常
             * com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)
             * 一般等于true的话用于一个队列只能有一个消费者来消费的场景
             * autodelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,
             * 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
             */
            channel.queueDeclare(queueName,true,false,true,null);
    
            //创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName,true,queueingConsumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String reserveMsg = new String(delivery.getBody());
                System.out.println("消费消息:"+reserveMsg);
            }
        }
    }
    
    

    8、Rabbitmq交换机详解

    1:作用:

    接受生产者的消息,然后根据路由键 把消息投递到跟交换机绑定的对应的队列上

    2:交换机的属性:
    • Name:交换机的名称
    • Type:交换机的类型,direct,topic,fanout,headers
    • Durability:是否需要持久化
    • autodelete:假如没有队列绑定到该交换机,那么该交换机会自动删除
    • Internal:当前交换机是否用户rabbitmq内部使用不常用,默认为false
    • Argurements:扩展参数,用户扩展AMQP 定制化协议
    3:交换机的类型
    • 3.1)直连交换机:direct exchange

    所以发送的direct exhchange 的消息都会被投递到与routekey名称(与队列名称)相同的queue上,direct模式下,可以使用rabbitmq自定exchange----> default  exchange   所以不需要交换机和任何队列绑定,消息将会投递到route_key名称和队列名称相同的队列上

    • 生产者
    public class DirectExchangeProductor {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.159.8");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("tuling");
            connectionFactory.setUsername("smlz");
            connectionFactory.setPassword("smlz");
    
            //创建连接
            Connection connection = connectionFactory.newConnection();
    
            //创建channel
            Channel channel = connection.createChannel();
    
            //定义交换机名称
            String exchangeName = "tuling.directchange";
    
            //定义routingKey
            String routingKey = "tuling.directchange.key11111111";
    
            //消息体内容
            String messageBody = "hello tuling ";
            channel.basicPublish(exchangeName,routingKey,null,messageBody.getBytes());
        }
    }
    
    • 消费者
    public class DirectExchangeConsumer {
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.159.8");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("tuling");
            connectionFactory.setUsername("smlz");
            connectionFactory.setPassword("smlz");
    
            //创建连接
            Connection connection = connectionFactory.newConnection();
    
            //创建channel
            Channel channel = connection.createChannel();
    
            String exchangeName = "tuling.directchange";
            String exchangeType = "direct";
            String queueName = "tuling.directqueue";
            String routingKey = "tuling.directchange.key";
            /**
             * 声明一个交换机
             * exchange:交换机的名称
             * type:交换机的类型 常见的有direct,fanout,topic等
             * durable:设置是否持久化。durable设置为true时表示持久化,反之非持久化.持久化可以将交换器存入磁盘,在服务器重启的时候不会丢失相关信息
             * autodelete:设置是否自动删除。autoDelete设置为true时,则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后,所有与这个交换器绑定的队列或者交换器都与此解绑。
             * 不能错误的理解—当与此交换器连接的客户端都断开连接时,RabbitMq会自动删除本交换器
             * arguments:其它一些结构化的参数,比如:alternate-exchange
             */
            channel.exchangeDeclare(exchangeName,exchangeType,true,false,null);
    
            /**
             * 声明一个队列
             * durable:表示rabbitmq关闭删除队列
             * autodelete:表示没有程序和队列建立连接 那么就会自动删除队列
             *
             */
            channel.queueDeclare(queueName,true,false,false,null);
    
            /**
             * 队里和交换机绑定
             */
            channel.queueBind(queueName,exchangeName,routingKey);
    
            /**
             * 创建一个消费者
             */
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    
            /**
             * 开始消费
             */
            channel.basicConsume(queueName,true,queueingConsumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String reciverMessage = new String(delivery.getBody());
                System.out.println("消费消息:-----"+reciverMessage);
            }
        }
    }
    
    • 3.2)主题交换机 TopicExchange

    就是在队列上绑到top 交换机上的路由key  可以是通过通配符来匹配的通配符的规则是

    比如:   log.#   :可以匹配一个单词  也可以匹配多个单词    比如  log.#   可以匹配log.a 、log.a.b、log.a.b  
                  log.*   :可以匹配一个单词    比如 log.*  可以匹配log.a  但是不可以匹配log.a.b

    • 生产者
    public class TopicExchangeProductor {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.159.8");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("smlz");
            connectionFactory.setPassword("smlz");
    
            //创建连接
            Connection connection = connectionFactory.newConnection();
    
            //创建channel
            Channel channel = connection.createChannel();
    
            //发送消息
            String exchangeName = "policymaker.exchange";
    
            String routingKey1 = "policymaker.key1";
            String routingKey2 = "policymaker.key2";
            String routingKey3 = "policymaker.key.key3";
    
            channel.basicPublish(exchangeName,routingKey1,null,"我是第一条消息".getBytes());
            channel.basicPublish(exchangeName,routingKey2,null,"我是第二条消息".getBytes());
            channel.basicPublish(exchangeName,routingKey3,null,"我是第三条消息".getBytes());
        }
    } 
    
    • 消费者
    public class TopicExchangeConsumer {
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.159.8");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("smlz");
            connectionFactory.setPassword("smlz");
    
            //创建连接
            Connection connection = connectionFactory.newConnection();
    
            //创建channel
            Channel channel = connection.createChannel();
    
            //声明交换机
            String exchangeName = "policymaker.exchange";
            String exchangeType = "topic";
            channel.exchangeDeclare(exchangeName,exchangeType,true,true,null);
    
            //声明队列
            String quequName = "policymaker.queue";
            channel.queueDeclare(quequName,true,false,false,null);
    
            //声明绑定关系
            String bingdingStr = "policymaker.#";
            channel.queueBind(quequName,exchangeName,bingdingStr);
    
            //声明一个消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    
            //开始消费
            /**
             * 开始消费
             */
            channel.basicConsume(quequName,true,queueingConsumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                System.out.println("接受到消息:"+new String(delivery.getBody()));
            }
        }
    }
    
    • 3.3)扇形交换机(fanout exchange)

    就是消息通过从交换机到队列上不会通过路由key   所以该模式的速度是最快的   只要和交换机绑定的那么消息就会 被分发到与之绑定的队列上

    • 生产者
    public class FanoutExchangeConsumer {
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.159.8");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("tuling");
            connectionFactory.setUsername("smlz");
            connectionFactory.setPassword("smlz");
    
            //创建连接
            Connection connection = connectionFactory.newConnection();
    
            //创建channel
            Channel channel = connection.createChannel();
    
            //声明交换机
            String exchangeName = "tuling.fanoutexchange";
            String exchangeType = "fanout";
            channel.exchangeDeclare(exchangeName,exchangeType,true,true,null);
    
            //声明队列
            String quequName = "tuling.fanout.queue";
            channel.queueDeclare(quequName,true,false,false,null);
    
            //声明绑定关系
            String bingdingStr = "jjsadf";
            channel.queueBind(quequName,exchangeName,bingdingStr);
    
            //声明一个消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    
            //开始消费
            /**
             * 开始消费
             */
            channel.basicConsume(quequName,true,queueingConsumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                System.out.println("接受到消息:"+new String(delivery.getBody()));
            }
        }
    }
    
    • 消费者
    public class FanoutExchangeConsumer {
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.159.8");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("tuling");
            connectionFactory.setUsername("smlz");
            connectionFactory.setPassword("smlz");
    
            //创建连接
            Connection connection = connectionFactory.newConnection();
    
            //创建channel
            Channel channel = connection.createChannel();
    
            //声明交换机
            String exchangeName = "tuling.fanoutexchange";
            String exchangeType = "fanout";
            channel.exchangeDeclare(exchangeName,exchangeType,true,true,null);
    
            //声明队列
            String quequName = "tuling.fanout.queue";
            channel.queueDeclare(quequName,true,false,false,null);
    
            //声明绑定关系
            String bingdingStr = "jjsadf";
            channel.queueBind(quequName,exchangeName,bingdingStr);
    
            //声明一个消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    
            //开始消费
            /**
             * 开始消费
             */
            channel.basicConsume(quequName,true,queueingConsumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                System.out.println("接受到消息:"+new String(delivery.getBody()));
            }
        }
    }
    

    9、 队列,绑定虚拟主机,消息

    绑定:  exchange 与之间的连接关系(通过路由规则)

    队列:用来存储消息的实体

    队列的属性:  durability  消息是否被持久化 AutoDelete :表示最后一个监听被移除那么该队列就会被删除

    消息:用来生产着和消费者之间传递数据的

    消息属性:  包括消息体body 和属性 properties

    常用属性:delivery mode ,headers,content_type(消息类型) content_encoding(消息编码),priporty(消息优 先级)
                          correntlation_id(最为消息唯一的id),reply_to(消息失败做重回队列),expiretion(消息的过期时 间),message_id(消息id);
                          timestamp,type,user_id ,app_id,cluster_id等

  • 相关阅读:
    hdu_5718_Oracle(大数模拟)
    hdu_2222_Keywords Search(AC自动机板子)
    hdu_5616_Jam's balance(暴力枚举子集||母函数)
    hdu_2255_奔小康赚大钱(KM带权二分匹配板子)
    hdu_2544_最短路(spfa版子)
    hdu_2457_DNA repair(AC自动机+DP)
    hdu_5555_Immortality of Frog(状压DP)
    hdu_2159_FATE(完全背包)
    [USACO2002][poj1944]Fiber Communications(枚举)
    [AHOI2013]打地鼠(网络流)
  • 原文地址:https://www.cnblogs.com/cnsyear/p/12805260.html
Copyright © 2011-2022 走看看