zoukankan      html  css  js  c++  java
  • redis async client 与自有框架集成

    hiredis的异步接口已经支持ae libuv libev 和 libevent集成,具体头文件可以参见redis/deps/hiredis/adapters,样例参见redis/deps/hiredis/examples.

    完整样例参见: https://github.com/DavadDi/study_example/tree/master/async_redis_client

    参照hiredis的异步接口与libevent的集成方式,可以很容易地与其他网络框架(例如asio或者ace等)集成。以下样例为与自己编写的reactor框架集成的方式,

    支持自动重连以及redisAsyncContext对象的创建和释放;重连使用退避算法,最大重连时间间隔为32秒。

    使用方式:
      将redis_client.hpp 放到 hiredis的adapters目录即可。

      1 #ifndef redis_client_h
      2 #define redis_client_h
      3 
      4 #include "reactor/define.hpp"
      5 #include "reactor/event_handler.hpp"
      6 #include "hiredis.h"
      7 #include "async.h"
      8 
      9 using namespace reactor;
     10 
     11 static  void redisReactorAddRead(void *arg);
     12 static  void redisReactorDelRead(void *arg);
     13 static  void redisReactorAddWrite(void *arg);
     14 static  void redisReactorDelWrite(void *arg);
     15 static  void redisReactorCleanup(void *arg);
     16 
     17 void connectCallBack(const redisAsyncContext *c, int status);
     18 void disconnectCallBack(const redisAsyncContext *c, int status);
     19 
     20 void get_call_fun(redisAsyncContext *c, void *r, void *arg)
     21 {
     22     redisReply *reply = (redisReply *)r;
     23     std::string *key_str = (std::string *)arg;
     24 
     25     if (reply == NULL)
     26     {
     27         delete key_str;
     28         return;
     29     }
     30 
     31     LOG_INFO("[%s] -> %s
    ", key_str->c_str(), reply->str);
     32 
     33     delete key_str;
     34 
     35     /* Disconnect after receiving the reply to GET */
     36     // redisAsyncDisconnect(c);
     37 }
     38 
     39 // -------------------------------------------------------------------
     40 // !!NOTE, if obj failed to connect to server and is unregistered from epoll,
     41 // this object may leak memory when the prog exits
     42 // -------------------------------------------------------------------
     43 
     44 class CRedisClient : public CEventHandler
     45 {
     46     public:
     47         // enum {MAX_BUF_SIZE = 4096};
     48         typedef CEventHandler super;
     49 
     50         CRedisClient(const char *srv_ip, uint16_t srv_port, reactor::CReactor *reactor) 
     51             : super(reactor)
     52         {
     53             m_srv_ip_str = srv_ip;
     54             m_srv_port = srv_port;
     55         }
     56 
     57         int connect()
     58         {
     59             LOG_DEBUG("Enter CRedisClient connect()");
     60             m_client_status = CONNECT_STATUS::CLIENT_CONNECTING;
     61 
     62             clear_redis_context();
     63 
     64             m_redis_context = redisAsyncConnect(m_srv_ip_str.c_str(), m_srv_port);
     65             if (m_redis_context == nullptr)
     66             {
     67                 return -1;
     68             }
     69 
     70             if (m_redis_context->err)
     71             {
     72                 LOG_INFO("Connect to %s:%d Error: %s", 
     73                         m_srv_ip_str.c_str(), m_srv_port, m_redis_context->errstr);
     74 
     75                 return -1;
     76             }
     77 
     78             if (m_timer_id == 0)
     79             {
     80                 m_timer_id = this->reactor()->regist_timer(this, m_timeout_value); // only one time
     81                 LOG_DEBUG("Client regist timer to reactor id %d, timeout %d", m_timer_id, m_timeout_value);
     82             }
     83 
     84             this->attach();
     85 
     86             return 0;
     87         }
     88 
     89         virtual ~CRedisClient()
     90         {
     91             // maybe should not free redis context in deconstuct!!
     92             m_delete_redis_context = true;
     93             clear_redis_context();
     94         }
     95 
     96         virtual int open(void *data = nullptr)
     97         {
     98             m_client_status = CONNECT_STATUS::CLIENT_CONNECTED;
     99 
    100             m_delete_redis_context = false;
    101 
    102             if (m_timer_id == 0)
    103             {
    104                 m_timer_id = this->reactor()->regist_timer(this, m_timeout_value); // only one time
    105                 LOG_DEBUG("Client regist timer to reactor id %d, timeout %d", 
    106                                                     m_timer_id, m_timeout_value);
    107             }
    108             
    109             LOG_INFO("Connect to RedisServer %s:%d succeed!!", 
    110                                                m_srv_ip_str.c_str(), m_srv_port);
    111             
    112             return 0;
    113         }
    114 
    115         virtual int handle_input(socket_t socket)
    116         {
    117             redisAsyncHandleRead(m_redis_context);
    118             return 0;
    119         }
    120 
    121         virtual int handle_output(socket_t socket)
    122         {
    123             redisAsyncHandleWrite(m_redis_context);
    124             return 0;
    125         }
    126 
    127         virtual int handle_timeout(uint32_t tm, void *data = nullptr)
    128         {
    129             // LOG_DEBUG("Enter into timeout function....");
    130             if (m_client_status == CONNECT_STATUS::CLIENT_CONNECTED)
    131             {
    132                 /* just for test */
    133                 std::string key = std::to_string(tm);
    134 
    135                 LOG_DEBUG("Set key %s", key.c_str());
    136                 redisAsyncCommand(m_redis_context, NULL, NULL, "SET %s %s",key.c_str(), "aaa");
    137                 redisAsyncCommand(m_redis_context, get_call_fun, (char*)new string(key), "GET %s", key.c_str());
    138             }
    139             else
    140             {
    141                 static uint32_t last_tm = 0;
    142                 if ((tm - last_tm) >= m_timeout_interval)
    143                 { 
    144                     //reconnect
    145                     LOG_DEBUG("Start reconnect now ...");
    146                     this->connect();
    147 
    148                     m_timeout_interval = m_timeout_interval * 2;
    149                     if (m_timeout_interval > 32)
    150                     {
    151                         m_timeout_interval = 1;
    152                     }
    153 
    154                     last_tm = tm;
    155                 }
    156             }
    157 
    158             return 0;
    159         }
    160 
    161         virtual int handle_close(socket_t socket = INVALID_SOCKET, uint32_t mask = 0)
    162         {
    163             LOG_DEBUG("Enter into handle_close()"); 
    164             m_client_status = CONNECT_STATUS::CLIENT_UNCONNECTED;
    165             
    166             // epoll call delete this handler
    167             if (mask & RE_MASK_DEL)
    168             {
    169                 LOG_DEBUG("Call RE_MASK_DEL now");
    170 
    171                 if (this->m_timer_id && (this->reactor() != nullptr))
    172                 {
    173                     this->reactor()->unregist_timer(this->m_timer_id);
    174                     this->m_timer_id = 0;
    175                 }
    176 
    177                 delete this;
    178                 return 0;
    179             }
    180             
    181             this->reactor()->del_event(this,0);
    182             return 0;
    183         }
    184 
    185         void clear_redis_context()
    186         {
    187             if (m_delete_redis_context && m_redis_context != nullptr)
    188             {
    189                 LOG_DEBUG("Call redisAsynFree() now");
    190                 redisAsyncFree(m_redis_context);
    191                 m_redis_context = nullptr;
    192             }
    193         }
    194 
    195         int attach()
    196         {
    197             LOG_DEBUG("Enter attatch function... ");
    198 
    199             redisContext *context = &(m_redis_context->c);
    200             if (m_redis_context->ev.data != NULL)
    201             {
    202                 return -1;
    203             }
    204 
    205             // set callback function
    206             redisAsyncSetConnectCallback(m_redis_context,connectCallBack);
    207             redisAsyncSetDisconnectCallback(m_redis_context,disconnectCallBack);
    208 
    209             this->set_handle(context->fd); // set handler
    210 
    211             m_redis_context->ev.addRead = redisReactorAddRead;
    212             m_redis_context->ev.delRead = redisReactorDelRead;
    213             m_redis_context->ev.addWrite = redisReactorAddWrite;
    214             m_redis_context->ev.delWrite = redisReactorDelWrite;
    215             m_redis_context->ev.cleanup = redisReactorCleanup;
    216             m_redis_context->ev.data = this;
    217 
    218             LOG_DEBUG("ac->ev.data %p", m_redis_context->ev.data);
    219 
    220             this->add_read();
    221             this->add_write();
    222 
    223             return 0;
    224         }
    225 
    226         void add_read()
    227         {
    228             LOG_TRACE_METHOD(__func__);
    229 
    230             if( (this->m_current_event_mask & reactor::EVENT_READ) > 0)
    231             {
    232                 LOG_DEBUG("EV_READ(0x%0x) already in event_mask 0x%x",
    233                         reactor::EVENT_READ, this->m_current_event_mask);
    234 
    235                 return;
    236             }
    237 
    238             this->reactor()->add_event(this, reactor::EVENT_READ);
    239         }
    240 
    241         void del_read()
    242         {
    243             LOG_TRACE_METHOD(__func__);
    244             this->reactor()->mod_event(this, this->m_current_event_mask&(~reactor::EVENT_READ));
    245         }
    246 
    247         void add_write()
    248         {
    249             LOG_TRACE_METHOD(__func__);
    250             this->schedule_write();
    251         }
    252 
    253         void del_write()
    254         {
    255             LOG_TRACE_METHOD(__func__);
    256             this->cancel_schedule_write();
    257         }
    258 
    259         void clean_up()
    260         {
    261             LOG_TRACE_METHOD(__func__);
    262         }
    263 
    264         // note!!!
    265         // connenct not succeed. we can free redis context. ]
    266         // But if connect succeed and borken, we don't connect 
    267 
    268     protected:
    269         std::string m_srv_ip_str;
    270         uint16_t    m_srv_port;
    271 
    272         CONNECT_STATUS  m_client_status = CONNECT_STATUS::CLIENT_UNCONNECTED;
    273 
    274         int m_timer_id  = 0;
    275         uint32_t m_timeout_value = 1;
    276         uint32_t  m_timeout_interval = 1;
    277 
    278         bool    m_delete_redis_context = true;
    279         redisAsyncContext *m_redis_context = nullptr;
    280 };
    281 
    282 static void redisReactorAddRead(void *arg) 
    283 {
    284     LOG_DEBUG("Enter redisReactorAddRead() arg %p", arg);
    285     CRedisClient *event_handler = (CRedisClient *)arg;
    286     event_handler->add_read();
    287 }
    288 
    289 static void redisReactorDelRead(void *arg) 
    290 {
    291     CRedisClient *event_handler = (CRedisClient *)arg;
    292     event_handler->del_read();
    293 }
    294 
    295 static void redisReactorAddWrite(void *arg) 
    296 {
    297     CRedisClient *event_handler = (CRedisClient *)arg;
    298     event_handler->add_write();
    299 }
    300 
    301 static void redisReactorDelWrite(void *arg) 
    302 {
    303     CRedisClient *event_handler = (CRedisClient *)arg;
    304     event_handler->del_write();
    305 }
    306 
    307 static void redisReactorCleanup(void *arg) 
    308 {
    309     CRedisClient *event_handler = (CRedisClient *)arg;
    310     event_handler->clean_up();
    311 }
    312 
    313 void connectCallBack(const redisAsyncContext *ac, int status)
    314 {
    315     if (status != REDIS_OK)
    316     {
    317         LOG_ERROR("connectCallBack() Error: %s", ac->errstr);
    318         return;
    319     }
    320 
    321     CRedisClient *event_handler = (CRedisClient *)ac->ev.data;
    322     event_handler->open();
    323 
    324     LOG_INFO("RedisClient Connected...");    
    325 }
    326 
    327 void disconnectCallBack(const redisAsyncContext *ac, int status)
    328 { 
    329     CRedisClient *event_handler = (CRedisClient *)ac->ev.data;
    330     event_handler->handle_close(0,0);
    331 
    332     if (status != REDIS_OK)
    333     {
    334         LOG_INFO("disconnectCallBack()!! Error: %s", ac->errstr);
    335         return;
    336     }
    337 
    338     LOG_INFO("RedisClient Disconnected...");
    339 }
    340 
    341 #endif /* redis_client_h */

    使用的程序样例:

     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <string.h>
     4 #include <signal.h>
     5 
     6 //#include <hiredis.h>
     7 //#include <async.h>
     8 
     9 #include <adapters/redis_client.hpp>
    10 //#include "redis_client.hpp"
    11 
    12 #include <signal.h>
    13 
    14 static void signal_handler(int sig)
    15 {
    16     if (sig == SIGINT)
    17     {
    18         reactor::CReactor::instance()->end_event_loop();
    19     }
    20 }
    21 
    22 /*
    23 void get_call_fun(redisAsyncContext *c, void *r, void *arg) 
    24 {
    25     redisReply *reply = (redisReply *)r;
    26     std::string *key_str = (std::string *)arg;
    27 
    28     if (reply == NULL) 
    29     {
    30         delete key_str;
    31         return;
    32     }
    33 
    34     LOG_INFO("[%s] -> %s
    ", key_str->c_str(), reply->str);
    35 
    36     delete key_str;
    37 
    38     // Disconnect after receiving the reply to GET 
    39     // redisAsyncDisconnect(c);
    40 }
    41 */
    42 
    43 
    44 int main (int argc, char **argv) 
    45 {
    46     signal(SIGPIPE, SIG_IGN);
    47     signal(SIGINT, signal_handler);
    48 
    49     CLoggerMgr logger("reactor.prop");
    50 
    51     reactor::CReactor *rt = reactor::CReactor::instance();
    52     CRedisClient *redis_client = new CRedisClient("127.0.0.1", 6379, rt);
    53     redis_client->connect();
    54 
    55     rt->run_event_loop();
    56 
    57     return 0;
    58 }
  • 相关阅读:
    88. 合并两个有序数组
    680. 验证回文字符串 Ⅱ
    345. 反转字符串中的元音字母
    633. 平方数之和
    分支程序设计
    scanf函数(初学者)
    输入与输出(初学者)
    C语句详细(初学者)
    算术运算符和算术表达式(初学者)
    变量赋值(初学者)
  • 原文地址:https://www.cnblogs.com/davad/p/5073007.html
Copyright © 2011-2022 走看看