zoukankan      html  css  js  c++  java
  • zk系列c++下zookeeper使用实例

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务。分布式应用可以使用它来实现诸如:统一命名服务、配置管理、分布式锁服务、集群管理等功能。公司常用到的是Java服务集群的管理。

    1.函数介绍

    //create a handle to used communicate with zookeeper
    zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t *clientid, void *context, int flags)
    //create a node synchronously
    int zoo_create(zhandle_t *zh, const char *path, const char *value,int valuelen,
       const struct ACL_vector *acl, int flags,char *path_buffer, int path_buffer_len);
    //lists the children of a node synchronously.
    int zoo_wget_children(zhandle_t *zh, const char *path, watcher_fn watcher, void* watcherCtx, struct String_vector *strings)
    //close the zookeeper handle and free up any resources.
    int zookeeper_close(zhandle_t *zh)

    2.实例

    用上面3个函数,就能创建一个简单的集群管理。

    数据存储结构为

    服务端向Zk注册服务

    //连接zk
    void ZkRegistry::ConnectZK() {
      if (zhandle_) {
        zookeeper_close(zhandle_);
      }
      int count = 0;
      do {
        ++count;
        zhandle_ = zookeeper_init(zk_hosts_.c_str(),InitWatcher, timeout_, NULL, NULL, 0);
      } while (!connected_ && count < ZK_MAX_CONNECT_TIMES);
    
      if (count >= ZK_MAX_CONNECT_TIMES) {
        SLOG_WARN("ZkRegistry::Init --> connect host " << zk_hosts_ << " over max times:" << count);
        return;
      }
    }
    
    //发布服务,建立临时节点
    void ZkRegistry::PublishService() {
      if (zhandle_ == NULL) {
        ConnectZK();
      }
      string server_path = PING_SERVER + "/" + PingConfig::instance().get_index() + "/"
        + GetIp() + ":" + PingConfig::instance().get_port();
      char res_path[128];
      int rc = zoo_create(zhandle_, server_path.c_str(), GetIp().c_str(), GetIp().size(),
          &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, res_path, 128);
      if (rc) {
        SLOG_INFO("ZkRegistry::PublishService --> zoo_create() path=" << server_path << "," << zerror(rc));
      }
    }
    
    //每隔10s注册一次服务
    void ZkRegistry::run() {
      while(true) {
        SLOG_INFO("publish service");
        ConnectZK();
        PublishService();
        sleep(10);
      }
    }
    
    void ZkRegistry::InitWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcher_ctx) {
      if (state == ZOO_CONNECTED_STATE) {
        connected_ = true;
        SLOG_INFO("InitWatcher() ZOO_CONNECTED_STATE");
      } else if (state == ZOO_AUTH_FAILED_STATE) {
        SLOG_INFO("InitWatcher() ZOO_AUTH_FAILED_STATE");
      } else if (state == ZOO_EXPIRED_SESSION_STATE) {
        SLOG_INFO("InitWatcher() ZOO_EXPIRED_SESSION_STATE");
      } else if (state == ZOO_CONNECTING_STATE) {
        SLOG_INFO("InitWatcher() ZOO_CONNECTING_STATE");
      } else if (state == ZOO_ASSOCIATING_STATE) {
        SLOG_INFO("InitWatcher() ZOO_ASSOCIATING_STATE");
      }
    }

    客户端获取服务列表

    //连接zk server
    void ZkClient::ConnectZK() {
      cout << "ZkClient::ConnectZK" << endl;
      if (zhandle_) {
        zookeeper_close(zhandle_);
      }
      zhandle_ = NULL;
      connected_ = false;
    
      int count = 0;
      do {
        ++count;
        zhandle_ = zookeeper_init(zk_hosts_.c_str(), InitWatcher, timeout_, NULL, NULL, 0);
        sleep(5 * ONE_SECONDS);
      } while (!connected_ && count < ZK_MAX_CONNECT_TIMES);
    
      if (count >= ZK_MAX_CONNECT_TIMES){
        cout << "ZkClient::Init --> connecting zookeeper host: " << zk_hosts_ << " over times: " << count << endl;
      }
    }
    //更新服务列表,冷备和热备
    void ZkClient::Update() {
      cout << "ZkClient::Update" << endl;
      if (zhandle_ == NULL || connected_ == false) {
        Init();
      }
      //获得服务份数
      struct String_vector str_vec;
      int ret = zoo_wget_children(zhandle_, PING_SERVER.c_str(), ServiceWatcher, NULL, &str_vec);
      if (ret) {
        cout << "Update --> read path:" << PING_SERVER << " wrong, " << zerror(ret) << endl;
        return;
      }
    
      //获得各份服务ip:port
      for (int i = 0; i < str_vec.count; ++i) {
        struct String_vector node_vec;
        string path = PING_SERVER + "/" + str_vec.data[i];
        int ret = zoo_wget_children(zhandle_, path.c_str(), ServiceWatcher, NULL, &node_vec);
        cout << "Update --> path:" << path << ", ret:" << ret << ", node's size:" << node_vec.count << endl;
        if (ret || node_vec.count != 1) {
          continue;
        }
        ....
      }
    }
    //监控服务变化
    void ZkClient::ServiceWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
      cout << "type:" << type << endl;
      cout << "state:" << state << endl;
      cout << "path:" << path << endl;
    //  cout << "watcherCtx:" << (char*)watcherCtx << endl;
      cout << "ZOO_CHILD_EVENT:" << ZOO_CHILD_EVENT << endl;
      if (ZOO_CHILD_EVENT == type) {
        cout << "ServiceWatcher ZOO_CHILD_EVENT" << endl;
        ZkClient::Instance().Update();//更新服务列表    
      }
    }

    服务端会每隔一段时间重新注册自己;

    客户端在第一次与zk建立连接获取服务列表时,注册监听函数。zk当节点发生变化时,通知客户端,客户端重新获取服务列表,并注册事件。

  • 相关阅读:
    android Dialog 底部弹出
    L2-023. 图着色问题(暴力)
    L2-023. 图着色问题(暴力)
    L2-022. 重排链表
    L2-022. 重排链表
    L2-020. 功夫传人(dfs+vector 或者 邻接矩阵+dij+优先队列)
    L2-020. 功夫传人(dfs+vector 或者 邻接矩阵+dij+优先队列)
    愿天下有情人都是失散多年的兄妹(bfs)
    愿天下有情人都是失散多年的兄妹(bfs)
    循环赛日程表(分治)
  • 原文地址:https://www.cnblogs.com/whuqin/p/4982015.html
Copyright © 2011-2022 走看看