首先,添加对分布式锁支持较好的 ZooKeeper 客户端(Apache Curator)依赖:
<!-- curator-framework 2.12.0; slf4j-api is excluded (presumably so the project's own SLF4J version is used; confirm against the project POM). -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- curator-recipes 2.12.0 (provides the recipes.cache classes used by the lock utility); the slf4j-log4j12 binding is excluded. -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
添加配置信息
# Number of retries handed to the RetryNTimes retry policy.
curator.retryCount=5
# Sleep between retries, in milliseconds (second argument of RetryNTimes).
curator.elapsedTimeMs=5000
# ZooKeeper connection string (host:port).
curator.connectString=127.0.0.1:2181
# Session timeout passed to CuratorFrameworkFactory.newClient, in milliseconds.
curator.sessionTimeoutMs=60000
# Connection timeout passed to CuratorFrameworkFactory.newClient, in milliseconds.
curator.connectionTimeoutMs=5000
添加配置类信息
package com.voole.config.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Component; @Component @ConfigurationProperties(prefix="curator",ignoreInvalidFields=true,ignoreUnknownFields=true) @PropertySource("classpath:curator.properties") public class CuratorConfiguration { private final static Logger logger = LoggerFactory.getLogger(CuratorConfiguration.class); /* public void init() { try { Properties param = new Properties(); FileInputStream ois = FileUtils.openInputStream(ResourceUtils.getFile("classpath:curator.properties")); param.load(ois); setRetryCount(param.getProperty("retryCount")); setElapsedTimeMs(param.getProperty("elapsedTimeMs")); setConnectString(param.getProperty("connectString")); setSessionTimeoutMs(param.getProperty("sessionTimeoutMs")); setConnectionTimeoutMs(param.getProperty("connectionTimeoutMs")); } catch (Exception e) { logger.error(" 配置文件curator.properties 加载失败"); e.printStackTrace(); } }*/ /** * 重试次数 */ // @Value("${curator.retryCount}") private String retryCount; /** * 重试间隔时间 */ // @Value("${curator.elapsedTimeMs}") private String elapsedTimeMs; /** * zookeeper地址 */ // @Value("${curator.connectString}") private String connectString; /** * session超时时间 */ // @Value("${curator.sessionTimeoutMs}") private String sessionTimeoutMs; /** * 连接超时时间 */ // @Value("${curator.connectionTimeoutMs}") private String connectionTimeoutMs; /*@Bean(initMethod = "start") public CuratorFramework curatorFramework() { return CuratorFrameworkFactory.newClient(connectString,sessionTimeoutMs, connectionTimeoutMs, new RetryNTimes(retryCount, elapsedTimeMs)); }*/ 
@Bean(initMethod = "start") public CuratorFramework curatorFramework() { // this.init(); logger.info("init CuratorFramework client"); return CuratorFrameworkFactory.newClient(connectString,Integer.parseInt(sessionTimeoutMs), Integer.parseInt(connectionTimeoutMs), new RetryNTimes(Integer.parseInt(retryCount), Integer.parseInt(elapsedTimeMs))); } public String getRetryCount() { return retryCount; } public void setRetryCount(String retryCount) { this.retryCount = retryCount; } public String getElapsedTimeMs() { return elapsedTimeMs; } public void setElapsedTimeMs(String elapsedTimeMs) { this.elapsedTimeMs = elapsedTimeMs; } public String getConnectString() { return connectString; } public void setConnectString(String connectString) { this.connectString = connectString; } public String getSessionTimeoutMs() { return sessionTimeoutMs; } public void setSessionTimeoutMs(String sessionTimeoutMs) { this.sessionTimeoutMs = sessionTimeoutMs; } public String getConnectionTimeoutMs() { return connectionTimeoutMs; } public void setConnectionTimeoutMs(String connectionTimeoutMs) { this.connectionTimeoutMs = connectionTimeoutMs; } }
之后是具体的工具类
package com.voole.platform.util;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Distributed lock built on ephemeral ZooKeeper nodes: whoever manages to create
 * {@code /rootlock/<path>} (inside the {@code lock-namespace} namespace) holds the lock, and
 * deleting that node releases it.
 *
 * <p>Waiters are woken by a {@link PathChildrenCache} listener on {@code /rootlock}. Every
 * removed child wakes all local waiters, which then race to re-create their node; the losers go
 * back to waiting. In addition, waiters re-try on a fixed interval so that a missed or
 * undeliverable event (for example when the watcher could not be registered) cannot block a
 * caller forever.
 *
 * <p>Limitation: {@link #releaseLock(String)} deletes the node without checking who created it,
 * so callers must only release locks they actually hold.
 *
 * @author 冯浩 (2019-02-20)
 */
@Component
public class DistributedLockByCurator implements InitializingBean, DisposableBean {

    private static final Logger logger = LoggerFactory.getLogger(DistributedLockByCurator.class);

    private static final String ROOT_PATH_LOCK = "rootlock";

    /** Upper bound for a single wait before the lock node creation is attempted again. */
    private static final long RETRY_WAIT_MS = 1000L;

    /** Guards {@link #releaseGeneration} and backs {@link #lockReleased}. */
    private final Lock waitLock = new ReentrantLock();

    /** Signalled whenever the watcher observes a removed lock node. */
    private final Condition lockReleased = waitLock.newCondition();

    /** Incremented on every observed node removal; guarded by {@link #waitLock}. */
    private long releaseGeneration;

    /** Cache that delivers the removal events; closed in {@link #destroy()}. */
    private volatile PathChildrenCache childrenCache;

    @Autowired
    private CuratorFramework curatorFramework;

    /**
     * Blocks until the lock identified by {@code path} has been acquired.
     *
     * @param path lock key, appended to {@code /rootlock/}
     * @throws IllegalArgumentException if {@code path} is {@code null} or empty
     * @throws IllegalStateException if the calling thread is interrupted while waiting (the
     *         interrupt flag is restored)
     * @throws RuntimeException any unchecked exception raised by the client (for example for an
     *         invalid node path) is propagated, because retrying cannot fix it
     */
    public void acquireLock(String path) {
        String keypath = lockPath(path);
        while (true) {
            // Snapshot BEFORE the attempt: a removal that happens after a failed create is then
            // guaranteed to be noticed by awaitRelease, so no wake-up can be lost.
            long seenGeneration = currentReleaseGeneration();
            try {
                curatorFramework
                        .create()
                        .creatingParentContainersIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(keypath);
                logger.info("sucess to acquire lock for path:{}", keypath);
                return;
            } catch (KeeperException.NodeExistsException e) {
                // Node already exists: somebody else holds the lock.
                logger.info("failed to acquire lock for path:{}", keypath);
                logger.info("while try again");
            } catch (RuntimeException e) {
                // Programming/configuration errors; looping would never succeed.
                throw e;
            } catch (Exception e) {
                // Other checked failures (e.g. connection problems): keep the blocking contract and retry.
                logger.warn("error while acquiring lock for path:{}, will retry", keypath, e);
            }
            awaitRelease(seenGeneration);
        }
    }

    /**
     * Releases the lock by deleting its node.
     *
     * @param path lock key, appended to {@code /rootlock/}
     * @return {@code true} if the node is gone afterwards (deleted now or already absent),
     *         {@code false} if the path is invalid or the deletion failed
     */
    public boolean releaseLock(String path) {
        try {
            String keypath = lockPath(path);
            if (curatorFramework.checkExists().forPath(keypath) != null) {
                curatorFramework.delete().forPath(keypath);
            }
            return true;
        } catch (KeeperException.NoNodeException e) {
            // Node vanished between checkExists and delete: the lock is released either way.
            logger.debug("lock node for path:{} was already gone", path);
            return true;
        } catch (Exception e) {
            logger.error("failed to release lock", e);
            return false;
        }
    }

    /**
     * Registers a children watcher that wakes local waiters when a child node is removed.
     * Called once from {@link #afterPropertiesSet()} with the root lock path.
     *
     * @param path {@code rootlock} to watch the root, otherwise a key below the root
     * @throws Exception if the cache cannot be started
     */
    private void addWatcher(String path) throws Exception {
        String keypath;
        if (path.equals(ROOT_PATH_LOCK)) {
            keypath = "/" + path;
        } else {
            keypath = "/" + ROOT_PATH_LOCK + "/" + path;
        }
        PathChildrenCache cache = new PathChildrenCache(curatorFramework, keypath, false);
        // Attach the listener before starting so that no early event is missed.
        cache.getListenable().addListener((client, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                logger.info("success to release lock for path:{}", event.getData().getPath());
                signalRelease();
            }
        });
        try {
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        } catch (Exception e) {
            closeQuietly(cache);
            throw e;
        }
        childrenCache = cache;
    }

    /**
     * Switches to the {@code lock-namespace} namespace, makes sure the root node exists and
     * registers the watcher. Failures are logged rather than propagated so that the application
     * still starts; waiters then fall back to interval-based retries.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        curatorFramework = curatorFramework.usingNamespace("lock-namespace");
        String path = "/" + ROOT_PATH_LOCK;
        try {
            if (curatorFramework.checkExists().forPath(path) == null) {
                try {
                    curatorFramework
                            .create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.PERSISTENT)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath(path);
                } catch (KeeperException.NodeExistsException e) {
                    // Another instance created the root concurrently; that is fine, and the
                    // watcher below must still be registered.
                    logger.debug("root lock path {} already created by another instance", path);
                }
            }
            addWatcher(ROOT_PATH_LOCK);
            logger.info("root path 的watch事件创建成功");
        } catch (Exception e) {
            logger.error("connect zookpeeper fail,please check the log >>{}", e.getMessage(), e);
        }
    }

    /** Closes the children cache when the bean is destroyed. */
    @Override
    public void destroy() {
        PathChildrenCache cache = childrenCache;
        childrenCache = null;
        if (cache != null) {
            closeQuietly(cache);
        }
    }

    /** Builds the node path for a lock key, rejecting {@code null} and empty keys. */
    private static String lockPath(String path) {
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("lock path must not be null or empty");
        }
        return "/" + ROOT_PATH_LOCK + "/" + path;
    }

    /** Returns the current removal counter under the lock. */
    private long currentReleaseGeneration() {
        waitLock.lock();
        try {
            return releaseGeneration;
        } finally {
            waitLock.unlock();
        }
    }

    /** Records an observed node removal and wakes every local waiter. */
    private void signalRelease() {
        waitLock.lock();
        try {
            releaseGeneration++;
            lockReleased.signalAll();
        } finally {
            waitLock.unlock();
        }
    }

    /**
     * Waits until a removal newer than {@code seenGeneration} was observed or
     * {@link #RETRY_WAIT_MS} elapsed, whichever comes first.
     */
    private void awaitRelease(long seenGeneration) {
        waitLock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(RETRY_WAIT_MS);
            while (releaseGeneration == seenGeneration && remainingNanos > 0L) {
                remainingNanos = lockReleased.awaitNanos(remainingNanos);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a distributed lock", e);
        } finally {
            waitLock.unlock();
        }
    }

    /** Closes the cache, logging instead of propagating an I/O failure. */
    private static void closeQuietly(PathChildrenCache cache) {
        try {
            cache.close();
        } catch (IOException e) {
            logger.warn("failed to close lock watcher", e);
        }
    }
}
之后在业务代码中调用工具类的 acquireLock 加锁、releaseLock 解锁即可;建议在 finally 块中调用 releaseLock,确保发生异常时也能释放锁。