1、curator与spring(ssm)整合
2、zookeeper实现分布式锁
2.1、使用zk分布式锁流程图
2.2、使用zk分布式锁的案例
2.3、测试
1、curator与spring(ssm)整合 <--返回目录
依赖
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
<!-- curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
applicationContext-zookeeper.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<description>zk与spring容器结合,启动项目时建立与zk的连接</description>
<!-- zookeeper 重试策略 -->
<bean id="retryPolicy" class="org.apache.curator.retry.RetryNTimes">
<!--重试次数-->
<constructor-arg index="0" value="5"></constructor-arg>
<!--重试间隔-->
<constructor-arg index="1" value="5000"></constructor-arg>
</bean>
<bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start">
<!--zk服务地址-->
<constructor-arg index="0" value="192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183"></constructor-arg>
<!--session timeoout 会话超时时间-->
<constructor-arg index="1" value="10000"></constructor-arg>
<!--connectionTimeoutMs 创建连接超时时间-->
<constructor-arg index="2" value="5000"></constructor-arg>
<!--重试策略-->
<constructor-arg index="3" ref="retryPolicy"></constructor-arg>
</bean>
<!--注入zk客户端-->
<bean id="zkCurator" class="com.oy.utils.ZKCurator" init-method="init">
<constructor-arg index="0" ref="client"/>
</bean>
</beans>
ZKCurator
package com.oy.utils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZKCurator {
private static final Logger log = LoggerFactory.getLogger(ZKCurator.class);
private CuratorFramework client = null; // zk客户端
public ZKCurator(CuratorFramework client) {
this.client = client; // 由spring容器注入
// 参数1 重试次数; 参数2 每次重试间隔的时间
// RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
// client = CuratorFrameworkFactory.builder()
// .connectString(zkServerPath).sessionTimeoutMs(20000)
// .retryPolicy(retryPolicy).build();
// client.start();
}
public void init() {
// 使用命名空间
client = client.usingNamespace("zk-namespace");
}
/**
* 测试客户端连接
* @param args
* @throws Exception
*/
// public static void main(String[] args) throws Exception {
// ZKCurator curatorOperator = new ZKCurator();
// boolean started = curatorOperator.client.isStarted();
// log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
//
// new Thread().sleep(5000);
// curatorOperator.closeZKClient();
// boolean started1 = curatorOperator.client.isStarted();
// log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
// }
/**
* 判断zk是否连接
*/
public boolean isZKAlive() {
return client.isStarted();
}
/**
* 关闭zk客户端连接
*/
public void closeZKClient() {
if (client != null) client.close();
}
}
DemoController
@Controller
public class DemoController {
@Autowired
private ZKCurator zkCurator;
@RequestMapping("/isZKAlive")
@ResponseBody
public String isZKAlive() {
return zkCurator.isZKAlive() ? "yes" : "no";
}
}
启动项目,测试zookeeper能否连接:http://localhost:8080/isZKAlive
2、zookeeper实现分布式锁 <--返回目录
2.1、使用zk分布式锁流程图 <--返回目录
2.2、使用zk分布式锁的案例 <--返回目录
BuyController
@RestController
public class BuyController {
@Resource(name = "buyService")
private BuyService buyService;
@Resource(name = "buy2Service")
private Buy2Service buy2Service;
// 查看库存总数
@RequestMapping("/count")
public String queryCount() {
return "" + buyService.queryCount();
}
// 重置库存总数, 方便测试
@RequestMapping("/reset")
public String resetCount() {
return "" + buyService.resetCount();
}
// 测试: 并发请求导致数据不一致(库存不够)
@GetMapping("/buy")
public String buy() throws Exception {
return buyService.buy() ? "succ" : "failed";
}
// 测试: zk分布式锁。注: 本demo只是为了测试zk分布式锁, 并没有搭建分布式环境。
@GetMapping("/buy2")
public String buy2() throws Exception {
return buy2Service.buy() ? "succ" : "failed";
}
}
BuyService
@Service("buyService")
public class BuyService {
final static Logger log = LoggerFactory.getLogger(BuyService.class);
public int queryCount() {
return Config.count;
}
public int resetCount() {
Config.count = 8;
return Config.count;
}
public boolean buy() {
int buyCounts = 5; // 购买5件
// 1. 判断库存
if (Config.count < buyCounts) {
log.info("库存剩余{}件,用户需求量{}件,库存不足,订单创建失败...",
Config.count, buyCounts);
return false;
}
// 2. 创建订单. 为了方便测试, 模拟业务处理时间为5s
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean isOrderCreated = true;
// 3. 创建订单成功后,扣除库存
if (isOrderCreated) {
log.info("{}订单创建成功...", Thread.currentThread().getName());
Config.count -= buyCounts;
} else {
log.info("{}订单创建失败...", Thread.currentThread().getName());
return false;
}
return true;
}
}
Buy2Service
@Service("buy2Service")
public class Buy2Service {
final static Logger log = LoggerFactory.getLogger(Buy2Service.class);
@Autowired
private DistributedLock distributedLock;
/**
* 注意每个return前都要释放锁
*
* @return
*/
public boolean buy() {
// 执行订单流程前使用当前业务获取分布式锁
distributedLock.getLock();
int buyCounts = 5; // 购买5件
// 1. 判断库存
if (Config.count < buyCounts) {
log.info("库存剩余{}件,用户需求量{}件,库存不足,订单创建失败...",
Config.count, buyCounts);
// 释放锁,让下一个请求获取锁
distributedLock.releaseLock();
return false;
}
// 2. 创建订单. 为了方便测试, 模拟业务处理时间为5s
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
// 释放锁,让下一个请求获取锁
distributedLock.releaseLock();
}
boolean isOrderCreated = true;
// 3. 创建订单成功后,扣除库存
if (isOrderCreated) {
log.info("{}订单创建成功...", Thread.currentThread().getName());
Config.count -= buyCounts;
} else {
log.info("{}订单创建失败...", Thread.currentThread().getName());
// 释放锁,让下一个请求获取锁
distributedLock.releaseLock();
return false;
}
// 释放锁,让下一个请求获取锁
distributedLock.releaseLock();
return true;
}
}
DistributedLock
package com.oy.utils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
public class DistributedLock {
private static final Logger log = LoggerFactory.getLogger(DistributedLock.class);
private CuratorFramework client = null; // zk客户端
// 用于挂起当前请求,并且等待上一个分布式锁释放
private static CountDownLatch zkLockLatch = new CountDownLatch(1);
// 分布式锁的总节点名
private static final String ZK_LOCK_PROJECT="zk_lock_project";
// 分布式锁节点
private static final String ORDER_COUNT_LOCK = "order_count_lock";
public DistributedLock(CuratorFramework client) {
this.client = client; // 由spring容器注入
}
public void init() {
/*
创建zk锁的总节点
ZKLocks-Namespace
|
---- zk_lock_project
|
---- order_count_lock
*/
try {
if (client.checkExists().forPath("/" + ZK_LOCK_PROJECT) == null) {
client.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/" + ZK_LOCK_PROJECT);
}
// 针对zk的分布式锁节点,创建相应的watcher事件监听
addWatcherToLock("/" + ZK_LOCK_PROJECT);
} catch (Exception e) {
log.error("客户端连接zk服务器错误");
}
}
/**
* 获取分布式锁
*/
public void getLock() {
// 使用死循环,当且仅当上一个锁释放并且当前请求获得锁成功后才跳出
while (true) {
try {
client.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/" + ZK_LOCK_PROJECT + "/" + ORDER_COUNT_LOCK);
log.warn("{}获得分布式锁成功", Thread.currentThread().getName());
return;
} catch (Exception e) {
//log.warn("获得分布式锁失败. msg: {}", e);
try {
// 如果没有获取到锁,需要重新设置同步资源值
if (zkLockLatch.getCount() <= 0) {
zkLockLatch = new CountDownLatch(1);
}
// 阻塞线程
zkLockLatch.await();
} catch (InterruptedException ex) {
}
}
}
}
/**
* 释放分布式锁
*/
public boolean releaseLock() {
try {
if (client.checkExists().forPath("/" + ZK_LOCK_PROJECT + "/" + ORDER_COUNT_LOCK) != null) {
client.delete().forPath("/" + ZK_LOCK_PROJECT + "/" + ORDER_COUNT_LOCK);
}
} catch (Exception e) {
log.warn("分布式锁释放失败. msg: {}", e);
return false;
}
log.warn("{}分布式锁释放完毕", Thread.currentThread().getName());
return true;
}
public void addWatcherToLock(String path) throws Exception {
final PathChildrenCache cache = new PathChildrenCache(client, path, true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
String path = event.getData().getPath();
log.warn("上一次会话已释放锁或该会话已断开,节点路径为:" + path);
if (path.contains(ORDER_COUNT_LOCK)) {
log.warn("释放计数器,让当前请求来获得分布式锁。。。");
zkLockLatch.countDown();
}
}
}
});
}
/**
* 判断zk是否连接
*/
public boolean isZKAlive() {
return client.isStarted();
}
/**
* 关闭zk客户端连接
*/
public void closeZKClient() {
if (client != null) client.close();
}
}
applicationContext-zookeeper.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<description>zk与spring容器结合,启动项目时建立与zk的连接</description>
<!-- zookeeper 重试策略 -->
<bean id="retryPolicy" class="org.apache.curator.retry.RetryNTimes">
<!--重试次数-->
<constructor-arg index="0" value="5"></constructor-arg>
<!--重试间隔-->
<constructor-arg index="1" value="5000"></constructor-arg>
</bean>
<bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient"
init-method="start">
<!--zk服务地址-->
<constructor-arg index="0"
value="192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183"></constructor-arg>
<!--session timeoout 会话超时时间-->
<constructor-arg index="1" value="10000"></constructor-arg>
<!--connectionTimeoutMs 创建连接超时时间-->
<constructor-arg index="2" value="5000"></constructor-arg>
<!--重试策略-->
<constructor-arg index="3" ref="retryPolicy"></constructor-arg>
</bean>
<!--zk客户端-->
<bean id="zkCurator" class="com.oy.utils.ZKCurator" init-method="init">
<constructor-arg index="0" ref="client"/>
</bean>
<bean id="distributedLock" class="com.oy.utils.DistributedLock" init-method="init">
<constructor-arg index="0" ref="client"/>
</bean>
</beans>
2.3、测试 <--返回目录
1)查询库存 http://localhost:8080/count,结果为 8。
2)并发请求 http://localhost:8080/buy 5次
控制台打印
2021-03-27 15:13:25,992 [http-bio-8080-exec-2] [com.oy.service.BuyService.buy(BuyService.java:41)] - [INFO] http-bio-8080-exec-2订单创建成功...
2021-03-27 15:13:26,319 [http-bio-8080-exec-3] [com.oy.service.BuyService.buy(BuyService.java:41)] - [INFO] http-bio-8080-exec-3订单创建成功...
2021-03-27 15:13:26,597 [http-bio-8080-exec-4] [com.oy.service.BuyService.buy(BuyService.java:41)] - [INFO] http-bio-8080-exec-4订单创建成功...
2021-03-27 15:13:26,840 [http-bio-8080-exec-5] [com.oy.service.BuyService.buy(BuyService.java:41)] - [INFO] http-bio-8080-exec-5订单创建成功...
2021-03-27 15:13:27,119 [http-bio-8080-exec-6] [com.oy.service.BuyService.buy(BuyService.java:41)] - [INFO] http-bio-8080-exec-6订单创建成功...
再次查询库存 http://localhost:8080/count,结果为 -17。
3)为了下一次测试,重置库存 http://localhost:8080/reset,然后查询库存 http://localhost:8080/count,结果为 8。
4)并发请求 http://localhost:8080/buy2 5次
控制台打印
再次查询库存 http://localhost:8080/count,结果为 3。