zoukankan      html  css  js  c++  java
  • rabbitmq实现一台服务器同时给指定部分的consumer发送消息(tp框架)(第六篇)

    previous article:  http://www.cnblogs.com/spicy/p/7989717.html

    上一篇学习了,发送消息的时候用direct类型的exchange,绑定不同的路由信息比如 info、warning、error 

           接受消息的时候,通过queue_bind("队列名字“,“exchange名字”,“想监听的routing”) 方法 就可以接受想监听的对应路由关键字的信息(info,warning,error)

    这次,我们使用exchange 类型是topic的方式,然后路由可以更灵活的走到对应的队列

       “quick.orange.rabbit”  会走到Q1 和 Q2

        “lazy.orange.elephant” 会走到Q1 和 Q2

       "quick.orange.fox"  只走Q1

       lazy.brown.fox  只去Q2

               lazy.pink.rabbit  虽然符合二次 但是只会发送一次到Q2

       quick.brown.fox 不会去任何队列

       lazy.orange.male.rabbit 会去Q2

    知识点: *表示任意一个字符  #表示所有字符

    发送和接受代码代码:

     public function worker1()
        {
            set_time_limit(0);
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
    
            #申明一个exchange名字叫logs,类型是fanout
            $channel->exchange_declare('topic_logs','topic',false,false,false);
    
            #申明一个由服务器自动命名的队列,这个队列会在连接结束以后 自动断掉
            list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
            $binding_keys  =['*'];
    
            #把随机命名的队列绑定到绗棉新建的exchange,同时分配routing key
            foreach($binding_keys as $binding_key) {
                $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
            }
    
            #下面第四个参数如果为false表示开启确认模式,也就是消费以后会告知rabbitmq服务器该条消息已经处理完毕,这样可以方式消息处理一半挂掉了,结果服务器也删除了这条未处理完毕的消息
            $receiver = new self();
            $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);
    
            while(true) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
        public function worker2()
        {
            set_time_limit(0);
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
    
            #申明一个exchange名字叫logs,类型是fanout
            $channel->exchange_declare('topic_logs','topic',false,false,false);
    
            #申明一个由服务器自动命名的队列,这个队列会在连接结束以后 自动断掉
            list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
            $binding_keys  = ['a.#'];
    
            #把随机命名的队列绑定到绗棉新建的exchange,同时分配routing key
            foreach($binding_keys as $binding_key) {
                $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
            }
    
            #下面第四个参数如果为false表示开启确认模式,也就是消费以后会告知rabbitmq服务器该条消息已经处理完毕,这样可以方式消息处理一半挂掉了,结果服务器也删除了这条未处理完毕的消息
            $receiver = new self();
            $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);
    
            while(true) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
    
        public function task()
        {
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
    
            #申明一个exchange名字叫topic_logs,类型是topic
            $channel->exchange_declare('topic_logs','topic',false,false,false);
    
            $routing_key  = 'a.b.c';
            $msg = new AMQPMessage('4');
    
            #第二个参数是表示走什么exchange  第三个参数表示路由到什么队列
            $channel->basic_publish($msg, 'topic_logs',$routing_key);
            echo '发送完毕';
    
            $channel->close();
            $connection->close();
        }
    View Code
  • 相关阅读:
    java加载类的方法1.classloader 2.class.forName()
    servlet与线程与jdbc connection的关系
    static再次深入理解
    多线程读某个共享变量有时候也要给读方法加锁
    多线程读一个全局变量要不要加锁?还是说只是当修改全局变量的时候才要加锁?
    接口耗时打印并统计
    Java从设计模式[本场比赛状态转换武器]状态分析(State)模式
    Openstack中间DVR Part1 -- 东西走向的交通处理
    写酷“大神”的公开信
    从反思谈论阵列和指针的几个问题,腾讯的笔名
  • 原文地址:https://www.cnblogs.com/spicy/p/8006405.html
Copyright © 2011-2022 走看看