zoukankan      html  css  js  c++  java
  • zookeeper记录5(curator与spring/ssm整合、zookeeper分布式锁)

    录:

    1、curator与spring(ssm)整合
    2、zookeeper实现分布式锁
    2.1、使用zk分布式锁流程图
    2.2、使用zk分布式锁的案例
    2.3、测试

    1、curator与spring(ssm)整合    <--返回目录

      依赖

    <!-- zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.11</version>
    </dependency>
    <!-- curator -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>

      applicationContext-zookeeper.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:util="http://www.springframework.org/schema/util"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    
        <description>zk与spring容器结合,启动项目时建立与zk的连接</description>
        <!-- zookeeper 重试策略 -->
        <bean id="retryPolicy" class="org.apache.curator.retry.RetryNTimes">
            <!--重试次数-->
            <constructor-arg index="0" value="5"></constructor-arg>
            <!--重试间隔-->
            <constructor-arg index="1" value="5000"></constructor-arg>
        </bean>
    
        <bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start">
            <!--zk服务地址-->
            <constructor-arg index="0" value="192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183"></constructor-arg>
            <!--session timeoout 会话超时时间-->
            <constructor-arg index="1" value="10000"></constructor-arg>
            <!--connectionTimeoutMs 创建连接超时时间-->
            <constructor-arg index="2" value="5000"></constructor-arg>
            <!--重试策略-->
            <constructor-arg index="3" ref="retryPolicy"></constructor-arg>
        </bean>
    
        <!--注入zk客户端-->
        <bean id="zkCurator" class="com.oy.utils.ZKCurator" init-method="init">
            <constructor-arg index="0" ref="client"/>
        </bean>
    
    </beans>

      ZKCurator

    package com.oy.utils;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class ZKCurator {
        private static final Logger log = LoggerFactory.getLogger(ZKCurator.class);
        private CuratorFramework client = null; // zk客户端
    
        public ZKCurator(CuratorFramework client) {
            this.client = client; // 由spring容器注入
            // 参数1 重试次数; 参数2 每次重试间隔的时间
    //        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
    //        client = CuratorFrameworkFactory.builder()
    //                .connectString(zkServerPath).sessionTimeoutMs(20000)
    //                .retryPolicy(retryPolicy).build();
    //        client.start();
        }
    
        public void init() {
            // 使用命名空间
            client = client.usingNamespace("zk-namespace");
        }
    
        /**
         * 测试客户端连接
         * @param args
         * @throws Exception
         */
    //    public static void main(String[] args) throws Exception {
    //        ZKCurator curatorOperator = new ZKCurator();
    //        boolean started = curatorOperator.client.isStarted();
    //        log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
    //
    //        new Thread().sleep(5000);
    //        curatorOperator.closeZKClient();
    //        boolean started1 = curatorOperator.client.isStarted();
    //        log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
    //    }
    
        /**
         * 判断zk是否连接
         */
        public boolean isZKAlive() {
            return client.isStarted();
        }
    
        /**
         * 关闭zk客户端连接
         */
        public void closeZKClient() {
            if (client != null) client.close();
        }
    }

      DemoController

    @Controller
    public class DemoController {
        @Autowired
        private ZKCurator zkCurator;
    
        @RequestMapping("/isZKAlive")
        @ResponseBody
        public String isZKAlive() {
            return zkCurator.isZKAlive() ? "yes" : "no";
        }
    }

      启动项目,测试zookeeper能否连接:http://localhost:8080/isZKAlive

    2、zookeeper实现分布式锁    <--返回目录

    2.1、使用zk分布式锁流程图    <--返回目录

    2.2、使用zk分布式锁的案例    <--返回目录

       BuyController

    @RestController
    public class BuyController {
        @Resource(name = "buyService")
        private BuyService buyService;
        @Resource(name = "buy2Service")
        private Buy2Service buy2Service;
    
        // 查看库存总数
        @RequestMapping("/count")
        public String queryCount() {
            return "" + buyService.queryCount();
        }
        // 重置库存总数, 方便测试
        @RequestMapping("/reset")
        public String resetCount() {
            return "" + buyService.resetCount();
        }
        // 测试: 并发请求导致数据不一致(库存不够)
        @GetMapping("/buy")
        public String buy() throws Exception {
            return buyService.buy() ? "succ" : "failed";
        }
        // 测试: zk分布式锁。注: 本demo只是为了测试zk分布式锁, 并没有搭建分布式环境。
        @GetMapping("/buy2")
        public String buy2() throws Exception {
            return buy2Service.buy() ? "succ" : "failed";
        }
    
    }

      BuyService

    @Service("buyService")
    public class BuyService {
        final static Logger log = LoggerFactory.getLogger(BuyService.class);
    
        public int queryCount() {
            return Config.count;
        }
    
        public int resetCount() {
            Config.count = 8;
            return Config.count;
        }
    
        public boolean buy() {
            int buyCounts = 5; // 购买5件
    
            // 1. 判断库存
            if (Config.count < buyCounts) {
                log.info("库存剩余{}件,用户需求量{}件,库存不足,订单创建失败...",
                        Config.count, buyCounts);
                return false;
            }
    
            // 2. 创建订单. 为了方便测试, 模拟业务处理时间为5s
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            boolean isOrderCreated = true;
    
            // 3. 创建订单成功后,扣除库存
            if (isOrderCreated) {
                log.info("{}订单创建成功...", Thread.currentThread().getName());
                Config.count -= buyCounts;
            } else {
                log.info("{}订单创建失败...", Thread.currentThread().getName());
                return false;
            }
    
            return true;
        }
    
    }

      Buy2Service

    @Service("buy2Service")
    public class Buy2Service {
        final static Logger log = LoggerFactory.getLogger(Buy2Service.class);
    
        @Autowired
        private DistributedLock distributedLock;
    
        /**
         * 注意每个return前都要释放锁
         *
         * @return
         */
        public boolean buy() {
            // 执行订单流程前使用当前业务获取分布式锁
            distributedLock.getLock();
    
            int buyCounts = 5; // 购买5件
    
            // 1. 判断库存
            if (Config.count < buyCounts) {
                log.info("库存剩余{}件,用户需求量{}件,库存不足,订单创建失败...",
                        Config.count, buyCounts);
                // 释放锁,让下一个请求获取锁
                distributedLock.releaseLock();
                return false;
            }
    
            // 2. 创建订单. 为了方便测试, 模拟业务处理时间为5s
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // 释放锁,让下一个请求获取锁
                distributedLock.releaseLock();
            }
            boolean isOrderCreated = true;
    
            // 3. 创建订单成功后,扣除库存
            if (isOrderCreated) {
                log.info("{}订单创建成功...", Thread.currentThread().getName());
                Config.count -= buyCounts;
            } else {
                log.info("{}订单创建失败...", Thread.currentThread().getName());
                // 释放锁,让下一个请求获取锁
                distributedLock.releaseLock();
                return false;
            }
    
            // 释放锁,让下一个请求获取锁
            distributedLock.releaseLock();
            return true;
        }
    
    }

      DistributedLock

    package com.oy.utils;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.CountDownLatch;
    
    public class DistributedLock {
        private static final Logger log = LoggerFactory.getLogger(DistributedLock.class);
        private CuratorFramework client = null; // zk客户端
        // 用于挂起当前请求,并且等待上一个分布式锁释放
        private static CountDownLatch zkLockLatch = new CountDownLatch(1);
    
        // 分布式锁的总节点名
        private static final String ZK_LOCK_PROJECT="zk_lock_project";
        // 分布式锁节点
        private static final String ORDER_COUNT_LOCK = "order_count_lock";
    
        public DistributedLock(CuratorFramework client) {
            this.client = client; // 由spring容器注入
        }
    
        public void init() {
            /*
            创建zk锁的总节点
                ZKLocks-Namespace
                    |
                    ---- zk_lock_project
                        |
                        ---- order_count_lock
             */
            try {
                if (client.checkExists().forPath("/" + ZK_LOCK_PROJECT) == null) {
                    client.create().creatingParentContainersIfNeeded()
                            .withMode(CreateMode.PERSISTENT)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath("/" + ZK_LOCK_PROJECT);
                }
                // 针对zk的分布式锁节点,创建相应的watcher事件监听
                addWatcherToLock("/" + ZK_LOCK_PROJECT);
            } catch (Exception e) {
                log.error("客户端连接zk服务器错误");
            }
        }
    
        /**
         * 获取分布式锁
         */
        public void getLock() {
            // 使用死循环,当且仅当上一个锁释放并且当前请求获得锁成功后才跳出
            while (true) {
                try {
                    client.create().creatingParentContainersIfNeeded()
                            .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath("/" + ZK_LOCK_PROJECT + "/" + ORDER_COUNT_LOCK);
                    log.warn("{}获得分布式锁成功", Thread.currentThread().getName());
                    return;
                } catch (Exception e) {
                    //log.warn("获得分布式锁失败. msg: {}", e);
                    try {
                        // 如果没有获取到锁,需要重新设置同步资源值
                        if (zkLockLatch.getCount() <= 0) {
                            zkLockLatch = new CountDownLatch(1);
                        }
                        // 阻塞线程
                        zkLockLatch.await();
                    } catch (InterruptedException ex) {
                    }
                }
            }
        }
    
        /**
         * 释放分布式锁
         */
        public boolean releaseLock() {
            try {
                if (client.checkExists().forPath("/" + ZK_LOCK_PROJECT + "/" + ORDER_COUNT_LOCK) != null) {
                    client.delete().forPath("/" + ZK_LOCK_PROJECT + "/" + ORDER_COUNT_LOCK);
                }
            } catch (Exception e) {
                log.warn("分布式锁释放失败. msg: {}", e);
                return false;
            }
            log.warn("{}分布式锁释放完毕", Thread.currentThread().getName());
            return true;
        }
    
        public void addWatcherToLock(String path) throws Exception {
            final PathChildrenCache cache = new PathChildrenCache(client, path, true);
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                        String path = event.getData().getPath();
                        log.warn("上一次会话已释放锁或该会话已断开,节点路径为:" + path);
                        if (path.contains(ORDER_COUNT_LOCK)) {
                            log.warn("释放计数器,让当前请求来获得分布式锁。。。");
                            zkLockLatch.countDown();
                        }
                    }
                }
            });
        }
    
        /**
         * 判断zk是否连接
         */
        public boolean isZKAlive() {
            return client.isStarted();
        }
    
        /**
         * 关闭zk客户端连接
         */
        public void closeZKClient() {
            if (client != null) client.close();
        }
    }

      applicationContext-zookeeper.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:util="http://www.springframework.org/schema/util"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    
        <description>zk与spring容器结合,启动项目时建立与zk的连接</description>
        <!-- zookeeper 重试策略 -->
        <bean id="retryPolicy" class="org.apache.curator.retry.RetryNTimes">
            <!--重试次数-->
            <constructor-arg index="0" value="5"></constructor-arg>
            <!--重试间隔-->
            <constructor-arg index="1" value="5000"></constructor-arg>
        </bean>
    
        <bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient"
              init-method="start">
            <!--zk服务地址-->
            <constructor-arg index="0"
                             value="192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183"></constructor-arg>
            <!--session timeoout 会话超时时间-->
            <constructor-arg index="1" value="10000"></constructor-arg>
            <!--connectionTimeoutMs 创建连接超时时间-->
            <constructor-arg index="2" value="5000"></constructor-arg>
            <!--重试策略-->
            <constructor-arg index="3" ref="retryPolicy"></constructor-arg>
        </bean>
    
        <!--zk客户端-->
        <bean id="zkCurator" class="com.oy.utils.ZKCurator" init-method="init">
            <constructor-arg index="0" ref="client"/>
        </bean>
    
        <bean id="distributedLock" class="com.oy.utils.DistributedLock" init-method="init">
            <constructor-arg index="0" ref="client"/>
        </bean>
    
    </beans>

    2.3、测试    <--返回目录

      1)查询库存 http://localhost:8080/count,结果为 8。

      2)并发请求 http://localhost:8080/buy 5次

      控制台打印

    2021-03-27 15:13:25,992 [http-bio-8080-exec-2] [com.oy.service.BuyService.buy(BuyService.java:41)] - [INFO] http-bio-8080-exec-2订单创建成功...
    2021-03-27 15:13:26,319 [http-bio-8080-exec-3] [com.oy.service.BuyService.buy(BuyService.java:41)] - [INFO] http-bio-8080-exec-3订单创建成功...
    2021-03-27 15:13:26,597 [http-bio-8080-exec-4] [com.oy.service.BuyService.buy(BuyService.java:41)] - [INFO] http-bio-8080-exec-4订单创建成功...
    2021-03-27 15:13:26,840 [http-bio-8080-exec-5] [com.oy.service.BuyService.buy(BuyService.java:41)] - [INFO] http-bio-8080-exec-5订单创建成功...
    2021-03-27 15:13:27,119 [http-bio-8080-exec-6] [com.oy.service.BuyService.buy(BuyService.java:41)] - [INFO] http-bio-8080-exec-6订单创建成功...

      再次查询库存 http://localhost:8080/count,结果为 -17。

      3)为了下一次测试,重置库存 http://localhost:8080/reset,然后查询库存 http://localhost:8080/count,结果为 8。

      4)并发请求 http://localhost:8080/buy2 5次

      控制台打印

      再次查询库存 http://localhost:8080/count,结果为 3。

  • 相关阅读:
    Prommetheus 插件监控 ES
    Linux LVM条带化
    MYSQL wait_timeout以及connect_timeout.这两个有什么区别
    alertmanager配置文件说明(转载)
    腾讯云MongoDB: skip查询内核优化(转载)
    MongoDB主从复制介绍和常见问题说明(转载)
    MongoDB 批量更新、批量新增、批量删除、批量替换 —— bulkWrite操作
    MongoDB Cluster 数据平衡优化
    MongoDB副本集提高读写速率
    Postgresql中时间戳与日期的相互转换(同样适用于GreenPlum)
  • 原文地址:https://www.cnblogs.com/xy-ouyang/p/14927228.html
Copyright © 2011-2022 走看看