zoukankan      html  css  js  c++  java
  • 基于Curator的Api使用及Leader选举实现

      Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。

    1.引入依赖:

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>
    

    2。 curator 简单操作实例

    public class CuratorDemo {
    
    	public static void main(String[] args) throws Exception {
    		CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() // 获取构造器
    				.connectString("192.168.254.135:2181," + "192.168.254.136:2181,192.168.254.137:2181") // 集群地址
    				.sessionTimeoutMs(4000) // session超时事件
    				.retryPolicy(new ExponentialBackoffRetry(1000, 3)) // 重试机制
    				.namespace("curator") // 命名构建 根节点
    				.build(); // 构造
    
    		curatorFramework.start();// 启动 连接
    
    		// 结果: /curator/mic/node1
    		// 原生api中,必须是逐层创建,也就是父节点必须存在,子节点才能创建
    		String path = curatorFramework.create() // 创建节点
    				.creatingParentsIfNeeded() // 如果需要创建父节点
    				.withMode(CreateMode.PERSISTENT) // 选择节点类型
    				.forPath("/wuzz/node1","1".getBytes()); // 路径及值
    		System.out.println(path);
    
    		Stat stat = new Stat();
    		curatorFramework.getData() // 获取
    				.storingStatIn(stat).forPath("/wuzz/node1");
    		System.out.println(stat);
    
    		curatorFramework.setData() // 修改
    				.withVersion(stat.getVersion()).forPath("/wuzz/node1", "xx".getBytes());
    
    		curatorFramework.delete()// 删除
    				.deletingChildrenIfNeeded().forPath("/mic/node1");
    
    		curatorFramework.close();// 关闭连接
    
    	}
    }
    

    监听事件实例:

    public class CuratorWatcherDemo {
    
        public static void main(String[] args) throws Exception {
        	
        	CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() // 获取构造器
    				.connectString("192.168.254.135:2181," + 
    						"192.168.254.136:2181,192.168.254.137:2181") // 集群地址
    				.sessionTimeoutMs(4000) // session超时事件
    				.retryPolicy(new ExponentialBackoffRetry(1000, 3)) // 重试机制
    				.namespace("curator") // 命名构建 根节点
    				.build(); // 构造
    
    		curatorFramework.start();// 启动 连接
            //当前节点的创建和删除事件监听 ---永久的
    //        addListenerWithNodeCache(curatorFramework,"/wuzz");
            //子节点的增加、修改、删除的事件监听
            addListenerWithPathChildCache(curatorFramework,"/wuzz");
            //综合节点监听事件
    //        addListenerWithTreeCache(curatorFramework,"/wuzz");
            System.in.read();
        }
    
        public static void addListenerWithTreeCache(CuratorFramework curatorFramework,String path) throws Exception {
            TreeCache treeCache=new TreeCache(curatorFramework,path);
            TreeCacheListener treeCacheListener=new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                    System.out.println(event.getType()+"->"+event.getData().getPath());
                }
            };
    
            treeCache.getListenable().addListener(treeCacheListener);
            treeCache.start();
        }
    
        /**
         * PathChildCache 监听一个节点下子节点的创建、删除、更新
         * NodeCache  监听一个节点的更新和创建事件
         * TreeCache  综合PatchChildCache和NodeCache的特性
         */
    
        public static void addListenerWithPathChildCache(CuratorFramework curatorFramework,String path) throws Exception {
            PathChildrenCache pathChildrenCache=new PathChildrenCache(curatorFramework,path,true);
    
            PathChildrenCacheListener pathChildrenCacheListener=new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    System.out.println("Receive Event2:"+event.getType());
                }
            };
    
            pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
            pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
    
        }
    
        // 监听一个节点的更新,创建/wuzz节点事件
        public static void addListenerWithNodeCache(CuratorFramework curatorFramework,String path) throws Exception {
            final NodeCache nodeCache=new NodeCache(curatorFramework,path,false);
            NodeCacheListener nodeCacheListener=new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    System.out.println("Receive Event1:"+nodeCache.getCurrentData().getPath());
                }
            };
            nodeCache.getListenable().addListener(nodeCacheListener);
            nodeCache.start();
        }
    }
    

      简单实用,用起来会比原生的Api会方便许多。

    基于Curator的Leader选举:

      Curator提供了两种选举方案:Leader Latch 和 Leader Election。下面分别介绍这两种选举方案。

      Leader Latch:使用 Leader Latch 方案进行Master选举,系统将随机从候选者中选出一台作为 leader,直到调用 close() 释放leadship,此时再重新随机选举 leader,否则其他的候选者无法成为 leader。

      基本原理:选择一个根路径,例如"/leader_select",多个机器同时向该根路径下创建临时顺序节点,如"/leader_latch/0002","/leader_latch/0000","/leader_latch/0001",节点编号最小(这里为0000)的zk客户端成为leader,没抢到Leader的节点都监听前一个节点的删除事件,在前一个节点删除后进行重新抢主.可以看到节点变化类似:

       使用示例如下:

    public class LeaderLatchTest {
        private static final String zkServerIps = "192.168.1.101:2181";
        private static final String masterPath = "/testZK/leader_latch";
    
        public static void main(String[] args) {
            AtomicInteger atomicInteger = new AtomicInteger(1);
            try {//启动5个线程
                for (int i = 0; i < 5; i++) {
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            // 创建客户端
                            CuratorFramework client = getClient();
                            try {
                                client.blockUntilConnected();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println(client.getState());
                            int number = atomicInteger.getAndIncrement();
                            final LeaderLatch latch = new LeaderLatch(client, masterPath, "client#" + number);
                            System.out.println("创建客户端:" + latch.getId());
                            // LeaderLatch 添加监听事件
                            latch.addListener(new LeaderLatchListener() {
                                @Override//抢主成功时触发
                                public void isLeader() {
                                    System.out.println(latch.getId() + ": 我现在被选举为Leader!我开始工作了....");
                                    try {//放弃leader。让剩下来的服务重新选主
                                        latch.close();
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                }
    
                                @Override//不会执行
                                public void notLeader() {
                                }
                            });
                            try {
                                latch.start();
                                System.in.read();
                            } catch (Exception e) {
                                System.out.println(e.getMessage());
                            } finally {
                                System.out.println("客户端 " + latch.getId() + " 关闭");
                                CloseableUtils.closeQuietly(latch);
                                CloseableUtils.closeQuietly(client);
                            }
                        }
                    }).start();
                }
    //            countDownLatch.await(); // 等待,只有所有线程都退出
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private static synchronized CuratorFramework getClient() {
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkServerIps)
                    .sessionTimeoutMs(6000).connectionTimeoutMs(3000) //.namespace("LeaderLatchTest")
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
            client.start();
            return client;
        }
    }

      Leader Election:通过 Leader Election 选举方案进行 Master选举,需添加 LeaderSelectorListener 监听器对领导权进行控制,当节点被选为leader之后,将调用 takeLeadership 方法进行业务逻辑处理,处理完成会立即释放 leadship,重新进行Master选举,这样每个节点都有可能成为leader。autoRequeue() 方法的调用确保此实例在释放领导权后还可能获得领导权:利用Curator中InterProcessMutex分布式锁进行抢主,抢到锁的即为Leader。示例如下:

    public class LeaderSelectorTest {
        private static final String zkServerIps = "192.168.1.101:2181";
        private static final String masterPath = "/testZK/leader_selector";
    
        public static void main(String[] args) {
            AtomicInteger atomicInteger = new AtomicInteger(1);
            try {
                for (int i = 0; i < 5; i++) {
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            CuratorFramework client = getClient();  // 创建客户端
                            try {
                                client.blockUntilConnected();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            int number = atomicInteger.getAndIncrement();
                            final String name = "client#" + number;
                            final LeaderSelector selector = new LeaderSelector(client, masterPath, new LeaderSelectorListener() {
                                @Override
                                public void takeLeadership(CuratorFramework client) throws Exception {
                                    System.out.println(name + ": 我现在被选举为Leader!我开始工作了....");
                                    Thread.sleep(3000);
                                }
                                @Override
                                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                                }
                            });
                            System.out.println("创建客户端:" + name);
                            try {//在抢到leader权限并释放后,自动加入抢主队列,重新抢主
                                selector.autoRequeue();
                                selector.start();
                                // 阻塞
                                System.in.read();
                            } catch (Exception e) {
                                System.out.println(e.getMessage());
                            } finally {
                                System.out.println("客户端 " + name + " 关闭");
                                CloseableUtils.closeQuietly(selector);
                                if (!client.getState().equals(CuratorFrameworkState.STOPPED)) {
                                    CloseableUtils.closeQuietly(client);
                                }
                            }
                        }
                    }).start();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private static synchronized CuratorFramework getClient() {
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkServerIps)
                    .sessionTimeoutMs(10000).connectionTimeoutMs(10000) //.namespace("LeaderLatchTest")
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
            client.start();
            return client;
        }
    }

     

  • 相关阅读:
    js变量
    运行javascript的方式
    .Net 内存泄露
    .NET Reflector反编译的方法
    SVN 忽略获取和提交obj、bin文件夹
    SQL Server编程(06)触发器
    SQL Server编程(05)游标
    SQL Server编程(04)基本语法
    SQL Server编程(03)自定义存储过程
    SQL Server编程(02)自定义函数
  • 原文地址:https://www.cnblogs.com/wuzhenzhao/p/9995230.html
Copyright © 2011-2022 走看看