zoukankan      html  css  js  c++  java
  • 封装rabbitmq

    今天又抽时间用php封装了rabbitmq,使用的框架是yaf

    Consumer如下:

    <?php
    
    namespace RabbitMq;
    class Consumer
    {
        public $exchange_name = "exchange_1";
        public $route_name = "route_1";
        public $queue_name = "queue_1";
        public $conn = null;
        public $channel = null;
        public $exchange = null;
        public $queue = null;
    
        public function __construct(string $exchange_name = "", string $route_name = "", string $queue_name = "")
        {
            if ($exchange_name) $this->exchange_name = $exchange_name;
            if ($route_name) $this->route_name = $route_name;
            if ($queue_name) $this->queue_name = $queue_name;
            $this->init();
    //        $this->createChannel();
    //        $this->createQueue();
        }
    
        public function init()
        {
            //创建连接和channel
            $this->conn = new AMQPConnection(MqConfig::$config);
            if (!$this->conn->connect()) {
                die("Cannot connect to the broker!
    ");
            }
    
        }
    
        public function createChannel()
        {
            $this->channel = new AMQPChannel($this->conn);
    
            //创建交换机
            $this->exchange = new AMQPExchange($this->channel);
            $this->exchange->setName($this->exchange_name);
            $this->exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
            $this->exchange->setFlags(AMQP_DURABLE); //持久化
            //echo "Exchange Status:" . $this->exchange->declare() . "
    ";
    
    
        }
    
        public function createQueue()
        {
            //创建队列
            $this->queue = new AMQPQueue($this->channel);
            $this->queue->setName($this->queue_name);
            $this->queue->setFlags(AMQP_DURABLE); //持久化
            //echo "Message Total:" . $this->queue->declare() . "
    ";
            //绑定交换机与队列,并指定路由键
            echo 'Queue Bind: ' . $this->queue->bind($this->exchange_name, $this->route_name) . "
    ";
    
            //阻塞模式接收消息
            echo "接收到的消息:
    ";
            while (True) {
                $this->queue->consume(function ($envelope, $queue) {
                    $msg = $envelope->getBody();
                    echo $msg . "
    "; //处理消息
                    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
                });
                //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
            }
            $this->conn->disconnect();
        }
    }
    Publisher如下
    <?php
    
    namespace RabbitMq;
    class Publisher
    {
        public $exchange_name = "exchange_1";
        public $route_name = "route_1";
        public $conn = null;
        public $channel = null;
        public $exchange = null;
    
        public function __construct(string $exchange_name = "", string $route_name = "")
        {
            if ($exchange_name) $this->exchange_name = $exchange_name;
            if ($route_name) $this->route_name = $route_name;
            $this->init();
        }
    
        public function init()
        {
            //创建连接和channel
            $this->conn = new AMQPConnection(MqConfig::$config);
            if (!$this->conn->connect()) {
                die("Cannot connect to the broker!
    ");
            }
    
        }
    
        public function createChannel()
        {
            $this->channel = new AMQPChannel($this->conn);
            //创建交换机对象
            $this->exchange = new AMQPExchange($this->channel);
            $this->exchange->setName($this->exchange_name);
        }
    
        public function publishMsg()
        {
            for ($i = 0; $i < 5; ++$i) {
                sleep(1);//休眠1秒
                //发送的消息内容
                $message = "测试消息,你好啊!" . date("h:i:s");
                echo "发送消息:哈哈哈:" . $this->exchange->publish($message, $this->route_name) . "
    ";
            }
            $this->conn->disconnect();
        }
    }

    简单调用:

    调用consumer:

    $consumer = new RabbitMqConsumer();
    $consumer->createChannel();
    $consumer->createQueue();

    调用Publisher:

    $publisher = new RabbitMqPublisher();
    $publisher->createChannel();
    $publisher->publishMsg();
  • 相关阅读:
    php AppStore内购付款验证
    NodeJS
    Electron 的中文乱码问题
    Flexbox 弹性盒子布局的使用
    springboot整合quartz
    第4章:逆向分析技术--64位软件逆向技术
    第51章:静态反调试技术——API查询
    第51章:静态反调试技术
    第48章:SEH
    第47章:PEB
  • 原文地址:https://www.cnblogs.com/allen-spot/p/11439471.html
Copyright © 2011-2022 走看看