zoukankan      html  css  js  c++  java
  • RabbitMQ的安装与基本使用

      运行环境:https://oneinstack.com/install/

        在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。如发送短信、邮件、过滤非法关键字等等。它还可以用于RPC。

        先看一张官方图:

     

     
        一、概念:
         Broker:简单来说就是消息队列服务器实体。
       Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
       Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
       Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
       Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
       vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
       producer:消息生产者,就是投递消息的程序。
       consumer:消息消费者,就是接受消息的程序。
       channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
    二、安装RabbitMQ
         Ubuntu:
              sudo apt-get install erlang
              sudo apt-get install rabbitmq-server
         CentOS:
              1.先安装erlang
                   yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel 
                   yum -y install ncurses-devel 
                   yum install ncurses-devel 
        cd /usr/local
                   wget http://www.erlang.org/download/otp_src_17.5.tar.gz
                   tar -xzvf otp_src_17.5.tar.gz 
                   cd otp_src_17.5
                   ./configure --without-javac
                   make && make install
                   测试一下是否安装成功,在控制台输入命令erl
        安装完后输入“erl”以下提示即为安装成功
    [root@cloud bin]# erl
    Erlang R16B02 (erts-5.10.3) [source] [64-bit] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]
     
    Eshell V5.10.3  (abort with ^G)
    1>
              2.安装rabbitmq
                   cd /usr/local
                   tar -zxvf rabbitmq-server-3.5.1.tar.gz 
                   cd rabbitmq-server-3.5.1
                   make
                   make TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man DOC_INSTALL_DIR=/usr/local/rabbitmq/doc install
                   报错处理:
                        /bin/sh: xmlto: command not found
                        /bin/sh: line 2: xmlto: command not found
                       解决:yum install xmlto
    三、管理命令
      切换到安装目录,sbin文件下才能执行命令,如:cd /usr/local/rabbitmq/sbin/
           启动:./rabbitmq-server start
           后台启动:./rabbitmq-server -detached
           关闭:./rabbitmqctl stop
           状态:./rabbitmqctl status
    四、插件
        启动web管理插件,切换到安装目录,sbin文件下才能执行命令,如:cd /usr/local/rabbitmq/sbin/
            ./rabbitmq-plugins enable rabbitmq_management
            错误解决:
                Error: {cannot_write_enabled_plugins_file,"/etc/rabbitmq/enabled_plugins",            enoent}
                mkdir /etc/rabbitmq
                重新启动输入地址:localhost:15672,帐号默认为guest,密码guest,此帐号默认只能在本机访问。不建议打开远程访问。你可以创建一个帐户,并设置可以远程访问的角色进行访问。
                如:./rabbitmqctl add_user luo 123456
                ./rabbitmqctl  set_user_tags  luo administrator
    五、用户管理
         默认的guest帐户相当于root帐户
         rabbitmqctl add_user username password 添加帐户
         rabbitmqctl change_password username newpassword 修改密码
         rabbitmqctl delete_user username 删除帐户
         rabbitmqctl list_users 列出所有帐户
         rabbitmqctl  set_user_tags  User  Tag 设置角色(administrator、monitoring、policymaker、management、其它)
         立即生效,不需重启
     六、安装PHP扩展(rabbitmq-c,amqp)
        
     1.安装rabbitmq-c
     cd /usr/local
        wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz 
      tar zxvf rabbitmq-c-0.8.0.tar.gz
        cd  rabbitmq-c-0.8.0
        autoreconf -i
        ./configure --prefix=/usr/local/rabbitmq-c
    最后显示一下内容表示正常
    rabbitmq-c build options:
    Host: x86_64-unknown-linux-gnu
    Version: 0.4.1
    SSL/TLS: openssl
    Tools: yes
    Documentation: no
    Examples: yes
      然后进行make和安装了.
      make && make install
       2.安装amqp
        cd /usr/local
        wget https://pecl.php.net/get/amqp-1.9.1.tgz
        tar -zxvf amqp-1.9.1.tgz 
        cd amqp-1.9.1
         /usr/local/php/bin/phpize( 可使用find / -name phpize查找phpize路径 )
       ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c(可使用find / -name php-config查找php-config路径)
    make && make install
       在php.ini中extenstion部分写入extension=amqp.so
       重启php,/etc/init.d/php-fpm restart
        最后phpinfo检查搜索amqp是否成功
    七、使用PHP与之交互
         Produce端:
    //设置连接属性  
    $connArgs = array(  
         'host'           => 'localhost',  
         'port'           => '5672',  
         'login'           => 'guest',  
         'password' => 'guest',  
         'vhost'          => '/'  
    );  
      
    //创建RabbitMQ连接  
    $conn = new AMQPConnection($connArgs);  
    if (!$conn->connect()) {  
        echo "Cannot connect to the broker <br/>
     ";  
    }  
      
    //创建channel  
    $channel = new AMQPChannel($conn);  
      
    //创建exchange  
    $exchangeName = 'exchange';  
    $exchange = new AMQPExchange($channel);  
    $exchange->setName($exchangeName);//创建名字  
    $exchange->setType(AMQP_EX_TYPE_DIRECT);//设置为direct类型  
    $exchange->setFlags(AMQP_DURABLE);//持久化交换机,当代理重启动后依然存在,并包括它们中的完整数据  
    $exchange->setFlags(AMQP_AUTODELETE);//对交换机而言,自动删除标志表示交换机将在没有队列绑定的情况下被自动删除,如果从没有队列和其绑定过,这个交换机将不会被删除.  
      
    //发送消息  
    $routingKey = 'key_test';  
    for($i=0;$i<10;$i++){  
         sleep(3);  
         $message = 'Hello World '.$i;  
        $exchange->publish($message,$routingKey);  
    }  
      
    //断开连接  
    $conn->disconnect();  

     

    Consumer端:
    //设置连接属性  
    $connArgs = array(  
                        'host'           => 'localhost',  
                        'port'           => '5672',  
                        'login'           => 'guest',  
                        'password' => 'guest',  
                        'vhost'          => '/'  
    );  
      
    //创建RabbitMQ连接  
    $conn = new AMQPConnection($connArgs);  
    if (!$conn->connect()) {  
         echo 'cannot connect to the broker';  
    }  
      
    //创建channel  
    $channel = new AMQPChannel($conn);  
      
    //设置队列名称  
    $exchangeName = 'exchange';  
    $routingKey = 'key_test';      
    $queueName = 'queue_test_1';  
    $queue = new AMQPQueue($channel);  
    $queue->setName($queueName);//设置名称  
    $queue->setFlags(AMQP_DURABLE);//持久化队列,当代理重启动后依然存在,并包括它们中的完整数据  
    $queue->declare();//声明此队列  
    $queue->bind('exchange',$routingKey);//使用某交换机,并绑定某路由关键字  
      
    //消费数据方式一:非阻塞方式  
    // while(true)  
    // {  
    //      sleep(1);  
    //      $envelope = $queue->get(AMQP_AUTOACK) ;//第一个参数表示自动ACK应答  
    //      if ($envelope){  
    //           $messages = $envelope->getBody();  
    //           echo $messages;  
    //      }  
    // }  
      
      
    //消费数据方式二:以阻塞模式消费数据(推荐)  
    while(true)  
    {  
              $queue->consume('processMessage');//第一个参数表示要回调的方法名,第二个参数设置为AMQP_AUTOACK,表示自动ACK应答  
    }  
    /** 
    * 定义回调方法 
    */  
    function processMessage($envelope, $queue)  
    {  
         $messages = $envelope->getBody();#获取消息数据  
         echo $messages;  
         $queue->ack($envelope->getDeliveryTag()); //处理成功后,手动发送ACK应答  
         //$queue->nack($envelope->getDeliveryTag()); //处理不成功,手动发送NO-ACK应答,放回队列中   
    }  

         八:特别说明:
         1.如果某一次消费数据没有ACK,则此条消息会记录为Unacknowledged,如果对于某一节点有连续三条则RabbitMQ认为此节点有故障,则不会再对它进行分发.当它断开后或一定时间后Unacknowledged状态消息会重新放回交换机中。手动no-ack后此Message将会放回队列中且不会再转发给此节点。
         2.对于计算密集型的工作,我们需要建立多个Consumer,对于多个Consumer,默认的分发机制是“公平分发”,将第n个发给第n个Consumer,超过个数取n的模。
         3.Exchange中的类型有:direct, topic 和fanout。
              direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息。
                  topic :所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息,direct的区别是它的routingkey可 以模糊匹配,#代表一个或多个字符,*代表任何字符。一般为确保程序严谨性而使用direct。注意当使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
              fanout:是广播模式,所有bind到此exchange的queue都可以接收消息,即该一个Message可以对应多个Consumer,应用场景举例:生产了了添加到购物车的Message,一个Consumer写推荐商品日志,一个Consumer写购物车表。
         4.两个不相同的queue名的队列绑定到同一exchange和rotingky上,此时相当于同一queue名的fanout广播模式,两个queue都会得都到
    所有Message。如图所示:
    5.RPC的实现流程:
  • 相关阅读:
    Cannot find a free socket for the debugger
    maven自己创建jar坐标(比如oracle驱动包)
    svn“Previous operation has not finished; run 'cleanup' if it was interrupted“报错的解决方法
    java发http请求
    Spring MVC
    函数节流 防抖
    js 执行顺序
    铺平多维对象数组 js
    vue 线上图标404
    vue-devtools--------vue浏览器工具
  • 原文地址:https://www.cnblogs.com/-mrl/p/7690693.html
Copyright © 2011-2022 走看看