zoukankan      html  css  js  c++  java
  • 基于zookeeper实现分布式锁

      关于zookeeper的基础知识及安装部署,这篇文章已经讲得很清楚了,传送门:https://my.oschina.net/u/3796575/blog/1845035 。本文的场景为全局id生成,直接说一下思路:有两种实现,一种基于临时节点,一种基于临时顺序节点,两者都要用到zookeeper的watch机制以及客户端断开连接后临时节点自动删除的特性。

      基于临时节点

      依赖

           <!-- ZkClient (com.101tec:zkclient 0.10): the ZooKeeper client library whose org.I0Itec.zkclient classes the code below imports. -->
           <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.10</version>
            </dependency>

      简单的序列化

    package com.jlwj.zklock.service;
    
    import org.I0Itec.zkclient.exception.ZkMarshallingError;
    import org.I0Itec.zkclient.serialize.ZkSerializer;
    
    import java.io.UnsupportedEncodingException;
    import java.nio.charset.StandardCharsets;
    
    /**
     * {@link ZkSerializer} that stores node data as UTF-8 encoded text.
     *
     * <p>Values are written via {@link String#valueOf(Object)} and always read back as
     * {@link String}, so only the textual form of an object round-trips.
     */
    public class MyZkSerializer implements ZkSerializer {

        /**
         * Encodes the string form of {@code o} as UTF-8 bytes.
         *
         * @param o value to store; {@code null} is encoded as the text {@code "null"}
         * @return UTF-8 bytes of {@code String.valueOf(o)}
         */
        @Override
        public byte[] serialize(Object o) throws ZkMarshallingError {
            // Using the Charset constant (instead of the charset name) removes the
            // checked UnsupportedEncodingException that could never happen for UTF-8.
            return String.valueOf(o).getBytes(StandardCharsets.UTF_8);
        }

        /**
         * Decodes UTF-8 bytes into a {@link String}.
         *
         * @param bytes raw node data, may be {@code null}
         * @return the decoded string, or {@code null} when {@code bytes} is {@code null}
         */
        @Override
        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            if (bytes == null) {
                return null;
            }
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

      id生成器

    package com.jlwj.zklock.service;
    
    import java.text.SimpleDateFormat;
    import java.time.Instant;
    import java.time.format.DateTimeFormatter;
    import java.util.Date;
    
    
    /**
     * Builds order numbers of the form {@code yyyy-MM-dd-HH-mm-ss-<n>}, where {@code <n>}
     * is a per-instance counter starting at 1.
     *
     * <p>The counter is a plain {@code int} incremented without any synchronization in
     * this class; callers that share an instance between threads must serialize access
     * themselves.
     */
    public class OrderNoGenerate {

        /** Number of order numbers this instance has produced so far. */
        private int i = 0;

        /**
         * Returns the next order number: the current time formatted as
         * {@code yyyy-MM-dd-HH-mm-ss}, a dash, and the incremented counter.
         *
         * @return the newly generated order number
         */
        public String genetateOrderNo() {
            final String timestamp = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date());
            i = i + 1;
            return timestamp + "-" + i;
        }
    }

      分布式锁实现

    package com.jlwj.zklock.service;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkNodeExistsException;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    /**
     * Distributed lock backed by a single ephemeral ZooKeeper node.
     *
     * <p>The lock is acquired by creating the ephemeral node {@code lockPath}; whoever
     * creates it holds the lock. Waiters subscribe to the node and retry once it is
     * deleted, so every waiter is woken on each release.
     *
     * <p>{@link #unlock()} deletes the node unconditionally: call it only after a
     * successful acquisition on this instance. Conditions are not supported.
     */
    public class ZkLock01 implements Lock {

        private final String lockPath;

        private final ZkClient zkClient;

        /**
         * Creates a lock on the given node path, connecting to ZooKeeper at
         * {@code localhost:2181}.
         *
         * @param lockPath path of the ephemeral node that represents the lock
         */
        public ZkLock01(String lockPath){
            this.lockPath = lockPath;
            zkClient = new ZkClient("localhost:2181");
            zkClient.setZkSerializer(new MyZkSerializer());
        }

        /** Releases the lock by deleting the lock node. */
        @Override
        public void unlock() {
            zkClient.delete(lockPath);
        }

        /**
         * Tries once to create the lock node.
         *
         * @return {@code true} if the node was created (lock acquired), {@code false} if
         *         it already exists
         */
        @Override
        public boolean tryLock() {
            try {
                zkClient.createEphemeral(lockPath);
            } catch (ZkNodeExistsException e) {
                return false;
            }
            return true;
        }

        /**
         * Blocks until the lock is acquired. Interrupts do not abort the wait; the
         * interrupt status is restored before returning.
         */
        @Override
        public void lock() {
            // Clear a pending interrupt up front so the waits below are not aborted
            // immediately; it is re-asserted in the finally block.
            boolean interrupted = Thread.interrupted();
            try {
                // Loop instead of recursing: heavy contention must not grow the stack.
                while (!tryLock()) {
                    try {
                        waitForLock(Long.MAX_VALUE);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Blocks until the lock is acquired or the current thread is interrupted.
         *
         * @throws InterruptedException if the thread is interrupted before or while waiting
         */
        @Override
        public void lockInterruptibly() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            while (!tryLock()) {
                waitForLock(Long.MAX_VALUE);
            }
        }

        /**
         * Tries to acquire the lock, waiting up to the given time.
         *
         * @param time maximum time to wait; zero or negative means a single attempt
         * @param unit unit of {@code time}
         * @return {@code true} if the lock was acquired, {@code false} on timeout
         * @throws InterruptedException if the thread is interrupted before or while waiting
         */
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            final long deadline = System.nanoTime() + unit.toNanos(time);
            while (!tryLock()) {
                final long remaining = deadline - System.nanoTime();
                if (remaining <= 0L) {
                    return false;
                }
                waitForLock(remaining);
            }
            return true;
        }

        /**
         * Conditions are not implemented for this lock.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public Condition newCondition() {
            throw new UnsupportedOperationException("ZkLock01 does not support conditions");
        }

        /**
         * Waits until the lock node is deleted or the timeout elapses, whichever comes
         * first. Returns immediately if the node is already gone.
         *
         * @param timeoutNanos maximum time to wait, in nanoseconds
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private void waitForLock(long timeoutNanos) throws InterruptedException {
            final CountDownLatch released = new CountDownLatch(1);
            final IZkDataListener listener = new IZkDataListener() {
                @Override
                public void handleDataChange(String s, Object o) throws Exception {
                }
                @Override
                public void handleDataDeleted(String s) throws Exception {
                    System.out.println(s+ "节点被删除了");
                    released.countDown();
                }
            };
            // Subscribe before checking existence so a deletion in between is not missed.
            zkClient.subscribeDataChanges(lockPath, listener);
            try {
                if (zkClient.exists(lockPath)) {
                    released.await(timeoutNanos, TimeUnit.NANOSECONDS);
                }
            } finally {
                // Always drop the listener, even when interrupted or on a client error.
                zkClient.unsubscribeDataChanges(lockPath, listener);
            }
        }
    }

      使用分布式锁的service

    package com.jlwj.zklock.service.impl;
    
    import com.jlwj.zklock.service.OrderNoGenerate;
    import com.jlwj.zklock.service.OrderService;
    import com.jlwj.zklock.service.ZkLock01;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * {@link OrderService} that generates order numbers while holding a
     * {@link ZkLock01} on the node {@code /001}.
     *
     * <p>The generator is shared by all instances (static), while each instance owns
     * its own lock object.
     */
    public class ZkLockOrderServiceImpl implements OrderService {

        private static final OrderNoGenerate orderNoGenerate = new OrderNoGenerate();

        private final Lock lock = new ZkLock01("/001");

        /**
         * Acquires the lock, generates one order number and prints it together with the
         * current thread name, then releases the lock.
         */
        @Override
        public void createOrder() {
            // Acquire outside the try block: if lock() fails, unlock() must not run,
            // otherwise a lock that was never obtained here would be released.
            lock.lock();
            try {
                String orderNo = orderNoGenerate.genetateOrderNo();
                System.out.println(Thread.currentThread().getName()+ "------->"+orderNo);
            } finally {
                lock.unlock();
            }
        }
    }

      测试类

    package com.jlwj.zklock;
    
    import com.jlwj.zklock.service.OrderService;
    import com.jlwj.zklock.service.impl.ZkImproveLockOrderServiceImpl;
    import com.jlwj.zklock.service.impl.ZkLockOrderServiceImpl;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    /**
     * Manual driver: starts 20 threads that all wait on a barrier and then call
     * {@code createOrder()} at the same time, each on its own
     * {@link ZkLockOrderServiceImpl}.
     */
    public class LockTest {

        public static void main(String[] args) {
            final int threadCount = 20;
            final CyclicBarrier startGate = new CyclicBarrier(threadCount);

            // Each run creates its own service, waits for all peers, then places one order.
            final Runnable worker = () -> {
                OrderService service = new ZkLockOrderServiceImpl();
                System.out.println(Thread.currentThread().getName()+"线程准备好");
                try {
                    startGate.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                service.createOrder();
            };

            int started = 0;
            while (started < threadCount) {
                new Thread(worker).start();
                started++;
            }

            // Keep the main thread around for ten seconds while the workers run.
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

      因为我们要模拟分布式环境,因此在使用分布式锁的service中引用了id生成器,真实情况是id的生成应该独立部署并且线程安全。通过查看打印的日志可以看出,每次节点被删除后,所有等待锁的线程都会收到通知从而去抢锁,这种现象也称为惊群效应,会造成资源的浪费,其实只需要通知其中一个线程去抢锁即可。

      基于临时顺序节点

    package com.jlwj.zklock.service;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkNodeExistsException;
    
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.stream.Collectors;
    
    /**
     * Distributed lock backed by ephemeral sequential ZooKeeper nodes.
     *
     * <p>Each contender creates an ephemeral sequential child under {@code lockPath}.
     * The contender whose child sorts first holds the lock; every other contender
     * watches only the child directly ahead of its own, so a release wakes a single
     * waiter.
     *
     * <p>An instance keeps its queue position in plain fields and is meant to be used
     * by one thread at a time. Conditions are not supported.
     */
    public class ZkLock02 implements Lock {

        private final String lockPath;

        private final ZkClient zkClient;

        /** Full path of the child directly ahead of ours; set when an attempt fails. */
        private String beforePath;

        /** Full path of our own child node, or {@code null} when we are not queued. */
        private String currentPath;


        /**
         * Creates a lock under the given parent path, connecting to ZooKeeper at
         * {@code localhost:2181} and creating the persistent parent node if needed.
         *
         * @param lockPath path of the persistent parent node holding the queue
         */
        public ZkLock02(String lockPath){
            this.lockPath = lockPath;
            zkClient = new ZkClient("localhost:2181");
            zkClient.setZkSerializer(new MyZkSerializer());
            if(!zkClient.exists(lockPath)){
                try {
                    zkClient.createPersistent(lockPath);
                } catch (ZkNodeExistsException ignored) {
                    // Another client created the parent between exists() and create();
                    // that is the only failure that is safe to ignore here.
                }
            }
        }

        /**
         * Releases the lock by deleting our child node and forgetting the queue
         * position, so the instance can be locked again later.
         *
         * @throws IllegalMonitorStateException if this instance has no node to release
         */
        @Override
        public void unlock() {
            if (currentPath == null) {
                throw new IllegalMonitorStateException("lock is not held: " + lockPath);
            }
            zkClient.delete(currentPath);
            currentPath = null;
            beforePath = null;
        }

        /**
         * Tries once to acquire the lock without waiting. On failure the node created
         * for the attempt is removed again, so an abandoned attempt never blocks the
         * contenders queued behind it.
         *
         * @return {@code true} if the lock was acquired
         */
        @Override
        public boolean tryLock() {
            if (tryAcquire()) {
                return true;
            }
            leaveQueue();
            return false;
        }

        /**
         * Blocks until the lock is acquired. Interrupts do not abort the wait; the
         * interrupt status is restored before returning.
         */
        @Override
        public void lock() {
            // Clear a pending interrupt up front so the waits below are not aborted
            // immediately; it is re-asserted in the finally block.
            boolean interrupted = Thread.interrupted();
            try {
                // Loop instead of recursing: a long queue must not grow the stack.
                while (!tryAcquire()) {
                    try {
                        waitForLock(Long.MAX_VALUE);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Blocks until the lock is acquired or the current thread is interrupted. When
         * interrupted, our node is removed from the queue before the exception is
         * propagated.
         *
         * @throws InterruptedException if the thread is interrupted before or while waiting
         */
        @Override
        public void lockInterruptibly() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            try {
                while (!tryAcquire()) {
                    waitForLock(Long.MAX_VALUE);
                }
            } catch (InterruptedException e) {
                leaveQueue();
                throw e;
            }
        }

        /**
         * Tries to acquire the lock, waiting up to the given time. On timeout or
         * interrupt our node is removed from the queue.
         *
         * @param time maximum time to wait; zero or negative means a single attempt
         * @param unit unit of {@code time}
         * @return {@code true} if the lock was acquired, {@code false} on timeout
         * @throws InterruptedException if the thread is interrupted before or while waiting
         */
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            final long deadline = System.nanoTime() + unit.toNanos(time);
            try {
                while (!tryAcquire()) {
                    final long remaining = deadline - System.nanoTime();
                    if (remaining <= 0L) {
                        leaveQueue();
                        return false;
                    }
                    waitForLock(remaining);
                }
                return true;
            } catch (InterruptedException e) {
                leaveQueue();
                throw e;
            }
        }

        /**
         * Conditions are not implemented for this lock.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public Condition newCondition() {
            throw new UnsupportedOperationException("ZkLock02 does not support conditions");
        }

        /**
         * Joins the queue if necessary and checks whether our node is first.
         * On failure {@code beforePath} is set to the node directly ahead of ours and
         * our node stays queued.
         *
         * @return {@code true} if our node is the first child of {@code lockPath}
         */
        private boolean tryAcquire() {
            while (true) {
                if (currentPath == null) {
                    currentPath = zkClient.createEphemeralSequential(lockPath + "/", "asd");
                }
                final String ownName = currentPath.substring(lockPath.length() + 1);
                final List<String> children =
                        zkClient.getChildren(lockPath).stream().sorted().collect(Collectors.toList());
                final int ownIndex = children.indexOf(ownName);
                if (ownIndex < 0) {
                    // Our node is no longer listed (it was removed behind our back);
                    // queue up again instead of failing on a negative index.
                    currentPath = null;
                    continue;
                }
                if (ownIndex == 0) {
                    beforePath = null;
                    return true;
                }
                beforePath = lockPath + "/" + children.get(ownIndex - 1);
                return false;
            }
        }

        /** Deletes our queue node, if any, and clears the remembered positions. */
        private void leaveQueue() {
            if (currentPath != null) {
                zkClient.delete(currentPath);
                currentPath = null;
            }
            beforePath = null;
        }

        /**
         * Waits until the node ahead of ours is deleted or the timeout elapses,
         * whichever comes first. Returns immediately if that node is already gone.
         *
         * @param timeoutNanos maximum time to wait, in nanoseconds
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        private void waitForLock(long timeoutNanos) throws InterruptedException {
            final String predecessor = beforePath;
            final CountDownLatch released = new CountDownLatch(1);
            final IZkDataListener listener = new IZkDataListener() {
                @Override
                public void handleDataChange(String s, Object o) throws Exception {
                }
                @Override
                public void handleDataDeleted(String s) throws Exception {
                    System.out.println(s+ "节点被删除了");
                    released.countDown();
                }
            };
            // Subscribe before checking existence so a deletion in between is not missed.
            zkClient.subscribeDataChanges(predecessor, listener);
            try {
                if (zkClient.exists(predecessor)) {
                    released.await(timeoutNanos, TimeUnit.NANOSECONDS);
                }
            } finally {
                // Always drop the listener, even when interrupted or on a client error.
                zkClient.unsubscribeDataChanges(predecessor, listener);
            }
        }
    }

      使用临时顺序节点后,避免了惊群效应,同时一定程度上实现了公平,先到先得。

  • 相关阅读:
    Python中所有的关键字
    关于selenium的8种元素定位
    对提示框的操作
    selenium+webservice进行百度登录
    MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled...报错解决
    Vue中使用echarts
    npm WARN deprecated request@2.88.2: request has been deprecated, see https://github.com/request/request/issues/3142解决方法
    插入排序
    冒泡排序优化
    Project 'org.springframework.boot:spring-boot-starter-parent:XXX' not found 解决
  • 原文地址:https://www.cnblogs.com/hhhshct/p/11365817.html
Copyright © 2011-2022 走看看