zoukankan      html  css  js  c++  java
  • Curator的cluster,实现多节点数据共享

    模拟两个客户端,实现多节点数据共享

     1 package bjsxt.curator.cluster;
     2 
     3 import org.apache.curator.RetryPolicy;
     4 import org.apache.curator.framework.CuratorFramework;
     5 import org.apache.curator.framework.CuratorFrameworkFactory;
     6 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
     7 import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
     8 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
     9 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    10 import org.apache.curator.retry.ExponentialBackoffRetry;
    11 import org.apache.zookeeper.CreateMode;
    12 
    13 public class CuratorWatcher {
    14 
    15     /** 父节点path */
    16     static final String PARENT_PATH = "/super";
    17 
    18     /** zookeeper服务器地址 */
    19     public static final String CONNECT_ADDR = "192.168.2.2:2181";
    20     /** 定义session失效时间 */
    21 
    22     public static final int SESSION_TIMEOUT = 30000;
    23 
    24     public CuratorWatcher() throws Exception {
    25         // 1 重试策略:初试时间为1s 重试10次
    26         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
    27         // 2 通过工厂创建连接
    28         CuratorFramework cf = CuratorFrameworkFactory.builder()
    29                 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT)
    30                 .retryPolicy(retryPolicy).build();
    31         // 3 建立连接
    32         cf.start();
    33 
    34         // 4 创建跟节点
    35         if (cf.checkExists().forPath(PARENT_PATH) == null) {
    36             cf.create().withMode(CreateMode.PERSISTENT)
    37                     .forPath(PARENT_PATH, "super init".getBytes());
    38         }
    39 
    40         // 4 建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受
    41         PathChildrenCache cache = new PathChildrenCache(cf, PARENT_PATH, true);
    42         // 5 在初始化的时候就进行缓存监听
    43         cache.start(StartMode.POST_INITIALIZED_EVENT);
    44         cache.getListenable().addListener(new PathChildrenCacheListener() {
    45             /**
    46              * <B>方法名称:</B>监听子节点变更<BR>
    47              * <B>概要说明:</B>新建、修改、删除<BR>
    48              * 
    49              * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework,
    50              *      org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent)
    51              */
    52             @Override
    53             public void childEvent(CuratorFramework cf,
    54                     PathChildrenCacheEvent event) throws Exception {
    55                 switch (event.getType()) {
    56                 case CHILD_ADDED:
    57                     System.out.println("CHILD_ADDED :"
    58                             + event.getData().getPath());
    59                     System.out.println("CHILD_ADDED :"
    60                             + new String(event.getData().getData()));
    61                     break;
    62                 case CHILD_UPDATED:
    63                     System.out.println("CHILD_UPDATED :"
    64                             + event.getData().getPath());
    65                     System.out.println("CHILD_UPDATED :"
    66                             + new String(event.getData().getData()));
    67                     break;
    68                 case CHILD_REMOVED:
    69                     System.out.println("CHILD_REMOVED :"
    70                             + event.getData().getPath());
    71                     System.out.println("CHILD_REMOVED :"
    72                             + new String(event.getData().getData()));
    73                     break;
    74                 default:
    75                     break;
    76                 }
    77             }
    78         });
    79     }
    80 
    81 }
    package bjsxt.curator.cluster;
    
    public class Client1 {
    
        public static void main(String[] args) throws Exception {
    
            CuratorWatcher watcher = new CuratorWatcher();
            System.out.println("c1 start...");
            Thread.sleep(100000000);
        }
    }
    package bjsxt.curator.cluster;
    
    public class Client2 {
    
        public static void main(String[] args) throws Exception {
    
            CuratorWatcher watcher = new CuratorWatcher();
            System.out.println("c2 start...");
            Thread.sleep(100000000);
        }
    }

    在写一个测试类,

     1 package bjsxt.curator.cluster;
     2 
     3 import org.apache.curator.RetryPolicy;
     4 import org.apache.curator.framework.CuratorFramework;
     5 import org.apache.curator.framework.CuratorFrameworkFactory;
     6 import org.apache.curator.retry.ExponentialBackoffRetry;
     7 import org.apache.zookeeper.CreateMode;
     8 
     9 public class Test {
    10 
    11     /** zookeeper地址 */
    12     static final String CONNECT_ADDR = "192.168.2.2:2181";
    13     /** session超时时间 */
    14     static final int SESSION_OUTTIME = 5000;// ms
    15 
    16     public static void main(String[] args) throws Exception {
    17 
    18         // 1 重试策略:初试时间为1s 重试10次
    19         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
    20         // 2 通过工厂创建连接
    21         CuratorFramework cf = CuratorFrameworkFactory.builder()
    22                 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME)
    23                 .retryPolicy(retryPolicy).build();
    24         // 3 开启连接
    25         cf.start();
    26 
    27         // Thread.sleep(3000);
    28         // System.out.println(cf.getChildren().forPath("/super").get(0));
    29 
    30         // 4 创建节点
    31         Thread.sleep(1000);
    32         cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
    33                 .forPath("/super/c1", "c1内容".getBytes());
    34         Thread.sleep(1000);
    35         cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
    36                 .forPath("/super/c2", "c2内容".getBytes());
    37         Thread.sleep(1000);
    38         //
    39         //
    40         //
    41         // //5 读取节点
    42         // Thread.sleep(1000);
    43         // String ret1 = new String(cf.getData().forPath("/super/c1"));
    44         // System.out.println(ret1);
    45         //
    46         //
    47         // //6 修改节点
    48         Thread.sleep(1000);
    49         cf.setData().forPath("/super/c2", "修改的新c2内容".getBytes());
    50         String ret2 = new String(cf.getData().forPath("/super/c2"));
    51         System.out.println(ret2);
    52         //
    53         //
    54         //
    55         // //7 删除节点
    56         // Thread.sleep(1000);
    57         // cf.delete().forPath("/super/c1");
    58 
    59     }
    60 }

    运行代码后,可以看到Client1和Client2,监听,获取到节点的数据变化。

  • 相关阅读:
    【crontab】误删crontab及其恢复
    New Concept English there (7)
    New Concept English there (6)
    New Concept English there (5)
    New Concept English there (4)
    New Concept English there (3)
    New Concept English there (2)Typing speed exercise
    New Concept English there (1)Typing speed exercise
    New Concept English Two 34 game over
    New Concept English Two 33 94
  • 原文地址:https://www.cnblogs.com/shmilyToHu/p/9120006.html
Copyright © 2011-2022 走看看