zoukankan      html  css  js  c++  java
  • 【数据库开发】C++测试redis中的publish/subscribe

    运用
    http://blog.csdn.net/xumaojun/article/details/51558237 中的 redis_publisher.h、redis_publisher.cpp、redis_subscriber.h、redis_subscriber.cpp 四个文件,做一个操作类进行测试.
    头文件 Policy.h
    #pragma once  
    #include "redis_publisher.h"  
    #include "redis_subscriber.h"  
    #include <iostream>  
      
    using namespace std;  
      
    // Small test harness that owns one Redis publisher and one Redis subscriber
    // so a single process can publish to a channel and receive its own messages.
    class Policy  
    {  
    public:  
        // Sets up both members; the implementation lives in Policy.cpp.
        Policy();  
        // Tears down both members; the implementation lives in Policy.cpp.
        ~Policy();  
        // Publishes one test message through `publisher`.
        void publish();  
        // Callback matching CRedisSubscriber::NotifyMessageFn
        // (channel name, message text, message length).
        static void recieve_message(const char *channel_name,const char *message, int len);  
      
        // Declared before `subscriber`, so it is constructed first and destroyed last.
        CRedisPublisher publisher;  
        CRedisSubscriber subscriber;  
    };  

    代码文件 Policy.cpp
    #include "Policy.h"  
      
    // Brings up the publisher and the subscriber and subscribes to "test-channel".
    //
    // A constructor cannot report a status, so failures are only logged. Each
    // step is attempted only when the step it depends on succeeded: connect()
    // attaches the hiredis context to the event base created by init(), and
    // subscribe() issues a command on the context created by connect().
    Policy::Policy()
    {
        // Publisher side.
        if (!publisher.init()) {
            printf("Init failed.\n");
        } else if (!publisher.connect()) {
            printf("connect failed.\n");
        }

        // Subscriber side. recieve_message is a static member function, so its
        // address converts directly to NotifyMessageFn; no bind/placeholders
        // (and no dependency on `using namespace std`) are required.
        CRedisSubscriber::NotifyMessageFn fn = &Policy::recieve_message;

        if (!subscriber.init(fn)) {
            printf("Init failed.\n");
        } else if (!subscriber.connect()) {
            printf("Connect failed.\n");
        } else {
            subscriber.subscribe("test-channel");
        }
    }
      
    // Shuts both members down: each one is disconnected first and then
    // uninitialized, publisher before subscriber.
    Policy::~Policy()  
    {  
        publisher.disconnect();  
        publisher.uninit();  
      
        subscriber.disconnect();  
        subscriber.uninit();  
    }  
      
      
    // Logs a trace line and publishes the fixed text "Test message" on
    // "test-channel" through the owned publisher.
    void Policy::publish()
    {
        // "\n" followed by std::endl leaves one blank line after the trace line.
        std::cout << "Policy::publish()...\n" << std::endl;
        publisher.publish("test-channel", "Test message");
    }
      
      
    // Subscriber callback: prints the channel name and the message text.
    //
    // channel_name  name of the channel the message arrived on
    // message       message text, printed as a NUL-terminated string
    // len           message length; not needed for printing
    void Policy::recieve_message(const char *channel_name,
        const char *message, int len)
    {
        (void)len;

        // Passing a null pointer to %s is undefined behaviour, so substitute a
        // visible marker instead.
        const char *shown_channel = channel_name ? channel_name : "(null)";
        const char *shown_message = message ? message : "(null)";

        printf("Recieve message:    channel name: %s    message: %s\n",
            shown_channel, shown_message);
    }
    


    测试文件 testPolicy.cpp
    #include "Policy.h"  
    // Test driver: publishes eight messages, pausing two seconds after each one.
    int main(int argc, char *argv[])
    {
        (void)argc;
        (void)argv;

        Policy m_policy;
        for (int i = 0; i < 8; ++i) {
            m_policy.publish();
            sleep(2);
            std::cout << "main sleep...\n";
        }
        return 0;
    }


    Makefile
    # Builds the self-contained publish/subscribe test program.
    # EXE must name the file the link step really produces; otherwise the
    # target never exists and every `make` relinks.
    EXE=testPolicy
    CC=g++
    FLAG=-lhiredis -levent -pthread
    OBJ=redis_publisher.o redis_subscriber.o Policy.o testPolicy.o

    all:$(EXE)

    # Link step. The recipe line must start with a TAB in a real Makefile.
    $(EXE):$(OBJ)
    	$(CC) -o $@ $(OBJ) $(FLAG)

    # Object files come from make's implicit .cpp -> .o rule; only the header
    # dependencies are listed here.
    redis_publisher.o:redis_publisher.h
    redis_subscriber.o:redis_subscriber.h

    Policy.o:Policy.h redis_publisher.h redis_subscriber.h
    testPolicy.o:Policy.h redis_publisher.h redis_subscriber.h


    这样编译之后,可以自发自收.
    本例的用途在于可以简单地在项目中实现异构的数据收发.

    附上原作代码:
    redis_publisher.h
    /*************************************************************************  
        > File Name: redis_publisher.h  
        > Author: chenzengba  
        > Mail: chenzengba@gmail.com   
        > Created Time: Sat 23 Apr 2016 10:15:09 PM CST  
        > Description: 封装hiredis,实现消息发布给redis功能  
     ************************************************************************/    
        
    #ifndef REDIS_PUBLISHER_H    
    #define REDIS_PUBLISHER_H    
        
    #include <stdlib.h>    
    #include <hiredis/async.h>    
    #include <hiredis/adapters/libevent.h>    
    #include <string>    
    #include <vector>    
    #include <unistd.h>    
    #include <pthread.h>    
    #include <semaphore.h>    
    #include <boost/tr1/functional.hpp>    
        
    class CRedisPublisher    
    {    
    public:        
        CRedisPublisher();    
        ~CRedisPublisher();    
        
        bool init();    
        bool uninit();    
        bool connect();    
        bool disconnect();    
            
        bool publish(const std::string &channel_name,     
            const std::string &message);    
        
    private:    
         // 下面三个回调函数供redis服务调用    
        // 连接回调    
        static void connect_callback(const redisAsyncContext *redis_context,    
            int status);    
            
        // 断开连接的回调    
        static void disconnect_callback(const redisAsyncContext *redis_context,    
            int status);    
        
        // 执行命令回调    
        static void command_callback(redisAsyncContext *redis_context,    
            void *reply, void *privdata);    
        
        // 事件分发线程函数    
        static void *event_thread(void *data);    
        void *event_proc();    
        
    private:    
         // libevent事件对象    
        event_base *_event_base;    
        // 事件线程ID    
        pthread_t _event_thread;    
        // 事件线程的信号量    
        sem_t _event_sem;    
        // hiredis异步对象    
        redisAsyncContext *_redis_context;    
    };    


    redis_publisher.cpp
    /*************************************************************************  
        > File Name: redis_publisher.cpp  
        > Author: chenzengba  
        > Mail: chenzengba@gmail.com   
        > Created Time: Sat 23 Apr 2016 10:15:09 PM CST  
        > Description:   
     ************************************************************************/    
         
    #include <stddef.h>    
    #include <assert.h>    
    #include <string.h>    
    #include "redis_publisher.h"    
        
    // Starts with no event base, a zero thread id and no hiredis context; the
    // event base is created by init() and the context by connect().
    CRedisPublisher::CRedisPublisher()
        : _event_base(NULL),
          _event_thread(0),
          _redis_context(NULL)
    {
    }
        
    // Empty: nothing is released here.
    // NOTE(review): callers appear to be expected to call disconnect() and
    // uninit() themselves before destruction -- confirm.
    CRedisPublisher::~CRedisPublisher()    
    {    
    }    
        
    bool CRedisPublisher::init()    
    {    
        // initialize the event    
        _event_base = event_base_new();    // 创建libevent对象    
        if (NULL == _event_base)    
        {    
            printf(": Create redis event failed.
    ");    
            return false;    
        }    
        
        memset(&_event_sem, 0, sizeof(_event_sem));    
        int ret = sem_init(&_event_sem, 0, 0);    
        if (ret != 0)    
        {    
            printf(": Init sem failed.
    ");    
            return false;    
        }    
        
        return true;    
    }    
        
    // Destroys the start-up semaphore and forgets the event base pointer.
    // Always returns true.
    //
    // NOTE(review): the event base is only set to NULL, not passed to
    // event_base_free(); releasing it safely would first require the dispatch
    // thread to have stopped -- confirm the intended ownership.
    bool CRedisPublisher::uninit()
    {
        sem_destroy(&_event_sem);
        _event_base = NULL;
        return true;
    }
        
    bool CRedisPublisher::connect()    
    {    
        // connect redis    
        _redis_context = redisAsyncConnect("127.0.0.1", 6379);    // 异步连接到redis服务器上,使用默认端口    
        if (NULL == _redis_context)    
        {    
            printf(": Connect redis failed.
    ");    
            return false;    
        }    
        
        if (_redis_context->err)    
        {    
            printf(": Connect redis error: %d, %s
    ",     
                _redis_context->err, _redis_context->errstr);    // 输出错误信息    
            return false;    
        }    
        
        // attach the event    
        redisLibeventAttach(_redis_context, _event_base);    // 将事件绑定到redis context上,使设置给redis的回调跟事件关联    
            
        // 创建事件处理线程    
        int ret = pthread_create(&_event_thread, 0, &CRedisPublisher::event_thread, this);    
        if (ret != 0)    
        {    
            printf(": create event thread failed.
    ");    
            disconnect();    
            return false;    
        }    
        
        // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态    
        redisAsyncSetConnectCallback(_redis_context,     
            &CRedisPublisher::connect_callback);    
        
        // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连    
        redisAsyncSetDisconnectCallback(_redis_context,    
            &CRedisPublisher::disconnect_callback);    
        
        // 启动事件线程    
        sem_post(&_event_sem);    
        return true;    
    }    
        
    // Releases the hiredis context, if there is one. Always returns true.
    //
    // Only redisAsyncFree() is called. redisAsyncDisconnect() hands the context
    // back to hiredis, which frees it on its own (immediately when no replies
    // are pending), so following it with redisAsyncFree() would free the same
    // context twice.
    bool CRedisPublisher::disconnect()
    {
        if (_redis_context)
        {
            redisAsyncFree(_redis_context);
            _redis_context = NULL;
        }

        return true;
    }
        
    // Queues "PUBLISH <channel_name> <message>" on the asynchronous context.
    //
    // Returns true when the command was queued. Returns false, after printing
    // a message, when there is no connection or hiredis rejects the command.
    bool CRedisPublisher::publish(const std::string &channel_name,
        const std::string &message)
    {
        int ret = REDIS_ERR;
        if (_redis_context != NULL)
        {
            // %b takes a pointer plus a size_t length, so the arguments are
            // sent byte-for-byte even if they contain NUL characters.
            ret = redisAsyncCommand(_redis_context,
                &CRedisPublisher::command_callback, this, "PUBLISH %b %b",
                channel_name.data(), channel_name.size(),
                message.data(), message.size());
        }

        if (REDIS_ERR == ret)
        {
            printf("Publish command failed: %d\n", ret);
            return false;
        }

        return true;
    }
        
    void CRedisPublisher::connect_callback(const redisAsyncContext *redis_context,    
        int status)    
    {    
        if (status != REDIS_OK)    
        {    
            printf(": Error: %s
    ", redis_context->errstr);    
        }    
        else    
        {    
            printf(": Redis connected!
    ");    
        }    
    }    
        
    void CRedisPublisher::disconnect_callback(    
        const redisAsyncContext *redis_context, int status)    
    {    
        if (status != REDIS_OK)    
        {    
            // 这里异常退出,可以尝试重连    
            printf(": Error: %s
    ", redis_context->errstr);    
        }    
    }    
        
    // Reply callback for PUBLISH commands: only logs that it ran; the reply
    // itself is not inspected.
    void CRedisPublisher::command_callback(redisAsyncContext *redis_context,
        void *reply, void *privdata)
    {
        (void)redis_context;
        (void)reply;
        (void)privdata;

        printf("command callback.\n");
    }
        
    // pthread entry point. `data` is the CRedisPublisher that created the
    // thread; the call is forwarded to its event_proc().
    void *CRedisPublisher::event_thread(void *data)
    {
        if (NULL == data)
        {
            printf(": Error!\n");
            assert(false);
            return NULL;
        }

        // void* back to the object type it was created from: static_cast is
        // the matching named cast.
        CRedisPublisher *self_this = static_cast<CRedisPublisher *>(data);
        return self_this->event_proc();
    }
        
    // Body of the event thread: waits on _event_sem (posted at the end of
    // connect()), then runs the libevent loop. Always returns NULL.
    void *CRedisPublisher::event_proc()    
    {    
        sem_wait(&_event_sem);    
            
        // Start event dispatching; event_base_dispatch blocks.
        event_base_dispatch(_event_base);    
        
        return NULL;    
    }  
    



    redis_subscriber.h
    /*************************************************************************  
        > File Name: redis_subscriber.h  
        > Author: chenzengba  
        > Mail: chenzengba@gmail.com   
        > Created Time: Sat 23 Apr 2016 10:15:09 PM CST  
        > Description: 封装hiredis,实现消息订阅redis功能  
     ************************************************************************/    
        
    #ifndef REDIS_SUBSCRIBER_H    
    #define REDIS_SUBSCRIBER_H    
        
    #include <stdlib.h>    
    #include <hiredis/async.h>    
    #include <hiredis/adapters/libevent.h>    
    #include <string>    
    #include <vector>    
    #include <unistd.h>    
    #include <pthread.h>    
    #include <semaphore.h>    
    #include <boost/tr1/functional.hpp>    
        
    class CRedisSubscriber    
    {    
    public:    
        typedef std::tr1::function<void (const char *, const char *, int)>         NotifyMessageFn;   // 回调函数对象类型,当接收到消息后调用回调把消息发送出去    
                
        CRedisSubscriber();    
        ~CRedisSubscriber();    
            
        bool init(const NotifyMessageFn &fn);   // 传入回调对象    
        bool uninit();    
        bool connect();    
        bool disconnect();    
            
        // 可以多次调用,订阅多个频道    
        bool subscribe(const std::string &channel_name);    
            
    private:    
        // 下面三个回调函数供redis服务调用    
        // 连接回调    
        static void connect_callback(const redisAsyncContext *redis_context,    
            int status);    
            
        // 断开连接的回调    
        static void disconnect_callback(const redisAsyncContext *redis_context,    
            int status);    
        
        // 执行命令回调    
        static void command_callback(redisAsyncContext *redis_context,    
            void *reply, void *privdata);    
        
        // 事件分发线程函数    
        static void *event_thread(void *data);    
        void *event_proc();    
            
    private:    
        // libevent事件对象    
        event_base *_event_base;    
        // 事件线程ID    
        pthread_t _event_thread;    
        // 事件线程的信号量    
        sem_t _event_sem;    
        // hiredis异步对象    
        redisAsyncContext *_redis_context;    
            
        // 通知外层的回调函数对象    
        NotifyMessageFn _notify_message_fn;    
    };    




    redis_subscriber.cpp
    /*************************************************************************  
        > File Name: redis_subscriber.cpp  
        > Author: chenzengba  
        > Mail: chenzengba@gmail.com   
        > Created Time: Sat 23 Apr 2016 10:15:09 PM CST  
        > Description:   
     ************************************************************************/    
         
    #include <stddef.h>    
    #include <assert.h>    
    #include <string.h>    
    #include "redis_subscriber.h"    
        
    // Starts with no event base, a zero thread id and no hiredis context; the
    // event base is created by init() and the context by connect().
    CRedisSubscriber::CRedisSubscriber()
        : _event_base(NULL),
          _event_thread(0),
          _redis_context(NULL)
    {
    }
        
    // Empty: nothing is released here.
    // NOTE(review): callers appear to be expected to call disconnect() and
    // uninit() themselves before destruction -- confirm.
    CRedisSubscriber::~CRedisSubscriber()    
    {    
    }    
        
    bool CRedisSubscriber::init(const NotifyMessageFn &fn)    
    {    
        // initialize the event    
        _notify_message_fn = fn;    
        _event_base = event_base_new();    // 创建libevent对象    
        if (NULL == _event_base)    
        {    
            printf(": Create redis event failed.
    ");    
            return false;    
        }    
        
        memset(&_event_sem, 0, sizeof(_event_sem));    
        int ret = sem_init(&_event_sem, 0, 0);    
        if (ret != 0)    
        {    
            printf(": Init sem failed.
    ");    
            return false;    
        }    
        
        return true;    
    }    
        
    // Destroys the start-up semaphore and forgets the event base pointer.
    // Always returns true.
    //
    // NOTE(review): the event base is only set to NULL, not passed to
    // event_base_free(); releasing it safely would first require the dispatch
    // thread to have stopped -- confirm the intended ownership.
    bool CRedisSubscriber::uninit()
    {
        sem_destroy(&_event_sem);
        _event_base = NULL;
        return true;
    }
        
    bool CRedisSubscriber::connect()    
    {    
        // connect redis    
        _redis_context = redisAsyncConnect("127.0.0.1", 6379);    // 异步连接到redis服务器上,使用默认端口    
        if (NULL == _redis_context)    
        {    
            printf(": Connect redis failed.
    ");    
            return false;    
        }    
        
        if (_redis_context->err)    
        {    
            printf(": Connect redis error: %d, %s
    ",     
                _redis_context->err, _redis_context->errstr);    // 输出错误信息    
            return false;    
        }    
        
        // attach the event    
        redisLibeventAttach(_redis_context, _event_base);    // 将事件绑定到redis context上,使设置给redis的回调跟事件关联    
            
        // 创建事件处理线程    
        int ret = pthread_create(&_event_thread, 0, &CRedisSubscriber::event_thread, this);    
        if (ret != 0)    
        {    
            printf(": create event thread failed.
    ");    
            disconnect();    
            return false;    
        }    
        
        // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态    
        redisAsyncSetConnectCallback(_redis_context,     
            &CRedisSubscriber::connect_callback);    
        
        // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连    
        redisAsyncSetDisconnectCallback(_redis_context,    
            &CRedisSubscriber::disconnect_callback);    
        
        // 启动事件线程    
        sem_post(&_event_sem);    
        return true;    
    }    
        
    // Releases the hiredis context, if there is one. Always returns true.
    //
    // Only redisAsyncFree() is called. redisAsyncDisconnect() hands the context
    // back to hiredis, which frees it on its own (immediately when no replies
    // are pending), so following it with redisAsyncFree() would free the same
    // context twice.
    bool CRedisSubscriber::disconnect()
    {
        if (_redis_context)
        {
            redisAsyncFree(_redis_context);
            _redis_context = NULL;
        }

        return true;
    }
        
    // Queues "SUBSCRIBE <channel_name>" on the asynchronous context, with
    // command_callback as the handler for everything the channel delivers.
    //
    // Returns true when the command was queued; the server has not confirmed
    // the subscription at that point. Returns false, after printing a message,
    // when there is no connection or hiredis rejects the command.
    bool CRedisSubscriber::subscribe(const std::string &channel_name)
    {
        int ret = REDIS_ERR;
        if (_redis_context != NULL)
        {
            // %b takes a pointer plus a size_t length, so the channel name is
            // sent byte-for-byte.
            ret = redisAsyncCommand(_redis_context,
                &CRedisSubscriber::command_callback, this, "SUBSCRIBE %b",
                channel_name.data(), channel_name.size());
        }

        if (REDIS_ERR == ret)
        {
            printf("Subscribe command failed: %d\n", ret);
            return false;
        }

        printf(": Subscribe success: %s\n", channel_name.c_str());
        return true;
    }
        
    void CRedisSubscriber::connect_callback(const redisAsyncContext *redis_context,    
        int status)    
    {    
        if (status != REDIS_OK)    
        {    
            printf(": Error: %s
    ", redis_context->errstr);    
        }    
        else    
        {    
            printf(": Redis connected!");    
        }    
    }    
        
    void CRedisSubscriber::disconnect_callback(    
        const redisAsyncContext *redis_context, int status)    
    {    
        if (status != REDIS_OK)    
        {    
            // 这里异常退出,可以尝试重连    
            printf(": Error: %s
    ", redis_context->errstr);    
        }    
    }    
        
    // Reply callback for SUBSCRIBE: logs each delivered message and forwards it
    // to the stored notification function.
    //
    // redis_context  context the reply arrived on (unused)
    // reply          redisReply for this event, or NULL
    // privdata       the CRedisSubscriber that issued the command
    //
    // Only three-element array replies whose first element is "message" and
    // whose third element is a string are forwarded. The confirmation sent in
    // response to SUBSCRIBE is also a three-element array, but its third
    // element is an integer whose `str` is NULL, so it must not be printed
    // with %s or passed on as message text.
    void CRedisSubscriber::command_callback(redisAsyncContext *redis_context,
        void *reply, void *privdata)
    {
        (void)redis_context;

        if (NULL == reply || NULL == privdata) {
            return;
        }

        // This is a static function, so the owning object arrives through
        // privdata and its members are reached through that pointer.
        CRedisSubscriber *self_this = static_cast<CRedisSubscriber *>(privdata);
        redisReply *redis_reply = static_cast<redisReply *>(reply);

        if (redis_reply->type != REDIS_REPLY_ARRAY || redis_reply->elements != 3)
        {
            return;
        }

        redisReply *kind = redis_reply->element[0];
        redisReply *channel = redis_reply->element[1];
        redisReply *payload = redis_reply->element[2];
        if (NULL == kind || NULL == channel || NULL == payload)
        {
            return;
        }

        if (NULL == kind->str || strcmp(kind->str, "message") != 0 ||
            NULL == channel->str ||
            payload->type != REDIS_REPLY_STRING || NULL == payload->str)
        {
            return;
        }

        // The casts keep the %d conversions and the int callback parameter
        // correct whatever integer type the library uses for `len`.
        printf(": Recieve message:%s:%d:%s:%d:%s:%d\n",
            kind->str, static_cast<int>(kind->len),
            channel->str, static_cast<int>(channel->len),
            payload->str, static_cast<int>(payload->len));

        // Hand the message to the outer layer; skip the call when no callback
        // was stored, because invoking an empty function object throws.
        if (self_this->_notify_message_fn)
        {
            self_this->_notify_message_fn(channel->str, payload->str,
                static_cast<int>(payload->len));
        }
    }
        
    // pthread entry point. `data` is the CRedisSubscriber that created the
    // thread; the call is forwarded to its event_proc().
    void *CRedisSubscriber::event_thread(void *data)
    {
        if (NULL == data)
        {
            printf(": Error!\n");
            assert(false);
            return NULL;
        }

        // void* back to the object type it was created from: static_cast is
        // the matching named cast.
        CRedisSubscriber *self_this = static_cast<CRedisSubscriber *>(data);
        return self_this->event_proc();
    }
        
    // Body of the event thread: waits on _event_sem (posted at the end of
    // connect()), then runs the libevent loop. Always returns NULL.
    void *CRedisSubscriber::event_proc()    
    {    
        sem_wait(&_event_sem);    
            
        // Start event dispatching; event_base_dispatch blocks.
        event_base_dispatch(_event_base);    
        
        return NULL;    
    }  



  • 相关阅读:
    字符串替换
    字符串查找
    字符串比较
    字节与字符串相互转换
    1365. How Many Numbers Are Smaller Than the Current Number
    1486. XOR Operation in an Array
    1431. Kids With the Greatest Number of Candies
    1470. Shuffle the Array
    1480. Running Sum of 1d Array
    【STM32H7教程】第56章 STM32H7的DMA2D应用之刷色块,位图和Alpha混合
  • 原文地址:https://www.cnblogs.com/huty/p/8517416.html
Copyright © 2011-2022 走看看