1.pom.xml
<dependencies>
    <!-- Unit testing; test scope only. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.10</version>
        <scope>test</scope>
    </dependency>

    <!-- ZooKeeper client library. -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>

    <!-- Curator recipes; the Java code imports InterProcessMutex from its locks package. -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.5.0</version>
    </dependency>
</dependencies>
2.JAVA代码
package com.xbq.zookeeper.curator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;

/**
 * Demonstrates a distributed lock built on Curator's {@link InterProcessMutex}.
 *
 * <p>{@link #main(String[])} starts ten worker tasks, each with its own
 * {@link CuratorFramework} client, that all compete for the same lock node.
 *
 * @author xbq
 */
public class LockByCurator {

    /** ZooKeeper connect string; this demo targets a cluster, hence several host:port pairs. */
    private static final String CONNECT_SERVER =
            "192.168.242.129:2181,192.168.242.129:2182,192.168.242.129:2183";

    /** Session timeout in milliseconds. */
    private static final int SESSION_TIMEOUT = 3000;

    /** Connection timeout in milliseconds. */
    private static final int CONNECTION_TIMEOUT = 3000;

    /** ZooKeeper path of the lock node. */
    private static final String CURATOR_LOCK = "/curatorLock";

    /**
     * Tries to acquire the distributed lock, waiting at most 5 seconds, and if
     * successful simulates 5 seconds of work before releasing it.
     *
     * <p>The lock is released only when this call actually acquired it, so a
     * timed-out or failed acquire no longer triggers a release on a lock the
     * current thread does not own. Failures are printed, not propagated. If
     * the thread is interrupted, its interrupt status is restored before
     * returning.
     *
     * @param cf a started Curator client used to talk to ZooKeeper
     */
    public static void doLock(CuratorFramework cf) {
        System.out.println(Thread.currentThread().getName() + " 尝试获取锁!");
        // Instantiate the ZooKeeper-backed distributed lock.
        InterProcessMutex mutex = new InterProcessMutex(cf, CURATOR_LOCK);
        boolean acquired = false;
        boolean interrupted = false;
        try {
            // acquire(...) returns false when the wait time elapses without getting the lock.
            acquired = mutex.acquire(5, TimeUnit.SECONDS);
            if (acquired) {
                System.out.println(Thread.currentThread().getName() + " 获取到了锁!-------");
                // Simulated business operation.
                Thread.sleep(5000);
            }
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (acquired) {
                try {
                    // Release the lock we hold.
                    mutex.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (interrupted) {
                // Restored only after the release attempt so that the pending
                // interrupt does not interfere with the release call itself.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Runs the demo: submits ten tasks to a cached thread pool, each of which
     * connects to ZooKeeper, calls {@link #doLock(CuratorFramework)}, and then
     * closes its client.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Thread pool that runs the simulated clients.
        ExecutorService service = Executors.newCachedThreadPool();
        // Semaphore with 10 permits: at most 10 tasks run the guarded section concurrently.
        final Semaphore semaphore = new Semaphore(10);
        // Simulate 10 clients.
        for (int i = 0; i < 10; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                    } catch (InterruptedException e) {
                        // No permit was obtained, so there is nothing to release.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return;
                    }
                    try {
                        // Connect to ZooKeeper.
                        CuratorFramework framework = CuratorFrameworkFactory.newClient(
                                CONNECT_SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT,
                                new RetryNTimes(10, 5000));
                        try {
                            // Start the client, then compete for the lock.
                            framework.start();
                            doLock(framework);
                        } finally {
                            // Always close the client so its ZooKeeper session is not leaked.
                            framework.close();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // Return the permit even when connecting or locking failed.
                        semaphore.release();
                    }
                }
            };
            service.execute(runnable);
        }
        // Stop accepting new tasks; already-submitted tasks still run to completion.
        service.shutdown();
    }
}