zoukankan      html  css  js  c++  java
  • 分布式锁(zookeeper)

    首先就是添加对分布式锁支持的比较好的客户端依赖

    <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.12.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-api</artifactId>
                    </exclusion>
                </exclusions>
    </dependency>
    <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.12.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
    </dependency>

    添加配置信息

    curator.retryCount=5
    curator.elapsedTimeMs=5000
    curator.connectString=127.0.0.1:2181
    curator.sessionTimeoutMs=60000
    curator.connectionTimeoutMs=5000

    添加配置类信息

    package com.voole.config.curator;
    
    
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.PropertySource;
    import org.springframework.stereotype.Component;
    
    
    @Component
    @ConfigurationProperties(prefix="curator",ignoreInvalidFields=true,ignoreUnknownFields=true)
    @PropertySource("classpath:curator.properties")
    public class CuratorConfiguration {
        
        private final static Logger logger = LoggerFactory.getLogger(CuratorConfiguration.class);
        
        
    /*    public void init() {
            try {
                Properties param = new Properties();
                FileInputStream ois = FileUtils.openInputStream(ResourceUtils.getFile("classpath:curator.properties"));
                param.load(ois);
                setRetryCount(param.getProperty("retryCount"));
                setElapsedTimeMs(param.getProperty("elapsedTimeMs"));
                setConnectString(param.getProperty("connectString"));
                setSessionTimeoutMs(param.getProperty("sessionTimeoutMs"));
                setConnectionTimeoutMs(param.getProperty("connectionTimeoutMs"));
            } catch (Exception e) {
                logger.error("
    配置文件curator.properties 加载失败");
                e.printStackTrace();
            } 
        }*/
        
        
    
        /**
         * 重试次数
         */
    //    @Value("${curator.retryCount}")
        private String retryCount;
        /**
         * 重试间隔时间
         */
    //    @Value("${curator.elapsedTimeMs}")
        private String elapsedTimeMs;
        /**
         * zookeeper地址
         */
    //    @Value("${curator.connectString}")
        private String connectString;
        /**
         * session超时时间
         */
    //    @Value("${curator.sessionTimeoutMs}")
        private String sessionTimeoutMs;
        /**
         * 连接超时时间
         */
    //    @Value("${curator.connectionTimeoutMs}")
        private String connectionTimeoutMs;
        
        /*@Bean(initMethod = "start")
        public CuratorFramework curatorFramework() {
            return CuratorFrameworkFactory.newClient(connectString,sessionTimeoutMs,
                    connectionTimeoutMs,
                    new RetryNTimes(retryCount, elapsedTimeMs));
        }*/
        
        @Bean(initMethod = "start")
        public CuratorFramework curatorFramework() {
    //        this.init();
            logger.info("init CuratorFramework client");
            return CuratorFrameworkFactory.newClient(connectString,Integer.parseInt(sessionTimeoutMs),
                    Integer.parseInt(connectionTimeoutMs),
                    new RetryNTimes(Integer.parseInt(retryCount), Integer.parseInt(elapsedTimeMs)));
        }
    
        public String getRetryCount() {
            return retryCount;
        }
    
        public void setRetryCount(String retryCount) {
            this.retryCount = retryCount;
        }
    
        public String getElapsedTimeMs() {
            return elapsedTimeMs;
        }
    
        public void setElapsedTimeMs(String elapsedTimeMs) {
            this.elapsedTimeMs = elapsedTimeMs;
        }
    
        public String getConnectString() {
            return connectString;
        }
    
        public void setConnectString(String connectString) {
            this.connectString = connectString;
        }
    
        public String getSessionTimeoutMs() {
            return sessionTimeoutMs;
        }
    
        public void setSessionTimeoutMs(String sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
        }
    
        public String getConnectionTimeoutMs() {
            return connectionTimeoutMs;
        }
    
        public void setConnectionTimeoutMs(String connectionTimeoutMs) {
            this.connectionTimeoutMs = connectionTimeoutMs;
        }
    
        
        
        
        
        
    
    }

    之后是具体的工具类

    package com.voole.platform.util;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * 
     * <P>Description: InterProcessMutex curator的锁实现机制
     * 就目前来看,在多个线程共同创建一个已经创建的目录时,在这个目录被删除的时候,多个都想创建目录的线程可能都会被惊醒,争相创建目录,失败重新进入等待状态
     * 目前只是猜测
     * </P>
     * @ClassName: DistributedLockByCurator
     * @author 冯浩  2019年2月20日 下午4:28:18
     * @see TODO
     */
    @Component
    public class DistributedLockByCurator implements InitializingBean {
        
        private static final Logger logger = LoggerFactory.getLogger(DistributedLockByCurator.class);
        
        private final static String ROOT_PATH_LOCK = "rootlock";
        
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        
        @Autowired
        private CuratorFramework curatorFramework;
        
        public void acquireLock(String path) {
            String keypath = "/"+ROOT_PATH_LOCK+"/"+path;
            while(true){
                try {
                    curatorFramework
                            .create()
                            .creatingParentContainersIfNeeded()
                            .withMode(CreateMode.EPHEMERAL)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath(keypath);
                    logger.info("sucess to acquire lock for path:{}",keypath);
                    break;
                }catch (Exception e) {
                    //节点创建失败,锁获取失败
                    logger.info("failed to acquire lock for path:{}",keypath);
                    logger.info("while try again");
                    try {
                        if(countDownLatch.getCount()<=0) {
                            countDownLatch = new CountDownLatch(1);
                        }
                        countDownLatch.await();
                    }catch (Exception e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
        
        /**
         * 
         * <p>Title: releaseLock</p>
         * <p>Description: 删除节点,释放锁</p>
         * @param path
         * @return
         * @author 冯浩  2019年2月20日 下午2:32:35
         */
        public boolean releaseLock(String path) {
            try {
                String keypath = "/"+ROOT_PATH_LOCK+"/"+path;
                if(curatorFramework.checkExists().forPath(keypath) != null) {
                    curatorFramework.delete().forPath(keypath);
                }
            }catch (Exception e) {
                logger.error("failed to release lock");
                return false;
            }
            return true;
        }
        
        private void addWatcher(String path) throws Exception {
            String keypath;
            if(path.equals(ROOT_PATH_LOCK)) {
                keypath = "/"+path;
            }else {
                keypath = "/"+ROOT_PATH_LOCK+"/"+path;
            }
            final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keypath, false);
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener((client,event) ->{
                if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    String oldPath = event.getData().getPath();
                    logger.info("success to release lock for path:{}",oldPath);
                    if(oldPath.contains(path)) {
                        logger.error("
    listen change {}",oldPath);
                        logger.error("
    path {}",path);
                        countDownLatch.countDown();
                    }
                }
            });
        }
        
        
        @Override
        public void afterPropertiesSet() throws Exception {
            curatorFramework = curatorFramework.usingNamespace("lock-namespace");
            String path = "/"+ROOT_PATH_LOCK;
            try {
                if(curatorFramework.checkExists().forPath(path) == null) {
                    curatorFramework
                            .create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.PERSISTENT)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath(path);
                }
                addWatcher(ROOT_PATH_LOCK);
                logger.info("root path 的watch事件创建成功");
            }catch (Exception e) {
                logger.error("connect zookpeeper fail,please check the log >>{}",e.getMessage());
            }
        }
        
        
    
    }

    之后调用工具类的加锁,解锁方法即可哦

  • 相关阅读:
    张维迎:你必须知道的10个经济学原理
    艾德莱斯绸:“千年时尚”托起新产业
    Sending forms through JavaScript[form提交 form data]
    Sending form data
    Your first HTML form
    form submission
    <input type="file">
    web storm查看文件结构
    jQuery-File-Upload
    IHttpHandler
  • 原文地址:https://www.cnblogs.com/nihaofenghao/p/10577834.html
Copyright © 2011-2022 走看看