生产者 require_once './vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; $exchangeName = 'exchange_fanout_1'; // $queueName = 'queue_fanout_1'; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'test', 'test', '/'); $channel = $connection->channel(); //推送成功 $channel->set_ack_handler( function (AMQPMessage $message) { echo "成功: " . $message->body . PHP_EOL; } ); //推送失败 $channel->set_nack_handler( function (AMQPMessage $message) { echo "失败: " . $message->body . PHP_EOL; } ); $channel->confirm_select(); $channel->exchange_declare($exchangeName, 'fanout', false, false, false); for ($i = 0; $i < 10; $i++) { $msg = new AMQPMessage('消息: '.$i); $channel->basic_publish($msg, $exchangeName); } $channel->wait_for_pending_acks(); $channel->close(); $connection->close();
消费者 require_once './vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'test', 'test', '/'); $channel = $connection->channel(); $exchangeName = 'exchange_fanout_1'; $queueName = $argv[1]; $channel->exchange_declare($exchangeName, 'fanout', false, false, false); $channel->queue_declare($queueName, false, false, false, false); $channel->queue_bind($queueName, $exchangeName); echo " [*] Waiting for messages. To exit press CTRL+C "; $callback = function ($msg) { echo ' [x] Received ', $msg->body, " "; }; $channel->basic_consume($queueName, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
php producter.php
php customer.php "exchange_fanout_1"