zoukankan      html  css  js  c++  java
  • redis 连接池 hiredis

    对Hiredis进行了简单封装

    1、API进行统一,对外只提供一个接口;

    2、屏蔽上层应用对连接的细节处理;

    3、底层采用队列的方式保持连接池,保存连接会话;

    4、重连时采用时间戳进行控制,每隔一定时间(3s)重连一次,防止频繁重试造成的不必要浪费。

    先看一下Hiredis的常用数据结构与API:

    //hiredis/hiredis.h
    /* Context for a connection to Redis.
     * Quoted from hiredis/hiredis.h for reference. The pool code later in this
     * file reads only `err` (to decide whether a new connection is usable) and
     * otherwise treats the context as an opaque handle. */
    typedef struct redisContext {
        int err; /* Error flags, 0 when there is no error */
        char errstr[128]; /* String representation of error when applicable */
        int fd; /* NOTE(review): presumably the connection's descriptor -- confirm against hiredis */
        int flags; /* NOTE(review): meaning is not shown in this excerpt */
        char *obuf; /* Write buffer */
        redisReader *reader; /* Protocol reader */
    } redisContext;
    /* This is the reply object returned by redisCommand().
     * The REDIS_REPLY_* constants below are the values stored in
     * redisReply::type; which of the other fields is meaningful depends on it. */
    #define REDIS_REPLY_STRING 1
    #define REDIS_REPLY_ARRAY 2
    #define REDIS_REPLY_INTEGER 3
    #define REDIS_REPLY_NIL 4
    #define REDIS_REPLY_STATUS 5
    #define REDIS_REPLY_ERROR 6
    typedef struct redisReply {
        int type; /* REDIS_REPLY_* */
        long long integer; /* The integer when type is REDIS_REPLY_INTEGER */
        int len; /* Length of string */
        char *str; /* Used for both REDIS_REPLY_ERROR and REDIS_REPLY_STRING
                    * (the wrapper below also reads it for REDIS_REPLY_STATUS) */
        size_t elements; /* number of elements, for REDIS_REPLY_ARRAY */
        struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */
    } redisReply;
    /* Opens a connection to ip:port; `tv` is the timeout passed by the caller.
     * The wrapper below treats both a NULL result and a non-zero `err` field
     * as a failed connection. */
    redisContext *redisConnectWithTimeout(const char *ip, int port, struct timeval tv);
    /* Disposes of a context; the wrapper below calls it on every connection it
     * discards. */
    void redisFree(redisContext *c);
    
    

    封装后的代码:

    redis_pool.h

    #ifndef __REDIS_POOL_H__
    #define __REDIS_POOL_H__
    #include <iostream>
    #include <string.h>
    #include <string>
    #include <stdio.h>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <sys/time.h>
    #include "hiredis/hiredis.h"
    class KGRedisClient
    {
    public:
        KGRedisClient(std::string ip, int port, std::string password, int timeout = 2000);
        virtual ~KGRedisClient();
     //   bool ExecuteCmd_spop(const char *cmd, size_t len, std::string &response);
      bool ExecuteCmd_spop(std::string &response, const char* format, ...);
     
     //   redisReply* ExecuteCmd(const char *cmd, size_t len);
         redisReply* ExecuteCmd(const char* format, ...);
    private:
        int m_timeout;
        int m_serverPort;
        std::string m_setverIp;
     std::string m_password;
     //   CCriticalSection m_lock;
      std::mutex _mutex;
        std::queue<redisContext *> m_clients;
        time_t m_beginInvalidTime;
        static const int m_maxReconnectInterval = 3;
        redisContext* CreateContext();
        void ReleaseContext(redisContext *ctx, bool active);
        bool CheckStatus(redisContext *ctx);
    };

    #endif
     
     
    redis_pool.cpp
     
    #include "redis_pool.h"
    #include <stdio.h>
    /**
     * Records the connection settings. No connection is opened here; the
     * reconnect timestamp starts at 0 so the first connect attempt is not
     * delayed.
     */
    KGRedisClient::KGRedisClient(std::string ip, int port, std::string password, int timeout)
        : m_timeout(timeout),
          m_serverPort(port),
          m_setverIp(ip),
          m_password(password),
          m_beginInvalidTime(0)
    {
    }
    /** Frees every idle connection still held in the queue. */
    KGRedisClient::~KGRedisClient()
    {
        std::lock_guard<std::mutex> guard(_mutex);
        for(; !m_clients.empty(); m_clients.pop())
        {
            redisFree(m_clients.front());
        }
    }


    bool KGRedisClient::ExecuteCmd(std::string &response, const char* format, ...)
    {
     va_list args;
     va_start(args, format);
     redisReply *reply = ExecuteCmd(format, args);
     va_end(args);
       
      if(reply == NULL) return false;
        std::shared_ptr<redisReply> autoFree(reply, freeReplyObject);
        if(reply->type == REDIS_REPLY_INTEGER)
        {
            response = std::to_string(reply->integer);
            return true;
        }
        else if(reply->type == REDIS_REPLY_STRING)
        {
            response.assign(reply->str, reply->len);
            return true;
        }
        else if(reply->type == REDIS_REPLY_STATUS)
        {
            response.assign(reply->str, reply->len);
            return true;
        }
        else if(reply->type == REDIS_REPLY_NIL)
        {
            response = "";
            return true;
        }
        else if(reply->type == REDIS_REPLY_ERROR)
        {
            response.assign(reply->str, reply->len);
            return false;
        }
        else if(reply->type == REDIS_REPLY_ARRAY)
        {
            response = "Not Support Array Result!!!";
            return false;
        }
        else
        {
            response = "Undefine Reply Type";
            return false;
        }
    }
    /**
     * Formats and runs one command on a pooled connection.
     *
     * @param format  hiredis command format string, followed by its arguments
     * @return the reply object, which the caller must release with
     *         freeReplyObject(); NULL when no connection could be obtained or
     *         the command failed at the connection level
     */
    redisReply* KGRedisClient::ExecuteCmd(const char* format, ...)
    {
        redisContext *ctx = CreateContext();
        if(ctx == NULL) return NULL;

        // Start the argument list only once a context exists, so every
        // va_start is matched by a va_end on all return paths.
        va_list args;
        va_start(args, format);
        // redisvCommand is the va_list form; handing `args` to the variadic
        // redisCommand would treat the va_list itself as the first argument.
        redisReply* reply = static_cast<redisReply*>(redisvCommand(ctx, format, args));
        va_end(args);

        // A NULL reply means the context is in an error state: free it
        // instead of returning it to the pool.
        ReleaseContext(ctx, reply != NULL);
        return reply;
    }
     
    redisContext* KGRedisClient::CreateContext()
    {
        {
    //        CAutoLock autolock(m_lock);
      std::unique_lock <std::mutex> lck(_mutex);
            if(!m_clients.empty())
            {
                redisContext *ctx = m_clients.front();
                m_clients.pop();
                return ctx;
            }
        }
        time_t now = time(NULL);
        if(now < m_beginInvalidTime + m_maxReconnectInterval) return NULL;
        struct timeval tv;
        tv.tv_sec = m_timeout / 1000;
        tv.tv_usec = (m_timeout % 1000) * 1000;;
        redisContext *ctx = redisConnectWithTimeout(m_setverIp.c_str(), m_serverPort, tv);
        if(ctx == NULL || ctx->err != 0)
        {
            if(ctx != NULL) redisFree(ctx);
            m_beginInvalidTime = time(NULL);
           
            return NULL;
        }
     redisReply *reply;
     std::string strReply = "AUTH ";
     strReply += m_password;
     reply = (redisReply*)redisCommand(ctx, strReply.c_str());
     freeReplyObject(reply);
     reply = NULL;
     printf("connect OK ");
        return ctx;
    }
    /**
     * Gives a connection back after use.
     *
     * @param ctx     the connection; NULL is ignored
     * @param active  true to queue the connection for reuse, false to free it
     */
    void KGRedisClient::ReleaseContext(redisContext *ctx, bool active)
    {
        if(ctx == NULL)
        {
            return;
        }

        if(active)
        {
            std::lock_guard<std::mutex> guard(_mutex);
            m_clients.push(ctx);
        }
        else
        {
            redisFree(ctx);
        }
    }
    /**
     * Sends "ping" on the given connection.
     *
     * @param ctx  connection to probe
     * @return true only when a status reply equal to "PONG" (compared without
     *         regard to case) comes back
     */
    bool KGRedisClient::CheckStatus(redisContext *ctx)
    {
        void *raw = redisCommand(ctx, "ping");
        if(raw == NULL)
        {
            return false;
        }

        // The reply is released when `pong` goes out of scope.
        std::shared_ptr<redisReply> pong(static_cast<redisReply*>(raw), freeReplyObject);
        return pong->type == REDIS_REPLY_STATUS
            && strcasecmp(pong->str, "PONG") == 0;
    }
          

    成员变量:m_clients用于保存连接池。

    成员变量:m_beginInvalidTime、m_maxReconnectInterval 用于控制断掉时的频繁连接。

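    对外API(与 redis_pool.h 中的声明一致):redisReply* ExecuteCmd(const char* format, ...),返回原始应答,调用方需用 freeReplyObject 释放;bool ExecuteCmd_spop(std::string &response, const char* format, ...),把应答转换为字符串写入 response。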

  • 相关阅读:
    BBS登入和数据库迁移部分
    Auth组件
    【作业】返回一个整形数组中最大子数组地和——当维度达到二维/*待完善*/
    【作业】返回一个整形数组中最大子数组地和——当数量达到10亿
    软件工程课程周学习进度报告——第三周
    软件工程课程周学习进度报告——第二周
    软件工程第一周开课博客
    【作业】返回一个整形数组中最大子数组地和
    《人月神话》读后感其三——第二个系统问题
    《人月神话》读后感其二——从未考虑过的多人协作问题
  • 原文地址:https://www.cnblogs.com/henryliublog/p/9221964.html
Copyright © 2011-2022 走看看