zoukankan      html  css  js  c++  java
  • php队列的实现思路和详细过程

    一、队列使用场景:为什么需要队列
    在web开发中,我们经常会遇到需要处理批量任务的时候,这些批量任务可能是用户提交的,也可能是当系统被某个事件触发时需要进行批量处理的,面对这样的 任务,如果是用户提交的批量任务,初级程序员只能让用户触发提交动作后,等待服务器处理完毕,并且将结果返回到浏览器,期间用户不能关掉浏览器窗口,如果 数据比较大,或者处理速度比较慢,那用户体验将会因此受到直接影响。但是当我们使用某讯或者某浪的邮箱时,点击群发邮件之后,只需等待很短的时间,浏览器 提示提交成功,正在发送之类的信息时,用户就可以关掉浏览器,稍后,收件地址栏里的邮箱将陆续收到该群发邮件,再比如群发定时邮件,以及当商城系统中有客 户下单,客户,客服,仓库等相关人员收到订单邮件信息。诸如此类,队列的应用范围是如此之广。

    二 :普通工程师的解决方案和架构师的解决方案
    方案1:建表存邮件,消息等,用定时程序取出发送。

    方案2:抽象到更高一层,开发一套通用异步处理队列适用于任何复杂的业务逻辑
    那么,作为架构师,使用队列的做法,将抽象层和业务层分离,可具有良好的扩展性和可维护性。相比较而言就高明了许多,下面就我们介绍一下自定义队列的实现思路和方法。

    三 :队列总体设计

    1:需要队列程序,提供加入队列接口和取队列接口等
    2:需要存储队列,文件或者数据库
    3:需要定时程序取出队列并执行
    4:其它扩展功能:优先级,日志,定时等

    代码的目录结构如下,每个文件的作用用//注释来标明
    |–addTask.php              //添加任务到队列的例子
    |–cronMission.php         //定时任务调度程序,例如linux中受crontab直接调用的文件,业务逻辑工程师可以在这个文件中灵活定义自己的队列任务,从而不用每个队列任务 都需要上服务器修改crontab,从而在安全性,便捷性方面有很大提高
    |–db.php                      //数据库操作
    |–db.sql                       //建立队列需要用到的基本表结构
    |–doQueue.php             //执行队列任务
    |–Queue.class.php         //队列核心业务在这里定义,包括将任务加入队列,读队列,更改队列任务状态
    |–sendMsg.php             //队列要实现具体任务的业务接口,比如现有系统的发送消息的接口,这里例子中因为将此队列程序和现有系统系统集成,用写入日志来演示

    四 :队列具体实现一:建任务存储表
    1:
    先来个最基本的:

    CREATE TABLE`queue` (
     
      id int(11) NOT NULL auto_increment primarykey,
     
      taskphp varchar(128) NOT NULL default '',
     
      param text not null default '',
     
      status tinyint not null default 0,
     
      ctime timestamp NOT NULL default CURRENT_TIMESTAMP,
      
    KEY (ctime)
     
     ) ENGINE=InnoDBDEFAULT CHARSET=utf8;

    字段解释:
    taskphp:处理业务的接口文件
    param:处理业务的接口文件需要接收的参数
    status:任务处理状态,0为未处理,处理完毕更改为1

    五 、队列具体实现二:定义调用接口
     

    <?php
     
     /**
      *
      * 任务队列实现
      *
      */
            
     include_once('db.php');
     
    class Queue
     
     {
     
            
         /**
          * 把任务扔到队列
          *
          * @param string $taskphp   执行任务的程序
     
          * @param string $param     执行任务程序所用的参数
            
          * 例如,群发消息加入队列:
     
          * $arr = array(
     
          *      "uid" => 4,//发信息的人的UID
     
          *      "uids" => array(6,234,34,67,7888,2355), //接收信息的人的UID
     
          *      "content" => 'xxxxx',//信息内容
     
          *  );
     
          * $cqueue = new Queue();
     
          * $cqueue->add("/app/send_msg.php", serialize($arr));
          *
          */
     
         public function add($taskphp,$param)
         
    {
     
             $taskphp = mysql_real_escape_string($taskphp);
     
             //$param = mysql_real_escape_string($param);
     
             $param $param;
     
             $sql "insert into queue (taskphp, param) values('".$taskphp."', '".$param."')";
     
             $re = execute($sql);
     
             if ($re)
             {
                 $pid = mysql_insert_id();
     
                 return $pid;
             }
             
    else
     
             {
     
                 return false;
             }
         }
     
            
            
         /**
          * 读取任务队列
          
          * @param string $limit 一次取多少条
          */
          public function getQueueTask($limit = 1000)
          
          {
     
             $limit = (int)$limit;
     
             $sql "select id, taskphp, param from queue  where status = 0 order by id asc";
     
             $re = query($sql);
     
             return $re;
          }
            
         /**
          * 更新任务状态
          
          * @param string $limit 一次取多少条
          */
          public function updateTaskByID($id)
     
          {
     
             $id = (int)$id;
     
             $mtime = time();
     
             $sql "update queue  set status =1, mtime = ".$mtime." where id = ".$id;
     
             $re = execute($sql);
     
             return $re;
          }
            
            
          public static function a2s($arr)
     
         {
             $str "";
     
             foreach ($arr as $key => $value)
             
               {
                 if (is_array($value))
                 
                   {
                     foreach ($value as $value2)
                     
                       {
     
                         $str .= urlencode($key) . "[]=" . urlencode($value2) . "&";
                     }
                 }
     
                 else
     
                 {
                     $str .= urlencode($key) . "=" . urlencode($value) . "&";
                 }
             }
     
             return $str;
         }
     
            
         public static function s2a($str)
     
         {
             $arr array();
     
             parse_str($str$arr);
     
             return $arr;
         }
            
     }
            
     ?>

    1:加入队列接口
    l  //$param1 为执行任务的程序,$param2 为程序参数,可以为序列化的数据
    l  $cqueue->add($param1,$param2);
    2:  读取队列接口
    l  $tasks = $cqueue->getQueueTask($limit = 1000);
    3:更新任务状态
    l  $cqueue->updateTaskStatus($id);
    4:a2s是自定义的一个数组转换字符串方法,这里不要使用json_encode,容易出现问题,同样,从数据库中取出转换为数组的时候,使用s2a方法
    l  $re = $cqueue->add("sendMsg.php", Queue::a2s($arr));

    六、队列具体实现三:写执行队列的程序
    根据设计,执行队列的程序文件是 do_queue.php , 它的主要功能是把任务从队列表里取出来,并且在后台执行。

    do_queue.php部分代码:

    $phpcmd exec("which php");    //查找到php安装位置
     
     $cqueue new Queue();
     
     $tasks $cqueue->getQueueTask(200);
     
     foreach ($tasks as $t)
     
     {
         $taskphp $t['taskphp'];
     
         $param $t['param'];
     
         $job $phpcmd " " escapeshellarg($taskphp) . " " escapeshellarg($param);
     
         system($job);
     }

    七、具体任务的业务实现

      还是拿群发消息来做例子,我们需要写好一个群发消息的程序,这个程序接收事先定义好的参数,然后根据参数调用发消息的接口把消息发送出去。
      这个一般由做业务功能的工程师实现。但是架构师事先得写文档例子,教会别人使用。
    send_msg.php:

    $para $argv[1];
     
     $arr = unserialize($para);
     
     $cmessage new Message();
     
     foreach($arr['uids'as $touid)
     
     {
     
         $cmessage->send($arr['uid'], $touid$arr['content']);
     }

    八、服务器部署一:配置crontab

    咱们执行队列的程序都写好了, 这个程序怎么触发呢,当然就要用到linux的定时任务,每隔一定的时间,执行do_queue.php一次。但是呢,这里不是直接调用 do_queue.php,咱们再提高一层,加个调度程序cron_mission.php, 在cron_mission.php里面调用do_queue.php
    配置定时任务 crontab:
    l  crontab –e
    l  * * * * *  cd /ucai/schedule;php cron_mission.php >> cron_mission.log
    #可以先使用crontab -l查看本机已经使用的定时任务
    九、服务器部署二:写定时任务调度程序
    思路:将定时任务写入到任务调度程序cron_mission.php中,这样可以在cron_mission.php中灵活控制队列任务。相比较直接通 过crontab控制doQueue.php而言,避免了频繁修改服务器上的crontab,从安全,便于维护等角度来说,都是上策。

    cron_mission.php 示例:

    if ($minute % 5 == 0)
     
     {
     
         if(chdir($site_dir."app/")) {
     
             $cmd "$phpcmd do_queue.php > do_queue.log &";
     
             echo '[' $ymd ' ' $hour ':' $minute '] ' $cmd "n";
     
             system($cmd);
         }
     }

     

    十、开启多进程并发执行队列
    思路:对任务序列进行编号,数据库中执行的时候
    where条件加上id%每个队列要执行任务总数=队列编号
    这样可以避免重复处理
    例如:每个进程执行10条任务,修改如下
    1:定时任务的修改

    修改前:

    if ($minute % 5 == 0)
     
     {
     
         if(chdir($site_dir."app/")) {
     
             $cmd "$phpcmd do_queue.php > do_queue.log &";
     
             echo '[' $ymd ' ' $hour ':' $minute '] ' $cmd "n";
     
             system($cmd);
         }
     }
     
    修改后:
    if ($minute % 5 == 0)
     
     {
     
         for ($i=0; $i < 10; $i++) { 
     
             $cmd "$phpcmd doQueue.php 10 $i>> doQueueMission".date('Y-m-d').".log  ";
     
             echo  date("Y-m-d H:i:s") . "t : " .$cmd."n";
     
             system($cmd);
         }
     }

    //每次进行10个进程,$i来区分是当前的进程标示
    2:队列执行程序的修改

    修改前:

    $phpcmd 'D:workwampbinphpphp5.3.10php ';
     
     $cqueue new Queue();
     
     $tasks $cqueue->getQueueTask(200);
     

    修改后:

    $phpcmd 'D:workwampbinphpphp5.3.10php ';
     
     $total=$argv[1];
     
     $i=$argb[2];
     
     $cqueue new Queue();
     
     $tasks $cqueue->getQueueTask($total,$i,200);

    3:取队列接口的修改

    修改前:

    public function getQueueTask($limit = 1000)
          {
             $limit = (int)$limit;
     
             $sql "select id, taskphp, param from queue  where status = 0 order by id asc";
     
             $re = query($sql);
     
             return $re;
          }

    修改后:

    public function getQueueTask($total,$i,$limit = 1000)
     
          {
     
             $limit = (int)$limit;
     
             $sql "select id, taskphp, param from queue  where status = 0 and id%$total=$i order by id asc";
             $re = query($sql);
     
             return $re;
          }

    4:需要关注服务器压力
    进程数定为多少,取决于服务器压力

    十一、实现任务优先级
    1:任务存储表加优先级字段
    在数据表里,加一个优先级字段,按字段值的数值大小来区分优先级
    2:修改取队列任务接口,按优先级取
    同样是在sql语句中增加order by

    十二、记录队列日志
    1:关键地方加echo
    2:shell脚本的>>和>的各自作用
    总结:
    我们这里的队列实现借助了服务器的计划任务来实现,例如linux中的crontab,这本身是linux系统中的一个程序,平时我们还可以使用他来进行 定时执行.sh脚本,例如将数据库备份打包并ftp传送到指定服务器上,这个功能不需要借助php脚本,直接用.sh脚本就可以实现。在这里我们巧妙的将 crontab和php脚本结合,并且使用crontab来不断调用一个队列调度接口cronMission.php,再通过 cronMission.php直接来控制具体什么时候或者是满足什么条件来执行什么队列任务。
    这里面几个需要注意的地方
    1:往数据库中存取数据时,不要直接使用json_encode或者json_decode,容易造成一些意外问题,在代码中,我们定义了a2s和s2a两个方法,分别是处理数组转为字符串,和从数据库中读取字符串后转为数组。
    2:当任务量比较大,同时服务器负载又没有充分利用的时候,可以使用多进程并发处理,在并发处理的时候需要考虑一个问题,就是如何避免重复,在这里我们使 用了,对队列任务进行标记,每次从数据库中读取一个进程需要处理的一批任务,使用数据库中id与批次标示取余等于0的方法来区分,避免不同批次的队列,重 复处理相同任务。(上面步骤10中有具体实现)

  • 相关阅读:
    Begin Example with Override Encoded SOAP XML Serialization
    State Machine Terminology
    How to: Specify an Alternate Element Name for an XML Stream
    How to: Publish Metadata for a WCF Service.(What is the Metadata Exchange Endpoint purpose.)
    Beginning Guide With Controlling XML Serialization Using Attributes(XmlSerializaiton of Array)
    Workflow 4.0 Hosting Extensions
    What can we do in the CacheMetaData Method of Activity
    How and Why to use the System.servicemodel.MessageParameterAttribute in WCF
    How to: Begin Sample with Serialization and Deserialization an Object
    A Test WCF Service without anything of config.
  • 原文地址:https://www.cnblogs.com/yingmo/p/6148380.html
Copyright © 2011-2022 走看看