zoukankan      html  css  js  c++  java
  • RabbitMQ PHP 代码示例

    docker 启动 rabbitmq,其中 --network=host 不必绑定任何端口,相当于宿主机本地,直接通过 localhost 就能访问。
     
    docker run -d --name rabbit --network=host rabbitmq
    docker exec -it rabbit bash
    容器中> rabbitmq-plugins enable rabbitmq_management
     
    然后浏览器访问: http://localhost:15672 ,默认登录账号和密码均为: guest / guest
     
    receive.php 文件
    <?php
    /**
     * Receive messages from the demo queue and print each body.
     *
     * Usage: php receive.php
     *
     * Requires the php-amqp extension and a broker reachable with the
     * credentials in $config.
     */

    $config = [
        'host' => '127.0.0.1',
        'port' => 5672,
        'login' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
    ];

    # Names shared with sent.php; both scripts must declare the same values.
    define('ExchangeName', 'exchange_name_1');
    define('QueueName',    'queue_name_1');
    define('RoutingKey',   'routing_key_1');

    // Open the connection. Connection failures are reported by exception, so
    // the return value of connect() is not relied on.
    // NOTE(review): newer php-amqp releases appear to declare connect() as
    // void, which would make a `!$conn->connect()` check always true - confirm.
    $conn = new AMQPConnection($config);
    try {
        $conn->connect();
    } catch (AMQPConnectionException $e) {
        die("Cannot connect to the broker! " . $e->getMessage() . "\n");
    }
    if (!$conn->isConnected()) {
        die("Cannot connect to the broker!\n");
    }

    // Open a channel on the connection.
    $channel = new AMQPChannel($conn);

    // Declare the exchange.
    $ex = new AMQPExchange($channel);
    $ex->setName(ExchangeName);
    $ex->setType(AMQP_EX_TYPE_DIRECT); // direct exchange
    $ex->setFlags(AMQP_DURABLE);       // durable exchange
    $ex->declareExchange();

    // Declare the queue.
    $queue = new AMQPQueue($channel);
    $queue->setName(QueueName);
    $queue->setFlags(AMQP_DURABLE);    // durable queue
    $queue->declareQueue();

    // Bind the queue to the exchange.
    $queue->bind(ExchangeName, RoutingKey);

    // Consume messages. The original called an undefined helper p(), which
    // is a fatal error on the first delivery; print the body directly instead.
    $queue->consume(function ($envelope, $queue) {

        echo $envelope->getBody(), "\n";

    }, AMQP_AUTOACK); // automatic acknowledgement

    // Only reached once consume() returns.
    // NOTE(review): consume() presumably keeps blocking while the callback
    // does not return false - confirm against the php-amqp documentation.
    $conn->disconnect();

    sent.php 文件

    <?php
    /**
     * Publish one message to the demo exchange.
     *
     * Usage: php sent.php <message>
     *
     * Requires the php-amqp extension and a broker reachable with the
     * credentials in $config.
     */

    // Refuse to run without a message instead of reading an undefined
    // $argv[1] further down.
    if (!isset($argv[1])) {
        fwrite(STDERR, "Usage: php sent.php <message>\n");
        exit(1);
    }
    $message = $argv[1];

    $config = [
        'host' => '127.0.0.1',
        'port' => 5672,
        'login' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
    ];

    # Names shared with receive.php; both scripts must declare the same values.
    define('ExchangeName', 'exchange_name_1');
    define('QueueName',    'queue_name_1');
    define('RoutingKey',   'routing_key_1');


    // Open the connection and a channel. Connection failures are reported by
    // exception, so the return value of connect() is not relied on.
    // NOTE(review): newer php-amqp releases appear to declare connect() as
    // void, which would make a `!$conn->connect()` check always true - confirm.
    $conn = new AMQPConnection($config);
    try {
        $conn->connect();
    } catch (AMQPConnectionException $e) {
        die("Cannot connect to the broker! " . $e->getMessage() . "\n");
    }
    if (!$conn->isConnected()) {
        die("Cannot connect to the broker!\n");
    }
    $channel = new AMQPChannel($conn);

    // Declare the exchange.
    $ex = new AMQPExchange($channel);
    $ex->setName(ExchangeName);
    $ex->setType(AMQP_EX_TYPE_DIRECT); // exchange type
    $ex->setFlags(AMQP_DURABLE);       // durable exchange
    $ex->declareExchange();


    // Declare the queue.
    $queue = new AMQPQueue($channel);
    $queue->setName(QueueName);
    $queue->setFlags(AMQP_DURABLE);    // durable queue
    $queue->declareQueue();

    // Bind the queue to the exchange so the routing key below reaches it.
    $queue->bind(ExchangeName, RoutingKey);


    echo "Send: {$message} \n";
    $ex->publish($message, RoutingKey);

    // Close the connection explicitly, as the publisher branch of test.php does.
    $conn->disconnect();

    打开两个命令行,分别执行两个命令

    php sent.php hello

    php receive.php

    封装一个类

    <?php
    
    
    /**
     * Convenience wrapper around the php-amqp extension classes.
     *
     * Extends AMQPConnection and lazily creates one channel, one exchange and
     * one queue object, each cached on the instance after first use, so that a
     * simple publish/consume flow can be driven from a single object.
     */
    class Amqp extends AMQPConnection {

        /** @var AMQPChannel|null Channel created on first call to channel(). */
        protected  $channel;

        /** @var AMQPExchange|null Exchange object created on first call to exchange(). */
        protected  $exchange;

        /** @var AMQPQueue|null Queue object created on first call to queue(). */
        protected  $queue;

        /**
         * Build the connection credentials.
         *
         * @param string|array $host   Host name, or a full credentials array which is
         *                             passed to AMQPConnection unchanged (the remaining
         *                             arguments are then ignored).
         * @param int          $port
         * @param string       $user
         * @param string       $passwd
         * @param string       $vhost
         */
        public function __construct($host='127.0.0.1', $port=5672, $user='guest', $passwd='guest', $vhost='/'){

            $credentials = is_array($host)? $host : [
                'host'      => $host,
                'port'      => $port,
                'login'     => $user,
                'password'  => $passwd,
                'vhost'     => $vhost
            ];
            parent::__construct($credentials);

        }


        /**
         * Return the channel, connecting and creating it on first use.
         *
         * @return AMQPChannel
         * @throws AMQPConnectionException
         */
        public function channel(){

            if(!$this->channel){
                // Only connect when needed, so a caller that already called
                // connect() does not trigger a second connection attempt.
                if(!$this->isConnected()){
                    parent::connect();
                }
                $this->channel = new AMQPChannel($this);
            }
            return $this->channel;
        }

        /**
         * Return the exchange object, creating it on first use, and optionally
         * update its name, type and flags.
         *
         * @param string   $name  Exchange name; an empty string leaves the current name unchanged.
         * @param string   $type  Exchange type; an empty string leaves the current type unchanged.
         * @param int|null $flags Flags to set; non-integer values leave the current flags unchanged.
         * @return AMQPExchange
         * @throws AMQPConnectionException
         * @throws AMQPExchangeException
         */
        public function exchange($name='', $type='', $flags=null){

            if(!$this->exchange){
                $this->exchange = new AMQPExchange($this->channel());

            }
            // Compare against '' / null explicitly: a plain truthiness test
            // would silently skip valid but falsy strings such as '0'.
            if($name !== '' && $name !== null){
                $this->exchange->setName($name);
            }
            if($type !== '' && $type !== null){
                $this->exchange->setType($type);
            }
            if(is_int($flags)){
                $this->exchange->setFlags($flags);
            }

            return $this->exchange;

        }

        /**
         * Return the queue object, creating it on first use, and optionally
         * update its name and flags.
         *
         * @param string   $name  Queue name; an empty string leaves the current name unchanged.
         * @param int|null $flags Flags to set; non-integer values leave the current flags unchanged.
         * @return AMQPQueue
         * @throws AMQPConnectionException
         * @throws AMQPQueueException
         */
        public function queue($name='', $flags=null){

            if(!$this->queue){
                $this->queue = new AMQPQueue($this->channel());
            }

            // See exchange(): explicit comparison so that '0' is accepted as a name.
            if($name !== '' && $name !== null){
                $this->queue->setName($name);
            }
            if(is_int($flags)){
                $this->queue->setFlags($flags);
            }

            return $this->queue;

        }

        /**
         * Declare the cached exchange on the broker.
         *
         * @return mixed Whatever AMQPExchange::declareExchange() returns.
         * @throws AMQPChannelException
         * @throws AMQPConnectionException
         * @throws AMQPExchangeException
         */
        public function declareExchange(){
            return $this->exchange()->declareExchange();
        }

        /**
         * Declare the cached queue on the broker.
         *
         * @return mixed Whatever AMQPQueue::declareQueue() returns.
         * @throws AMQPChannelException
         * @throws AMQPConnectionException
         * @throws AMQPQueueException
         */
        public function declareQueue(){
            return $this->queue()->declareQueue();
        }

        /**
         * Bind the cached queue to the cached exchange (by the exchange's current name).
         *
         * @param string $routing_key
         * @param array $arguments
         * @return mixed Whatever AMQPQueue::bind() returns.
         * @throws AMQPChannelException
         * @throws AMQPConnectionException
         * @throws AMQPExchangeException
         * @throws AMQPQueueException
         */
        public function bindQueue($routing_key='', $arguments=[]){
            return $this->queue()->bind($this->exchange()->getName(), $routing_key, $arguments);
        }


        /**
         * Publish a message through the cached exchange.
         *
         * @param string      $message     Message body.
         * @param string|null $routing_key Routing key passed to the exchange.
         * @param int         $flags       Publish flags.
         * @param array       $attributes  Message attributes.
         * @return mixed Whatever AMQPExchange::publish() returns.
         * @throws AMQPChannelException
         * @throws AMQPConnectionException
         * @throws AMQPExchangeException
         */
        public function publish($message, $routing_key=null,  $flags=AMQP_NOPARAM,  array $attributes=[]){
            return $this->exchange()->publish($message, $routing_key, $flags, $attributes);
        }

        /**
         * Consume messages from the cached queue.
         *
         * All arguments are forwarded unchanged to AMQPQueue::consume().
         *
         * @param callable|null $callback    Invoked by AMQPQueue::consume() for each delivery.
         * @param int           $flags       Consume flags, e.g. AMQP_AUTOACK.
         * @param string|null   $consumerTag Optional consumer tag.
         * @return void
         * @throws AMQPChannelException
         * @throws AMQPConnectionException
         * @throws AMQPQueueException
         */
        public function consume(callable $callback=null, $flags=AMQP_NOPARAM, $consumerTag = null){
            $this->queue()->consume($callback, $flags, $consumerTag);
        }

        /**
         * Delete an exchange via the cached exchange object.
         *
         * @param string|null $name  Forwarded to AMQPExchange::delete().
         * @param int         $flags
         * @return void
         * @throws AMQPChannelException
         * @throws AMQPConnectionException
         * @throws AMQPExchangeException
         */
        public function deleteExchange($name = null, $flags=AMQP_NOPARAM){
            $this->exchange()->delete($name, $flags);
        }

        /**
         * Delete the cached queue.
         *
         * @param int $flags
         * @return void
         * @throws AMQPChannelException
         * @throws AMQPConnectionException
         * @throws AMQPQueueException
         */
        public function deleteQueue($flags=AMQP_NOPARAM){
            $this->queue()->delete($flags);
        }


    }

    测试:test.php

    # Names used by this demo run.
    define('ExchangeName', 'exchange_name_15');
    define('QueueName',    'queue_name_15');
    define('RoutingKey',   'routing_key_15');


    // A command-line argument means "publish it"; no argument means "consume".
    // Decided before any broker work, exactly where the original evaluated it.
    $publishMode = count($argv) > 1;

    // Publisher and consumer need the same topology: open the channel, declare
    // the durable direct exchange and the durable queue, then bind them.
    $amqp = new Amqp();
    $amqp->channel();

    $exchange = $amqp->exchange(ExchangeName, AMQP_EX_TYPE_DIRECT, AMQP_DURABLE);
    $exchange->declareExchange();

    $queue = $amqp->queue(QueueName, AMQP_DURABLE);
    $queue->declareQueue();

    $amqp->bindQueue(RoutingKey);

    if (!$publishMode) {

        // Consumer: print every message body as it arrives (auto-acknowledged).
        $printBody = function ($delivery, $sourceQueue) {
            echo $delivery->getBody();
        };
        $amqp->consume($printBody, AMQP_AUTOACK);

    } else {

        // Publisher: send the first argument, then close the connection.
        $amqp->publish($argv[1], RoutingKey);
        $amqp->disconnect();

    }
  • 相关阅读:
    POJ2115解题报告【拓展欧几里得模板题】
    Linux安装jdk快速流程
    SpringBoot+Vue项目多文件上传同时上传其他参数
    Maven
    浏览器常用快捷键
    IDEA从GitHub仓库拉取代码
    Address already in use: bind
    Vue集成echarts插件
    致自己
    Flask_FileUpload
  • 原文地址:https://www.cnblogs.com/zbseoag/p/14279249.html
Copyright © 2011-2022 走看看