zoukankan      html  css  js  c++  java
  • rabbitmq实现一台服务器同时给指定部分的consumer发送消息(tp框架)(第五篇)

    承接上一篇: http://www.cnblogs.com/spicy/p/7942521.html

    背景: 前面已经实现了一个rabbitmq服务器给所有的subscrible的consumer统一发消息

    目标:实现给指定部分consumer发放消息

    1,之前用到下面的绑定exchange的方式,意思就是:  该队列queue对来自该exchange消息非常感兴趣

      

    2,实际上这个方法课可以接受第三个参数 路由 routing_key, 意思是:一条消息会通过exchange交换机转发到路由

      

    但是第三个参数也是根据exchange的类型来决定,如果fanout类型,那么就会自动忽略第三个参数,所以现在用type是direct的交换机exchange

     一条发布的消息如果routing key是 orange 就会被路由到Q1队列, 同理 routing key是black 或者 green的会被路由到Q2而其他的消息就会被丢弃掉

     

     同一个binding key可以同时绑定给多个队列,这下下图发布一条消息如果routing key是black 就会同时发给2条队列

     实验: 指定routing key 发到对应名字的队列接受,不符合的不接受,从而某些消息只发送到指定的队列中

    1,tp 路由处理  下面task来发布消息, worker1 和 worker2 来消费消息

      

    2,发送队列消息的方法(下面的$serverity就是指定routing key的走向)

      

    public function task()
        {
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
    
            #申明一个exchange名字叫logs,类型是fanout
            $channel->exchange_declare('direct_logs','direct',false,false,false);
    
            $severity = 'info';
            $msg = new AMQPMessage('123');
    
            #第二个参数是表示走什么exchange  第三个参数表示走什么队列
            $channel->basic_publish($msg, 'direct_logs',$severity);//$severity包括‘info’,‘waring’,‘error’
            echo '发送完毕';
    
            $channel->close();
            $connection->close();
        }
    View Code

    3,接受消息方法1:(只接受 info,waring,error的消息)

    public function worker2()
        {
            set_time_limit(0);
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
    
            #申明一个exchange名字叫logs,类型是fanout
            $channel->exchange_declare('direct_logs','direct',false,false,false);
    
            #申明一个由服务器自动命名的队列,这个队列会在连接结束以后 自动断掉
            list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
            $severities = ['info','warning','error'];
    
            #把随机命名的队列绑定到绗棉新建的exchange,同时分配routing key
            foreach($severities as $severity) {
                $channel->queue_bind($queue_name, 'direct_logs', $severity);
            }
    
            #下面第四个参数如果为false表示开启确认模式,也就是消费以后会告知rabbitmq服务器该条消息已经处理完毕,这样可以方式消息处理一半挂掉了,结果服务器也删除了这条未处理完毕的消息
            $receiver = new self();
            $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);
    
            while(true) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
    
            #下面第四个参数如果为false表示开启确认模式,也就是消费以后会告知rabbitmq服务器该条消息已经处理完毕,这样可以方式消息处理一半挂掉了,结果服务器也删除了这条未处理完毕的消息
            $receiver = new self();
            $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);
    
            while(true) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
    View Code

    4,分别开启worker1  worker2

      

    5,先发一个 routing key 是info的消息

      

      发现二个worker1 worker2 都获取到了消息

      

    6,再发一个 routing key 是warning的消息 (更改task方法,把里面的)

     

      现在发现 只有worker2 获取到了消息,

     7,如果发一个 routing_key 是noting的 ,会发现 worker1 和 woker2 什么都没有收到。

      

  • 相关阅读:
    时间戳
    MD5加密、字节与字符串转换、对ToString("X2 ")的理解
    JWT(JSON Web Token)简介
    Entity Framework 通过主键查询提高效率
    C# switch语句与枚举类型
    对象映射库【AutoMapper】所支持场景
    关于EF框架EntityState的几种状态
    EF底层操作注意点、批量操作、更新
    linq:求分组后的每组最大值、最小值、平均值等、Aggregate聚合函数
    Flask——Request(2)
  • 原文地址:https://www.cnblogs.com/spicy/p/7989717.html
Copyright © 2011-2022 走看看