参考:https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/ (php-amqplib扩展库:demo)
参考:https://zhuanlan.zhihu.com/p/63700605 (知乎)
简介:
不要用的太复杂,一般定义好一个直连交换机(route_key)和一个扇形交换机就可以用了。
生产端使用流程:
1,创建连接:connection
$connection = new AMQPStreamConnection('192.168.124.39', 5672, 'fort', 'fort');
2,创建信道:channel
$channel = $connection->channel();
3,创建消息:message
$msg = new AMQPMessage('Hello World!'); $msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );//持久化的消息,与queue都需要定义好持久化参数
4,创建队列:queue
#空交换机的时候才需要定义queue $channel->queue_declare('hello', false, false, false, false); $channel->queue_declare('task_queue', false, true, false, false);//持久的消息,第3个参数
5,创建交换机:exchange
#定义了交换机就不需要定义queue
$channel->exchange_declare('logs', 'fanout', false, false, false); $channel->exchange_declare('direct_logs', 'direct', false, false, false); $channel->exchange_declare('topic_logs', 'topic', false, false, false);
6,发布消息:publish
$channel->basic_publish($msg, '', 'task_queue'); $channel->basic_publish($msg, 'logs');//扇形交换机 $channel->basic_publish($msg, 'direct_logs', $severity);//直连交换机 $channel->basic_publish($msg, 'topic_logs', $routing_key);//主题交换机
7,关闭信道和连接:
$channel->closs(); $connection->close();
消费端使用流程
1,创建连接:connection
$connection = new AMQPStreamConnection('192.168.124.39', 5672, 'fort', 'fort');
2,创建信道:channel
$channel = $connection->channel();
3,申明队列:queue(消费端必须有queue)
//空交换机时,不需要申明交换机 $channel->queue_declare('hello', false, false, false, false); //扇形,直连,主题交换机时 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
4,申明交换机:exchange
//扇形交换机 $channel->exchange_declare('logs', 'fanout', false, false, false); //直连交换机 $channel->exchange_declare('direct_logs', 'direct', false, false, false); //主题交换机 $channel->exchange_declare('topic_logs', 'topic', false, false, false);
5,交换机绑定到队列:
//绑定扇形交换机 $channel->queue_bind($queue_name, 'logs'); //绑定直连交换机 foreach($severities as $severity) { $channel->queue_bind($queue_name, 'direct_logs', $severity); } //绑定主题交换机 foreach($binding_keys as $binding_key) { $channel->queue_bind($queue_name, 'topic_logs', $binding_key); }
6,消费队列;
//空交换机时 $channel->basic_consume('hello', '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } //扇形,直连,主题交换机时 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); }
7,关闭信道和连接:
$channel->closs();
$connection->close();