zoukankan      html  css  js  c++  java
  • laravel整合workerman做消息推送系统

    官方建议分离 workerman和mvc框架的结合,我去,这不是有点脑缺氧吗?

    大量的业务逻辑,去独立增加方法和类库在写一次,实际业务中是不现实和不实际的

    gateway增加一些这方面的工作,但是我看了源码之后,就发现还是只能自己做

     先增加composer require workerman/workerman  或者walkor/workerman ,但是官方的github是 walkor/workerman,注意一下

    可以去 https://packagist.org查看是否有包

    首先结合Console做命令

    建立一个Command

    <?php
    
    namespace AppConsoleCommands;
    
    use IlluminateConsoleCommand;
    use WorkermanWorker;
    use AppWorkWorkermanWork;
    
    class Workerman extends Command {
    
        protected $taskserver;
        /*
         * 操作参数
         * 注意只能在
         * start 启动
         * stop 停止
         * relaod  只能重启逻辑代码,核心workerman_init无法重启,注意看官方文档
         * status 查看状态
         * connections 查看连接状态(需要Workerman版本>=3.5.0)
         * 
         */
        protected $action = array('start', 'stop', 'reload', 'status', 'connections');
    
        /**
         * The name and signature of the console command.
         *
         * @var string
         */
        protected $signature = 'Workerman {action}';
    
        /**
         * The console command description.
         *
         * @var string
         */
        protected $description = 'Workerman';
    
        /**
         * Create a new command instance.
         *
         * @return void
         */
        public function __construct() {
            parent::__construct();
        }
    
        /**
         * Execute the console command.
         *
         * @return mixed
         * 
         * 注意
         * 
         */
        public function handle() {
            $action = $this->argument('action');
    
            if (!in_array($action, $this->action)) {
                $this->error('Error Action');
                exit;
            }
            //初始化workerman
            WorkermanWork::workerman_init($action);
        }
    
    }

    注册到Kernel

    class Kernel extends ConsoleKernel {
    
        /**
         * The Artisan commands provided by your application.
         *
         * @var array
         */
        protected $commands = [
    
            AppConsoleCommandsWorkerman::class,
        ];

    WorkermanWork的内容

    <?php
    
    namespace AppWork;
    
    use AppWorkBaseWork as Base;
    use IlluminateSupportFacadesDB;
    use AppWorkCommonWork;
    use WorkermanWorker;
    use WorkermanLibTimer;
    use AppModelsOperationLog;
    use AppModelsUsers;
    
    class WorkermanWork extends Base {
    
        static $connection_count = 0;
    
        public static function workerman_init($action = null) {
            global $argv;
    
            $argv[0] = 'workerman:websocket';
            $argv[1] = $action;
            $argv[2] = '-d';
    //        心跳
            define('HEARTBEAT_TIME', 30);
            //初始化
            $worker = new Worker("websocket://172.17.1.247:9099");
            $worker->name = 'MessagePushWorker';
            //linux 用户线上是www
    //        $worker->user = 'www';
            //守护模式信息输出文件地址
    //        $worker->stdoutFile = "./workerman.log";
            //工作进程总数 测试环境4个
            $worker->count = 4;
            //正式环境
    //        $ws->count = 10;
            //建立链接 处理逻辑
            $worker->onConnect = function($connection) {
    //             有新的客户端连接时,连接数+1
                self::$connection_count++;
                self::onConnect($connection);
            };
            //接受消息 处理逻辑
            $worker->onMessage = function($connection, $data) {
    
                self::onMessage($connection, $data);
            };
            //关闭链接 处理逻辑
            $worker->onClose = function($connection) {
    //            客户端关闭时,连接数-1
                self::$connection_count--;
                self::onClose($connection);
            };
    
            // 进程启动后设置一个30秒运行一次的定时器
    //        $worker->onWorkerStart = function($worker) {
    //            Timer::add(30, function()use($worker) {
    //                
    //            });
    //        };
            // 开始
            Worker::runAll();
        }
    
        //建立链接 处理逻辑
        public static function onConnect($connection) {
            //测试5秒一次 线上30秒一次
            Timer::add(10, function() use($connection) {
    
                if (!empty($_SESSION['user_id'])) {
                    $Users = Users::where('id', $_SESSION['user_id'])->first();
                    if (!empty($Users)) {
                        $message_count = OperationLog::where('user_id', $_SESSION['user_id'])->where('is_read', 1)->count();
                        $message_list = OperationLog::where('user_id', $_SESSION['user_id'])->where('is_read', 1)->orderBy('id', 'desc')->get(['content', 'id', 'create_time'])->toArray();
    
                        if (!empty($message_list)) {
                            $connection->send(json_encode(['code' => 200, 'msg' => '请求成功', 'data' => $message_list, 'connections' => self::$connection_count, 'message_count' => $message_count]));
                        }
                    }
    //                else {
    //                    $connection->send(json_encode(['code' => 201, 'msg' => '无效user_id', 'connections' => self::$connection_count]));
    //                }
                }
    //            else {
    //                $retrun_data = json_encode(['code' => 201, 'msg' => 'session user_id不存在', 'connections' => self::$connection_count]);
    //                $connection->send($retrun_data);
    //            }
            });
        }
    
        //接受消息 处理逻辑
        public static function onMessage($connection, $data) {
            //解析数据,非合法的json数据不处理
            if (!empty($data)) {
                if (is_json($data)) {
                    $data = json_decode($data, true);
                    switch ($data['type']) {
                        // 客户端回应服务端的心跳
                        case 'ping':
                            $connection->send(json_encode(['code' => 200, 'msg' => '服务存活', 'data' => [], 'connections' => self::$connection_count]));
                        case 'login':
                            $Users = Users::where('id', $data['user_id'])->first();
                            if (empty($Users)) {
                                $connection->send(json_encode(['code' => 201, 'msg' => '用户ID无效或者错误', 'data' => [], 'connections' => self::$connection_count]));
                            } else {
                                $_SESSION['user_id'] = $data['user_id'];
                                $connection->send(json_encode(['code' => 200, 'msg' => '登录成功', 'data' => [], 'connections' => self::$connection_count]));
                            }
                    }
                }
            } else {
                $connection->send(json_encode(['code' => 201, 'msg' => '数据请求为空', 'data' => [], 'connections' => self::$connection_count]));
            }
    
    
    //        $data = $data . '----总共有连接数:' . self::$connection_count;
    //        $connection->send("return_data: $data ");
        }
    
        //关闭链接 处理逻辑
        public static function onClose($connection) {
            
        }
    
    }

    建立socket的时间设置一个10秒的定时器,这个基于session的控制,登录之后立即请求发送

    {"type":"login","user_id":"24"}

    因为如果登录,就检索OperationLog表里属于这个用户ID的消息,每10秒推送一次数据

    运行 命令 

    php artisan Workerman start 启动

    $argv[2] = '-d';

    注释掉就是测试模式,加上就是守护模式,就是线上使用的

    2019年7月12日09:43:56

     注意:上面是临时测试代码。业务代码使用try catch处理异常和错误

  • 相关阅读:
    cygwin配合NDK开发Android程序
    和菜鸟一起学c之函数指针
    和菜鸟一起学android4.0.3源码之SD卡U盘等自动挂载配置
    Android系统的开机画面显示过程分析
    android编译系统的makefile文件Android.mk写法
    Linux下makefile教程
    和菜鸟一起学linux之本地git中心仓库建立
    强人总结的Windows XP实用技巧45条(一)
    Webshell下自动挂马的ASP
    多进程Telnet的木马例子
  • 原文地址:https://www.cnblogs.com/zx-admin/p/9419679.html
Copyright © 2011-2022 走看看