zoukankan      html  css  js  c++  java
  • Zookeeper C API 指南八(Zookeeper C API 应用示例)

    前面七讲我们基本上介绍完了 Zookeeper C API 的所有内容,本文将结合一个小例子讲讲如何在你的实际项目中使用 Zookeeper 服务。

    设想如下场景:

    假设程序 A 需要 7* 24 小时在线对外提供服务,但是 A 程序在生产环境下总是不稳定,时常崩溃,不过幸运的是解决方案很简单,在 A 程序崩溃以后只需要重启它就可以了。当然如此简单的问题你可以提出多种解决方案,比方说自己实现一个服务程序,每隔一定时间去轮询 A 的状态,如果发现 A 崩溃了,立即重启它,并向管理人员报告问题。不过我们并不打算这么做,毕竟本文主题是讲 Zookeeper C API 的应用,所以我们采用 Zookeeper 服务来解决该问题。

    若采用 Zookeeper 服务可以按照如下方案解决问题,程序 A 在启动时创建一个临时(ZOO_EPHEMERAL) znode 节点 /A,然后按照正常流程对外提供服务。另外监控程序对 /A 节点设置监视,当 /A 节点消失(说明 A 程序已经崩溃)时,重启 A 程序。假设 A 的名称是 QueryServer,即对外提供查询服务的程序,具体提供什么查询服务由应用自身决定,我们这里只是简单地模拟一下。QueryServer 在启动时创建一个 /QueryServer 的临时节点(ZOO_EPHEMERAL),然后,程序 QueryServerd 监控 /QueryServer 节点,当 /QueryServer 节点消失(说明 A 程序已经崩溃)时,重启 QueryServer 程序。

    下面是 QueryServer 的实现代码:

    /*
     * =============================================================================
     *
     *       Filename:  QueryServer.c
     *
     *    Description:  QueryServer
     *
     *        Created:  02/15/2013 08:48:49 PM
     *
     *         Author:  Fu Haiping (forhappy), haipingf@gmail.com
     *        Company:  ICT ( Institute Of Computing Technology, CAS )
     *
     * =============================================================================
     */
    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <zookeeper/zookeeper.h>
    #include <zookeeper/zookeeper_log.h>
    
    void QueryServer_watcher_g(zhandle_t* zh, int type, int state,
            const char* path, void* watcherCtx)
    {
        if (type == ZOO_SESSION_EVENT) {
            if (state == ZOO_CONNECTED_STATE) {
                printf("[[[QueryServer]]] Connected to zookeeper service successfully!\n");
            } else if (state == ZOO_EXPIRED_SESSION_STATE) { 
                printf("Zookeeper session expired!\n");
            }
        }  
    }
    
    void QueryServer_string_completion(int rc, const char *name, const void *data)
    {
        fprintf(stderr, "[%s]: rc = %d\n", (char*)(data==0?"null":data), rc);
        if (!rc) {
            fprintf(stderr, "\tname = %s\n", name);
        }
    }
    
    void QueryServer_accept_query()
    {
        printf("QueryServer is running...\n");
    }
    
    int main(int argc, const char *argv[])
    {
        const char* host = "127.0.0.1:2181,127.0.0.1:2182,"
            "127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
        int timeout = 30000;
        
        zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
        zhandle_t* zkhandle = zookeeper_init(host,
                QueryServer_watcher_g, timeout, 0, "hello zookeeper.", 0);
        if (zkhandle == NULL) {
            fprintf(stderr, "Error when connecting to zookeeper servers...\n");
            exit(EXIT_FAILURE);
        }
    
        // struct ACL ALL_ACL[] = {{ZOO_PERM_ALL, ZOO_ANYONE_ID_UNSAFE}};
        // struct ACL_vector ALL_PERMS = {1, ALL_ACL};
        int ret = zoo_acreate(zkhandle, "/QueryServer", "alive", 5,
               &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL,
               QueryServer_string_completion, "zoo_acreate");
        if (ret) {
            fprintf(stderr, "Error %d for %s\n", ret, "acreate");
            exit(EXIT_FAILURE);
        }
    
        do {
            // 模拟 QueryServer 对外提供服务.
            // 为了简单起见, 我们在此调用一个简单的函数来模拟 QueryServer.
            // 然后休眠 5 秒,程序主动退出(即假设此时已经崩溃).
            QueryServer_accept_query();
            sleep(5);
        } while(false);
    
        zookeeper_close(zkhandle);
    }

    Makefile如下:

    all:QueryServer
    
    QueryServer:QueryServer.o
        gcc -L/usr/local/lib/ -lzookeeper_mt -o $@ $^ 
    
    QueryServer.o:QueryServer.c
        gcc -DTHREADED -I/usr/local/include/zookeeper -o $@ -c $^
    
    .PHONY:clean
    
    clean:
        rm QueryServer.o QueryServer

    QueryServerd 代码如下:

    /*
     * =============================================================================
     *
     *       Filename:  QueryServerd.c
     *
     *    Description:  QueryServer daemon using zookeeper. 
     *
     *        Created:  02/15/2013 08:48:49 PM
     *
     *         Author:  Fu Haiping (forhappy), haipingf@gmail.com
     *        Company:  ICT ( Institute Of Computing Technology, CAS )
     *
     * =============================================================================
     */
    #include <unistd.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/types.h>
    #include <sys/wait.h>
    #include <zookeeper/zookeeper.h>
    #include <zookeeper/zookeeper_log.h>
    
    void QueryServerd_watcher_global(zhandle_t * zh, int type, int state,
                                const char *path, void *watcherCtx);
    static void QueryServerd_dump_stat(const struct Stat *stat);
    void QueryServerd_stat_completion(int rc, const struct Stat *stat,
                                 const void *data);
    void QueryServerd_watcher_awexists(zhandle_t *zh, int type, int state,
                                  const char *path, void *watcherCtx);
    static void QueryServerd_awexists(zhandle_t *zh);
    
    void
    QueryServerd_watcher_global(zhandle_t * zh, int type, int state,
                                const char *path, void *watcherCtx)
    {
        if (type == ZOO_SESSION_EVENT) {
            if (state == ZOO_CONNECTED_STATE) {
                printf("Connected to zookeeper service successfully!\n");
            } else if (state == ZOO_EXPIRED_SESSION_STATE) { 
                printf("Zookeeper session expired!\n");
            }
        }
    }
    
    static void
    QueryServerd_dump_stat(const struct Stat *stat)
    {
        char tctimes[40];
        char tmtimes[40];
        time_t tctime;
        time_t tmtime;
    
        if (!stat) {
            fprintf(stderr, "null\n");
            return;
        }
        tctime = stat->ctime / 1000;
        tmtime = stat->mtime / 1000;
    
        ctime_r(&tmtime, tmtimes);
        ctime_r(&tctime, tctimes);
    
        fprintf(stderr, "\tctime = %s\tczxid=%llx\n"
                "\tmtime=%s\tmzxid=%llx\n"
                "\tversion=%x\taversion=%x\n"
                "\tephemeralOwner = %llx\n",
                tctimes, stat->czxid,
                tmtimes, stat->mzxid,
                (unsigned int) stat->version, (unsigned int) stat->aversion,
                stat->ephemeralOwner);
    }
    
    void
    QueryServerd_stat_completion(int rc, const struct Stat *stat,
                                 const void *data)
    {
        // fprintf(stderr, "%s: rc = %d Stat:\n", (char *) data, rc);
        // QueryServerd_dump_stat(stat);
    }
    
    void
    QueryServerd_watcher_awexists(zhandle_t *zh, int type, int state,
                                  const char *path, void *watcherCtx)
    {
        if (state == ZOO_CONNECTED_STATE) {
            if (type == ZOO_DELETED_EVENT) {
                printf("QueryServer gone away, restart now...\n");
                // re-exists and set watch on /QueryServer again.
                QueryServerd_awexists(zh);
                pid_t pid = fork();
                if (pid < 0) {
                    fprintf(stderr, "Error when doing fork.\n");
                    exit(EXIT_FAILURE);
                }
                if (pid == 0) { /* child process */
                    // 重启 QueryServer 服务.
                    execl("/tmp/QueryServer/QueryServer", "QueryServer", NULL);
                    exit(EXIT_SUCCESS);
                }
                sleep(1); /* sleep 1 second for purpose. */
            } else if (type == ZOO_CREATED_EVENT) {
                printf("QueryServer started...\n");
            }
        }
    
        // re-exists and set watch on /QueryServer again.
        QueryServerd_awexists(zh);
    }
    
    static void
    QueryServerd_awexists(zhandle_t *zh)
    {
        int ret =
            zoo_awexists(zh, "/QueryServer",
                         QueryServerd_watcher_awexists,
                         "QueryServerd_awexists.",
                         QueryServerd_stat_completion,
                         "zoo_awexists");
        if (ret) {
            fprintf(stderr, "Error %d for %s\n", ret, "aexists");
            exit(EXIT_FAILURE);
        }
    }
    
    int
    main(int argc, const char *argv[])
    {
        const char *host = "127.0.0.1:2181,127.0.0.1:2182,"
            "127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
        int timeout = 30000;
    
        zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
        zhandle_t *zkhandle = zookeeper_init(host,
                                             QueryServerd_watcher_global,
                                             timeout,
                                             0, "QueryServerd", 0);
        if (zkhandle == NULL) {
            fprintf(stderr, "Error when connecting to zookeeper servers...\n");
            exit(EXIT_FAILURE);
        }
    
        QueryServerd_awexists(zkhandle);
        // Wait for asynchronous zookeeper call done.
        getchar();
    
        zookeeper_close(zkhandle);
    
        return 0;
    }

    Makefile 如下:

    all:QueryServerd
    
    QueryServerd:QueryServerd.o
        gcc -L/usr/local/lib/ -lzookeeper_mt -o $@ $^ 
    
    QueryServerd.o:QueryServerd.c
        gcc -g -DTHREADED -I/usr/local/include/zookeeper -o $@ -c $^
    
    .PHONY:clean
    
    clean:
        rm QueryServerd.o QueryServerd

    首先执行 QueryServerd,

    forhappy@haiping-ict:/tmp/QueryServerd$ ./QueryServerd 
    Connected to zookeeper service successfully!

    然后执行 QueryServer,

    forhappy@haiping-ict:/tmp/QueryServer$ ./QueryServer 
    QueryServer is running...
    [[[QueryServer]]] Connected to zookeeper service successfully!
    [zoo_acreate]: rc = 0
        name = /QueryServer

    可见 Queryerver 创建了 /QueryServer 节点,5 秒后 QueryServer 模拟程序崩溃而退出,那么此时在 QueryServerd 端输出如下:

    Connected to zookeeper service successfully!
    QueryServer started... # QueryServerd 感知到 QueryServer 已正常启动.
    QueryServer gone away, restart now... # 5 秒钟后,QueryServer 崩溃,QueryServerd 准备重启 QueryServer.
    QueryServer is running... # QueryServer 正在运行,以下 3 行是 QueryServer 输出结果。
    [[[QueryServer]]] Connected to zookeeper service successfully!
    [zoo_acreate]: rc = 0
        name = /QueryServer
    QueryServer started... # QueryServerd 感知到 QueryServer 已正常启动.
    QueryServer gone away, restart now...# 又过了 5 秒钟后,QueryServer 崩溃,QueryServerd 准备重启 QueryServer.
    QueryServer is running... # QueryServer 再次运行,以下 3 行是 QueryServer 输出结果。
    [[[QueryServer]]] Connected to zookeeper service successfully!
    [zoo_acreate]: rc = 0
        name = /QueryServer
    QueryServer started... # QueryServerd 再次感知到 QueryServer 已正常启动,如此反复.
    QueryServer gone away, restart now...
    QueryServer is running...
    [[[QueryServer]]] Connected to zookeeper service successfully!
    [zoo_acreate]: rc = 0
        name = /QueryServer
    QueryServer started...

    即 QueryServer 每 5 秒钟崩溃一次,然后又被 QueryServerd 重启,模拟了上面的应用场景。

    好了 Zookeeper C API 的应用小示例讲完了,可能应用场景选取的不好,不过大致可以一些说明问题吧,如果你想看 Zookeeper 更贴近现实的应用场景,可以参考淘宝的一篇文章《ZooKeeper典型应用场景一览》和 IBM developerWorks 的一篇博文《分布式服务框架 Zookeeper -- 管理分布式环境中的数据》。

  • 相关阅读:
    设计模式----单例模式
    C++ 派生类
    C++ 操作符
    构造,清理,拷贝和移动
    php的yii框架开发总结10
    php的yii框架开发总结9
    php的yii框架开发总结8
    php的yii框架开发总结7
    php的yii框架开发总结6
    php的yii框架开发总结5
  • 原文地址:https://www.cnblogs.com/haippy/p/2924570.html
Copyright © 2011-2022 走看看