在单节点上运行的程序,多个线程对一个共享变量进行操作,则有可能出现线程安全的问题。例如:春运期间购买火车票
public class BuyTicketRunnable implements Runnable { //static 表示ticketCount是全局共享变量 public static int ticketCount = 100; @Override public void run() { while (true) { if (ticketCount > 0) { try { // 此处休眠500毫秒更能看出效果 Thread.sleep(20); ticketCount--; System.out.println("当前剩余票的数量:" + ticketCount); } catch (InterruptedException e) { e.printStackTrace(); return; } } } } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(new BuyTicketRunnable()).start(); } } }
当前剩余票的数量:15 当前剩余票的数量:11 当前剩余票的数量:9 当前剩余票的数量:9 当前剩余票的数量:9 当前剩余票的数量:8 当前剩余票的数量:5 当前剩余票的数量:3 当前剩余票的数量:4 当前剩余票的数量:6 当前剩余票的数量:6 当前剩余票的数量:2 当前剩余票的数量:0 当前剩余票的数量:1 当前剩余票的数量:-1 当前剩余票的数量:-2 当前剩余票的数量:-3 当前剩余票的数量:-5 当前剩余票的数量:-7 当前剩余票的数量:-5 当前剩余票的数量:-6
最后运行的结果显示和实际情况有出入的,之所以出现不是连续的减少车票,是因为多个线程是并行执行的,从结果看出了多个线程操作共享数据,会出现线程安全问题,所以我们为了确保这种问题不会出现,可以使用锁.
锁的分类有很多:读写锁,悲观锁,乐观锁,重入锁等...........
Synchronized
synchronized属于重入锁,
重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。
public class BuyTicketRunnable implements Runnable { // static 表示ticketCount是全局共享变量 public static int ticketCount = 100; //lock作为全局变量锁 public static Object lock = new Object(); @Override public void run() { synchronized (BuyTicketRunnable.lock) { while (true) { if (ticketCount > 0) { try { // 此处休眠500毫秒更能看出效果 Thread.sleep(20); ticketCount--; System.out.println("当前剩余票的数量:" + ticketCount); } catch (InterruptedException e) { e.printStackTrace(); return; } } } } } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(new BuyTicketRunnable()).start(); } } }
当前剩余票的数量:29 当前剩余票的数量:28 当前剩余票的数量:27 当前剩余票的数量:26 当前剩余票的数量:25 当前剩余票的数量:24 当前剩余票的数量:23 当前剩余票的数量:22 当前剩余票的数量:21 当前剩余票的数量:20 当前剩余票的数量:19 当前剩余票的数量:18 当前剩余票的数量:17 当前剩余票的数量:16 当前剩余票的数量:15 当前剩余票的数量:14 当前剩余票的数量:13 当前剩余票的数量:12 当前剩余票的数量:11 当前剩余票的数量:10 当前剩余票的数量:9 当前剩余票的数量:8 当前剩余票的数量:7 当前剩余票的数量:6 当前剩余票的数量:5 当前剩余票的数量:4 当前剩余票的数量:3 当前剩余票的数量:2 当前剩余票的数量:1 当前剩余票的数量:0
使用lock锁
lock锁需要手动加锁,手动释放锁,如果遇到异常,锁还在不会自动释放,Synchronized锁如果遇到异常,则会自动释放锁
在分布式或者集群中可能出现的数据安全问题
Zookeeper实现分布式锁原理
zookeeper是以树形结构的方式存储数据,每个节点不能重复,zookeeper里面有四种节点,两大类节点,持久节点和临时节点。持久节点是存放到本地里,连接断了也存在,但是临时节点如果zk的连接断掉以后。临时节点则不会存在,正是因为这种特性,我们可以使用zk的临时节点来作为锁。
有A,B两个连接操作数据,A连接先去创建节点,如果能创建节点成功,表示获取了锁可以操作数据,此时如果B连接在来创建节点,根据zk存储特性,节点名字不能有重复,此时B连接是无法创建节点成功,所以处于等待状态。等待A连接处理完数据以后,断开连接,临时节点断开连接以后该节点就消失(释放锁),此时B 就可以在创建节点(获取锁),B处理完以后断开连接(释放锁)。所以zk作为锁的原理是,临时节点创建成功(获取锁),断开连接(释放锁).
JAVA代码实现Zookeeper分布式锁
导入Zookeeper所需要的依赖
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
新增接口:
package com.zk.distributed; public interface DistributedLock { // 获取锁方法 void getLock(); // 释放锁方法 void unLock(); }
抽象类实现接口:
package com.zk.distributed; import java.util.concurrent.CountDownLatch; import org.I0Itec.zkclient.ZkClient; public abstract class ZkDistributedAbstractLock implements DistributedLock { // zk连接地址 private final String CONNETION_ADDR = "127.0.0.1:2181"; // 创建zk连接 protected ZkClient zkClient = new ZkClient(CONNETION_ADDR); //创建这个节点,以这个节点作为锁 protected final String PATH = "/Distributed"; //信号量 protected CountDownLatch countDownLatch = null; @Override public void getLock() { if (tryGetLock()) { System.out.println("###(创建节点成功)获取锁成功#####"); } else { // 等待 waitUnLock(); // 重新获取锁 getLock(); } } // 尝试去获取锁 abstract Boolean tryGetLock(); // 如果当前锁被占用,则等待 abstract void waitUnLock(); @Override public void unLock() { if (zkClient != null) { zkClient.close(); System.out.println("关闭连接,释放资源(释放锁)"); } } }
实现抽象类:
package com.zk.distributed; import java.util.concurrent.CountDownLatch; import org.I0Itec.zkclient.IZkDataListener; public class ZkDistributeLock extends ZkDistributedAbstractLock { @Override Boolean tryGetLock() { try { zkClient.createEphemeral(PATH); return true; } catch (Exception e) { return false; } } @Override void waitUnLock() { // 使用事件监听,获取到节点被删除,zookeeper上节点有什么变化都可以感知到 IZkDataListener iZkDataListener = new IZkDataListener() { // 当节点被删除 public void handleDataDeleted(String dataPath) throws Exception { if (countDownLatch != null) { // 唤醒 countDownLatch.countDown(); } } // 当节点发生改变 public void handleDataChange(String dataPath, Object data) throws Exception { } }; // 注册节点信息 zkClient.subscribeDataChanges(PATH, iZkDataListener); //检测是否存在该节点信息 if (zkClient.exists(PATH)) { // 创建信号量 countDownLatch = new CountDownLatch(1); try { // 等待 countDownLatch.await(); } catch (Exception e) { } } // 删除事件通知 zkClient.unsubscribeDataChanges(PATH, iZkDataListener); } }
测试类:
package com.zk.distributed; import com.zk.distributed.DistributedLock; import com.zk.distributed.ZkDistributeLock; public class DistributedRunnable implements Runnable { public DistributedLock lock = new ZkDistributeLock(); public static int count = 0; @Override public void run() { try { // 上锁 lock.getLock(); Thread.sleep(1000); // 模拟用户生成订单号 count++; System.out.println("当前累加########" + count); // getNumber(); } catch (Exception e) { e.printStackTrace(); } finally { // 釋放鎖資源 lock.unLock(); } } public static void main(String[] args) { for (int i = 0; i < 25; i++) { new Thread(new DistributedRunnable()).start(); } } }