zoukankan      html  css  js  c++  java
  • Java实现心跳机制

    一、心跳机制简介

         在分布式系统中,分布在不同主机上的节点需要检测其他节点的状态,如服务器节点需要检测从节点是否失效。为了检测对方节点的有效性,每隔固定时间就发送一个固定信息给对方,对方回复一个固定信息,如果长时间没有收到对方的回复,则断开与对方的连接。

         发包方既可以是服务端,也可以是客户端,这要看具体实现。因为是每隔固定时间发送一次,类似心跳,所以发送的固定信息称为心跳包。心跳包一般为比较小的包,可根据具体实现。心跳包主要应用于长连接的保持与短线链接。

          一般而言,应该客户端主动向服务器发送心跳包,因为服务器向客户端发送心跳包会影响服务器的性能。

    二、心跳机制实现方式

        心跳机制有两种实现方式,一种基于TCP自带的心跳包,TCP的SO_KEEPALIVE选项可以,系统默认的默认跳帧频率为2小时,超过2小时后,本地的TCP 实现会发送一个数据包给远程的 Socket. 如果远程Socket 没有发回响应, TCP实现就会持续尝试 11 分钟, 直到接收到响应为止。 否则就会自动断开Socket连接。但TCP自带的心跳包无法检测比较敏感地知道对方的状态,默认2小时的空闲时间,对于大多数的应用而言太长了。可以手工开启KeepAlive功能并设置合理的KeepAlive参数。

        另一种在应用层自己进行实现,基本步骤如下:

    1. Client使用定时器,不断发送心跳;
    2. Server收到心跳后,回复一个包;
    3. Server为每个Client启动超时定时器,如果在指定时间内没有收到Client的心跳包,则Client失效。

    三、Java实现心跳机制

        这里基于Java实现的简单RPC框架实现心跳机制。Java实现代码如下所示:

        心跳客户端类:

    public class HeartbeatClient implements Runnable {
    
        private String serverIP = "127.0.0.1";
        private int serverPort = 8089;
        private String nodeID = UUID.randomUUID().toString();
        private boolean isRunning = true;
        //  最近的心跳时间
        private long lastHeartbeat;
        // 心跳间隔时间
        private long heartBeatInterval = 10 * 1000;
    
        public void run() {
            try {
                while (isRunning) {
                    HeartbeatHandler handler = RPClient.getRemoteProxyObj(HeartbeatHandler.class, new InetSocketAddress(serverIP, serverPort));
                    long startTime = System.currentTimeMillis();
                    // 是否达到发送心跳的周期时间
                    if (startTime - lastHeartbeat > heartBeatInterval) {
                        System.out.println("send a heart beat");
                        lastHeartbeat = startTime;
    
                        HeartbeatEntity entity = new HeartbeatEntity();
                        entity.setTime(startTime);
                        entity.setNodeID(nodeID);
    
                        // 向服务器发送心跳,并返回需要执行的命令
                        Cmder cmds = handler.sendHeartBeat(entity);
    
                        if (!processCommand(cmds))
                            continue;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private boolean processCommand(Cmder cmds) {
            // ...
            return true;
        }
    
    }

          心跳包实体类:

    public class HeartbeatEntity implements Serializable {
    
        private long time;
        private String nodeID;
        private String error;
        private Map<String, Object> info = new HashMap<String, Object>();
    
        public String getNodeID() {
            return nodeID;
        }
    
        public void setNodeID(String nodeID) {
            this.nodeID = nodeID;
        }
    
        public String getError() {
            return error;
        }
    
        public void setError(String error) {
            this.error = error;
        }
    
        public Map<String, Object> getInfo() {
            return info;
        }
    
        public void setInfo(Map<String, Object> info) {
            this.info = info;
        }
    
        public long getTime() {
            return time;
        }
    
        public void setTime(long time) {
            this.time = time;
        }
    }
    

      服务器接受心跳包返回的命令对象类:

    public class Cmder implements Serializable {
    
        private String nodeID;
        private String error;
        private Map<String, Object> info = new HashMap<String, Object>();
    
        public String getNodeID() {
            return nodeID;
        }
    
        public void setNodeID(String nodeID) {
            this.nodeID = nodeID;
        }
    
        public String getError() {
            return error;
        }
    
        public void setError(String error) {
            this.error = error;
        }
    
        public Map<String, Object> getInfo() {
            return info;
        }
    
        public void setInfo(Map<String, Object> info) {
            this.info = info;
        }
    }

      RPC服务注册中心:

    public class ServiceCenter {
    
        private ExecutorService executor = Executors.newFixedThreadPool(20);
    
        private final ConcurrentHashMap<String, Class> serviceRegistry = new ConcurrentHashMap<String, Class>();
    
        private AtomicBoolean isRunning = new AtomicBoolean(true);
    
        // 服务器监听端口
        private int port = 8089;
    
        // 心跳监听器
        HeartbeatLinstener linstener;
    
        // 单例模式
        private static class SingleHolder {
            private static final ServiceCenter INSTANCE = new ServiceCenter();
        }
    
        private ServiceCenter() {
        }
    
        public static ServiceCenter getInstance() {
            return SingleHolder.INSTANCE;
        }
    
        public void register(Class serviceInterface, Class impl) {
            System.out.println("regeist service " + serviceInterface.getName());
            serviceRegistry.put(serviceInterface.getName(), impl);
        }
    
        public void start() throws IOException {
            ServerSocket server = new ServerSocket();
            server.bind(new InetSocketAddress(port));
            System.out.println("start server");
            linstener = HeartbeatLinstener.getInstance();
            System.out.println("start listen heart beat");
            try {
                while (true) {
                    // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行
                    executor.execute(new ServiceTask(server.accept()));
                }
            } finally {
                server.close();
            }
        }
    
        public void stop() {
            isRunning.set(false);
            executor.shutdown();
        }
    
    
        public boolean isRunning() {
            return isRunning.get();
        }
    
        public int getPort() {
            return port;
        }
    
        public void settPort(int port) {
            this.port = port;
        }
    
        public ConcurrentHashMap<String, Class> getServiceRegistry() {
            return serviceRegistry;
        }
    
        private class ServiceTask implements Runnable {
            Socket clent = null;
    
            public ServiceTask(Socket client) {
                this.clent = client;
            }
    
            public void run() {
                ObjectInputStream input = null;
                ObjectOutputStream output = null;
                try {
                    // 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果
                    input = new ObjectInputStream(clent.getInputStream());
                    String serviceName = input.readUTF();
                    String methodName = input.readUTF();
                    Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                    Object[] arguments = (Object[]) input.readObject();
                    Class serviceClass = serviceRegistry.get(serviceName);
                    if (serviceClass == null) {
                        throw new ClassNotFoundException(serviceName + " not found");
                    }
                    Method method = serviceClass.getMethod(methodName, parameterTypes);
                    Object result = method.invoke(serviceClass.newInstance(), arguments);
    
                    // 3.将执行结果反序列化,通过socket发送给客户端
                    output = new ObjectOutputStream(clent.getOutputStream());
                    output.writeObject(result);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (output != null) {
                        try {
                            output.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    if (input != null) {
                        try {
                            input.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    if (clent != null) {
                        try {
                            clent.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
    
            }
        }
    }
    
    

      心跳监听类:

    package com.cang.heartbeat;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.Method;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    /**
     * 心跳监听保存信息
     *
     * @author cang
     * @create_time 2016-09-28 11:40
     */
    public class HeartbeatLinstener {
    
        private ExecutorService executor = Executors.newFixedThreadPool(20);
    
        private final ConcurrentHashMap<String, Object> nodes = new ConcurrentHashMap<String, Object>();
        private final ConcurrentHashMap<String, Long> nodeStatus = new ConcurrentHashMap<String, Long>();
    
        private long timeout = 10 * 1000;
    
        // 服务器监听端口
        private int port = 8089;
    
        // 单例模式
        private static class SingleHolder {
            private static final HeartbeatLinstener INSTANCE = new HeartbeatLinstener();
        }
    
        private HeartbeatLinstener() {
        }
    
        public static HeartbeatLinstener getInstance() {
            return SingleHolder.INSTANCE;
        }
    
        public ConcurrentHashMap<String, Object> getNodes() {
            return nodes;
        }
    
        public void registerNode(String nodeId, Object nodeInfo) {
            nodes.put(nodeId, nodeInfo);
            nodeStatus.put(nodeId, System.currentTimeMillis());
        }
    
        public void removeNode(String nodeID) {
            if (nodes.containsKey(nodeID)) {
                nodes.remove(nodeID);
            }
        }
    
        // 检测节点是否有效
        public boolean checkNodeValid(String key) {
            if (!nodes.containsKey(key) || !nodeStatus.containsKey(key)) return false;
            if ((System.currentTimeMillis() - nodeStatus.get(key)) > timeout) return false;
            return true;
        }
    
        // 删除所有失效节点
        public void removeInValidNode() {
            Iterator<Map.Entry<String, Long>> it = nodeStatus.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> e = it.next();
                if ((System.currentTimeMillis() - nodeStatus.get(e.getKey())) > timeout) {
                    nodes.remove(e.getKey());
                }
            }
        }
    
    }
    
    

      心跳处理类接口:

    public interface HeartbeatHandler {
        public Cmder sendHeartBeat(HeartbeatEntity info);
    }
    

       心跳处理实现类:

    public class HeartbeatHandlerImpl implements HeartbeatHandler {
        public Cmder sendHeartBeat(HeartbeatEntity info) {
            HeartbeatLinstener linstener = HeartbeatLinstener.getInstance();
    
            // 添加节点
            if (!linstener.checkNodeValid(info.getNodeID())) {
                linstener.registerNode(info.getNodeID(), info);
            }
    
            // 其他操作
            Cmder cmder = new Cmder();
           cmder.setNodeID(info.getNodeID());
            // ...
    
            System.out.println("current all the nodes: ");
            Map<String, Object> nodes = linstener.getNodes();
            for (Map.Entry e : nodes.entrySet()) {
                System.out.println(e.getKey() + " : " + e.getValue());
            }
            System.out.println("hadle a heartbeat");
            return cmder;
        }
    }

      测试类:

    public class HeartbeatTest {
    
        public static void main(String[] args) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        ServiceCenter serviceServer = ServiceCenter.getInstance();
                        serviceServer.register(HeartbeatHandler.class, HeartbeatHandlerImpl.class);
                        serviceServer.start();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            Thread client1 = new Thread(new HeartbeatClient());
            client1.start();
            Thread client2 = new Thread(new HeartbeatClient());
            client2.start();
        }
    }
    

    四、总结

        上面的代码还有很多不足的地方,希望有空能进行改善:

    1.  配置为硬编码;
    2.  命令类Cmder没有实际实现,返回的Cmder对象没有实际进行处理;

       其他小问题就暂时不管了,希望以后能重写上面的代码。

  • 相关阅读:
    28. css样式中px转rem
    27.用webpack自搭react和vue框架
    26.webpack 入门
    25.redux回顾,redux中的action函数异步
    24.redux
    23.react-router 路由
    22.2、react生命周期与react脚手架(二)
    22.1 、react生命周期(一)
    21.react 组件通信
    const关键字的作用
  • 原文地址:https://www.cnblogs.com/codingexperience/p/5939059.html
Copyright © 2011-2022 走看看