zoukankan      html  css  js  c++  java
  • PHP+RabbitMQ订单消息发布与订阅

    订单模拟发布

    <?php
    
    /**
     * 发布消息
     * @Author: hdj
     * @Date:   2020-07-22 16:15:22
     */
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $exchange = 'orders';
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare($exchange, 'direct', false, false, false);
    //生成订单号
    $order_sn=time();
    //订单生成后 处理积分
    $arr = ['id' => rand(111,999),'order_sn' => 'score_ '. $order_sn];
    $data = json_encode($arr);
    $msg = new AMQPMessage($data);
    $channel->basic_publish($msg, $exchange, 'score');
    echo 'Send score message: ' . $data . PHP_EOL;
    
    //订单生成后 处理优惠券
    $arr1 = ['id' => rand(111,999),'order_sn' => 'coupon_ '. $order_sn];
    $data1 = json_encode($arr1);
    $msg1 = new AMQPMessage($data1);
    $channel->basic_publish($msg1, $exchange, 'coupon');
    echo 'Send coupon message: ' . $data1 . PHP_EOL;
    
    
    $channel->close();
    $connection->close();

    积分订阅

    <?php
    
    /**
     * 订阅消息
     * @Author: hdj
     * @Date:   2020-07-22 16:22:50
     */
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $exchange = 'orders';
    $routerKey = 'score'; //只消费积分
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->exchange_declare($exchange, 'direct', false, false, false);
    list($queueName, ,) = $channel->queue_declare("", false, false, true, false);
    
    $channel->queue_bind($queueName, $exchange, $routerKey);
    
    echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL;
    $callback = function ($msg) {
        //echo " Received message:", $msg->body, PHP_EOL;
        echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
        sleep(1);  //模拟耗时执行
    };
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);
    
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();

    优惠券订阅

    <?php
    
    /**
     * 订阅消息
     * @Author: hdj
     * @Date:   2020-07-22 16:24:57
     */
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $exchange = 'orders';
    $routerKey = 'coupon'; //只消费优惠券
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->exchange_declare($exchange, 'direct', false, false, false);
    list($queueName, ,) = $channel->queue_declare("", false, false, true, false);
    
    $channel->queue_bind($queueName, $exchange, $routerKey);
    
    echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL;
    $callback = function ($msg) {
        //echo " Received message:", $msg->body, PHP_EOL;
        echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
        sleep(1);  //模拟耗时执行
    };
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);
    
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();

  • 相关阅读:
    JavaScript闭包
    模块模式——方法
    产品与技术
    读书笔记
    屌丝求职记
    正则表达式regex狂记
    css狂记
    html狂记
    Android狂记忆
    关于调式
  • 原文地址:https://www.cnblogs.com/houdj/p/13361873.html
Copyright © 2011-2022 走看看