<?php /** * 生产者 */ $connection = new AMQPConnection([ 'host' => '192.168.23.130', 'port' => 5672, 'login' => 'rabuser', 'password' => '123456' ]); $connection->connect() or die('连接失败'); try{ $exchange_name = 'trades'; $route_key = '/trade'; //投递消息到中间件 $channel = new AMQPChannel($connection);//创建消息通道 $exchange = new AMQPExchange($channel);//通过通道连接交换几 //设置通道名称 $exchange->setName($exchange_name); $data = json_encode(['time'=>time()]); //发布消息到交换机中 $exchange->publish($data,$route_key); }catch (AMQPChannelException $e){ var_dump($e); }
<?php /** * 消费者 */ $connection = new AMQPConnection([ 'host' => '192.168.23.130', 'port' => 5672, 'login' => 'rabuser', 'password' => '123456' ]); $connection->connect() or die('连接失败'); try{ $exchange_name = 'trades'; $route_key = '/trade'; $queue_name = 'queue'; //投递消息到中间件 $channel = new AMQPChannel($connection);//创建消息通道 $exchange = new AMQPExchange($channel);//通过通道连接交换几 //设置通道名称 $exchange->setName($exchange_name); //三种获取消息的模式,直连模式,主题模式,广播模式 $exchange->setType(AMQP_EX_TYPE_DIRECT); //声明 $exchange->declareExchange(); //声明队列绑定交换机路由 $queue = new AMQPQueue($channel); $queue->setName($queue_name); $queue->declareQueue(); //绑定监听获取数据 $queue->bind($exchange_name,$route_key); //消费数据,默认阻塞监听获取数据 $queue->consume(function ($event,$queue){ //获取数据 $msg = $event->getBody(); var_dump($msg); var_dump($queue); //回应ACK $queue->ack($event->getDeliveryTag()); }); }catch (AMQPChannelException $e){ var_dump($e); }