zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列-通过fanout模式将消息推送到多个Queue中

    介绍如何使用fanout模式将消息推送到多个队列。
    有时我们会遇到这样的情况,多个功能模块都希望得到完整的消息数据。例如一个log的消息,一个我们希望输出在屏幕上实时监控,另外一个用户持久化日志。这时就可以使用fanout模式。fanout模式模式不像direct模式通过routingkey来进行匹配,而是会把消息发送到所以的已经绑定的队列中。

     可以看到ca和cb收到的消息完全一致。注意以上代码fanout.php中并没有新建队列,所以先运行fanout_ca.php和fanout_cb.php的脚本,如果先运行fanout.php因为找不到绑定的队列数据就会丢失。 

    <?php

    /*
    * RabbitMQ fanout
    * create by superid
    * 新建fanout.php用来发布消息。fanout_ca.php 和 fanout_cb.php 用来订阅不同队列消费消息。
    */

    $exchangeName = 'log';
    $message = 'log--';
    $conn_args = array(
      'host' => '127.0.0.1',
      'port' => '5672',
      'login' => 'guest',
      'password' => 'guest',
      'vhost'=>'/'
    );
    $connection = new AMQPConnection($conn_args);
    $connection->connect() or die("Cannot connect to the broker! ");
    try {
      $channel = new AMQPChannel($connection); // 建立一个 Channel信道

      $exchange = new AMQPExchange($channel); //创建交换机
      $exchange->setName($exchangeName); //设置交换机的名字
      $exchange->setType(AMQP_EX_TYPE_FANOUT); //fanout类型 Exchange 类型: direct fanout topic
      $exchange->setFlags(AMQP_DURABLE); //持久化
      $exchange->declareExchange();

      for($i=1 ; $i<=20;$i++){
        //Fanout类型最简单,这种模型忽略routingkey
        $exchange->publish($message.$i, "");
        var_dump("[x] Sent $message $i");
      }
    } catch (AMQPConnectionException $e) {
      var_dump($e);
      exit();
    }
    $connection->disconnect();

    ?>

    订阅者:

    fanout_ca.php

    <?php
    /*
    * RabbitMQ fanout 模式
    * create by superrd
    */

    $exchangeName = 'log';
    $queueName = 'queuea';
    $routeKey = '';
    $conn_args = array(
      'host' => '127.0.0.1',
      'port' => '5672',
      'login' => 'guest',
      'password' => 'guest',
      'vhost'=>'/'
    );
    $connection = new AMQPConnection($conn_args);
    $connection->connect() or die("Cannot connect to the broker! ");

    $channel = new AMQPChannel($connection);

    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_FANOUT);
    $exchange->setFlags(AMQP_DURABLE);
    $exchange->declareExchange();

    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->setFlags(AMQP_DURABLE);
    echo "Message Total:".$queue->declareQueue()." "; //队列中消息的总量
    $queue->bind($exchangeName, $routeKey);

    //阻塞模式接收消息
    echo "Message: ";
    while(True){
      $queue->consume('processMessage');
      //$queue->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
    }

    $conn->disconnect();
    /**
    * 消费回调函数
    * 处理消息
    */
    function processMessage($envelope, $queue) {
      $msg = $envelope->getBody();
      echo $msg." "; //处理消息
      $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
    }

     ?>

    订阅者:

    fanout_cb.php

    <?php
    /*
    * RabbitMQ fanout 模式
    * create by superrd
    */

    $exchangeName = 'log';
    $queueName = 'queueb';
    $routeKey = ''; //Fanout类型最简单,这种模型忽略routingkey
    $conn_args = array(
      'host' => '127.0.0.1',
      'port' => '5672',
      'login' => 'guest',
      'password' => 'guest',
      'vhost'=>'/'
    );

    $connection = new AMQPConnection($conn_args);
    $connection->connect() or die("Cannot connect to the broker! ");

    $channel = new AMQPChannel($connection);

    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_FANOUT);
    $exchange->setFlags(AMQP_DURABLE);
    $exchange->declareExchange();

    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->setFlags(AMQP_DURABLE);
    echo "Message Total:".$queue->declareQueue()." "; //队列中消息的总量
    $queue->bind($exchangeName, $routeKey);

    //阻塞模式接收消息
    echo "Message: ";
    while(True){
      $queue->consume('processMessage');
      //$queue->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
    }

    $conn->disconnect();
    /**
    * 消费回调函数
    * 处理消息
    */
    function processMessage($envelope, $queue) {
      $msg = $envelope->getBody();
      // sleep(1);
      echo $msg." "; //处理消息
      $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
    }

    ?>

  • 相关阅读:
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    互联网大厂CTO详解5大软件架构,看完这篇你就秒懂
    5-10年的DBA如何独当一面?这10个建议送给你(附图书工具推荐)
    2020 从新开始:你应该知道的Oracle认证新变化
    Centos7部署NFS实战
    你的公司,远程办公多久了?
    PostgreSQL的几种分布式架构对比
    数据库周刊 | DBA 核心技能
    理解redis 分布式中的分片机制
  • 原文地址:https://www.cnblogs.com/maomaochong123/p/8836447.html
Copyright © 2011-2022 走看看