订单模拟发布
<?php /** * 发布消息 * @Author: hdj * @Date: 2020-07-22 16:15:22 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; $exchange = 'orders'; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, false, false); //生成订单号 $order_sn=time(); //订单生成后 处理积分 $arr = ['id' => rand(111,999),'order_sn' => 'score_ '. $order_sn]; $data = json_encode($arr); $msg = new AMQPMessage($data); $channel->basic_publish($msg, $exchange, 'score'); echo 'Send score message: ' . $data . PHP_EOL; //订单生成后 处理优惠券 $arr1 = ['id' => rand(111,999),'order_sn' => 'coupon_ '. $order_sn]; $data1 = json_encode($arr1); $msg1 = new AMQPMessage($data1); $channel->basic_publish($msg1, $exchange, 'coupon'); echo 'Send coupon message: ' . $data1 . PHP_EOL; $channel->close(); $connection->close();
积分订阅
<?php /** * 订阅消息 * @Author: hdj * @Date: 2020-07-22 16:22:50 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; $exchange = 'orders'; $routerKey = 'score'; //只消费积分 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, false, false); list($queueName, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queueName, $exchange, $routerKey); echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL; $callback = function ($msg) { //echo " Received message:", $msg->body, PHP_EOL; echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL; sleep(1); //模拟耗时执行 }; $channel->basic_consume($queueName, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
优惠券订阅
<?php /** * 订阅消息 * @Author: hdj * @Date: 2020-07-22 16:24:57 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; $exchange = 'orders'; $routerKey = 'coupon'; //只消费优惠券 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, false, false); list($queueName, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queueName, $exchange, $routerKey); echo " [*] Waiting for messages. To exit press CTRL+C" .PHP_EOL; $callback = function ($msg) { //echo " Received message:", $msg->body, PHP_EOL; echo ' Received message:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL; sleep(1); //模拟耗时执行 }; $channel->basic_consume($queueName, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();