zoukankan      html  css  js  c++  java
  • 带优先级的队列

    很久以前写的一个功能,当时需要一个优先级的队列,特用新学的swoole写了一个简单的demo,仅满足当时的需求。

    功能说明

      完全参考httpsqs增加优先级参数level

            例:
                http://192.168.8.18:5555/?name=t ... a=testdata1&level=1
                http://192.168.8.18:5555/?name=t ... a=testdata2&level=1
                http://192.168.8.18:5555/?name=t ... a=testdata3&level=3
               此时,循环调用get方法,将获取数据testdata3,testdata1,testdata2
               注:get时先弹出优先级高的数据;同等优先级数据,按put的先后弹出;

    用到的工具:

          PHP PHP的swoole扩展

    思路:

      使用swoole扩展监听端口,接收用户传入的参数;用全局变量$info存储所有队列名、每个队列的状态信息(所有优先级、每个优先级中已读数据个数,未读数据个数)等;设置定时器,定时将info写入文件;实际数据存入文件,每个数据占一行,使用fgets按行读出;全局变量$fp保存所有打开的文件句柄,减少文件的打开次数;

    优势:
            使用世界上最好的语言PHP开发!

    性能测试:
    1:写性能:

    ab -c 100 -n 10000 http://192.168.8.18:5555/?name=test&opt=put&data=testdata&level=3
    

        

    2:读性能

    ab -c 100 -n 10000 http://192.168.8.18:5555/?name=test&opt=get
    

    附:基础代码

    <?php
    //带优化级的http队列服务
    error_reporting(E_ERROR | E_WARNING | E_PARSE);
    date_default_timezone_set('PRC');
    $dirname = "./data/queue_"; //文件存储位置及文件名
    $infodirname = "./data/info"; //存储基本信息的文件
    $maxfilesize = 1024 * 1024 * 1024;//文件最大1G
    $fp = array(); //打开文件的句柄
    $info = array(); //存储各个队列的基本信息,get/put的位置
    $infoflag = 0; //标记info是否被修改过
    $tmp = file_get_contents($infodirname);//初始化,从文件中载入基本信息
    !empty($tmp) && $info = unserialize($tmp);
      
    $http = new swoole_http_server("0.0.0.0", 5555);
    $http->set(array(
        'worker_num' => 1,   //工作进程数量 
    ));
    $http->on('WorkerStart', function($http) {
        $http->addtimer(10000);//增加定时器
    });
    $http->on('Timer', 'cronWriteInfo'); //定时将基本信息写入文件
      
    $http->on('request', "mycallback");
    $http->start();
      
    function cronWriteInfo() { //定时写文件
        global $info,$infoflag,$infodirname;
        if($infoflag) { //info被修改过时,才将之写入文件
            file_put_contents($infodirname, serialize($info));
        }
    }
    function mycallback($request, $response) {
        global $info, $infoflag;
        if($request->server['request_uri'] == "/favicon.ico") { //过滤掉浏览器的icon请求
            $response->end(" 
    ");
            return '';
        }
        $infoflag = 1;
        $param = $request->get;
        if(empty($param['name'])) {//队列名字不能为空
            $ret = "QUEUE_ERROR_QUEUENAME_EMPTY";
        } else {
            switch ($param['opt']) {
                case 'get':
                    $ret = get($param['name']);
                    break;
                case 'put':
                    if(($data = $param['data']) && !empty($param['data'])) {
                        empty($param['level']) && $param['level'] = 0;
                        $ret = put($param['name'], $param['data'], $param['level']);
                    } else {
                        $ret = "QUEUE_PUT_ERROR";
                    }
                    break;
                case 'status':
                    $ret = json_encode($info);
                    break;
                default:
                    $ret = "QUEUE_ERROR_PARAM";
                    break;
            }
        }
        $response->end($ret."
    ");
    }
    function get($queuename) {
        global $info;
        if($info[$queuename]['unread'] > 0) {
            $maxlevel = $info[$queuename]['curMaxlevel']; //当前最大优化级
            $pos = $info[$queuename]['leveldata'][$maxlevel]['getpos']; //文件偏移量
            $data =  getdataFromFile($queuename, $maxlevel, $pos); //读取数据
            $ret = pack("H*", $data); //data是压缩的数据,需要解压
            //修改全局变量
            $info[$queuename]['get']++;
            $info[$queuename]['unread']--;
            $info[$queuename]['leveldata'][$maxlevel]['unread']--;
            if($ret == "QUEUE_END") { //读到的数据不全,原因:数据文件大小超过最大值设置; 
                $data = getdataFromFile($queuename, $maxlevel, 0); //从头再读一次
                $ret = pack("H*", $data); //data是压缩的数据,需要解压
                $info[$queuename]['leveldata'][$maxlevel]['get'] = 1;//读取位置设为1
                $info[$queuename]['leveldata'][$maxlevel]['getpos'] = strlen($data) + 1;
            } else {
                $info[$queuename]['leveldata'][$maxlevel]['get']++;
                $info[$queuename]['leveldata'][$maxlevel]['getpos'] += strlen($data) + 1 ;
            }
            if($info[$queuename]['leveldata'][$maxlevel]['get'] == $info[$queuename]['leveldata'][$maxlevel]['put']) { //数据读取完毕,重新计算当前最高优先级的数据
                $info[$queuename]['curMaxlevel'] = getcurMaxlevel($info[$queuename]['leveldata']);
            }
            return $ret;
        } else {
            return 'QUEUE_EMPTY';
        }
    }
    function put($queuename, $userdata, $level) {
        global $info;
        if($info[$queuename]['leveldata'][$level]['put']+1 == $info[$queuename]['leveldata'][$level]['get']) { //队列写满
            return "QUEUE_FULL";
        }
        $info[$queuename]['put']++;
        $info[$queuename]['unread']++;
        $newpos = setDataToFile($queuename, $level, $userdata);
        $info[$queuename]['leveldata'][$level]['put']++;
        $info[$queuename]['leveldata'][$level]['unread']++;
        $info[$queuename]['leveldata'][$level]['putpos'] = $newpos;
        $info[$queuename]['curMaxlevel'] = getcurMaxlevel($info[$queuename]['leveldata']);
        return "QUEUE_PUT_OK";
    }
      
    //获取当前需要执行的优先级最高的数据
    function getcurMaxlevel($leveldata) {
        krsort($leveldata);
        foreach($leveldata as $level => $info) {
            if($info['unread'] > 0) {
                return $level;
            }
        }
        return 0;
    }
      
    //写数据到文件,如果文件写满,循环写入;
    function setDataToFile($queuename, $level, $userdata) {
        global $dirname, $maxfilesize, $fp;
        $file = $dirname.$queuename.$level;
        $packdata = unpack("H*", $userdata);
        //$$packdata['1'] = $userdata;
        $data = $packdata['1']."
    ";
        $size = filesize($file);
        if(!$fp[$queuename][$level]) {
            $fp = fopen($file, "ab+");
        }
        fseek($fp, $size);
        fwrite($fp, $data);
        clearstatcache();
        if($size + strlen($data) + 1 > $maxfilesize) { //超出文件最大值,进入下一轮
            fseek($fp, $size); //倒回指针
            fwrite($fp, "QUEUE_END"); //写入标记符号
            fseek($fp,0);//重头写
            fwrite($fp, $data);
            return strlen($data);
        } else {
            return $size + strlen($data);
        }
    }
    //从文件中读取数据
    function getdataFromFile($queuename, $level, $pos) {
        global $dirname,$fp;
        if(!isset($fp[$queuename][$level])) {
            $file = $dirname.$queuename.$level;
            $fp = fopen($file, "ab+");
        }
        fseek($fp, intval($pos));
        return trim(fgets($fp));
    }
  • 相关阅读:
    poj 2251 Dungeon Master
    poj 2488 A Knight's Journey
    poj 3349
    poj 2442
    poj 3274 Gold Balanced Lineup
    优先队列
    广州华盟信息科技有限公司
    山东山大华天软件有限公司
    RvmTranslator6.5 is released
    PipeCAD之管道标准库PipeStd(2)
  • 原文地址:https://www.cnblogs.com/dormscript/p/4787631.html
Copyright © 2011-2022 走看看