zoukankan      html  css  js  c++  java
  • Zookeeper实现分布式锁

    在单节点上运行的程序,多个线程对一个共享变量进行操作,则有可能出现线程安全的问题。例如:春运期间购买火车票

    public class BuyTicketRunnable implements Runnable {
        //static 表示ticketCount是全局共享变量
        public static int ticketCount = 100;
    
        @Override
        public void run() {
            while (true) {
                if (ticketCount > 0) {
                    try {
                        // 此处休眠500毫秒更能看出效果
                        Thread.sleep(20);
                        ticketCount--;
                        System.out.println("当前剩余票的数量:" + ticketCount);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        return;
                    }
                }
    
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(new BuyTicketRunnable()).start();
            }
        }
    
    }
    当前剩余票的数量:15
    当前剩余票的数量:11
    当前剩余票的数量:9
    当前剩余票的数量:9
    当前剩余票的数量:9
    当前剩余票的数量:8
    当前剩余票的数量:5
    当前剩余票的数量:3
    当前剩余票的数量:4
    当前剩余票的数量:6
    当前剩余票的数量:6
    当前剩余票的数量:2
    当前剩余票的数量:0
    当前剩余票的数量:1
    当前剩余票的数量:-1
    当前剩余票的数量:-2
    当前剩余票的数量:-3
    当前剩余票的数量:-5
    当前剩余票的数量:-7
    当前剩余票的数量:-5
    当前剩余票的数量:-6

    最后运行的结果显示和实际情况有出入的,之所以出现不是连续的减少车票,是因为多个线程是并行执行的,从结果看出了多个线程操作共享数据,会出现线程安全问题,所以我们为了确保这种问题不会出现,可以使用锁.

    锁的分类有很多:读写锁,悲观锁,乐观锁,重入锁等...........

    Synchronized

    synchronized属于重入锁,

    重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。

    public class BuyTicketRunnable implements Runnable {
        // static 表示ticketCount是全局共享变量
        public static int ticketCount = 100;
        //lock作为全局变量锁
        public static Object lock = new Object();
    
        @Override
        public void run() {
            synchronized (BuyTicketRunnable.lock) {
                while (true) {
                    if (ticketCount > 0) {
                        try {
                            // 此处休眠500毫秒更能看出效果
                            Thread.sleep(20);
                            ticketCount--;
                            System.out.println("当前剩余票的数量:" + ticketCount);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            return;
                        }
                    }
    
                }
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(new BuyTicketRunnable()).start();
            }
        }
    
    }
    当前剩余票的数量:29
    当前剩余票的数量:28
    当前剩余票的数量:27
    当前剩余票的数量:26
    当前剩余票的数量:25
    当前剩余票的数量:24
    当前剩余票的数量:23
    当前剩余票的数量:22
    当前剩余票的数量:21
    当前剩余票的数量:20
    当前剩余票的数量:19
    当前剩余票的数量:18
    当前剩余票的数量:17
    当前剩余票的数量:16
    当前剩余票的数量:15
    当前剩余票的数量:14
    当前剩余票的数量:13
    当前剩余票的数量:12
    当前剩余票的数量:11
    当前剩余票的数量:10
    当前剩余票的数量:9
    当前剩余票的数量:8
    当前剩余票的数量:7
    当前剩余票的数量:6
    当前剩余票的数量:5
    当前剩余票的数量:4
    当前剩余票的数量:3
    当前剩余票的数量:2
    当前剩余票的数量:1
    当前剩余票的数量:0

    使用lock锁

    lock锁需要手动加锁,手动释放锁,如果遇到异常,锁还在不会自动释放,Synchronized锁如果遇到异常,则会自动释放锁

    在分布式或者集群中可能出现的数据安全问题

     

     Zookeeper实现分布式锁原理

        zookeeper是以树形结构的方式存储数据,每个节点不能重复,zookeeper里面有四种节点,两大类节点,持久节点和临时节点。持久节点是存放到本地里,连接断了也存在,但是临时节点如果zk的连接断掉以后。临时节点则不会存在,正是因为这种特性,我们可以使用zk的临时节点来作为锁。

     有A,B两个连接操作数据,A连接先去创建节点,如果能创建节点成功,表示获取了锁可以操作数据,此时如果B连接在来创建节点,根据zk存储特性,节点名字不能有重复,此时B连接是无法创建节点成功,所以处于等待状态。等待A连接处理完数据以后,断开连接,临时节点断开连接以后该节点就消失(释放锁),此时B 就可以在创建节点(获取锁),B处理完以后断开连接(释放锁)。所以zk作为锁的原理是,临时节点创建成功(获取锁),断开连接(释放锁).

    JAVA代码实现Zookeeper分布式锁

      导入Zookeeper所需要的依赖

            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.14</version>
            </dependency>
            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.10</version>
            </dependency>

     增接口:

    package com.zk.distributed;
    
    public interface DistributedLock {
        // 获取锁方法
        void getLock();
    
        // 释放锁方法
        void unLock();
    
    }

    抽象类实现接口:

    package com.zk.distributed;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.I0Itec.zkclient.ZkClient;
    
    public abstract class ZkDistributedAbstractLock implements DistributedLock {
        // zk连接地址
        private  final String CONNETION_ADDR = "127.0.0.1:2181";
        // 创建zk连接
        protected ZkClient zkClient = new ZkClient(CONNETION_ADDR);
        //创建这个节点,以这个节点作为锁
        protected  final String PATH = "/Distributed";
        //信号量
        protected CountDownLatch countDownLatch = null;
    
        @Override
        public void getLock() {
            if (tryGetLock()) {
                System.out.println("###(创建节点成功)获取锁成功#####");
            } else {
                // 等待
                waitUnLock();
                // 重新获取锁
                getLock();
            }
        }
    
        // 尝试去获取锁
        abstract Boolean tryGetLock();
    
        // 如果当前锁被占用,则等待
        abstract void waitUnLock();
    
        @Override
        public void unLock() {
            if (zkClient != null) {
                zkClient.close();
                System.out.println("关闭连接,释放资源(释放锁)");
            }
        }
    }

    实现抽象类:

    package com.zk.distributed;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.I0Itec.zkclient.IZkDataListener;
    
    public class ZkDistributeLock extends ZkDistributedAbstractLock {
    
        @Override
        Boolean tryGetLock() {
            try {
                zkClient.createEphemeral(PATH);
                return true;
            } catch (Exception e) {
                return false;
            }
    
        }
    
        @Override
        void waitUnLock() {
    
            // 使用事件监听,获取到节点被删除,zookeeper上节点有什么变化都可以感知到
            IZkDataListener iZkDataListener = new IZkDataListener() {
                // 当节点被删除
                public void handleDataDeleted(String dataPath) throws Exception {
                    if (countDownLatch != null) {
                        // 唤醒
                        countDownLatch.countDown();
                    }
    
                }
    
                // 当节点发生改变
                public void handleDataChange(String dataPath, Object data) throws Exception {
    
                }
                
            };
    
            // 注册节点信息
            zkClient.subscribeDataChanges(PATH, iZkDataListener);
            //检测是否存在该节点信息
            if (zkClient.exists(PATH)) {
                // 创建信号量
                countDownLatch = new CountDownLatch(1);
                try {
                    // 等待
                    countDownLatch.await();
                } catch (Exception e) {
    
                }
    
            }
            // 删除事件通知
            zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
        }
    
    }

    测试类:

    package com.zk.distributed;
    
    import com.zk.distributed.DistributedLock;
    import com.zk.distributed.ZkDistributeLock;
    
    public class DistributedRunnable implements Runnable {
        public DistributedLock lock = new ZkDistributeLock();
        public static int count = 0;
    
        @Override
    
        public void run() {
            try {
                // 上锁
                lock.getLock();
                Thread.sleep(1000);
                // 模拟用户生成订单号
                count++;
                System.out.println("当前累加########" + count);
                // getNumber();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 釋放鎖資源
                lock.unLock();
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 25; i++) {
                new Thread(new DistributedRunnable()).start();
            }
        }
    
    }

     

  • 相关阅读:
    netty系列之:基于流的数据传输
    netty系列之:中国加油
    netty系列之:Event、Handler和Pipeline
    netty系列之:netty中的Channel详解
    netty系列之:netty架构概述
    Python 列表解析式竟然支持异步?
    Python 数值中的下划线是怎么回事?
    Nginx+keepalived 双机主从模式下修改主Nginx自动同步nginx.conf配置文件至备Nginx
    Oracle nvarchar2存储特殊字符乱码问题
    Oracle 11.2.0.4.0版本下大表添加默认值字段影响因素
  • 原文地址:https://www.cnblogs.com/920913cheng/p/10984225.html
Copyright © 2011-2022 走看看