zoukankan      html  css  js  c++  java
  • Zookeeper学习(八):Zookeeper的数据发布与订阅模式

     http://blog.csdn.net/ZuoAnYinXiang/article/category/6104448

    1.发布订阅的基本概念

     

         1.发布订阅模式可以看成一对多的关系:多个订阅者对象同时监听一个主题对象,这个主题对象在自身状态发生变化时,会通知所有的订阅者对象,使他们能够自动的更新自己的状态。
     
         2.发布订阅模式,可以让发布方和订阅方,独立封装,独立改变,当一个对象的改变,需要同时改变其他的对象,而且它不知道有多少个对象需要改变时,可以使用发布订阅模式
     
        3.发布订阅模式在分布式系统的典型应用有, 配置管理和服务发现。
               配置管理:是指如果集群中机器拥有某些相同的配置,并且这些配置信息需要动态的改变,我们可以使用发布订阅模式,对配置文件做统一的管理,让这些机器各       自订阅配置文件的改变,当配置文件发生改变的时候这些机器就会得到通知,把自己的配置文件更新为最新的配置
        

                服务发现:是指对集群中的服务上下线做统一的管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让模型机器作为订阅方,订阅工       作服务器的基本信息,当工作服务器的基本信息发生改变时如上下线,服务器的角色和服务范围变更,监控服务器就会得到通知,并响应这些变化。

     

    2.发布订阅的架构


    3.Manager Server的工作流程

    4.Work Server的工作流程

    5.发布订阅程序的结构图

     

     

     

     

    6.程序代码实现


    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package com.zk.subscribe;  
    2.   
    3. import java.io.BufferedReader;  
    4. import java.io.InputStreamReader;  
    5. import java.util.ArrayList;  
    6. import java.util.List;  
    7.   
    8. import org.I0Itec.zkclient.ZkClient;  
    9. import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;  
    10.   
    11.   
    12. public class SubscribeZkClient {  
    13.           
    14.         //需要多少个workserver  
    15.         private static final int  CLIENT_QTY = 5;  
    16.   
    17.         private static final String  ZOOKEEPER_SERVER = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181";  
    18.         //节点的路径  
    19.         private static final String  CONFIG_PATH = "/config";//配置节点  
    20.         private static final String  COMMAND_PATH = "/command";//命令节点  
    21.         private static final String  SERVERS_PATH = "/servers";//服务器列表节点  
    22.              
    23.         public static void main(String[] args) throws Exception  
    24.         {  
    25.             //用来存储所有的clients  
    26.             List<ZkClient>  clients = new ArrayList<ZkClient>();  
    27.             //用来存储所有的workservers  
    28.             List<WorkServer>  workServers = new ArrayList<WorkServer>();  
    29.             ManagerServer manageServer = null;  
    30.   
    31.             try  
    32.             {  
    33.                 ServerConfig initConfig = new ServerConfig();  
    34.                 initConfig.setDbPwd("123456");  
    35.                 initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");  
    36.                 initConfig.setDbUser("root");  
    37.                   
    38.                 ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());  
    39.                 manageServer = new ManagerServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig);  
    40.                 manageServer.start();  
    41.                   
    42.                 //根据定义的work服务个数,创建服务器后注册,然后启动  
    43.                 for ( int i = 0; i < CLIENT_QTY; ++i )  
    44.                 {  
    45.                     ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());  
    46.                     clients.add(client);  
    47.                     ServerData serverData = new ServerData();  
    48.                     serverData.setId(i);  
    49.                     serverData.setName("WorkServer#"+i);  
    50.                     serverData.setAddress("192.168.1."+i);  
    51.   
    52.                     WorkServer  workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);  
    53.                     workServers.add(workServer);  
    54.                     workServer.start();                   
    55.                       
    56.                 }                 
    57.                 System.out.println("敲回车键退出! ");  
    58.                 new BufferedReader(new InputStreamReader(System.in)).readLine();  
    59.                   
    60.             }finally{  
    61.                 //将workserver和client给关闭  
    62.                   
    63.                 System.out.println("Shutting down...");  
    64.   
    65.                 for ( WorkServer workServer : workServers )  
    66.                 {  
    67.                     try {  
    68.                         workServer.stop();  
    69.                     } catch (Exception e) {  
    70.                         e.printStackTrace();  
    71.                     }                 
    72.                 }  
    73.                 for ( ZkClient client : clients )  
    74.                 {  
    75.                     try {  
    76.                         client.close();  
    77.                     } catch (Exception e) {  
    78.                         e.printStackTrace();  
    79.                     }  
    80.                       
    81.                 }  
    82.             }  
    83.         }     
    84.   
    85. }  
    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package com.zk.subscribe;  
    2.   
    3. import java.util.List;  
    4.   
    5. import org.I0Itec.zkclient.IZkChildListener;  
    6. import org.I0Itec.zkclient.IZkDataListener;  
    7. import org.I0Itec.zkclient.ZkClient;  
    8. import org.I0Itec.zkclient.exception.ZkNoNodeException;  
    9. import org.I0Itec.zkclient.exception.ZkNodeExistsException;  
    10.   
    11. import com.alibaba.fastjson.JSON;  
    12.   
    13. public class ManagerServer {  
    14.       
    15.     private String serversPath;  
    16.     private String commandPath;  
    17.     private String configPath;  
    18.     private ZkClient zkClient;  
    19.     private ServerConfig config;  
    20.     //用于监听zookeeper中servers节点的子节点列表变化  
    21.     private IZkChildListener childListener;  
    22.     //用于监听zookeeper中command节点的数据变化  
    23.     private IZkDataListener dataListener;  
    24.     //工作服务器的列表  
    25.     private List<String> workServerList;  
    26.   
    27.     /** 
    28.      *  
    29.      * @param serversPath 
    30.      * @param commandPath Zookeeper中存放命令的节点路径 
    31.      * @param configPath 
    32.      * @param zkClient 
    33.      * @param config 
    34.      */  
    35.     public ManagerServer(String serversPath, String commandPath,String configPath, ZkClient zkClient, ServerConfig config) {  
    36.         this.serversPath = serversPath;  
    37.         this.commandPath = commandPath;  
    38.         this.zkClient = zkClient;  
    39.         this.config = config;  
    40.         this.configPath = configPath;  
    41.         this.childListener = new IZkChildListener() {  
    42.             //用于监听zookeeper中servers节点的子节点列表变化  
    43.             public void handleChildChange(String parentPath,List<String> currentChilds) throws Exception {  
    44.                 //更新服务器列表  
    45.                 workServerList = currentChilds;  
    46.                   
    47.                 System.out.println("work server list changed, new list is ");  
    48.                 execList();  
    49.   
    50.             }  
    51.         };  
    52.           
    53.         //用于监听zookeeper中command节点的数据变化  
    54.         this.dataListener = new IZkDataListener() {  
    55.   
    56.             public void handleDataDeleted(String dataPath) throws Exception {  
    57.               
    58.             }  
    59.   
    60.             public void handleDataChange(String dataPath, Object data)  
    61.                     throws Exception {  
    62.                   
    63.                 String cmd = new String((byte[]) data);  
    64.                 System.out.println("cmd:"+cmd);  
    65.                 exeCmd(cmd);  
    66.   
    67.             }  
    68.         };  
    69.   
    70.     }  
    71.       
    72.     public void start() {  
    73.         initRunning();  
    74.     }  
    75.   
    76.     public void stop() {  
    77.         //取消订阅command节点数据变化和servers节点的列表变化  
    78.         zkClient.unsubscribeChildChanges(serversPath, childListener);  
    79.         zkClient.unsubscribeDataChanges(commandPath, dataListener);  
    80.     }  
    81.       
    82.     /** 
    83.      * 初始化 
    84.      */  
    85.     private void initRunning() {  
    86.         //执行订阅command节点数据变化和servers节点的列表变化  
    87.         zkClient.subscribeDataChanges(commandPath, dataListener);  
    88.         zkClient.subscribeChildChanges(serversPath, childListener);  
    89.       
    90.     }  
    91.       
    92.   
    93.     /* 
    94.      * 执行控制命令的函数 
    95.      * 1: list 2: create 3: modify 
    96.      */  
    97.     private void exeCmd(String cmdType) {  
    98.         if ("list".equals(cmdType)) {  
    99.             execList();  
    100.   
    101.         } else if ("create".equals(cmdType)) {  
    102.             execCreate();  
    103.         } else if ("modify".equals(cmdType)) {  
    104.             execModify();  
    105.         } else {  
    106.             System.out.println("error command!" + cmdType);  
    107.         }  
    108.   
    109.     }  
    110.   
    111.       
    112.     private void execList() {  
    113.       
    114.         System.out.println(workServerList.toString());  
    115.     }  
    116.   
    117.     private void execCreate() {  
    118.         if (!zkClient.exists(configPath)) {  
    119.             try {  
    120.                   
    121.                 zkClient.createPersistent(configPath, JSON.toJSONString(config).getBytes());  
    122.               
    123.             } catch (ZkNodeExistsException e) {  
    124.                 //节点已经存在异常,直接写入数据  
    125.                 zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());  
    126.             } catch (ZkNoNodeException e) {  
    127.                 //表示其中的一个节点的父节点还没有被创建  
    128.                 String parentDir = configPath.substring(0,configPath.lastIndexOf('/'));  
    129.                 zkClient.createPersistent(parentDir, true);  
    130.                 execCreate();  
    131.                   
    132.             }  
    133.         }  
    134.     }  
    135.   
    136.     private void execModify() {  
    137.         config.setDbUser(config.getDbUser() + "_modify");  
    138.   
    139.         try {  
    140.             //回写到zookeeper中  
    141.             zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());  
    142.         } catch (ZkNoNodeException e) {  
    143.             execCreate();  
    144.         }  
    145.     }  
    146.       
    147.       
    148. }  
    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package com.zk.subscribe;  
    2. import org.I0Itec.zkclient.IZkChildListener;  
    3. import org.I0Itec.zkclient.IZkDataListener;  
    4. import org.I0Itec.zkclient.ZkClient;  
    5. import org.I0Itec.zkclient.exception.ZkNoNodeException;  
    6. import org.I0Itec.zkclient.exception.ZkNodeExistsException;  
    7.   
    8. import com.alibaba.fastjson.JSON;  
    9. import com.alibaba.fastjson.JSONObject;  
    10.   
    11. /** 
    12.  * 代表工作服务器 
    13.  * workServer服务器的信息 
    14.  * 
    15.  */  
    16. public class WorkServer{  
    17.     private String serversPath;  
    18.     private String configPath;  
    19.     private ZkClient zkClient;  
    20.     private ServerConfig config;  
    21.     private ServerData serverData;  
    22.       
    23.     private IZkDataListener dataListener;//数据监听器  
    24.       
    25.       
    26.     /** 
    27.      *  
    28.      * @param configPath 代表config节点的路径 
    29.      * @param serversPath 代表servers节点的路径 
    30.      * @param serverData   代表当前服务器的基本信息 
    31.      * @param zkClient     底层与zookeeper集群通信的组件 
    32.      * @param initconfig   当前服务器的初始配置 
    33.      */  
    34.     public WorkServer(String configPath,String serversPath,ServerData serverData,ZkClient zkClient, ServerConfig initconfig){  
    35.           
    36.         this.configPath = configPath;  
    37.         this.serversPath = serversPath;  
    38.         this.serverData = serverData;  
    39.         this.zkClient = zkClient;  
    40.         this.config = initconfig;  
    41.           
    42.         /** 
    43.          * dataListener 用于监听config节点的数据改变 
    44.          */  
    45.         this.dataListener = new IZkDataListener() {  
    46.               
    47.             public void handleDataDeleted(String arg0) throws Exception {  
    48.                   
    49.             }  
    50.               
    51.             /** 
    52.              * 当数据的值改变时处理的 
    53.              * Object data,这个data是将ServerConfig对象转成json字符串存入 
    54.              * 可以通过参数中的Object data 拿到当前数据节点最新的配置信息 
    55.              * 拿到这个data信息后将它反序列化成ServerConfig对象,然后更新到自己的serverconfig属性中 
    56.              */  
    57.             public void handleDataChange(String dataPath, Object data) throws Exception {  
    58.                 String retJson = new String((byte[])data);  
    59.                 ServerConfig serverConfigLocal = (ServerConfig)JSON.parseObject(retJson,ServerConfig.class);  
    60.                 //更新配置  
    61.                 updateConfig(serverConfigLocal);  
    62.                 System.out.println("new work server config is:"+serverConfigLocal.toString());  
    63.             }  
    64.         };  
    65.           
    66.     }  
    67.       
    68.     /** 
    69.      * 服务的启动 
    70.      */  
    71.     public void start(){  
    72.         System.out.println("work server start...");  
    73.         initRunning();  
    74.     }  
    75.       
    76.     /** 
    77.      * 服务的停止 
    78.      */  
    79.     public void stop(){  
    80.         System.out.println("work server stop...");  
    81.         //取消监听  
    82.         zkClient.unsubscribeDataChanges(configPath, dataListener);  
    83.       
    84.     }  
    85.       
    86.     /** 
    87.      * 服务器的初始化 
    88.      */  
    89.     private void initRunning(){  
    90.         registMeToZookeeper();  
    91.         //订阅config节点的改变  
    92.         zkClient.subscribeDataChanges(configPath, dataListener);  
    93.     }  
    94.       
    95.     /** 
    96.      * 启动时向zookeeper注册自己 
    97.      */  
    98.     private void registMeToZookeeper(){  
    99.         //向zookeeper中注册自己的过程其实就是向servers节点下注册一个临时节点  
    100.         //构造临时节点  
    101.         String mePath = serversPath.concat("/").concat(serverData.getAddress());  
    102.         try{  
    103.             //存入是将json序列化  
    104.             zkClient.createEphemeral(mePath, JSON.toJSONString(serverData).getBytes());   
    105.         } catch (ZkNoNodeException e) {  
    106.             //父节点不存在  
    107.             zkClient.createPersistent(serversPath, true);  
    108.             registMeToZookeeper();  
    109.         }  
    110.           
    111.     }  
    112.       
    113.     /** 
    114.      * 当监听到zookeeper中config节点的配置信息改变时,要读取配置信息来更新自己的配置信息 
    115.      */  
    116.     private void updateConfig(ServerConfig serverConfig){  
    117.         this.config = serverConfig;  
    118.     }  
    119.       
    120. }  
    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package com.zk.subscribe;  
    2.   
    3. /** 
    4.  * 用于记录WorkServer(工作服务器)的配置信息 
    5.  */  
    6. public class ServerConfig {  
    7.     private String dbUrl;  
    8.     private String dbPwd;  
    9.     private String dbUser;  
    10.     public String getDbUrl() {  
    11.         return dbUrl;  
    12.     }  
    13.     public void setDbUrl(String dbUrl) {  
    14.         this.dbUrl = dbUrl;  
    15.     }  
    16.     public String getDbPwd() {  
    17.         return dbPwd;  
    18.     }  
    19.     public void setDbPwd(String dbPwd) {  
    20.         this.dbPwd = dbPwd;  
    21.     }  
    22.     public String getDbUser() {  
    23.         return dbUser;  
    24.     }  
    25.     public void setDbUser(String dbUser) {  
    26.         this.dbUser = dbUser;  
    27.     }  
    28.     @Override  
    29.     public String toString() {  
    30.         return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd + ", dbUser=" + dbUser + "]";  
    31.     }  
    32.       
    33.       
    34. }  
    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package com.zk.subscribe;  
    2. /** 
    3.  * 用于记录WorkServer(工作服务器)的基本信息 
    4.  */  
    5. public class ServerData {  
    6.     private String address;  
    7.     private Integer id;  
    8.     private String name;  
    9.     public String getAddress() {  
    10.         return address;  
    11.     }  
    12.     public void setAddress(String address) {  
    13.         this.address = address;  
    14.     }  
    15.     public Integer getId() {  
    16.         return id;  
    17.     }  
    18.     public void setId(Integer id) {  
    19.         this.id = id;  
    20.     }  
    21.     public String getName() {  
    22.         return name;  
    23.     }  
    24.     public void setName(String name) {  
    25.         this.name = name;  
    26.     }  
    27.       
    28.     @Override  
    29.     public String toString() {  
    30.         return "ServerData [address=" + address + ", id=" + id + ", name=" + name + "]";  
    31.     }  
    32.       
    33.       
    34. }  


    启动SubscribeZkClient


    在zookeeper客户端上输出命令

    managerServer订阅了commod的变化后,输出变化


     

     

     

     

     

     

     

     

  • 相关阅读:
    spring mvc+ELK从头开始搭建日志平台
    java分布式系统开关功能设计(服务升降级)
    可伸缩性架构常用技术
    大众点评订单系统分库分表实践
    分布式缓存--系列1 -- Hash环/一致性Hash原理
    Netty原理剖析
    一个轻量级分布式 RPC 框架 — NettyRpc
    HDU 2583 permutation
    HDU 2573 Typing
    c语言中逗号运算符和逗号表达式
  • 原文地址:https://www.cnblogs.com/duanxz/p/3555888.html
Copyright © 2011-2022 走看看