zoukankan      html  css  js  c++  java
  • 大数据入门第二天——基础部分之zookeeper(下)

    一、集群自启动脚本

      1.关闭zk

    [root@localhost bin]# jps
    3104 Jps
    2805 QuorumPeerMain
    [root@localhost bin]# kill -9 2805

      //kill或者stop都是可以的

      2.远程执行命令

    [root@localhost bin]# ssh 192.168.137.138 /opt/zookeeper/zookeeper-3.4.5/bin/zkServer.sh start
    root@192.168.137.138's password: 
    JMX enabled by default
    Using config: /opt/zookeeper/zookeeper-3.4.5/bin/../conf/zoo.cfg
    Starting zookeeper ... /opt/zookeeper/zookeeper-3.4.5/bin/zkServer.sh: 第 103 行:[: /tmp/zookeeper: 期待二元表达式
    STARTED

      出现以上中文的地方只需要修改一下zoo.cfg,把多余的配置注释即可!

      当然,这样还是无法启动!因为ssh过去是以一个bash的方式过去的(不会执行/etc/profile,而正常登录是会执行的,也可以打开相应的脚本进行查看),也就是PATH不在了,导致JAVA_HOME等找不到了!

      EXPORT所定义的变量对自己所在的shell以及子shell生效

      这里就需要用到之前说到的source命令了:https://www.cnblogs.com/pkufork/p/linux_source.html

    ssh 192.168.137.138 "source /etc/profile&&/opt/zookeeper/zookeeper-3.4.5/bin/zkServer.sh start"

      //如果不使用引号,将会以空格作为命令的分割!

      3.配置免密登录

      在其中一台机器上(这里是192.168.137.128)

    ssh-keygen

      //之后enter即可

    ssh-copy-id 192.168.137.128
    ssh-copy-id 192.168.137.138
    ssh-copy-id 192.168.137.148

      4.一键启动脚本

    cd /root
    mkdir bin
    cd bin
    vim startZK.sh
    #!/bin/bash
    SERVERS="192.168.137.128 192.168.137.138 192.168.137.148"
    echo "start zk..."
    for SERVER in $SERVERS
    do
        ssh $SERVER "source /etc/profile&&/opt/zookeeper/zookeeper-3.4.5/bin/zkServer.sh start"
    done
    chmod +x startZK.sh 

      这样,通过startZK.sh就能一键启动了!(/root/bin默认在PATH中了!)

    二、zk的Java客户端

      相关API入门可以参考:https://www.cnblogs.com/ggjucheng/p/3370359.html

       第三方的客户端:zkclient参考:https://www.cnblogs.com/f1194361820/p/5575206.html

       1.引入maven依赖

            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.6</version>
            </dependency>    

      2.测试程序是否能通

    public class SimpleZK {
        // 设置连接字符串(可以用逗号隔开多个),失败时会自动尝试多个
        private static final String coonectString = "192.168.137.128:2181,192.168.137.138:2181,192.168.137.148:2181";
        // 超时时间
        private static final int sessionTimeout = 2000;
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
            ZooKeeper zkClient = new ZooKeeper(coonectString, sessionTimeout, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    // 收到通知事件后的回调函数
                    System.out.println(watchedEvent.getType()+"---"+watchedEvent.getPath());
                }
            });
            // zk的增删改查(这里使用最底层的原生操作,zkclient的待补充)
            String node = zkClient.create("/app2", "hellozk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
        }
    
    }

      完美:

        

      3.增删改查实例

    package com.zk;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import org.junit.Before;
    import org.junit.Test;
    
    
    import java.io.IOException;
    import java.util.List;
    
    /**
     * 测试zk的Java客户端
     *
     * @author zcc ON 2018/1/17
     **/
    public class SimpleZK {
        // 设置连接字符串(可以用逗号隔开多个),失败时会自动尝试多个
        private static final String coonectString = "192.168.137.128:2181,192.168.137.138:2181,192.168.137.148:2181";
        // 超时时间
        private static final int sessionTimeout = 2000;
        // 初始变量
        ZooKeeper zkClient = null;
    
        /**
         * 初始化方法
         * @throws IOException
         */
        @Before
        public void init() throws IOException {
            zkClient = new ZooKeeper(coonectString, sessionTimeout, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    // 收到通知事件后的回调函数
                    System.out.println(watchedEvent.getType()+"---"+watchedEvent.getPath());
                    // 开启循环监听(监听调用此方法,此方法又开启监听)
                    try {
                        zkClient.getChildren("/", true);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    
        /**
         * 创建
         */
        public void testCreate() throws IOException, KeeperException, InterruptedException {
            String node = zkClient.create("/app2", "hellozk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    
        /**
         * 判断是否存在节点
         * @throws KeeperException
         * @throws InterruptedException
         */
        public void testExists() throws KeeperException, InterruptedException {
            // stat就是zk中那一堆数据(null则不存在)
            Stat stat = zkClient.exists("/", false);
        }
        /**
         * 取得子节点
         */
        @Test
        public void testGetChildren() throws KeeperException, InterruptedException {
            List<String> children = zkClient.getChildren("/", true);
            for (String child : children) {
                System.out.println(child);
            }
        }
    
        /**
         * 获取数据
         */
        @Test
        public void testGetData() throws KeeperException, InterruptedException {
            // 分别是路径,是否监听以及状态版本(null就OK了,取最新)
            byte[] data = zkClient.getData("/app2", false, null);
            System.out.println(new String(data));
        }
    
        /**
         * 删除数据
         */
        @Test
        public void testDeleteZnode() throws KeeperException, InterruptedException {
            // -1作为版本号参数表示删除所有版本(上层的API是不用传这些参数的)
            zkClient.delete("/app2",-1);
        }
    
        /**
         * 修改数据
         */
        public void testSetData() throws KeeperException, InterruptedException {
            zkClient.setData("/app2", "hellozkCli".getBytes(), -1);
        }
    }
    View Code

    三、应用实例

      1.分布式服务器动态上下线感知(主节点HA)

      大致原理:

        

      大致流程比较清晰,当服务器启动时就去注册信息(例如给出主机与id等信息),给出一个短暂的带序列号的临时节点,这样服务器下线的时候节点便被删除了;而客户端则是去zk获取子节点信息,得到服务器列表并且注册监听,这样当有节点发生改变时变可以感知变化了!

      2.Java代码

        IDEA中给main方法添加参数,参考:http://blog.csdn.net/u013713294/article/details/53020293

      服务都端:

    package com.zk;
    
    import org.apache.zookeeper.*;
    
    import java.io.IOException;
    
    /**
     * 分布式服务器动态感知——服务端
     *
     * @author zcc ON 2018/1/17
     **/
    public class DistributedServer {
    
        private static final String coonectString = "192.168.137.128:2181,192.168.137.138:2181,192.168.137.148:2181";
        private static final int sessionTimeout = 2000;
        private static final String parentNode = "/servers";
    
        private ZooKeeper zk = null;
    
        /**
         * 获得连接
         * @throws IOException
         */
        public void getConn() throws IOException {
            zk = new ZooKeeper(coonectString, sessionTimeout, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    // 收到通知事件后的回调函数
                    System.out.println(watchedEvent.getType()+"---"+watchedEvent.getPath());
                    // 开启循环监听(监听调用此方法,此方法又开启监听)
                    try {
                        zk.getChildren("/", true);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    
        /**
         * 注册服务器
         */
        public void registServer(String hostname) throws KeeperException, InterruptedException {
            // 临时有编号的节点,可以重名
            String znode = zk.create(parentNode + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(hostname+" is online..."+znode);
        }
    
        /**
         * 业务逻辑
         */
        public void handleBI(String hostname) throws InterruptedException {
            System.out.println(hostname+" start working...");
            // 模拟保持不关闭状态
            Thread.sleep(Long.MAX_VALUE);
        }
    
        public static void main(String[] args) throws Exception {
            // 获取zk连接
            DistributedServer servers = new DistributedServer();
            servers.getConn();
            // 注册服务器
            servers.registServer(args[0]);
            // 业务逻辑
            servers.handleBI(args[0]);
        }
    }
    View Code

      客户端:

    package com.zk;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * 分布式服务器动态感知——客户端
     *
     * @author zcc ON 2018/1/17
     **/
    public class DistributedClient {
        private static final String coonectString = "192.168.137.128:2181,192.168.137.138:2181,192.168.137.148:2181";
        private static final int sessionTimeout = 2000;
        private static final String parentNode = "/servers";
        // 注意volatile的使用意义(使每个线程都得到最新值)
        private volatile List<String> serverList;
    
        private ZooKeeper zk = null;
    
        /**
         * 获得连接
         * @throws IOException
         */
        public void getConn() throws IOException {
            zk = new ZooKeeper(coonectString, sessionTimeout, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    try {
                        // 回调事件重新更新服务器列表并注册监听
                        getServerList();
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    
        /**
         * 业务逻辑
         */
        public void handleBI() throws InterruptedException {
            System.out.println("client working...");
            // 模拟保持不关闭状态
            Thread.sleep(Long.MAX_VALUE);
        }
        public void getServerList() throws KeeperException, InterruptedException {
            // 获取服务器子节点信息,并且对父节点监听
            List<String> children = zk.getChildren(parentNode, true);
            // 用于存放服务器列表的List
            List<String> servers = new ArrayList<>();
            for (String child : children) {
                // 获取数据(这里无需监听,因为连接上了不需要更换连接了)
                byte[] data = zk.getData(parentNode + "/" + child, false, null);
                servers.add(new String(data));
            }
            // 更新到成员变量,供业务线程使用
            serverList = servers;
            // 打印服务器列表
            System.out.println(serverList);
        }
    
        public static void main(String[] args) throws Exception {
            // 获取连接
            DistributedClient client = new DistributedClient();
            client.getConn();
            // 获取子节点信息,并监听
            client.getServerList();
            // 启动业务功能
            client.handleBI();
        }
    }
    View Code
  • 相关阅读:
    java并发编程 线程间协作
    博客园添加目录,导航,回到顶部
    汉诺塔递归实现
    java并发编程 线程基础
    Flink中算子进行Chain的规则分析(最新代码,源码版本大于1.11.2)
    Flink流处理程序在Local模式下的运行流程源码分析
    Flink-DataStream流处理应用(Local模式下)运行流程-源码分析
    Flink Streaming基于滚动窗口的事件时间分析
    Spark-2.3.2 Java SparkSQL的自定义HBase数据源
    Spark-2.3.2 HBase BulkLoad
  • 原文地址:https://www.cnblogs.com/jiangbei/p/8302147.html
Copyright © 2011-2022 走看看