zoukankan      html  css  js  c++  java
  • RabbitMQ 安装与使用

    RabbitMQ 安装与使用

     

    前言

    吃多了拉就是队列,吃饱了吐就是栈

    • 使用场景
      • 对操作的实时性要求不高,而需要执行的任务极为耗时;(发送短信,邮件提醒,更新文章阅读计数,记录用户操作日志)
      • 存在异构系统间的整合;

    安装

    • 下载 Erlang
      • 安装完确定ERLANG_HOME环境变量是否添加,否则:Setx ERLANG_HOME “D:Program Fileserl8.2″
    • 下载安装包
      • 安装完通过rabbitmqctl status确定rabbitmq状态
    • 管理服务
      • 默认安装成功会自动启动服务
      • 通过开始菜单可以启动,停止,卸载服务
    • 占用端口
      • 4369(集群、Erlang)
      • 5671,5672(应用层标准高级消息队列协议)
      • 25672(Erlang分发,CLI通信)
      • 15672(如果管理插件启用)
      • 61613,61614(如果消息文本协议STOMP已启用)
      • 1883,8883(如果erl实时通信已启用)
    • 支持的平台
      • 基于Ubuntu和Debian的Linux发行版
      • 基于Fedora,CentOS和RPM的Linux发行版
      • Mac OS X
      • Windows XP及更高版本

    概念

    • Connections:客户端连接,创建该资源非常耗时,应尽量避免多次创建。
    • Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
    • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
    • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

    • Broker:简单来说就是消息队列服务器实体。
    • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
    • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
    • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

    • producer:消息生产者,就是投递消息的程序。
    • consumer:消息消费者,就是接收消息的程序。

    消息队列的发送过程大概如下:

    1. 客户端创建Connection,连接到消息队列服务器,打开一个channel。
    2. 客户端声明一个Exchange,并设置相关属性。
    3. 客户端声明一个Queue,并设置相关属性。
    4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
    5. 客户端发送消息首先到exchange
    6. exchange根据type路由到对应的队列(可以是多个队列)中.

    Exchange Type

    • direct(直连)
      • routing key 与 binding key相同
    • fanout
      • 给所有绑定队列发送消息
    • topic
      • routing key:audit.irs.corporate => binding key:audit.#
      • routing key:audit.irs => binding key:audit.*
    • default
      • direct
      • binding key为queue名称

    常用命令

    • 管理插件
      • rabbitmq-plugins enable rabbitmq_management // 启用
      • rabbitmq-plugins disable rabbitmq_management // 禁用
    • 管理队列
      • rabbitmqctl list_queues // 查看队列
    • 管理用户及权限
      • rabbitmqctl list_users // 查看所有用户
      • rabbitmqctl add_user user_admin passwd_admin // 添加用户
      • rabbitmqctl set_user_tags user_admin administrator // 添加权限
      • rabbitmqctl delete_user guest // 删除用户
      • rabbitmqctl change_password {username} {newpassowrd} // 修改密码
    • 管理虚拟主机vhost
      • rabbitmqctl add_vhost vhostpath // 创建虚拟主机
      • rabbitmqctl delete_vhost vhostpath // 删除虚拟主机
      • rabbitmqctl list_vhosts // 列出所有虚拟主机

    使用

    • 发送消息(以持久化代码为例)
    var factory = new ConnectionFactory
    {
        HostName = hostName,                // rabbit server
        UserName = "admin",
        Password = "admin",
        Port = 5672,                        // Broker端口
        VirtualHost = "/"                   // 虚拟Host,需提前配置
    };
    
    using (var connection = factory.CreateConnection()) // 创建与RabbitMQ服务器的连接
    {
        using (var channel = connection.CreateModel())  // 创建1个Channel(大部分API在该Channel中)
        {
            // 定义1个队列,自动会和默认的exchange 做direct类型绑定
            channel.QueueDeclare(
                queue: "hello",                     // 队列名称
                durable: true,                      // 队列是否持久化
                exclusive: false,                   // 排他队列:如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。(活动在一次连接内)
                autoDelete: false,                  // 自动删除:当最后一个消费者取消订阅时,队列自动删除。如果您需要仅由一个使用者使用的临时队列,请将自动删除与排除。当消费者断​​开连接时,队列将被删除。(至少消费者能连一次)
                arguments: null);                   // 配置参数
    
            var randomQueue = channel.QueueDeclare();                   // 定义随机的队列 该队列为临时队列(排他队列 + 自动删除)
    
    
            // 定义Exchange(一般而言,不需要定义exchange,rabbitmq默认创建了所有类型的exchange)
            //channel.ExchangeDeclare("direct-demo", ExchangeType.Direct);    // 定义direct exchange
            //channel.ExchangeDeclare("fannout-demo", ExchangeType.Fanout);   // 定义fanout exchange
            //channel.ExchangeDeclare("topic-demo", ExchangeType.Topic);      // 定义fanout exchange
    
    
            // 定义queue exchange key 关系(在某些业务场景下,会使用该关系做路由功能)
            //channel.QueueBind(queue: "hello", exchange: "amq.direct", routingKey: "hello"); // 默认绑定的关系和该行代码效果一样
            //channel.QueueBind("hello", "amq.fanout", "hello");                              // 该类型下的routingKey 实际不需要
    
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;
    
            while (true)
            {
                string message = "Hello World!" + DateTime.Now;
                var body = Encoding.UTF8.GetBytes(message);
    
                // 发送消息到队列中
                channel.BasicPublish(
                    exchange: string.Empty,         // 传递为Empty的时候,通过   `(AMQP default)`传递
                    routingKey: "hello",            // routing key 与 queuebind中的binding key对应
                    basicProperties: properties,    // 消息header
                    body: body);                    // 消息body:发送的是bytes 可以任意编码
    
                Console.WriteLine(" [x] Sent {0}", message);
            }
        }
    }
    • 接收消息(以消息响应为例)

      var factory = new ConnectionFactory
      {
      HostName = hostName,                // rabbit server
      UserName = "admin",
      Password = "admin",
      Port = 5672,                        // Broker端口
      VirtualHost = "/"                   // 虚拟Host,需提前配置
      };
      using (var connection = factory.CreateConnection())
      {
      using (var channel = connection.CreateModel())
      {
          var consumer = new EventingBasicConsumer(channel);  // 创建Consumer
          consumer.Received += (model, ea) =>         // 通过回调函数异步推送我们的消息
          {
              var body = ea.Body;
              var message = Encoding.UTF8.GetString(body);
              Thread.Sleep(1000);
              channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); // 消息响应
              Console.WriteLine(" [x] Received {0}", message);
          };
      
          channel.BasicQos(0, 1, false);  // 设置perfetchCount=1 。这样就告诉RabbitMQ 不要在同一时间给一个工作者发送多于1个的消息
          channel.BasicConsume(queue: "hello",
                                  noAck: false,      // 需要消息响应(Acknowledgments)机制
                                  consumer: consumer);
      
          Console.WriteLine(" Press [enter] to exit.");
          Console.ReadLine();
      }
      }
    • 消息响应(acknowledgments)
      • 为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)机制。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ才会释放并删除这条消息。
    • 持久化
      • 新队列(无法修改队列)配置为可持久化
      • 发送消息配置为持久化
      • 消息什么时候刷到磁盘?
        • 写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。
        • 固定的刷盘时间:25ms,也就是不管Buffer满不满,每个25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。
        • 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。

    常见问题

    • RabbitMQ 管理插件启动报错
      • 确认RabbitMQ服务是否启动
      • C:Windows目录下,将.erlang.cookie文件,拷贝到用户目录下 C:Users{用户名},这是Erlang的Cookie文件,允许与Erlang进行交互
      • 重新安装erl 和 rabbit,尽量不要带空格的路径
    • 修改配置文件
  • 相关阅读:
    Javascript设计模式学习(二)封装续
    Javascript设计模式学习(三)更多的高级样式
    【IBM Tivoli Identity Manager 学习文档】1 简介
    【读书笔记】测试驱动开发(中文版)
    【OpenCV学习】利用HandVu进行手部动作识别分析
    【生活】海淀驾校皮卡科目三实际道路考试备考
    【Linux开发技术之工具使用】配置VIM下编程和代码阅读环境
    【英语天天读】叶芝诗歌《当你老了》赏析——特别喜欢的一首诗,水木年华《一生有你》歌词来源
    【SIP协议】学习初学笔记
    【面向对象程序设计之CRC】CRC卡及其应用
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/6582811.html
Copyright © 2011-2022 走看看