1.redis锁
1.工具类代码:
package com.lhw.chche; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisCommands; import java.util.ArrayList; import java.util.List; public class RedisLock { private final static Logger LOG = LoggerFactory.getLogger(RedisLock.class); static StringBuilder sb = new StringBuilder(); static { sb.append("if redis.call("get",KEYS[1]) == ARGV[1] "); sb.append("then "); sb.append(" return redis.call("del",KEYS[1]) "); sb.append("else "); sb.append(" return 0 "); sb.append("end "); } public static final String UNLOCK_LUA = sb.toString(); /** * redis加锁 * @param key lock * @param expire * @return */ public static String setLock(final String key, final long expire) { //System.out.println("1--------------setLock,key="+key); try { RedisCallback<String> callback = new RedisCallback<String>() { @Override public String doInRedis(RedisConnection connection) throws DataAccessException { JedisCommands commands = (JedisCommands) connection.getNativeConnection(); return commands.set(key, key, "NX", "PX", expire); } }; String result = RedisOperationConfig.getStringRedisTemplate().execute(callback); //System.out.println("1--------------setLock,result="+result); if (StringUtils.isEmpty(result)) { LOG.warn("key="+key+",获取锁失败。。。。。"); return null; }else { if(LOG.isInfoEnabled()) { LOG.info("key="+key+",获取锁成功。。。。。"); } return key; } } catch (Exception e) { LOG.error("",e); LOG.warn("key="+key+",获取锁异常。。。。。"); } return null; } /** * redis 解锁 * @param key * @param requestId * @return */ public static boolean releaseLock(String key, String requestId) { try { final List keys = new ArrayList(); keys.add(key); final List args = new ArrayList(); args.add(requestId); RedisCallback callback = new RedisCallback() { @Override public Long doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); if ((nativeConnection instanceof JedisCluster)) { return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args); } if ((nativeConnection instanceof Jedis)) { return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args); } return Long.valueOf(0L); } }; Long result = (Long) RedisOperationConfig.getStringRedisTemplate().execute(callback); boolean bool = (result != null) && (result.longValue() > 0L); return bool; } catch (Exception e) { } finally { } return false; } /** * 获取redis * @param key * @return */ public static String getLockValue(final String key){ try { RedisCallback<String> callback = new RedisCallback<String>() { @Override public String doInRedis(RedisConnection connection) throws DataAccessException { JedisCommands commands = (JedisCommands) connection.getNativeConnection(); return commands.get(key); } }; String result = RedisOperationConfig.getStringRedisTemplate().execute(callback); if (StringUtils.isEmpty(result)) { LOG.warn("key="+key+",获取锁值失败。。。。。"); return null; }else { if(LOG.isInfoEnabled()) { LOG.info("key="+key+",获取锁值成功。。。。。"); } return result; } } catch (Exception e) { LOG.error("",e); LOG.warn("key="+key+",获取锁异常。。。。。"); } return null; } }
2.使用场景顺序
1 --> taskNo是一个任务的唯一值,先进行上锁.看是否成功,成功则继续
2 --> taskNo+时间(年月日),先进行上锁.看是否成功,成功则继续,执行具体的任务
3 --> 设置 2里面 taskNo+时间(年月日)的过期时间
4 --> 释放 1里面 taskNo 的锁
1. String rediskey = "com.lhw.lock."+taskNo; String lock = RedisLock.setLock(rediskey,outTime); 2. String redisDealkey = "com.lhw.dealLock."+taskNo+"."+ymd; String dealLock = RedisLock.getLockValue(redisDealkey); 3. //设置当天任务已执行的开关值,默认12小时过期时间 RedisLock.setLock(redisDealkey,12*60*60*1000); 4. RedisLock.releaseLock(rediskey,rediskey);
2.zookeeper锁
图解:公平锁和可重入锁 模型
分布式锁的概念和原理,比较抽象难懂。如果用一个简单的故事来类比,估计就简单多了。
很久以前,在一个村子有一口井,水质非常的好,村民们都抢着取井里的水。井就那么一口,村里的人很多,村民为争抢取水打架斗殴,甚至头破血流。
问题总是要解决,于是村长绞尽脑汁,最终想出了一个凭号取水的方案。井边安排一个看井人,维护取水的秩序。
说起来,秩序很简单,取水之前,先取号。号排在前面的,就可以先取水。先到的排在前面,那些后到的,没有排在最前面的人,一个一个挨着,在井边排成一队。取水示意图如下 :
这种排队取水模型,就是一种锁的模型。排在最前面的号,拥有取水权,就是一种典型的独占锁。另外,先到先得,号排在前面的人先取到水,取水之后就轮到下一个号取水,至少,看起来挺公平的,说明它是一种公平锁。
在公平独占锁的基础上,再进一步,看看可重入锁的模型。
假定,取水时以家庭为单位,哪个家庭任何人拿到号,就可以排号取水,而且如果一个家庭有一个人拿到号,其它家人这时候过来打水不用再取号。新的排号取水示意图如下 :
如上图的1号,老公有号,他的老婆来了,直接排第一个,妻凭夫贵。再看上图的2号,父亲正在打水,他的儿子和女儿也到井边了,直接排第二个,这个叫做子凭父贵。 等等,如果是同一个家庭,可以直接复用排号,不用重新取号从后面排起。
以上这个故事模型,就是可以重入锁的模型。只要满足条件,同一个排号,可以用来多次取水。在锁的模型中,相当于一把锁,可以被多次锁定,这就叫做可重入锁。
zookeeper分布式锁的原理
理解了锁的原理后,就会发现,Zookeeper 天生就是一副分布式锁的胚子。
首先,Zookeeper的每一个节点,都是一个天然的顺序发号器。
在每一个节点下面创建子节点时,只要选择的创建类型是有序(EPHEMERAL_SEQUENTIAL 临时有序或者PERSISTENT_SEQUENTIAL 永久有序)类型,那么,新的子节点后面,会加上一个次序编号。这个次序编号,是上一个生成的次序编号加一
比如,创建一个用于发号的节点“/test/lock”,然后以他为父亲节点,可以在这个父节点下面创建相同前缀的子节点,假定相同的前缀为“/test/lock/seq-”,在创建子节点时,同时指明是有序类型。如果是第一个创建的子节点,那么生成的子节点为/test/lock/seq-0000000000,下一个节点则为/test/lock/seq-0000000001,依次类推,等等。
其次,Zookeeper节点的递增性,可以规定节点编号最小的那个获得锁。
一个zookeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程都会在这个节点下创建个临时顺序节点,由于序号的递增性,可以规定排号最小的那个获得锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。
第三,Zookeeper的节点监听机制,可以保障占有锁的方式有序而且高效。
每个线程抢占锁之前,先抢号创建自己的ZNode。同样,释放锁的时候,就需要删除抢号的Znode。抢号成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode 的通知就可以了。当前一个Znode 删除的时候,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。
Zookeeper的节点监听机制,可以说能够非常完美的,实现这种击鼓传花似的信息传递。具体的方法是,每一个等通知的Znode节点,只需要监听linsten或者 watch 监视排号在自己前面那个,而且紧挨在自己前面的那个节点。 只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,则获得锁。
为什么说Zookeeper的节点监听机制,可以说是非常完美呢?
一条龙式的首尾相接,后面监视前面,就不怕中间截断吗?比如,在分布式环境下,由于网络的原因,或者服务器挂了或则其他的原因,如果前面的那个节点没能被程序删除成功,后面的节点不就永远等待么?
其实,Zookeeper的内部机制,能保证后面的节点能够正常的监听到删除和获得锁。在创建取号节点的时候,尽量创建临时znode 节点而不是永久znode 节点,一旦这个 znode 的客户端与Zookeeper集群服务器失去联系,这个临时 znode 也将自动删除。排在它后面的那个节点,也能收到删除事件,从而获得锁。
说Zookeeper的节点监听机制,是非常完美的。还有一个原因。
Zookeeper这种首尾相接,后面监听前面的方式,可以避免羊群效应。所谓羊群效应就是每个节点挂掉,所有节点都去监听,然后做出反映,这样会给服务器带来巨大压力,所以有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才做出反映。
1.简单的分布式锁
代码实现
AbstractLock.java
package zklock; import org.I0Itec.zkclient.ZkClient; public abstract class AbstractLock { //zk地址和端口 public static final String ZK_ADDR = "192.168.0.230:2181"; //超时时间 public static final int SESSION_TIMEOUT = 10000; //创建zk protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT); /** * 可以认为是模板模式,两个子类分别实现它的抽象方法 * 1,简单的分布式锁 * 2,高性能分布式锁 */ public void getLock() { String threadName = Thread.currentThread().getName(); if (tryLock()) { System.out.println(threadName+"-获取锁成功"); }else { System.out.println(threadName+"-获取锁失败,进行等待..."); waitLock(); //递归重新获取锁 getLock(); } } public abstract void releaseLock(); public abstract boolean tryLock(); public abstract void waitLock(); }
AbstractLock类是个抽象类,里面getLock使用模板模式,子类分别是简单的zk锁和高性能的zk锁
SimpleZkLock.java
package zklock; import java.util.concurrent.CountDownLatch; import org.I0Itec.zkclient.IZkDataListener; /** * 简单的分布式锁的实现 */ public class SimpleZkLock extends AbstractLock { private static final String NODE_NAME = "/test_simple_lock"; private CountDownLatch countDownLatch; @Override public void releaseLock() { if (null != zkClient) { //删除节点 zkClient.delete(NODE_NAME); zkClient.close(); System.out.println(Thread.currentThread().getName()+"-释放锁成功"); } } //直接创建临时节点,如果创建成功,则表示获取了锁,创建不成功则处理异常 @Override public boolean tryLock() { if (null == zkClient) return false; try { zkClient.createEphemeral(NODE_NAME); return true; } catch (Exception e) { return false; } } @Override public void waitLock() { //监听器 IZkDataListener iZkDataListener = new IZkDataListener() { //节点被删除回调 @Override public void handleDataDeleted(String dataPath) throws Exception { if (countDownLatch != null) { countDownLatch.countDown(); } } //节点改变被回调 @Override public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub } }; zkClient.subscribeDataChanges(NODE_NAME, iZkDataListener); //如果存在则阻塞 if (zkClient.exists(NODE_NAME)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); System.out.println(Thread.currentThread().getName()+" 等待获取锁..."); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //删除监听 zkClient.unsubscribeDataChanges(NODE_NAME, iZkDataListener); } }
SimpleZkLock 表示简单的zk分布式锁,逻辑还是相对比较简单,下面看下测试
LockTest.java
package zklock; public class LockTest { public static void main(String[] args) { //模拟多个10个客户端 for (int i=0;i<10;i++) { Thread thread = new Thread(new LockRunnable()); thread.start(); } } static class LockRunnable implements Runnable{ @Override public void run() { AbstractLock zkLock = new SimpleZkLock(); //AbstractLock zkLock = new HighPerformanceZkLock(); zkLock.getLock(); //模拟业务操作 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } zkLock.releaseLock(); } } }
zk实现的简单的分布式锁存在的性能问题
二。高性能分布式锁
1、思路:客户端在抢锁的时候进行排队,客户端只要监听它前一个节点的变化就行,如果前一个节点释放了锁,客户端才去进行抢锁操作,这个时候我们就需要创建顺序节点了
HighPerformanceZkLock .java
package zklock; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import org.I0Itec.zkclient.IZkDataListener; /** * 高性能分布式锁 * @author hongtaolong * */ public class HighPerformanceZkLock extends AbstractLock { private static final String PATH = "/highPerformance_zklock"; //当前节点路径 private String currentPath; //前一个节点的路径 private String beforePath; private CountDownLatch countDownLatch = null; public HighPerformanceZkLock() { //如果不存在这个节点,则创建持久节点 if (!zkClient.exists(PATH)) { zkClient.createPersistent(PATH); } } @Override public void releaseLock() { if (null != zkClient) { zkClient.delete(currentPath); zkClient.close(); } } @Override public boolean tryLock() { //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath if (null == currentPath || "".equals(currentPath)) { //在path下创建一个临时的顺序节点 currentPath = zkClient.createEphemeralSequential(PATH+"/", "lock"); } //获取所有的临时节点,并排序 List<String> childrens = zkClient.getChildren(PATH); Collections.sort(childrens); if (currentPath.equals(PATH+"/"+childrens.get(0))) { return true; }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath int pathLength = PATH.length(); int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1)); beforePath = PATH+"/"+childrens.get(wz-1); } return false; } @Override public void waitLock() { IZkDataListener lIZkDataListener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { if (null != countDownLatch){ countDownLatch.countDown(); } } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; //监听前一个节点的变化 zkClient.subscribeDataChanges(beforePath, lIZkDataListener); if (zkClient.exists(beforePath)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener); } }
这里只要帖高性能锁的代码了,AbstractLock没变化,LockTest中只要修改一行代码
//AbstractLock zkLock = new SimpleZkLock();
AbstractLock zkLock = new HighPerformanceZkLock();
测试结果:
转载自:https://blog.csdn.net/crazymakercircle/article/details/85956246
https://blog.csdn.net/hongtaolong/java/article/details/88898875
3.数据库锁
mysql行锁。
存储引擎使用innoDB
行锁的使用方式为:
Select * from student_game where id=3 for update;
这样我们的业务代码大概是这样的:
conn.setAutoCommit(false);//关闭自动提交 select scores from test.student where id= 4 for update;//上锁 //获取学生id,scores数据,查询es记录,等到总分。 //更新学生得分表scores记录 Conn.commit;//释放锁
原文链接:https://blog.csdn.net/humanity11/java/article/details/93356344
mysql分布式锁
当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。
创建这样一张数据库表:
CREATE TABLE `methodLock` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名', `desc` varchar(1024) NOT NULL DEFAULT '备注信息', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成', PRIMARY KEY (`id`), UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
当我们想要锁住某个方法时,执行以下SQL:
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:
delete from methodLock where method_name ='method_name'
上面这种简单的实现有以下几个问题:
1、这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
2、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
3、这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
当然,我们也可以有其他方式解决上面的问题。
1.数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
2.没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
3.非阻塞的?搞一个while循环,直到insert成功再返回成功。
4.非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。
链接:https://www.jianshu.com/p/7b57ebb25900