zoukankan      html  css  js  c++  java
  • thinkphp 实现rabbitMq常驻进程消费队列

    1,项目一级目录新建一个server文件

    #!/usr/bin/env php
    <?php
    try {
        require __DIR__ . "/start.php";
    } catch (Exception $e) {}
    
    $queueNames = config('queue.qnames');
    if (empty($queueNames)) exit('队列名称未配置');
    
    $option = $argv[1];
    $queueName = isset($argv[2]) ? $argv[2] : null;
    if ($queueName && !in_array($queueName, $queueNames)) exit('队列名称不存在');
    
    if ($queueName) {
        unset($queueNames);
        $queueNames[] = $queueName;
    }
    
    echo "开始操作... 
    ";
    switch ($option) {
        case 'start':
            startQueue($queueNames);
            break;
    
        case 'stop':
            stopQueue($queueNames);
            break;
    
        case 'reload':
            stopQueue($queueNames);
            startQueue($queueNames);
            break;
    
        case 'monitor':
            demon();
            break;
    
        default:
            echo '操作类型: start|stop|reload|monitor' . "
    ";
            break;
    }
    
    echo "结束操作... 
    ";
    
    function startQueue(array $queueNames)
    {
        $correct = 'dispatch/Correct/index$';
        $ret = isProcessExist($correct);
        if($ret ==  0){
            $cmd = "nohup php start.php dispatch/Correct/index >> /tmp/correct.log &";
            system($cmd,$result);
            ($result == 0) or die("$cmd 启动失败 
    "); 
            echo "$correct 队列已启动 
    "; 
        }else {
            echo "$correct 进程已存在,无需重启 
    ";
        }
        foreach ($queueNames as $_queueName) {
            $proccessname = "dispatch/Consume/index/qname/$_queueName$";
            if(isProcessExist($proccessname) == 1) {
                echo $_queueName . "进程已经存在,无需重启 
    ";
            }else {
                $command = "nohup php start.php dispatch/Consume/index/qname/" . $_queueName." >> /tmp/$_queueName.log &";
                system($command,$result);
                ($result == 0) or die("$command 启动失败 
    ");
                echo $_queueName . "队列已启动 
    ";
            } 
        }
    }
    
    function demon()
    {
        $queues = config('queue.qnames'); 
        
        
        foreach($queues as $queue) {
            if(!isProcessExist($queue)) {
                try{
                    startQueue([$queue]);
                }catch(Exception $e){
                    echo "队列监控启动失败: ".$e->getMessage()."
    ";
                }
            }
        }
    }
    
    function stopQueue(array $queueNames)
    {
        $redisconf = config('redis');
        $redis = libraryRedis::getInstance($redisconf);
        foreach ($queueNames as $key => $_queueName) {
            $redis->hset('script:signal', $_queueName, false);
            libraryQueue::getInstance()->addEvent('Stop', [])->push($_queueName);
            $proccessname = "dispatch/Consume/index/qname/$_queueName$";
            while(true) {
                usleep(200000); // 0.2s
                $result = isProcessExist($proccessname);
                if(!$result) {
                    break;
                }
                unset($result);
            }
            echo $_queueName . "队列已停止 
    ";
        }
    }
    
    function isProcessExist($processname) 
    {
        $ps = 'ps axu|grep "'.$processname.'"|grep -v "grep"|wc -l';
        $ret = shell_exec("$ps");
        $ret = rtrim($ret, "
    ");
        return $ret;
    }
    View Code

    2,项目一级目录新建一个start.php

    3,修改application/extra/queue.php

     

    <?php
    // +----------------------------------------------------------------------
    // | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
    // +----------------------------------------------------------------------
    // | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved.
    // +----------------------------------------------------------------------
    // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
    // +----------------------------------------------------------------------
    // | Author: yunwuxin <448901948@qq.com>
    // +----------------------------------------------------------------------
    
    return [
    //    'connector' => 'Sync'
        'qnames'=>[
            'Trade',        //投资,变现,返本相关
            'Activity',        //活动相关
            'Sms',            //发短信相关
            'Repay',        //借款人还款
            'Interest',     //计息,派息相关
            'DebtOnline',   //债权上线相关
            'Other'            //其他
        ],
        'driver' => env('QUEUE_DRIVER', 'redis'),
    ];
    View Code
  • 相关阅读:
    hive表增量抽取到oracle数据库的通用程序(二)
    java进程的守护进程脚本
    hadoop2.7节点的动态增加与删除
    hive表增量抽取到oracle数据库的通用程序(一)
    arduino 驱动电调
    arduino IO口
    通过电机编码器AB相输出确定电机转向
    Wifi小车资料
    winform 按键控制
    vs2010 EF4.0 访问mysql
  • 原文地址:https://www.cnblogs.com/spicy/p/7920867.html
Copyright © 2011-2022 走看看