毕竟redis是作为缓存,供程序的快速读写,虽然reidis也可以做持久化保存,但还是需要一个做数据存储的数据库。比如首次查询数据在redis查询不到则查询mysql,再将查询结果写过redis供下次查询。保存数据也可以先写入redis再通过队列使用另外的程序异步写入mysql。

后面会逐步把openresty写入rabbitmq和python通过订阅rabbitmq的队列将数据写入mysql得方法都整理一下。

先看看openresty如何对mysql进行连接和操作,并进行二次封装

mysql操作流程

通过resty.mysql库进行连接和操作

local mysql = require("resty.mysql")
local client, errmsg = mysql:new()
if not client then
    return false, "mysql.socket_failed: " .. (errmsg or "nil")
end
client:set_timeout(10000)

local options = {
    host = "127.0.0.1",
    port = 3306,
    user = "root",
    password = "root",
    database = "test"
}

local result, errmsg, errno, sqlstate = client:connect(options)

if not result then
    return false, errmsg
end

local result, errmsg, errno, sqlstate = client:query(sql)
if not result then
    errmsg = concat_db_errmsg("mysql.query_failed:", errno, errmsg, sqlstate)
end

self.close()

  

连接mysql的二次封装

这样做和连接redis一样,如果大量使用会需要不断重复代码和进行创建关闭与数据库的连接。所以这里也进行二次封装。
使用 set_keepalive(max_idle_timeout, pool_size) 替代 close() 将启用连接池特性。set_keepalive 的意思可以理解为,保持连接,并将连接归还到连接池内。这样在下次连接时,会首先会尝试从连接池获取连接,获取不成功才会创建新的连接。在高并发下,连接池能大大的减少连接 MySQL 和 Redis 的次数,明显的提升性能。

local mysql_c = require("resty.mysql")

local _M = {}
_M._VERSION = '0.01'

local mt = { __index = _M }

--[[    先从连接池取连接,如果没有再建立连接.
        返回:
        false,出错信息.
        true,数据库连接
--]]

function _M.get_connect(self)

    if ngx.ctx[self] then
        return true, ngx.ctx[self]
    end
    
    local client, errmsg = mysql_c:new()
    if not client then
        return false, "mysql.socket_failed: " .. (errmsg or "nil")
    end

    client:set_timeout(self.db_timeout)

    local options = {
        host = self.db_host,
        port = self.db_port,
        user = self.db_user,
        password = self.db_password,
        database = self.db_name
    }

    local result, errmsg, errno, sqlstate = client:connect(options)

    if not result then
        return false, errmsg
    end

    local query = "SET NAMES "..self.db_charset
    local result, errmsg, errno, sqlstate = client:query(query)
    if not result then
        return false, errmsg
    end

    ngx.ctx[self] = client
    return true, ngx.ctx[self]

end


--[[    把连接返回到连接池
        用set_keepalive代替close() 将开启连接池特性,可以为每个nginx工作进程,指定连接最大空闲时间,和连接池最大连接数
--]]

function _M.close(self)
    if ngx.ctx[self] then
        ngx.ctx[self]:set_keepalive(60000, 1000)
        ngx.ctx[self] = nil
    end
end



-- --[[    查询有结果数据集时返回结果数据集
--         无数据数据集时返回查询影响返回:
--         false,出错信息,sqlstate结构.
--         true,结果集,sqlstate结构.
-- --]]

function _M.mysql_query(self ,sql)
    local ret, client = self:get_connect()
    if not ret then
        return false, client, nil
    end

    local result, errmsg, errno, sqlstate = client:query(sql)

    if not result then
        errmsg = concat_db_errmsg("mysql.query_failed:", errno, errmsg, sqlstate)
        return false, errmsg, sqlstate
    end

    self.close()

    return true, result, sqlstate
end


function _M.query(self ,sql)

    local ret, res, _ = self:mysql_query(sql)
    if not ret then
        ngx.log(ngx.ERR, "query db error. res: " .. (res or "nil"))
        return nil
    end

    return res[1]
end

function _M.execute(sql)

    local ret, res, sqlstate = self:mysql_query(sql)
    if not ret then
        ngx.log(ngx.ERR, "mysql.execute_failed. res: " .. (res or 'nil') .. ",sql_state: " .. (sqlstate or 'nil'))
        return -1
    end

    return res.affected_rows

end

function _M.new(self, opts)
    opts = opts or {}
    local db_host = opts.host or '127.0.0.1'
    local db_port = opts.port or 3306
    local db_user = opts.user or 'root'
    local db_password = opts.password or ' '
    local db_name = opts.db_name or 'test'
    local db_timeout =  opts.db_timeout or 10000
    local db_charset = opts.charset or 'utf8'

    return setmetatable({
            db_host = db_host,
            db_port = db_port,
            db_user = db_user,
            db_password = db_password,
            db_name = db_name,
            db_timeout = db_timeout,
            db_charset = db_charset }, mt)
end

return _M

  

使用示例

local mysqlPool = require ("mysqlPool")
local mysql = mysqlPool:new()
local mysqlCodeRes = mysql:query('SELECT type,enable_flag,count FROM access_code where code = 123')

  

小tips

如果数据库地址是用使用域名,需要设置nginx的域名解析服务器。在nginx.conf配置,实际情况根据所在网络的DNS进行填写

resolver 10.138.224.65;
resolver_timeout 30s;