zoukankan      html  css  js  c++  java
  • 【zookeeper】4、利用zookeeper,借助观察模式,判断服务器的上下线

     首先什么是观察者模式,可以看看我之前的设计模式的文章

    https://www.cnblogs.com/cutter-point/p/5249780.html

    确定一下,要有观察者,要有被观察者,然后要被观察者触发事件,事件发生之后,观察者触发相应的事件发生

    了解了基本概念,我们来看看zookeeper是什么情况

    zookeeper也是类似观察者一样,我们先把本机信息注册进入服务器,然后设置一个watch方法,这个在zookeeper节点发生变化的时候通知对应的客户端,触发对应的方法

    这里先注册服务,如何向zookeeper进行注册呢

    package cn.cutter.demo.hadoop.zookeeper;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * @ProjectName: cutter-point
     * @Package: cn.cutter.demo.hadoop.zookeeper
     * @ClassName: TimeQueryServer
     * @Author: xiaof
     * @Description: 利用zookeeper来进行分布式时间查询
     * @Date: 2019/4/2 19:37
     * @Version: 1.0
     */
    public class TimeQueryServer {
    
        private ZooKeeper zooKeeper;
    
        // 构造zk客户端连接
        public void connectZK() throws Exception{
            zooKeeper = new ZooKeeper("192.168.1.4:2181,192.168.1.4:2182,192.168.1.4:2183", 2000, null);
        }
    
    
        // 注册服务器信息
        public void registerServerInfo(String hostname,String port) throws Exception{
    
            /**
             * 先判断注册节点的父节点是否存在,如果不存在,则创建
             */
            Stat stat = zooKeeper.exists("/servers", false);
            if(stat==null){
                zooKeeper.create("/servers", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
    
            // 注册服务器数据到zk的约定注册节点下
            String create = zooKeeper.create("/servers/server", (hostname+":"+port).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    
            System.out.println(hostname+" 服务器向zk注册信息成功,注册的节点为:" + create);
    
        }
    
    }

    如果注入了服务,那么我们为了监控这个服务的存在,那么是不是应该也模拟一个服务?

    好,这里我们就做一个时钟同步的服务,用消费线程不断请求服务,并获取当前时间

    package cn.cutter.demo.hadoop.zookeeper;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Date;
    import java.util.Iterator;
    
    /**
     * @ProjectName: cutter-point
     * @Package: cn.cutter.demo.hadoop.zookeeper
     * @ClassName: TimeQueryService
     * @Author: xiaof
     * @Description: ${description}
     * @Date: 2019/4/2 19:43
     * @Version: 1.0
     */
    public class TimeQueryService extends Thread {
    
        private static final Log log = LogFactory.getLog(TimeQueryService.class);
    
        int port = 0;
    
        public TimeQueryService(int port) {
            this.port = port;
        }
    
        @Override
        public void run() {
    
            //1.创建信道选择器
            Selector selector = null;
            //不断读取字符,只有读到换行我们才进行输出
    //        StringBuffer stringBuffer = new StringBuffer();
            try {
                selector = Selector.open();
                //2.创建对应端口的监听
                //2.1 创建通道
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                //2.2 socket 对象绑定端口  socket() 获取与此通道关联的服务器套接字
                serverSocketChannel.socket().bind(new InetSocketAddress(port));
                //2.3 设置为非阻塞
                serverSocketChannel.configureBlocking(false);
                //注册到对应的选择器,读取信息
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
            //3.轮询获取信息
            while (true) {
                //获取socket对象
                //获取准备好的信道总数
                if (!selector.isOpen()) {
                    System.out.println("is close over");
                    break;
                }
    
                try {
                    if (selector.select(3000) == 0) {
                        continue; //下一次循环
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
                //获取信道
                Iterator<SelectionKey> keyIterable = selector.selectedKeys().iterator();
                while (keyIterable.hasNext()) {
                    //6.遍历键集,判断键类型,执行相应的操作
                    SelectionKey selectionKey = keyIterable.next();
                    //判断键类型,执行相应操作
                    if (selectionKey.isAcceptable()) {
                        try {
                            //从key中获取对应信道
                            //接受数据
                            SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
                            //并设置成非阻塞
                            socketChannel.configureBlocking(false);
                            //从新注册,修改状态
                            socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
    
                    if (selectionKey.isReadable()) {
                        //读取数据
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        //获取当前的附加对象。
                        ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
    
                        //判断是是否断开连接
                        int count = 0;
                        while (true) {
                            try {
                                if (!((count = socketChannel.read(byteBuffer)) != 0 && count != -1 && selectionKey.isValid())) {
                                    if(count == -1) {
                                        //关闭通道
                                        socketChannel.close();
                                    }
                                    break;
                                }
                            } catch (IOException e) {
    //                                e.printStackTrace();
                                try {
                                    //如果读取数据会抛出异常,那么就断定通道已经被客户端关闭
                                    socketChannel.close();
                                } catch (IOException e1) {
                                    e1.printStackTrace();
                                }
                                System.out.println("无法读取数据!");
                                break;
                            }
                            //判断是否有换行
    //                            byteBuffer.flip();
                            byte msg[] = byteBuffer.array();
                            boolean isOver = false;
                            int i = byteBuffer.position() - count;
                            for (; i < byteBuffer.position(); ++i) {
                                //判断是否有换行
                                if (byteBuffer.get(i) == '
    ' || byteBuffer.get(i) == '
    ') {
                                    //输出
                                    //先压缩数据
                                    byteBuffer.flip();
                                    byte out[] = new byte[byteBuffer.limit()];
                                    byteBuffer.get(out, 0, out.length);
                                    log.info(new String(out));
                                    //设置成可以读和可写状态
                                    byteBuffer.compact();
                                    byteBuffer.clear();
                                    isOver = true;
                                }
                            }
                            if (isOver == true) {
    //                            interestOps(SelectionKey.OP_READ);的意思其实就是用同一个KEY重新注册
                                selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                break;
                            }
                        }
    
                        if (count == -1) {
                            //如果是-1 ,那么就关闭客户端
                            try {
                                socketChannel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        } else {
    //                            selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        }
                    }
    
                    //告知此键是否有效。
                    if (selectionKey.isValid() && selectionKey.isWritable()) {
                        //获取当前的附加对象。
    //                        ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
                        // 清空,并写入数据
    //                        byteBuffer.clear();
                        byte smsBytes[] = (new Date().toString() + "
    ").getBytes();
                        ByteBuffer byteBuffer = ByteBuffer.wrap(smsBytes);
    //                        byteBuffer.put(smsBytes);
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        //写入数据
    //                        System.out.println(new String(byteBuffer.array()));
                        while (byteBuffer.hasRemaining()) {
                            //输出数据
                            try {
                                socketChannel.write(byteBuffer);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        //判断是否判断是否有待处理数据
                        if (!byteBuffer.hasRemaining()) {
                            //数据清理干净
                            selectionKey.interestOps(SelectionKey.OP_READ);
                        }
                        //压缩此缓冲区将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。
                        // 即将索引 p = position() 处的字节复制到索引 0 处,将索引 p + 1 处的字节复制到索引 1 处,依此类推,直到将索引 limit() - 1 处的字节复制到索引
                        // n = limit() - 1 - p 处。然后将缓冲区的位置设置为 n+1,并将其界限设置为其容量。如果已定义了标记,则丢弃它。
                        //将缓冲区的位置设置为复制的字节数,而不是零,以便调用此方法后可以紧接着调用另一个相对 put 方法。
                        //从缓冲区写入数据之后调用此方法,以防写入不完整。例如,以下循环语句通过 buf 缓冲区将字节从一个信道复制到另一个信道:
                        byteBuffer.compact();
                    }
    
                    //执行操作的时候,移除避免下一次循环干扰
    //                    原因是Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中
                    keyIterable.remove();
    
                }
    
            }
    
        }
    }

    说实话,这个服务端当时花了好大的力气写完的,mmp,因为客户端进行可以不关闭通道直接kill,导致服务端并不知道对端已经离线,到时候服务端不断再进行空轮训,一旦进行read就抛出io异常!!

    好了,服务和注册写完了,那么我们注册一把呗

    @Test
        public void test1() throws Exception {
            TimeQueryServer timeQueryServer = new TimeQueryServer();
    
            // 构造zk客户端连接
            timeQueryServer.connectZK();
    
            // 注册服务器信息
            timeQueryServer.registerServerInfo("192.168.1.7", "8888");
    
            // 启动业务线程开始处理业务
            new TimeQueryService(Integer.parseInt("8888")).start();
    
            while(true) {
                Thread.sleep(200000);
    //            System.out.println("..");
            }
        }

     不要在意那个null异常,那是因为我爸watch设置为null的原因

    服务端写完了,我们再考虑写一波客户端

    package cn.cutter.demo.hadoop.zookeeper;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.net.InetSocketAddress;
    import java.net.SocketException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    
    /**
     * @ProjectName: cutter-point
     * @Package: cn.cutter.demo.hadoop.zookeeper
     * @ClassName: Consumer
     * @Author: xiaof
     * @Description: ${description}
     * @Date: 2019/4/3 14:11
     * @Version: 1.0
     */
    public class Consumer {
    
        // 定义一个list用于存放最新的在线服务器列表
        private volatile ArrayList<String> onlineServers = new ArrayList<>();
    
        // 构造zk连接对象
        ZooKeeper zk = null;
    
        // 构造zk客户端连接
        public void connectZK() throws Exception {
    
            zk = new ZooKeeper("192.168.1.4:2181,192.168.1.4:2182,192.168.1.4:2183", 2000, new Watcher() {
    
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected && event.getType() == Event.EventType.NodeChildrenChanged) {
    
                        try {
                            // 事件回调逻辑中,再次查询zk上的在线服务器节点即可,查询逻辑中又再次注册了子节点变化事件监听
                            getOnlineServers();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
    
                    }
    
                }
            });
    
        }
    
        // 查询在线服务器列表
        public void getOnlineServers() throws Exception {
    
            List<String> children = zk.getChildren("/servers", true);
            ArrayList<String> servers = new ArrayList<>();
    
            for (String child : children) {
                byte[] data = zk.getData("/servers/" + child, false, null);
    
                String serverInfo = new String(data);
    
                servers.add(serverInfo);
            }
    
            onlineServers = servers;
            System.out.println("查询了一次zk,当前在线的服务器有:" + servers);
    
        }
    
        public void sendRequest() throws Exception {
            Random random = new Random();
            while (true) {
                try {
                    // 挑选一台当前在线的服务器
                    int nextInt = random.nextInt(onlineServers.size());
                    String server = onlineServers.get(nextInt);
                    String hostname = server.split(":")[0];
                    int port = Integer.parseInt(server.split(":")[1]);
    
                    System.out.println("本次请求挑选的服务器为:" + server);
    
                    //2.打开socket信道,设置成非阻塞模式
                    SocketChannel socketChannel = SocketChannel.open();
                    socketChannel.configureBlocking(false);
    
                    //3.尝试建立连接,然后轮询,判定,连接是否完全建立
                    int times = 0;
                    if(!socketChannel.connect(new InetSocketAddress(hostname, port))) {
                        while(!socketChannel.finishConnect()) {
    //                        System.out.println(times++ + ". ");
                        }
                    }
    
                    //4.创建相应的buffer缓冲
                    ByteBuffer writeBuffer = ByteBuffer.wrap("test
    ".getBytes());
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int totalBytesRcvd = 0;
                    int bytesRcvd;
    
                    //5.向socket信道发送数据,然后尝试读取数据
                    socketChannel.write(writeBuffer);
                    //读取数据
                    if((bytesRcvd = socketChannel.read(readBuffer)) == -1) {
                        //这种非阻塞模式,如果读取不到数据是会返回0的,如果是-1该通道已到达流的末尾
                        throw new SocketException("连接关闭??");
                    }
    
                    //不停尝试获取数据,这是因为服务端数据反馈太慢了???
                    while (bytesRcvd == 0) {
                        bytesRcvd = socketChannel.read(readBuffer);
                    }
    
                    //6.输出
                    readBuffer.flip();
                    byte reads[] = new byte[readBuffer.limit()];
                    readBuffer.get(reads, 0, reads.length);
                    System.out.println("收到信息:" + new String(reads));
    
                    //7.关闭信道
                    socketChannel.close();
    
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            Consumer consumer = new Consumer();
            // 构造zk连接对象
            consumer.connectZK();
    
            // 查询在线服务器列表
            consumer.getOnlineServers();
    
            // 处理业务(向一台服务器发送时间查询请求)
            consumer.sendRequest();
    
        }
    
    
    
    }

    启动客户端:

     为了体现zookeeper监控服务是否在线的操作,我们多起几个服务端,然后监控客户端的信息展示

     我们再起一个

     

     

     接下来我们kill掉8888端口的进程

     我们当前在线可以看到只有2个节点了,我们上zk看看,确实只有2个了

     

    到这里zookeeper的上下线的判断已经完成,我最近再自学大数据的东西,想向大数据进军一波,欢迎大家一起探讨大数据的学习。

  • 相关阅读:
    uniapp 检测android 是否开启GPS功能
    uniapp 使用$emit、$once 跨页面传值,数据改变,页面却不刷新(原创)
    根据输入关键字过滤数组列表(列表搜索功能)
    学习函数指针的笔记
    学习C++中指针和引用的区别
    学习Iterator笔记
    HTML5基础
    java错题集
    幸运抽奖
    吃货联盟
  • 原文地址:https://www.cnblogs.com/cutter-point/p/10651293.html
Copyright © 2011-2022 走看看