zoukankan      html  css  js  c++  java
  • php rabbitmq发送消息并判断消息是否发送成功 ack comfirm机制

    <?php
    require_once '../vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $exchangeName = 'exchange_1';
    $queueName = 'queue_1';
    $routeKey = 'routeingkey_1';
    
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'test', 'test123', '/');
    $channel = $connection->channel();
    
    
    //推送成功
    $channel->set_ack_handler(
        function (AMQPMessage $message) {
            echo "发送成功: " . $message->body . PHP_EOL;
        }
    );
    
    //推送失败
    $channel->set_nack_handler(
        function (AMQPMessage $message) {
            echo "发送失败: " . $message->body . PHP_EOL;
        }
    );
    
    /*
     * bring the channel into publish confirm mode.
     * if you would call $ch->tx_select() before or after you brought the channel into this mode
     * the next call to $ch->wait() would result in an exception as the publish confirm mode and transactions
     * are mutually exclusive
     */
    /*
     * 进入发布确认模式。
     * 如果在将通道引入此模式之前或之后调用$ch->tx_select()
     * 下一个调用$ch->wait()将导致发布确认模式和事务异常
     * 是互斥的
     */
    $channel->confirm_select(); // 发布确认模式  
    
    // 通道
    $channel->exchange_declare($exchangeName, 'direct', false, false, false);
    // 队列
    $channel->queue_declare($queueName, false, false, false, false);
    
    // 使用routeKey绑定交换机和队列
    $channel->queue_bind($queueName, $exchangeName, $routeKey);
    
    //$channel->wait_for_pending_acks();
    
    
    for ($i = 0; $i < 10; $i++) {
        $msg = new AMQPMessage('Hello World!'.$i);
        $channel->basic_publish($msg, $exchangeName, $routeKey);
    }
    
    /*
    * you do not have to wait for pending acks after each message sent. in fact it will be much more efficient
    * to wait for as many messages to be acked as possible.
    */
    /*
     *您不必在每条消息发送后等待挂起的acks。事实上,这样会更有效率
     *等待尽可能多的邮件被屏蔽。
     */
    $channel->wait_for_pending_acks();
    // 监听成功或失败返回结束 成功/失败 => set_ack_handler/set_nack_handler
    
    $channel->close();
    $connection->close();
    

      

     

     

  • 相关阅读:
    svn忽略不需要同步的文件夹或文件
    Redis 字符串(String)
    Redis 数据类型
    Linux下安装rabbitMQ
    Windows平台下Git服务器搭建
    Linux下安装redis
    JVM调优总结
    Tomcat优化配置
    通过profile 用maven命令打不同配置的变量包
    Log4j日志配置说明
  • 原文地址:https://www.cnblogs.com/setevn/p/14645458.html
Copyright © 2011-2022 走看看