zoukankan      html  css  js  c++  java
  • RabbitMq的使用--发布订阅篇--交换机的存在

    发布订阅感觉对比上一篇的消息队列 感觉就是多引入了一个交换机的概念

    上篇的时候没有创建交换机,然后生产者

    在这个发布订阅篇这里,生产者和消费者的区别感觉更小了 唯一不同的就生产者是basic_publish 把信息推上去交换机  消费者是basic_consume把信息拉下来

    
    
    <?php
    namespace app abbit;
    //require_once __DIR__ . '/autoload.php'; //因为我自己使用的是tp框架 所以我在这里不需要再加载这个类了
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    class Send {
    public function connect($num) {

    //连接rabbit
    $connection = new AMQPStreamConnection('***.***.***.***', 5672, '***', '*****');
    //创建一个通道
    $channel = $connection->channel();

    /**
    * 之前没有说到交换机
    * 交换机类型:直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)。我们在这里主要说明最后一个 —— 扇型交换机(fanout)
    * RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
    * 其实可不可以理解为其实交换机才是rabbitmq的大脑
    */
    //创建一个扇型交换机 它把消息发送给它所知道的所有队列
    /**
    * rabbitmqctl能够列出服务器上所有的交换器:
    * rabbitmqctl list_exchanges
    */
    /**
    * @exchange 交换机名称
    * @type 交换机类型
    */
    $channel->exchange_declare('suibianla', 'fanout', false, false, false);
    //创建一个临时队列 当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。
    list($queue_name) = $channel->queue_declare('');
    var_dump($queue_name);
    //队列绑定交换机
    $channel->queue_bind($queue_name, 'suibianla');
    //这应该是经过AMQP协议洗礼过得信息
    $msg = new AMQPMessage('如果我是憨p勇+++'.$num);
    //把信息推上去交换机
         //这里第二个参数是交换机的名字 ,上篇是没填的,然后有第三个参数,指定队列名
    $res = $channel->basic_publish($msg, 'suibianla');
    //通道关闭
    $channel->close();
    //连接关闭
    $connection->close();
    }
    }


    //---------------------------------------------------------


    //消费者代码,几乎一样

    <?php
    require_once __DIR__ . '/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;

    $connection = new AMQPStreamConnection('***.***.***.***', 5672, '***', '***');
    $channel = $connection->channel();
    $channel->exchange_declare('suibianla', 'fanout', false, false, false);
    list($queue_name) = $channel->queue_declare('');
    $channel->queue_bind($queue_name, 'suibianla');

    $res = $channel->basic_consume($queue_name, '', false, false, false, false, function ($msg) {
    // file_put_contents('get.text', '这是消息1'.$msg->body." ", FILE_APPEND);
    echo '说:'.$msg->body." ";
    try {
    // var_dump($msg);
    //这里是上面第四个消息回复设置为false 之后需要手动确认 不然的话消息不会删掉
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

    }catch(Exception $e) {
    return false;
    }
    });

    while(count($channel->callbacks)) {
    $channel->wait();
    }
    $channel->close();
    $connection->close();



  • 相关阅读:
    python面试大全
    python面试2
    python求职之路
    python面试题目
    关于栈的输入顺序和输出顺序
    表单提交中get和post方式的区别
    DOS命令行下mysql 基本命令
    跨站请求伪造CSRF
    Windows上python的virtualenv 安装及使用
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
  • 原文地址:https://www.cnblogs.com/wqxq/p/14681818.html
Copyright © 2011-2022 走看看