思路
上一篇大概梳理了一下 GatewayWorker 的基础知识。这篇就来准备整合 GatewayWorker 到 Laravel。
GatewayWorker 是基于 Socket 监听的服务器框架,而 Laravel 是基于 HTTP 请求/响应模型的 Web 框架。所以一定要明白,两者的部署是独立分开、互不干扰的。
因此在物理上它们的整合方式就见仁见智了。而官方, walkor 大神(GatewayWorker 框架作者)在手册里也给出了与 MVC 框架的结合方式,如下图所示:
客戶端浏览器建立与 GatewayWorker 的 WebSocket 连接,所有的业务逻辑由客户端通过 HTTP 协议 GET/POST 到 Web 框架,由 Web 框架统一处理。仅当客户端浏览器需要主动推送数据时, Web 框架将调用 GatewayWorker 提供的 API(GatewayClient),由 GatewayWorker 通过 WebSocket 主动推送给客户端。
步骤
大致的思路有了,具体的实现步骤配合聊天室部分代码,如下所示。
首先要定制 GatewayWorker 的所有进程,并把它整合到 laravel 项目根目录下的 gatewayworker 目录,该目录的结构遵循了官方推荐:
gatewayworker/ ├── app │ └── chat │ ├── Events.php # BusinessWorker 进程的实际业务处理类 │ ├── start_businessworker.php # BusinessWorker 进程的启动文件 │ ├── start_gateway.php # Gateway 进程的启动文件 │ └── start_register.php # Register 服务进程的启动文件 ├── composer.json ├── composer.lock ├── start_for_win.bat # Windows 环境下 GatewayWorker 所有 Worker 进程的启动文件 ├── start.php # Linux 环境下 GatewayWorker 所有 Worker 进程的启动文件 └── vendor
整个 gatewayworker/ 目录的结构和源码几乎与官方的 workerman-chat 一模一样。仅有 Events.php 略有不同。
start.php 负责启动所有的 worker 进程:
// 加载所有applications/*/start.php,以便启动所有服务 foreach(glob(__DIR__.'/app/*/start*.php') as $start_file) { require_once $start_file; } // 运行所有服务 Worker::runAll();
start_register.php 负责启动 Register 进程,监听本机 1238 端口,以便 Gateway 进程与 BuisnessWorker 进程建立通信。
$register = new Register('text://0.0.0.0:1238'); // 必须text协议
start_gateway.php 负责启动 Gateway 进程,监听客户端在 7272 端口、基于 WebSocket 协议的连接和连接上的数据。同时设置进程名、进程数、lanIp、起始端口和心跳检测,以及注册通信地址。
$gateway = new Gateway("Websocket://0.0.0.0:7272"); $gateway->name = 'ChatGateway'; $gateway->count = 4; $gateway->lanIp = '127.0.0.1'; // 分布式部署时要填写真实IP(非127.0.0.1) $gateway->startPort = 2300; $gateway->pingInterval = 10; // 设置心跳,防止长时间不通讯被路由节点强行断开 $gateway->pingData = '{"type":"ping"}'; $gateway->registerAddress = '127.0.0.1:1238'; // 用于和BusinessWorker进程通信,与Gateway进程的注册地址保持一致
start_businessworker.php 负责启动 BusinessWorker 进程,设置进程名、进程数,以及注册通信地址。
$worker = new BusinessWorker(); $worker->name = 'ChatBusinessWorker'; $worker->count = 4; $worker->registerAddress = '127.0.0.1:1238'; // 用于和Gateway进程通信,与Gateway进程的注册地址保持一致
Events.php 中只负责两件事:
第一件、当客户端建立连接时,在 onConnect 回调中把 client_id 发送给客户端(让客户端通过 AJAX 请求 Web 框架去绑定 uid 与 client_id)。
第二件、当客户端连接关闭时,在 onClose 回调中通过该 client_id 获取该用户所在的 room_id(聊天室唯一 ID,由 Laravel 生成),并使用 Gateway::sendToGroup($group) 向该房间推送一个客户离开信号(客户端收到这个信号会发起 AJAX 请求向 Web 框架请求最新的用户列表)。
而 onMessage 回调留空即可。前面已经说过,我们所有的业务逻辑,都尽量在 Web 框架里实现。GatewayWorker 只提供 Socket 服务。
下面是 Events.php 中的部分代码。
public static function onConnect($client_id) { Gateway::sendToClient($client_id, json_encode(array( 'type' => 'init', 'client_id' => $client_id ))); } public static function onMessage($client_id, $message) { } public static function onClose($client_id) { // 房间广播有连接关闭的信号 $room_id = $_SESSION['room_id']; $uname = $_SESSION['uname']; if (Gateway::getClientCountByGroup($room_id)) { Gateway::sendToGroup($room_id, json_encode(array( 'type' => 'close', 'uname' => $uname ))); } }
然后把所有的 Worker 进程运行起来,开始建立内部通信并监听客户端连接。使用 php start.php start -d 将以守护进程的形式运行所有 Worker进程。
下面在聊天室页面建立与 GatewayWorker 的 WebSocket 连接
var ws = new WebSocket("ws://" + document.domain + ":7272");
注意,websocket 协议来自于 HTML5,某些浏览器可能不支持,可以用
然后在 ws.onmessage 回调中接收来自 GatewayWorker 发来的 client_id,并用 AJAX 发送 POST 到 Laravel 框架,进行 uid 与 client_id 的绑定。
前端 Websocket 接收 client_id,并发送 POST 去绑定 uid:
ws.onmessage = function(e) { var data = JSON.parse(e.data), type = data.type || ''; switch (type) { ... case 'init': $.post(global_url_bind, { client_id: data.client_id, _token: global_csrf_token }, function(data) {}, 'json'); break; ... } }
后端 Laravel 需要调用 GatewayClient API 通知 GatewayWorker 绑定 client_id,首先在 Laravel 项目(聊天室)根目录下运行 Composer 命令来安装:
composer require workerman/gatewayclient
然后在需要调用 GatewayClient 接口的文件里,引用命名空间:
// GatewayClient 3.0.0版本以后加了命名空间 use GatewayClientGateway;
并设置 Gateway::$registerAddress 属性,告知 GatewayClient 与哪个 GatewayWorker (集群)通讯。方便起见,我把它放在了 Laravel 控制器的 __construct() 方法里:
public function __construct() { Gateway::$registerAddress = '127.0.0.1:1238'; }
这里再啰嗦一句,这个属性的设置值必须与前面启动的 Gateway 进程和 BusinessWorker 进程的 registerAddress 属性值一致,其中的 1238 端口是由 Register 服务进程监听的,用于 Gateway 进程和 BusinessWorker 进程内部通讯。
然后,后端 Laravel 绑定 client_id 的代码片段如下:
// 绑定uid和client_id、加入房间 Gateway::bindUid($client_id, $uid); // uid 与 room_id 已经从 Laravel session里获取 Gateway::joinGroup($client_id, $room_id); // 记录会话 session(['client_id' => $client_id]); // Laravel 负责 Gateway::setSession($client_id, [ // GatewayWorker 负责 'uid' => $uid, 'uname' => $uname, 'avatar' => $avatar, 'bubble' => $bubble, 'room_id' => $room_id ]);
后续的所有房间消息都直接 get/post 到 Laravel 里统一处理。下面是聊天部分的部分源码:
前端 AJAX:
$.post(global_url_say, { type: 'all', // 公聊 content: content, _token: global_csrf_token }, function(data) {}, 'json'); $.post(global_url_say, { type: 'to', // 私聊 to_uid: $('.to-whom').children('span').attr('id'), to_uname: $('.to-whom').children('span').text(), content: content, _token: global_csrf_token }, function(data) {}, 'json');
后端 Laravel:
$type = $request->input('type') ?: ''; $content = htmlspecialchars($request->input('content')); $uid = session('uid'); $uname = session('uname'); $avatar = session('avatar'); $bubble = session('bubble'); $room_id = session('room_id'); switch ($type) { case 'all': // 公聊 Gateway::sendToGroup($room_id, json_encode([ 'type' => 'all', 'uid' => $uid, 'uname' => $uname, 'avatar' => $avatar, 'bubble' => $bubble, 'content' => preg_replace('/^s*@me/i', '', $content) ])); break; case 'to': // 私聊 $to_uid = $request->input('to_uid'); Gateway::sendToUid($to_uid, json_encode([ 'type' => 'to', 'uid' => $uid, 'uname' => $uname, 'avatar' => $avatar, 'bubble' => $bubble, 'content' => '<a href="javascript:;" style="color: inherit;">@me</a> '.$content ])); $to_uname = $request->input('to_uname'); Gateway::sendToUid($uid, json_encode([ 'type' => 'to', 'uid' => $uid, 'uname' => $uname, 'avatar' => $avatar, 'bubble' => $bubble, 'content' => '<a href="javascript:;" style="color: inherit;">@'.$to_uname.'</a> '.$content ])); break; ... }
当有用户退出房间时,前端 Websocket 会收到来自 Events.php 中 onClose 事件的连接关闭信号 "close"。这时需要通知房间内其他用户,并请求 Laravel 拿最新的用户列表。
ws.onmessage = function(e) { var data = JSON.parse(e.data), type = data.type || ''; switch (type) { ... case 'close': // 通知有人退出 system_notify('@' + data.uname + ' leaved out.'); // 请求新的用户列表 $.post(global_url_flush, { room_id: global_room_id, _token: global_csrf_token }, function(data) { }); break; ... } }
后端:
$room_id = $request->input('room_id'); $sessions = Gateway::getClientSessionsByGroup($room_id); $users_list = []; foreach ($sessions as $client_id => $item) { $users_list[$item['uid']] = $item['uname']; } $new_message = ['type' => 'flush']; $new_message['users_list'] = $users_list; Gateway::sendToGroup($room_id, json_encode($new_message));
然后前端收到 flush 消息,刷新用户列表:
ws.onmessage = function(e) { var data = JSON.parse(e.data), type = data.type || ''; switch (type) { ... case 'flush': flush_users_list(data.users_list); break; ... } }
基本的结合逻辑就是这样了。如果对这个聊天室的源码有兴趣,可以在下面找到它的地址。谢谢阅读。
2018-01-26 13:12:19 更新:
聊天室项目已经支持 Windows。
相关链接
chat-here 聊天室源码:https://github.com/mingcw/chat-here(这两天时间有点紧,等 25 号晚上我会添加它的 windows 版本,目前只支持 Linux。)
GatewayClient API:https://github.com/walkor/GatewayClient