zoukankan      html  css  js  c++  java
  • 数据库应用之--Redis+mysql实现大量数据的读写,以及高并发

    一、开发背景

    在项目开发过程中中遇到了以下三个需求:

      1. 多个用户同时上传数据;

      2. 数据库需要支持同时读写;

      3. 1分钟内存储上万条数据;

    根据对Mysql的测试情况,遇到以下问题:

      1. 最先遇到压力的是服务器,在写入2500-3000条数据时,服务器崩溃了;

      2. 当数据库写入时,耗时太长,10000条数据,大概需要505.887s,相当于8分钟,如下:

      a. 表结构:

      

       b. 数据库Procedure:

    DROP PROCEDURE IF EXISTS my_insert;
    CREATE PROCEDURE my_insert()
    BEGIN
       DECLARE n int DEFAULT 1;
            loopname:LOOP
                INSERT INTO car_pathinfo_driver_cpy(id, linkphone,cartype,carcolor,carnumber,drivername,pubtimes)VALUES(n+500,'18838325709','雪弗兰','','豫A190XS','siker','3');
                SET n=n+1;
            IF n=10000 THEN
                LEAVE loopname;
            END IF;
            END LOOP loopname;
    END;
    CALL my_insert();

      c. 运行结果如下:

      

      3. 不断的数据库写入导致数据库压力过大;

    出现以上问题,是由于mysql是基于磁盘的IO,基于服务响应性能考虑,就需要给数据做缓存,所以决定使用Mysql+redis缓存的解决方案,将业务热数据写入Redis缓存,使得高频业务数据可以直接从内存读取,提高系统整体响应速度。

    二、使用Redis+Mysql需要考虑的问题
      使用redis缓存+mysql数据库存储能解决:

      1. 数据读写的速度

      2. 服务器的压力问题

      同时,就需要考虑同步问题了,Redis和Mysql的同步问题

    三、Redis+mysql同步解决方案

      1.写Redis->redis写mysql,读Mysql。

      以下是一个Redis+mysql同步的示例,该示例测试了写入100000条数据的效率,先向Redis写入100000条数据,再将数据读出,写入Mysql。

        批量写入缓解了服务器的压力。

    stdafx.h

    // stdafx.h : 标准系统包含文件的包含文件,
    // 或是经常使用但不常更改的
    // 特定于项目的包含文件
    //
    
    #pragma once
    
    #include "targetver.h"
    
    #include <stdio.h>
    #include <tchar.h>
    #include <stdlib.h>
    #include <string.h>
    #include <iostream>
    #include <assert.h>
    #include <vector>
    #include "hiredis.h"
    #include <Windows.h>
    #include "mysql.h"
    
    #ifdef _DEBUG
    #pragma comment(lib, "hiredis_d.lib")
    #pragma comment(lib, "Win32_Interop_d.lib")
    #else
    #pragma comment(lib, "hiredis.lib")
    #pragma comment(lib, "Win32_Interop.lib")
    
    #endif
    #pragma comment(lib, "AdvAPI32.Lib")
    #pragma comment(lib, "DbgHelp.Lib")
    #pragma comment (lib, "Shlwapi.lib")
    #pragma comment(lib,"libmysql.lib")
    
    using namespace std;
    
    typedef struct testData
    {
        int iHeight;
        int iWidth;
        char szValue[64];
        char szHValue[64];
    }stTestData, *pstTestData;

    test.h

    #include "stdafx.h"
    #include "DBHandle.h"
    
    int main()
    {
    
        DBHandle *dbHandle = new DBHandle();
        thread tWriteDataToRedis(&DBHandle::writeHsetToRedis, *dbHandle);
        tWriteDataToRedis.join();
    
        return 0;
    }

    DBHandle.h

    #pragma once
    #include <mutex>
    #include <thread>
    
    class DBHandle
    {
    public:
        DBHandle();
        ~DBHandle();
    
        bool connectRedis(string strIp, int iPort, string strPwd);
        void freeRedis();
    
        int getRedisDBSize();
    
        bool writeHsetToRedis();
        bool readDataFromRedis();
    
        bool connectMysql();
        void FreeMysqlConnect();
    
        bool insertDataToMysql(string strData);
    
        redisContext* m_pRedisContext;
        MYSQL m_mysql;
        MYSQL_RES *res;     //行的一个查询结果集
        
    
    };

    DBHandle.cpp

    #include "stdafx.h"
    #include "DBHandle.h"
    
    
    DBHandle::DBHandle()
    {
        m_pRedisContext = NULL;
    }
    
    DBHandle::~DBHandle()
    { 
        if (m_pRedisContext != NULL)
        {
            m_pRedisContext = NULL;
        } 
    }
    
    bool DBHandle::connectRedis(string strIp, int iPort, string strPwd)
    {
        //redis默认监听端口为6387 可以再配置文件中修改 
        char szBuf[32] = {};
        strcpy_s(szBuf, sizeof(strIp), strIp.c_str());
        m_pRedisContext = redisConnect(szBuf, iPort);
        if (NULL == m_pRedisContext || m_pRedisContext->err)
        {
            return false;
        }
    
        //输入Redis密码
        strcpy_s(szBuf, sizeof(strPwd), strPwd.c_str());
        redisReply *pRedisReply = (redisReply*)redisCommand(m_pRedisContext, "AUTH %s", szBuf);
        if (NULL != pRedisReply)
        {
            freeReplyObject(pRedisReply);
        }
        if (NULL == pRedisReply->str)
        {
            return false;
        }
        return true;
    
    }
    
    void DBHandle::freeRedis()
    {
        redisFree(m_pRedisContext);
        if (m_pRedisContext != NULL)
        {
            m_pRedisContext = NULL;
        }
    }
    
    int DBHandle::getRedisDBSize()
    {
        //查看list长度
        int iListLen = 0;
        //redisReply *pRedisReply = (redisReply *)redisCommand(m_pRedisContext, "LLen datalist");
        redisReply *pRedisReply = (redisReply *)redisCommand(m_pRedisContext, "DBSIZE");
        if (NULL != pRedisReply)
        {
            if (NULL == pRedisReply->integer)
            {
                return false;
            }
            iListLen = pRedisReply->integer;
            freeReplyObject(pRedisReply);
        }
        if (NULL == pRedisReply)
        {
            printf("%s 
    ", m_pRedisContext->errstr);
            return false;
        }
        
        return iListLen;
    }
    
    bool DBHandle::writeHsetToRedis()
    {
        bool bFlag = connectRedis("127.0.0.1", 6379, "123456");
        if (false == bFlag)
        {
            return false;
        }
    
        time_t st = time(NULL);//
        stTestData data = {};
        int i = 1;
        while (i<100000)
        {
    
            data.iHeight = i;
            data.iWidth = 30;
            char szBuf[64] = {};
            sprintf_s(szBuf, "width%d", i);
            strcpy_s(data.szValue, 64, szBuf);
            sprintf_s(data.szHValue, "%s%d", "heighttest", i);
    
            //向Redis写入数据hset location (interger)1 "width"
            sprintf_s(szBuf, "hset location%d value %s", i, data.szValue);
            redisReply *pRedisReply = (redisReply *)redisCommand(m_pRedisContext, szBuf);
            if (NULL != pRedisReply)
            {
                freeReplyObject(pRedisReply);
            }
            i++;
        }
    
        printf("write finish");
        readDataFromRedis();
    
        time_t et = time(NULL);
        int iUsed = st - et;
        printf("used time is %d", iUsed);
        freeRedis();
        return true;
    
    }
    
    
    bool DBHandle::readDataFromRedis()
    {
        /*bool bFlag = connectRedis("127.0.0.1", 6379, "123456");
        if (false == bFlag)
        {
            return false;
        }*/
    
        printf("read start");
        
        int iSize = getListSize();
        if (iSize <= 0)
        {
            return false;
        }
        bool bSuc = connectMysql();
        if (bSuc == false)
        {
            return false;
        }
    
        int iCount = iSize;//计数
        while (iCount > 0)
        {
            //用get命令获取数据
            redisReply *pRedisReply = (redisReply*)redisCommand(m_pRedisContext, "RPOP datalist");
            if (NULL == pRedisReply)
            {
                return false;
            }
            if (NULL != pRedisReply->str)
            {
                string str = pRedisReply->str;
                insertDataToMysql(str);
                freeReplyObject(pRedisReply);
            }
            iCount--;
        }
        
        printf("read finish");
        
        return true;
    
    }
    
    bool DBHandle::connectMysql()
    {
        mysql_init(&m_mysql);
    
        // Connects to a MySQL server
        const char host[] = "192.168.4.8";
        const char user[] = "root";
        const char passwd[] = "123456";
        const char db[] = "topproductline";
        unsigned int port = 3306;
        const char *unix_socket = NULL;
        unsigned long client_flag = 0;
    
        /*A MYSQL* connection handler if the connection was successful,
        NULL if the connection was unsuccessful. For a successful connection,
        the return value is the same as the value of the first parameter.*/
        if (mysql_real_connect(&m_mysql, host, user, passwd, db, port, unix_socket, client_flag)) {
            printf("The connection was successful.
    ");
            return true;
        }
        else {
            printf("Error connecting to database:%s
    ", mysql_error(&m_mysql));
            return false;
        }
    }
    
    void DBHandle::FreeMysqlConnect()
    {
        mysql_free_result(res);
        mysql_close(&m_mysql);
    }
    
    bool DBHandle::insertDataToMysql(string strData)
    {
        char szQuery[256] = {0};
        sprintf_s(szQuery, "insert into a_test (type) values ('%s');", strData.c_str());
        if (mysql_query(&m_mysql, szQuery)) {
            printf("Query failed (%s)
    ", mysql_error(&m_mysql));
            return false;
        }
        else {
            printf("Insert success
    ");
            return true;
        }
    }

       测试结果:

      2.写redis->写mysql,读Redis->未找到->读Mysql

  • 相关阅读:
    Python修饰器 ,控制授权,通过ini配置文件 使用密钥 给函数限制试用期和过期后试用次数
    excel vba 自定义函数 使用正则表达式提取字符串
    python 值比较判断,np.nan is np.nan 却 np.nan != np.nan ,pandas 单个数据框/单元格 值判断nan
    python 读取中文CSV 'gbk' codec can't decode bytes in position 2-3:illegal multibyte sequence
    python ipython [Errno 22] invalid mode ('rb') or filename 、IDE工作路径
    windows下 python 添加PYTHONPATH 环境变量
    pandas(python2) 读取中文数据,处理中文列名
    qq邮箱 微信提醒不通知
    python :import error
    python推荐淘宝物美价廉商品 2.0
  • 原文地址:https://www.cnblogs.com/anlia/p/11803686.html
Copyright © 2011-2022 走看看