参考:PHP高级编程之消息队列
消息队列就是在消息的传输过程中,可以保存消息的容器。
常见用途:
- 存储转发:异步处理耗时的任务
- 分布式事务:多个消费者消费同一个消息队列
- 应对高并发:通过消息队列保存任务,慢慢处理
- 发布订阅:实现解耦
PHP 可以基于 Redis 的 List 数据类型实现简单的消息队列,可以参考 php-resque。当然也可以使用更强大的 RabbitMQ。
实现方式
PHP 守护进程
PHP 业务代码:
<?php
class MyDaemon
{
public $procNum = 8; // 进程总数
// 启动进程
public function run()
{
for ($i = 0; $i < $this->procNum; $i++) {
$nPID = pcntl_fork();//创建子进程
if ($nPID == 0) {
//子进程
OrgUtilMsgQ::init();
$this->work();
exit(0);
}
}
// 等待子进程执行完毕,避免僵尸进程
$n = 0;
while ($n < $this->procNum) {
$nStatus = -1;
$nPID = pcntl_wait($nStatus);
if ($nPID > 0) {
++$n;
}
}
}
//业务代码
public function work()
{
$MsgData = "";
while (true) {
usleep(10000); // 10 ms 执行一次
$ret = MsgQ::BlockSubsribe("MyMsgName", $MsgData);
// 业务代码
}
}
消息队列(基于Redis)库代码:
<?php
namespace OrgUtil;
class MsgQ {
public static $errCode = 0;
public static $errMsg = "";
public static $redis;
private static $preFix = "MsgQ.";
private static $timeOut = 10;
private static $redisHost = '';
private static $redisPort = '';
private static $redisAuth = '';
function __construct()
{
self::$redis = new Redis();
$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
$ret = self::$redis->auth($redisAuth);
}
function __destruct()
{
if(self::$redis) {
self::$redis->close();
}
}
public static function init($timeOut = 0){
if (!self::$redis) {
self::$redis = new Redis();
if(!empty($timeOut)){
self::$timeOut = $timeOut;
ini_set('default_socket_timeout', 259200);
$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
$ret = self::$redis->auth($redisAuth);
}
else{
self::$timeOut = 0;
ini_set('default_socket_timeout', 259200);
$ret = self::$redis->connect($redisHost,$redisPort,259200);
$ret = self::$redis->auth($redisAuth);
}
}
}
public static function Publish($pubKey,$data){
if(!self::PingAndConnect()){
return false;
}
$ret = self::$redis->rPush(self::$preFix.$pubKey,$data);
if ($ret === false){
return false;
}
return true;
}
public static function GetListLen($pubKey,&$len){
if(!self::PingAndConnect()){
return false;
}
$len = 0;
$ret = self::$redis->lLen(self::$preFix.$pubKey);
if ($ret === false){
return false;
}
$len = $ret;
return true;
}
public static function Subsribe($pubKey,&$data){
if(!self::PingAndConnect()){
return false;
}
$ret = self::$redis->lPop(self::$preFix.$pubKey);
if ($ret === false){
return false;
}
$data = $ret;
return true;
}
public static function BlockSubsribe($pubKey,&$data){
if(!self::PingAndConnect()){
return false;
}
try{
$ret = self::$redis->blPop(array(self::$preFix.$pubKey),0);
}
catch(Exception $e){
if(!self::PingAndConnect(true)){
return false;
}
return false;
}
if ($ret === false){
return false;
}
if ($ret === array()){
return false;
}
$data = $ret[1];
return true;
}
private static function PingAndConnect($isException = false){
if (!self::$redis) {
self::$redis = new Redis();
if (self::$timeOut == 0){
ini_set('default_socket_timeout', 259200);
$ret = self::$redis->connect($redisHost,$redisPort,259200);
}
else{
$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
}
if ($ret === false){
return false;
}
$ret = self::$redis->auth($redisAuth);
if ($ret === false){
return false;
}
}
else{
if (self::$timeOut == 0 && !$isException){
return true;
}
$ret = self::$redis->ping();
if ($ret === false){
if (self::$timeOut == 0){
ini_set('default_socket_timeout', 259200);
$ret = self::$redis->connect($redisHost,$redisPort,259200);
}
else{
$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
}
if ($ret === false){
return false;
}
$ret = self::$redis->auth($redisAuth);
if ($ret === false){
return false;
}
}
}
return true;
}
}
重启守护进程的 shell 脚本 restartprocess.sh
:
#!/bin/sh
if [ ! -n "$1" ]; then
echo "input proc name"
exit
else
procname=$1
fi
pids=`(ps -ef | grep "$procname" | grep -v "grep" | grep -v $0) | awk '{print $2}'`
for pid in ${pids[*]}
do
kill -9 $pid
done
cd /path/to/your/project/
setsid $procname &
启动守护进程的命令:
restartprocess.sh "php index.php /path/to/your/MyDaemon/func/run"
Linux 定时任务
可以设置一分钟或一秒钟执行一次 PHP 脚本。因为每次处理消息的时间不固定,可能导致消息积压或服务器负载过大。
手工执行脚本
用于处理偶然需求,简单。