上一节《Zookeeper C API 指南三(回调函数)》重点讲了 Zookeeper C API 中各种回调函数的原型,本节将切入正题,正式讲解 Zookeeper C API。相信大家读完本文后应该对 Zookeeper C API 的使用有一个比较清晰的认识。
Zookeeper C API 概览
Zookeeper C API 很规范,接口很容易记忆,大部分接口均以 zoo_ 开头,只有少量接口以 zookeeper_ 开头,所有的 API 汇总如下:
void zoo_create_op_init(zoo_op_t * op, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len); void zoo_delete_op_init(zoo_op_t * op, const char *path, int version); void zoo_set_op_init(zoo_op_t * op, const char *path, const char *buffer, int buflen, int version, struct Stat *stat); void zoo_check_op_init(zoo_op_t * op, const char *path, int version); ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t * clientid, void *context, int flags); ZOOAPI int zookeeper_close(zhandle_t * zh); ZOOAPI const clientid_t *zoo_client_id(zhandle_t * zh); ZOOAPI int zoo_recv_timeout(zhandle_t * zh); ZOOAPI const void *zoo_get_context(zhandle_t * zh); ZOOAPI void zoo_set_context(zhandle_t * zh, void *context); ZOOAPI watcher_fn zoo_set_watcher(zhandle_t * zh, watcher_fn newFn); ZOOAPI struct sockaddr *zookeeper_get_connected_host(zhandle_t * zh, struct sockaddr *addr, socklen_t * addr_len); ZOOAPI int zookeeper_interest(zhandle_t * zh, int *fd, int *interest, struct timeval *tv); ZOOAPI int zookeeper_process(zhandle_t * zh, int events); ZOOAPI int zoo_state(zhandle_t * zh); ZOOAPI int zoo_acreate(zhandle_t * zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, string_completion_t completion, const void *data); ZOOAPI int zoo_adelete(zhandle_t * zh, const char *path, int version, void_completion_t completion, const void *data); ZOOAPI int zoo_aexists(zhandle_t * zh, const char *path, int watch, stat_completion_t completion, const void *data); ZOOAPI int zoo_awexists(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, stat_completion_t completion, const void *data); ZOOAPI int zoo_aget(zhandle_t * zh, const char *path, int watch, data_completion_t completion, const void *data); ZOOAPI int zoo_awget(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, data_completion_t completion, const void *data); ZOOAPI int zoo_aset(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version, stat_completion_t completion, const void *data); ZOOAPI int zoo_aget_children(zhandle_t * zh, const char *path, int watch, strings_completion_t completion, const void *data); ZOOAPI int zoo_awget_children(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, strings_completion_t completion, const void *data); ZOOAPI int zoo_aget_children2(zhandle_t * zh, const char *path, int watch, strings_stat_completion_t completion, const void *data); ZOOAPI int zoo_awget_children2(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, strings_stat_completion_t completion, const void *data); ZOOAPI int zoo_async(zhandle_t * zh, const char *path, string_completion_t completion, const void *data); ZOOAPI int zoo_aget_acl(zhandle_t * zh, const char *path, acl_completion_t completion, const void *data); ZOOAPI int zoo_aset_acl(zhandle_t * zh, const char *path, int version, struct ACL_vector *acl, void_completion_t, const void *data); ZOOAPI int zoo_amulti(zhandle_t * zh, int count, const zoo_op_t * ops, zoo_op_result_t * results, void_completion_t, const void *data); ZOOAPI const char *zerror(int c); ZOOAPI int zoo_add_auth(zhandle_t * zh, const char *scheme, const char *cert, int certLen, void_completion_t completion, const void *data); ZOOAPI int is_unrecoverable(zhandle_t * zh); ZOOAPI void zoo_set_debug_level(ZooLogLevel logLevel); ZOOAPI void zoo_set_log_stream(FILE * logStream); ZOOAPI void zoo_deterministic_conn_order(int yesOrNo); ZOOAPI int zoo_create(zhandle_t * zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len); ZOOAPI int zoo_delete(zhandle_t * zh, const char *path, int version); ZOOAPI int zoo_exists(zhandle_t * zh, const char *path, int watch, struct Stat *stat); ZOOAPI int zoo_wexists(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct Stat *stat); ZOOAPI int zoo_get(zhandle_t * zh, const char *path, int watch, char *buffer, int *buffer_len, struct Stat *stat); ZOOAPI int zoo_wget(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, char *buffer, int *buffer_len, struct Stat *stat); ZOOAPI int zoo_set(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version); ZOOAPI int zoo_set2(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version, struct Stat *stat); ZOOAPI int zoo_get_children(zhandle_t * zh, const char *path, int watch, struct String_vector *strings); ZOOAPI int zoo_wget_children(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct String_vector *strings); ZOOAPI int zoo_get_children2(zhandle_t * zh, const char *path, int watch, struct String_vector *strings, struct Stat *stat); ZOOAPI int zoo_wget_children2(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct String_vector *strings, struct Stat *stat); ZOOAPI int zoo_get_acl(zhandle_t * zh, const char *path, struct ACL_vector *acl, struct Stat *stat); ZOOAPI int zoo_set_acl(zhandle_t * zh, const char *path, int version, const struct ACL_vector *acl); ZOOAPI int zoo_multi(zhandle_t * zh, int count, const zoo_op_t * ops, zoo_op_result_t * results);
除了基本的初始化、销毁 Zookeeper 句柄(zhandle),设置日志等级、日志流以及一些具有辅助功能 API(zerror(), zoo_state()等) 外,Zookeeper C API 大部分接口可以根据同步和异步特性分为两类,同步接口以 zoo_* 开头,异步接口以则以 zoo_a* 开头。并且除了 zookeeper_init() 以及与 zoo_multi() 或 zoo_amulti() 批量操作相关的 zoo_op_t 初始化外,其他的 API 的第一个参数均为 zhandle_t * zh, 即 Zookeeper 句柄的指针。
Zookeeper C API 分类
- 初始化、销毁 Zookeeper 句柄
ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t * clientid, void *context, int flags); ZOOAPI int zookeeper_close(zhandle_t * zh);
- 辅助函数
// 设置日志等级、日志流 ZOOAPI void zoo_set_debug_level(ZooLogLevel logLevel); ZOOAPI void zoo_set_log_stream(FILE * logStream); ZOOAPI const clientid_t *zoo_client_id(zhandle_t * zh); ZOOAPI int zoo_recv_timeout(zhandle_t * zh); ZOOAPI const void *zoo_get_context(zhandle_t * zh); ZOOAPI void zoo_set_context(zhandle_t * zh, void *context); ZOOAPI watcher_fn zoo_set_watcher(zhandle_t * zh, watcher_fn newFn); ZOOAPI struct sockaddr *zookeeper_get_connected_host(zhandle_t * zh, struct sockaddr *addr, socklen_t * addr_len); ZOOAPI int zookeeper_interest(zhandle_t * zh, int *fd, int *interest, struct timeval *tv); ZOOAPI int zookeeper_process(zhandle_t * zh, int events); ZOOAPI int zoo_state(zhandle_t * zh); ZOOAPI const char *zerror(int c); ZOOAPI int is_unrecoverable(zhandle_t * zh); ZOOAPI void zoo_deterministic_conn_order(int yesOrNo);
- 与 zoo_multi() 和 zoo_amulti() 批量操作相关的 zoo_op_t 初始化
void zoo_create_op_init(zoo_op_t * op, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len); void zoo_delete_op_init(zoo_op_t * op, const char *path, int version); void zoo_set_op_init(zoo_op_t * op, const char *path, const char *buffer, int buflen, int version, struct Stat *stat); void zoo_check_op_init(zoo_op_t * op, const char *path, int version);
- Zookeeper C API 同步接口
ZOOAPI int zoo_add_auth(zhandle_t * zh, const char *scheme, const char *cert, int certLen, void_completion_t completion, const void *data); ZOOAPI int zoo_create(zhandle_t * zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len); ZOOAPI int zoo_delete(zhandle_t * zh, const char *path, int version); ZOOAPI int zoo_exists(zhandle_t * zh, const char *path, int watch, struct Stat *stat); ZOOAPI int zoo_wexists(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct Stat *stat); ZOOAPI int zoo_get(zhandle_t * zh, const char *path, int watch, char *buffer, int *buffer_len, struct Stat *stat); ZOOAPI int zoo_wget(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, char *buffer, int *buffer_len, struct Stat *stat); ZOOAPI int zoo_set(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version); ZOOAPI int zoo_set2(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version, struct Stat *stat); ZOOAPI int zoo_get_children(zhandle_t * zh, const char *path, int watch, struct String_vector *strings); ZOOAPI int zoo_wget_children(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct String_vector *strings); ZOOAPI int zoo_get_children2(zhandle_t * zh, const char *path, int watch, struct String_vector *strings, struct Stat *stat); ZOOAPI int zoo_wget_children2(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, struct String_vector *strings, struct Stat *stat); ZOOAPI int zoo_get_acl(zhandle_t * zh, const char *path, struct ACL_vector *acl, struct Stat *stat); ZOOAPI int zoo_set_acl(zhandle_t * zh, const char *path, int version, const struct ACL_vector *acl); ZOOAPI int zoo_multi(zhandle_t * zh, int count, const zoo_op_t * ops, zoo_op_result_t * results);
- Zookeeper C API 异步接口
ZOOAPI int zoo_acreate(zhandle_t * zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, string_completion_t completion, const void *data); ZOOAPI int zoo_adelete(zhandle_t * zh, const char *path, int version, void_completion_t completion, const void *data); ZOOAPI int zoo_aexists(zhandle_t * zh, const char *path, int watch, stat_completion_t completion, const void *data); ZOOAPI int zoo_awexists(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, stat_completion_t completion, const void *data); ZOOAPI int zoo_aget(zhandle_t * zh, const char *path, int watch, data_completion_t completion, const void *data); ZOOAPI int zoo_awget(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, data_completion_t completion, const void *data); ZOOAPI int zoo_aset(zhandle_t * zh, const char *path, const char *buffer, int buflen, int version, stat_completion_t completion, const void *data); ZOOAPI int zoo_aget_children(zhandle_t * zh, const char *path, int watch, strings_completion_t completion, const void *data); ZOOAPI int zoo_awget_children(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, strings_completion_t completion, const void *data); ZOOAPI int zoo_aget_children2(zhandle_t * zh, const char *path, int watch, strings_stat_completion_t completion, const void *data); ZOOAPI int zoo_awget_children2(zhandle_t * zh, const char *path, watcher_fn watcher, void *watcherCtx, strings_stat_completion_t completion, const void *data); ZOOAPI int zoo_async(zhandle_t * zh, const char *path, string_completion_t completion, const void *data); ZOOAPI int zoo_aget_acl(zhandle_t * zh, const char *path, acl_completion_t completion, const void *data); ZOOAPI int zoo_aset_acl(zhandle_t * zh, const char *path, int version, struct ACL_vector *acl, void_completion_t, const void *data); ZOOAPI int zoo_amulti(zhandle_t * zh, int count, const zoo_op_t * ops, zoo_op_result_t * results, void_completion_t, const void *data);
Zookeeper C API 初体验
有了上面的介绍,下面我们来看一看如何使简单地使用 Zookeeper C API 吧。
在使用 Zookeeper C API 时应注意:
- 头文件包含 #include <zookeeper/zookeeper.h>
- 如果你需要编译多线程版本客户端程序,请添加编译选项 -DTHREADED,同时链接时应链接 zookeeper_mt 库;如果你需要编译单线程客户端程序,请不要添加编译选项 -DTHREADED,同时链接时应链接 zookeeper_st 库。
一个基本的程序如下(更详细的例子可以参看 src/c/src/cli.c):
/* * ============================================================================= * * Filename: zktest.c * * Description: zookeeper api testcase. * * Created: 02/15/2013 08:48:49 PM * * Author: Fu Haiping (forhappy), haipingf@gmail.com * Company: ICT ( Institute Of Computing Technology, CAS ) * * ============================================================================= */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <zookeeper/zookeeper.h> #include <zookeeper/zookeeper_log.h> void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx) { printf("Something happened.\n"); printf("type: %d\n", type); printf("state: %d\n", state); printf("path: %s\n", path); printf("watcherCtx: %s\n", (char *)watcherCtx); } void zktest_dump_stat(const struct Stat *stat) { char tctimes[40]; char tmtimes[40]; time_t tctime; time_t tmtime; if (!stat) { fprintf(stderr,"null\n"); return; } tctime = stat->ctime/1000; tmtime = stat->mtime/1000; ctime_r(&tmtime, tmtimes); ctime_r(&tctime, tctimes); fprintf(stderr, "\tctime = %s\tczxid=%llx\n" "\tmtime=%s\tmzxid=%llx\n" "\tversion=%x\taversion=%x\n" "\tephemeralOwner = %llx\n", tctimes, stat->czxid, tmtimes, stat->mzxid, (unsigned int)stat->version, (unsigned int)stat->aversion, stat->ephemeralOwner); } void zktest_stat_completion(int rc, const struct Stat *stat, const void *data) { fprintf(stderr, "%s: rc = %d Stat:\n", (char*)data, rc); zktest_dump_stat(stat); } void zktest_void_completion(int rc, const void *data) { fprintf(stderr, "[%s]: rc = %d\n", (char*)(data==0?"null":data), rc); } void zktest_string_completion(int rc, const char *name, const void *data) { fprintf(stderr, "[%s]: rc = %d\n", (char*)(data==0?"null":data), rc); if (!rc) { fprintf(stderr, "\tname = %s\n", name); } } int main(int argc, const char *argv[]) { const char* host = "127.0.0.1:2181,127.0.0.1:2182," "127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185"; int timeout = 30000; zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); zhandle_t* zkhandle = zookeeper_init(host, zktest_watcher_g, timeout, 0, "hello zookeeper.", 0); if (zkhandle == NULL) { fprintf(stderr, "Error when connecting to zookeeper servers...\n"); exit(EXIT_FAILURE); } // struct ACL ALL_ACL[] = {{ZOO_PERM_ALL, ZOO_ANYONE_ID_UNSAFE}}; // struct ACL_vector ALL_PERMS = {1, ALL_ACL}; int ret = zoo_acreate(zkhandle, "/xyz", "hello", 5, &ZOO_OPEN_ACL_UNSAFE, 0 /* ZOO_SEQUENCE */, zktest_string_completion, "acreate"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "acreate"); exit(EXIT_FAILURE); } ret = 0; ret = zoo_aexists(zkhandle, "/xyz", 1, zktest_stat_completion, "aexists"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "aexists"); exit(EXIT_FAILURE); } ret = 0; // Wait for asynchronous zookeeper call done. getchar(); ret = zoo_adelete(zkhandle, "/xyz", -1, zktest_void_completion, "adelete"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "adelete"); exit(EXIT_FAILURE); } // Wait for asynchronous zookeeper call done. getchar(); zookeeper_close(zkhandle); }
下面简单讲讲这个程序的结构:
- 首先声明 host, timeout 变量。
const char* host = "127.0.0.1:2181,127.0.0.1:2182," "127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185"; int timeout = 30000;
其中 host 字符串格式为逗号隔开的 IP:PORT对,可以是 Zookeeper 集群中的全部或部分 Zookeeper 实例的 IP:PORT对,我们在第一讲《准备工作》中介绍了如何部署一个伪分布式的集群,上述的 host 就是这些 zookeeper 实例的IP:PORT对。
另外 timeout 是 Zookeeper 客户端连接服务器的超时时间,单位为毫秒,timeout = 30000 说明如果 30 秒内客户端没有连接上 Zookeeper 服务则表示连接超时。
- 设置日志等级。
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- 初始化 Zookeeper 句柄(zhandle_t)。
zhandle_t* zkhandle = zookeeper_init(host, zktest_watcher_g, timeout, 0, "hello zookeeper.", 0); if (zkhandle == NULL) { fprintf(stderr, "Error when connecting to zookeeper servers...\n"); exit(EXIT_FAILURE); }
初始化 Zookeeper 句柄 zookeeper_init() 函数原型如下:
ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t * clientid, void *context, int flags);
各个参数解释如下:
host | 逗号隔开的 host:port 对, 每个代表一个 zk server, 例如: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" |
fn | 全局的监视器回调函数,当发生事件通知时,该函数会被调用。 |
clientid | 客 户端尝试重连的先前会话的ID,如果不需要重连先前的会话,则设置为 0。客户端可以通过调用 zoo_client_id来访问一个已经连接上的并且有效的会话ID,如果clientid对应的会话超时,或者由于某种原因 clientid变为无效了,那么zookeeper_init 将返回一个非法的 zhandle_t, 通过 zhandle_t 的状态可以获知 zookeeper_init 调用失败的原因。 (通常为 ZOO_EXPIRED_SESSION_STATE). |
context | 与 zhandle_t 实例相关联的“上下文对象”(可以通过该参数为 zhandle_t 传入自定义类型的数据),应用程序可以通过 zoo_get_context 访问它(例如在监视器回调函数中),当然 zookeeper 内部没有用到该参数,所以 context 可以设置为 NULL。 |
flags | 目前为保留参数,设置为 0。 |
- 创建一个 znode 节点。
// struct ACL ALL_ACL[] = {{ZOO_PERM_ALL, ZOO_ANYONE_ID_UNSAFE}}; // struct ACL_vector ALL_PERMS = {1, ALL_ACL}; int ret = zoo_acreate(zkhandle, "/xyz", "hello", 5, &ZOO_OPEN_ACL_UNSAFE, 0 /* ZOO_SEQUENCE */, zktest_string_completion, "acreate"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "acreate"); exit(EXIT_FAILURE); }
这里采用异步方式创建 znode 节点,zoo_acreate() 函数原型为:
ZOOAPI int zoo_acreate(zhandle_t * zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, string_completion_t completion, const void *data);
各个参数解释如下:
zh | zookeeper_init() 返回的 zookeeper 句柄。 |
path | 节点路径。 |
value | 该节点保存的数据。 |
valuelen | 该节点保存数据的大小。 |
acl | 该节点初始 ACL,ACL 不能为null 或空。 |
flags | 该参数可以设置为 0,或者创建标识符 ZOO_EPHEMERAL, ZOO_SEQUENCE 的组合或(OR)。 |
completion | 当创建节点请求完成时会调用该函数,该函数原型详见第三讲《回调函数》一节。同时传递给completion的 rc参数为: ZOK 操作完成;ZNONODE 父节点不存在;ZNODEEXISTS 节点已存在;ZNOAUTH 客户端没有权限创建节点。ZNOCHILDRENFOREPHEMERALS 临时节点不能创建子节点。 |
data | completino函数被调用时,传递给 completion 的数据。 |
- 调用 exists() 函数,设置监视器。
ret = zoo_aexists(zkhandle, "/xyz", 1, zktest_stat_completion, "aexists"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "aexists"); exit(EXIT_FAILURE); }
- 调用 delete 函数,删除 znode 节点。
ret = zoo_adelete(zkhandle, "/xyz", -1, zktest_void_completion, "adelete"); if (ret) { fprintf(stderr, "Error %d for %s\n", ret, "adelete"); exit(EXIT_FAILURE); }
- 销毁 zookeeper 句柄
zookeeper_close(zkhandle);
好了,至此本文大致讲完了 Zookeeper C API 的分类和几个基本函数的用法。之所以为 Zookeeper C API 分类是方便记忆,开发者可以迅速找到自己需要的 API;另外,本文还讲了几个基本函数的使用方法,包括 zookeeper_init(),zoo_acreate(), zoo_aexists(), zoo_adelete() 以及 zookeeper_close()。相信大家对 Zookeeper C API 也有了一个大致的了解,第五讲我会给大家介绍 Zookeeper C API 中的同步调用的函数(即以 zoo_* 开头的函数),然后第六讲给大家介绍 Zookeeper C API 中的异步调用的函数(即以 zoo_a* 开头的函数)。