zoukankan      html  css  js  c++  java
  • 用 Redis 实现 PHP 的简单消息队列

    参考:PHP高级编程之消息队列

    消息队列就是在消息的传输过程中,可以保存消息的容器。

    常见用途:

    • 存储转发:异步处理耗时的任务
    • 分布式事务:多个消费者消费同一个消息队列
    • 应对高并发:通过消息队列保存任务,慢慢处理
    • 发布订阅:实现解耦

    PHP 可以基于 Redis 的 List 数据类型实现简单的消息队列,可以参考 php-resque。当然也可以使用更强大的 RabbitMQ。

    实现方式

    PHP 守护进程

    PHP 业务代码:

    <?php
    
    class MyDaemon
    {
        public $procNum = 8; // 进程总数
    
    	// 启动进程
        public function run()
        {
            for ($i = 0; $i < $this->procNum; $i++) {
                $nPID = pcntl_fork();//创建子进程
                if ($nPID == 0) {
                    //子进程
                    OrgUtilMsgQ::init();
                    $this->work();
                    exit(0);
                }
            }
            // 等待子进程执行完毕,避免僵尸进程
            $n = 0;
            while ($n < $this->procNum) {
                $nStatus = -1;
                $nPID = pcntl_wait($nStatus);
                if ($nPID > 0) {
                    ++$n;
                }
            }
        }
        
        //业务代码
        public function work()
        {
            $MsgData = "";
            while (true) {
                usleep(10000); // 10 ms 执行一次
                $ret = MsgQ::BlockSubsribe("MyMsgName", $MsgData);
                // 业务代码
            }
        }
    

    消息队列(基于Redis)库代码:

    <?php
    
    namespace OrgUtil;
    
    class MsgQ {
    	public static $errCode = 0;
    	public static $errMsg = "";
    	public static $redis;
    	private static $preFix = "MsgQ.";
    	private static $timeOut = 10;
    
    	private static $redisHost = '';
    	private static $redisPort = '';
    	private static $redisAuth = '';
    
    	function __construct()
    	{
    		self::$redis = new Redis();
    		$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
    		$ret = self::$redis->auth($redisAuth);
    	}
    
    	function __destruct()
    	{
    		if(self::$redis) {
    			self::$redis->close();
    		}
    	}
    
    	public static function init($timeOut = 0){
    		if (!self::$redis) {
    			self::$redis = new Redis();
    			if(!empty($timeOut)){
    				self::$timeOut = $timeOut;
    				ini_set('default_socket_timeout', 259200);
    				$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
    				$ret = self::$redis->auth($redisAuth);
    			}
    			else{
    				self::$timeOut = 0;
    				ini_set('default_socket_timeout', 259200);
    				$ret = self::$redis->connect($redisHost,$redisPort,259200);
    				$ret = self::$redis->auth($redisAuth);
    			}
    		}
    	}
    
    	public static function Publish($pubKey,$data){
    		if(!self::PingAndConnect()){
    			return false;
    		}
    		$ret = self::$redis->rPush(self::$preFix.$pubKey,$data);
    		if ($ret === false){
    			return false;
    		}
    		return true;
    	}
    
    	public static function GetListLen($pubKey,&$len){
    		if(!self::PingAndConnect()){
    			return false;
    		}
    		$len = 0;
    		$ret = self::$redis->lLen(self::$preFix.$pubKey);
    		if ($ret === false){
    			return false;
    		}
    		$len = $ret;
    		return true;
    	}
    
    	public static function Subsribe($pubKey,&$data){
    		if(!self::PingAndConnect()){
    			return false;
    		}
    		$ret = self::$redis->lPop(self::$preFix.$pubKey);
    		if ($ret === false){
    			return false;
    		}
    		$data = $ret;
    		return true;
    	}
    
    	public static function BlockSubsribe($pubKey,&$data){
    		if(!self::PingAndConnect()){
    			return false;
    		}
    		try{
    			$ret = self::$redis->blPop(array(self::$preFix.$pubKey),0);
    		}
    		catch(Exception $e){
    			if(!self::PingAndConnect(true)){
    				return false;
    			}
    			return false;
    		}
    		if ($ret === false){
    			return false;
    		}
    		if ($ret === array()){
    			return false;
    		}
    		$data = $ret[1];
    		return true;
    	}
    
    	private static function PingAndConnect($isException = false){
    		if (!self::$redis) {
    			self::$redis = new Redis();
    			if (self::$timeOut == 0){
    			     ini_set('default_socket_timeout', 259200);
    				$ret = self::$redis->connect($redisHost,$redisPort,259200);
    			}
    			else{
    				$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
    			}
    			if ($ret === false){
    				return false;
    			}
    			$ret = self::$redis->auth($redisAuth);
    			if ($ret === false){
    				return false;
    			}
    		}
    		else{
    			if (self::$timeOut == 0 && !$isException){
    				return true;
    			}
    			$ret = self::$redis->ping();
    			if ($ret === false){
    				if (self::$timeOut == 0){
    				    ini_set('default_socket_timeout', 259200);
    					$ret = self::$redis->connect($redisHost,$redisPort,259200);
    				}
    				else{
    					$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
    				}
    				if ($ret === false){
    					return false;
    				}
    				$ret = self::$redis->auth($redisAuth);
    				if ($ret === false){
    					return false;
    				}
    			}
    		}
    		return true;
    	}
    }
    

    重启守护进程的 shell 脚本 restartprocess.sh

    #!/bin/sh
    if [ ! -n "$1" ]; then
    	echo "input proc name"
    	exit
    else
    	procname=$1
    fi
    
    pids=`(ps -ef | grep "$procname" | grep -v "grep" | grep -v $0) | awk '{print $2}'`
    
    for pid in ${pids[*]}
    do
    	kill -9 $pid
    done
    cd /path/to/your/project/
    setsid $procname &
    

    启动守护进程的命令:

    restartprocess.sh "php index.php /path/to/your/MyDaemon/func/run"
    

    Linux 定时任务

    可以设置一分钟或一秒钟执行一次 PHP 脚本。因为每次处理消息的时间不固定,可能导致消息积压或服务器负载过大。

    手工执行脚本

    用于处理偶然需求,简单。

  • 相关阅读:
    五年磨一剑未成
    通过实例学习 VSL Hello World
    Web3d明日之星基于Javascript和OpenGL的技术
    将自己常去Web3D网站整理到文章中来
    FreeBSD,比较安静
    通过实例学习Virtools脚本语言VSL 合并字符串
    关于SQLite
    android中AsyncTask和Handler对比
    JavaScript中Array的prototype运用
    WampServer2.X 安装与使用说明
  • 原文地址:https://www.cnblogs.com/kika/p/10851535.html
Copyright © 2011-2022 走看看