zoukankan      html  css  js  c++  java
  • ZooKeeper伪集群安装配置

    一、安装部署

    1. 下载Zookeeper安装包

    a)         下载地址http://zookeeper.apache.org/releases.html(这个是apache zookeeper的官网下载地址)

    1. tar文件解压

    使用命令解压文件,当我们需要部署伪集群的时候需要进行多次解压,解压到不同目录下如:server002,server003下面

    a)         cd /usr/local/server001                //进入该目录下进行解压

    b)         tar -zvxf zookeeper-3.4.9.tar.gz          //解压文件

    1. 修改zookeeper-3.4.9/zoo_sample.cfd文件名改为zoo.cfg,内容缺省如下:

    Server001目录下的zoo.cfg配置文件内容修改:

     

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the port at which the clients will connect
    clientPort=2181 #注意主要就是修改这个端口,端口不能一样
    # the directory where the snapshot is stored.
    #dataDir=/export/crawlspace/mahadev/zookeeper/server1/data
    dataDir=/usr/local/zookeeper-3.3.3/data# 

    dataLogDir=/usr/localzookeeper-3.3.3/log

    server.1=192.168.201.128:8881:7771 
    server.2=192.168.201.131:8882:7772 
    server.3=192.168.201.132:8883:7773

     

    Server002目录下的zoo.cfg配置文件内容修改:

     

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the port at which the clients will connect
    clientPort=2182#注意主要就是修改这个端口,端口不能一样
    # the directory where the snapshot is stored.
    #dataDir=/export/crawlspace/mahadev/zookeeper/server1/data
    dataDir=/usr/local/zookeeper-3.3.3/data# 

    dataLogDir=/usr/localzookeeper-3.3.3/log

    server.1=192.168.201.128:8881:7771 
    server.2=192.168.201.131:8882:7772 
    server.3=192.168.201.132:8883:7773

     

     

    Server003目录下的zoo.cfg配置文件内容修改:

     

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the port at which the clients will connect
    clientPort=2183#注意主要就是修改这个端口,端口不能一样
    # the directory where the snapshot is stored.
    #dataDir=/export/crawlspace/mahadev/zookeeper/server1/data
    dataDir=/usr/local/zookeeper-3.3.3/data# 

    dataLogDir=/usr/localzookeeper-3.3.3/log

    server.1=192.168.201.128:8881:7771 
    server.2=192.168.201.131:8882:7772 
    server.3=192.168.201.132:8883:7773

     

    1. 在/usr/local/server001目录下面创建两个文件data,log(三个节点目录下都要创建)

    mkdir /usr/loca/server001/{data,log}

    1. 在data文件下面床架一个myid文件,内容分别对应你安装的zookeeper的端口号。如:2181内容则为1。2182内容则为2;

    进入打他文件目录:cd /usr/local/server001/data

    创建myid文件并且设置内容 vi myid

    6.启动每个zookeeper

             /usr/local/server001/zookeeper-3.4.9/bin/zkServer.sh restart

    /usr/local/server002/zookeeper-3.4.9/bin/zkServer.sh restart

    /usr/local/server003/zookeeper-3.4.9/bin/zkServer.sh restart

    1. 查看是否存在进程:

    输入jps

    7.执行客户端脚本

             /usr/local/server001/zookeeper-3.4.9/bin/zkCli.sh –server 127.0.0.1:2181

    成功

    一、java连接zk实例

    public class DistributedLock implements Watcher{ 

     

        private int threadId; 

        private ZooKeeper zk = null

        private String selfPath; 

        private String waitPath; 

        private String LOG_PREFIX_OF_THREAD; 

        private static final int SESSION_TIMEOUT = 10000;  //session超时时间

        private static final String GROUP_PATH = "/disLocks"; 

        private static final String SUB_PATH = "/disLocks/sub"; 

        private static final String CONNECTION_STRING = "localhost:2182,localhost:2181,localhsot:2183";//服务器地址  ,远程操作将地址修改一下

         

        private static final int THREAD_NUM = 10;  

        //确保连接zk成功; 

        private CountDownLatch connectedSemaphore = new CountDownLatch(1); 

        //确保所有线程运行结束; 

        private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM); 

    //    private static final Logger LOG = LoggerFactory.getLogger(AllZooKeeperWatcher.class); 

        public DistributedLock(int id) { 

            this.threadId = id; 

            LOG_PREFIX_OF_THREAD = "【第"+threadId+"个线程】"; 

        } 

        public static void main(String[] args) { 

            for(int i=0; i < THREAD_NUM; i++){ 

                final int threadId = i+1; 

                new Thread(){ 

                    @Override 

                    public void run() { 

                      DistributedLock dc = null;

                        try

                          dc = new DistributedLock(threadId); 

                            dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT); 

                            //GROUP_PATH不存在的话,由一个线程创建即可;

                           synchronized (threadSemaphore){ 

                                dc.createPath(GROUP_PATH, "该节点由线程" + threadId + "创建", true); 

                            } 

                         

                            dc.getLock(); 

                        } catch (Exception e){ 

                            System.out.println("【第"+threadId+"个线程】 抛出的异常:"); 

                            e.printStackTrace(); 

                            dc.releaseConnection();

                        } 

                    } 

                }.start(); 

            } 

            try

                threadSemaphore.await(); 

                System.out.println("所有线程运行结束!"); 

                System.out.println(DistribLock.ku); 

            } catch (InterruptedException e) { 

                e.printStackTrace(); 

            } 

        } 

        /**

         * 获取锁

         * @return

         */ 

        private void getLock() throws KeeperException, InterruptedException {

           

            selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL); 

            System.out.println(LOG_PREFIX_OF_THREAD+"创建锁路径:"+selfPath); 

            if(checkMinPath()){ 

                getLockSuccess(); 

            } 

        } 

        /**

         * 创建节点

         * @param path 节点path

         * @param data 初始数据内容

         * @return

         */ 

        public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException { 

            if(zk.exists(path, needWatch)==null){ 

            System.out.println( LOG_PREFIX_OF_THREAD + "节点创建成功, Path: " 

                        + this.zk.create( path, 

                        data.getBytes(), 

                        ZooDefs.Ids.OPEN_ACL_UNSAFE

                        org.apache.zookeeper.CreateMode.PERSISTENT

                        + ", content: " + data ); 

            } 

            return true

        } 

        /**

         * 创建ZK连接

         * @param connectString  ZK服务器地址列表

         * @param sessionTimeout Session超时时间

         */ 

        public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException { 

                zk = new ZooKeeper( connectString, sessionTimeout, this); 

                connectedSemaphore.await(); 

        } 

        /**

         * 获取锁成功

        */ 

        public void getLockSuccess() throws KeeperException, InterruptedException { 

            if(zk.exists(this.selfPath,false) == null){ 

            System.out.println(LOG_PREFIX_OF_THREAD+"本节点已不在了..."); 

                return

            } 

            System.out.println(LOG_PREFIX_OF_THREAD + "获取锁成功,赶紧干活!"); 

            Thread.sleep(2000); 

            System.out.println(LOG_PREFIX_OF_THREAD + "删除本节点:"+selfPath); 

            zk.delete(this.selfPath, -1); 

            releaseConnection(); 

            threadSemaphore.countDown(); 

        } 

        /**

         * 关闭ZK连接

         */ 

        public void releaseConnection() { 

            if ( this.zk !=null ) { 

                try

                    this.zk.close(); 

                } catch ( InterruptedException e ) {} 

            } 

            System.out.println(LOG_PREFIX_OF_THREAD + "释放连接"); 

        } 

        /**

         * 检查自己是不是最小的节点

         * @return 

         */ 

        public boolean checkMinPath() throws KeeperException, InterruptedException { 

             List<String> subNodes = zk.getChildren(GROUP_PATH, false); 

             Collections.sort(subNodes); 

             int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1)); 

             switch (index){ 

                 case -1:{ 

                 System.out.println(LOG_PREFIX_OF_THREAD+"本节点已不在了..."+selfPath); 

                     return false

                 } 

                 case 0:{ 

                 System.out.println(LOG_PREFIX_OF_THREAD+"子节点中,我果然是老大"+selfPath); 

                     return true

                 } 

                 default:{ 

                     this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1); 

                     System.out.println(LOG_PREFIX_OF_THREAD+"获取子节点中,排在我前面的"+waitPath); 

                     try

                         zk.getData(waitPath, true, new Stat()); 

                         return false

                     }catch(KeeperException e){ 

                         if(zk.exists(waitPath,false) == null){ 

                           System.out.println(LOG_PREFIX_OF_THREAD+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?"); 

                             return checkMinPath(); 

                         }else

                             throw e; 

                         } 

                     } 

                 } 

                      

             } 

          

        } 

        @Override 

        public void process(WatchedEvent event) { 

            if(event == null){ 

                return

            } 

            Event.KeeperState keeperState = event.getState(); 

            Event.EventType eventType = event.getType(); 

           

            if ( Event.KeeperState.SyncConnected == keeperState) { 

                if ( Event.EventType.None == eventType ) { 

                System.out.println( LOG_PREFIX_OF_THREAD + "成功连接上ZK服务器" ); 

                    connectedSemaphore.countDown(); 

                }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) { 

                System.out.println(LOG_PREFIX_OF_THREAD + "收到情报,排我前面的家伙已挂,我是不是可以出山了?"); 

                    try

                        if(checkMinPath()){ 

                            getLockSuccess(); 

                        } 

                    } catch (KeeperException e) { 

                        e.printStackTrace(); 

                    } catch (InterruptedException e) { 

                        e.printStackTrace(); 

                    } 

                } 

            }else if ( Event.KeeperState.Disconnected == keeperState ) { 

            System.out.println( LOG_PREFIX_OF_THREAD + "与ZK服务器断开连接" ); 

            } else if ( Event.KeeperState.fromInt(4) == keeperState ) { 

            System.out.println( LOG_PREFIX_OF_THREAD + "权限检查失败" ); 

            } else if ( Event.KeeperState.Expired == keeperState ) { 

            System.out.println( LOG_PREFIX_OF_THREAD + "会话失效" ); 

            } 

        } 

  • 相关阅读:
    jmeter教程索引
    JMeter 中_time 函数的使用(时间戳、当前时间)
    通用分页存储过程
    如何才算掌握Java(J2SE篇) 转载
    Java 外企面试若干题
    Java 有用的网址 转载
    JDBC链接基本步骤
    java基础学习 视频学习 数据类型以及运算符
    Java基础 构造对象初始化变量的顺序浅见
    全面解析《嵌入式程序员应该知道的16个问题》 转载
  • 原文地址:https://www.cnblogs.com/yuancheng1/p/6004393.html
Copyright © 2011-2022 走看看