zoukankan      html  css  js  c++  java
  • 【数据库开发】C++测试redis中的publish/subscribe

    运用
    http://blog.csdn.net/xumaojun/article/details/51558237 中的 redis_publisher.h、redis_publisher.cpp、redis_subscriber.h、redis_subscriber.cpp 四个文件,做一个操作类进行测试.
    头文件 Policy.h
    #pragma once  
    #include "redis_publisher.h"  
    #include "redis_subscriber.h"  
    #include <iostream>  
      
    using namespace std;  
      
    // Small test harness that owns one Redis publisher and one Redis
    // subscriber so a single process can publish to a channel and receive
    // its own messages.
    class Policy  
    {  
    public:  
        // Initializes and connects both members and subscribes the subscriber
        // to "test-channel" (see Policy.cpp).
        Policy();  
        // Disconnects and uninitializes the publisher, then the subscriber.
        ~Policy();  
        // Publishes one fixed test message on "test-channel".
        void publish();  
        // Callback handed to the subscriber; prints the channel name and the
        // message. `len` is accepted to match the callback signature.
        static void recieve_message(const char *channel_name,const char *message, int len);  
      
        // Sending side of the Redis connection pair.
        CRedisPublisher publisher;  
        // Receiving side; calls recieve_message for incoming replies.
        CRedisSubscriber subscriber;  
    };  

    代码文件 Policy.cpp
    #include "Policy.h"  
      
    // Brings up both Redis endpoints: initializes and connects the publisher,
    // then initializes and connects the subscriber (with recieve_message as
    // its notification callback) and subscribes it to "test-channel".
    // Every failure is reported on stdout. A step is skipped when the step it
    // depends on failed, so connect()/subscribe() never run against an
    // endpoint that has no event base or no Redis context.
    Policy::Policy()
    {
        if (!publisher.init()) {
            printf("Init failed.\n");
        } else if (!publisher.connect()) {
            printf("connect failed.\n");
        }

        //***********************************
        // recieve_message is a static member, so its address converts directly
        // to the callback type; no bind/placeholders are required.
        CRedisSubscriber::NotifyMessageFn fn(&Policy::recieve_message);

        if (!subscriber.init(fn)) {
            printf("Init failed.\n");
            return;
        }

        if (!subscriber.connect()) {
            printf("Connect failed.\n");
            return;
        }

        subscriber.subscribe("test-channel");
    }
      
    // Tears down both endpoints: each one is disconnected first and then
    // uninitialized, publisher before subscriber.
    Policy::~Policy()  
    {  
        publisher.disconnect();  
        publisher.uninit();  
      
        subscriber.disconnect();  
        subscriber.uninit();  
    }  
      
      
    void Policy::publish()  
    {  
        cout<<"Policy::publish()...
    "<<endl;  
        publisher.publish("test-channel", "Test message");  
    }  
      
      
    // Subscriber notification callback: prints the channel name and the
    // message text to stdout.
    //   channel_name - channel the message arrived on (may be NULL)
    //   message      - NUL-terminated payload (may be NULL)
    //   len          - payload length; not needed for printing
    void Policy::recieve_message(const char *channel_name,
        const char *message, int len)
    {
        (void)len;

        // Passing NULL to %s is undefined behaviour, so substitute a marker.
        const char *shown_channel = channel_name ? channel_name : "(null)";
        const char *shown_message = message ? message : "(null)";

        printf("Recieve message:    channel name: %s    message: %s\n",
            shown_channel, shown_message);
    }
    


    测试文件 testPolicy.cpp
    #include "Policy.h"  
    // Test driver: builds a Policy (which connects and subscribes) and then
    // publishes eight messages, pausing two seconds after each one.
    int main(int argc, char *argv[])
    {
        (void)argc;
        (void)argv;

        Policy m_policy;
        for (int i = 0; i < 8; ++i) {
            m_policy.publish();
            sleep(2);
            std::cout << "main sleep...\n";
        }
        return 0;
    }


    Makefile
    # Builds the self-contained publish/subscribe test program.
    # EXE must name the file the link rule really produces; otherwise make
    # runs the link step once per listed name and relinks on every invocation.
    EXE=testPolicy
    CC=g++
    FLAG=-lhiredis -levent -pthread
    OBJ=redis_publisher.o redis_subscriber.o Policy.o testPolicy.o

    .PHONY: all clean

    all:$(EXE)

    # Recipe lines must start with a TAB character.
    $(EXE):$(OBJ)
    	$(CC) -o $@ $(OBJ) $(FLAG)

    # Objects are compiled by make's implicit .cpp rule; only the header
    # dependencies are listed here.
    redis_publisher.o:redis_publisher.h
    redis_subscriber.o:redis_subscriber.h

    Policy.o:Policy.h redis_publisher.h redis_subscriber.h
    testPolicy.o:Policy.h redis_publisher.h redis_subscriber.h

    clean:
    	rm -f $(EXE) $(OBJ)


    这样编译之后,可以自发自收.
    本例的用途在于可以简单地在项目中实现异构的数据收发.

    附上原作代码:
    redis_publisher.h
    /*************************************************************************  
        > File Name: redis_publisher.h  
        > Author: chenzengba  
        > Mail: chenzengba@gmail.com   
        > Created Time: Sat 23 Apr 2016 10:15:09 PM CST  
        > Description: 封装hiredis,实现消息发布给redis功能  
     ************************************************************************/    
        
    #ifndef REDIS_PUBLISHER_H    
    #define REDIS_PUBLISHER_H    
        
    #include <stdlib.h>    
    #include <hiredis/async.h>    
    #include <hiredis/adapters/libevent.h>    
    #include <string>    
    #include <vector>    
    #include <unistd.h>    
    #include <pthread.h>    
    #include <semaphore.h>    
    #include <boost/tr1/functional.hpp>    
        
    // Wraps a hiredis asynchronous context and a libevent base to publish
    // messages to Redis. Expected call order (as used by the caller in this
    // article): init(), connect(), publish()..., disconnect(), uninit().
    class CRedisPublisher    
    {    
    public:        
        CRedisPublisher();    
        ~CRedisPublisher();    
        
        // Creates the libevent base and the start-up semaphore.
        bool init();    
        // Drops the event base pointer and destroys the semaphore.
        bool uninit();    
        // Connects to Redis and starts the event thread.
        bool connect();    
        // Closes the Redis connection if one exists.
        bool disconnect();    
            
        // Queues a PUBLISH of `message` on `channel_name`; returns false when
        // the command could not be queued.
        bool publish(const std::string &channel_name,     
            const std::string &message);    
        
    private:    
        // The three static callbacks below are registered with hiredis.
        // Connection-established callback.
        static void connect_callback(const redisAsyncContext *redis_context,    
            int status);    
            
        // Connection-closed callback.
        static void disconnect_callback(const redisAsyncContext *redis_context,    
            int status);    
        
        // Command-reply callback.
        static void command_callback(redisAsyncContext *redis_context,    
            void *reply, void *privdata);    
        
        // Event-dispatch thread entry point and its member implementation.
        static void *event_thread(void *data);    
        void *event_proc();    
        
    private:    
        // libevent event base.
        event_base *_event_base;    
        // Handle of the event thread.
        pthread_t _event_thread;    
        // Semaphore the event thread waits on before dispatching.
        sem_t _event_sem;    
        // hiredis asynchronous context.
        redisAsyncContext *_redis_context;    
    };    


    #endif  // REDIS_PUBLISHER_H

    redis_publisher.cpp
    /*************************************************************************  
        > File Name: redis_publisher.cpp  
        > Author: chenzengba  
        > Mail: chenzengba@gmail.com   
        > Created Time: Sat 23 Apr 2016 10:15:09 PM CST  
        > Description:   
     ************************************************************************/    
         
    #include <stddef.h>    
    #include <assert.h>    
    #include <string.h>    
    #include "redis_publisher.h"    
        
    // Starts with no event base, no event thread and no Redis context.
    // The semaphore is left untouched here; init() sets it up.
    CRedisPublisher::CRedisPublisher()
        : _event_base(NULL),
          _event_thread(0),
          _redis_context(NULL)
    {
    }
        
    // Intentionally empty: nothing is released here. Callers are expected to
    // call disconnect() and uninit() themselves before destruction.
    CRedisPublisher::~CRedisPublisher()    
    {    
    }    
        
    bool CRedisPublisher::init()    
    {    
        // initialize the event    
        _event_base = event_base_new();    // 创建libevent对象    
        if (NULL == _event_base)    
        {    
            printf(": Create redis event failed.
    ");    
            return false;    
        }    
        
        memset(&_event_sem, 0, sizeof(_event_sem));    
        int ret = sem_init(&_event_sem, 0, 0);    
        if (ret != 0)    
        {    
            printf(": Init sem failed.
    ");    
            return false;    
        }    
        
        return true;    
    }    
        
    // Forgets the event base and destroys the semaphore. Always returns true.
    // NOTE(review): the base is only set to NULL, never passed to
    // event_base_free(), and the event thread is not joined here. Freeing the
    // base would first require the dispatch loop to have stopped - confirm
    // the intended shutdown sequence before changing this.
    bool CRedisPublisher::uninit()    
    {    
        _event_base = NULL;    
            
        sem_destroy(&_event_sem);       
        return true;    
    }    
        
    bool CRedisPublisher::connect()    
    {    
        // connect redis    
        _redis_context = redisAsyncConnect("127.0.0.1", 6379);    // 异步连接到redis服务器上,使用默认端口    
        if (NULL == _redis_context)    
        {    
            printf(": Connect redis failed.
    ");    
            return false;    
        }    
        
        if (_redis_context->err)    
        {    
            printf(": Connect redis error: %d, %s
    ",     
                _redis_context->err, _redis_context->errstr);    // 输出错误信息    
            return false;    
        }    
        
        // attach the event    
        redisLibeventAttach(_redis_context, _event_base);    // 将事件绑定到redis context上,使设置给redis的回调跟事件关联    
            
        // 创建事件处理线程    
        int ret = pthread_create(&_event_thread, 0, &CRedisPublisher::event_thread, this);    
        if (ret != 0)    
        {    
            printf(": create event thread failed.
    ");    
            disconnect();    
            return false;    
        }    
        
        // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态    
        redisAsyncSetConnectCallback(_redis_context,     
            &CRedisPublisher::connect_callback);    
        
        // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连    
        redisAsyncSetDisconnectCallback(_redis_context,    
            &CRedisPublisher::disconnect_callback);    
        
        // 启动事件线程    
        sem_post(&_event_sem);    
        return true;    
    }    
        
    // Closes the Redis connection if one is open. Always returns true.
    // redisAsyncDisconnect() releases the context itself (immediately when no
    // replies are pending, otherwise after they have been delivered), so the
    // context must NOT also be passed to redisAsyncFree(): doing both frees
    // the same object twice.
    bool CRedisPublisher::disconnect()
    {
        if (_redis_context)
        {
            redisAsyncDisconnect(_redis_context);
            // hiredis owns the context from here on; just forget the pointer.
            _redis_context = NULL;
        }

        return true;
    }
        
    // Queues "PUBLISH <channel_name> <message>" on the asynchronous context.
    //   channel_name - target channel
    //   message      - payload; sent with its full length, so embedded NUL
    //                  bytes are preserved
    // Returns false when there is no connection or the command could not be
    // queued. A true result means "queued", not "delivered".
    bool CRedisPublisher::publish(const std::string &channel_name,
        const std::string &message)
    {
        if (NULL == _redis_context)
        {
            printf("Publish command failed: %d\n", REDIS_ERR);
            return false;
        }

        // %b takes (pointer, size_t length) and is binary safe, unlike %s.
        int ret = redisAsyncCommand(_redis_context,
            &CRedisPublisher::command_callback, this, "PUBLISH %b %b",
            channel_name.data(), channel_name.size(),
            message.data(), message.size());
        if (REDIS_ERR == ret)
        {
            printf("Publish command failed: %d\n", ret);
            return false;
        }

        return true;
    }
        
    void CRedisPublisher::connect_callback(const redisAsyncContext *redis_context,    
        int status)    
    {    
        if (status != REDIS_OK)    
        {    
            printf(": Error: %s
    ", redis_context->errstr);    
        }    
        else    
        {    
            printf(": Redis connected!
    ");    
        }    
    }    
        
    void CRedisPublisher::disconnect_callback(    
        const redisAsyncContext *redis_context, int status)    
    {    
        if (status != REDIS_OK)    
        {    
            // 这里异常退出,可以尝试重连    
            printf(": Error: %s
    ", redis_context->errstr);    
        }    
    }    
        
    // 消息接收回调函数    
    void CRedisPublisher::command_callback(redisAsyncContext *redis_context,    
        void *reply, void *privdata)    
    {    
        printf("command callback.
    ");    
        // 这里不执行任何操作    
    }    
        
    // pthread entry point. `data` is the CRedisPublisher that created the
    // thread; control is handed to its event_proc(). A NULL argument is a
    // programming error: it is logged, asserted on, and the thread exits.
    void *CRedisPublisher::event_thread(void *data)
    {
        if (NULL == data)
        {
            printf(": Error!\n");
            assert(false);
            return NULL;
        }

        // void* -> object pointer is a plain static_cast.
        CRedisPublisher *self_this = static_cast<CRedisPublisher *>(data);
        return self_this->event_proc();
    }
        
    // Body of the event thread. Waits on the semaphore that connect() posts
    // once set-up is complete, then runs the libevent dispatch loop.
    // Always returns NULL.
    void *CRedisPublisher::event_proc()    
    {    
        sem_wait(&_event_sem);    
            
        // Start event dispatching; event_base_dispatch blocks while it runs.
        event_base_dispatch(_event_base);    
        
        return NULL;    
    }  
    



    redis_subscriber.h
    /*************************************************************************  
        > File Name: redis_subscriber.h  
        > Author: chenzengba  
        > Mail: chenzengba@gmail.com   
        > Created Time: Sat 23 Apr 2016 10:15:09 PM CST  
        > Description: 封装hiredis,实现消息订阅redis功能  
     ************************************************************************/    
        
    #ifndef REDIS_SUBSCRIBER_H    
    #define REDIS_SUBSCRIBER_H    
        
    #include <stdlib.h>    
    #include <hiredis/async.h>    
    #include <hiredis/adapters/libevent.h>    
    #include <string>    
    #include <vector>    
    #include <unistd.h>    
    #include <pthread.h>    
    #include <semaphore.h>    
    #include <boost/tr1/functional.hpp>    
        
    // Wraps a hiredis asynchronous context and a libevent base to subscribe
    // to Redis channels and forward received messages through a callback.
    class CRedisSubscriber    
    {    
    public:    
        // Callback object type, invoked as (channel name, message, message
        // length) to hand a received message to the owner.
        typedef std::tr1::function<void (const char *, const char *, int)>         NotifyMessageFn;
                
        CRedisSubscriber();    
        ~CRedisSubscriber();    
            
        // Stores the callback and creates the libevent base and semaphore.
        bool init(const NotifyMessageFn &fn);
        // Drops the event base pointer and destroys the semaphore.
        bool uninit();    
        // Connects to Redis and starts the event thread.
        bool connect();    
        // Closes the Redis connection if one exists.
        bool disconnect();    
            
        // May be called repeatedly to subscribe to several channels.
        bool subscribe(const std::string &channel_name);    
            
    private:    
        // The three static callbacks below are registered with hiredis.
        // Connection-established callback.
        static void connect_callback(const redisAsyncContext *redis_context,    
            int status);    
            
        // Connection-closed callback.
        static void disconnect_callback(const redisAsyncContext *redis_context,    
            int status);    
        
        // Command-reply callback; this is where subscribed messages arrive.
        static void command_callback(redisAsyncContext *redis_context,    
            void *reply, void *privdata);    
        
        // Event-dispatch thread entry point and its member implementation.
        static void *event_thread(void *data);    
        void *event_proc();    
            
    private:    
        // libevent event base.
        event_base *_event_base;    
        // Handle of the event thread.
        pthread_t _event_thread;    
        // Semaphore the event thread waits on before dispatching.
        sem_t _event_sem;    
        // hiredis asynchronous context.
        redisAsyncContext *_redis_context;    
            
        // Callback used to notify the owner of received messages.
        NotifyMessageFn _notify_message_fn;    
    };    




    #endif  // REDIS_SUBSCRIBER_H

    redis_subscriber.cpp
    /*************************************************************************  
        > File Name: redis_subscriber.cpp  
        > Author: chenzengba  
        > Mail: chenzengba@gmail.com   
        > Created Time: Sat 23 Apr 2016 10:15:09 PM CST  
        > Description:   
     ************************************************************************/    
         
    #include <stddef.h>    
    #include <assert.h>    
    #include <string.h>    
    #include "redis_subscriber.h"    
        
    // Starts with no event base, no event thread and no Redis context. The
    // semaphore is set up by init(); the notification callback is
    // default-constructed.
    CRedisSubscriber::CRedisSubscriber()
        : _event_base(NULL),
          _event_thread(0),
          _redis_context(NULL)
    {
    }
        
    // Intentionally empty: nothing is released here. Callers are expected to
    // call disconnect() and uninit() themselves before destruction.
    CRedisSubscriber::~CRedisSubscriber()    
    {    
    }    
        
    bool CRedisSubscriber::init(const NotifyMessageFn &fn)    
    {    
        // initialize the event    
        _notify_message_fn = fn;    
        _event_base = event_base_new();    // 创建libevent对象    
        if (NULL == _event_base)    
        {    
            printf(": Create redis event failed.
    ");    
            return false;    
        }    
        
        memset(&_event_sem, 0, sizeof(_event_sem));    
        int ret = sem_init(&_event_sem, 0, 0);    
        if (ret != 0)    
        {    
            printf(": Init sem failed.
    ");    
            return false;    
        }    
        
        return true;    
    }    
        
    // Forgets the event base and destroys the semaphore. Always returns true.
    // NOTE(review): the base is only set to NULL, never passed to
    // event_base_free(), and the event thread is not joined here. Freeing the
    // base would first require the dispatch loop to have stopped - confirm
    // the intended shutdown sequence before changing this.
    bool CRedisSubscriber::uninit()    
    {    
        _event_base = NULL;    
            
        sem_destroy(&_event_sem);       
        return true;    
    }    
        
    bool CRedisSubscriber::connect()    
    {    
        // connect redis    
        _redis_context = redisAsyncConnect("127.0.0.1", 6379);    // 异步连接到redis服务器上,使用默认端口    
        if (NULL == _redis_context)    
        {    
            printf(": Connect redis failed.
    ");    
            return false;    
        }    
        
        if (_redis_context->err)    
        {    
            printf(": Connect redis error: %d, %s
    ",     
                _redis_context->err, _redis_context->errstr);    // 输出错误信息    
            return false;    
        }    
        
        // attach the event    
        redisLibeventAttach(_redis_context, _event_base);    // 将事件绑定到redis context上,使设置给redis的回调跟事件关联    
            
        // 创建事件处理线程    
        int ret = pthread_create(&_event_thread, 0, &CRedisSubscriber::event_thread, this);    
        if (ret != 0)    
        {    
            printf(": create event thread failed.
    ");    
            disconnect();    
            return false;    
        }    
        
        // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态    
        redisAsyncSetConnectCallback(_redis_context,     
            &CRedisSubscriber::connect_callback);    
        
        // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连    
        redisAsyncSetDisconnectCallback(_redis_context,    
            &CRedisSubscriber::disconnect_callback);    
        
        // 启动事件线程    
        sem_post(&_event_sem);    
        return true;    
    }    
        
    // Closes the Redis connection if one is open. Always returns true.
    // redisAsyncDisconnect() releases the context itself (immediately when no
    // replies are pending, otherwise after they have been delivered), so the
    // context must NOT also be passed to redisAsyncFree(): doing both frees
    // the same object twice.
    bool CRedisSubscriber::disconnect()
    {
        if (_redis_context)
        {
            redisAsyncDisconnect(_redis_context);
            // hiredis owns the context from here on; just forget the pointer.
            _redis_context = NULL;
        }

        return true;
    }
        
    // Queues "SUBSCRIBE <channel_name>" with command_callback as the handler
    // for everything that later arrives on that channel.
    // Returns false when there is no connection or the command could not be
    // queued. "Subscribe success" means the command was queued; the server's
    // confirmation arrives later through the callback.
    bool CRedisSubscriber::subscribe(const std::string &channel_name)
    {
        if (NULL == _redis_context)
        {
            printf("Subscribe command failed: %d\n", REDIS_ERR);
            return false;
        }

        int ret = redisAsyncCommand(_redis_context,
            &CRedisSubscriber::command_callback, this, "SUBSCRIBE %s",
            channel_name.c_str());
        if (REDIS_ERR == ret)
        {
            printf("Subscribe command failed: %d\n", ret);
            return false;
        }

        printf(": Subscribe success: %s\n", channel_name.c_str());
        return true;
    }
        
    void CRedisSubscriber::connect_callback(const redisAsyncContext *redis_context,    
        int status)    
    {    
        if (status != REDIS_OK)    
        {    
            printf(": Error: %s
    ", redis_context->errstr);    
        }    
        else    
        {    
            printf(": Redis connected!");    
        }    
    }    
        
    void CRedisSubscriber::disconnect_callback(    
        const redisAsyncContext *redis_context, int status)    
    {    
        if (status != REDIS_OK)    
        {    
            // 这里异常退出,可以尝试重连    
            printf(": Error: %s
    ", redis_context->errstr);    
        }    
    }    
        
    // 消息接收回调函数    
    void CRedisSubscriber::command_callback(redisAsyncContext *redis_context,    
        void *reply, void *privdata)    
    {    
        if (NULL == reply || NULL == privdata) {    
            return ;    
        }    
        
        // 静态函数中,要使用类的成员变量,把当前的this指针传进来,用this指针间接访问    
        CRedisSubscriber *self_this = reinterpret_cast<CRedisSubscriber *>(privdata);    
        redisReply *redis_reply = reinterpret_cast<redisReply *>(reply);    
            
        // 订阅接收到的消息是一个带三元素的数组    
        if (redis_reply->type == REDIS_REPLY_ARRAY &&    
        redis_reply->elements == 3)    
        {    
            printf(": Recieve message:%s:%d:%s:%d:%s:%d
    ",    
            redis_reply->element[0]->str, redis_reply->element[0]->len,    
            redis_reply->element[1]->str, redis_reply->element[1]->len,    
            redis_reply->element[2]->str, redis_reply->element[2]->len);    
                
            // 调用函数对象把消息通知给外层    
            self_this->_notify_message_fn(redis_reply->element[1]->str,    
                redis_reply->element[2]->str, redis_reply->element[2]->len);    
        }    
    }    
        
    // pthread entry point. `data` is the CRedisSubscriber that created the
    // thread; control is handed to its event_proc(). A NULL argument is a
    // programming error: it is logged, asserted on, and the thread exits.
    void *CRedisSubscriber::event_thread(void *data)
    {
        if (NULL == data)
        {
            printf(": Error!\n");
            assert(false);
            return NULL;
        }

        // void* -> object pointer is a plain static_cast.
        CRedisSubscriber *self_this = static_cast<CRedisSubscriber *>(data);
        return self_this->event_proc();
    }
        
    // Body of the event thread. Waits on the semaphore that connect() posts
    // once set-up is complete, then runs the libevent dispatch loop.
    // Always returns NULL.
    void *CRedisSubscriber::event_proc()    
    {    
        sem_wait(&_event_sem);    
            
        // Start event dispatching; event_base_dispatch blocks while it runs.
        event_base_dispatch(_event_base);    
        
        return NULL;    
    }  



  • 相关阅读:
    业务领域建模Domain Modeling
    用例建模Use Case Modeling
    分析一套源代码的代码规范和风格并讨论如何改进优化代码
    结合工程实践选题调研分析同类软件产品
    如何提高程序员的键盘使用效率?
    CSS水平布局
    CSS文档流
    CSS盒子模型
    CSS单位
    CSS选择器的权重
  • 原文地址:https://www.cnblogs.com/huty/p/8517415.html
Copyright © 2011-2022 走看看