zoukankan      html  css  js  c++  java
  • 快速掌握RabbitMQ(一)——RabbitMQ的基本概念、安装和C#驱动

    1 RabbitMQ简介

           RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,官网地址:http://www.rabbitmq.com。RabbitMQ作为一个消息代理,主要负责接收、存储和转发消息,它提供了可靠的消息机制和灵活的消息路由,并支持消息集群和分布式部署,常用于应用解耦,耗时任务队列,流量削锋等场景。本系列文章将系统介绍RabbitMQ的工作机制,代码驱动和集群配置,本篇主要介绍RabbitMQ中一些基本概念,常用的RabbitMQ Control命令,最后写一个C#驱动的简单栗子。先看一下RabbitMQ的基本结构:

      上图是RabbitMQ的一个基本结构,生产者Producer和消费者Consumer都是RabbitMQ的客户端,Producer负责发送消息,Consumer负责消费消息。

    接下来我们结合这张图来理解RabbitMQ的一些概念:

      Broker(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程,我们可以把Broker叫做RabbitMQ服务器。

      Virtual Host:一个虚拟概念,一个Virtual Host里面可以有若干个Exchange和Queue,主要用于权限控制,隔离应用。如应用程序A使用VhostA,应用程序B使用VhostB,那么我们在VhostA中只存放应用程序A的exchange,queue和消息,应用程序A的用户只能访问VhostA,不能访问VhostB中的数据。

      Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic和Header四种,不同类型的Exchange路由规则是不一样的(这些以后会详细介绍)。

      Queue:消息队列,用于存储还未被消费者消费的消息,队列是先进先出的,默认情况下先存储的消息先被处理。

      Message:就是消息,由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等,Body是真正传输的数据,内容格式为byte[]。

      Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。

      Channel:信道,仅仅创建了客户端到Broker之间的连接Connection后,客户端还是不能发送消息的。需要在Connection的基础上创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的。

    2 RabbitMQ安装

      因为RabbitMQ是用erlang语言开发的,所以我们在安装RabbitMQ前必须要安装erlang支持。

    1 Windows平台安装

    1 安装erlang

      首先下载erlang,直接下载最新版本,当前下载的是 OTP 21.3 Windows 64-bit Binary File ,下载完成后一直下一步安装即可。

    2 安装RabbitMQ

      下载Windows平台的RabbtMQ,当前下载的是 rabbitmq-server-3.7.14.exe ,下载完成后一直下一步安装即可。

    3 安装Web管理插件

      打开RabbitMQ Command Prompt,执行命令 rabbitmq-plugins enable rabbitmq_management 即可完成Web监控插件的安装。 

      安装完成后,打开浏览器输入 http://127.0.0.1:15672/ 使用默认账号[ name:guest / password:guest ]登录后界面如下,使用这个UI插件我们可以轻松的查看RabbitMQ中的交换机(exchange),队列(queue)等内容,也可以对exchange,queue,用户等进行添加、修改、删除操作。

      到这一步Windows平台安装RabbitMQ完成了。 打开服务管理器,RabbitMQ已经在正常运行了,如下:

    2 Centos安装RabbitMQ

    1 安装RabbitMQ

      这里虚拟机系统为Centos7,采用的安装方式是yum安装,为了简单,这里直接使用官方提供的erlang和RabbitMQ-server的自动安装脚本(官方安装文档),逐行执行下边的代码就可以安装完成erlang和RabbitMQ。

    #安装socat
    yum install socat
    
    #安装erlang
      curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
      yum -y install erlang
    
    #安装rabbitmq-server
      curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
      yum -y install rabbitmq-server
    
    #启动rabbitmq服务
      systemctl start rabbitmq-server
    #添加web管理插件
      rabbitmq-plugins enable rabbitmq_management

    补充:如果安装完成后,执行RabbitMQ执行命令特别慢,或者出现报错【rabbitmq unable to perform an operation on node  xxx@xxx】,解决方法:

      编辑hosts,执行命令 vim /etc/hosts ,添加本机IP(或者虚拟机IP)

      命令执行结束后,使用浏览器访问 http://127.0.0.1:15672/ 也会出现web管理界面。通过上边的安装步骤安装的RabbitMQ会生成Unit文件,所以我们可以使用Systemd管理RabbitMQ服务,以下是几条常用的命令:

    #启动RabbitMQ服务
      systemctl start rabbitmq-server
    #停止RabbitMQ服务
      systemctl stop rabbitmq-server
    #查看RabbitMQ运行状态
      systemctl status rabbitmq-server
    #重启RabbitMQ服务
      systemctl restart rabbitmq-server

    2 RabbitMQ Control工具

      使用Web管理界面可以实现RabbitMQ的大部分常用功能,但是有些功能WebUI是做不到的,如:开启/关闭RabbitMQ应用程序和集群的管理等。RabbitMQ Control是RabbitMQ的命令行管理工具,可以调用所有的RabbitMQ内置功能,主命令是rabbitmqctl ,下边是一个查询用户列表的命令,注意需要切换到sbin目录下执行:

      为了方便的使用RabbitMQ Control工具,我们最好添加一个环境变量,Windows默认安装时在PATH中添加一条: C:Program FilesRabbitMQ Server abbitmq_server-3.7.14sbin ,不是默认安装的话找到对应的安装目录添加PATH。按照上的安装方法,Centos可以直接使用RabbitMQ Control工具,不需要多余的配置。如果想详细了解RabbitMQ Control工具,可以参考RabbitMQ Control的官方文档

      这里总结了一些最常用到的RabbitMQ Controll命令,有兴趣的小伙伴可以试着运行一下这些命令,如在命令行工具中使用命令 rabbitmqctl add_user <username> <password>  添加一个新用户。

    1 基本控制命令

      基本控制命令主要用于启动、停止应用程序、runtime等

    #停止rabbitmq和runtime
      rabbitmqctl shutdown
    #停止erlang节点
      rabbitmqctl stop 
    #启用rabbitmq 
      rabbitmqctl start_app 
    #停止rabbitmq 
      rabbitmqctl stop_app
    #查看状态
      rabbitmqctl status 
    #查看环境
      rabbitmqctl environment
    #rabbitmq恢复最初状态,内部的exchange和queue都清除
      rabbitmqctl reset 

    2 服务状态管理

      这些命令主要用于用于查看exchang、channel、binding、queue、consumers:

    #返回queue的信息
      list_queues [-p <vhostpath>] [<queueinfoitem> ...] 
    #返回exchange的信息
      list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...] 
    #返回绑定信息
      list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
     #返回链接信息
      list_connections [<connectioninfoitem> ...] 
    #返回目前所有的channels
      list_channels [<channelinfoitem> ...]   
    #返回consumers
      list_consumers [-p <vhostpath>] 

    3 用户管理命令

      这些命令主要用于添加、修改、删除用户及管理用户权限

    #在rabbitmq的内部数据库添加用户
      add_user <username> <password>  
    #删除一个用户
      delete_user <username> 
    #改变用户密码 
      change_password <username> <newpassword> 
    #清除用户密码,禁止用户登录
      clear_password <username>
    #设置用户tags,就是设置用户角色
      set_user_tags <username> <tag> 
    # 查看用户列表
      list_users 
    #创建一个vhost
      add_vhost <vhostpath> 
    #删除一个vhosts
      delete_vhost <vhostpath>    
    #列出vhosts
      list_vhosts [<vhostinfoitem> ...] 
    #针对一个vhosts 给用户赋予相关权限
      set_permissions [-p <vhostpath>] <user> <conf> <write> <read> 
    #清除一个用户对vhost的权限
      clear_permissions [-p <vhostpath>] <username> 
    #列出所有用户对某一vhost的权限
      list_permissions [-p <vhostpath>]  
    #列出某用户的访问权限
      list_user_permissions <username> 

    4 集群管理命令

    #clusternode表示node名称,--ram表示node以ram node加入集群中。默认node以disc node加入集群,在一个node加入cluster之前,必须先停止该node的rabbitmq应用,即先执行stop_app。
      join_cluster <clusternode> [--ram]   
    #显示cluster中的所有node
      cluster_status 
    #设置集群名字
      set_cluster_name <clustername>
    #修改集群名字
      rename_cluster_node <oldname> <newname>
    #改变一个cluster中node的模式,该节点在转换前必须先停止,不能把一个集群中唯一的disk node转化为ram node
      change_cluster_node_type <disc | ram>
    #远程删除一个节点,删除前必须该节点必须先停止
      rabbitmqctl forget_cluster_node rabbit@rabbit1
    #同步镜像队列
      sync_queue <queuename>
    #取消同步队列
      cancel_sync_queue <queuename>  
    #清空队列中所有消息
      purge_queue [-p vhost] <queuename>

      这里列举的很多命令是现阶段用不到的,如集群控制相关的命令,这些命令的用法会在以后的章节中逐渐理解。

    3  C#驱动RabbitMQ

    1 一个简单的栗子

      作为开发者,我们最在意的还是怎么在代码中使用RabbitMQ,可以通过官方RabbitMQ开发文档来学习RabbitMQ的使用,这里以.NET为例演示一下RabbitMQ的最基本用法。创建两个Console应用,一个作为发送消息的生产者(Producer),一个作为接受消息的消费者(Consumer),生产者向队列写入消息,消费者接受这条消息,结构如下:

      两个控制台应用都要添加RabbitMQ.Client包,命令如下:

    Install-Package RabbitMQ.Client

     生产者(Producer)代码:

        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory()
                {
                    //rabbitmq-server所在设备ip,这里就是本机
                    HostName = "127.0.0.1",
                    UserName = "wyy",//用户名
                    Password = "123321"//密码
                };
                //第一步:创建连接connection
                using (var connection = factory.CreateConnection())
                {
                    //第二步:创建通道channel
                    using (var channel = connection.CreateModel())
                    {
                        //第三步:声明交换机exchang
                        channel.ExchangeDeclare(exchange: "myexchange",
                                                type: ExchangeType.Direct,
                                                durable: true,
                                                autoDelete: false,
                                                arguments: null);
                        //第四步:声明队列queue
                        channel.QueueDeclare(queue: "myqueue",
                                             durable: true,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: null);
                        Console.WriteLine("生产者准备就绪....");
                        //第五步:绑定队列到交互机
                        channel.QueueBind(queue:"myqueue", exchange:"myexchange", routingKey:"mykey");
                        string message = "";
                        //第六步:发送消息
                        //在控制台输入消息,按enter键发送消息
                        while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                        {
                            message = Console.ReadLine();
                            var body = Encoding.UTF8.GetBytes(message);
                            //基本发布
                            channel.BasicPublish(exchange: "myexchange",
                                                 routingKey: "mykey",
                                                 basicProperties: null,
                                                 body: body);
                            Console.WriteLine($"消息【{message}】已发送到队列");
                        }
                    }
                }
                Console.ReadKey();
            }
        }

     消费者(Consumer)代码: 

        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory()
                {
                    //rabbitmq-server所在设备ip,这里就是本机
                    HostName = "127.0.0.1",
                    UserName = "wyy",//用户名
                    Password = "123321"//密码
                };
                //第一步:创建连接connection
                using (var connection = factory.CreateConnection())
                {
                    //第二步:创建通道channel
                    using (var channel = connection.CreateModel())
                    {
                        //第三步:声明队列queue
                        channel.QueueDeclare(queue: "myqueue",
                                             durable: true,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: null);
                        //第四步:定义消费者                                      
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine($"接受到消息【{message}】");
                        };
                        Console.WriteLine("消费者准备就绪....");
                        //第五步:处理消息
                        channel.BasicConsume(queue: "myqueue",
                                             autoAck: true,
                                             consumer: consumer);
                        Console.ReadLine();
                    }
                }
            }
        } 

      依次运行Producer和Consumer两个应用程序,运行结果如下:

      注意:上边的代码在生产者和消费者的代码中都声明了exchange和queue,这主要是为了让这两个程序可以按任意顺序启动,如:我们只在生产者代码中定义了exchange和queue,却先启动消费者,这会让造成消费者找不到自己需要的exhange和queue(出现404错误)。实际开发中创建exchange/queue、绑定队列以及设置routingKey这些工作,都可以通过WebUI管理界面或者使用Rabbitmq Control工具完成。

      QueueDeclare方法用于声明队列,ExchangeDeclare用于声明交换机,我们在使用这两个方法声明时,可以设置队列和交换机的属性,如queue的名字,长度限制,exchange是否持久化、交换机类型等。

    2 QueueDeclare方法详解

      在上边的栗子中我们使用了声明队列的方法  QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments) ,该方法通过参数设置队列的特性。这里介绍一下该方法 中几个参数的作用,先看一个完整的声明队列的栗子:

                 //声明队列newsQueue
                        channel.QueueDeclare(queue: "myqueue",
                                             durable: false,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: new Dictionary<string, object>() {
                                                 //队列中消息的过期时间是8s
                                                 { "x-message-ttl",1000*8 }, 
                                                 //队列60s没有被使用,则删除该队列
                                                 {"x-expires",1000*60 },
                                                 //队列最多保存100条消息
                                                 {"x-max-length",100 },
                                                 //队列中ready类型消息总共不能超过1000字节
                                                 {"x-max-length-bytes",1000 },
                                                 //当队列消息满了时,丢弃传来后续消息
                                                 {"x-overflow","reject-publish" },
                                                 //丢弃的消息发送到deadExchange交换机
                                                 {"x-dead-letter-exchange","deadExchange" },
                                                 //丢弃的消息发送到deadExchange交换机时的RoutingKey
                                                 {"x-dead-letter-routing-key","deadKey" },
                                                 //队列中最大的优先级等级为10(在Publish消息时对每条消息设置优先级)
                                                 {"x-max-priority",10 },
                                                 //设置队列默认为lazy
                                                 {"x-queue-mode","lazy" }
                                             });

    QueueDeclare方法的参数如下:

      queue:队列名字;

      durable:是否持久化。设置为true时,队列信息保存在rabbitmq的内置数据库中,服务器重启时队列也会恢复(注意:重启后队列内部的消息不会恢复,怎么实现消息持久化以后会详细介绍);

      exclusive:是否排外。设置为true时只有首次声明该队列的Connection可以访问,其他Connection不能访问该队列;且在Connection断开时,队列会被删除(即使durable设置为true也会被删除);

      autoDelete:是否自动删除。设置为true时,表示在最后一条使用该队列的连接(Connection)断开时,将自动删除这个队列;

      arguments:设置队列的一些其它属性,为Dictionary<string,object>类型,下表总结了arguments中可以设置的常用属性。

    参数名                                              作用 示例
    Message TTL 设置队列中消息的有效时间 { "x-message-ttl",1000*8 },设置队列中的所有消息的有效期为8s;
    Auto expire 自动删除队列。一定的时间内队列没有被使用,则自动删除队列 {"x-expires",1000*60 },设置队列的过期时长为60s,如果60s没有队列被访问,则删除队列;
    Max length 队列能保存消息的最大条数 {"x-max-length",100 },设置队列最多保存100条消息;
    Max length bytes 队列中ready类型消息的总字节数  {"x-max-length-bytes",1000 }, 设置队列中ready类型消息总共不能超过1000字节;
    Overflow behaviour  当队列消息满了时,再接收消息时的处理方法。有两种处理方案:默认为"drop-head"模式,表示从队列头部丢弃消息;"reject-publish"表示不接收后续的消息 {"x-overflow","reject-publish" },设置当队列消息满了时,丢弃传来后续消息;
    Dead letter exchange  用于存储被丢弃的消息的交换机名。Overflow behaviour 的两种处理方案中丢弃的消息都会发送到这个交换机 {"x-dead-letter-exchange","beiyongExchange" },设置丢弃的消息发送到名字位beiyongExchange的交换机;
    Dead letter routing key  被丢弃的消息发送到Dead letter exchange时的使用的routing Key {"x-dead-letter-routing-key","deadKey" },设置丢弃的消息发送到beiyongExchange交换机时的RoutingKey值是"deadKey";
    Maximum priority  设置队列中消息优先级的最大等级,在publish消息时可以设置单条消息的优先级等级 {"x-max-priority",10 },设置中消息优先级的最大等级为10;
    Lazy mode  设置队列的模式,如果设置为Lazy表示队列中消息尽可能存放在磁盘中,以减少内存占用;不设置时消息都存放在队列中,用以尽可能快的处理消息 {"x-queue-mode","lazy"},3.6以后版本可用,设置队列中消息尽可能存放在磁盘中,以减少内存占用。在消息拥堵时和消息持久化配置使用可以减少内存占用。

    3 ExchangeDeclare方法详解

       声明交换机的方法 void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments) 可以设置交换机的特性,这里简单介绍一下这个方法的几个参数:

     channel.ExchangeDeclare(exchange: "myexchange",
                             type: ExchangeType.Direct,
                             durable: true,
                             autoDelete: false,
                   arguments: new Dictionary<string, object> { 
                            {"alternate-exchange","BeiyongExchange" }//如果消息不能路由到该交换机,就把消息路由到备用交换机BeiyongExchange上
                   });

      exchange:交换机名字。

      type:交换机类型。exchange有direct、fanout、topic、header四种类型,在下一篇会详细介绍;

      durable:是否持久化。设置为true时,交换机信息保存在rabbitmq的内置数据库中,服务器重启时交换机信息也会恢复;

      autoDelete:是否自动删除。设置为true时,表示在最后一条使用该交换机的连接(Connection)断开时,自动删除这个exchange;

      arguments:其他的一些参数,类型为Dictionary<string,object> 。

    小结

      本节主要介绍了RabbitMQ的基本概念,在Windows和Centos上的安装方法,及RabbitMQ Control工具的基本使用,最后演示了一个C#驱动RabbitMQ的栗子,并详细介绍了声明queue和exchange的方法。通过这一节我们大概了解了RabbitMQ的基本使用。以后的章节会逐渐介绍RabbitMQ的四种exchange、两种Consumer的特点和使用场景,以及消息确认、优先级、持久化等,最后搭建一个高可用的RabbitMQ集群,如果文中有错误的话,希望大家可以指出,我会及时修改,谢谢!

    参考文章

    【1】 https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html

    【2】https://www.cnblogs.com/operationhome/p/10483840.html

  • 相关阅读:
    gentoo Wireless Configuration
    Gentoo: Chrome
    Gentoo: user's permission
    参考
    GithubPages上部署hexo
    hexo 个人博客搭建
    服务器修改用户名和密码
    CentOS 服务器搭建 mediawiki
    mysql 修复命令日志
    个体如何采用敏捷的工作方式
  • 原文地址:https://www.cnblogs.com/wyy1234/p/10743567.html
Copyright © 2011-2022 走看看