zoukankan      html  css  js  c++  java
  • Netty Client和Server端实现

    本文基于Netty 4.0.26.Final版本浅析Client与Server端通讯,先看服务器端:

    /**
     * Minimal Netty 4.0.x server: accepts TCP connections, frames messages with a
     * 4-byte length prefix, decodes them as UTF-8 strings and hands them to
     * {@code SimpleChannelHandler}.
     */
    public class Server {

        /**
         * Binds the server to {@code port} and blocks until the server channel is closed.
         * Both event loop groups are shut down before this method returns.
         *
         * @param port TCP port to listen on
         */
        public static void run(int port) {
            // Netty creates the ServerSocketChannel; it registers for SelectionKey.OP_ACCEPT by default.
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss, worker)
                         .channel(NioServerSocketChannel.class)             // channel type
                         .option(ChannelOption.SO_BACKLOG, 1024)            // server channel option
                         .childHandler(new ChannelInitializer<SocketChannel>() {

                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // Inbound: read a 4-byte length field at offset 0 and strip it from the frame.
                                // NOTE(review): Integer.MAX_VALUE places no practical cap on frame size;
                                // consider a smaller limit if peers are untrusted.
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                // Outbound: prepend a 4-byte length field to every message.
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                                pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                                pipeline.addLast(new SimpleChannelHandler());
                            }
                        });
                /*
                 * Bind the listening port and initialize the Channel:
                 * 1 - ChannelConfig is initialized from the ChannelOptions
                 * 2 - the ChannelHandler is added to the ChannelPipeline (DefaultChannelPipeline by default)
                 * 3 - the Channel is registered and ChannelFutureListener.CLOSE_ON_FAILURE is added
                 * sync() blocks until these asynchronous steps have completed and rethrows on failure.
                 */
                ChannelFuture channelFuture = bootstrap.bind(port).sync();
                // isDone() is always true after sync(); isSuccess() is the meaningful check.
                if (channelFuture.isSuccess()) {
                    System.out.println(String.format("server bind port %s sucess", port));
                }
                // Block until the server channel is closed.
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }

        /** Starts the server on port 8080. */
        public static void main(String []args) {
            Server.run(8080);
        }
    }
    
    /**
     * Server-side inbound handler: parses each incoming message as a JSON
     * {@code Request} and writes back a JSON {@code Response}.
     */
    public class SimpleChannelHandler implements ChannelInboundHandler {

        private static final Gson GSON = new GsonBuilder().create();

        /**
         * Invoked when this handler is added to a pipeline (once per accepted connection,
         * since the initializer creates a new handler per channel). Logs the class name of
         * the last handler in the pipeline.
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println(String.format("last channel handler [%s] add", ctx.pipeline().last().getClass().getSimpleName()));
        }

        /**
         * Invoked when this handler is removed from the pipeline; the article relies on this
         * happening after the client closes the connection. Closes the channel.
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            // A separate disconnect() is unnecessary: close() already tears down the connection.
            ctx.close();
        }

        /**
         * Logs the failure and closes the channel so that a broken connection is not left open.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        /**
         * Logs the remote peer address once the channel has been registered.
         */
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            // String.valueOf tolerates a null address; only strip the leading '/' when it is present.
            String connect = String.valueOf(ctx.channel().remoteAddress());
            if (connect.startsWith("/")) {
                connect = connect.substring(1);
            }
            System.out.println(String.format("remote connecter address %s", connect));
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            // no-op
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // no-op
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // no-op
        }

        /**
         * Parses the message as a {@code Request} and queues a {@code Response} carrying the
         * request status and a random value in [0, 10). The response is flushed in
         * {@link #channelReadComplete(ChannelHandlerContext)}.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Request req = GSON.fromJson(String.valueOf(msg), Request.class);
            if (req == null) {
                // Gson yields null for an empty or literal "null" payload; there is nothing to
                // answer, so drop the connection instead of failing with a NullPointerException.
                ctx.close();
                return;
            }
            String json = GSON.toJson(new Response(String.format("server get client status [%s]", req.getStatus()), new Random().nextInt(10)));
            ctx.write(json);
        }

        /** Flushes the responses queued by {@link #channelRead(ChannelHandlerContext, Object)}. */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // no-op
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            // no-op
        }

    }

    服务器端ChannelHandler的handlerRemoved方法在该Handler从ChannelPipeline中被移除时触发;客户端关闭连接后Channel被关闭并注销,Netty随即清理pipeline,因此该方法会被调用。此时服务器应当关闭当前与客户端的连接,完成TCP的四次挥手过程。

    客户端的实现:

    /**
     * Minimal Netty 4.0.x client: connects to the server, exchanges one
     * length-prefixed UTF-8 message via {@code SimpleClientChannelHandler} and
     * returns once the channel has been closed.
     */
    public class Client {

        /**
         * Connects to {@code host:port} and blocks until the connection is closed.
         * The event loop group is shut down before this method returns.
         *
         * @param host server host name or address
         * @param port server TCP port
         */
        public static void run(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                         .channel(NioSocketChannel.class)
                         .option(ChannelOption.TCP_NODELAY, true)
                         .handler(new ChannelInitializer<SocketChannel>() {

                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // Same framing as the server: 4-byte length prefix, stripped on read.
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                                pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                                pipeline.addLast(new SimpleClientChannelHandler());
                            }

                        });
                /*
                 * Issue the connect request to the server:
                 * 1 - ChannelConfig is initialized from the ChannelOptions
                 * 2 - the ChannelHandler is added to the ChannelPipeline (DefaultChannelPipeline by default)
                 * 3 - the Channel is registered and ChannelFutureListener.CLOSE_ON_FAILURE is added
                 * sync() blocks until these asynchronous steps have completed and rethrows on failure.
                 */
                ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
                if (channelFuture.isSuccess()) {
                    System.out.println(String.format("connect server(%s:%s) sucess", host, port));
                }
                // Block until the handler (or the server) closes the connection.
                channelFuture.channel().closeFuture().sync();
                System.out.println("client close sucess");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }

        /** Runs three sequential request/response round trips against 127.0.0.1:8080. */
        public static void main(String []args) {
            for (int i = 0 ; i < 3 ; ++i) {
                Client.run("127.0.0.1", 8080);
                System.out.println();
            }
        }
    }
    
    
    /**
     * Client-side inbound handler: sends one JSON {@code Request} when the
     * connection becomes active, prints the server's reply and then disconnects.
     * The numbers (1)-(3) in the method docs give the order in which the
     * callbacks fire during connection setup, as described by the article.
     */
    public class SimpleClientChannelHandler implements ChannelInboundHandler{

        private static final Gson GSON = new GsonBuilder().create();

        /**
         * (1) Invoked when this handler is added to the client pipeline; logs the class name
         * of the last handler in the pipeline.
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            ChannelHandler channelHandler = ctx.channel().pipeline().last();
            System.out.println("client last channel handle " + channelHandler.getClass().getSimpleName());
        }

        /**
         * Invoked when this handler is removed from the pipeline; the article relies on this
         * happening once the connection to the server has been torn down. Logs both endpoints
         * and closes the channel.
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel ch = ctx.channel();
            SocketAddress local = ch.localAddress();
            SocketAddress remote = ch.remoteAddress();
            // Addresses may be null (e.g. the connect attempt failed), so format them null-safely.
            System.out.println(String.format("server(%s) diconnect and client(%s) close connect", describe(remote), describe(local)));
            ctx.close();
        }

        /**
         * (2) Invoked when the channel is registered, before the connection to the server
         * is established.
         */
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client start to register port");
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            // no-op
        }

        /**
         * (3) Invoked when the channel becomes active; sends a {@code Request} carrying a
         * random value in [0, 10) to the server.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String json = GSON.toJson(new Request("client status", new Random().nextInt(10)));
            ctx.writeAndFlush(json);
            System.out.println(String.format("connect established and send to server message [%s]", json));
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // no-op
        }

        /**
         * Prints the server's response and disconnects; the server is expected to close its
         * side of the connection as well.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(String.format("client receive message [%s]", String.valueOf(msg)));
            ctx.disconnect(ctx.newPromise());
        }

        /** Debug trace marker for the read-complete event. */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("77777");
        }

        /** Debug trace marker for user events. */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("88888");
        }

        /** Debug trace marker for writability changes. */
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            System.out.println("99999");
        }

        /**
         * Logs the failure and closes the channel. Without the close, a pipeline error would
         * leave the channel open and the caller waiting on its close future indefinitely.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        /** Renders an address for logging, dropping a leading '/' and tolerating null. */
        private static String describe(SocketAddress address) {
            String text = String.valueOf(address);
            return text.startsWith("/") ? text.substring(1) : text;
        }

    }
    

      在客户端的ChannelHandler中有几个关键方法:

    channelActive方法:客户端与服务器建立连接且Channel被激活时该方法被调用,本文在客户端与服务器端建立连接就绪时向服务器发送数据

    channelRead方法:当服务器端有数据发送时方法被调用,本文在收到服务器端响应时关闭当前连接(此时服务器端的handlerRemoved方法被调用)

     handlerRemoved方法:当服务器确认断开连接时该方法被调用,客户端应关闭Channel(TCP四次挥手结束)

  • 相关阅读:
    【零基础】极星9.5量化入门二:滚动止盈策略
    【零基础】极星9.5量化入门零:简单的开始
    今天分享下移动端rem 适配
    css超出内容省略号代替。
    今天给大家的小知识点是JS的一种排序方式---快速排序
    highcharts 的基本使用
    zTree jquery-zTree的基本使用
    克隆 JS克隆
    JS判断一个对象是不是数组的几种方式
    js 定时器实现倒计时
  • 原文地址:https://www.cnblogs.com/hanfight/p/4851972.html
Copyright © 2011-2022 走看看