zoukankan      html  css  js  c++  java
  • Curator的cluster,实现多节点数据共享

    模拟两个客户端,实现多节点数据共享

     1 package bjsxt.curator.cluster;
     2 
     3 import org.apache.curator.RetryPolicy;
     4 import org.apache.curator.framework.CuratorFramework;
     5 import org.apache.curator.framework.CuratorFrameworkFactory;
     6 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
     7 import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
     8 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
     9 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    10 import org.apache.curator.retry.ExponentialBackoffRetry;
    11 import org.apache.zookeeper.CreateMode;
    12 
    13 public class CuratorWatcher {
    14 
    15     /** 父节点path */
    16     static final String PARENT_PATH = "/super";
    17 
    18     /** zookeeper服务器地址 */
    19     public static final String CONNECT_ADDR = "192.168.2.2:2181";
    20     /** 定义session失效时间 */
    21 
    22     public static final int SESSION_TIMEOUT = 30000;
    23 
    24     public CuratorWatcher() throws Exception {
    25         // 1 重试策略:初试时间为1s 重试10次
    26         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
    27         // 2 通过工厂创建连接
    28         CuratorFramework cf = CuratorFrameworkFactory.builder()
    29                 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT)
    30                 .retryPolicy(retryPolicy).build();
    31         // 3 建立连接
    32         cf.start();
    33 
    34         // 4 创建跟节点
    35         if (cf.checkExists().forPath(PARENT_PATH) == null) {
    36             cf.create().withMode(CreateMode.PERSISTENT)
    37                     .forPath(PARENT_PATH, "super init".getBytes());
    38         }
    39 
    40         // 4 建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受
    41         PathChildrenCache cache = new PathChildrenCache(cf, PARENT_PATH, true);
    42         // 5 在初始化的时候就进行缓存监听
    43         cache.start(StartMode.POST_INITIALIZED_EVENT);
    44         cache.getListenable().addListener(new PathChildrenCacheListener() {
    45             /**
    46              * <B>方法名称:</B>监听子节点变更<BR>
    47              * <B>概要说明:</B>新建、修改、删除<BR>
    48              * 
    49              * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework,
    50              *      org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent)
    51              */
    52             @Override
    53             public void childEvent(CuratorFramework cf,
    54                     PathChildrenCacheEvent event) throws Exception {
    55                 switch (event.getType()) {
    56                 case CHILD_ADDED:
    57                     System.out.println("CHILD_ADDED :"
    58                             + event.getData().getPath());
    59                     System.out.println("CHILD_ADDED :"
    60                             + new String(event.getData().getData()));
    61                     break;
    62                 case CHILD_UPDATED:
    63                     System.out.println("CHILD_UPDATED :"
    64                             + event.getData().getPath());
    65                     System.out.println("CHILD_UPDATED :"
    66                             + new String(event.getData().getData()));
    67                     break;
    68                 case CHILD_REMOVED:
    69                     System.out.println("CHILD_REMOVED :"
    70                             + event.getData().getPath());
    71                     System.out.println("CHILD_REMOVED :"
    72                             + new String(event.getData().getData()));
    73                     break;
    74                 default:
    75                     break;
    76                 }
    77             }
    78         });
    79     }
    80 
    81 }
    package bjsxt.curator.cluster;
    
    public class Client1 {
    
        public static void main(String[] args) throws Exception {
    
            CuratorWatcher watcher = new CuratorWatcher();
            System.out.println("c1 start...");
            Thread.sleep(100000000);
        }
    }
    package bjsxt.curator.cluster;
    
    public class Client2 {
    
        public static void main(String[] args) throws Exception {
    
            CuratorWatcher watcher = new CuratorWatcher();
            System.out.println("c2 start...");
            Thread.sleep(100000000);
        }
    }

    在写一个测试类,

     1 package bjsxt.curator.cluster;
     2 
     3 import org.apache.curator.RetryPolicy;
     4 import org.apache.curator.framework.CuratorFramework;
     5 import org.apache.curator.framework.CuratorFrameworkFactory;
     6 import org.apache.curator.retry.ExponentialBackoffRetry;
     7 import org.apache.zookeeper.CreateMode;
     8 
     9 public class Test {
    10 
    11     /** zookeeper地址 */
    12     static final String CONNECT_ADDR = "192.168.2.2:2181";
    13     /** session超时时间 */
    14     static final int SESSION_OUTTIME = 5000;// ms
    15 
    16     public static void main(String[] args) throws Exception {
    17 
    18         // 1 重试策略:初试时间为1s 重试10次
    19         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
    20         // 2 通过工厂创建连接
    21         CuratorFramework cf = CuratorFrameworkFactory.builder()
    22                 .connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME)
    23                 .retryPolicy(retryPolicy).build();
    24         // 3 开启连接
    25         cf.start();
    26 
    27         // Thread.sleep(3000);
    28         // System.out.println(cf.getChildren().forPath("/super").get(0));
    29 
    30         // 4 创建节点
    31         Thread.sleep(1000);
    32         cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
    33                 .forPath("/super/c1", "c1内容".getBytes());
    34         Thread.sleep(1000);
    35         cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
    36                 .forPath("/super/c2", "c2内容".getBytes());
    37         Thread.sleep(1000);
    38         //
    39         //
    40         //
    41         // //5 读取节点
    42         // Thread.sleep(1000);
    43         // String ret1 = new String(cf.getData().forPath("/super/c1"));
    44         // System.out.println(ret1);
    45         //
    46         //
    47         // //6 修改节点
    48         Thread.sleep(1000);
    49         cf.setData().forPath("/super/c2", "修改的新c2内容".getBytes());
    50         String ret2 = new String(cf.getData().forPath("/super/c2"));
    51         System.out.println(ret2);
    52         //
    53         //
    54         //
    55         // //7 删除节点
    56         // Thread.sleep(1000);
    57         // cf.delete().forPath("/super/c1");
    58 
    59     }
    60 }

    运行代码后,可以看到Client1和Client2,监听,获取到节点的数据变化。

  • 相关阅读:
    good course Very
    HTTP Hypertext Transfer Protocol Overview
    Linux下tar.xz结尾的文件的解压方法
    原来java HttpURLConnection本身就提供了chunk的支持,又是让人一惊啊
    牛人一枚
    V2EX › 花了3个晚上,把readability最新的1.7.1转成了python版的
    13.11. 惯例优先原则(convention over configuration)
    明尼苏达大学
    快速构建实时抓取集群 « 搜索技术博客-淘宝
    Java Practices > Home
  • 原文地址:https://www.cnblogs.com/shmilyToHu/p/9120006.html
Copyright © 2011-2022 走看看