zoukankan      html  css  js  c++  java
  • PHP实现RabbitMQ的Publish/Subscribe

    <?php
    /**
     * Created by PhpStorm.
     * User: 豆腐居士
     * Date: 2018/5/30
     * Time: 上午11:01
     */
    
    class AqiTask extends BaseTask
    {
        const EX_NAME = 'aqi_fanout';
        function clientAction()
        {
            $conn = new AMQPConnection([
                'host' => '0.0.0.0',
                'port' => 5672,
                'login' => 'wangwang',
                'password' => '123456'
            ]);
            if (!$conn->connect()) {
                exit('fail');
            }
            $channel = new AMQPChannel($conn);
            $ex = new AMQPExchange($channel);
            $ex->setName(self::EX_NAME);
            $ex->setType(AMQP_EX_TYPE_FANOUT); //direct类型
            $ex->setFlags(AMQP_DURABLE); //持久化
            $ex->declareExchange();
            for ($i = 0; $i < 10; $i++) {
                $msg = 'hello fanout:' . $i;
                echo $msg . PHP_EOL;
                $ex->publish($msg);
            }
            $conn->disconnect();
        }
    
        function server1Action()
        {
            $conn = new AMQPConnection([
                'host' => '0.0.0.0',
                'port' => 5672,
                'login' => 'wangwang',
                'password' => '123456'
            ]);
            if (!$conn->connect()) {
                exit('fail');
            }
            $channel = new AMQPChannel($conn);
            $ex = new AMQPExchange($channel);
            $ex->setName(self::EX_NAME);
            $ex->setType(AMQP_EX_TYPE_FANOUT); //fanout类型
            $ex->setFlags(AMQP_DURABLE); //持久化
            $ex->declareExchange();
    
            $queue = new AMQPQueue($channel);
            $queue->setName('q1');
            $queue->setFlags(AMQP_DURABLE);
            $queue->declareQueue();
            $queue->bind($ex->getName());try {
                 $queue->consume(function ($envelope, $q) {
                     echo 'server1:' . $envelope->getBody(), PHP_EOL;
                     $q->ack($envelope->getDeliveryTag()); //这里是手动应答  如果使用默认的自动应答,进程退出后消息会丢失
                 });
            } catch (Exception $e) {
                echo $e->getMessage();
            }
        }
    
        function server2Action()
        {
            $conn = new AMQPConnection([
                'host' => '0.0.0.0',
                'port' => 5672,
                'login' => 'wangwang',
                'password' => '123456'
            ]);
            if (!$conn->connect()) {
                exit('fail');
            }
    
            $channel = new AMQPChannel($conn);
            $ex = new AMQPExchange($channel);
            $ex->setName(self::EX_NAME);
            $ex->setType(AMQP_EX_TYPE_FANOUT);
            $ex->setFlags(AMQP_DURABLE);
            $ex->declareExchange();
    
            $queue = new AMQPQueue($channel);
            $queue->setName('q2');
            $queue->setFlags(AMQP_DURABLE);
            $queue->declareQueue();
            $queue->bind($ex->getName());$queue->consume(function ($envelope, $q) {
                 echo 'server2:' . $envelope->getBody(), PHP_EOL;
                 $q->ack($envelope->getDeliveryTag());
            });
        }
    }
  • 相关阅读:
    软件测试入门知识
    QTP小应用一则
    频分时分波分码分
    解析UML9种图的作用
    OSI七层模型
    暑期实习心得
    0724工作小结 SQL查库是重点
    0723脚本存储过程的学习
    0722工作日志
    工作之余回味了曾经的写过的小说
  • 原文地址:https://www.cnblogs.com/sanshuiqing/p/9113093.html
Copyright © 2011-2022 走看看