zoukankan      html  css  js  c++  java
  • zookeeper应用场景练习(分布式锁)

     

     在寻常的高并发的程序中。为了保证数据的一致性。因此都会用到锁。来对当前的线程进行锁定。在单机操作中。非常好做到,比方能够採用Synchronized、Lock或者其它的读写多来锁定当前的线程。可是在分布式的系统中,就非常难做到这一点。因此能够採用zookeeper中节点的特性来满足这一点。

    大致实现的思路例如以下。

     1.每一个客户端都去zookeeper上创建暂时的顺序节点

     2.客户端推断当前自己创建的节点是不是最小的

     3.假设是的话,就获得了运行当前任务的锁

     4.假设不是的话。就找到比自己小的节点,然后进行监听,假设被删除的话。就能够获得锁


     上面就是大致的实现思路,以下我们来通过代码来实现一下。

     

    package com.test;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.CuratorWatcher;
    import org.apache.curator.framework.state.ConnectionState;
    import org.apache.curator.framework.state.ConnectionStateListener;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.data.Stat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class DistributedLock {
    
    	private String lockName;
    	private final int timeOut = 3000;
    	private final String root = "/locks";
    	private String myZnode;// 代表当前节点信息
    	private String waitZnode;
    	private static Logger logger = LoggerFactory
    			.getLogger(DistributedLock.class);
    	private CuratorFramework client;
    	private CountDownLatch latch = new CountDownLatch(1);
    
    	public DistributedLock(String connectString, String lockName) {
    		this.lockName = lockName;
    		client = CuratorFrameworkFactory.builder().connectionTimeoutMs(timeOut)
    				.connectString(connectString)
    				.retryPolicy(new RetryNTimes(3, 3000)).build();
    		ConnectionStateListener listener = new ConnectionStateListener() {
    
    			public void stateChanged(CuratorFramework client,
    					ConnectionState newState) {
    				if (newState == ConnectionState.CONNECTED) {
    					logger.info("连接成功了");
    					latch.countDown();
    				}
    			}
    		};
    
    		client.getConnectionStateListenable().addListener(listener);
    		client.start();
    		try {
    			latch.await();
    			createRoot();
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    
    	}
    
    	/**
    	 * @Title: 创建根节点root
    	 * @Description: TODO
    	 * @param
    	 * @return void
    	 * @throws
    	 */
    	private void createRoot() {
    		try {
    			Stat stat = client.checkExists().forPath(root);
    			if (stat != null) {
    				logger.info("root has already exists");
    			} else {
    				// 创建跟节点
    				client.create().creatingParentsIfNeeded().forPath(root);
    
    			}
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    
    	public void getLocks() {
    
    		try {
    			myZnode = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
    					.forPath(root + "/" + lockName);
    			logger.info(myZnode + "has created");
    			// 取出全部的子节点。然后找出比自己小的节点。进行监听的设置
    			List<String> subNodes = client.getChildren().forPath(root);
    			// 取出全部带有lockname的节点信息
    			List<String> lockObjNodes = new ArrayList<String>();
    			for (String node : subNodes) {
    				if (node.contains(lockName)) {
    					lockObjNodes.add(node);
    				}
    			}
    			// 对当前节点进行排序
    			Collections.sort(lockObjNodes);
    			// 推断当前的节点是不是最小的节点
    			if (myZnode.equals(root + "/" + lockObjNodes.get(0))) {
    				doAction();
    			} else {
    				// 找到比自己节点大一的节点进行监听
    				String subMyZone = myZnode
    						.substring(myZnode.lastIndexOf("/") + 1);
    				waitZnode = lockObjNodes.get(Collections.binarySearch(
    						lockObjNodes, subMyZone) - 1);
    				// 对节点进行监听
    				Stat stat = client.checkExists()
    						.usingWatcher(deleteNodeWatcher).forPath("/"+waitZnode);
    				if (stat != null) {
    					System.out.println(Thread.currentThread().getName()
    							+ "处于等待状态");
    				} else {
    					doAction();
    				}
    			}
    		} catch (Exception e) {
    			logger.error(e.getMessage());
    		}
    	}
    
    	// 删除节点的事件监听
    	CuratorWatcher deleteNodeWatcher = new CuratorWatcher() {
    
    		public void process(WatchedEvent event) throws Exception {
    
    			if (event.getType() == EventType.NodeDeleted) {
    				doAction();
    			}
    		}
    	};
    
    	private void doAction() {
    		System.out.println(Thread.currentThread().getName() + "開始运行");
    		client.close();
    	}
    }
    


     以下来測试一下

     

    /**     
     * @FileName: TestCurrentZk.java   
     * @Package:com.test   
     * @Description: TODO  
     * @author: LUCKY    
     * @date:2016年2月2日 下午11:36:04   
     * @version V1.0     
     */
    package com.test;
    
    /**
     * @ClassName: TestCurrentZk
     * @Description: TODO
     * @author: LUCKY
     * @date:2016年2月2日 下午11:36:04
     */
    public class TestCurrentZk {
    
    	public static void main(String[] args) throws Exception {
    		Thread threads[] = new Thread[10];
    		for (int i = 0; i < threads.length; i++) {
    			threads[i] = new Thread(new Runnable() {
    				public void run() {
    					ClientTest clientTest = new ClientTest(
    							"100.66.162.36:2181", "locknametest");
    					clientTest.getLocks();
    				}
    			});
    
    			threads[i].start();
    
    		}
    		Thread.sleep(Integer.MAX_VALUE);
    	}
    }
    


  • 相关阅读:
    vue-cli 2.x升级到3.x版本, 和3.x降级到2.x版本命令
    vue-cli 2.x项目,删除打包线上环境的控制台打印
    vue-cli 2.x项目使用cross-env新建多个打包环境
    解决vue项目路由出现message: "Navigating to current location (XXX) is not allowed"的问题
    vue打包后,解决出现不到字体文件的错误
    js 网络图片转base64的方式(两种)
    vscode编译器,Settings Sync 同步插件,忘记GitHub token 和 Gist的解决办法
    输入两个时间,计算他们相差多少天多少时,多少秒
    老生常谈之js深拷贝与浅拷贝
    React Hooks 你不来了解下?
  • 原文地址:https://www.cnblogs.com/yutingliuyl/p/7066890.html
Copyright © 2011-2022 走看看