zoukankan      html  css  js  c++  java
  • 在PHP中如何使用消息列队

    /**
     * 消息列队服务
     * @author zhou.tingze
     * @example
     * -----------------------------------Create----------------------------------------
     * $array = array('a','b','c','d');
     * $this->load->library('amqp_service');
     * $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');
     * $this->amqp_service->createMessageQueue($array);
     * -----------------------------------End-------------------------------------------
     * 
     * -----------------------------------Get-------------------------------------------
     * $this->load->library('amqp_service');
     * $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');
     * $message_queue = $this->amqp_service->getMessageQueue();
     * var_dump($message_queue)
     * -----------------------------------End-------------------------------------------
     */
    
    class Amqp_service extends Base_service{
    	
    	public $conn;
    	public $exchange;
    	public $queue;
    	public $router;
    	
        function __construct(){
            parent:: __construct();
            
            //获取系统配置
            $this->load->config('app_config', TRUE);
            $app_config = $this->config->item('app_config');
    
            $this->connect($app_config['amqp']);
        }
        
        /**
         * 
         * 尝试连接Amqp服务
         */
        private function connect($amqp_args)
        {	
        	$this->conn = new AMQPConnection($amqp_args);
    		$this->conn->connect();
    		
    		if (!$this->conn->isConnected()) 
    		{
    			throw new Exception('Cannot connect to the broker.');
    		}
        }
        
        /**
         * 
         * 设定消息列队保存方式
         * @param String $exchange_name 交换机名
         * @param String $queue_name    消息列队名
         * @param String $router_name   路由名
         */
        public function setSaveType($exchange_name, $queue_name, $router_name)
        {
        	$this->exchange = $exchange_name;
        	$this->queue    = $queue_name;
        	$this->router   = $router_name;
        }
        
        /**
         * 
         * 创建消息列队
         * @param Array $array
         */
        public function createMessageQueue($array)
        {
        	//创建交换机 
    		$channel = new AMQPChannel($this->conn);
    		$ex      = new AMQPExchange($channel);
    		
    		//交换机名 
    		$ex->setName($this->exchange);
    		$ex->setType(AMQP_EX_TYPE_DIRECT);
    		$ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
    		$ex->declare();
    		
    		//创建消息列队
    		$q = new AMQPQueue($channel);
    		
    		//队列名
    		$q->setName($this->queue);
    		$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
    		$q->declare();
    		
    		//绑定交换机与队列,并指定路由键 
    		$q->bind($this->exchange, $this->router);
    		
    		//消息发布
    		$channel->startTransaction();
    		$message = json_encode($array);
    		$ex->publish($message, $this->router);
    		$channel->commitTransaction();
    		
    		//$this->conn->disconnect();
        }
        
        /**
         * 
         * 获取消息列队
         */
        public function getMessageQueue()
        {
        	try
        	{
    	    	//设置queue名称,使用exchange,绑定routingkey
    			$channel = new AMQPChannel($this->conn);
    			$q       = new AMQPQueue($channel);
    			
    			$q->setName($this->queue);
    			$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
    			$q->declare();
    			$q->bind($this->exchange, $this->router);   
    			
    			//消息获取
    			$messages = $q->get(AMQP_AUTOACK) ;
    			
    			$arr = array();
    			if ($messages){
    				$arr = json_decode($messages->getBody(), true );
    			}
        	}catch (Exception $e){
        		throw new Exception($e->getMessage());
        	}
    		//$this->conn->disconnect();
    		
    		return $arr;
        }
        
        /*
        public function getAllMessageQueue()
        {
    		//设置queue名称,使用exchange,绑定routingkey
    		$channel = new AMQPChannel($this->conn);
    		$q       = new AMQPQueue($channel);
    		
    		$q->setName($this->queue);
    		$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
    		$q->declare();
    		$q->bind($this->exchange, $this->router);   
    		$this->conn->disconnect();
    		
    		//阻塞模式获取消息列队
    		while(True){ 
    		    $q->consume('processMessage');   
    		    //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答  
    		} 
        }
        */
        
        public function __destruct()
        {
        	$this->conn->disconnect();
        }
    }
    
    
    /**
     * 消费回调函数
     * 处理消息
     * @param Object $envelope
     * @param Object $queue
     */ 
    /*
    function processMessage($envelope, $queue) { 
        $msg = $envelope->getBody(); 
        echo $msg . '<br />';
        
        //手动发送ACK应答 
        $queue->ack($envelope->getDeliveryTag()); 
    } 
    */
    

      

  • 相关阅读:
    结语
    创建ejs模板的express工程
    浏览器控制台命令调试——console
    JS获取URL中参数值(QueryString)的4种方法分享
    oracle之报错:ORA-00054: 资源正忙,要求指定 NOWAIT
    javascript 获取页面的高度及滚动条的位置的代码
    javascript 页面各种高度宽度
    导出Excel之Epplus使用教程2(样式设置)
    索引 'GXHRCS.PK_A253' 或这类索引的分区处于不可用状态
    数据库操作
  • 原文地址:https://www.cnblogs.com/adtuu/p/4670229.html
Copyright © 2011-2022 走看看