zookeeper提供了两个库,zookeeper_st和 zookeeper_mt。
前者是单线程库,仅仅提供了异步API和集成在应用程序事件循环中的回调函数,这个库是为了支持那些不提供pthread库或者pthread库不稳定的系统而存在的。使用过程中要自己通过zookeeper_interest和zookeeper_process实现事件处理及通知机制。
其他情况下应该使用后者多线程库。因为它同时支持同步和异步API。使用起来也方便很多,可以看我的例子。
The package includes two shared libraries: zookeeper_st and zookeeper_mt. The former only provides the asynchronous APIs and callbacks for integrating into the application's event loop. The only reason this library exists is to support the platforms where a pthread library is not available or is unstable (i.e. FreeBSD 4.x). In all other cases, application developers should link with zookeeper_mt, as it includes support for both Sync and Async API.
OK,talk is cheap。
单线程库使用示例: 在zookeeper上先创建/xyz才能执行成功,因为st库仅支持异步,而异步create没有时序保证。 该程序可以监控/xyz节点及子节点的变化。
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <zookeeper/zookeeper.h> #include <zookeeper/zookeeper_log.h> #include <errno.h> #define TRUE 1 static const char* state2String(int state){ if (state == 0) return "CLOSED_STATE"; if (state == ZOO_CONNECTING_STATE) return "CONNECTING_STATE"; if (state == ZOO_ASSOCIATING_STATE) return "ASSOCIATING_STATE"; if (state == ZOO_CONNECTED_STATE) return "CONNECTED_STATE"; if (state == ZOO_EXPIRED_SESSION_STATE) return "EXPIRED_SESSION_STATE"; if (state == ZOO_AUTH_FAILED_STATE) return "AUTH_FAILED_STATE"; return "INVALID_STATE"; } static const char* type2String(int state){ if (state == ZOO_CREATED_EVENT) return "CREATED_EVENT"; if (state == ZOO_DELETED_EVENT) return "DELETED_EVENT"; if (state == ZOO_CHANGED_EVENT) return "CHANGED_EVENT"; if (state == ZOO_CHILD_EVENT) return "CHILD_EVENT"; if (state == ZOO_SESSION_EVENT) return "SESSION_EVENT"; if (state == ZOO_NOTWATCHING_EVENT) return "NOTWATCHING_EVENT"; return "UNKNOWN_EVENT_TYPE"; } void zktest_dump_stat(const struct Stat *stat) { char tctimes[40]; char tmtimes[40]; time_t tctime; time_t tmtime; if (!stat) { fprintf(stderr,"null "); return; } tctime = stat->ctime/1000; tmtime = stat->mtime/1000; ctime_r(&tmtime, tmtimes); ctime_r(&tctime, tctimes); fprintf(stderr, " ctime = %s czxid=%llx " " mtime=%s mzxid=%llx " " version=%x aversion=%x " " ephemeralOwner = %llx ", tctimes, stat->czxid, tmtimes, stat->mzxid, (unsigned int)stat->version, (unsigned int)stat->aversion, stat->ephemeralOwner); } void zktest_data_completion(int rc, const char *value, int value_len, const struct Stat *stat, const void *data) { fprintf(stderr, "in data completion [%s]: rc = %d ", value, rc); zktest_dump_stat(stat); } /*process a list of string and stat*/ void zktest_strings_stat_completion(int rc, const struct String_vector *strings, const struct Stat *stat, const void *data) { int i=0; fprintf(stderr, "in strings state completion [%s]: rc = %d, string count %d ", (char *)data, 
rc, strings ->count); for (i=0; i<strings->count; i++) { printf("%d: %s ", i, strings->data[i]); } deallocate_String_vector(strings); zktest_dump_stat(stat); } void zktest_string_completion(int rc, const char *name, const void *data) { fprintf(stderr, "in string completion [%s]: rc = %d ", (char*)(data==0?"null":data), rc); if (!rc) { fprintf(stderr, " name = %s ", name); } } void zktest_stat_completion(int rc, const struct Stat *stat, const void *data) { if (rc == ZNONODE) { printf("node not exists "); } fprintf(stderr, "in state completion rc = %d data=%s stat: ", rc, (char *)data); zktest_dump_stat(stat); } void zktest_void_completion(int rc, const void *data) { fprintf(stderr, "in void completion [%s]: rc = %d ", (char*)(data==0?"null":data), rc); } void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx) { printf("Something happened. "); printf("type: %s ", type2String(type)); printf("state: %s ", state2String(state)); printf("path: %s ", path); printf("watcherCtx: %s ", (char *)watcherCtx); int ret; const clientid_t *zk_clientid; if (path == NULL || strlen(path) == 0) { path="/xyz"; } if (type == ZOO_SESSION_EVENT) { if (state == ZOO_EXPIRED_SESSION_STATE) { printf("[%s %d]zookeeper session expired ", __FUNCTION__, __LINE__); } else if (state == ZOO_CONNECTED_STATE) { zk_clientid = zoo_client_id(zh); printf("[%s %d] connected to zookeeper server with clientid=%lu ", __FUNCTION__, __LINE__, zk_clientid ->client_id); if ((ret=zoo_aexists(zh, path, TRUE, zktest_stat_completion, path)) != ZOK) { printf("[%s %d] zoo_aexists judge error, msg = %s ", __FUNCTION__, __LINE__, zerror(ret)); } if ((ret = zoo_aget(zh, (char *)path, TRUE, zktest_data_completion, "get param")) !=ZOK) { printf("[%s %d] fail to get znode %s, err=%d, msg= %s ", __FUNCTION__, __LINE__, path, ret, zerror(ret)); } if ((ret = zoo_aget_children2(zh, path, TRUE, zktest_strings_stat_completion, "get children param")) !=ZOK) { printf("[%s %d] fail to get znode %s, 
err=%d, msg= %s ", __FUNCTION__, __LINE__, path, ret, zerror(ret)); } } } else if (type == ZOO_CREATED_EVENT) { printf("create a child node "); if ((ret = zoo_aget_children2(zh, path, TRUE, zktest_strings_stat_completion, "get children param")) !=ZOK) { printf("[%s %d] fail to get znode %s, err=%d, msg= %s ", __FUNCTION__, __LINE__, path, ret, zerror(ret)); } if ((ret = zoo_aget(zh, path, TRUE, zktest_data_completion, "get param")) !=ZOK) { printf("[%s %d] fail to get znode %s, err=%d, msg= %s ", __FUNCTION__, __LINE__, path, ret, zerror(ret)); } } else if (type == ZOO_DELETED_EVENT) { printf("path %s deleted ", path); if ((ret = zoo_aget(zh, path, TRUE, zktest_data_completion, "get param")) !=ZOK) { printf("[%s %d] fail to get znode %s, err=%d, msg= %s ", __FUNCTION__, __LINE__, path, ret, zerror(ret)); } } else if (type == ZOO_CHANGED_EVENT) { printf("path %s changed ", path); if ((ret = zoo_aget_children2(zh, path, TRUE, zktest_strings_stat_completion, "get children param")) !=ZOK) { printf("[%s %d] fail to get znode %s, err=%d, msg= %s ", __FUNCTION__, __LINE__, path, ret, zerror(ret)); } if ((ret = zoo_aget(zh, path, TRUE, zktest_data_completion, "get param")) !=ZOK) { printf("[%s %d] fail to get znode %s, err=%d, msg= %s ", __FUNCTION__, __LINE__, path, ret, zerror(ret)); } } else if (type == ZOO_CHILD_EVENT) { printf("path %s child event "); if ((ret = zoo_aget(zh, path, TRUE, zktest_data_completion, "get param")) !=ZOK) { printf("[%s %d] fail to get znode %s, err=%d, msg= %s ", __FUNCTION__, __LINE__, path, ret, zerror(ret)); } if ((ret = zoo_aget_children2(zh, path, TRUE, zktest_strings_stat_completion, "get children param")) !=ZOK) { printf("[%s %d] fail to get znode %s, err=%d, msg= %s ", __FUNCTION__, __LINE__, path, ret, zerror(ret)); } } else if (type == ZOO_NOTWATCHING_EVENT){ } else { printf("unkown zoo event type "); } } int main(int argc, const char *argv[]) { static const char* host = "localhost:2181"; int timeout = 30000; zhandle_t* zkhandle; 
zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG); zoo_deterministic_conn_order(1); zkhandle = zookeeper_init(host, zktest_watcher_g, 30000, 0, "hello world", 0); if (zkhandle == NULL) { fprintf(stderr, "Error when connecting to zookeeper servers... "); exit(EXIT_FAILURE); } /* Wait for asynchronous zookeeper call done.*/ int fd, interest, events; int rc; fd_set rfds, wfds, efds; FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds); while(1){ struct timeval tv; zookeeper_interest(zkhandle, &fd, &interest, &tv); if (fd != -1) { if(interest&ZOOKEEPER_READ) { FD_SET(fd, &rfds); } else { FD_CLR(fd, &rfds); } if(interest&ZOOKEEPER_WRITE) { FD_SET(fd, &wfds); } else{ FD_CLR(fd, &wfds); } } else{ fd = 0; } if (select(fd+1, &rfds, &wfds, &efds, &tv) < 0) { printf("[%s %d]select failed, err=%d, msg=%s ", __FUNCTION__, __LINE__, errno, strerror(errno)); } events = 0; if (FD_ISSET(fd, &rfds)) { events |= ZOOKEEPER_READ; } if (FD_ISSET(fd, &wfds)) { events |= ZOOKEEPER_WRITE; } zookeeper_process(zkhandle, events); } zookeeper_close(zkhandle); }
gcc -o zookeeper_s_test zookeeper_s_test.c -lzookeeper_st -I /usr/local/include/zookeeper/
执行结果:
2014-10-21 22:05:41,856:49845:ZOO_INFO@zookeeper_init@786: Initiating client connection, host=localhost:2181 sessionTimeout=30000 watcher=0x401444 sessionId=0 sessionPasswd=<null> context=0x402213 flags=0 2014-10-21 22:05:41,859:49845:ZOO_INFO@check_events@1703: initiated connection to server [::1:2181] 2014-10-21 22:05:41,860:49845:ZOO_INFO@check_events@1750: session establishment complete on server [::1:2181], sessionId=0x14929440e3d0027, negotiated timeout=30000 2014-10-21 22:05:41,860:49845:ZOO_DEBUG@check_events@1756: Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE 2014-10-21 22:05:41,860:49845:ZOO_DEBUG@process_completions@2107: Calling a watcher for node [], type = -1 event=ZOO_SESSION_EVENT Something happened. type: SESSION_EVENT state: CONNECTED_STATE path: watcherCtx: hello world [zktest_watcher_g 124] connected to zookeeper server with clientid=92650639611199527 2014-10-21 22:05:41,860:49845:ZOO_DEBUG@zoo_awexists@2835: Sending request xid=0x54473b25 for path [/xyz] to ::1:2181 2014-10-21 22:05:41,860:49845:ZOO_DEBUG@zoo_awget@2655: Sending request xid=0x54473b26 for path [/xyz] to ::1:2181 2014-10-21 22:05:41,860:49845:ZOO_DEBUG@zoo_awget_children2_@2916: Sending request xid=0x54473b27 for path [/xyz] to ::1:2181 2014-10-21 22:05:41,862:49845:ZOO_DEBUG@zookeeper_process@2264: Queueing asynchronous response 2014-10-21 22:05:41,862:49845:ZOO_DEBUG@deserialize_response@2005: Calling COMPLETION_STAT for xid=0x54473b25 failed=0 rc=0 in state completion rc = 0 data=/xyz stat: ctime = Tue Oct 21 21:48:42 2014 czxid=400000052 mtime=Tue Oct 21 21:56:06 2014 mzxid=40000005a version=2 aversion=0 ephemeralOwner = 0 2014-10-21 22:05:41,862:49845:ZOO_DEBUG@zookeeper_process@2264: Queueing asynchronous response 2014-10-21 22:05:41,862:49845:ZOO_DEBUG@deserialize_response@1992: Calling COMPLETION_DATA for xid=0x54473b26 failed=0 rc=0 in data completion [123]: rc = 0 ctime = Tue Oct 21 21:48:42 2014 czxid=400000052 mtime=Tue Oct 21 21:56:06 
2014 mzxid=40000005a version=2 aversion=0 ephemeralOwner = 0 2014-10-21 22:05:41,862:49845:ZOO_DEBUG@zookeeper_process@2264: Queueing asynchronous response 2014-10-21 22:05:41,862:49845:ZOO_DEBUG@deserialize_response@2029: Calling COMPLETION_STRINGLIST_STAT for xid=0x54473b27 failed=0 rc=0 in strings state completion [get children param]: rc = 0, string count 0 ctime = Tue Oct 21 21:48:42 2014 czxid=400000052 mtime=Tue Oct 21 21:56:06 2014 mzxid=40000005a version=2 aversion=0 ephemeralOwner = 0
建议初学者使用时直接开debug,可以看清楚zookeeper的处理机制。
多线程库使用示例。该程序自己创建,读取,并删除一个节点,并能够触发watcher。
#include<stdio.h>
#include <stdlib.h>
#include<string.h>
#include <unistd.h>
#include<zookeeper.h>
#include<zookeeper_log.h>
#include <net/if.h>
#include <netinet/in.h>
#include <sys/ioctl.h>
#include <arpa/inet.h>
/*
 * Return a printable name for a ZooKeeper connection-state code.
 * Codes are checked in the listed order; anything that matches none of
 * them yields "INVALID_STATE".
 */
static const char* state2String(int state){
    const struct {
        int code;
        const char *label;
    } known[] = {
        { 0,                         "CLOSED_STATE" },
        { ZOO_CONNECTING_STATE,      "CONNECTING_STATE" },
        { ZOO_ASSOCIATING_STATE,     "ASSOCIATING_STATE" },
        { ZOO_CONNECTED_STATE,       "CONNECTED_STATE" },
        { ZOO_EXPIRED_SESSION_STATE, "EXPIRED_SESSION_STATE" },
        { ZOO_AUTH_FAILED_STATE,     "AUTH_FAILED_STATE" },
    };
    size_t idx;

    for (idx = 0; idx < sizeof known / sizeof known[0]; idx++) {
        if (known[idx].code == state) {
            return known[idx].label;
        }
    }
    return "INVALID_STATE";
}
/*
 * Return a printable name for a ZooKeeper watcher event type.
 * Types are checked in the listed order; anything that matches none of
 * them yields "UNKNOWN_EVENT_TYPE".
 */
static const char* type2String(int state){
    const struct {
        int code;
        const char *label;
    } known[] = {
        { ZOO_CREATED_EVENT,     "CREATED_EVENT" },
        { ZOO_DELETED_EVENT,     "DELETED_EVENT" },
        { ZOO_CHANGED_EVENT,     "CHANGED_EVENT" },
        { ZOO_CHILD_EVENT,       "CHILD_EVENT" },
        { ZOO_SESSION_EVENT,     "SESSION_EVENT" },
        { ZOO_NOTWATCHING_EVENT, "NOTWATCHING_EVENT" },
    };
    size_t idx;

    for (idx = 0; idx < sizeof known / sizeof known[0]; idx++) {
        if (known[idx].code == state) {
            return known[idx].label;
        }
    }
    return "UNKNOWN_EVENT_TYPE";
}
/*
 * Global watcher: logs every event it receives.
 *
 * zh         - handle the event was delivered on
 * type       - event type (one of the ZOO_*_EVENT values)
 * state      - connection state (one of the ZOO_*_STATE values)
 * path       - znode the event refers to; may be empty or NULL
 * watcherCtx - context pointer given to zookeeper_init; printed as a
 *              string when non-NULL
 */
void zookeeper_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx) {
    /* Passing NULL to %s is undefined, so substitute a marker. */
    const char *shown_path = path ? path : "(null)";
    const char *shown_ctx = watcherCtx ? (const char *)watcherCtx : "(null)";

    printf("\nSomething happened.\n");
    printf("type: %s\n", type2String(type));
    printf("state: %s\n", state2String(state));
    printf("path: %s\n", shown_path);
    printf("watcherCtx: %s\n", shown_ctx);

    if (type == ZOO_CREATED_EVENT) {
        printf("[%s %d] znode %s created.\n", __func__, __LINE__, shown_path);
    } else if (type == ZOO_DELETED_EVENT) {
        printf("[%s %d] znode %s deleted.\n", __func__, __LINE__, shown_path);
    } else if (type == ZOO_CHANGED_EVENT) {
        printf("[%s %d] znode %s changed.\n", __func__, __LINE__, shown_path);
    } else if (type == ZOO_CHILD_EVENT) {
        printf("[%s %d] znode %s children changed.\n", __func__, __LINE__, shown_path);
    } else if (type == ZOO_NOTWATCHING_EVENT) {
        /*
         * ZOO_NOTWATCHING_EVENT is an event type, not a state. The original
         * compared it against `state` inside the session branch, where it
         * could never match.
         */
        printf("[%s %d] zookeeper session remove watch\n", __func__, __LINE__);
    } else if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_EXPIRED_SESSION_STATE) {
            printf("[%s %d] zookeeper session expired\n", __func__, __LINE__);
        } else if (state == ZOO_AUTH_FAILED_STATE) {
            printf("[%s %d] zookeeper session auth failed\n", __func__, __LINE__);
        } else if (state == ZOO_CONNECTING_STATE) {
            printf("[%s %d] zookeeper session is connecting\n", __func__, __LINE__);
        } else if (state == ZOO_ASSOCIATING_STATE) {
            printf("[%s %d] zookeeper session is associating state\n", __func__, __LINE__);
        } else if (state == ZOO_CONNECTED_STATE) {
            const clientid_t *zk_clientid = zoo_client_id(zh);

            /* client_id is a 64-bit signed value; print it portably. */
            printf("[%s %d] connected to zookeeper server with clientid=%lld\n",
                   __func__, __LINE__, (long long)zk_clientid->client_id);
        } else {
            printf("unknown session event state = %s, path = %s, ctxt=%s\n",
                   state2String(state), shown_path, shown_ctx);
        }
    }
}
/*
 * Synchronously create a persistent znode at `str` with an open ACL and
 * print the path reported by the server. Exits the process on failure.
 */
void create(zhandle_t *zkhandle,char *str) {
    /* Payload; sizeof includes the terminating NUL (9 bytes, as before). */
    static const char value[] = "syn-node";
    char path_buffer[64] = "";

    printf("create node in synchronous mode-----------------------\n");
    int flag = zoo_create(zkhandle, str, value, (int)sizeof value,
                          &ZOO_OPEN_ACL_UNSAFE, 0,
                          path_buffer, (int)sizeof path_buffer);
    if (flag != ZOK) {
        printf("create syn-node failed\n");
        /* Say why, so the failure can be diagnosed. */
        fprintf(stderr, "zoo_create(%s) error %d: %s\n", str, flag, zerror(flag));
        exit(EXIT_FAILURE);
    }
    printf("created node is %s\n", path_buffer);
}
/*
 * Synchronously check whether `path` exists (setting a watch on it) and
 * print the outcome.
 *
 * zoo_exists returns ZOK when the node is present and ZNONODE when it is
 * absent. The original tested `if (flag)`, which reported the opposite.
 */
void exists(zhandle_t *zkhandle,char *path)
{
    int flag = zoo_exists(zkhandle, path, 1, NULL);

    if (flag == ZOK) {
        printf("%s node already exist\n", path);
    } else if (flag == ZNONODE) {
        printf("%s node not exist\n", path);
    } else {
        /* Any other code is an error, not an answer about existence. */
        fprintf(stderr, "zoo_exists(%s) error %d: %s\n", path, flag, zerror(flag));
    }
}
/*
 * Synchronously fetch the ACL of `str` and print the entry count followed
 * by the permissions, scheme and id of the first entry.
 */
void getACL(zhandle_t *zkhandle,char *str)
{
    struct ACL_vector acl = { 0 };
    struct Stat stat;
    int flag = zoo_get_acl(zkhandle, str, &acl, &stat);

    if (flag != ZOK) {
        fprintf(stderr, "zoo_get_acl(%s) error %d: %s\n", str, flag, zerror(flag));
        return;
    }

    if (acl.count > 0) {
        printf("-----------------the ACL of %s:\n------------", str);
        printf("%d\n", acl.count);
        printf("%d\n", acl.data->perms);
        printf("%s\n", acl.data->id.scheme);
        printf("%s\n", acl.data->id.id);
    }

    /* The vector returned by the synchronous call must be released. */
    deallocate_ACL_vector(&acl);
}
/*
 * Synchronously delete the znode `str`; version -1 is passed so the
 * delete is not tied to a specific node version. Prints the outcome.
 */
void delete(zhandle_t *zkhandle,char *str)
{
    int flag = zoo_delete(zkhandle, str, -1);

    if (flag == ZOK) {
        printf("delete node %s success\n", str);
    } else {
        /* The original ignored failures silently. */
        fprintf(stderr, "zoo_delete(%s) error %d: %s\n", str, flag, zerror(flag));
    }
}
int main(int argc, const char *argv[])
{
if (argc != 2) {
printf("usage: zookeeper_m_test server_address
");
exit(-1);
}
const char* host = argv[1];
int timeout = 30000;
char buffer[512];
int *bufferlen;
zoo_set_debug_level(ZOO_LOG_LEVEL_DEBUG);
zhandle_t* zkhandle = zookeeper_init(host, zookeeper_watcher_g, timeout, 0, NULL, 0);
if (zkhandle ==NULL) {
fprintf(stderr, "Error when connecting to zookeeper servers...
");
exit(EXIT_FAILURE);
}
create(zkhandle, "/abc");
exists(zkhandle, "/abc");
getACL(zkhandle, "/abc");
delete(zkhandle, "/abc");
while(1);
zookeeper_close(zkhandle);
}
cc -o zookeeper_m_test zookeeper_m_test.c -DTHREADED -I/usr/local/include/zookeeper/ -L/usr/local/lib/ -L/usr/local/lib/x86_64-linux-gnu/ -lzookeeper_mt
执行:./zookeeper_m_test localhost:2181
2014-10-21 22:10:14,598:49903(0x7fc80aa16740):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=localhost:2181 sessionTimeout=30000 watcher=0x400dfc sessionId=0 sessionPasswd=<null> context=(nil) flags=0 2014-10-21 22:10:14,598:49903(0x7fc80aa16740):ZOO_DEBUG@start_threads@221: starting threads... 2014-10-21 22:10:14,599:49903(0x7fc8094c2700):ZOO_DEBUG@do_io@367: started IO thread 2014-10-21 22:10:14,599:49903(0x7fc8094c2700):ZOO_INFO@check_events@1703: initiated connection to server [127.0.0.1:2181] 2014-10-21 22:10:14,599:49903(0x7fc808cc1700):ZOO_DEBUG@do_completion@459: started completion thread create node in synchronous mode----------------------- 2014-10-21 22:10:14,599:49903(0x7fc80aa16740):ZOO_DEBUG@zoo_acreate@2756: Sending request xid=0x54473c37 for path [/abc] to 127.0.0.1:2181 2014-10-21 22:10:14,604:49903(0x7fc8094c2700):ZOO_INFO@check_events@1750: session establishment complete on server [127.0.0.1:2181], sessionId=0x14929440e3d0028, negotiated timeout=30000 2014-10-21 22:10:14,604:49903(0x7fc8094c2700):ZOO_DEBUG@check_events@1756: Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE 2014-10-21 22:10:14,604:49903(0x7fc808cc1700):ZOO_DEBUG@process_completions@2107: Calling a watcher for node [], type = -1 event=ZOO_SESSION_EVENT Something happened. 
type: SESSION_EVENT state: CONNECTED_STATE path: watcherCtx: (null) [zookeeper_watcher_g 81] connected to zookeeper server with clientid=92650639611199528 2014-10-21 22:10:14,608:49903(0x7fc8094c2700):ZOO_DEBUG@process_sync_completion@1868: Processing sync_completion with type=6 xid=0x54473c37 rc=0 created node is /abc 2014-10-21 22:10:14,608:49903(0x7fc80aa16740):ZOO_DEBUG@zoo_awexists@2835: Sending request xid=0x54473c38 for path [/abc] to 127.0.0.1:2181 2014-10-21 22:10:14,609:49903(0x7fc8094c2700):ZOO_DEBUG@process_sync_completion@1868: Processing sync_completion with type=1 xid=0x54473c38 rc=0 /abc node not exist 2014-10-21 22:10:14,609:49903(0x7fc80aa16740):ZOO_DEBUG@zoo_aget_acl@2989: Sending request xid=0x54473c39 for path [/abc] to 127.0.0.1:2181 2014-10-21 22:10:14,611:49903(0x7fc8094c2700):ZOO_DEBUG@process_sync_completion@1868: Processing sync_completion with type=5 xid=0x54473c39 rc=0 -----------------the ACL of /abc: ------------1 31 world anyone 2014-10-21 22:10:14,611:49903(0x7fc80aa16740):ZOO_DEBUG@zoo_adelete@2796: Sending request xid=0x54473c3a for path [/abc] to 127.0.0.1:2181 2014-10-21 22:10:14,613:49903(0x7fc8094c2700):ZOO_DEBUG@zookeeper_process@2193: Processing WATCHER_EVENT 2014-10-21 22:10:14,613:49903(0x7fc808cc1700):ZOO_DEBUG@process_completions@2107: Calling a watcher for node [/abc], type = -1 event=ZOO_DELETED_EVENT Something happened. type: DELETED_EVENT state: CONNECTED_STATE path: /abc watcherCtx: (null) [zookeeper_watcher_g 58] znode /abc deleted. 2014-10-21 22:10:14,613:49903(0x7fc8094c2700):ZOO_DEBUG@process_sync_completion@1868: Processing sync_completion with type=0 xid=0x54473c3a rc=0 delete node /abc success
很明显,多线程库使用起来更方便。
更多API的详细用法可参考以下链接:
参考:
http://www.cnblogs.com/haippy/archive/2013/02/21/2920426.html
http://zookeeper.apache.org/doc/r3.4.6/zookeeperProgrammers.html#C+Binding