开始今天的文章,这篇文章实现了 Swoole MySQL 连接池,代码是在《Swoole RPC 的实现》文章的基础上进行开发的。
先回顾上篇文章的内容:
实现了 HTTP / TCP 请求
实现了 同步 / 异步 请求
分享了 OnRequest.php、OnReceive.php 源码
业务逻辑 Order.php 中返回的是假数据
本篇文章主要的功能点:
业务逻辑 Order.php 中返回 MySQL 数据库中的数据。
Task 启用了协程
支持 主/从 数据库配置
实现数据库连接池
实现数据库 CRUD(增删改查)
代码
Order.php
<?php

if (!defined('SERVER_PATH')) exit("No Access");

/**
 * Previous version of the order service: returns hard-coded demo rows.
 */
class Order
{
    /**
     * Return a fixed list of three demo orders.
     *
     * @param int $uid  Unused in this version.
     * @param int $type Unused in this version.
     * @return array List of rows, each with 'order_code' and 'order_name'.
     */
    public function get_list($uid = 0, $type = 0)
    {
        // TODO: business logic
        $rs[0]['order_code'] = '1';
        $rs[0]['order_name'] = '订单1';
        $rs[1]['order_code'] = '2';
        $rs[1]['order_name'] = '订单2';
        $rs[2]['order_code'] = '3';
        $rs[2]['order_name'] = '订单3';
        return $rs;
    }
}

修改成:

/**
 * Order service backed by MySQL.
 *
 * Each instance takes one connection from MysqlPool in its constructor and
 * runs every statement against the `order` table through that connection.
 */
class Order
{
    private $mysql;
    private $table;

    /**
     * Borrow a connection from the shared pool and set the target table.
     */
    public function __construct()
    {
        $pool = MysqlPool::getInstance();
        $this->mysql = $pool->get();
        $this->table = 'order';
    }

    /**
     * Insert a new order row.
     *
     * @param string $code Order code.
     * @param string $name Order name.
     * @return mixed Whatever the connection's insert() returns.
     */
    public function add($code = '', $name = '')
    {
        // TODO: validate input
        return $this->mysql->insert($this->table, ['code' => $code, 'name' => $name]);
    }

    /**
     * Change the name of the order with the given id.
     *
     * @param int    $id   Row id.
     * @param string $name New order name.
     * @return mixed Whatever the connection's update() returns.
     */
    public function edit($id = 0, $name = '')
    {
        // TODO: validate input
        return $this->mysql->update($this->table, ['name' => $name], ['id' => $id]);
    }

    /**
     * Delete the order with the given id.
     *
     * @param int $id Row id.
     * @return mixed Whatever the connection's delete() returns.
     */
    public function del($id = 0)
    {
        // TODO: validate input
        return $this->mysql->delete($this->table, ['id' => $id]);
    }

    /**
     * Look up orders by order code.
     *
     * @param string $code Order code.
     * @return mixed Whatever the connection's select() returns.
     */
    public function info($code = '')
    {
        // TODO: validate input
        return $this->mysql->select($this->table, ['code' => $code]);
    }
}
Task 启用协程
一、需要新增两项配置:
enable_coroutine = true
task_enable_coroutine = true
二、回调参数发生改变
// Old callback signature (task coroutines disabled):
$serv->on('Task', function ($serv, $task_id, $src_worker_id, $data) {
    ...
});

修改成:

// New callback signature: the task details arrive on a single $task object.
$serv->on('Task', function ($serv, $task) {
    $task->worker_id; // which Worker process the task came from
    $task->id;        // task number
    $task->data;      // task payload
});
数据库 主/从 配置
Mysql.php
<?php

if (!defined('SERVER_PATH')) exit("No Access");

// Defaults shared by the master and slave nodes.
$db['default']['pool_size']        = 3;     // number of connections kept in the pool
$db['default']['pool_get_timeout'] = 0.5;   // timeout for taking a connection from the pool
$db['default']['timeout']          = 0.5;   // timeout for establishing a database connection
$db['default']['charset']          = 'utf8'; // character set
$db['default']['strict_type']      = false; // strict type mode (false = disabled)
$db['default']['fetch_mode']       = true;  // fetch mode (true = enabled)

// Master node: MysqlDB sends every non-SELECT statement here.
$config['master']             = $db['default'];
$config['master']['host']     = '127.0.0.1';
$config['master']['port']     = 3306;
$config['master']['user']     = 'root';
$config['master']['password'] = '123456';
$config['master']['database'] = 'demo';

// Slave node: MysqlDB sends SELECT statements here.
$config['slave']             = $db['default'];
$config['slave']['host']     = '127.0.0.1';
$config['slave']['port']     = 3306;
$config['slave']['user']     = 'root';
$config['slave']['password'] = '123456';
$config['slave']['database'] = 'demo';
数据库连接池
MysqlPool.php
<?php

if (!defined('SERVER_PATH')) exit("No Access");

/**
 * Per-process pool of MysqlDB connections held in a coroutine channel.
 */
class MysqlPool
{
    private static $instance;
    private $pool;
    private $config;

    /**
     * Return the shared pool, creating it on the first call.
     *
     * @param array|null $config Database configuration; required only on the
     *                           call that creates the pool.
     * @return static
     * @throws RuntimeException When no pool exists yet and $config is empty.
     */
    public static function getInstance($config = null)
    {
        if (empty(self::$instance)) {
            if (empty($config)) {
                throw new RuntimeException("MySQL config empty");
            }
            self::$instance = new static($config);
        }
        return self::$instance;
    }

    /**
     * Create the channel and fill it with `pool_size` connections.
     *
     * Each connection is opened in its own coroutine and pushed onto the
     * channel once connected.
     *
     * @param array $config Configuration with 'master' and 'slave' entries;
     *                      the pool size is read from $config['master'].
     */
    public function __construct($config)
    {
        $this->config = $config;
        $size = $config['master']['pool_size'];
        $this->pool = new chan($size);

        for ($i = 0; $i < $size; $i++) {
            go(function () use ($config) {
                $mysql = new MysqlDB();
                $res = $mysql->connect($config);
                if ($res === false) {
                    throw new RuntimeException("Failed to connect mysql server");
                }
                $this->pool->push($mysql);
            });
        }
    }

    /**
     * Borrow a connection for the current coroutine.
     *
     * The connection is handed back to the pool by a deferred callback
     * registered here, so callers do not release it themselves.
     *
     * @return MysqlDB
     * @throws RuntimeException When no connection becomes available within
     *                          the configured 'pool_get_timeout'.
     */
    public function get()
    {
        // Wait on the channel instead of failing straight away when every
        // connection is in use; otherwise 'pool_get_timeout' has no effect.
        $mysql = $this->pool->pop($this->config['master']['pool_get_timeout']);
        if (false === $mysql) {
            throw new RuntimeException("Pop mysql timeout");
        }

        defer(function () use ($mysql) {
            // release the connection back to the pool
            $this->pool->push($mysql);
        });

        return $mysql;
    }
}
这里我还准备了一份学习图和资料,如下:
链接:https://pan.baidu.com/s/1v5gm7n0L7TGyejCmQrMh2g 提取码:x2p5
免费分享,但是X度限制严重,如若链接失效,请点击链接或搜索群号 518475424 加群获取。
数据库 CRUD
MysqlDB.php
<?php

if (!defined('SERVER_PATH')) exit("No Access");

/**
 * Master/slave MySQL wrapper: SELECT statements go to the slave connection,
 * every other statement goes to the master connection.
 */
class MysqlDB
{
    private $master;
    private $slave;
    private $config;

    /**
     * Expose `query($sql)` as the only dynamically forwarded method.
     *
     * @param string $name      Method name being called.
     * @param array  $arguments Call arguments; $arguments[0] is the SQL text.
     * @return mixed Result of the statement (see _execute()).
     * @throws RuntimeException For any method name other than 'query'.
     */
    public function __call($name, $arguments)
    {
        if ($name != 'query') {
            throw new RuntimeException($name.":This command is not supported");
        }
        return $this->_execute($arguments[0]);
    }

    /**
     * Open the master and slave connections and remember the configuration
     * so a dropped connection can be re-opened later.
     *
     * @param array $config Configuration with 'master' and 'slave' entries.
     * @return bool True once both connections are established.
     * @throws RuntimeException When either connection cannot be established.
     */
    public function connect($config)
    {
        $this->master = $this->_open($config['master']);
        $this->slave  = $this->_open($config['slave']);
        $this->config = $config;
        return true;
    }

    /**
     * Insert one row.
     *
     * @param string $table Table name.
     * @param array  $data  Column name => value; every value is sent as a
     *                      quoted string.
     * @return mixed Result of the statement (see _execute()).
     */
    public function insert($table = '', $data = [])
    {
        $fields = [];
        $values = [];
        foreach ($data as $k => $v) {
            $fields[] = $this->_identifier($k);
            $values[] = "'".addslashes((string) $v)."'";
        }
        $fields = implode(', ', $fields);
        $values = implode(', ', $values);
        $table  = $this->_identifier($table);

        $sql = "INSERT INTO {$table} ({$fields}) VALUES ({$values})";
        return $this->_execute($sql);
    }

    /**
     * Update the rows matching $where.
     *
     * @param string $table Table name.
     * @param array  $set   Column name => new value.
     * @param array  $where Column name => required value, combined with AND.
     * @return mixed Result of the statement (see _execute()).
     */
    public function update($table = '', $set = [], $where = [])
    {
        $arr_set = [];
        foreach ($set as $k => $v) {
            $arr_set[] = $this->_identifier($k).' = '.$this->_escape($v);
        }
        $set   = implode(', ', $arr_set);
        $where = $this->_where($where);
        $table = $this->_identifier($table);

        $sql = "UPDATE {$table} SET {$set} {$where}";
        return $this->_execute($sql);
    }

    /**
     * Delete the rows matching $where.
     *
     * NOTE: an empty $where produces "WHERE 1" and therefore matches every row.
     *
     * @param string $table Table name.
     * @param array  $where Column name => required value, combined with AND.
     * @return mixed Result of the statement (see _execute()).
     */
    public function delete($table = '', $where = [])
    {
        $where = $this->_where($where);
        $table = $this->_identifier($table);

        $sql = "DELETE FROM {$table} {$where}";
        return $this->_execute($sql);
    }

    /**
     * Select all columns of the rows matching $where.
     *
     * @param string $table Table name.
     * @param array  $where Column name => required value, combined with AND.
     * @return mixed Result of the statement (see _execute()).
     */
    public function select($table = '', $where = [])
    {
        $where = $this->_where($where);
        $table = $this->_identifier($table);

        $sql = "SELECT * FROM {$table} {$where}";
        return $this->_execute($sql);
    }

    /**
     * Build a WHERE clause of AND-ed equality conditions.
     *
     * @param array $where Column name => required value.
     * @return string Clause starting with "WHERE 1".
     */
    private function _where($where = [])
    {
        $str_where = '';
        foreach ($where as $k => $v) {
            $str_where .= " AND ".$this->_identifier($k)." = ".$this->_escape($v);
        }
        return "WHERE 1 ".$str_where;
    }

    /**
     * Quote a table or column name, doubling any embedded backtick so the
     * name cannot break out of the quoting.
     *
     * @param string $name Raw identifier.
     * @return string Backtick-quoted identifier.
     */
    private function _identifier($name)
    {
        return '`'.str_replace('`', '``', (string) $name).'`';
    }

    /**
     * Turn a PHP value into an SQL literal.
     *
     * Strings are escaped and single-quoted, booleans become 0/1, null
     * becomes NULL; any other value is returned unchanged.
     *
     * @param mixed $str Value to embed in a statement.
     * @return mixed SQL literal.
     */
    private function _escape($str)
    {
        if (is_string($str)) {
            // Escape before quoting: values come from client requests, so a
            // raw quote character would allow SQL injection.
            // NOTE(review): prepared statements would be the safer long-term choice.
            return "'".addslashes($str)."'";
        }
        if (is_bool($str)) {
            return $str ? 1 : 0;
        }
        if (is_null($str)) {
            return 'NULL';
        }
        return $str;
    }

    /**
     * Run a statement on the slave (SELECT) or the master (everything else).
     *
     * @param string $sql SQL text.
     * @return mixed Rows for a SELECT; ['affected_rows' => ..., 'insert_id' => ...]
     *               when a non-SELECT statement succeeds; otherwise the raw
     *               query() result.
     */
    private function _execute($sql)
    {
        // ltrim so leading whitespace does not route a SELECT to the master.
        $is_select = strtolower(substr(ltrim($sql), 0, 6)) === 'select';
        $db = $this->_get_usable_db($is_select ? 'slave' : 'master');

        $result = $db->query($sql);
        if ($result === true) {
            if ($is_select) {
                // With fetch_mode enabled query() only reports success and the
                // rows have to be fetched separately.
                return $db->fetchAll();
            }
            return [
                'affected_rows' => $db->affected_rows,
                'insert_id'     => $db->insert_id,
            ];
        }
        return $result;
    }

    /**
     * Return the requested connection, re-opening it first when it reports
     * itself as no longer connected.
     *
     * @param string $type 'master' or 'slave'.
     * @return \Swoole\Coroutine\MySQL
     * @throws RuntimeException When $type is unknown or reconnecting fails.
     */
    private function _get_usable_db($type)
    {
        if ($type !== 'master' && $type !== 'slave') {
            throw new RuntimeException($type.":Unknown database type");
        }
        if (!$this->{$type}->connected) {
            $this->{$type} = $this->_open($this->config[$type]);
        }
        return $this->{$type};
    }

    /**
     * Open a single coroutine MySQL connection.
     *
     * @param array $node Connection settings for one node.
     * @return \Swoole\Coroutine\MySQL
     * @throws RuntimeException Carrying the client's connect_error and errno.
     */
    private function _open($node)
    {
        $db = new \Swoole\Coroutine\MySQL();
        if ($db->connect($node) === false) {
            throw new RuntimeException($db->connect_error, $db->errno);
        }
        return $db;
    }
}
OnWorkerStart 中调用
// Build the connection pool when the worker starts; stop the server if that fails.
try {
    MysqlPool::getInstance(get_config('mysql'));
} catch (Throwable $error) {
    // Exception implements Throwable, so this single handler covers both cases.
    $serv->shutdown();
}
客户端发送请求
<?php

// The four payloads below are alternatives: each assignment overwrites the
// previous one, so only the last $demo left in place is actually sent.

// Create
$demo = [
    'type'  => 'SW',
    'token' => 'Bb1R3YLipbkTp5p0',
    'param' => [
        'class'  => 'Order',
        'method' => 'add',
        'param'  => [
            'code' => 'C'.mt_rand(1000,9999),
            'name' => '订单-'.mt_rand(1000,9999),
        ],
    ],
];

// Update
$demo = [
    'type'  => 'SW',
    'token' => 'Bb1R3YLipbkTp5p0',
    'param' => [
        'class'  => 'Order',
        'method' => 'edit',
        'param'  => [
            'id'   => '4',
            'name' => '订单-'.mt_rand(1000,9999),
        ],
    ],
];

// Delete
$demo = [
    'type'  => 'SW',
    'token' => 'Bb1R3YLipbkTp5p0',
    'param' => [
        'class'  => 'Order',
        'method' => 'del',
        'param'  => [
            'id' => '1',
        ],
    ],
];

// Query
$demo = [
    'type'  => 'SW',
    'token' => 'Bb1R3YLipbkTp5p0',
    'param' => [
        'class'  => 'Order',
        'method' => 'info',
        'param'  => [
            'code' => 'C4649'
        ],
    ],
];

// POST the selected payload as JSON; the response body is printed directly
// because CURLOPT_RETURNTRANSFER is not set.
$ch = curl_init();
$options = [
    CURLOPT_URL        => 'http://10.211.55.4:9509/',
    CURLOPT_POST       => 1,
    CURLOPT_POSTFIELDS => json_encode($demo),
];
curl_setopt_array($ch, $options);
if (curl_exec($ch) === false) {
    // Report transport failures instead of exiting silently.
    echo 'curl error: '.curl_error($ch);
}
curl_close($ch);