关于zookeeper的基础知识及安装部署,这篇文章已经讲得很清楚了,传送门: https://my.oschina.net/u/3796575/blog/1845035 。本文的场景为全局id生成,直接说一下思路:有两种实现,一种基于临时节点,一种基于临时顺序节点,当然两者共同的部分是都要用到zookeeper的watch机制以及客户端断开连接后临时节点自动删除的特性。
基于临时节点
依赖
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
简单的序列化
package com.jlwj.zklock.service;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * Minimal {@link ZkSerializer} that stores node data as the UTF-8 bytes of the
 * value's string form and reads node data back as a {@link String}.
 */
public class MyZkSerializer implements ZkSerializer {

    /** Using the Charset constant (not a charset name) removes the checked-exception path entirely. */
    private static final Charset CHARSET = StandardCharsets.UTF_8;

    /**
     * Converts the value to bytes via {@code String.valueOf(o)}.
     *
     * @param o value to store; {@code null} is encoded as the literal text "null"
     * @return UTF-8 encoded bytes of the value's string form
     */
    @Override
    public byte[] serialize(Object o) throws ZkMarshallingError {
        return String.valueOf(o).getBytes(CHARSET);
    }

    /**
     * Decodes node data as a UTF-8 string.
     *
     * @param bytes raw node data, may be {@code null} for a node without data
     * @return the decoded string, or {@code null} when {@code bytes} is {@code null}
     */
    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        if (bytes == null) {
            return null;
        }
        return new String(bytes, CHARSET);
    }
}
id生成器
package com.jlwj.zklock.service; import java.text.SimpleDateFormat; import java.time.Instant; import java.time.format.DateTimeFormatter; import java.util.Date; public class OrderNoGenerate { private int i =0; public String genetateOrderNo(){ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss"); return simpleDateFormat.format(new Date()) +"-"+ ++i; } }
分布式锁实现
package com.jlwj.zklock.service; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class ZkLock01 implements Lock { private String lockPath; private ZkClient zkClient; public ZkLock01(String lockPath){ this.lockPath = lockPath; zkClient = new ZkClient("localhost:2181"); zkClient.setZkSerializer(new MyZkSerializer()); } @Override public void unlock() { zkClient.delete(lockPath); } @Override public boolean tryLock() { try { zkClient.createEphemeral(lockPath); } catch (ZkNodeExistsException e) { return false; } return true; } @Override public void lock() { if(!tryLock()){ waitForLock(); lock(); } } private void waitForLock(){ CountDownLatch countDownLatch = new CountDownLatch(1); IZkDataListener listener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { System.out.println(s+ "节点被删除了"); countDownLatch.countDown(); } }; zkClient.subscribeDataChanges(lockPath,listener); if(zkClient.exists(lockPath)){ try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(lockPath,listener); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }
使用分布式锁的service
package com.jlwj.zklock.service.impl;

import com.jlwj.zklock.service.OrderNoGenerate;
import com.jlwj.zklock.service.OrderService;
import com.jlwj.zklock.service.ZkLock01;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * {@link OrderService} that generates order numbers while holding the ZooKeeper lock
 * on {@code /001}. The generator is shared by all instances in the JVM; each instance
 * has its own lock object.
 */
public class ZkLockOrderServiceImpl implements OrderService {

    private static final OrderNoGenerate orderNoGenerate = new OrderNoGenerate();

    private final Lock lock = new ZkLock01("/001");

    /** Generates one order number under the lock and prints it with the current thread name. */
    @Override
    public void createOrder() {
        // Acquire outside the try block: if lock() fails, unlock() must not run,
        // otherwise it would delete a lock node held by another client.
        lock.lock();
        try {
            String orderNo = orderNoGenerate.genetateOrderNo();
            System.out.println(Thread.currentThread().getName() + "------->" + orderNo);
        } finally {
            lock.unlock();
        }
    }
}
测试类
package com.jlwj.zklock;

import com.jlwj.zklock.service.OrderService;
import com.jlwj.zklock.service.impl.ZkImproveLockOrderServiceImpl;
import com.jlwj.zklock.service.impl.ZkLockOrderServiceImpl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * Manual stress test: starts 20 threads that each create their own service instance,
 * line up on a barrier, and then call {@code createOrder()} at the same moment.
 */
public class LockTest {

    public static void main(String[] args) {
        int concurrency = 20;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(concurrency);
        List<Thread> workers = new ArrayList<>(concurrency);

        for (int i = 0; i < concurrency; i++) {
            Thread worker = new Thread(() -> {
                OrderService orderService = new ZkLockOrderServiceImpl();
                System.out.println(Thread.currentThread().getName() + "线程准备好");
                try {
                    // Hold every worker here until all of them are ready.
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (BrokenBarrierException e) {
                    // Simultaneous start is no longer possible; skip this worker.
                    e.printStackTrace();
                    return;
                }
                orderService.createOrder();
            });
            workers.add(worker);
            worker.start();
        }

        // Wait for the workers to finish instead of sleeping for a fixed time.
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
因为我们要模拟分布式环境,因此在使用分布式锁的service中引用了id生成器,真实情况是id的生成应该独立部署并且线程安全。通过查看打印的日志可以看出,每次节点被删除后,所有等待锁的线程都会收到通知从而去抢锁,这种现象也称为惊群效应,会造成资源的浪费,其实只需要通知其中一个线程去抢锁即可。
基于临时顺序节点
package com.jlwj.zklock.service; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; public class ZkLock02 implements Lock { private String lockPath; private ZkClient zkClient; private String beforePath; private String currentPath; public ZkLock02(String lockPath){ this.lockPath = lockPath; zkClient = new ZkClient("localhost:2181"); zkClient.setZkSerializer(new MyZkSerializer()); if(!zkClient.exists(lockPath)){ try { zkClient.createPersistent(lockPath); } catch (RuntimeException e) { } } } @Override public void unlock() { zkClient.delete(currentPath); } @Override public boolean tryLock() { if(currentPath==null){ currentPath = zkClient.createEphemeralSequential(lockPath+"/","asd"); } List<String> childs = zkClient.getChildren(lockPath).stream().sorted((a, b)->a.compareTo(b)).collect(Collectors.toList()); if(currentPath.equals(lockPath +"/" + childs.get(0))){ return true; }else{ int curIndex = childs.indexOf(currentPath.substring(lockPath.length()+1)); beforePath = lockPath +"/" + childs.get(curIndex-1); } return false; } @Override public void lock() { if(!tryLock()){ waitForLock(); lock(); } } private void waitForLock(){ CountDownLatch countDownLatch = new CountDownLatch(1); IZkDataListener listener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { System.out.println(s+ "节点被删除了"); countDownLatch.countDown(); } }; zkClient.subscribeDataChanges(beforePath,listener); if(zkClient.exists(beforePath)){ try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } 
zkClient.unsubscribeDataChanges(beforePath,listener); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }
使用临时顺序节点后,每个等待者只监听排在自己前面的那个节点,因此节点被删除时只会通知到一个等待者,避免了惊群效应,同时一定程度上实现了公平,先到先得。