zoukankan      html  css  js  c++  java
  • Netty--数据通信和心跳检测

    数据通信

    概述:

    netty的ReadTimeOut实现方案3

    服务端:

    public class Server {
    
        public static void main(String[] args) throws Exception{
            
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             //设置日志
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ReadTimeoutHandler(5)); 
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
            
            ChannelFuture cf = b.bind(8765).sync();
            
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
            
        }
    }
    View Code

    关键是在pipeline中加入 sc.pipeline().addLast(new ReadTimeoutHandler(5)); ——如果5秒内没有读到任何数据,就会触发读超时异常,随后通道被关闭。

    客户端:

    /**
     * Singleton client of the read-timeout demo.
     *
     * <p>Holds one {@code Bootstrap} and the {@code ChannelFuture} of the most recent
     * successful connection. {@link #getChannelFuture()} connects lazily and reconnects
     * when the current channel is no longer active.
     */
    public class Client {

        /** Lazy holder for the single {@code Client} instance. */
        private static class SingletonHolder {
            static final Client instance = new Client();
        }

        /**
         * @return the shared {@code Client} instance
         */
        public static Client getInstance() {
            return SingletonHolder.instance;
        }

        private final EventLoopGroup group;
        private final Bootstrap b;
        private ChannelFuture cf;

        private Client() {
            group = new NioEventLoopGroup();
            b = new Bootstrap();
            // Only one handler can be set on a Bootstrap: a second handler(...) call replaces
            // the first. The LoggingHandler is therefore added to the pipeline itself instead
            // of being passed to a separate handler(...) call that would be overwritten.
            b.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            // Timeout handler: when server and client have not communicated for
                            // longer than the given time, the channel is closed (mainly to reduce
                            // resource usage on the server).
                            sc.pipeline().addLast(new ReadTimeoutHandler(5));
                            sc.pipeline().addLast(new ClientHandler());
                        }
                    });
        }

        /**
         * Connects to 127.0.0.1:8765 and stores the resulting future.
         *
         * <p>Failures are printed and not rethrown; in that case the previously stored
         * future (possibly {@code null}) is left unchanged.
         */
        public void connect() {
            try {
                this.cf = b.connect("127.0.0.1", 8765).sync();
                System.out.println("远程服务器已经连接, 可以进行数据交换..");
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the caller.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Returns the future of the current connection, connecting first when there is no
         * connection yet or the current channel is no longer active.
         *
         * @return the future of the most recent successful connection
         * @throws IllegalStateException if no connection has ever been established
         */
        public ChannelFuture getChannelFuture() {

            // cf == null: first use; inactive channel: closed (e.g. after a read timeout).
            if (this.cf == null || !this.cf.channel().isActive()) {
                this.connect();
            }
            if (this.cf == null) {
                // connect() failed and there is no earlier connection to fall back to.
                throw new IllegalStateException("unable to connect to 127.0.0.1:8765");
            }

            return this.cf;
        }

        /**
         * Sends three requests 4 seconds apart, waits for the channel to be closed, then
         * sends a fourth request from a new thread, which reconnects through
         * {@link #getChannelFuture()}.
         *
         * @param args unused
         * @throws Exception if sleeping or waiting on the channel is interrupted
         */
        public static void main(String[] args) throws Exception {
            final Client c = Client.getInstance();
            // c.connect();

            ChannelFuture cf = c.getChannelFuture();
            for (int i = 1; i <= 3; i++) {
                Request request = new Request();
                request.setId("" + i);
                request.setName("pro" + i);
                request.setRequestMessage("数据信息" + i);
                cf.channel().writeAndFlush(request);
                TimeUnit.SECONDS.sleep(4);
            }

            cf.channel().closeFuture().sync();

            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("进入子线程...");
                        ChannelFuture cf = c.getChannelFuture();
                        System.out.println(cf.channel().isActive());
                        System.out.println(cf.channel().isOpen());

                        // Send data again over the re-established connection.
                        Request request = new Request();
                        request.setId("" + 4);
                        request.setName("pro" + 4);
                        request.setRequestMessage("数据信息" + 4);
                        cf.channel().writeAndFlush(request);
                        cf.channel().closeFuture().sync();
                        System.out.println("子线程结束.");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    } finally {
                        // Last user of the client: release the event loop threads so the
                        // JVM can exit.
                        c.group.shutdownGracefully();
                    }
                }
            }).start();

            System.out.println("断开连接,主线程结束..");

        }

    }
    View Code

    主要看getChannelFuture这个方法:this.cf == null 用于第一次连接;!this.cf.channel().isActive() 用于通道因读超时被关闭之后重新发起连接。

    其他的代码:

    /**
     * Client-side business handler: prints every {@code Response} it receives and closes
     * the channel when an exception reaches it.
     */
    public class ClientHandler extends ChannelHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Intentionally left empty.
        }

        /**
         * Prints id, name and response message of the received {@code Response}, then
         * releases the message.
         *
         * @param ctx the handler context
         * @param msg the inbound message; cast to {@code Response}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                final Response reply = (Response) msg;
                final String line = "Client : " + reply.getId() + ", " + reply.getName() + ", "
                        + reply.getResponseMessage();
                System.out.println(line);
            } finally {
                // Released on every path, including a failed cast.
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // Intentionally left empty.
        }

        /**
         * Closes the channel; the cause itself is not reported.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }

    }
    
    
    /**
     * Server-side business handler: prints every {@code Request} it receives and answers
     * with a {@code Response} carrying the same id.
     */
    public class ServerHandler extends ChannelHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Intentionally left empty.
        }

        /**
         * Prints the request and writes back a response derived from the request id.
         * The channel is left open after the reply is written.
         *
         * @param ctx the handler context used to write the response
         * @param msg the inbound message; cast to {@code Request}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            final Request incoming = (Request) msg;
            final String requestId = incoming.getId();
            System.out.println(
                    "Server : " + requestId + ", " + incoming.getName() + ", " + incoming.getRequestMessage());

            final Response reply = new Response();
            reply.setId(requestId);
            reply.setName("response" + requestId);
            reply.setResponseMessage("响应内容" + requestId);

            ctx.writeAndFlush(reply);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // Intentionally left empty.
        }

        /**
         * Closes the channel; the cause itself is not reported.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }

    }
    
    
    /**
     * Factory for the JBoss Marshalling based Netty codec handlers used by the pipelines.
     */
    public final class MarshallingCodeCFactory {

        /**
         * Creates the JBoss Marshalling decoder.
         *
         * @return a new MarshallingDecoder limited to 1024 bytes per serialized message
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            // "serial" selects the marshaller factory for Java serialization.
            final MarshallerFactory serialFactory = Marshalling.getProvidedMarshallerFactory("serial");
            // Provider built from the factory and a version-5 configuration.
            final UnmarshallerProvider unmarshallers =
                    new DefaultUnmarshallerProvider(serialFactory, versionFiveConfiguration());
            // Second argument: maximum length of a single serialized message.
            return new MarshallingDecoder(unmarshallers, 1024);
        }

        /**
         * Creates the JBoss Marshalling encoder.
         *
         * @return a new MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory serialFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallerProvider marshallers =
                    new DefaultMarshallerProvider(serialFactory, versionFiveConfiguration());
            // The encoder serializes POJOs implementing Serializable into binary form.
            return new MarshallingEncoder(marshallers);
        }

        /** Builds a fresh MarshallingConfiguration with its version set to 5. */
        private static MarshallingConfiguration versionFiveConfiguration() {
            final MarshallingConfiguration settings = new MarshallingConfiguration();
            settings.setVersion(5);
            return settings;
        }
    }
    
    
    /**
     * Serializable request message sent from the client to the server.
     * All three properties are plain strings and default to {@code null}.
     */
    public class Request implements Serializable {

        private static final long serialVersionUID = 1L;

        /** Identifier of the request. */
        private String id;
        /** Name attached to the request. */
        private String name;
        /** Payload text of the request. */
        private String requestMessage;

        /** @return the request id, or {@code null} if not set */
        public String getId() {
            return this.id;
        }

        /** @return the request name, or {@code null} if not set */
        public String getName() {
            return this.name;
        }

        /** @return the request message, or {@code null} if not set */
        public String getRequestMessage() {
            return this.requestMessage;
        }

        /** @param id the request id to store */
        public void setId(String id) {
            this.id = id;
        }

        /** @param name the request name to store */
        public void setName(String name) {
            this.name = name;
        }

        /** @param requestMessage the request message to store */
        public void setRequestMessage(String requestMessage) {
            this.requestMessage = requestMessage;
        }

    }
    
    
    /**
     * Serializable response message sent from the server back to the client.
     * All three properties are plain strings and default to {@code null}.
     */
    public class Response implements Serializable {

        private static final long serialVersionUID = 1L;

        /** Identifier of the response. */
        private String id;
        /** Name attached to the response. */
        private String name;
        /** Payload text of the response. */
        private String responseMessage;

        /** @return the response id, or {@code null} if not set */
        public String getId() {
            return this.id;
        }

        /** @return the response name, or {@code null} if not set */
        public String getName() {
            return this.name;
        }

        /** @return the response message, or {@code null} if not set */
        public String getResponseMessage() {
            return this.responseMessage;
        }

        /** @param id the response id to store */
        public void setId(String id) {
            this.id = id;
        }

        /** @param name the response name to store */
        public void setName(String name) {
            this.name = name;
        }

        /** @param responseMessage the response message to store */
        public void setResponseMessage(String responseMessage) {
            this.responseMessage = responseMessage;
        }

    }
    View Code

     心跳检测

    概述:

    代码示例:

    public class Server {
    
        public static void main(String[] args) throws Exception {
    
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
    
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                    // 设置日志
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            sc.pipeline().addLast(new ServerHeartBeatHandler());
                        }
                    });
    
            ChannelFuture cf = b.bind(8765).sync();
    
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
    
        }
    }
    
    
    /**
     * Server-side handler of the heartbeat demo.
     *
     * <p>A {@code String} message is treated as an authentication request of the form
     * {@code "ip,key"}; a {@code RequestInfo} message is treated as a heartbeat and its
     * CPU/memory figures are printed. Heartbeats are only accepted after a successful
     * authentication on the same handler instance. The server in this file creates one
     * handler instance per accepted channel, so that state is per connection.
     */
    public class ServerHeartBeatHandler extends ChannelHandlerAdapter {

        /** key: ip, value: auth key expected for that ip */
        private static final HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
        private static final String SUCCESS_KEY = "auth_success_key";

        static {
            AUTH_IP_MAP.put("192.168.1.200", "1234");
        }

        /** Whether the most recent authentication request on this handler succeeded. */
        private boolean authenticated;

        /**
         * Checks an {@code "ip,key"} authentication message against {@code AUTH_IP_MAP}.
         *
         * <p>On success {@code SUCCESS_KEY} is written back. On failure, including a
         * message that does not contain both parts, {@code "auth failure !"} is written
         * and the channel is closed once that write completes.
         *
         * @param ctx the handler context used to reply
         * @param msg the authentication message; must be a {@code String}
         * @return {@code true} if the key matches the one registered for the ip
         */
        private boolean auth(ChannelHandlerContext ctx, Object msg) {
            // System.out.println(msg);
            String[] ret = ((String) msg).split(",");
            // A message without a comma yields fewer than two parts; treat it as a failed
            // authentication instead of failing with ArrayIndexOutOfBoundsException.
            String auth = ret.length >= 2 ? AUTH_IP_MAP.get(ret[0]) : null;
            if (auth != null && auth.equals(ret[1])) {
                ctx.writeAndFlush(SUCCESS_KEY);
                return true;
            } else {
                ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
                return false;
            }
        }

        /**
         * Dispatches on the message type: {@code String} is authenticated,
         * {@code RequestInfo} is printed and acknowledged, anything else is rejected.
         *
         * @param ctx the handler context used to reply
         * @param msg the decoded inbound message
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof String) {
                authenticated = auth(ctx, msg);
            } else if (msg instanceof RequestInfo) {
                if (!authenticated) {
                    // Heartbeat from a peer that has not passed authentication: reject it
                    // the same way as any other unexpected message.
                    ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
                    return;
                }

                RequestInfo info = (RequestInfo) msg;
                System.out.println("--------------------------------------------");
                System.out.println("当前主机ip为: " + info.getIp());
                System.out.println("当前主机cpu情况: ");
                HashMap<String, Object> cpu = info.getCpuPercMap();
                System.out.println("总使用率: " + cpu.get("combined"));
                System.out.println("用户使用率: " + cpu.get("user"));
                System.out.println("系统使用率: " + cpu.get("sys"));
                System.out.println("等待率: " + cpu.get("wait"));
                System.out.println("空闲率: " + cpu.get("idle"));

                System.out.println("当前主机memory情况: ");
                HashMap<String, Object> memory = info.getMemoryMap();
                System.out.println("内存总量: " + memory.get("total"));
                System.out.println("当前内存使用量: " + memory.get("used"));
                System.out.println("当前内存剩余量: " + memory.get("free"));
                System.out.println("--------------------------------------------");

                ctx.writeAndFlush("info received!");
            } else {
                ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
            }
        }

    }
    
    
    /**
     * Client side of the heartbeat demo: connects to 127.0.0.1:8765 with the Marshalling
     * codec and a {@code ClienHeartBeattHandler}, then blocks until the channel closes.
     */
    public class Client {

        /**
         * Connects to the server and waits for the connection to be closed.
         *
         * @param args unused
         * @throws Exception if connecting or waiting on the channel fails
         */
        public static void main(String[] args) throws Exception {

            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        sc.pipeline().addLast(new ClienHeartBeattHandler());
                    }
                });

                ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

                cf.channel().closeFuture().sync();
            } finally {
                // Release the event loop threads even when connect() or sync() throws
                // (for example when the server is not reachable).
                group.shutdownGracefully();
            }
        }
    }
    
    
    /**
     * Client-side handler of the heartbeat demo.
     *
     * <p>When the channel becomes active it sends an {@code "ip,key"} authentication
     * message. Once the server answers with {@code SUCCESS_KEY} it starts a task that
     * sends a {@code RequestInfo} with CPU and memory figures every 2 seconds. The task
     * is cancelled when an exception reaches this handler or the channel becomes
     * inactive.
     */
    public class ClienHeartBeattHandler extends ChannelHandlerAdapter {

        private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        /** Handle of the running heartbeat task; {@code null} while none is scheduled. */
        private ScheduledFuture<?> heartBeat;
        // Local address, sent to the server as part of the authentication message.
        private InetAddress addr;

        private static final String SUCCESS_KEY = "auth_success_key";

        /**
         * Sends the authentication message {@code "<local ip>,1234"} to the server.
         *
         * @param ctx the handler context used to write the message
         * @throws Exception if the local host address cannot be determined
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            addr = InetAddress.getLocalHost();
            String ip = addr.getHostAddress();
            String key = "1234";
            // Credential
            String auth = ip + "," + key;
            ctx.writeAndFlush(auth);
        }

        /**
         * Prints every {@code String} reply; on {@code SUCCESS_KEY} it also starts the
         * heartbeat task, unless one is already running.
         *
         * @param ctx the handler context handed to the heartbeat task
         * @param msg the decoded inbound message
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                if (msg instanceof String) {
                    String ret = (String) msg;
                    if (SUCCESS_KEY.equals(ret)) {
                        // Handshake succeeded: start sending heartbeats. Guarded so that a
                        // repeated success reply does not start a second task.
                        if (this.heartBeat == null) {
                            this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2,
                                    TimeUnit.SECONDS);
                        }
                        System.out.println(msg);
                    } else {
                        System.out.println(msg);
                    }
                }
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }

        /**
         * Stops the heartbeat and its scheduler once the channel is no longer active,
         * then forwards the event.
         *
         * @param ctx the handler context
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            stopHeartBeat();
            // The scheduler belongs to this handler; without this its worker thread would
            // outlive the connection.
            scheduler.shutdownNow();
            ctx.fireChannelInactive();
        }

        /**
         * Prints the cause, cancels the heartbeat task and forwards the exception.
         *
         * <p>This is declared on the handler (not on {@code HeartBeatTask}) so that the
         * pipeline actually invokes it.
         *
         * @param ctx   the handler context
         * @param cause the exception that was raised
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            stopHeartBeat();
            ctx.fireExceptionCaught(cause);
        }

        /** Cancels the heartbeat task if one is scheduled. */
        private void stopHeartBeat() {
            if (heartBeat != null) {
                heartBeat.cancel(true);
                heartBeat = null;
            }
        }

        /** Collects CPU and memory figures through Sigar and writes them to the channel. */
        private class HeartBeatTask implements Runnable {
            private final ChannelHandlerContext ctx;

            public HeartBeatTask(final ChannelHandlerContext ctx) {
                this.ctx = ctx;
            }

            @Override
            public void run() {
                try {
                    RequestInfo info = new RequestInfo();
                    // ip
                    info.setIp(addr.getHostAddress());
                    Sigar sigar = new Sigar();
                    // cpu percentages
                    CpuPerc cpuPerc = sigar.getCpuPerc();
                    HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
                    cpuPercMap.put("combined", cpuPerc.getCombined());
                    cpuPercMap.put("user", cpuPerc.getUser());
                    cpuPercMap.put("sys", cpuPerc.getSys());
                    cpuPercMap.put("wait", cpuPerc.getWait());
                    cpuPercMap.put("idle", cpuPerc.getIdle());
                    // memory, each value divided by 1024
                    Mem mem = sigar.getMem();
                    HashMap<String, Object> memoryMap = new HashMap<String, Object>();
                    memoryMap.put("total", mem.getTotal() / 1024L);
                    memoryMap.put("used", mem.getUsed() / 1024L);
                    memoryMap.put("free", mem.getFree() / 1024L);
                    info.setCpuPercMap(cpuPercMap);
                    info.setMemoryMap(memoryMap);
                    ctx.writeAndFlush(info);

                } catch (Exception e) {
                    // Printed and swallowed so that one failed run does not cancel the
                    // periodic task.
                    e.printStackTrace();
                }
            }

        }
    }
    
    
    /**
     * Factory for the JBoss Marshalling based Netty codec handlers used by the heartbeat
     * pipelines.
     */
    public final class MarshallingCodeCFactory {

        /**
         * Creates the JBoss Marshalling decoder.
         *
         * @return a new MarshallingDecoder limited to 1 MiB per serialized message
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            // "serial" selects the marshaller factory for Java serialization.
            final MarshallerFactory serialFactory = Marshalling.getProvidedMarshallerFactory("serial");
            // Provider built from the factory and a version-5 configuration.
            final UnmarshallerProvider unmarshallers =
                    new DefaultUnmarshallerProvider(serialFactory, versionFiveConfiguration());
            // Second argument: maximum length of a single serialized message.
            return new MarshallingDecoder(unmarshallers, 1024 * 1024);
        }

        /**
         * Creates the JBoss Marshalling encoder.
         *
         * @return a new MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory serialFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallerProvider marshallers =
                    new DefaultMarshallerProvider(serialFactory, versionFiveConfiguration());
            // The encoder serializes POJOs implementing Serializable into binary form.
            return new MarshallingEncoder(marshallers);
        }

        /** Builds a fresh MarshallingConfiguration with its version set to 5. */
        private static MarshallingConfiguration versionFiveConfiguration() {
            final MarshallingConfiguration settings = new MarshallingConfiguration();
            settings.setVersion(5);
            return settings;
        }
    }
    
    
    /**
     * Serializable heartbeat payload: the sender's ip plus two maps of CPU and memory
     * figures. All properties default to {@code null}; the maps are stored and returned
     * by reference.
     */
    public class RequestInfo implements Serializable {

        /** Ip address reported by the sender. */
        private String ip;
        /** CPU figures keyed by name. */
        private HashMap<String, Object> cpuPercMap;
        /** Memory figures keyed by name. */
        private HashMap<String, Object> memoryMap;
        // .. other field

        /** @return the ip, or {@code null} if not set */
        public String getIp() {
            return this.ip;
        }

        /** @return the CPU map as stored, or {@code null} if not set */
        public HashMap<String, Object> getCpuPercMap() {
            return this.cpuPercMap;
        }

        /** @return the memory map as stored, or {@code null} if not set */
        public HashMap<String, Object> getMemoryMap() {
            return this.memoryMap;
        }

        /** @param ip the ip to store */
        public void setIp(String ip) {
            this.ip = ip;
        }

        /** @param cpuPercMap the CPU map to store (not copied) */
        public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
            this.cpuPercMap = cpuPercMap;
        }

        /** @param memoryMap the memory map to store (not copied) */
        public void setMemoryMap(HashMap<String, Object> memoryMap) {
            this.memoryMap = memoryMap;
        }

    }
    View Code

    当client刚刚连接的时候,会发送认证信息到server端认证,认证通过后再定时发送心跳包。

  • 相关阅读:
    AjaxMethod js调用后台方法
    鼠标点击清空文本框 失去焦点显示提示信息
    js屏蔽BackSpace 返回上一页
    IE8标准模式打开网页
    Windows无法启动SQL server 代理服务(位于本地计算机上)错误1067:进程意外终止
    遍历枚举,添加进DropDownist
    文本框只能输入数字
    个人开发框架总结(五)
    从Power Design设计文档中提取Model
    FaibClass.WebControls控件详解(一)
  • 原文地址:https://www.cnblogs.com/lostyears/p/8482450.html
Copyright © 2011-2022 走看看