zoukankan      html  css  js  c++  java
  • rabbitmq实现一台服务器同时给指定部分的consumer发送消息(tp框架)(第六篇)

    previous article:  http://www.cnblogs.com/spicy/p/7989717.html

    上一篇学习了,发送消息的时候用direct类型的exchange,绑定不同的路由信息比如 info、warning、error 

           接受消息的时候,通过queue_bind("队列名字“,“exchange名字”,“想监听的routing”) 方法 就可以接受想监听的对应路由关键字的信息(info,warning,error)

    这次,我们使用exchange 类型是topic的方式,然后路由可以更灵活的走到对应的队列

       “quick.orange.rabbit”  会走到Q1 和 Q2

        “lazy.orange.elephant” 会走到Q1 和 Q2

       "quick.orange.fox"  只走Q1

       lazy.brown.fox  只去Q2

               lazy.pink.rabbit  虽然符合二次 但是只会发送一次到Q2

       quick.brown.fox 不会去任何队列

       lazy.orange.male.rabbit 会去Q2

    知识点: *表示任意一个字符  #表示所有字符

    发送和接受代码代码:

     public function worker1()
        {
            set_time_limit(0);
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
    
            #申明一个exchange名字叫logs,类型是fanout
            $channel->exchange_declare('topic_logs','topic',false,false,false);
    
            #申明一个由服务器自动命名的队列,这个队列会在连接结束以后 自动断掉
            list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
            $binding_keys  =['*'];
    
            #把随机命名的队列绑定到绗棉新建的exchange,同时分配routing key
            foreach($binding_keys as $binding_key) {
                $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
            }
    
            #下面第四个参数如果为false表示开启确认模式,也就是消费以后会告知rabbitmq服务器该条消息已经处理完毕,这样可以方式消息处理一半挂掉了,结果服务器也删除了这条未处理完毕的消息
            $receiver = new self();
            $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);
    
            while(true) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
        public function worker2()
        {
            set_time_limit(0);
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
    
            #申明一个exchange名字叫logs,类型是fanout
            $channel->exchange_declare('topic_logs','topic',false,false,false);
    
            #申明一个由服务器自动命名的队列,这个队列会在连接结束以后 自动断掉
            list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
            $binding_keys  = ['a.#'];
    
            #把随机命名的队列绑定到绗棉新建的exchange,同时分配routing key
            foreach($binding_keys as $binding_key) {
                $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
            }
    
            #下面第四个参数如果为false表示开启确认模式,也就是消费以后会告知rabbitmq服务器该条消息已经处理完毕,这样可以方式消息处理一半挂掉了,结果服务器也删除了这条未处理完毕的消息
            $receiver = new self();
            $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);
    
            while(true) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
    
        public function task()
        {
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
    
            #申明一个exchange名字叫topic_logs,类型是topic
            $channel->exchange_declare('topic_logs','topic',false,false,false);
    
            $routing_key  = 'a.b.c';
            $msg = new AMQPMessage('4');
    
            #第二个参数是表示走什么exchange  第三个参数表示路由到什么队列
            $channel->basic_publish($msg, 'topic_logs',$routing_key);
            echo '发送完毕';
    
            $channel->close();
            $connection->close();
        }
    View Code
  • 相关阅读:
    高级特性(4)- 数据库编程
    UVA Jin Ge Jin Qu hao 12563
    UVA 116 Unidirectional TSP
    HDU 2224 The shortest path
    poj 2677 Tour
    【算法学习】双调欧几里得旅行商问题(动态规划)
    南洋理工大学 ACM 在线评测系统 矩形嵌套
    UVA The Tower of Babylon
    uva A Spy in the Metro(洛谷 P2583 地铁间谍)
    洛谷 P1095 守望者的逃离
  • 原文地址:https://www.cnblogs.com/spicy/p/8006405.html
Copyright © 2011-2022 走看看