zoukankan      html  css  js  c++  java
  • 基于swoole+Redis的消息实时推送通知

    swoole+Redis将实时数据的推送

    一 实现功能

    设计师订单如果设计师未抢单,超时(5分钟)设计订单时时给设计师派送,
    设计师公众号中收到派单信息
    设计发布者收到派单成功信息

    环境

    centos6.10
    redis-4.0.2
    swoole-src-4.4.12
    php-7.1.5
    MYsyql5.7
    

    在centos6默认是gcc-4.7,安装swoole的时候需要升级到gcc-4.8

    二 实现流程

    1.开启swoole server端监听
    2.开启swoole client连接执行定时执行
    3.使用swoole task 异步执行推送逻辑
    

    开始监听

    服务端窗口

    # php71 pushServer.php
    

    client连接执行开始任务

    客户端窗口

    # php71 pushClient.php start
    

    默认start开启5个client tcp链接,每个链接开启一个1s定时器

    开启后服务端窗口的变化

    [root@111111 swoole_server]# php71 pushServer.php 
    Client-1: 连接成功
    reactor-7 Client-1 接受数据: data=start 
    Client-1: 连接结束
    Client-2: 连接成功
    reactor-0 Client-2 接受数据: data=start 
    Client-3: 连接成功
    Client-2: 连接结束
    reactor-7 Client-3 接受数据: data=start 
    Client-3: 连接结束
    Client-4: 连接成功
    reactor-0 Client-4 接受数据: data=start 
    Client-4: 连接结束
    Client-5: 连接成功
    reactor-7 Client-5 接受数据: data=start 
    Client-5: 连接结束
    2019-12-14 01:29:15reactor-7 client-1 timer-2: 定时器第1 次执行
    

    redis添加数据

    向order_designer_list队列中添加数据

    127.0.0.1:6379> LPUSH order_designer_list 1912136916313##150481##1576140373##oLvqQwo25p5myELvO5VXj0k-7ngk##李伟测试##13926
    (integer) 1
    127.0.0.1:6379> LRANGE order_designer_list 0 10
    1) "1912136916313##150481##1576140373##oLvqQwo25p5myELvO5VXj0k-7ngk##李伟测试##13926"
    

    值为:
    订单号##orderId##分配到期时间(uninx时间戳)##微信openid##设计师用户名##设计师userid

    服务端窗口的变化

    2019-12-14 01:29:15reactor-7 client-1 timer-2 taskid=0 :投递异步任务 data=1912136916313_150481_1576140373_oLvqQwo25p5myELvO5VXj0k-7ngk_李伟测试_13926
    2019-12-14 01:29:15执行任务 id=0  data= 1912136916313_150481_1576140373_oLvqQwo25p5myELvO5VXj0k-7ngk_李伟测试_13926 
    任务完成 id=0 结果=1912136916313_150481_1576140373_oLvqQwo25p5myELvO5VXj0k-7ngk_李伟测试_13926已经派单过
    

    task任务执行逻辑

    数据分析
    数据通过微信消息模板接口,将信息内容发送到客户微信公众号上
    将监控日志和错误日志写入mysql数据库

    三 项目代码结构

    ├── composer.json
    ├── composer.lock
    ├── swoole_server
    │   ├── 2019-12-13-swoole.log
    │   ├── 2019-12-14-swoole.log
    │   ├── DispatchOrder.php
    │   ├── pushClient.php
    │   ├── pushServer.php
    │   ├── Runtime.php
    └── vendor
    │   ├── 忽略php类库文件
    │   ├── ....
    

    conposer.json

    {
        "require": {        
                "predis/predis": "^1.1",
            "catfan/medoo": "1.7.*"
        }
    }
    

    pushServer.php

    <?php
    require __DIR__ . '/../vendor/autoload.php';
    require './DispatchOrder.php';
    
    class swoolePush
    {
    
        private $timerId;
        private $timeCount = 0;
        //微信消息模板id
        private $templateid = "C3HSuUJdS86p_4gj4xxth943DdE2zkE3IxnlrK5MFTI";
    
        public function receiveHandle($serv, $fd, $reactor_id, $data)
        {
            echo "reactor-{$reactor_id} Client-$fd 接受数据: data=$data " . PHP_EOL;
            if ('start' == trim($data)) {//开启定时器
                $redisServ = new SwooleCoroutineRedis;
                $redisServ->connect('127.0.0.1', 6379);
                //每隔5000ms触发一次
                $this->timerId = SwooleTimer::tick(5000, function ($timer_id) use ($redisServ, $serv, $reactor_id, $fd) {
                    $this->timeCount++;
                    echo date("Y-m-d H:i:s") . "reactor-{$reactor_id} client-{$fd} timer-{$timer_id}: 定时器第{$this->timeCount} 次执行" . PHP_EOL;
                    $item = $redisServ->lIndex("order_designer_list", -1);
                    if ($item) {
    
                        //订单号_orderId_分配到期时间(uninx时间戳)_openid_设计师用户名_设计师userid
                        $order = explode('##', $item);
                        if ($order[2] < time()) {
                            //投递异步任务
                            $task_id = $serv->task($item);
                            echo date("Y-m-d H:i:s") . "reactor-{$reactor_id} client-{$fd} timer-{$timer_id} taskid={$task_id} :投递异步任务 data=$item" . PHP_EOL;
                            $redisServ->rPop("order_designer_list");
                        }
                    }
                });
            } elseif ('stop' == trim($data)) {//清除定时器
                echo "reactor-{$reactor_id} client-{$fd} timer-{$this->timerId} 定时器结束" . PHP_EOL;;
                SwooleTimer::clear($this->timerId);
            }
    
        }
    
        /**
         * 任务处理
         * @param $serv
         * @param $fd
         */
        public function taskHandle($serv, $task_id, $from_id, $data)
        {
            echo date("Y-m-d H:i:s") . "执行任务 id=$task_id  data= $data " . PHP_EOL;
    
            //订单号_orderId_分配到期时间(uninx时间戳)_openid_设计师用户名_设计师userid
            $order = explode('##', $data);
            $returnMsg = "";
    
            //派单逻辑
            $dispatchOrderObj = new DispatchOrder();
            if ($dispatchOrderObj->isOrderNeedGive($order[1])) {
    
                //开始派单 orderGive($orderId, $designerUserId,$otherInfo=[])
                $result = $dispatchOrderObj->orderGive($order[1], $order[5], [
                    'design_username' => $order[4], 'order_no' => $order[0], 'design_openid' => $order[3],
                    'design_templdateid' => "C3HSuUJdS86p_4gj4xxth943DdE2zkE3IxnlrK5MFTI"]);
                if ($result['state'] != 1) {
                    //派单失败重新加入redis
                    $redisServ = new SwooleCoroutineRedis;
                    $redisServ->connect('127.0.0.1', 6379);
                    $redisServ->lPush("order_designer_list", $data);
                    $returnMsg = "{$order[0]}派单失败, errmsg:" . $result['message'];
                } else {
                    $returnMsg = "{$order[0]}派单成功, errmsg:" . $result['message'];
                }
    
            } else {
                $returnMsg = "{$order[0]}已经派单过";
            }
            //返回任务执行的结果
            $serv->finish($returnMsg);
        }
    
        /**
         * 任务完成通知
         * @param $serv
         * @param $fd
         */
        public function finishHandle($serv, $task_id, $data)
        {
            echo "任务完成 id=$task_id 结果=$data" . PHP_EOL;
        }
    
        /**
         * 连接开始
         * @param $serv
         * @param $fd
         */
        public function connectHandle($serv, $fd)
        {
            echo "Client-$fd: 连接成功" . PHP_EOL;
        }
    
        /**
         * 连接结束
         * @param $serv
         * @param $fd
         */
        public function closeHandle($serv, $fd)
        {
            echo "Client-$fd: 连接结束" . PHP_EOL;
        }
    }
    
    //创建Server对象,监听 127.0.0.1:9501端口
    $serv = new SwooleServer("127.0.0.1", 9501);
    //设置异步任务的工作进程数量
    $serv->set([
        'task_worker_num' => 4 * 5,
        'log_file' => date("Y-m-d") . '-swoole.log',
    ]);
    $object = new swoolePush();
    //监听连接进入事件
    $serv->on('Connect', [$object, 'connectHandle']);
    //监听数据接收事件
    $serv->on('Receive', [$object, 'receiveHandle']);
    //监听连接关闭事件
    $serv->on('Close', [$object, 'closeHandle']);
    //处理异步任务
    $serv->on('task', [$object, 'taskHandle']);
    //处理异步任务的结果
    $serv->on('finish', [$object, 'finishHandle']);
    //启动服务器
    $serv->start();
    

    pushClient.php

    <?php
    
    class pushClient
    {
        private $client;
    
        public function __construct()
        {
            $this->client = new SwooleClient(SWOOLE_SOCK_TCP);
            if (!$this->client->connect('127.0.0.1', 9501, -1)) {
                exit("connect failed. Error: {$this->client->errCode}" . PHP_EOL);
            }
        }
    
        /**
         * client发送开启定时器的指令
         */
        public function startTimer()
        {
    
            $this->client->send("start");
        }
    
        /**
         * client发送结束定时器的指令
         */
        public function stopTimer()
        {
    
            $this->client->send("stop");
        }
    
        /**
         * 自动销毁client
         */
        public function __destruct()
        {
            $this->client->close();
        }
    }
    
    //client连接开始
    if (empty($argv[1])) {
        exit("缺少参数, 参数:(string 'start|getinfo', int [num]) ");
    }
    
    $num = empty($argv[2]) ? 5 : intval($argv[2]);
    //不同操作返回
    //开启多个client连接,并发送start命令
    if (trim($argv[1]) == 'start') {
        for ($i = 0; $i < $num; $i++) {
            $pushClient = new pushClient();
            $pushClient->startTimer();
            sleep(1);
        }
    } elseif (trim($argv[1]) == 'getinfo') {
    
        //获取当前swoole的信息
        $server = new SwooleServer("127.0.0.1", 9501);
        $arr = [
            'client_connection_num' => count($server->connections)
    
        ];
        var_dump($arr);
    }
    

    DispatchOrder.php

    <?php
    require __DIR__ . '/../vendor/autoload.php';
    require './Runtime.php';
    
    use MedooMedoo;
    
    class DispatchOrder
    {
        private $configWchat = [
            'app_id' => 'wx888888888888',
            'secret' => '6d6e4f19888888888888888888',
        ];
        private $configMysql = [
            'database_type' => 'mysql',
            'server' => '10.0.0.0',
            'database_name' => 'db_test',
            'username' => 'root',
            'password' => '123456'
        ];
        private $tokenWchat;
        private $severMysql = null;
        private $severRedis = null;
        private $severRuntime = null;
        private $result = ['state' => 0, 'message' => ''];
    
    
        public function __construct()
        {
            //mysql
            if ($this->severMysql == null) {
    
                $this->severMysql = new Medoo($this->configMysql);
            }
            //日志
            if ($this->severRuntime == null) {
    
                $this->severRuntime = new Runtime(1, 1, 1, $this->severMysql);
            }
        }
    
        /**
         * 是否需要分配订单
         * @param $orderId
         * @return int 1需要分配 0不需要
         */
        public function isOrderNeedGive($orderId)
        {
            $data = $this->severMysql->select('tg_order_task', [
                'status',
            ], [
                'order_id' => $orderId
            ]);
            //1需要分配 0不需要
            return (empty($data) || $data[0]['status'] == 1) ? 0 : 1;
        }
    
        /**
         * @return Medoo
         */
        public function orderGive($orderId, $designerUserId, $otherInfo = [])
        {
            //微信access_token
            if ($this->severRedis == null) {
                $redisServ = new PredisClient([
                    'scheme' => 'tcp',
                    'host' => '127.0.0.1',
                    'port' => 6379]);
                $this->severRedis = $redisServ;
            }
            $accessToken = $this->severRedis->get('swoole_order_dispatch_token');
            $accessTokenArr = explode("##", $accessToken);
            if (empty($accessToken) || $accessTokenArr[1] < time()) {
                //设置token
                $urlToken = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=" . $this->configWchat['app_id'] . "&secret=" . $this->configWchat['secret'];
                $res = $this->httpRequest($urlToken, "get");
                $arr = json_decode($res, true);
                if (empty($arr['access_token'])) {
                    $this->severRuntime->log("请求微信token接口失败,返回:" . $res);
                }
                $expiresTime = (string)(time() + 7200);
                $value = (string)($arr['access_token'] . "##" . $expiresTime);
                $this->severRedis->set('swoole_order_dispatch_token', $value);
                $this->tokenWchat = $arr['access_token'];
            } else {
    
                $this->tokenWchat = $accessTokenArr[0];
            }
    //        var_dump($this->tokenWchat);
            //1.更新订单状态
            $updateData = [
                'design_userid' => $designerUserId,
                'design_username' => $otherInfo['design_username'],
                'design_time' => date("Y-m-d H:i:s"),
                'status' => 1,//0 待抢单 1待设计 2已设计 3已完成4已取消5驳回
            ];
    
            $res = $this->severMysql->update('tg_order_task', [
                'status' => 1,
            ], [
                'order_id' => $orderId
            ]);
    //        var_dump($res);
            if (0 == $res->rowCount()) {
                $this->result['message'] = "更新任务订单失败,订单号:" . $otherInfo['order_no'];
                $this->severRuntime->log($this->result['message']);
                return $this->result;
            }
    
            //2.微信模板信息推送
            $data = [
                'touser' => $otherInfo['design_openid'],
                'template_id' => $otherInfo['design_templdateid'],
                'url' => '',
                'data' => [
                    'keyword1' => ['value' => $otherInfo['order_no']],
                    'keyword2' => ['value' => date("Y-m-d H:i:s")]
                ],
            ];
            $url = 'https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=' . $this->tokenWchat;
            $res = $this->httpRequest($url, "post", json_encode($data));
    //        var_dump($res);
            $resObj = json_decode($res);
            if (0 != $resObj->errcode) {
                $this->result['message'] = "{$otherInfo['order_no']}微信推送失败,$res";
                $this->severRuntime->log($this->result['message']);
                return $this->result;
            }
            $this->result['state'] = 1;
            $this->result['message'] = "执行成功,订单号:" . $otherInfo['order_no'];
            return $this->result;
    
        }
    
        /**
         * CURL请求
         *
         * @param string  请求url地址
         * @param method 请求方法 get post
         * @param null $postfields post数据数组
         * @param array $headers 请求header信息
         * @param bool|false $debug 调试开启 默认false
         * @return mixed
         */
        private function httpRequest($url, $method, $postfields = null, $headers = [], $debug = false)
        {
            $method = strtoupper($method);
            $ci = curl_init();
            /* Curl settings */
            curl_setopt($ci, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
            curl_setopt($ci, CURLOPT_USERAGENT, "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0");
            curl_setopt($ci, CURLOPT_CONNECTTIMEOUT, 60); /* 在发起连接前等待的时间,如果设置为0,则无限等待 */
            curl_setopt($ci, CURLOPT_TIMEOUT, 7); /* 设置cURL允许执行的最长秒数 */
            curl_setopt($ci, CURLOPT_RETURNTRANSFER, true);
            switch ($method) {
                case "POST":
                    curl_setopt($ci, CURLOPT_POST, true);
                    if (!empty($postfields)) {
                        $tmpdatastr = is_array($postfields) ? http_build_query($postfields) : $postfields;
                        curl_setopt($ci, CURLOPT_POSTFIELDS, $tmpdatastr);
                    }
                    break;
                default:
                    curl_setopt($ci, CURLOPT_CUSTOMREQUEST, $method); /* //设置请求方式 */
                    break;
            }
            $ssl = preg_match('/^https:///i', $url) ? TRUE : FALSE;
            curl_setopt($ci, CURLOPT_URL, $url);
            if ($ssl) {
                curl_setopt($ci, CURLOPT_SSL_VERIFYPEER, FALSE); // https请求 不验证证书和hosts
                curl_setopt($ci, CURLOPT_SSL_VERIFYHOST, FALSE); // 不从证书中检查SSL加密算法是否存在
            }
            curl_setopt($ci, CURLOPT_MAXREDIRS, 2); /* 指定最多的HTTP重定向的数量,这个选项是和CURLOPT_FOLLOWLOCATION一起使用的 */
            curl_setopt($ci, CURLOPT_HTTPHEADER, $headers);
            curl_setopt($ci, CURLINFO_HEADER_OUT, true);
            $response = curl_exec($ci);
            $requestinfo = curl_getinfo($ci);
            $http_code = curl_getinfo($ci, CURLINFO_HTTP_CODE);
            if ($debug) {
                echo "=====post data======
    ";
                var_dump($postfields);
                echo "=====info===== 
    ";
                print_r($requestinfo);
                echo "=====response=====
    ";
                print_r($response);
            }
            curl_close($ci);
            return $response;
        }
    
        /**
         * 关闭资源链接
         */
        public function __destruct()
        {
        }
    }
    

    Runtime.php

    <?php
    require __DIR__ . '/../vendor/autoload.php';
    
    use MedooMedoo;
    
    class Runtime
    {
        //isMysql是否写入数据  isFile是否写入文件  isConsole是否console输入
        private $isMysql;
        private $isFile;
        private $isConsole;
        private $startTime = 0;
        private $stopTime = 0;
        private $severMysql = null;
    
        public function __construct($isMysql, $isFile, $isConsole, $severMysql)
        {
            $this->isMysql = $isMysql;
            $this->isFile = $isFile;
            $this->isConsole = $isConsole;
            //mysql
            if ($this->severMysql == null) {
                $this->severMysql = $severMysql;
            }
        }
    
        /**
         * 日志记录
         * @param $content
         * @param $runtime
         */
        public function log($content, $runtime = 0, $mode = [])
        {
            //数据库
            if ($this->isMysql) {
                $data = [
                    'content' => $content,
                    'runtime' => $runtime,
                    'w_time' => date("Y-m-d H:i:s"),
                ];
                $this->severMysql->insert('tg_log_order_dispatch', $data);
            }
            //写入文件
            if ($this->isFile) {
                file_put_contents(date("Y-m-d").'-runtime.log', $content."
    ", FILE_APPEND);
            }
            //命令窗口
            if ($this->isConsole) {
                echo date("Y-m-d H:i:s") . " " . $content . PHP_EOL;
            }
    
        }
    
        //开始运行时间
        public function start()
        {
            $this->startTime = $this->getMicrotime();
        }
    
        //结束时间
        public function stop()
        {
            $this->stopTime = $this->getMicrotime();
        }
    
        //开始和结束之间总时长
        public function spent()
        {
            return ($this->stopTime - $this->startTime);
        }
    
        private function getMicrotime()
        {
            list($usec, $sec) = explode(' ', microtime());
            return ((float)$usec + (float)$sec);
        }
    }
    

    欢迎留言交流

  • 相关阅读:
    移动端轮播图实现
    iterator
    Promise对象和运算符
    xshell报错:The remote SSH server rejected X11 forwarding request. Last login: Fri Dec 2
    linux平台运行jmeter
    linux基础命令
    APP登录之后会将PC的登录信息踢掉
    测试中遇到支付的一个小问题
    谷歌浏览器css不支持12px以下的
    spring setter注入
  • 原文地址:https://www.cnblogs.com/sentangle/p/12038221.html
Copyright © 2011-2022 走看看