zoukankan      html  css  js  c++  java
  • 一种基于zookeeper的分布式队列的设计与实现

    package com.ysl.zkclient.queue;
    
    import com.ysl.zkclient.ZKClient;
    import com.ysl.zkclient.exception.ZKNoNodeException;
    import com.ysl.zkclient.utils.ExceptionUtil;
    import org.apache.zookeeper.CreateMode;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.Serializable;
    import java.util.List;
    
    /**
     * 一种分布式队列的实现
     * @param <T>
     */
    public class ZKDistributedQueue<T extends Serializable> {
    
        private static final Logger LOG = LoggerFactory.getLogger(ZKDistributedQueue.class);
    
        private static final String ELEMENT_NAME = "node";
    
        private ZKClient client;
        private String rootPath;
    
        /**
         * 创建分布式队列
         * @param client zk客户端
         * @param rootPath 队列的跟路径
         */
        public ZKDistributedQueue(ZKClient client, String rootPath) {
            this.client = client;
            this.rootPath = rootPath;
            if(!client.exists(rootPath)){
                throw new ZKNoNodeException("the root path is not exists, please create path first ["+rootPath+"]");
            }
        }
    
        /**
         * 添加一个元素
         * @param node
         * @return
         */
        public boolean offer(T node){
            try{
                client.create(rootPath+"/"+ELEMENT_NAME + "-",node, CreateMode.PERSISTENT_SEQUENTIAL);
            }catch (Exception e){
                throw ExceptionUtil.convertToRuntimeException(e);
            }
            return true;
        }
    
        /**
         * 删除并返回顶部元素
         * @return
         */
        public T pool(){
            while(true){
                Node node = getFirstNode();
                if(node == null){
                    return null;
                }
    
                try{
                    boolean flag = client.delete(node.getName());
                    if(flag){
                        return (T)node.getData();
                    }else{
                        //删除失败,说明数据已经被其他的线程获取,重新获取底部元素
                    }
                }catch (Exception e){
                    throw ExceptionUtil.convertToRuntimeException(e);
                }
            }
        }
    
        /**
         * 获取队列顶部元素
         * @return
         */
        private Node<T> getFirstNode() {
            try{
                while(true){
                    List<String> children = client.getChild(rootPath,true);
                    if(children == null || children.isEmpty()){
                        return null;
                    }
    
                    String nodeName = getNodeName(children);
                    try{
                        return new Node<T>(rootPath+"/"+nodeName,(T)client.getData(rootPath+"/"+nodeName));
                    }catch (ZKNoNodeException e){
                        //如果抛出此异常,证明该节点已被其他线程获取
                    }
                }
            }catch (Exception e){
                throw ExceptionUtil.convertToRuntimeException(e);
            }
        }
    
        /**
         * 获取编号最小的节点
         * @param children
         * @return
         */
        private String getNodeName(List<String> children) {
            String child= children.get(0);
            for(String path : children){
                if(path.compareTo(child) < 0){
                    child = path;
                }
            }
            return child;
        }
    
        public boolean isEmpty(){
            return client.getChild(rootPath,true).size() == 0;
        }
    
        public T peek(){
            Node<T> node = getFirstNode();
            if(node == null){
                return null;
            }
            return node.getData();
        }
    
        private class Node<T>{
    
            private String name;
            private T data;
    
            public Node(String name, T data) {
                this.name = name;
                this.data = data;
            }
    
            public String getName() {
                return name;
            }
    
            public T getData() {
                return data;
            }
        }
    }

    测试代码

    /**
         * 测试分布式队列
         * @throws Exception 
         * @return void
         */
        @Test
        public void testDistributedQueue() throws Exception{
            final String rootPath = "/zk/queue";
            //创建rootPath
            zkClient.createRecursive(rootPath, null, CreateMode.PERSISTENT);
            
            final List<String> list1 = new ArrayList<String>();
            final List<String> list2 = new ArrayList<String>();
            for(int i=0;i<21;i++){
                Thread thread1 = new Thread(new Runnable() {
                    public void run() {
                        ZKDistributedQueue<String> queue = new ZKDistributedQueue(zkClient, rootPath);
                        queue.offer(Thread.currentThread().getName());
                        list1.add(Thread.currentThread().getName());
                    }
                });
                thread1.start();
            }
            
            //等待事件到达
            int size1 = TestUtil.waitUntil(21, new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return list1.size();
                }
                
            }, TimeUnit.SECONDS, 100);
            System.out.println(zkClient.getChildren(rootPath));
    
            for(int i=0;i<20;i++){
                Thread thread = new Thread(new Runnable() {
                    public void run() {
                        ZKDistributedQueue<String> queue = new ZKDistributedQueue(zkClient, rootPath);
                        list2.add(queue.poll());
                    }
                });
                thread.start();
            }
            //等待事件到达
            int size2 = TestUtil.waitUntil(20, new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return list2.size();
                }
                
            }, TimeUnit.SECONDS, 100);
            assertThat(size2).isEqualTo(20);
            boolean flag = true;
            for(int i =0;i<20;i++){
               if(!list1.get(i).equals(list2.get(i))){
                   flag = false;
                   break;
               }
            }
            assertThat(flag).isTrue();
            
            ZKDistributedQueue<String> queue = new ZKDistributedQueue(zkClient, rootPath);
            assertThat(queue.peek()).isEqualTo(queue.poll());
        }
  • 相关阅读:
    Indy的TCPServer到底能支持多少个连接
    Delphi TStream 详细介绍
    WebAPI下的如何实现参数绑定
    使用 Weinre 调试移动网站及 PhoneGap 应用
    面向对象的三大特征:封装、继承、多态
    轻量级前端MVVM框架avalon
    三种工厂模式的分析以及C++实现
    简单实现TCP下的大文件高效传输
    Nunit NMock Ncover单元测试
    算法实践——数独的基本解法
  • 原文地址:https://www.cnblogs.com/senlinyang/p/7845192.html
Copyright © 2011-2022 走看看