正文
一,JavaAPI普通操作
上篇文章已经对zookeeper的使用有了简单的介绍,api的使用也相对简单,在使用前需要导入zookeeper的jar包,其他就如下代码。
package zookeeperTest; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; public class ZookeeperDemo { ZooKeeper zk=null; @Before public void init() throws Exception { // 构造一个连接zookeeper的客户端对象 zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 200, null); } @Test public void testCreate() throws Exception { // 参数1:要创建的节点路径 参数2:数据 参数3:访问权限 参数4:节点类型 String path = zk.create("/java", "hello word".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(path); zk.close(); } @Test public void testUpdate() throws Exception { // 参数1:节点路径 参数2:数据 参数3:所要修改的版本,-1代表任何版本 Stat status = zk.setData("/java", "set data test".getBytes(), -1); zk.close(); } @Test public void testGet() throws Exception { // 参数1:节点路径 参数2:是否要监听 参数3:所要获取的数据的版本,null表示最新版本 byte[] data = zk.getData("/java", false, null); System.out.println(new String(data, "UTF-8")); zk.close(); } @Test public void testGetChild() throws Exception { // 参数1:节点路径 参数2:是否要监听 // 注意:返回的结果中只有子节点名字,不带全路径 List<String> childs = zk.getChildren("/", false); for (String string : childs) { System.out.println(string); } zk.close(); } @Test public void testRemove() throws Exception { zk.delete("/java", -1); List<String> childs = zk.getChildren("/", false); for (String string : childs) { System.out.println(string); } zk.close(); } }
二,JavaAPI监控操作
上面一节是无监控的使用,下面是有监控的使用:
package zookeeperTest; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.junit.Before; import org.junit.Test; public class ZookeeperWatch { static ZooKeeper zk = null; WatchDemo wd = new WatchDemo(); // 创建watcher类实现Watch接口 public static class WatchDemo implements Watcher{ @Override public void process(WatchedEvent event) { // event返回的事件对象 if(event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeDataChanged) { System.out.println("有数据改动了"); try { zk.getData("/server", true, null); // 循环监听 } catch (Exception e) { } }else if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) { System.out.println("子节点变化了"); } } } @Before public void init() throws Exception{ // 构造一个连接zookeeper的客户端对象 zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 2000, wd); } // 节点数据监听 @Test public void getUpdateWatch() throws Exception{ zk.getData("/server", true, null); Thread.sleep(Long.MAX_VALUE); } // 节点节点监听 @Test public void getChildWatch() throws Exception{ zk.getChildren("/server", true); Thread.sleep(Long.MAX_VALUE); } }
三, 利用zookeeper实现服务器上下线动态感知
3.1 Consumer类(client)
package cn.edu360.zk.distributesystem; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; public class Consumer { // 定义一个list用于存放最新的在线服务器列表 private volatile ArrayList<String> onlineServers = new ArrayList<>(); // 构造zk连接对象 ZooKeeper zk = null; // 构造zk客户端连接 public void connectZK() throws Exception { zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 2000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) { try { // 事件回调逻辑中,再次查询zk上的在线服务器节点即可,查询逻辑中又再次注册了子节点变化事件监听 getOnlineServers(); } catch (Exception e) { e.printStackTrace(); } } } }); } // 查询在线服务器列表 public void getOnlineServers() throws Exception { List<String> children = zk.getChildren("/servers", true); ArrayList<String> servers = new ArrayList<>(); for (String child : children) { byte[] data = zk.getData("/servers/" + child, false, null); String serverInfo = new String(data); servers.add(serverInfo); } onlineServers = servers; System.out.println("查询了一次zk,当前在线的服务器有:" + servers); } public void sendRequest() throws Exception { Random random = new Random(); while (true) { try { // 挑选一台当前在线的服务器 int nextInt = random.nextInt(onlineServers.size()); String server = onlineServers.get(nextInt); String hostname = server.split(":")[0]; int port = Integer.parseInt(server.split(":")[1]); System.out.println("本次请求挑选的服务器为:" + server); Socket socket = new Socket(hostname, port); OutputStream out = socket.getOutputStream(); InputStream in = socket.getInputStream(); out.write("haha".getBytes()); out.flush(); byte[] buf = new byte[256]; int read = in.read(buf); System.out.println("服务器响应的时间为:" + new String(buf, 0, read)); out.close(); in.close(); socket.close(); Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Consumer consumer = new Consumer(); // 构造zk连接对象 consumer.connectZK(); // 查询在线服务器列表 consumer.getOnlineServers(); // 处理业务(向一台服务器发送时间查询请求) consumer.sendRequest(); } }
3.2 TimeQueryServer(zookeeper注册服务器)
package cn.edu360.zk.distributesystem; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class TimeQueryServer { ZooKeeper zk = null; // 构造zk客户端连接 public void connectZK() throws Exception{ zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 2000, null); } // 注册服务器信息 public void registerServerInfo(String hostname,String port) throws Exception{ /** * 先判断注册节点的父节点是否存在,如果不存在,则创建 */ Stat stat = zk.exists("/servers", false); if(stat==null){ zk.create("/servers", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 注册服务器数据到zk的约定注册节点下 String create = zk.create("/servers/server", (hostname+":"+port).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname+" 服务器向zk注册信息成功,注册的节点为:" + create); } public static void main(String[] args) throws Exception { TimeQueryServer timeQueryServer = new TimeQueryServer(); // 构造zk客户端连接 timeQueryServer.connectZK(); // 注册服务器信息 timeQueryServer.registerServerInfo(args[0], args[1]); // 启动业务线程开始处理业务 new TimeQueryService(Integer.parseInt(args[1])).start(); } }
3.3 TimeQueryService(server提供服务)
package cn.edu360.zk.distributesystem; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.Date; public class TimeQueryService extends Thread{ int port = 0; public TimeQueryService(int port){ this.port = port; } @Override public void run() { try { ServerSocket ss = new ServerSocket(port); System.out.println("业务线程已绑定端口"+port+"准备接受消费端请求了....."); while(true){ Socket sc = ss.accept(); InputStream inputStream = sc.getInputStream(); OutputStream outputStream = sc.getOutputStream(); outputStream.write(new Date().toString().getBytes()); } } catch (IOException e) { e.printStackTrace(); } } }