zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列 基本订阅/发布Demo(PHP版)

    <?php

    /*
    * 发布-订阅
    * create by superid
    */

      $queueName = 'superid';
      $exchangeName = 'superid';
      $routeKey = 'superid';

      $conn_args = array(
        'host' => '127.0.0.1',
        'port' => '5672',
        'login' => 'guest',
        'password' => 'guest',
        'vhost'=>'/'
      );
      $connection = new AMQPConnection($conn_args); //创建连接
      $connection->connect() or die("Cannot connect to the broker! ");
      try {
        $channel = new AMQPChannel($connection); // 建立一个 Channel信道

        $exchange = new AMQPExchange($channel); //创建交换机对象
        $exchange->setName($exchangeName);
        $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
        $exchange->setFlags(AMQP_DURABLE); //持久化
        $exchange->declareExchange();

        $queue = new AMQPQueue($channel);
        $queue->setName($queueName);
        $queue->setFlags(AMQP_DURABLE); //持久化队列,并不表示消息也会持久化
        $queue->declareQueue();

        $queue->bind($exchangeName, $routeKey); //绑定交换机与队列,并指定路由键

        for($i=0; $i<50000; ++$i){

          $message = json_encode(array(
            'id' => $i,
            'name' => 'liming',
            'note' => '这是一个备注',
          ));

          //Direct类型是使用最多的,使用确定的routingkey。
          //这种模型下,接收消息时绑定'routeKey'则只接收routeKey的消息

          $result = $exchange->publish($message,$routeKey);
          var_dump("[{$result}-{$i}]");
        }

      } catch (AMQPConnectionException $e) {
        var_dump($e);
        exit();
      }
      $connection->disconnect();

    ?>

    <?php
    /*
    * 发布-订阅
    * create by superid
    */


    $queueName = 'superid';
    $exchangeName = 'superid';
    $routeKey = 'superid';
    $conn_args = array(
      'host' => '127.0.0.1',
      'port' => '5672',
      'login' => 'guest',
      'password' => 'guest',
      'vhost'=>'/'
    );
    $connection = new AMQPConnection($conn_args);
    $connection->connect() or die("Cannot connect to the broker! ");

    $channel = new AMQPChannel($connection); // 建立一个 Channel信道

    //如果Consumer数量很多或者希望每个Consumer同时只处理一个任务
    //可以通过在Consumer中设置PrefetchCount来实现更加均匀的任务分发。,收到ack应答,才会发生消息,如果有一条没有收到ack,则不会发送消息了
    $channel->setPrefetchCount(1);

    $exchange = new AMQPExchange($channel); //创建交换机
    $exchange->setName($exchangeName); //设置交换机的名字
    $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 Exchange 类型: direct fanout topic
    $exchange->setFlags(AMQP_DURABLE); //持久化
    echo "Exchange Status:".$exchange->declareExchange()." ";

    $queue = new AMQPQueue($channel); //创建队列
    $queue->setName($queueName); //设置队列的名称
    $queue->setFlags(AMQP_DURABLE); //设置持久化队列
    echo "Message Total:".$queue->declareQueue()." "; //队列中消息的总量

    $queue->bind($exchangeName, $routeKey);

    //阻塞模式接收消息

    echo "Message: ";
    while(True){
      $queue->consume('processMessage');
      //自动ACK应答
      //$queue->consume('processMessage', AMQP_AUTOACK);
    }

    $conn->disconnect();

    /**
      * 消费回调函数
      * 处理消息
      * *需要注意的地方是:queue对象有两个方法可用于取消息: consume 和 get 。
      *前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,
      *取消息时有则取,无则返回false。
    */
    function processMessage($envelope, $queue) {
      $msg = $envelope->getBody();
      echo $msg." "; //处理消息
      //手动发送ACK应答, 如果不进行确认,消息会积累越来越多,产生严重bug
      $queue->ack($envelope->getDeliveryTag());
    }

    ?>

    备注:

    发送消息时,只要有“交换机”就够了。至于交换机后面有没有对应的处理队列,发送方是不用管的。

    对于交换机,有两个重要的概念:

    A,类型。有三种类型: Fanout类型最简单,这种模型忽略routingkey;Direct类型是使用最多的,使用确定的routingkey。这种模型下,接收消息时绑定'key_1'则只接收key_1的消息;最后一种是Topic,这种模式与Direct类似,但是支持通配符进行匹配,比如: 'key_*',就会接受key_1和key_2。Topic貌似美好,但是有可能导致不严谨,所以还是推荐使用Direct。

    B,持久化。指定了持久化的交换机,在重新启动时才能重建,否则需要客户端重新声明生成才行。

    需要特别明确的概念:交换机的持久化,并不等于消息的持久化。只有在持久化队列中的消息,才能持久化;如果没有队列,消息是没有地方存储的;消息本身在投递时也有一个持久化标志的,PHP中默认投递到持久化交换机就是持久的消息,不用特别指定

    queue: 队列

    事实上,队列仅是针对接收方(consumer)的,由接收方根据需求创建的。只有队列创建了,交换机才会将新接受到的消息送到队列中,交换机是不会在队列创建之前的消息放进来的。换句话说,在建立队列之前,发出的所有消息都被丢弃了。

    下面这个图比RabbitMQ官方的图更清楚——Queue是属于ReceiveMessage的一部分。

  • 相关阅读:
    向现有的磁盘组加入/删除ASM磁盘
    词语相似度计算
    C++ STL algorithm 列表
    ORACLE内核参数
    一款好的UI草图设计软件
    (转)svn检出的时候报 Unable to connect to a repository at URL错误
    HTTP 错误 500.19 – Internal Server Error web.config 文件的 system.webServer/httpErrors 节中不允许绝对物理路径“C:\inetpub\custerr”。
    使用SVN后系统变慢的解决方法
    oracle 11g ORA12541: TNS: 无监听程序 (DBD ERROR: OCIServerAttach)
    [转载]无法删除oci.dll文件的解决办法
  • 原文地址:https://www.cnblogs.com/maomaochong123/p/8836083.html
Copyright © 2011-2022 走看看