zoukankan      html  css  js  c++  java
  • zookeeper 本地多线程模拟分布式事务控制及配置中心

    基本思路是:通过每个连接创建 临时节点(避免宕机后节点不释放)后规定节点最小的拥有获取锁的权利,那么其他的就拿不到了,但是每个节点都对前一个节点建立delete的watch机制。那么每次前一个节点释放锁(delete)触发watch 后一个节点就能获取锁 本地启动zkServer:

    package xyz.luofu.www;
    
    import org.apache.zookeeper.*;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class TestZookeePerLock {
    
        private String seqNodeNamePar = "/Order"; //这个节点先建好 create /Order
        private ThreadLocal<String> currentNodeNameThr = new ThreadLocal<>();
        private ThreadLocal<ZooKeeper> zk = new ThreadLocal<>();
        public boolean tryLock() {
            try {
                String seqNodeName = "/sq";
                zk.set(new ZooKeeper("localhost:2181", 3000, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
    
                    }
                }));
                //Ids.OPEN_ACL_UNSAFE,这就表明之后对这个节点的任何操作都不受权限控制
                currentNodeNameThr.set(zk.get().create(seqNodeNamePar + seqNodeName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));
                List<String> children = zk.get().getChildren(seqNodeNamePar, false);
                Collections.sort(children);
    
                String currentNodeNameStr = currentNodeNameThr.get().substring(currentNodeNameThr.get().lastIndexOf("/") + 1);
                int index = children.indexOf(currentNodeNameStr);
                String preNodeName = "";
                if(index != 0){
                    preNodeName = children.get(index - 1);
                }
                String first = children.get(0);
                if (currentNodeNameStr.equals(first)) {
                    System.out.println(Thread.currentThread().getName()+"获得锁成功");
                    return true;
                } else {
                    final CountDownLatch countDownLatch = new CountDownLatch(1);
                    zk.get().exists(seqNodeNamePar + "/" + preNodeName, new Watcher() {
                        @Override
                        public void process(WatchedEvent event) {
                            if (Event.EventType.NodeDeleted == event.getType()) {
                                countDownLatch.countDown();
                                System.out.println(Thread.currentThread().getName()+"重新获得锁成功");
                            }
                        }
                    });
                    System.out.println(Thread.currentThread().getName()+"等待锁");
                    countDownLatch.await();
                    return true;
                }
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
        public void unlock(){
            try {
                Thread.sleep(100);//避免还没建立监听这个节点就已delete。那么后面建立的delete watch 就触发不了了。
                zk.get().delete(currentNodeNameThr.get(),-1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"释放锁成功");
        }
    
        class TestZoo implements Runnable{
    
            @Override
            public void run() {
                new Order().create();
                tryLock();
                boolean flag = new Store().descStore();
                unlock();
                if(flag){
                    new Pay().doPay();
                    System.out.println(Thread.currentThread().getName()+"支付成功");
                }
            }
        }
    
        @Test
        public void testZo() throws InterruptedException {
            new Thread(new TestZoo(),"线程一").start();
            new Thread(new TestZoo(),"线程二").start();
            int count = Thread.activeCount();
            while(count > 2){
                Thread.yield();
            }
            System.out.println("主线程结束");
        }
    }
    
    public class Order {
        public void create(){
            System.out.println(Thread.currentThread().getName()+"创建订单");
        }
    }
    
    public class Pay {
        public void doPay(){
        }
    }
    
    public class Store {
        public int count =1;
        public boolean descStore(){
            if(count > 0){
                count--;
                System.out.println(Thread.currentThread().getName()+"减库存成功 count:"+count);
                return true;
            }
            System.out.println(Thread.currentThread().getName()+"减库存失败");
            return false;
        }
    }
    

    运行截图:

    对于分布式配置中心,也是结合watch机制,加cache。每次新增zk.create并且加入cache其他操作类似。初始化时加载配置节点/config下的所有节点进入cache,并且设置对数据操作的watch相应的操作缓存。然后其他服务只需从cache中读取数据。

  • 相关阅读:
    MVVMLight leaning note
    c# Random Class usage
    Learning note for Binding and validation
    Content Template & DataTemplate 区别
    ListBox mvvm 学习笔记
    spinlock自旋锁de使用
    linux 内核(驱动)常用函数
    linux 编译,链接和加载
    Linux 下多核CPU知识
    linux 内核调试
  • 原文地址:https://www.cnblogs.com/leifonlyone/p/12851534.html
Copyright © 2011-2022 走看看