/**
 * Message queue service built on the php-amqp extension.
 *
 * @author zhou.tingze
 * @example
 * -----------------------------------Create----------------------------------------
 * $array = array('a','b','c','d');
 * $this->load->library('amqp_service');
 * $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');
 * $this->amqp_service->createMessageQueue($array);
 * -----------------------------------End-------------------------------------------
 *
 * -----------------------------------Get-------------------------------------------
 * $this->load->library('amqp_service');
 * $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');
 * $message_queue = $this->amqp_service->getMessageQueue();
 * var_dump($message_queue)
 * -----------------------------------End-------------------------------------------
 */
class Amqp_service extends Base_service
{
    /** @var AMQPConnection Broker connection, opened by the constructor. */
    public $conn;

    /** @var string Exchange name, set through setSaveType(). */
    public $exchange;

    /** @var string Queue name, set through setSaveType(). */
    public $queue;

    /** @var string Routing key, set through setSaveType(). */
    public $router;

    /**
     * Loads the 'app_config' configuration and connects to the broker using
     * its 'amqp' entry.
     *
     * @throws Exception When the connection cannot be established.
     */
    public function __construct()
    {
        parent::__construct();

        // Read the system configuration.
        $this->load->config('app_config', TRUE);
        $app_config = $this->config->item('app_config');

        $this->connect($app_config['amqp']);
    }

    /**
     * Opens the connection to the AMQP broker.
     *
     * @param array $amqp_args Credentials passed straight to AMQPConnection.
     * @throws Exception When the connection is not established after connect().
     */
    private function connect($amqp_args)
    {
        $this->conn = new AMQPConnection($amqp_args);
        $this->conn->connect();

        if (!$this->conn->isConnected()) {
            throw new Exception('Cannot connect to the broker.');
        }
    }

    /**
     * Selects the exchange, queue and routing key used by the subsequent
     * createMessageQueue() / getMessageQueue() calls.
     *
     * @param String $exchange_name Exchange name
     * @param String $queue_name    Queue name
     * @param String $router_name   Routing key
     */
    public function setSaveType($exchange_name, $queue_name, $router_name)
    {
        $this->exchange = $exchange_name;
        $this->queue    = $queue_name;
        $this->router   = $router_name;
    }

    /**
     * Declares the queue named by $this->queue on the given channel and binds
     * it to $this->exchange with $this->router.
     *
     * Shared by the publishing and the consuming side so that both declare
     * the queue with identical flags.
     *
     * @param AMQPChannel $channel Channel to declare the queue on.
     * @return AMQPQueue The declared and bound queue.
     */
    private function declareBoundQueue($channel)
    {
        $q = new AMQPQueue($channel);
        $q->setName($this->queue);
        $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        // NOTE(review): newer php-amqp releases appear to name this
        // declareQueue() — confirm against the installed extension version.
        $q->declare();

        // Bind the queue to the exchange under the configured routing key.
        $q->bind($this->exchange, $this->router);

        return $q;
    }

    /**
     * Publishes $array as one JSON-encoded message.
     *
     * Declares the exchange and the queue, binds them, then publishes the
     * message inside a channel transaction.
     *
     * @param Array $array Payload; it is serialised with json_encode().
     * @throws Exception When the payload cannot be JSON-encoded, or when the
     *                   broker rejects any of the declare/bind/publish steps
     *                   (the original exception is rethrown in that case).
     */
    public function createMessageQueue($array)
    {
        // Encode first: an unencodable payload must not leave a declared
        // exchange/queue behind, and json_encode() signals failure by
        // returning false, which must never reach publish().
        $message = json_encode($array);
        if ($message === false) {
            throw new Exception(
                'Cannot encode message as JSON (json error code ' . json_last_error() . ').'
            );
        }

        // Create the exchange.
        $channel = new AMQPChannel($this->conn);
        $ex = new AMQPExchange($channel);
        $ex->setName($this->exchange);
        $ex->setType(AMQP_EX_TYPE_DIRECT);
        $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        // NOTE(review): newer php-amqp releases appear to name this
        // declareExchange() — confirm against the installed extension version.
        $ex->declare();

        // Create the queue and bind it to the exchange.
        $this->declareBoundQueue($channel);

        // Publish inside a transaction; roll back if anything fails so the
        // channel is not left with an open transaction.
        $channel->startTransaction();
        try {
            $ex->publish($message, $this->router);
            $channel->commitTransaction();
        } catch (Exception $e) {
            try {
                $channel->rollbackTransaction();
            } catch (Exception $rollback_error) {
                // Best effort only: the original failure is the one worth
                // reporting to the caller.
            }
            throw $e;
        }

        //$this->conn->disconnect();
    }

    /**
     * Fetches a single message from the configured queue.
     *
     * The message is fetched with AMQP_AUTOACK and its body is decoded with
     * json_decode(..., true).
     *
     * @return mixed The decoded message body, or an empty array when get()
     *               returned no message. The result is whatever json_decode()
     *               yields, so it is null for a body that is not valid JSON.
     * @throws Exception On any broker error; the original exception is
     *                   available through getPrevious().
     */
    public function getMessageQueue()
    {
        try {
            // Declare the queue and bind it to the exchange / routing key.
            $channel = new AMQPChannel($this->conn);
            $q = $this->declareBoundQueue($channel);

            // Fetch one message.
            $envelope = $q->get(AMQP_AUTOACK);
            $arr = array();
            if ($envelope) {
                $arr = json_decode($envelope->getBody(), true);
            }
        } catch (Exception $e) {
            // Keep throwing a plain Exception for existing callers, but chain
            // the original so its type, code and trace are not lost.
            throw new Exception($e->getMessage(), (int) $e->getCode(), $e);
        }

        //$this->conn->disconnect();
        return $arr;
    }

    /*
    public function getAllMessageQueue()
    {
        // Declare the queue and bind it to the exchange / routing key.
        $channel = new AMQPChannel($this->conn);
        $q = new AMQPQueue($channel);
        $q->setName($this->queue);
        $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        $q->declare();
        $q->bind($this->exchange, $this->router);
        $this->conn->disconnect();
        // Consume the queue in blocking mode.
        while(True){
            $q->consume('processMessage');
            //$q->consume('processMessage', AMQP_AUTOACK); // automatic ACK
        }
    }
    */

    /**
     * Closes the broker connection when the service is destroyed.
     */
    public function __destruct()
    {
        // Guard against a connection that was never created or is already
        // closed, so destruction cannot fail on a null property.
        if ($this->conn instanceof AMQPConnection && $this->conn->isConnected()) {
            $this->conn->disconnect();
        }
    }
}

/**
 * Consumer callback.
 * Processes one message.
 * @param Object $envelope
 * @param Object $queue
 */
/*
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg . '<br />';
    // Send the ACK manually.
    $queue->ack($envelope->getDeliveryTag());
}
*/