zoukankan      html  css  js  c++  java
  • 基于zookeeper实现分布式锁

      关于zookeeper的基础知识及安装部署,这篇文章已经讲得很清楚了,传送门:https://my.oschina.net/u/3796575/blog/1845035 。本文的场景为全局id生成,直接说一下思路:有两种实现,一种基于临时节点,一种基于临时顺序节点,两者都要用到zookeeper的watch机制,以及客户端断开连接后临时节点自动删除的特性。

      基于临时节点

      依赖

           <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.10</version>
            </dependency>

      简单的序列化

    package com.jlwj.zklock.service;
    
    import org.I0Itec.zkclient.exception.ZkMarshallingError;
    import org.I0Itec.zkclient.serialize.ZkSerializer;
    
    import java.io.UnsupportedEncodingException;
    
    /**
     * {@link ZkSerializer} that stores every value as the UTF-8 bytes of its
     * {@code String.valueOf(...)} form and reads node data back as a {@code String}.
     */
    public class MyZkSerializer implements ZkSerializer {

        /**
         * Charset used in both directions. Using the constant instead of the
         * charset name means no checked UnsupportedEncodingException can occur,
         * so there is no error path that could lose the original cause.
         */
        private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

        /**
         * Encodes the string form of {@code o} as UTF-8.
         *
         * @param o value to store; {@code null} is encoded as the text "null"
         * @return UTF-8 bytes of {@code String.valueOf(o)}
         */
        @Override
        public byte[] serialize(Object o) throws ZkMarshallingError {
            return String.valueOf(o).getBytes(CHARSET);
        }

        /**
         * Decodes node data as a UTF-8 string.
         *
         * @param bytes raw node data; must not be {@code null}
         * @return the decoded {@code String}
         */
        @Override
        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            return new String(bytes, CHARSET);
        }
    }

      id生成器

    package com.jlwj.zklock.service;
    
    import java.text.SimpleDateFormat;
    import java.time.Instant;
    import java.time.format.DateTimeFormatter;
    import java.util.Date;
    
    
    /**
     * Produces order numbers made of a timestamp followed by a per-instance counter.
     * The counter is a plain {@code int} field updated without any synchronization.
     */
    public class OrderNoGenerate {

        /** How many order numbers this instance has handed out so far. */
        private int counter = 0;

        /**
         * Builds the next order number.
         *
         * @return the current time formatted as {@code yyyy-MM-dd-HH-mm-ss},
         *         then {@code -} and the incremented counter value
         */
        public String genetateOrderNo(){
            // Format the timestamp first, then bump the counter, in that order.
            final String timestamp = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date());
            counter = counter + 1;
            final StringBuilder orderNo = new StringBuilder(timestamp);
            orderNo.append("-").append(counter);
            return orderNo.toString();
        }
    }

      分布式锁实现

    package com.jlwj.zklock.service;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkNodeExistsException;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    /**
     * Distributed lock backed by a single ephemeral ZooKeeper node: the client
     * that manages to create {@code lockPath} holds the lock; every other client
     * watches that node and retries once it has been deleted.
     *
     * <p>Limitations visible in this class: every instance opens its own
     * {@link ZkClient} connection and never closes it; the lock is not
     * reentrant (a second {@code tryLock()} by the holder fails because the
     * node already exists); and {@link #unlock()} deletes the node without
     * checking that the caller is the holder. All waiters are woken on every
     * release, since they all watch the same node.
     */
    public class ZkLock01 implements Lock {

        /** Path of the ephemeral node whose existence represents "locked". */
        private final String lockPath;

        private final ZkClient zkClient;

        /**
         * Creates a lock that connects to ZooKeeper at {@code localhost:2181}.
         *
         * @param lockPath path of the node used as the lock
         */
        public ZkLock01(String lockPath){
            this(lockPath, "localhost:2181");
        }

        /**
         * Creates a lock that connects to the given ZooKeeper server(s).
         *
         * @param lockPath  path of the node used as the lock
         * @param zkServers connection string passed to {@link ZkClient}
         */
        public ZkLock01(String lockPath, String zkServers){
            this.lockPath = lockPath;
            zkClient = new ZkClient(zkServers);
            zkClient.setZkSerializer(new MyZkSerializer());
        }

        /** Releases the lock by deleting the lock node. */
        @Override
        public void unlock() {
            zkClient.delete(lockPath);
        }

        /**
         * Tries once to create the lock node.
         *
         * @return {@code true} if this call created the node, {@code false} if it already existed
         */
        @Override
        public boolean tryLock() {
            try {
                zkClient.createEphemeral(lockPath);
            } catch (ZkNodeExistsException alreadyLocked) {
                // Somebody else holds the lock; this is the normal "busy" outcome.
                return false;
            }
            return true;
        }

        /**
         * Blocks until the lock is acquired. Interrupts do not abort the wait;
         * the interrupt status is restored before returning.
         */
        @Override
        public void lock() {
            boolean interrupted = false;
            try {
                // Loop instead of recursing so long contention cannot grow the stack.
                while (!tryLock()) {
                    try {
                        awaitRelease(false, 0L);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Blocks until the lock is acquired or the thread is interrupted.
         *
         * @throws InterruptedException if the thread is interrupted before or while waiting
         */
        @Override
        public void lockInterruptibly() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            while (!tryLock()) {
                awaitRelease(false, 0L);
            }
        }

        /**
         * Tries to acquire the lock, waiting at most the given time.
         *
         * @param time maximum time to wait; zero or negative means a single attempt
         * @param unit unit of {@code time}
         * @return {@code true} if the lock was acquired before the deadline
         * @throws InterruptedException if the thread is interrupted before or while waiting
         */
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            final long deadline = System.nanoTime() + unit.toNanos(time);
            while (!tryLock()) {
                final long remaining = deadline - System.nanoTime();
                if (remaining <= 0L) {
                    return false;
                }
                awaitRelease(true, remaining);
            }
            return true;
        }

        /**
         * Conditions are not supported by this lock.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public Condition newCondition() {
            throw new UnsupportedOperationException("ZkLock01 does not support conditions");
        }

        /**
         * Waits until the lock node is deleted, or until the timeout elapses.
         * Returns immediately if the node is already gone after subscribing.
         *
         * @param timed whether {@code nanos} limits the wait
         * @param nanos maximum wait in nanoseconds when {@code timed} is true
         */
        private void awaitRelease(boolean timed, long nanos) throws InterruptedException {
            final CountDownLatch released = new CountDownLatch(1);
            final IZkDataListener listener = new IZkDataListener() {
                @Override
                public void handleDataChange(String s, Object o) throws Exception {
                }
                @Override
                public void handleDataDeleted(String s) throws Exception {
                    System.out.println(s+ "节点被删除了");
                    released.countDown();
                }
            };
            zkClient.subscribeDataChanges(lockPath, listener);
            try {
                // Subscribe first, then check: a deletion between the failed
                // tryLock() and the subscription is caught by this exists() call.
                if (zkClient.exists(lockPath)) {
                    if (timed) {
                        released.await(nanos, TimeUnit.NANOSECONDS);
                    } else {
                        released.await();
                    }
                }
            } finally {
                // Always drop the listener, even when interrupted or on a ZooKeeper error.
                zkClient.unsubscribeDataChanges(lockPath, listener);
            }
        }
    }

      使用分布式锁的service

    package com.jlwj.zklock.service.impl;
    
    import com.jlwj.zklock.service.OrderNoGenerate;
    import com.jlwj.zklock.service.OrderService;
    import com.jlwj.zklock.service.ZkLock01;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * {@link OrderService} that generates order numbers while holding a
     * ZooKeeper-backed lock on node {@code /001}.
     *
     * <p>The generator is shared by all instances (static) and each instance
     * owns its own {@link ZkLock01}.
     */
    public class ZkLockOrderServiceImpl implements OrderService {

        private static final OrderNoGenerate orderNoGenerate = new OrderNoGenerate();

        private final Lock lock = new ZkLock01("/001");

        /** Generates one order number under the lock and prints it with the current thread name. */
        @Override
        public void createOrder() {
            // Acquire outside the try block: if lock() fails we must not run
            // unlock(), which would delete a lock node held by another client.
            lock.lock();
            try {
                String orderNo = orderNoGenerate.genetateOrderNo();
                System.out.println(Thread.currentThread().getName()+ "------->"+orderNo);
            } finally {
                lock.unlock();
            }
        }
    }

      测试类

    package com.jlwj.zklock;
    
    import com.jlwj.zklock.service.OrderService;
    import com.jlwj.zklock.service.impl.ZkImproveLockOrderServiceImpl;
    import com.jlwj.zklock.service.impl.ZkLockOrderServiceImpl;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    /**
     * Manual driver: starts 20 threads that each create their own
     * {@link ZkLockOrderServiceImpl}, release them together through a
     * {@link CyclicBarrier}, and let each create one order.
     */
    public class LockTest {

        public static void main(String[] args) {
            final int concurrency = 20;

            final CyclicBarrier startGate = new CyclicBarrier(concurrency);
            final Thread[] workers = new Thread[concurrency];
            for (int i = 0; i < concurrency; i++) {
                workers[i] = new Thread(() -> {
                    OrderService orderService = new ZkLockOrderServiceImpl();
                    System.out.println(Thread.currentThread().getName()+"线程准备好");
                    try {
                        // Hold every worker here so they all contend for the lock at once.
                        startGate.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Keep the interrupt visible to the code that runs next.
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    orderService.createOrder();
                });
                workers[i].start();
            }

            // Wait for the workers to finish instead of sleeping for a fixed time.
            for (Thread worker : workers) {
                try {
                    worker.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

      因为我们要模拟分布式环境,因此在使用分布式锁的service中引用了id生成器,真实情况是id的生成应该独立部署并且线程安全。通过查看打印的日志可以看出,每次节点被删除后,所有等待锁的线程都会收到通知从而去抢锁,这种现象也称为惊群效应,会造成资源的浪费,其实只需要通知其中一个线程去抢锁即可。

      基于临时顺序节点

    package com.jlwj.zklock.service;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkNodeExistsException;
    
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.stream.Collectors;
    
    /**
     * Distributed lock backed by ephemeral sequential ZooKeeper nodes. Each
     * contender creates a child under {@code lockPath}; the owner of the
     * smallest child holds the lock, and every other contender watches only
     * the child immediately before its own.
     *
     * <p>Ordering relies on the children's names sorting lexicographically in
     * creation order (see the ZooKeeper sequential-node documentation).
     *
     * <p>The queue position is kept in plain instance fields, so one instance
     * must not be used by several threads at once. Each instance opens its
     * own {@link ZkClient} connection and never closes it.
     */
    public class ZkLock02 implements Lock {

        /** Persistent parent node under which contenders queue up. */
        private final String lockPath;

        private final ZkClient zkClient;

        /** Full path of the node directly ahead of ours; only meaningful while waiting. */
        private String beforePath;

        /** Full path of our own queue node, or {@code null} when we are not queued. */
        private String currentPath;


        /**
         * Connects to ZooKeeper at {@code localhost:2181} and makes sure the
         * parent node exists.
         *
         * @param lockPath path of the persistent parent node
         */
        public ZkLock02(String lockPath){
            this.lockPath = lockPath;
            zkClient = new ZkClient("localhost:2181");
            zkClient.setZkSerializer(new MyZkSerializer());
            if(!zkClient.exists(lockPath)){
                try {
                    zkClient.createPersistent(lockPath);
                } catch (ZkNodeExistsException ignored) {
                    // Another client created the parent between exists() and
                    // createPersistent(); that is fine. Other failures propagate.
                }
            }
        }

        /**
         * Releases the lock by deleting our queue node and forgetting it, so
         * the same instance can be locked again. Does nothing if no node is held.
         */
        @Override
        public void unlock() {
            leaveQueue();
        }

        /**
         * Makes a single non-blocking attempt. If the lock is not free, our
         * queue node is removed again so it cannot block later contenders.
         *
         * @return {@code true} if the lock was acquired
         */
        @Override
        public boolean tryLock() {
            boolean acquired = false;
            try {
                acquired = tryAcquire();
                return acquired;
            } finally {
                if (!acquired) {
                    leaveQueue();
                }
            }
        }

        /**
         * Blocks until the lock is acquired. Interrupts do not abort the wait;
         * the interrupt status is restored before returning.
         */
        @Override
        public void lock() {
            boolean interrupted = false;
            boolean acquired = false;
            try {
                // Loop instead of recursing so a long queue cannot grow the stack.
                while (!tryAcquire()) {
                    try {
                        awaitPredecessor(false, 0L);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                acquired = true;
            } finally {
                if (!acquired) {
                    leaveQueue();
                }
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Blocks until the lock is acquired or the thread is interrupted; on
         * interruption our queue node is removed.
         *
         * @throws InterruptedException if the thread is interrupted before or while waiting
         */
        @Override
        public void lockInterruptibly() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            boolean acquired = false;
            try {
                while (!tryAcquire()) {
                    awaitPredecessor(false, 0L);
                }
                acquired = true;
            } finally {
                if (!acquired) {
                    leaveQueue();
                }
            }
        }

        /**
         * Tries to acquire the lock, waiting at most the given time; on timeout
         * or interruption our queue node is removed.
         *
         * @param time maximum time to wait; zero or negative means a single attempt
         * @param unit unit of {@code time}
         * @return {@code true} if the lock was acquired before the deadline
         * @throws InterruptedException if the thread is interrupted before or while waiting
         */
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            final long deadline = System.nanoTime() + unit.toNanos(time);
            boolean acquired = false;
            try {
                while (!tryAcquire()) {
                    final long remaining = deadline - System.nanoTime();
                    if (remaining <= 0L) {
                        return false;
                    }
                    awaitPredecessor(true, remaining);
                }
                acquired = true;
                return true;
            } finally {
                if (!acquired) {
                    leaveQueue();
                }
            }
        }

        /**
         * Conditions are not supported by this lock.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public Condition newCondition() {
            throw new UnsupportedOperationException("ZkLock02 does not support conditions");
        }

        /**
         * Joins the queue if necessary and checks whether our node is first.
         * On failure {@link #beforePath} is set to the node to wait for and
         * our node stays queued.
         *
         * @return {@code true} if our node is the smallest child
         */
        private boolean tryAcquire() {
            while (true) {
                if (currentPath == null) {
                    currentPath = zkClient.createEphemeralSequential(lockPath + "/", "asd");
                }
                final List<String> children =
                        zkClient.getChildren(lockPath).stream().sorted().collect(Collectors.toList());
                final String ownName = currentPath.substring(lockPath.length() + 1);
                final int ownIndex = children.indexOf(ownName);
                if (ownIndex < 0) {
                    // Our node is no longer listed; forget it and queue up again.
                    currentPath = null;
                    continue;
                }
                if (ownIndex == 0) {
                    beforePath = null;
                    return true;
                }
                beforePath = lockPath + "/" + children.get(ownIndex - 1);
                return false;
            }
        }

        /** Deletes our queue node, if any, and clears the queue-position fields. */
        private void leaveQueue() {
            final String ownNode = currentPath;
            // Clear the fields first so the instance is reusable even if delete() throws.
            currentPath = null;
            beforePath = null;
            if (ownNode != null) {
                zkClient.delete(ownNode);
            }
        }

        /**
         * Waits until the node ahead of ours is deleted, or until the timeout
         * elapses. Returns immediately if that node is already gone.
         *
         * @param timed whether {@code nanos} limits the wait
         * @param nanos maximum wait in nanoseconds when {@code timed} is true
         */
        private void awaitPredecessor(boolean timed, long nanos) throws InterruptedException {
            final String watched = beforePath;
            final CountDownLatch deleted = new CountDownLatch(1);
            final IZkDataListener listener = new IZkDataListener() {
                @Override
                public void handleDataChange(String s, Object o) throws Exception {
                }
                @Override
                public void handleDataDeleted(String s) throws Exception {
                    System.out.println(s+ "节点被删除了");
                    deleted.countDown();
                }
            };
            zkClient.subscribeDataChanges(watched, listener);
            try {
                // Subscribe first, then check: a deletion that happened before
                // the subscription is caught by this exists() call.
                if (zkClient.exists(watched)) {
                    if (timed) {
                        deleted.await(nanos, TimeUnit.NANOSECONDS);
                    } else {
                        deleted.await();
                    }
                }
            } finally {
                // Always drop the listener, even when interrupted or on a ZooKeeper error.
                zkClient.unsubscribeDataChanges(watched, listener);
            }
        }
    }

      使用临时顺序节点后,避免了惊群效应,同时一定程度上实现了公平,先到先得。

  • 相关阅读:
    MongoDB在windows服务器安装部署及远程连接MongoDB
    react 常用组件
    react component 语法报错解决
    yarn install node-sass(gulp-sass) 安装失败解决方案
    eslint 规则中文注释
    react jsx 代码格式化
    vue sublime 工欲善其事,必先利其器
    jenkins 配置
    nodejs 使用 superagent 与 cheerio 完成简单爬虫
    jQuery DOM对象区别与联系
  • 原文地址:https://www.cnblogs.com/hhhshct/p/11365817.html
Copyright © 2011-2022 走看看