zoukankan      html  css  js  c++  java
  • rabbitmq实现一台服务器同时给所有的consumer发送消息(tp框架)(第四篇)

    之前的学习了把消息直接publish到queue里面,然后consume掉,

    真实的情况,我们会把消息先发送到exchange里面,由它来处理,是发给某一个队列,还是发给某些队列,还是丢弃掉?

    exchange类型: direct,topic,headers,fanout

    下面以fanout为例子(把收到的消息,全部发给所有的队列)

    如何查看服务器上面的所有的exchanges?

     sudo rabbitmqctl list_exchanges

    如何查看服务器上面的所有的binding关系?

     sudo rabbitmqctl list_bindings

    前面几章消息发送是把$msg消息通过默认(如果第二个参数为‘ ’)的exchange发送给hello队列

    $channel->basic_publish($msg, '', 'hello');

     为了实现一对所有发送同一个消息

    第一步:修改发送消息部分,这里basic-publish没有传递第三个参数,因为这里的消息是要发送给每个队列?

    第二步:保证每次连接rabbit 都会新产生一个全新的 消息为空的队列,我们可以通过服务器自动生成no-durable queue 同时名字是随机的类似:amq.gen-JzTY20BRgKO-HjmUJj0wLg.

        这样就能保证每次consumer断开和服务器的连接以后,队列自动释放掉,因为他是exclusive

        

    第三步:上面创建了一个fanout类型的exchange  和  一个任意名字的队列queue, exchange和queue之间的关系叫做binding(绑定?)

         

        经过上面这步: logs这个exchange就会把消息绑定到$queue_name这个队列,(但是$queue_name是随机生成的名字)

        

    consumer 代码:

    public function worker()
        {
            set_time_limit(0);
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
    
            #申明一个exchange名字叫logs,类型是fanout
            $channel->exchange_declare('logs','fanout',false,false,false);
    
            #申明一个由服务器自动命名的队列,这个队列会在连接结束以后 自动断掉
            list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
            #把随机命名的队列绑定到绗棉新建的exchange
            $channel->queue_bind($queue_name, 'logs');
    
            #下面第四个参数如果为false表示开启确认模式,也就是消费以后会告知rabbitmq服务器该条消息已经处理完毕,这样可以方式消息处理一半挂掉了,结果服务器也删除了这条未处理完毕的消息
            $receiver = new self();
            $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);
    
            while(true) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
    View Code

    publisher代码:

    public function task()
        {
            $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
            $channel = $connection->channel();
    
            #申明一个exchange名字叫logs,类型是fanout
            $channel->exchange_declare('logs','fanout',false,false,false);
    
                $msg = new AMQPMessage('123');
    
                #第二个参数是表示走什么exchange  第三个参数表示走什么队列
                $channel->basic_publish($msg, 'logs');
                echo "第".'1'.'发送完毕';
    
            $channel->close();
            $connection->close();
        }
    View Code

    实验过程描述: .

    1 ,task 方法去publish 一条消息“123”,

    2,启动3个worker方法,这样系统自动生成3个不一样的随机名字的队列,然后去接受logs的exchange发送来的消息

    这样就实现了 一个服务器发送消息给所有的consumer

     注意: 如果worker没有先启动就发送了消息,在一对所有(publish/subscrible的模式下), 如果发消息后打开worker就无法收到消息

    
    

    ---恢复内容结束---

  • 相关阅读:
    java 线程状态和转化
    初学Spring
    unittest --- 单元测试
    Python单例模式的两种实现方式
    python records操作数据库
    数据可视化之——matplotlib基础学习
    python使用list维护成一个队列
    Python将list列表维护成一个栈
    Python使用rsa模块实现非对称加密与解密
    python +OpenCV实现rtmp视频流媒体的播放
  • 原文地址:https://www.cnblogs.com/spicy/p/7942521.html
Copyright © 2011-2022 走看看