zoukankan      html  css  js  c++  java
  • (原)3.2 Zookeeper应用

    本文为原创文章,转载请注明出处,谢谢

    数据的发布与订阅

    1、应用

      服务端监听数据改变,客户端创建/更新节点数据,客户端提供数据,服务端处理

    2、原理 

    • 客户端监控节点数据改变事件(例如配置信息,下图的config节点),启动时在服务器节点下创建临时节点(图中servers下节点)
    • 服务端监听工作服务器的子节点更新,触发自身存储的工作服务器列表,同时监听订阅节点的数据改变事件(下图中command节点)

    3、架构图

    • config:配置信息节点
    • servers:服务器列表父节点
    • command:数据订阅节点

    4、客户端流程图

    5、服务端流程图

     6、核心类关系图

    • SubscribeClient:模拟服务端、客户端启动
    • WorkServer:客户端
    • ManageServer:服务端
    • ServerConfig:配置信息
    • ServerData:server数据

    7、核心代码

    • WorkServer 监听
      public WorkServer(String serverPath, String configPath, final ServerData serverData, ServerConfig serverConfig, ZkClient zkClient) {
              this.serverPath = serverPath;
              this.configPath = configPath;
              this.serverData = serverData;
              this.serverConfig = serverConfig;
              this.zkClient = zkClient;
      
              dataListener = new IZkDataListener() {
                  @Override
                  public void handleDataChange(String s, Object o)  {
      
                      try{
                          String data = new String((byte[])o);
                          System.out.println(data);
                          ServerConfig config = (ServerConfig)JSON.parseObject(data,ServerConfig.class);
                          updateConfig(config);
                          System.out.println("server name:"+serverData.getName()+" update config:"+config.toString());
                      }catch (Exception e) {
                          e.printStackTrace();
                      }
      
                  }
      
                  @Override
                  public void handleDataDeleted(String s) throws Exception {
      
                  }
              };
          }
    • WorkServer 注册
      private void registerMe() {
              String myPath = serverPath.concat("/").concat(serverData.getAddress());
              try{
                  if(!zkClient.exists(myPath))
                      zkClient.createEphemeral(myPath,JSON.toJSONString(serverData).getBytes());
              }catch (ZkNoNodeException e ) {
                  zkClient.createPersistent(serverPath, true);
                  registerMe();
              }
          }

      ps:此操作是在servers节点下创建节点,需要servers节点已存在

    • ManageServer 监听
       public ManageServer(String serverPath, String configPath, String cmdPath, ServerConfig serverConfig, ZkClient zkClient) {
              this.serverPath = serverPath;
              this.configPath = configPath;
              this.cmdPath = cmdPath;
              this.serverConfig = serverConfig;
              this.zkClient = zkClient;
      
              childListener = new IZkChildListener() {
                  @Override
                  public void handleChildChange(String s, List<String> strings) throws Exception {
                      workServerList = strings;
                      System.out.println("----"+workServerList.toString());
                  }
              };
      
              dataListener = new IZkDataListener() {
                  @Override
                  public void handleDataChange(String s, Object o) throws Exception {
                      String cmd = new String((byte[])o);
                      System.out.println("cmd="+cmd);
                      exeCmd(cmd);
                  }
      
                  @Override
                  public void handleDataDeleted(String s) throws Exception {
      
                  }
              };
      
          }
      • childListener :监听Servers下的节点变化
      • dataListener :监听command节点的数据变化
    • ManageServer 执行操作
      /**
           *模拟命令:1、list 2、create 3、modify
           */
          private void exeCmd(String cmd) {
              if("list".equals(cmd)) {
                  System.out.println(workServerList.toString());
              }else if("create".equals(cmd)) {
                  exeCreate();
              }else if("modify".equals(cmd)) {
                  exeModify();
              }else {
                  System.out.println("this cmd can not exe");
              }
      
          }
    • SubscribeClient
      public class SubscribeClient {
          private static final int CLIENT_QTY =3;
          private static final String ZOOKEEPER_URL = "192.168.117.128:2181";
          private static final String SERVERPATH = "/servers";
          private static final String CONFIGPATH = "/config";
          private static final String CMDPATH = "/command";
      
          @Test
          public void testSubScribe() throws IOException {
              ServerConfig config = new ServerConfig("DBURL...","DBUSER...","DBPAW...");
              ZkClient zk = new ZkClient(ZOOKEEPER_URL,5000,5000,new BytesPushThroughSerializer());
              ManageServer manageServer = new ManageServer(SERVERPATH,CONFIGPATH,CMDPATH,config,zk);
              manageServer.start();
      
              for (int i = 0; i < CLIENT_QTY; i++) {
                  ZkClient zkq = new ZkClient(ZOOKEEPER_URL,5000,5000,new BytesPushThroughSerializer());
                  ServerData data = new ServerData("address"+i,i+"","name_"+i);
                  WorkServer server = new WorkServer(SERVERPATH,CONFIGPATH,data,config,zkq);
                  server.start();
              }
      
              new BufferedReader(new InputStreamReader(System.in)).readLine();
          }
      }
  • 相关阅读:
    UltraWebGrid多表头
    2009个人年度总结报告(IT)
    DevExpress分发
    AspxTreeList数据绑定以及模板和外观定制的运用
    每日一句英语:No problem, Mr. Smith. Anything else?
    “向程序发送命令时出现问题”的解决方法
    ASP常用进制转化类(2,8,10,16,32,64)
    我的分页用户控件(性能问题)
    研发的那些事2—设计之惑
    一个架构的演化2用ESB集成
  • 原文地址:https://www.cnblogs.com/shengkejava/p/5736923.html
Copyright © 2011-2022 走看看