zoukankan      html  css  js  c++  java
  • 一种基于消息发布-订阅的观察者模式实现

    在模块化的设计中,对各个模块的解耦是至关重要的,本组件通过可以对模块所扮演的角色定义为发布者、订阅者。
    订阅者通过订阅的方式注册,发布者内部状态发生变化时不用关心谁会响应自己的消息,发布者发布只管发布消息,
    由notice组件自动分发消息到已注册的观察者。
    // notice.h #define OBSERVER_MAX 4 typedef struct notice_msg { // public char *sub_name; int cmd; int data_fix; // 固定数据 unsigned int data_ex_len; // 扩展数据长度,0:没有扩展数据 void *pdata_ex; // 扩展数据指针 }notice_msg_t; typedef int (*NOTICE_ON_MSG_CB)(notice_msg_t *pmsg); typedef struct notice_observer { ak_queue_t msg_queue; }notice_observer_t; typedef struct notice_subject { char *name; notice_observer_t* obs_tbl[OBSERVER_MAX]; }notice_subject_t;

      

    // notice.c
    /****************************************************** * Constant ******************************************************/ /****************************************************** * Macro ******************************************************/ #define NOTICE_MSG_MAX 20 #define CHECK_THIS_IS_NULL() do{if(NULL == this) return -1;}while(0) /****************************************************** * Type Definitions ******************************************************/ /****************************************************** * Function Declarations ******************************************************/ // obsever int akp_notice_observer_ctor(notice_observer_t *this) { int ret; CHECK_THIS_IS_NULL(); ret = ak_thread_queue_init(&(this->msg_queue), sizeof(notice_msg_t),NOTICE_MSG_MAX); if(ret) return -1; else return 0; } int akp_notice_observer_dector(notice_observer_t *this) { int ret; CHECK_THIS_IS_NULL(); ret = ak_thread_queue_destroy(&(this->msg_queue)); if(ret) return -1; else return 0; } int akp_notice_on_msg(notice_observer_t *this, NOTICE_ON_MSG_CB fun_cb) { CHECK_THIS_IS_NULL(); notice_msg_t msg; ak_thread_queue_wait(&(this->msg_queue), &msg); fun_cb(&msg); if(msg.data_ex_len > 0 && msg.pdata_ex != NULL) { free(msg.pdata_ex); } } // subject int akp_notice_subject_ctor(notice_subject_t *this,const char *name) { CHECK_THIS_IS_NULL(); this->name = (char *)name; memset(this->obs_tbl,0,sizeof(this->obs_tbl)); return 0; } int akp_notice_subject_dector(notice_subject_t *this) { CHECK_THIS_IS_NULL(); return 0; } int akp_notice_add_observer(notice_subject_t *this, notice_observer_t *obs) { CHECK_THIS_IS_NULL(); int i; for(i = 0; i< OBSERVER_MAX;i++) { if(this->obs_tbl[i] == 0) { break; } } if(i >= OBSERVER_MAX) { ak_print_error("[notice]: no more room to add "); return -1; } //ak_print_error("akp_notice_add_observer tbl=%x obs=%x i=%d ",this->obs_tbl,obs,i); this->obs_tbl[i] = obs; return 0; } int akp_notice_remove_observer(notice_subject_t *this, notice_observer_t *obs) { CHECK_THIS_IS_NULL(); int i; for(i = 0; i< OBSERVER_MAX;i++) { if(this->obs_tbl[i] == obs) { break; } } if(i >= OBSERVER_MAX) { ak_print_error("[notice]: match observer fail "); return -1; } this->obs_tbl[i] = 0; return 0; } int akp_notice_notify(notice_subject_t *this, notice_msg_t *pmsg, void *pdata_ex, unsigned int len) { CHECK_THIS_IS_NULL(); void *p = NULL; notice_observer_t *pobs = NULL; // clear extend data area pmsg->pdata_ex= NULL; pmsg->data_ex_len = 0; // msg dispense int i; for(i = 0; i < OBSERVER_MAX;i++) { if(this->obs_tbl[i] != 0) { if(len > 0) { p = malloc(len); if(NULL != p){ memcpy(p ,pdata_ex,len); pmsg->pdata_ex= p; pmsg->data_ex_len = len; } else { ak_print_error_ex("[notice]: malloc error "); } } //ak_print_normal("[notice]: post msg src name [%s] cmd %d ",pmsg->sub_name,pmsg->cmd); //ak_print_error("[notice]: %d this->obs_tbl[i]=%x ",i,this->obs_tbl[i]); pobs = this->obs_tbl[i]; ak_thread_queue_post(&(pobs->msg_queue), pmsg); } } return 0; }

      

  • 相关阅读:
    junit单元测试
    方法引用
    方法引用表达式(1)
    Stream流的常用方法
    Stream流
    综合案例:文件上传
    tcp通信协议
    python 生成器与迭代器
    Python 序列化与反序列化
    python 文件操作
  • 原文地址:https://www.cnblogs.com/mic-chen/p/8995232.html
Copyright © 2011-2022 走看看