zoukankan      html  css  js  c++  java
  • 使用Zookeeper 实现选主从或者分布式锁

    概述


    1.zookeeper实现选主从的原理

    2.zookeeper实现选主从代码

    选主从的原理

    在分布式场景中经常会用到zookeeper,常用的有利用zookeeper来选举主从,管理节点状态,或者使用zookeeper来实现分布式锁;具体原理是什么呢?

    这里只将实现方式的一种,根据编号大小来实现:(其他方式有通过创建节点实现的,等等)

    所有的节点向zk的某个路径下注册,创建临时节点(临时节点,zookeeper会主动监控,一旦连接失效,zk会删除该临时节点),每个注册者创建时会有一个编号,每次选举编号最小的为主节点,其他节点就为从节点,从节点会监控主节点是否失效(怎么监控? zk有事件,监听事件的状态变化,然后重新选举),为避免“惊群”现象,每个节点只监控比它小的一个临近节点。

    代码实现

    选主从代码:

    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Created by  on 17/11/23.
     */
    public class ChooseMaster implements Watcher {
    
    
        private ZooKeeper zk=null;
        private String selfPath=null;
        private String waitPath=null;
        private static final String ZK_ROOT_PATH="/zkmaster";  //选主从的跟路径
        private static final String ZK_SUB_PATH=ZK_ROOT_PATH+"/register";
        private CountDownLatch successCountDownLatch=new CountDownLatch(1);
        private CountDownLatch threadCompleteLatch=null;
    
        public ChooseMaster(CountDownLatch countDownLatch){
    
            this.threadCompleteLatch=countDownLatch;
        }
    
        @Override
        public void process(WatchedEvent watchedEvent) {  //监听事件
    
            Event.KeeperState keeperState=watchedEvent.getState(); 
            Event.EventType eventType=watchedEvent.getType();
            if(Event.KeeperState.SyncConnected==keeperState){  //建立连接
    
                if(Event.EventType.None==eventType){
    
                    System.out.println(Thread.currentThread().getName()+" connected to server");
                    successCountDownLatch.countDown();
                }else if(Event.EventType.NodeDeleted==eventType && watchedEvent.getPath().equals(waitPath)){ //监测到节点删除,且为当前线程的等待节点
    
                    System.out.println(Thread.currentThread().getName() + " some node was deleted,I'll check if I am the minimum node");
                    try{
    
                        if(checkMinPath()){  //判断自己是不是最小的编号
    
                            processMasterEvent();  //处理主节点做的事情
                        }
                    }catch (Exception e){
    
                        e.printStackTrace();
                    }
    
                }
    
            }else if(Event.KeeperState.Disconnected==keeperState){  //连接断开
    
                System.out.println(Thread.currentThread().getName()+ " release connection");
            }else if(Event.KeeperState.Expired==keeperState){  //超时
    
                System.out.println(Thread.currentThread().getName()+ " connection expire");
            }
        }
    
    
        public void chooseMaster() throws Exception {
    
            selfPath=zk.create(ZK_SUB_PATH,null,ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  //创建临时节点
            System.out.println(Thread.currentThread().getName()+ "create path "+selfPath);
            if(checkMinPath()){  //判断是否为主节点
    
                processMasterEvent();  
            }
        }
    
        public boolean createPersistPath(String path,String data,boolean needWatch) throws KeeperException, InterruptedException {
    
            if(zk.exists(path,needWatch)==null){
    
                zk.create(path,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                System.out.println(Thread.currentThread().getName()+" create persist path "+path);
            }
            return true;
    
    
        }
    
        public void createConnection(String connection,int timeout) throws IOException, InterruptedException {
    
            zk=new ZooKeeper(connection,timeout,this);
            successCountDownLatch.await();
    
        }
    
        private void processMasterEvent() throws KeeperException, InterruptedException {
    
            if(zk.exists(selfPath,false)==null){
    
                System.out.println(Thread.currentThread().getName()+ " selfnode is not exist "+ selfPath);
                return;
            }
            System.out.println(Thread.currentThread().getName()+ " I'm the master,now do work");
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName()+" Finish do work,leave master");
            //zk.delete(selfPath,-1);
            releaseConnection();
            threadCompleteLatch.countDown();
    
        }
    
        private void releaseConnection() {
    
            if(zk!=null){
    
                try {
                    zk.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private boolean checkMinPath() throws Exception {
    
    //获取根节点下的所有子节点,进行排序,取当前路径的index,如果排在第一个,则为主,否则检测前一个节点是否存在,不存在则重新选举最小的节点 List
    <String> subNodes=zk.getChildren(ZK_ROOT_PATH,false); System.out.println(subNodes.toString()); Collections.sort(subNodes); System.out.println(Thread.currentThread().getName()+" tmp node index is "+selfPath.substring(ZK_ROOT_PATH.length()+1)); int index=subNodes.indexOf(selfPath.substring(ZK_ROOT_PATH.length()+1)); switch (index){ case -1: System.out.println(Thread.currentThread().getName()+" create node is not exist"); return false; case 0: System.out.println(Thread.currentThread().getName()+" I'm the master"); return true; default: waitPath=ZK_ROOT_PATH+"/"+subNodes.get(index-1); System.out.println(Thread.currentThread().getName()+" the node before me is "+waitPath); try{ zk.getData(waitPath,true,new Stat()); return false; }catch (Exception e){ if(zk.exists(waitPath,false)==null){ System.out.println(Thread.currentThread().getName()+" the node before me is not exist,now is me"); return checkMinPath(); }else{ throw e; } } } } }

    测试代码:

    import org.apache.zookeeper.KeeperException;
    
    import java.io.IOException;
    import java.util.concurrent.*;
    
    /**
     * Created by  on 17/11/23.
     */
    public class MasterChoiceTest {
    
        private final static String ZK_CONNECT_STRING="127.0.0.1:2181";
        private final static String ZK_ROOT_PATH="/zkmaster";
        private final static int SESSION_TIMEOUT=10000;
        private static final int THREAD_NUM=5;
        private static int threadNo=0;
        private static ExecutorService executorService=null;
        private static CountDownLatch threadCompleteLatch=new CountDownLatch(THREAD_NUM);
    
        public static void main(String[] args){
    
            executorService= Executors.newFixedThreadPool(THREAD_NUM, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
    
                    String name=String.format("The %s thread",++threadNo);
                    Thread ret=new Thread(Thread.currentThread().getThreadGroup(),r,name,0);
                    ret.setDaemon(false);
                    return ret;
                }
            });
            if(executorService!=null){
    
                startProcess();
            }
        }
    
        private static void startProcess() {
    
            Runnable masterChoiceTest=new Runnable() {
                @Override
                public void run() {
    
                    String threadName=Thread.currentThread().getName();
                    ChooseMaster chooseMaster=new ChooseMaster(threadCompleteLatch);
                    try {
                        chooseMaster.createConnection(ZK_CONNECT_STRING,SESSION_TIMEOUT);
                        System.out.println(Thread.currentThread().getName()+" connected to server");
                        synchronized (MasterChoiceTest.class){
    
                            chooseMaster.createPersistPath(ZK_ROOT_PATH,"thread "+threadName,true);
                        }
                        chooseMaster.chooseMaster();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
    
            for(int i=0;i<THREAD_NUM;i++){
    
                executorService.execute(masterChoiceTest);
            }
            executorService.shutdown();
            try {
                threadCompleteLatch.await();
                System.out.println("All thread finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    输出结果:

    The 1 thread-EventThread connected to server
    The 5 thread-EventThread connected to server
    The 3 thread-EventThread connected to server
    The 2 thread-EventThread connected to server
    The 5 thread connected to server
    The 3 thread connected to server
    The 2 thread connected to server
    The 1 thread connected to server
    The 4 thread-EventThread connected to server
    The 4 thread connected to server
    The 5 thread create persist path /zkmaster
    The 5 threadcreate path /zkmaster/register0000000000
    [register0000000000]
    The 5 thread tmp node index is register0000000000
    The 5 thread I'm the master
    The 1 threadcreate path /zkmaster/register0000000001
    The 2 threadcreate path /zkmaster/register0000000002
    The 5 thread I'm the master,now do work
    The 3 threadcreate path /zkmaster/register0000000003
    [register0000000000, register0000000002, register0000000001]
    The 1 thread tmp node index is register0000000001
    The 1 thread the node before me is /zkmaster/register0000000000
    [register0000000000, register0000000002, register0000000001, register0000000003]
    [register0000000000, register0000000002, register0000000001, register0000000003]
    The 3 thread tmp node index is register0000000003
    The 2 thread tmp node index is register0000000002
    The 3 thread the node before me is /zkmaster/register0000000002
    The 2 thread the node before me is /zkmaster/register0000000001
    The 4 threadcreate path /zkmaster/register0000000004
    [register0000000000, register0000000002, register0000000001, register0000000004, register0000000003]
    The 4 thread tmp node index is register0000000004
    The 4 thread the node before me is /zkmaster/register0000000003
    The 5 thread Finish do work,leave master
    The 1 thread-EventThread some node was deleted,I'll check if I am the minimum node
    [register0000000002, register0000000001, register0000000004, register0000000003]
    The 1 thread-EventThread tmp node index is register0000000001
    The 1 thread-EventThread I'm the master
    The 1 thread-EventThread I'm the master,now do work
    The 1 thread-EventThread Finish do work,leave master
    The 2 thread-EventThread some node was deleted,I'll check if I am the minimum node
    [register0000000002, register0000000004, register0000000003]
    The 2 thread-EventThread tmp node index is register0000000002
    The 2 thread-EventThread I'm the master
    The 2 thread-EventThread I'm the master,now do work
    The 2 thread-EventThread Finish do work,leave master
    The 3 thread-EventThread some node was deleted,I'll check if I am the minimum node
    [register0000000004, register0000000003]
    The 3 thread-EventThread tmp node index is register0000000003
    The 3 thread-EventThread I'm the master
    The 3 thread-EventThread I'm the master,now do work
    The 3 thread-EventThread Finish do work,leave master
    The 4 thread-EventThread some node was deleted,I'll check if I am the minimum node
    [register0000000004]
    The 4 thread-EventThread tmp node index is register0000000004
    The 4 thread-EventThread I'm the master
    The 4 thread-EventThread I'm the master,now do work
    The 4 thread-EventThread Finish do work,leave master
    All thread finished

    分布式锁的原理也是一样,每次编号最小的获取锁。

  • 相关阅读:
    Median Value
    237. Delete Node in a Linked List
    206. Reverse Linked List
    160. Intersection of Two Linked Lists
    83. Remove Duplicates from Sorted List
    21. Merge Two Sorted Lists
    477. Total Hamming Distance
    421. Maximum XOR of Two Numbers in an Array
    397. Integer Replacement
    318. Maximum Product of Word Lengths
  • 原文地址:https://www.cnblogs.com/dpains/p/7885645.html
Copyright © 2011-2022 走看看