zoukankan      html  css  js  c++  java
  • Zookeeper 学习(四) zookeeper的javaAPI使用

    一,JavaAPI普通操作

    二,JavaAPI监控操作

    三, 利用zookeeper实现服务器上下线动态感知

    正文

    一,JavaAPI普通操作

      上篇文章已经对zookeeper的使用有了简单的介绍,api的使用也相对简单,在使用前需要导入zookeeper的jar包,其他就如下代码。

      

    package zookeeperTest;
    
    
    import java.util.List;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    import org.junit.Before;
    import org.junit.Test;
    
    public class ZookeeperDemo {
        ZooKeeper zk=null;
        
        @Before
        public void init() throws Exception {
            // 构造一个连接zookeeper的客户端对象
            zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 200, null);
        }
        
        @Test
        public void testCreate() throws Exception {
            // 参数1:要创建的节点路径  参数2:数据  参数3:访问权限  参数4:节点类型
            String path = zk.create("/java", "hello word".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(path);
            zk.close();
        }
        
        @Test
        public void testUpdate() throws Exception {
            // 参数1:节点路径   参数2:数据    参数3:所要修改的版本,-1代表任何版本
            Stat status = zk.setData("/java", "set data test".getBytes(), -1);
            zk.close();
        }
        
        
        @Test
        public void testGet() throws Exception {
            // 参数1:节点路径    参数2:是否要监听    参数3:所要获取的数据的版本,null表示最新版本
            byte[] data = zk.getData("/java", false, null);
            System.out.println(new String(data, "UTF-8"));
            zk.close();
        }
        
        @Test
        public void testGetChild() throws Exception {
            // 参数1:节点路径    参数2:是否要监听   
            // 注意:返回的结果中只有子节点名字,不带全路径
            List<String> childs = zk.getChildren("/", false);
            for (String string : childs) {
                System.out.println(string);
            }
            zk.close();
        }
        
        @Test
        public void testRemove() throws Exception {
            zk.delete("/java", -1);
            List<String> childs = zk.getChildren("/", false);
            for (String string : childs) {
                System.out.println(string);
            }
            zk.close();
        }
    }

    二,JavaAPI监控操作

      上面一节是无监控的使用,下面是有监控的使用:

    package zookeeperTest;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.Before;
    import org.junit.Test;
    
    public class ZookeeperWatch {
        
        static ZooKeeper zk = null;
        
        WatchDemo wd = new WatchDemo();
        
        // 创建watcher类实现Watch接口
        public static class WatchDemo implements Watcher{
            @Override
            public void process(WatchedEvent event) {
                // event返回的事件对象
                if(event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeDataChanged) {
                    System.out.println("有数据改动了");
                    try {
                        zk.getData("/server", true, null); // 循环监听
                    } catch (Exception e) {
                    } 
        
                }else if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) {
                    System.out.println("子节点变化了");
                }
                
            }
            
        }
        
        
        @Before
        public void init()  throws Exception{
            // 构造一个连接zookeeper的客户端对象
            zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 2000, wd);
        }
        
        // 节点数据监听
        @Test
        public void getUpdateWatch() throws Exception{
            zk.getData("/server", true, null);
            Thread.sleep(Long.MAX_VALUE);
        }
        
        // 节点节点监听
        @Test
        public void getChildWatch() throws Exception{
            zk.getChildren("/server", true);
            Thread.sleep(Long.MAX_VALUE);
        }
    
    }

     三, 利用zookeeper实现服务器上下线动态感知

      3.1 Consumer类(client)

    package cn.edu360.zk.distributesystem;
    
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    
    public class Consumer {
    
        // 定义一个list用于存放最新的在线服务器列表
        private volatile ArrayList<String> onlineServers = new ArrayList<>();
    
        // 构造zk连接对象
        ZooKeeper zk = null;
    
        // 构造zk客户端连接
        public void connectZK() throws Exception {
    
            zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 2000, new Watcher() {
    
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) {
    
                        try {
                            // 事件回调逻辑中,再次查询zk上的在线服务器节点即可,查询逻辑中又再次注册了子节点变化事件监听
                            getOnlineServers();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    
        // 查询在线服务器列表
        public void getOnlineServers() throws Exception {
    
            List<String> children = zk.getChildren("/servers", true);
            ArrayList<String> servers = new ArrayList<>();
    
            for (String child : children) {
                byte[] data = zk.getData("/servers/" + child, false, null);
    
                String serverInfo = new String(data);
    
                servers.add(serverInfo);
            }
            onlineServers = servers;
            System.out.println("查询了一次zk,当前在线的服务器有:" + servers);
    
        }
    
        public void sendRequest() throws Exception {
            Random random = new Random();
            while (true) {
                try {
                    // 挑选一台当前在线的服务器
                    int nextInt = random.nextInt(onlineServers.size());
                    String server = onlineServers.get(nextInt);
                    String hostname = server.split(":")[0];
                    int port = Integer.parseInt(server.split(":")[1]);
    
                    System.out.println("本次请求挑选的服务器为:" + server);
    
                    Socket socket = new Socket(hostname, port);
                    OutputStream out = socket.getOutputStream();
                    InputStream in = socket.getInputStream();
    
                    out.write("haha".getBytes());
                    out.flush();
    
                    byte[] buf = new byte[256];
                    int read = in.read(buf);
                    System.out.println("服务器响应的时间为:" + new String(buf, 0, read));
    
                    out.close();
                    in.close();
                    socket.close();
    
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            Consumer consumer = new Consumer();
            // 构造zk连接对象
            consumer.connectZK();
    
            // 查询在线服务器列表
            consumer.getOnlineServers();
    
            // 处理业务(向一台服务器发送时间查询请求)
            consumer.sendRequest();
    
        }
    
    }

      3.2 TimeQueryServer(zookeeper注册服务器)

    package cn.edu360.zk.distributesystem;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    public class TimeQueryServer {
        ZooKeeper zk = null;
        
        // 构造zk客户端连接
        public void connectZK() throws Exception{
            
            zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 2000, null);
            
        }
        
        
        // 注册服务器信息
        public void registerServerInfo(String hostname,String port) throws Exception{
            /**
             * 先判断注册节点的父节点是否存在,如果不存在,则创建
             */
            Stat stat = zk.exists("/servers", false);
            if(stat==null){
                zk.create("/servers", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            
            // 注册服务器数据到zk的约定注册节点下
            String create = zk.create("/servers/server", (hostname+":"+port).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            
            System.out.println(hostname+" 服务器向zk注册信息成功,注册的节点为:" + create);
            
        }
        
        
        public static void main(String[] args) throws Exception {
            
            TimeQueryServer timeQueryServer = new TimeQueryServer();
            
            // 构造zk客户端连接
            timeQueryServer.connectZK();
            
            // 注册服务器信息
            timeQueryServer.registerServerInfo(args[0], args[1]);
            
            // 启动业务线程开始处理业务
            new TimeQueryService(Integer.parseInt(args[1])).start();
            
        }
    }

      3.3 TimeQueryService(server提供服务)

    package cn.edu360.zk.distributesystem;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Date;
    
    public class TimeQueryService extends Thread{
        
        int port = 0;
        public TimeQueryService(int port){
            this.port = port;
        }
        
        
        @Override
        public void run() {
            try {
                ServerSocket ss = new ServerSocket(port);
                System.out.println("业务线程已绑定端口"+port+"准备接受消费端请求了.....");
                while(true){
                    Socket sc = ss.accept();
                    InputStream inputStream = sc.getInputStream();
                    OutputStream outputStream = sc.getOutputStream();
                    outputStream.write(new Date().toString().getBytes());
                }
                
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
  • 相关阅读:
    MySQL InnoDB 存储引擎探秘
    MySQL优化面试
    技术面试老是有劲使不出,该怎么办?
    详解RPC远程调用和消息队列MQ的区别
    ConcurrentHashMap1.8源码分析
    kafka topic制定规则
    GitLab本地、远程更新已经fork的项目
    Swagger2使用参考
    分布式配置中心选型
    搭建Elasticsearch平台
  • 原文地址:https://www.cnblogs.com/tashanzhishi/p/10869136.html
Copyright © 2011-2022 走看看