zoukankan      html  css  js  c++  java
  • Zookeeper应用场景之分布式屏障Barrier

    Barrier就是栅栏或者屏障,适用于这样的业务场景:当有些操作需要并行执行,但后续操作又需要串行执行,此时必须等待所有并行执行的线程全部结束,才开始串行,于是就需要一个屏障,来控制所有线程同时开始,并等待所有线程全部结束。

    下面放上一个简陋的图以便理解。

    要解决的问题如下:

    1.如何控制所有线程同时开始?
    所有的线程启动时在zookeeper节点/barrier下插入顺序临时节点,然后检查/barrier下所有children节点的数量是否为所有的线程数,如果不是,则等待。
    如果是,则开始执行。
    2.如何等待所有线程结束?
    所有线程在执行完毕后,都检查/barrier下所有children节点数量是否为0,若不为0,则继续等待。
    3.用什么类型的节点?
    根节点使用持久节点(persistent node),子节点使用临时节点(Ephemeral node)
    根节点为什么要用持久节点?首先因为临时节点不能有子节点,所以根节点要用持久节点,并且在程序中要判断根节点是否存在。
    子节点为什么要用临时节点?临时节点随着连接的断开而消失,在程序中,虽然会删除临时节点,但可能会出现程序在节点被删除之前就crash了,如果是持久节点,节点不会被删除。
     
    首先我写了一个ZookeeperClient,由这个client端负维护一个Zookeeper对象,并对Zookeeper节点进行操作。
    public class ZookeeperClient {
        public static ZooKeeper zooKeeper;
        public static final String IP_ADDRESS="xxxx:2181";
        public static void init() throws  Exception{
    
            zooKeeper = new ZooKeeper(IP_ADDRESS, 15000, new Watcher() {
    
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState()== Event.KeeperState.SyncConnected) {
    
    
                    }
                }
    
            });
    
        }
        public static String createTempNode(String path,String data) {
            try {
                String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("node "+ path +" with data is created,return node "+node);
                return node;
            } catch (KeeperException e) {
                e.printStackTrace();
                return "ERROR";
            } catch (InterruptedException e) {
                e.printStackTrace();
                return "ERROR";
            }
        }
        public  static boolean delete(String path,int version) {
            try {
                zooKeeper.delete(path,version);
                System.out.println("delete path:"+ path + "success");
                return true;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return false;
            } catch (KeeperException e) {
                e.printStackTrace();
                return false;
            }
    
        }
        public static boolean createPersistentNode(String path,String data) {
            try {
                String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                System.out.println("node "+ path +" with data is created");
                return true;
            } catch (KeeperException e) {
                e.printStackTrace();
                return false;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return false;
            }
        }
        public static boolean checkExists(String path){
            try {
                Stat stat = zooKeeper.exists(path,true);
                if(stat!=null) {
                    return true;
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return false;
        }
        public static int getChildrens(String path) {
            try {
                return zooKeeper.getChildren(path,true).size();
            } catch (KeeperException e) {
                e.printStackTrace();
                return -1;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return  -1;
            }
        }
    }

    Barrier类,负责实现屏障的功能

    public class Barrier {
        private int size;
        private String rootPath;
        public Barrier(int size,String rootPath){
            this.rootPath = rootPath;
            this.size = size;
    
    
        }
        public void init() throws Exception {
           ZookeeperClient.init();
            if(!ZookeeperClient.checkExists(rootPath)){
                ZookeeperClient.createPersistentNode(rootPath,"1");
            }
        }
        public boolean enter(String name,String number){
            ZookeeperClient.createTempNode(rootPath+"/"+name, number);
            //如果节点下children的数量没有达到所有线程的总数,则继续轮询。
            //此时要等待所有的线程都在根节点下创建了节点,才开始执行
            while(true) {
                int size=ZookeeperClient.getChildrens(rootPath);
                if (size != this.size) {
                    try {
    
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    return true;
                }
            }
        }
        public  boolean exit(String name){
            //先删除自己的节点
            ZookeeperClient.delete(rootPath+"/"+name,0);
            //如果节点下children数量大于0,则继续轮询
            //此时要等待所有的线程都删除了节点,即所有线程都做完了该做的事情,才结束线程。确保所有的线程同时结束。
            while(true){
                int size = ZookeeperClient.getChildrens(rootPath);
                if(size!=0) {
                    System.out.println("The current children node under "+rootPath+" is " + size+", still need waiting");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    return true;
                }
            }
        }
    }

    BarrierDemo类,负责启动线程,为了模拟等待,我为3个线程设置了不同的休眠时间。

    预想的结果是t3首先删除节点,此时子节点剩下两个(t1和t2),t3不会结束,而是继续轮询。

    t2随后删除节点,此时子节点剩下1个(t1),t2和t3继续轮询。

    t3最后删除节点,此时没有子节点,t1,t2,t3全部结束。

    public class BarrierDemo {
        public static void main(String[] args) throws  Exception{
            Barrier barrier = new Barrier(3,"/barrier");
            barrier.init();
            Worker worker1 = new Worker(barrier,10000);
            Worker worker2 = new Worker(barrier,5000);
            Worker worker3 = new Worker(barrier,2000);
            Thread t1 = new Thread(worker1,"t1");
            Thread t2 = new Thread(worker2,"t2");
            Thread t3 = new Thread(worker3,"t3");
            t1.start();
            t2.start();
            t3.start();
    
    
        }
    }
    class Worker implements Runnable {
        Barrier barrier;
        long time;
        Worker(Barrier barrier, long time){
            this.barrier = barrier;
            this.time = time;
    
        }
        public void run() {
            boolean isEnter=barrier.enter(Thread.currentThread().getName(),"0");
            if(isEnter) {
                System.out.println(Thread.currentThread().getName()+"is working on something important now");
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            boolean isExit=barrier.exit(Thread.currentThread().getName());
            if (isExit) {
                System.out.println(Thread.currentThread().getName()+"is exiting..");
            }
        }
    }

    运行结果如下

  • 相关阅读:
    Pandas学习整理与实践
    数据描述性统计整理
    关于购置硬盘的相关注意点
    福大软工 · 最终作业
    福大软工 · 第十二次作业
    Beta冲刺 (7/7)
    Beta冲刺 (6/7)
    深度剖析Vue中父给子、子给父、兄弟之间传值!
    mysql 增删改插
    前端必学TypeScript之第一弹,st基础类型!
  • 原文地址:https://www.cnblogs.com/qingfei1994/p/7670326.html
Copyright © 2011-2022 走看看