思路
上一篇大概梳理了一下 GatewayWorker 的基础知识。这篇就来准备整合 GatewayWorker 到 Laravel。
GatewayWorker 是基于 Socket 监听的服务器框架,而 Laravel 是基于 HTTP 请求/响应模型的 Web 框架。所以一定要明白,两者的部署是独立分开、互不干扰的。
因此在物理上它们的整合方式就见仁见智了。而官方, walkor 大神(GatewayWorker 框架作者)在手册里也给出了与 MVC 框架的结合方式,如下图所示:

客戶端浏览器建立与 GatewayWorker 的 WebSocket 连接,所有的业务逻辑由客户端通过 HTTP 协议 GET/POST 到 Web 框架,由 Web 框架统一处理。仅当客户端浏览器需要主动推送数据时, Web 框架将调用 GatewayWorker 提供的 API(GatewayClient),由 GatewayWorker 通过 WebSocket 主动推送给客户端。
步骤
大致的思路有了,具体的实现步骤配合聊天室部分代码,如下所示。
首先要定制 GatewayWorker 的所有进程,并把它整合到 laravel 项目根目录下的 gatewayworker 目录,该目录的结构遵循了官方推荐:
gatewayworker/ ├── app │ └── chat │ ├── Events.php # BusinessWorker 进程的实际业务处理类 │ ├── start_businessworker.php # BusinessWorker 进程的启动文件 │ ├── start_gateway.php # Gateway 进程的启动文件 │ └── start_register.php # Register 服务进程的启动文件 ├── composer.json ├── composer.lock ├── start_for_win.bat # Windows 环境下 GatewayWorker 所有 Worker 进程的启动文件 ├── start.php # Linux 环境下 GatewayWorker 所有 Worker 进程的启动文件 └── vendor
整个 gatewayworker/ 目录的结构和源码几乎与官方的 workerman-chat 一模一样。仅有 Events.php 略有不同。
start.php 负责启动所有的 worker 进程:
// 加载所有applications/*/start.php,以便启动所有服务
foreach(glob(__DIR__.'/app/*/start*.php') as $start_file) {
require_once $start_file;
}
// 运行所有服务
Worker::runAll();
start_register.php 负责启动 Register 进程,监听本机 1238 端口,以便 Gateway 进程与 BuisnessWorker 进程建立通信。
$register = new Register('text://0.0.0.0:1238'); // 必须text协议
start_gateway.php 负责启动 Gateway 进程,监听客户端在 7272 端口、基于 WebSocket 协议的连接和连接上的数据。同时设置进程名、进程数、lanIp、起始端口和心跳检测,以及注册通信地址。
$gateway = new Gateway("Websocket://0.0.0.0:7272");
$gateway->name = 'ChatGateway';
$gateway->count = 4;
$gateway->lanIp = '127.0.0.1'; // 分布式部署时要填写真实IP(非127.0.0.1)
$gateway->startPort = 2300;
$gateway->pingInterval = 10; // 设置心跳,防止长时间不通讯被路由节点强行断开
$gateway->pingData = '{"type":"ping"}';
$gateway->registerAddress = '127.0.0.1:1238'; // 用于和BusinessWorker进程通信,与Gateway进程的注册地址保持一致
start_businessworker.php 负责启动 BusinessWorker 进程,设置进程名、进程数,以及注册通信地址。
$worker = new BusinessWorker(); $worker->name = 'ChatBusinessWorker'; $worker->count = 4; $worker->registerAddress = '127.0.0.1:1238'; // 用于和Gateway进程通信,与Gateway进程的注册地址保持一致
Events.php 中只负责两件事:
第一件、当客户端建立连接时,在 onConnect 回调中把 client_id 发送给客户端(让客户端通过 AJAX 请求 Web 框架去绑定 uid 与 client_id)。
第二件、当客户端连接关闭时,在 onClose 回调中通过该 client_id 获取该用户所在的 room_id(聊天室唯一 ID,由 Laravel 生成),并使用 Gateway::sendToGroup($group) 向该房间推送一个客户离开信号(客户端收到这个信号会发起 AJAX 请求向 Web 框架请求最新的用户列表)。
而 onMessage 回调留空即可。前面已经说过,我们所有的业务逻辑,都尽量在 Web 框架里实现。GatewayWorker 只提供 Socket 服务。
下面是 Events.php 中的部分代码。
public static function onConnect($client_id)
{
Gateway::sendToClient($client_id, json_encode(array(
'type' => 'init',
'client_id' => $client_id
)));
}
public static function onMessage($client_id, $message)
{
}
public static function onClose($client_id)
{
// 房间广播有连接关闭的信号
$room_id = $_SESSION['room_id'];
$uname = $_SESSION['uname'];
if (Gateway::getClientCountByGroup($room_id)) {
Gateway::sendToGroup($room_id, json_encode(array(
'type' => 'close',
'uname' => $uname
)));
}
}
然后把所有的 Worker 进程运行起来,开始建立内部通信并监听客户端连接。使用 php start.php start -d 将以守护进程的形式运行所有 Worker进程。

下面在聊天室页面建立与 GatewayWorker 的 WebSocket 连接
var ws = new WebSocket("ws://" + document.domain + ":7272");
注意,websocket 协议来自于 HTML5,某些浏览器可能不支持,可以用
然后在 ws.onmessage 回调中接收来自 GatewayWorker 发来的 client_id,并用 AJAX 发送 POST 到 Laravel 框架,进行 uid 与 client_id 的绑定。
前端 Websocket 接收 client_id,并发送 POST 去绑定 uid:
ws.onmessage = function(e) {
var data = JSON.parse(e.data),
type = data.type || '';
switch (type) {
...
case 'init':
$.post(global_url_bind, {
client_id: data.client_id,
_token: global_csrf_token
}, function(data) {}, 'json');
break;
...
}
}
后端 Laravel 需要调用 GatewayClient API 通知 GatewayWorker 绑定 client_id,首先在 Laravel 项目(聊天室)根目录下运行 Composer 命令来安装:
composer require workerman/gatewayclient
然后在需要调用 GatewayClient 接口的文件里,引用命名空间:
// GatewayClient 3.0.0版本以后加了命名空间 use GatewayClientGateway;
并设置 Gateway::$registerAddress 属性,告知 GatewayClient 与哪个 GatewayWorker (集群)通讯。方便起见,我把它放在了 Laravel 控制器的 __construct() 方法里:
public function __construct()
{
Gateway::$registerAddress = '127.0.0.1:1238';
}
这里再啰嗦一句,这个属性的设置值必须与前面启动的 Gateway 进程和 BusinessWorker 进程的 registerAddress 属性值一致,其中的 1238 端口是由 Register 服务进程监听的,用于 Gateway 进程和 BusinessWorker 进程内部通讯。
然后,后端 Laravel 绑定 client_id 的代码片段如下:
// 绑定uid和client_id、加入房间
Gateway::bindUid($client_id, $uid); // uid 与 room_id 已经从 Laravel session里获取
Gateway::joinGroup($client_id, $room_id);
// 记录会话
session(['client_id' => $client_id]); // Laravel 负责
Gateway::setSession($client_id, [ // GatewayWorker 负责
'uid' => $uid,
'uname' => $uname,
'avatar' => $avatar,
'bubble' => $bubble,
'room_id' => $room_id
]);
后续的所有房间消息都直接 get/post 到 Laravel 里统一处理。下面是聊天部分的部分源码:
前端 AJAX:
$.post(global_url_say, {
type: 'all', // 公聊
content: content,
_token: global_csrf_token
}, function(data) {}, 'json');
$.post(global_url_say, {
type: 'to', // 私聊
to_uid: $('.to-whom').children('span').attr('id'),
to_uname: $('.to-whom').children('span').text(),
content: content,
_token: global_csrf_token
}, function(data) {}, 'json');
后端 Laravel:
$type = $request->input('type') ?: '';
$content = htmlspecialchars($request->input('content'));
$uid = session('uid');
$uname = session('uname');
$avatar = session('avatar');
$bubble = session('bubble');
$room_id = session('room_id');
switch ($type) {
case 'all': // 公聊
Gateway::sendToGroup($room_id, json_encode([
'type' => 'all',
'uid' => $uid,
'uname' => $uname,
'avatar' => $avatar,
'bubble' => $bubble,
'content' => preg_replace('/^s*@me/i', '', $content)
]));
break;
case 'to': // 私聊
$to_uid = $request->input('to_uid');
Gateway::sendToUid($to_uid, json_encode([
'type' => 'to',
'uid' => $uid,
'uname' => $uname,
'avatar' => $avatar,
'bubble' => $bubble,
'content' => '<a href="javascript:;" style="color: inherit;">@me</a> '.$content
]));
$to_uname = $request->input('to_uname');
Gateway::sendToUid($uid, json_encode([
'type' => 'to',
'uid' => $uid,
'uname' => $uname,
'avatar' => $avatar,
'bubble' => $bubble,
'content' => '<a href="javascript:;" style="color: inherit;">@'.$to_uname.'</a> '.$content
]));
break;
...
}
当有用户退出房间时,前端 Websocket 会收到来自 Events.php 中 onClose 事件的连接关闭信号 "close"。这时需要通知房间内其他用户,并请求 Laravel 拿最新的用户列表。
ws.onmessage = function(e) {
var data = JSON.parse(e.data),
type = data.type || '';
switch (type) {
...
case 'close':
// 通知有人退出
system_notify('@' + data.uname + ' leaved out.');
// 请求新的用户列表
$.post(global_url_flush, {
room_id: global_room_id,
_token: global_csrf_token
}, function(data) { });
break;
...
}
}
后端:
$room_id = $request->input('room_id');
$sessions = Gateway::getClientSessionsByGroup($room_id);
$users_list = [];
foreach ($sessions as $client_id => $item) {
$users_list[$item['uid']] = $item['uname'];
}
$new_message = ['type' => 'flush'];
$new_message['users_list'] = $users_list;
Gateway::sendToGroup($room_id, json_encode($new_message));
然后前端收到 flush 消息,刷新用户列表:
ws.onmessage = function(e) {
var data = JSON.parse(e.data),
type = data.type || '';
switch (type) {
...
case 'flush':
flush_users_list(data.users_list);
break;
...
}
}
基本的结合逻辑就是这样了。如果对这个聊天室的源码有兴趣,可以在下面找到它的地址。谢谢阅读。
2018-01-26 13:12:19 更新:
聊天室项目已经支持 Windows。
相关链接
chat-here 聊天室源码:https://github.com/mingcw/chat-here(这两天时间有点紧,等 25 号晚上我会添加它的 windows 版本,目前只支持 Linux。)
GatewayClient API:https://github.com/walkor/GatewayClient