zoukankan      html  css  js  c++  java
  • Spring Boot 2 整合 ZooKeeper(使用 Curator 客户端)

    步骤:

    1- pom.xml

    <!-- Apache Curator recipes; the lock class in step 4 imports NodeCache from org.apache.curator.framework.recipes.cache. -->
    <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.12.0</version>
            </dependency>

    2- yml配置:

    # ZooKeeper settings injected into the Java classes below via @Value.
    zk:
      # Connection string handed to CuratorFrameworkFactory.newClient in ZookeeperConf.
      url: 127.0.0.1:2181
      # Root path under which ZKlock creates its lock nodes.
      localPath: /newlock
      # NOTE(review): not read by any code shown on this page; confirm whether it is still needed.
      timeout: 3000

    3- 配置类

    package com.test.domi.config;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Spring configuration that publishes a started {@link CuratorFramework}
     * client as a bean.
     */
    @Configuration
    public class ZookeeperConf {

        /** ZooKeeper connection string, injected from the {@code zk.url} property. */
        @Value("${zk.url}")
        private String zkUrl;

        /**
         * Builds the Curator client for {@code zk.url} and starts it before
         * handing it to the container.
         *
         * @return the started client
         */
        @Bean
        public CuratorFramework getCuratorFramework() {
            // ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)
            CuratorFramework started =
                    CuratorFrameworkFactory.newClient(zkUrl, new ExponentialBackoffRetry(1000, 3));
            started.start();
            return started;
        }
    }

    4- 使用

    package com.test.domi.common.utils.lock;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.cache.NodeCache;
    import org.apache.curator.framework.recipes.cache.NodeCacheListener;
    import org.apache.zookeeper.CreateMode;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    @Component("zklock")
    public class ZKlock implements Lock {
    
        @Autowired
        private CuratorFramework zkClient;
        @Value("${zk.localPath}")
        private String lockPath;
        private String currentPath;
        private String beforePath;
    
        @Override
        public boolean tryLock() {
            try {
                //根节点的初始化放在构造函数里面不生效
                if (zkClient.checkExists().forPath(lockPath) == null) {
                    System.out.println("初始化根节点==========>" + lockPath);
                    zkClient.create().creatingParentsIfNeeded().forPath(lockPath);
                }
                System.out.println("当前线程" + Thread.currentThread().getName() + "初始化根节点" + lockPath);
            } catch (Exception e) {
            }
    
            if (currentPath == null) {
                try {
                    currentPath = this.zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                            .forPath(lockPath + "/");
                } catch (Exception e) {
                    return false;
                }
            }
            try {
                //此处该如何获取所有的临时节点呢?如locks00004.而不是获取/locks/order中的order作为子节点??
                List<String> childrens = this.zkClient.getChildren().forPath(lockPath);
                Collections.sort(childrens);
                if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
                    System.out.println("当前线程获得锁" + currentPath);
                    return true;
                }else{
                   //取前一个节点
                    int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
                    //如果是-1表示children里面没有该节点
                    beforePath = lockPath + "/" + childrens.get(curIndex - 1);
                }
            } catch (Exception e) {
                return false;
            }
            return false;
        }
    
        @Override
        public void lock() {
            if (!tryLock()) {
                waiForLock();
                lock();
            }
        }
    
        @Override
        public void unlock() {
            try {
                zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(currentPath);
            } catch (Exception e) {
                //guaranteed()保障机制,若未删除成功,只要会话有效会在后台一直尝试删除
            }
        }
    
        private void waiForLock(){
            CountDownLatch cdl = new CountDownLatch(1);
            //创建监听器watch
              NodeCache nodeCache = new NodeCache(zkClient,beforePath);
            try {
                nodeCache.start(true);
                nodeCache.getListenable().addListener(new NodeCacheListener() {
                    @Override
                    public void nodeChanged() throws Exception {
                        cdl.countDown();
                        System.out.println(beforePath + "节点监听事件触发,重新获得节点内容为:" + new String(nodeCache.getCurrentData().getData()));
                    }
                });
            } catch (Exception e) {
            }
            //如果前一个节点还存在,则阻塞自己
            try {
                if (zkClient.checkExists().forPath(beforePath) == null) {
                    cdl.await();
                }
            } catch (Exception e) {
            }finally {
                //阻塞结束,说明自己是最小的节点,则取消watch,开始获取锁
                try {
                    nodeCache.close();
                } catch (IOException e) {
                }
            }
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    
    }

    5- 调用demo

    package com.test.domi.controller;
    
    import com.test.domi.common.utils.ZkUtil;
    import com.test.domi.common.utils.lock.ZKlock;
    import org.I0Itec.zkclient.ZkClient;
    import org.apache.curator.framework.CuratorFramework;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * Demo controller: {@code GET /zk/lock} starts ten threads that each take
     * and immediately release the shared {@link ZKlock}.
     */
    @RestController
    @RequestMapping("/zk")
    public class ZKController {

        @Autowired
        private CuratorFramework zkClient;
        // An I0Itec ZkClient field used to be injected here instead; it is disabled.

        // The fields below are not referenced by any method of this class.
        private String url = "127.0.0.1:2181";
        private int timeout = 3000;
        private String lockPath = "/testl";
        @Autowired
        private ZKlock zklock;
        private int k = 1;

        /**
         * Starts the ten lock/unlock threads and returns without waiting for them.
         *
         * @return always {@code true}
         */
        @GetMapping("/lock")
        public Boolean getLock() throws Exception {
            Runnable lockThenUnlock = () -> {
                zklock.lock();
                zklock.unlock();
            };
            int remaining = 10;
            while (remaining-- > 0) {
                new Thread(lockThenUnlock).start();
            }
            return true;
        }
    }
  • 相关阅读:
    《C++ 并发编程》- 第1章 你好,C++的并发世界
    30分钟,让你成为一个更好的程序员
    程序员技术练级攻略
    谈新技术学习方法-如何学习一门新技术新编程语言
    计算机科学中最重要的32个算法
    程序员学习能力提升三要素
    一位在MIT教数学的老师总结了十条经验
    学习算法之路
    十个顶级的C语言资源助你成为优秀的程序员
    Linux中LoadAverage分析
  • 原文地址:https://www.cnblogs.com/domi22/p/9748083.html
Copyright © 2011-2022 走看看