zoukankan      html  css  js  c++  java
  • zookeeper学习【3】服务发现

    服务发现:指对集群中的服务上下线做统一管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息改变时,如服务上下线、服务器的角色或服务范围变更,那么监控服务器可以得到通知并响应这些变化。

    实现代码如下:

    import com.alibaba.fastjson.JSON;
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkNoNodeException;
    
    /**
     * 代表工作服务器
     */
    public class WorkServer {
    
        private ZkClient zkClient;
        // ZooKeeper
        private String configPath;
        // ZooKeeper集群中servers节点的路径
        private String serversPath;
        // 当前工作服务器的基本信息
        private ServerData serverData;
        // 当前工作服务器的配置信息
        private ServerConfig serverConfig;
        private IZkDataListener dataListener;
    
        public WorkServer(String configPath, String serversPath,
                          ServerData serverData, ZkClient zkClient, ServerConfig initConfig) {
            this.zkClient = zkClient;
            this.serversPath = serversPath;
            this.configPath = configPath;
            this.serverConfig = initConfig;
            this.serverData = serverData;
    
            this.dataListener = new IZkDataListener() {
    
                public void handleDataDeleted(String dataPath) throws Exception {
    
                }
    
                public void handleDataChange(String dataPath, Object data)
                        throws Exception {
                    String retJson = new String((byte[])data);
                    ServerConfig serverConfigLocal = (ServerConfig) JSON.parseObject(retJson,ServerConfig.class);
                    updateConfig(serverConfigLocal);
                    System.out.println("new Work server config is:"+serverConfig.toString());
    
                }
            };
    
        }
    
        // 启动服务器
        public void start() {
            System.out.println("work server start...");
            initRunning();
        }
    
        // 停止服务器
        public void stop() {
            System.out.println("work server stop...");
            zkClient.unsubscribeDataChanges(configPath, dataListener); // 取消监听config节点
        }
    
        // 服务器初始化
        private void initRunning() {
            registMe(); // 注册自己
            zkClient.subscribeDataChanges(configPath, dataListener); // 订阅config节点的改变事件
        }
    
        // 启动时向zookeeper注册自己的注册函数
        private void registMe() {
            String mePath = serversPath.concat("/").concat(serverData.getAddress());
    
            try {
                zkClient.createEphemeral(mePath, JSON.toJSONString(serverData)
                        .getBytes());
            } catch (ZkNoNodeException e) {
                zkClient.createPersistent(serversPath, true);
                registMe();
            }
        }
    
        // 更新自己的配置信息
        private void updateConfig(ServerConfig serverConfig) {
            this.serverConfig = serverConfig;
        }
    
    }
    /**
     * 调度类
     */
    public class SubscribeZkClient {
    
        private static final int  CLIENT_QTY = 5; // Work Server数量
    
        private static final String  ZOOKEEPER_SERVER = "192.168.1.105:2181";
    
        private static final String  CONFIG_PATH = "/config";
        private static final String  COMMAND_PATH = "/command";
        private static final String  SERVERS_PATH = "/servers";
    
        public static void main(String[] args) throws Exception {
    
            List<ZkClient> clients = new ArrayList<ZkClient>();
            List<WorkServer>  workServers = new ArrayList<WorkServer>();
            ManageServer manageServer = null;
    
            try {
    
                // 创建一个默认的配置
                ServerConfig initConfig = new ServerConfig();
                initConfig.setDbPwd("123456");
                initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
                initConfig.setDbUser("root");
    
                // 实例化一个Manage Server
                ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
                manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig);
                manageServer.start(); // 启动Manage Server
    
                // 创建指定个数的工作服务器
                for ( int i = 0; i < CLIENT_QTY; ++i ) {
                    ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
                    clients.add(client);
                    ServerData serverData = new ServerData();
                    serverData.setId(i);
                    serverData.setName("WorkServer#"+i);
                    serverData.setAddress("192.168.1."+i);
    
                    WorkServer  workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
                    workServers.add(workServer);
                    workServer.start();    // 启动工作服务器
    
                }
    
                System.out.println("敲回车键退出!
    ");
                new BufferedReader(new InputStreamReader(System.in)).readLine();
    
            } finally {
                System.out.println("Shutting down...");
    
                for ( WorkServer workServer : workServers ) {
                    try {
                        workServer.stop();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
    
                for ( ZkClient client : clients ) {
                    try {
                        client.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                }
            }
        }
    
    }
    /**
     * 服务器基本信息
     */
    public class ServerData {
    
        private String address;
        private Integer id;
        private String name;
    
        public String getAddress() {
            return address;
        }
        public void setAddress(String address) {
            this.address = address;
        }
        public Integer getId() {
            return id;
        }
        public void setId(Integer id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    
        @Override
        public String toString() {
            return "ServerData [address=" + address + ", id=" + id + ", name="
                    + name + "]";
        }
    
    }
    /**
     * 配置信息
     */
    public class ServerConfig {
    
        private String dbUrl;
        private String dbPwd;
        private String dbUser;
        public String getDbUrl() {
            return dbUrl;
        }
        public void setDbUrl(String dbUrl) {
            this.dbUrl = dbUrl;
        }
        public String getDbPwd() {
            return dbPwd;
        }
        public void setDbPwd(String dbPwd) {
            this.dbPwd = dbPwd;
        }
        public String getDbUser() {
            return dbUser;
        }
        public void setDbUser(String dbUser) {
            this.dbUser = dbUser;
        }
    
        @Override
        public String toString() {
            return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd
                    + ", dbUser=" + dbUser + "]";
        }
    
    }
    import com.alibaba.fastjson.JSON;
    import org.I0Itec.zkclient.IZkChildListener;
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkNoNodeException;
    import org.I0Itec.zkclient.exception.ZkNodeExistsException;
    
    import java.util.List;
    
    public class ManageServer {
    
        // zookeeper的servers节点路径
        private String serversPath;
        // zookeeper的command节点路径
        private String commandPath;
        // zookeeper的config节点路径
        private String configPath;
        private ZkClient zkClient;
        private ServerConfig config;
        // 用于监听servers节点的子节点列表的变化
        private IZkChildListener childListener;
        // 用于监听command节点数据内容的变化
        private IZkDataListener dataListener;
        // 工作服务器的列表
        private List<String> workServerList;
    
        public ManageServer(String serversPath, String commandPath,
                            String configPath, ZkClient zkClient, ServerConfig config) {
            this.serversPath = serversPath;
            this.commandPath = commandPath;
            this.zkClient = zkClient;
            this.config = config;
            this.configPath = configPath;
            this.childListener = new IZkChildListener() {
    
                public void handleChildChange(String parentPath,
                                              List<String> currentChilds) throws Exception {
                    // TODO Auto-generated method stub
                    workServerList = currentChilds; // 更新内存中工作服务器列表
    
                    System.out.println("work server list changed, new list is ");
                    execList();
    
                }
            };
            this.dataListener = new IZkDataListener() {
    
                public void handleDataDeleted(String dataPath) throws Exception {
                    // TODO Auto-generated method stub
                    // ignore;
                }
    
                public void handleDataChange(String dataPath, Object data)
                        throws Exception {
                    // TODO Auto-generated method stub
                    String cmd = new String((byte[]) data);
                    System.out.println("cmd:"+cmd);
                    exeCmd(cmd); // 执行命令
    
                }
            };
    
        }
    
        private void initRunning() {
            zkClient.subscribeDataChanges(commandPath, dataListener);
            zkClient.subscribeChildChanges(serversPath, childListener);
        }
    
        /*
         * 1: list 2: create 3: modify
         */
        private void exeCmd(String cmdType) {
            if ("list".equals(cmdType)) {
                execList();
    
            } else if ("create".equals(cmdType)) {
                execCreate();
            } else if ("modify".equals(cmdType)) {
                execModify();
            } else {
                System.out.println("error command!" + cmdType);
            }
    
        }
    
        // 列出工作服务器列表
        private void execList() {
            System.out.println(workServerList.toString());
        }
    
        // 创建config节点
        private void execCreate() {
            if (!zkClient.exists(configPath)) {
                try {
                    zkClient.createPersistent(configPath, JSON.toJSONString(config)
                            .getBytes());
                } catch (ZkNodeExistsException e) {
                    zkClient.writeData(configPath, JSON.toJSONString(config)
                            .getBytes()); // config节点已经存在,则写入内容就可以了
                } catch (ZkNoNodeException e) {
                    String parentDir = configPath.substring(0,
                            configPath.lastIndexOf('/'));
                    zkClient.createPersistent(parentDir, true);
                    execCreate();
                }
            }
        }
    
        // 修改config节点内容
        private void execModify() {
            // 我们随意修改config的一个属性就可以了
            config.setDbUser(config.getDbUser() + "_modify");
    
            try {
                zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
            } catch (ZkNoNodeException e) {
                execCreate(); // 写入时config节点还未存在,则创建它
            }
        }
    
        // 启动工作服务器
        public void start() {
            initRunning();
        }
    
        // 停止工作服务器
        public void stop() {
            zkClient.unsubscribeChildChanges(serversPath, childListener);
            zkClient.unsubscribeDataChanges(commandPath, dataListener);
        }
    
    }
  • 相关阅读:
    浮动清除
    解剖JavaScript中的null和undefined【转】
    关于innerHTML以及html2dom
    javascript 作用域
    4390. 【GDOI2016模拟3.16】图计数 (Standard IO)
    5049. 【GDOI2017模拟一试4.11】腐女的生日
    4273_NOIP2015模拟10.28B组_圣章-精灵使的魔法语
    jzoj_5631_(NOI2018模拟4.5)_A
    jzoj_1001_最难的问题_Floyd
    jzoj_3385_黑魔法师之门
  • 原文地址:https://www.cnblogs.com/tinyj/p/10029130.html
Copyright © 2011-2022 走看看