1.将php-amqplib拷贝至yii2项目,新建phpclient类
<?php use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; class PhpClient { public static function Call($n){ require_once __DIR__ . '/vendor/autoload.php'; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); $data=empty($n)?"Hello World!":$n; $msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, '', 'task_queue'); $channel->close(); $connection->close(); return true; } }
2.commands中新建controller简单实现tasker和worker
<?php namespace appcommands; use yiiconsoleController; use Yii; use PhpAmqpLibConnectionAMQPStreamConnection; class RabbittaskController extends Controller { public function actionWorker() { require(Yii::getAlias('@vendor') . '/rabbitmq/vendor/autoload.php'); $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('task_queue', false, true, false, false); $callback = function ($msg) { //$msg->body //do sth $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('task_queue', '', false, false, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } public function actionNewTask() { require(Yii::getAlias('@vendor') . '/rabbitmq/PhpClient.php'); PhpClient::Call('test!'); } }