zoukankan      html  css  js  c++  java
  • PHP RabbitMQ消息队列演示代码

    先决条件

    已安装PHP,Erlang和RabbitMQ。

    安装PHP环境下使用的RabbitMQ第三方库——php-amqplib

    使用composer安装php-amqplib库。

    生产者代码

    <?php
    require 'vendor/autoload.php';
    
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    use PhpAmqpLibWireAMQPTable;
    
    $conf = [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'admin',
        'password' => 'admin',
        'vhost' => '/',
    ];
    $exchangeName = 'testExch';     //交换机名称
    $queueName = 'testQue';         //队列名称
    $routingKey = 'testRoute';      //路由关键字(也可以省略)
    
    //建立生产者与mq之间的连接
    try {
        $conn = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost']);
    } catch (Exception $e) {
        exit("RabbitMQ连接失败。
    ");
    }
    
    $channel = $conn->channel();    //在已连接基础上建立生产者与MQ之间的通道
    $channel->exchange_declare($exchangeName, 'direct', false, true, false);    //声明初始化交换机
    
    $args = new AMQPTable();
    $args->set('x-message-ttl',10000);    //设置TTL,消息生存期(设置属性需要生产者和消费者同时设置)
    
    $channel->queue_declare($queueName, false, true, false, false,false,$args);     //声明初始化一条队列
    $channel->queue_bind($queueName, $exchangeName, $routingKey,false);       //将队列与某个交换机进行绑定,并使用路由关键字
    
    // $channel->confirm_select();     //开启ACK,确认机制
    
    for ($i = 1; $i <= 20; $i++) {
        $msgBody = json_encode(["name" => "WCW", "no" => $i]);
        $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]);   //构建消息
        $ret = $channel->basic_publish($msg, $exchangeName, $routingKey);     //发布消息到某个交换机
    }
    
    /*
    // 若消息发布成功
    $channel->set_ack_handler(function (AMQPMessage $msg){
        echo "消息发布成功 
    ";
    });
    $channel->wait_for_pending_acks();      //等待ACK确认
    */
    
    $channel->close();
    $conn->close();

    消费者代码

    <?php
    require 'vendor/autoload.php';
    
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibWireAMQPTable;
    
    $conf = [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'admin',
        'password' => 'admin',
        'vhost' => '/',
    ];
    $exchangeName = 'testExch';     //交换机名
    $queueName = 'testQue';         //队列名称
    $routingKey = 'testRoute';      //路由关键字(也可以省略)
    
    //建立生产者与mq之间的连接
    try {
        $conn = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost']);
    } catch (Exception $e) {
        exit("RabbitMQ连接失败。
    ");
    }
    
    $channel = $conn->channel();    //在已连接基础上建立生产者与MQ之间的通道
    
    $args = new AMQPTable();
    $args->set('x-message-ttl',10000);    //设置TTL,消息生存期(设置属性需要生产者和消费者同时设置)
    
    $channel->queue_declare($queueName, false, true, false, false,false,$args);     //声明初始化一条队列
    
    //消费限流,每次只能处理5条消息(在basic_consume的参数no_ask=false的情况下才生效)
    $channel->basic_qos(null, 5, null);
    
    //回调函数,数据处理
    $process_message = function ($msg) {
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);     //消费限流,一轮消费完成后,告知生产者进行下一轮消息发布
        echo "Received: ", json_decode($msg->body)->no, "
    ";
        // $msg->nack(true);
    };
    
    $channel->basic_consume($queueName, '', false, false, false, false, $process_message);      //消费接收消息
    
    //监听消息,一有消息,立马就处理
    /*while(count($channel->callbacks)) {
        $channel->wait();
    }*/
    //或:
    while ($channel->is_consuming()) {
        $channel->wait();
        sleep(5);      //来让ACK即处理消息的过程慢一些,这样我们就可以从后台管理工具中清晰观察到限流情况
    }

    效果测试

    执行生产者程序:

    执行消费者程序:

    消息接收成功!

    至此。转载请注明出处,记得扫码打赏支持哦,谢谢!

  • 相关阅读:
    makefile简单例子
    js归并排序
    js插入排序
    js堆排序
    js选择排序
    js冒泡算法以及优化
    使用go语言判断不同数据类型
    go使用接口案例排序
    go接口使用案例晓demo
    go面向对象-继承
  • 原文地址:https://www.cnblogs.com/wcwnina/p/14728220.html