zoukankan      html  css  js  c++  java
  • PHP+RabbitMQ订单消息发布与订阅

    订单模拟发布

    <?php

    /**
     * Publish messages.
     *
     * Simulates the creation of an order: declares the direct exchange "orders"
     * and publishes two JSON messages to it, one with routing key "score" and
     * one with routing key "coupon".
     *
     * @Author: hdj
     * @Date:   2020-07-22 16:15:22
     */

    require_once __DIR__ . '/vendor/autoload.php';
    // Fully-qualified php-amqplib class names; without the namespace separators
    // the classes cannot be resolved and the script fails with "Class not found".
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;

    $exchange = 'orders';
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Flags after the type are: passive = false, durable = false, auto_delete = false.
    $channel->exchange_declare($exchange, 'direct', false, false, false);

    // Generate the order number (current Unix timestamp).
    $order_sn = time();

    // After the order is created: publish the message for points (score) handling.
    // NOTE(review): the 'score_ ' prefix contains a trailing space, so the resulting
    // order_sn contains a space — confirm this is intended.
    $arr = ['id' => rand(111,999),'order_sn' => 'score_ '. $order_sn];
    $data = json_encode($arr);
    $msg = new AMQPMessage($data);
    $channel->basic_publish($msg, $exchange, 'score');
    echo 'Send score message: ' . $data . PHP_EOL;

    // After the order is created: publish the message for coupon handling.
    // NOTE(review): same trailing space in the 'coupon_ ' prefix — confirm intended.
    $arr1 = ['id' => rand(111,999),'order_sn' => 'coupon_ '. $order_sn];
    $data1 = json_encode($arr1);
    $msg1 = new AMQPMessage($data1);
    $channel->basic_publish($msg1, $exchange, 'coupon');
    echo 'Send coupon message: ' . $data1 . PHP_EOL;

    // Release the channel first, then the underlying connection.
    $channel->close();
    $connection->close();

    积分订阅

    <?php

    /**
     * Subscribe to messages.
     *
     * Binds a server-named queue to the direct exchange "orders" with routing
     * key "score" and prints every message delivered to it.
     *
     * @Author: hdj
     * @Date:   2020-07-22 16:22:50
     */

    require_once __DIR__ . '/vendor/autoload.php';
    // Fully-qualified php-amqplib class name; without the namespace separators
    // the class cannot be resolved and the script fails with "Class not found".
    use PhpAmqpLib\Connection\AMQPStreamConnection;

    $exchange = 'orders';
    $routerKey = 'score'; // Only consume score (points) messages.

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Flags after the type are: passive = false, durable = false, auto_delete = false.
    $channel->exchange_declare($exchange, 'direct', false, false, false);

    // An empty name asks the broker to generate one; the first element of the
    // reply is that generated queue name. Flags: passive = false, durable = false,
    // exclusive = true, auto_delete = false.
    list($queueName, ,) = $channel->queue_declare("", false, false, true, false);

    // Only messages published with the matching routing key reach this queue.
    $channel->queue_bind($queueName, $exchange, $routerKey);

    echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL;

    // Invoked once per delivered message: print its routing key and body.
    $callback = function ($msg) {
        echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
        sleep(1);  // Simulate time-consuming work.
    };

    // Flags after the consumer tag are: no_local = false, no_ack = true,
    // exclusive = false, nowait = false. With no_ack = true the callback
    // does not acknowledge messages itself.
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);

    // Block and dispatch deliveries for as long as the consumer is registered.
    while ($channel->is_consuming()) {
        $channel->wait();
    }

    // Release the channel first, then the underlying connection.
    $channel->close();
    $connection->close();

    优惠券订阅

    <?php

    /**
     * Subscribe to messages.
     *
     * Binds a server-named queue to the direct exchange "orders" with routing
     * key "coupon" and prints every message delivered to it.
     *
     * @Author: hdj
     * @Date:   2020-07-22 16:24:57
     */

    require_once __DIR__ . '/vendor/autoload.php';
    // Fully-qualified php-amqplib class name; without the namespace separators
    // the class cannot be resolved and the script fails with "Class not found".
    use PhpAmqpLib\Connection\AMQPStreamConnection;

    $exchange = 'orders';
    $routerKey = 'coupon'; // Only consume coupon messages.

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Flags after the type are: passive = false, durable = false, auto_delete = false.
    $channel->exchange_declare($exchange, 'direct', false, false, false);

    // An empty name asks the broker to generate one; the first element of the
    // reply is that generated queue name. Flags: passive = false, durable = false,
    // exclusive = true, auto_delete = false.
    list($queueName, ,) = $channel->queue_declare("", false, false, true, false);

    // Only messages published with the matching routing key reach this queue.
    $channel->queue_bind($queueName, $exchange, $routerKey);

    echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL;

    // Invoked once per delivered message: print its routing key and body.
    $callback = function ($msg) {
        echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
        sleep(1);  // Simulate time-consuming work.
    };

    // Flags after the consumer tag are: no_local = false, no_ack = true,
    // exclusive = false, nowait = false. With no_ack = true the callback
    // does not acknowledge messages itself.
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);

    // Block and dispatch deliveries for as long as the consumer is registered.
    while ($channel->is_consuming()) {
        $channel->wait();
    }

    // Release the channel first, then the underlying connection.
    $channel->close();
    $connection->close();

  • 相关阅读:
    LeetCode----->dp系列
    重读STL源码剖析:vector
    重读深度探索C++对象模型:构造
    LeetCode4:寻找两个有序数组的中位数
    部署测试环境(ubuntu+mysql+tomcat)
    开发环境配置(netbeans+ant迁移到eclipse+maven)
    笔记《Java程序性能优化 让你的Java程序更快、更稳定》 第二章 设计调优
    企业高并发的成熟解决方案(一)video(笔记&知识点)
    猎豹使用AI RoboForm填表
    FreeMarker笔记 第四章 其它
  • 原文地址:https://www.cnblogs.com/houdj/p/13361873.html
Copyright © 2011-2022 走看看