RabbitMQ是用来在多个异构系统之间进行数据交换的,生产者无需知道消费者的存在,消费者也无需关注生产者的行为,从而进行解耦,提高系统性能。
这里生产者只需要将数据送到队列里面,就完成了任务,而消费者只需要去某个队列消费,就接收到了数据。那么生产者是如何将数据送到指定的队列呢?这里就用到了交换机。
在RabbitMQ中有四种交换机:
1.Direct(默认)
Direct交换机是路由键匹配的话,就将消息投递到相应的队列:
protected function getConn() { $conf = Yii::$app->params['rabbitMQ']; $conn = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost']); return $conn; }
$conn = $this->getConn(); $channel = $conn->channel(); /* * 声明一个white队列和一个green队列 * 参数介绍: * passive = false 没有此队列时不会报错 * durable = true 队列持久化 * exclusive = false 当一个连接关闭时队列不会删除 * auto_delete = false 当最后一个消费者断开连接后队列不会自动删除 * nowait = false 不返回执行结果 */ $channel->queue_declare('white', false, true, false, false, false); $channel->queue_declare('green', false, true, false, false, false); //声明direct交换机 $channel->exchange_declare('direct_exchange', 'direct', false, true, false, false, false); /** * 将white队列绑定到direct交换机上 * 此步骤可以省略,默认队列会有一个同名路由键,所以上一章中生产者是这样发送消息的: * $channel->basic_publish($msg, '', 'white'); */ $channel->queue_bind('white', 'direct_exchange', 'color.white'); $msgBody = 'Hello White Rabbit!'; //delivery_mode = 2,消息持久化 $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $channel->basic_publish($msg, 'direct_exchange', 'color.white');
2.Fanout
Fanout交换机不匹配路由,会将消息投递给所有绑定该交换机的队列:
//声明fanout交换机 $channel->exchange_declare('fanout_exchange', 'fanout', false, true, false, false, false); //绑定white和green队列,fanout交换机无需路由键 $channel->queue_bind('white', 'fanout_exchange'); $channel->queue_bind('green', 'fanout_exchange'); //以下一行代码会同时给white和green队列发送一条消息 $channel->basic_publish($msg, 'fanout_exchange');
3.Topic
Topic交换机和Direct交换机类似,只不过会根据定义的规则进行投递:
//声明topic交换机 $channel->exchange_declare('topic_exchange', 'topic', false, true, false, false, false); /* * 绑定white和green队列 * "#"表示0个或多个关键字, "*"表示匹配一个关键字 */ $channel->queue_bind('white', 'topic_exchange', 'color.*'); $channel->queue_bind('green', 'topic_exchange', 'color.#'); //以下一行代码会给green队列发送一条消息 $channel->basic_publish($msg, 'topic_exchange', 'color.white.green');
4.Headers
Headers交换机会根据消息的头部进行投递:
//声明headers交换机 $channel->exchange_declare('headers_exchange', 'headers', false, true, false, false, false); $channel->queue_bind('white', 'headers_exchange', '', false, new AMQPTable([ 'x-match' => 'all', //要求所有参数都要符合 'author' => '74percent', 'type' => 'color' ])); $channel->queue_bind('green', 'headers_exchange', '', false, new AMQPTable([ 'x-match' => 'any', //要求任意参数符合即可 'author' => '74percent', 'type' => 'color' ])); $msg->set('application_headers', new AMQPTable([ 'author' => '74percent' ])); //以下一行代码会给green队列发送一条消息 $channel->basic_publish($msg, 'headers_exchange');