zoukankan      html  css  js  c++  java
  • rabbitMQ 通过Topic模式分发消息

    Direct和Topic两种模式类似Mysql语言中精确和模糊查询,在Topic模式下有两个特殊字符,类似MySQL “%” 字符

      * (星号) 代表任意 一个单词

      # (井号) 0个或者多个单词

    Topic模式可以很好的用于多维度场景。一个日志模块来收集处理不同的日志,日志区分包含三个维度的标准:模块、日志紧急程度、日志重要程度。模块分为:red、black、white;紧急程度分为:critical、normal;把重要程度分为:medium、low、high在RoutingKey字段中我们把这三个维度通过两个“.“连接起来。
    现在我们需要对black模块,紧急程度为critical,重要程度为high的日志分配到队列1打印到屏幕;对所以模块重要程度为high的日志和white紧急程度为critical的日志发送到队列2持久化

    • RoutingKey为“black.critical.high”的日志会投递到queue1和queue2,。

    • RoutingKey为“red.critical.high”的日志会只投递到queue2。

    • RoutingKey为“white.critical.high”的日志会投递到queue2,并且虽然queue2的两个匹配规则都符合但只会向queue2投递一份。

    测试结果

    生产者:topic.php

    <?php
    // 生产者 p.php
    //配置信息
    $config = [
        'host'     => 'localhost',
        'port'     => '5672',
        'login'    => 'guest',
        'password' => 'guest',
        'vhost'    => '/'
    ];
    
    $exchangeName = 'e_topic';
    $routeKey1    = "black.critical.high";
    $routeKey2    = "red.critical.high";
    $routeKey3    = "white.critical.high";
    
    $message1 = 'black-critical-high!';
    $message2 = 'red-critical-high!';
    $message3 = 'white-critical-high!';
    
    //创建连接和channel
    $connect = new AMQPConnection($config);
    
    if (!$connect->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    
    $channel  = new AMQPChannel($connect);
    $exchange = new AMQPExchange($channel);
    
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_TOPIC);
    
    // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
    $exchange->setFlags(AMQP_DURABLE);
    
    // 声明交换机
    $exchange->declareExchange();
    
    $exchange->publish($message1, $routeKey1);
    $exchange->publish($message2, $routeKey2);
    $exchange->publish($message3, $routeKey3);
    

      

    消费者:c_topic1.php

    <?php
    // 消费者 c.php
    //配置信息
    $config = [
        'host'     => 'localhost',
        'port'     => '5672',
        'login'    => 'guest',
        'password' => 'guest',
        'vhost'    => '/'
    ];
    
    $exchangeName = 'e_topic';
    $queueName    = 'log_1';
    $routeKey     = 'black.critical.high';
     
    //创建连接和channel
    $connect = new AMQPConnection($config);
    
    if (!$connect->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    
    $channel  = new AMQPChannel($connect);
    $exchange = new AMQPExchange($channel);
    
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_TOPIC);
    
    // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
    $exchange->setFlags(AMQP_DURABLE);
    
    // 声明交换机
    $exchange->declareExchange();
    
    // 创建消息队列
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    
    // 设置持久性
    $queue->setFlags(AMQP_DURABLE);
    
    // 声明消息队列
    $queue->declareQueue();
    
    $queue->bind($exchange->getName(), $routeKey);
    
    // 接收消息并处理回调
    $queue->consume('receive');
    
    // 处理回调的方法
    function receive($envelop, $queue){
        echo $envelop->getBody() . "
    ";
    
        // ACK 通知生产者任务完成
        $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
    }
    

    消费者:c_topic2.php

    <?php
    // 消费者 c.php
    //配置信息
    $config = [
        'host'     => 'localhost',
        'port'     => '5672',
        'login'    => 'guest',
        'password' => 'guest',
        'vhost'    => '/'
    ];
    
    $exchangeName = 'e_topic';
    $queueName    = 'log2';
    $routeKey     = '#.high';
     
    //创建连接和channel
    $connect = new AMQPConnection($config);
    
    if (!$connect->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    
    $channel  = new AMQPChannel($connect);
    $exchange = new AMQPExchange($channel);
    
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_TOPIC);
    
    // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
    $exchange->setFlags(AMQP_DURABLE);
    
    // 声明交换机
    $exchange->declareExchange();
    
    // 创建消息队列
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    
    // 设置持久性
    $queue->setFlags(AMQP_DURABLE);
    
    // 声明消息队列
    $queue->declareQueue();
    
    $queue->bind($exchange->getName(), $routeKey);
    
    // 接收消息并处理回调
    $queue->consume('receive');
    
    // 处理回调的方法
    function receive($envelop, $queue){
        echo $envelop->getBody() . "
    ";
    
        // ACK 通知生产者任务完成
        $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
    }
    

      

    消费者:c_topic3.php

    <?php
    // 消费者 c.php
    //配置信息
    $config = [
        'host'     => 'localhost',
        'port'     => '5672',
        'login'    => 'guest',
        'password' => 'guest',
        'vhost'    => '/'
    ];
    
    $exchangeName = 'e_topic';
    $queueName    = 'log3';
    $routeKey     = 'white.critical.*';
     
    //创建连接和channel
    $connect = new AMQPConnection($config);
    
    if (!$connect->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    
    $channel  = new AMQPChannel($connect);
    $exchange = new AMQPExchange($channel);
    
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_TOPIC);
    
    // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
    $exchange->setFlags(AMQP_DURABLE);
    
    // 声明交换机
    $exchange->declareExchange();
    
    // 创建消息队列
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    
    // 设置持久性
    $queue->setFlags(AMQP_DURABLE);
    
    // 声明消息队列
    $queue->declareQueue();
    
    $queue->bind($exchange->getName(), $routeKey);
    
    // 接收消息并处理回调
    $queue->consume('receive');
    
    // 处理回调的方法
    function receive($envelop, $queue){
        echo $envelop->getBody() . "
    ";
    
        // ACK 通知生产者任务完成
        $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
    }
    

      

      

  • 相关阅读:
    MongoDB数据库学习总结
    windows及linux环境下永久修改pip镜像源的方法
    enumerate枚举函数
    sum求和函数
    zip拉链函数
    python实现面对面快传
    redis实现发布订阅
    python自定义classmethod
    Nginx为什么支持那么高的并发量?
    java进阶(28)--Map集合
  • 原文地址:https://www.cnblogs.com/xiangdongsheng/p/14259590.html
Copyright © 2011-2022 走看看