zoukankan      html  css  js  c++  java
  • Netty Client和Server端实现

    本文基于 Netty 4.0.26.Final 版本浅析 Client 与 Server 端通讯,先看服务器端:

    /**
     * Minimal Netty 4 TCP server. Every message on the wire is a UTF-8 string
     * preceded by a 4-byte length field; decoded strings are handed to
     * {@code SimpleChannelHandler}.
     */
    public class Server {

        /**
         * Largest frame (in bytes) the server is willing to buffer. The decoder
         * rejects longer frames with a TooLongFrameException instead of
         * accumulating whatever length an untrusted peer declares.
         */
        private static final int MAX_FRAME_LENGTH = 1024 * 1024;

        /**
         * Binds to {@code port} and blocks until the server channel is closed.
         * Both event loop groups are always shut down before returning.
         *
         * @param port TCP port to listen on
         */
        public static void run(int port) {
            // Netty creates the ServerSocketChannel; by default it is registered
            // for SelectionKey.OP_ACCEPT. "boss" accepts, "worker" serves children.
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss, worker)
                         .channel(NioServerSocketChannel.class)     // channel type
                         .option(ChannelOption.SO_BACKLOG, 1024)    // server channel option
                         .childHandler(new ChannelInitializer<SocketChannel>() {

                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // Inbound: read the 4-byte length field at offset 0 and strip it.
                                pipeline.addLast("frameDecoder",
                                        new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
                                // Outbound: prepend a 4-byte length field.
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                                pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                                pipeline.addLast(new SimpleChannelHandler());
                            }
                        });
                /*
                 * Bind the listening port and initialise the channel:
                 * 1 - ChannelConfig is populated from the ChannelOptions
                 * 2 - ChannelHandlers are added to the ChannelPipeline (DefaultChannelPipeline by default)
                 * 3 - the channel is registered with ChannelFutureListener.CLOSE_ON_FAILURE attached
                 * sync() waits for the asynchronous bind and rethrows its failure cause,
                 * so reaching the next line already means the bind succeeded.
                 */
                ChannelFuture channelFuture = bootstrap.bind(port).sync();
                System.out.println(String.format("server bind port %s sucess", port));
                // Block until the server channel is closed.
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }

        public static void main(String[] args) {
            Server.run(8080);
        }
    }
    
    /**
     * Server-side inbound handler. Each decoded message is parsed as a JSON
     * {@code Request}; a JSON {@code Response} is written back and flushed once
     * the current read batch completes.
     */
    public class SimpleChannelHandler implements ChannelInboundHandler {

        private static final Gson GSON = new GsonBuilder().create();

        /**
         * Called when this handler is added to a pipeline, i.e. once per new
         * connection because the initializer creates a fresh instance each time.
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println(String.format("last channel handler [%s] add", ctx.pipeline().last().getClass().getSimpleName()));
        }

        /**
         * Called when this handler is removed from the pipeline. In this example
         * that is observed after the client has closed the connection, so the
         * server side is closed as well.
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            // A single close is enough: on a TCP channel disconnect() is the same operation.
            ctx.close();
        }

        /**
         * Logs the failure and closes the connection so that a broken channel
         * (malformed JSON, oversized frame, I/O error) is not left open.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        /**
         * Called when the channel is registered with its event loop; prints the peer address.
         */
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            String connect = describe(ctx.channel().remoteAddress());
            System.out.println(String.format("remote connecter address %s", connect));
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        }

        /**
         * Parses the incoming JSON text and queues the JSON response; the actual
         * flush happens in {@link #channelReadComplete(ChannelHandlerContext)}.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Request req = GSON.fromJson(String.valueOf(msg), Request.class);
            if (req == null) {
                // Gson yields null for an empty document or the JSON literal null;
                // there is nothing to answer, so drop the connection instead of hitting an NPE.
                System.out.println("server received an empty request and closes the connect");
                ctx.close();
                return;
            }
            String json = GSON.toJson(new Response(String.format("server get client status [%s]", req.getStatus()), new Random().nextInt(10)));
            ctx.write(json);
        }

        /** Flushes the responses queued by {@code channelRead}. */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

        }

        /**
         * Renders a socket address for logging: strips only a leading '/' (the
         * form an address without a host name prints as) and tolerates null.
         */
        private static String describe(java.net.SocketAddress address) {
            if (address == null) {
                return "unknown";
            }
            String text = address.toString();
            return text.startsWith("/") ? text.substring(1) : text;
        }

    }

    服务器端ChannelHandler的handlerRemoved方法在该Handler从ChannelPipeline中被移除时触发;在本例中,客户端关闭连接后服务器端的Channel被关闭并注销,此时该方法被调用。服务器应当关闭当前与客户端的连接,完成TCP的四次挥手过程。

    客户端的实现:

    /**
     * Minimal Netty 4 TCP client. It uses the same framing as the server: UTF-8
     * strings preceded by a 4-byte length field, handled by
     * {@code SimpleClientChannelHandler}.
     */
    public class Client {

        /**
         * Largest frame (in bytes) the client is willing to buffer. The decoder
         * rejects longer frames with a TooLongFrameException instead of
         * accumulating whatever length the peer declares.
         */
        private static final int MAX_FRAME_LENGTH = 1024 * 1024;

        /**
         * Connects to {@code host:port} and blocks until the channel is closed.
         * The event loop group is always shut down before returning.
         *
         * @param host server host name or IP address
         * @param port server TCP port
         */
        public static void run(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                         .channel(NioSocketChannel.class)
                         .option(ChannelOption.TCP_NODELAY, true)
                         .handler(new ChannelInitializer<SocketChannel>() {

                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // Inbound: read the 4-byte length field at offset 0 and strip it.
                                pipeline.addLast("frameDecoder",
                                        new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
                                // Outbound: prepend a 4-byte length field.
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                                pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                                pipeline.addLast(new SimpleClientChannelHandler());
                            }

                        });
                /*
                 * Connect to the server and initialise the channel:
                 * 1 - ChannelConfig is populated from the ChannelOptions
                 * 2 - ChannelHandlers are added to the ChannelPipeline (DefaultChannelPipeline by default)
                 * 3 - the channel is registered with ChannelFutureListener.CLOSE_ON_FAILURE attached
                 * sync() waits for the asynchronous connect and rethrows its failure cause,
                 * so reaching the next line already means the connect succeeded.
                 */
                ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
                System.out.println(String.format("connect server(%s:%s) sucess", host, port));
                // Block until the channel is closed.
                channelFuture.channel().closeFuture().sync();
                System.out.println("client close sucess");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }

        /** Runs three consecutive request/response round trips against the local server. */
        public static void main(String[] args) {
            for (int i = 0; i < 3; ++i) {
                Client.run("127.0.0.1", 8080);
                System.out.println();
            }
        }
    }
    
    
    /**
     * Client-side inbound handler. It sends one JSON {@code Request} as soon as
     * the channel becomes active and closes the connection after the first
     * message received from the server.
     */
    public class SimpleClientChannelHandler implements ChannelInboundHandler {

        private static final Gson GSON = new GsonBuilder().create();

        /**
         * Step 1: called when this handler is added to the client pipeline.
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            ChannelHandler channelHandler = ctx.channel().pipeline().last();
            System.out.println("client last channel handle " + channelHandler.getClass().getSimpleName());
        }

        /**
         * Called when this handler is removed from the pipeline. In this example
         * that is observed once the connection to the server has gone away; the
         * channel is closed to make sure the client side is released as well.
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel ch = ctx.channel();
            String local = describe(ch.localAddress());
            String remote = describe(ch.remoteAddress());
            System.out.println(String.format("server(%s) diconnect and client(%s) close connect", remote, local));
            ctx.close();
        }

        /**
         * Step 2: called when the channel is registered, before the connection
         * to the server is established.
         */
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client start to register port");
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

        }

        /**
         * Step 3: called when the channel becomes active; sends the request.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String json = GSON.toJson(new Request("client status", new Random().nextInt(10)));
            ctx.writeAndFlush(json);
            System.out.println(String.format("connect established and send to server message [%s]", json));
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        }

        /**
         * Prints the server response and closes the connection (the server is
         * expected to close its side in turn).
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(String.format("client receive message [%s]", String.valueOf(msg)));
            // On a TCP channel disconnect() is carried out as close(); state the intent directly.
            ctx.close();
        }

        // The numeric markers printed by the three callbacks below are trace
        // output that shows in which order Netty invokes them.

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("77777");
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("88888");
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            System.out.println("99999");
        }

        /**
         * Logs the failure and closes the connection so that a broken channel
         * is not left open (which would keep {@code Client.run} blocked).
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        /**
         * Renders a socket address for logging: strips only a leading '/' (the
         * form an address without a host name prints as) and tolerates null.
         * A blind substring(1) would turn "localhost/127.0.0.1:8080" into
         * "ocalhost/127.0.0.1:8080".
         */
        private static String describe(SocketAddress address) {
            if (address == null) {
                return "unknown";
            }
            String text = address.toString();
            return text.startsWith("/") ? text.substring(1) : text;
        }

    }
    

      在客户端的ChannelHandler中有几个关键方法:

    channelActive方法:客户端与服务器建立连接且Channel被激活时该方法被调用,本文在客户端与服务器端建立连接就绪时向服务器发送数据

    channelRead方法:当服务器端有数据发送时方法被调用,本文在收到服务器端响应时关闭当前连接(此时服务器端的handlerRemoved方法被调用)

    handlerRemoved方法:当该Handler从ChannelPipeline中被移除时该方法被调用(本例中即服务器确认断开连接、客户端Channel关闭之后),客户端应关闭Channel(TCP四次挥手结束)

  • 相关阅读:
    668. Kth Smallest Number in Multiplication Table
    658. Find K Closest Elements
    483. Smallest Good Base
    475. Heaters
    454. 4Sum II
    441. Arranging Coins
    436. Find Right Interval
    410. Split Array Largest Sum
    392. Is Subsequence
    378. Kth Smallest Element in a Sorted Matrix
  • 原文地址:https://www.cnblogs.com/hanfight/p/4851972.html
Copyright © 2011-2022 走看看