zoukankan      html  css  js  c++  java
  • JavaNetty拆包粘包(二)

    netty 使用 tcp/ip 协议传输数据。而 tcp/ip 协议是类似水流一样的数据传输方式。多次 访问的时候有可能出现数据粘包的问题,解决这种问题的方式如下:

    定长数据流 

    客户端和服务器,提前协调好,每个消息长度固定。(如:长度 10)。如果客户端或服 务器写出的数据不足 10,则使用空白字符补足(如:使用空格)。 

    /**
     * 1. 单线程组
     * 2. Bootstrap配置启动信息
     * 3. 注册业务处理Handler
     * 4. connect连接服务,并发起请求
     */
    
    
    import java.nio.charset.Charset;
    import java.util.Scanner;
    import java.util.concurrent.TimeUnit;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client4FixedLength {
    	
    	// 处理请求和处理服务端响应的线程组
    	private EventLoopGroup group = null;
    	// 服务启动相关配置信息
    	private Bootstrap bootstrap = null;
    	
    	public Client4FixedLength(){
    		init();
    	}
    	
    	private void init(){
    		group = new NioEventLoopGroup();
    		bootstrap = new Bootstrap();
    		// 绑定线程组
    		bootstrap.group(group);
    		// 设定通讯模式为NIO
    		bootstrap.channel(NioSocketChannel.class);
    	}
    	
    	public ChannelFuture doRequest(String host, int port) throws InterruptedException{
    		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ChannelHandler[] handlers = new ChannelHandler[3];
    				handlers[0] = new FixedLengthFrameDecoder(3);
    				// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
    				handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
    				handlers[2] = new Client4FixedLengthHandler();
    				
    				ch.pipeline().addLast(handlers);
    			}
    		});
    		ChannelFuture future = this.bootstrap.connect(host, port).sync();
    		return future;
    	}
    	
    	public void release(){
    		this.group.shutdownGracefully();
    	}
    	
    	public static void main(String[] args) {
    		Client4FixedLength client = null;
    		ChannelFuture future = null;
    		try{
    			client = new Client4FixedLength();
    			
    			future = client.doRequest("localhost", 9999);
    			
    			Scanner s = null;
    			while(true){
    				s = new Scanner(System.in);
    				System.out.print("enter message send to server > ");
    				String line = s.nextLine();
    				byte[] bs = new byte[5];
    				byte[] temp = line.getBytes("UTF-8");
    				if(temp.length <= 5){
    					for(int i = 0; i < temp.length; i++){
    						bs[i] = temp[i];
    					}
    				}
    				future.channel().writeAndFlush(Unpooled.copiedBuffer(bs));
    				TimeUnit.SECONDS.sleep(1);
    			}
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if(null != client){
    				client.release();
    			}
    		}
    	}
    	
    }
    

      

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class Client4FixedLengthHandler extends ChannelHandlerAdapter {
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try{
    			String message = msg.toString();
    			System.out.println("from server : " + message);
    		}finally{
    			// 用于释放缓存。避免内存溢出
    			ReferenceCountUtil.release(msg);
    		}
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    

      

    /**
     * 1. 双线程组
     * 2. Bootstrap配置启动信息
     * 3. 注册业务处理Handler
     * 4. 绑定服务监听端口并启动服务
     */
    
    
    import java.nio.charset.Charset;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server4FixedLength {
    	// 监听线程组,监听客户端请求
    	private EventLoopGroup acceptorGroup = null;
    	// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
    	private EventLoopGroup clientGroup = null;
    	// 服务启动相关配置信息
    	private ServerBootstrap bootstrap = null;
    	public Server4FixedLength(){
    		init();
    	}
    	private void init(){
    		acceptorGroup = new NioEventLoopGroup();
    		clientGroup = new NioEventLoopGroup();
    		bootstrap = new ServerBootstrap();
    		// 绑定线程组
    		bootstrap.group(acceptorGroup, clientGroup);
    		// 设定通讯模式为NIO
    		bootstrap.channel(NioServerSocketChannel.class);
    		// 设定缓冲区大小
    		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    		// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
    		bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    			.option(ChannelOption.SO_RCVBUF, 16*1024)
    			.option(ChannelOption.SO_KEEPALIVE, true);
    	}
    	public ChannelFuture doAccept(int port) throws InterruptedException{
    		
    		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
    				// 定长Handler。通过构造参数设置消息长度(单位是字节)。发送的消息长度不足可以使用空格补全。
    				acceptorHandlers[0] = new FixedLengthFrameDecoder(5);
    				// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
    				acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
    				acceptorHandlers[2] = new Server4FixedLengthHandler();
    				ch.pipeline().addLast(acceptorHandlers);
    			}
    		});
    		ChannelFuture future = bootstrap.bind(port).sync();
    		return future;
    	}
    	public void release(){
    		this.acceptorGroup.shutdownGracefully();
    		this.clientGroup.shutdownGracefully();
    	}
    	
    	public static void main(String[] args){
    		ChannelFuture future = null;
    		Server4FixedLength server = null;
    		try{
    			server = new Server4FixedLength();
    			
    			future = server.doAccept(9999);
    			System.out.println("server started.");
    			future.channel().closeFuture().sync();
    		}catch(InterruptedException e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			
    			if(null != server){
    				server.release();
    			}
    		}
    	}
    	
    }
    

      

    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class Server4FixedLengthHandler extends ChannelHandlerAdapter {
    	
    	// 业务处理逻辑
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		String message = msg.toString();
    		System.out.println("from client : " + message.trim());
    		String line = "ok ";
    		ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    	}
    	
    
    	// 异常处理逻辑
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("server exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    }

    特殊结束符 

    客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘#’、‘$_$’、 ‘AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。

    import java.nio.charset.Charset;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server4Delimiter {
    	    // 监听线程组,监听客户端请求
    		private EventLoopGroup acceptorGroup = null;
    		// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
    		private EventLoopGroup clientGroup = null;
    		// 服务启动相关配置信息
    		private ServerBootstrap bootstrap = null;
    		public Server4Delimiter(){
    			init();
    		}
    		public void init(){
    			acceptorGroup = new NioEventLoopGroup();
    			clientGroup = new NioEventLoopGroup();
    			bootstrap = new ServerBootstrap();
    			// 绑定线程组
    			bootstrap.group(acceptorGroup, clientGroup);
    			// 设定通讯模式为NIO
    			bootstrap.channel(NioServerSocketChannel.class);
    			// 设定缓冲区大小
    			bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    			// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
    			bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    				.option(ChannelOption.SO_RCVBUF, 16*1024)
    				.option(ChannelOption.SO_KEEPALIVE, true);
    		}
    
    		public ChannelFuture  doAccept(int port) throws InterruptedException{
    			bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){
    				@Override
    				public void initChannel(SocketChannel ch) throws Exception{
    					// 数据分隔符, 定义的数据分隔符一定是一个ByteBuf类型的数据对象。
    					ByteBuf delimiter =Unpooled.copiedBuffer("$E$".getBytes());
    					ChannelHandler[] acceptorHandlers =new ChannelHandler[3];
    					// 处理固定结束标记符号的Handler。这个Handler没有@Sharable注解修饰,
    					// 必须每次初始化通道时创建一个新对象
    					// 使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度。netty建议数据有最大长度。
    					acceptorHandlers[0]=new DelimiterBasedFrameDecoder(1024, delimiter);
    					// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
    					acceptorHandlers[1]=new StringDecoder(Charset.forName("UTF-8"));
    					acceptorHandlers[2]=new  Server4DelimiterHandler();
    					ch.pipeline().addLast(acceptorHandlers);
    				}
    			});
    			ChannelFuture  future =bootstrap.bind(port).sync();
    			return future;	
    		}
    		
    		public void release(){
    			this.acceptorGroup.shutdownGracefully();
    			this.clientGroup.shutdownGracefully();
    		}
    		
    		public static void main(String[] args) {
    			ChannelFuture future = null;
    			Server4Delimiter server = null;
    			try{
    				server = new Server4Delimiter();
    				
    				future = server.doAccept(9999);
    				System.out.println("server started.");
    				future.channel().closeFuture().sync();
    			}catch(InterruptedException e){
    				e.printStackTrace();
    			}finally{
    				if(null != future){
    					try {
    						future.channel().closeFuture().sync();
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    				
    				if(null != server){
    					server.release();
    				}
    			}
    		}
    }
    

      

    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class Server4DelimiterHandler extends ChannelHandlerAdapter {
    	    // 业务处理逻辑
    		@Override
    		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    			String message = msg.toString();
    			System.out.println("from client : " + message);
    			String line = "server message $E$ test delimiter handler!! $E$ second message $E$";
    			ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    		}
    		
    		// 异常处理逻辑
    		@Override
    		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    			System.out.println("server exceptionCaught method run...");
    			// cause.printStackTrace();
    			ctx.close();
    		}
    }
    

      

    import java.nio.charset.Charset;
    import java.util.Scanner;
    import java.util.concurrent.TimeUnit;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client4Delimiter {
    	   // 处理请求和处理服务端响应的线程组
    		private EventLoopGroup group = null;
    		// 服务启动相关配置信息
    		private Bootstrap bootstrap = null;
    		
    		public Client4Delimiter(){
    			init();
    		}
    		
    		private void init(){
    			group = new NioEventLoopGroup();
    			bootstrap = new Bootstrap();
    			// 绑定线程组
    			bootstrap.group(group);
    			// 设定通讯模式为NIO
    			bootstrap.channel(NioSocketChannel.class);
    		}
    		
    		public ChannelFuture doRequest(String host, int port) throws InterruptedException{
    			this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
    				@Override
    				protected void initChannel(SocketChannel ch) throws Exception {
    					// 数据分隔符
    					ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
    					ChannelHandler[] handlers = new ChannelHandler[3];
    					handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
    					// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
    					handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
    					handlers[2] = new Client4DelimiterHandler();
    					
    					ch.pipeline().addLast(handlers);
    				}
    			});
    			ChannelFuture future = this.bootstrap.connect(host, port).sync();
    			return future;
    		}
    		
    		public void release(){
    			this.group.shutdownGracefully();
    		}
    		
    		public static void main(String[] args) {
    			Client4Delimiter client = null;
    			ChannelFuture future = null;
    			try{
    				client = new Client4Delimiter();
    				
    				future = client.doRequest("localhost", 9999);
    				
    				Scanner s = null;
    				while(true){
    					s = new Scanner(System.in);
    					System.out.print("enter message send to server > ");
    					String line = s.nextLine();
    					future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    					TimeUnit.SECONDS.sleep(1);
    				}
    			}catch(Exception e){
    				e.printStackTrace();
    			}finally{
    				if(null != future){
    					try {
    						future.channel().closeFuture().sync();
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    				if(null != client){
    					client.release();
    				}
    			}
    		}
    }
    

      

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class Client4DelimiterHandler extends ChannelHandlerAdapter {
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try{
    			String message = msg.toString();
    			System.out.println("from server : " + message);
    		}finally{
    			// 用于释放缓存。避免内存溢出
    			ReferenceCountUtil.release(msg);
    		}
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    }
    

      

    协议 

    相对最成熟的数据传递方式。有服务器的开发者提供一个固定格式的协议标准。客户端 和服务器发送数据和接受数据的时候,都依据协议制定和解析消息。

     自定义协议格式:

    协议格式:
    HEADcontent-length:xxxxHEADBODYxxxxxxBODY
    

      

    /**
     * 1. 双线程组
     * 2. Bootstrap配置启动信息
     * 3. 注册业务处理Handler
     * 4. 绑定服务监听端口并启动服务
     */
    import java.nio.charset.Charset;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server4Protocol {
    	// 监听线程组,监听客户端请求
    	private EventLoopGroup acceptorGroup = null;
    	// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
    	private EventLoopGroup clientGroup = null;
    	// 服务启动相关配置信息
    	private ServerBootstrap bootstrap = null;
    
    	public Server4Protocol() {
    		init();
    	}
    
    	private void init() {
    		acceptorGroup = new NioEventLoopGroup();
    		clientGroup = new NioEventLoopGroup();
    		bootstrap = new ServerBootstrap();
    		// 绑定线程组
    		bootstrap.group(acceptorGroup, clientGroup);
    		// 设定通讯模式为NIO
    		bootstrap.channel(NioServerSocketChannel.class);
    		// 设定缓冲区大小
    		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    		// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
    		bootstrap.option(ChannelOption.SO_SNDBUF, 16 * 1024).option(ChannelOption.SO_RCVBUF, 16 * 1024)
    				.option(ChannelOption.SO_KEEPALIVE, true);
    	}
    
    	public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException {
    
    		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
    				ch.pipeline().addLast(acceptorHandlers);
    			}
    		});
    		ChannelFuture future = bootstrap.bind(port).sync();
    		return future;
    	}
    
    	public void release() {
    		this.acceptorGroup.shutdownGracefully();
    		this.clientGroup.shutdownGracefully();
    	}
    
    	public static void main(String[] args) {
    		ChannelFuture future = null;
    		Server4Protocol server = null;
    		try {
    			server = new Server4Protocol();
    			future = server.doAccept(9999, new Server4ProtocolHandler());
    			System.out.println("server started.");
    
    			future.channel().closeFuture().sync();
    
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			
    			if(null != server){
    				server.release();
    			}
    
    		}
    	}
    
    }
    

      

    /**
     * @Sharable注解 - 
     *  代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
     *  如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
     *  
     */
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    
    @Sharable
    public class Server4ProtocolHandler extends ChannelHandlerAdapter {
    	// 业务处理逻辑
    		@Override
    		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    			String message = msg.toString();
    			System.out.println("server receive protocol content : " + message);
    			message = ProtocolParser.parse(message);
    			if(null == message){
    				System.out.println("error request from client");
    				return ;
    			}
    			System.out.println("from client : " + message);
    			String line = "server message";
    			line = ProtocolParser.transferTo(line);
    			System.out.println("server send protocol content : " + line);
    			ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    		}
    
    		// 异常处理逻辑
    		@Override
    		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    			System.out.println("server exceptionCaught method run...");
    			cause.printStackTrace();
    			ctx.close();
    		}
    		
    		static class ProtocolParser{
    			public static String parse(String message) {
    				String[] temp=message.split("HEADBODY");
    				temp[0]=temp[0].substring(4);
    				temp[1]=temp[1].substring(0,temp[1].length()-4);
    				int length=Integer.parseInt(temp[0].substring(temp[0].indexOf(":")+1));
    				if(length != temp[1].length()){
    					return null;
    				}
    				return temp[1];
    			}
    			public static String transferTo(String message){
    				message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
    				return message;
    			}
    		}
    
    }
    

      

    import java.nio.charset.Charset;
    import java.util.Scanner;
    import java.util.concurrent.TimeUnit;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client4Protocol {
    
    	    // 处理请求和处理服务端响应的线程组
    		private EventLoopGroup group = null;
    		// 服务启动相关配置信息
    		private Bootstrap bootstrap = null;
    		
    		public Client4Protocol(){
    			init();
    		}
    		
    		private void init(){
    			group = new NioEventLoopGroup();
    			bootstrap = new Bootstrap();
    			// 绑定线程组
    			bootstrap.group(group);
    			// 设定通讯模式为NIO
    			bootstrap.channel(NioSocketChannel.class);
    		}
    		
    		public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
    			this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
    				@Override
    				protected void initChannel(SocketChannel ch) throws Exception {
    					ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
    					ch.pipeline().addLast(handlers);
    				}
    			});
    			ChannelFuture future = this.bootstrap.connect(host, port).sync();
    			return future;
    		}
    		
    		public void release(){
    			this.group.shutdownGracefully();
    		}
    		
    		public static void main(String[] args) {
    			Client4Protocol client = null;
    			ChannelFuture future = null;
    			try{
    				client = new Client4Protocol();
    				future = client.doRequest("localhost", 9999, new Client4ProtocolHandler());
    				
    				Scanner s = null;
    				while(true){
    					s = new Scanner(System.in);
    					System.out.print("enter message send to server > ");
    					String line = s.nextLine();
    					line = Client4ProtocolHandler.ProtocolParser.transferTo(line);
    					System.out.println("client send protocol content : " + line);
    					future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    					TimeUnit.SECONDS.sleep(1);
    				}
    			}catch(Exception e){
    				e.printStackTrace();
    			}finally{
    				if(null != future){
    					try {
    						future.channel().closeFuture().sync();
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    				if(null != client){
    					client.release();
    				}
    			}
    		}
    }
    

      

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class Client4ProtocolHandler extends ChannelHandlerAdapter {
    	
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try{
    			String message = msg.toString();
    			System.out.println("client receive protocol content : " + message);
    			message = ProtocolParser.parse(message);
    			if(null == message){
    				System.out.println("error response from server");
    				return ;
    			}
    			System.out.println("from server : " + message);
    		}finally{
    			// 用于释放缓存。避免内存溢出
    			ReferenceCountUtil.release(msg);
    		}
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		// cause.printStackTrace();
    		ctx.close();
    	}
    
    	static class ProtocolParser{
    		public static String parse(String message){
    			String[] temp = message.split("HEADBODY");
    			temp[0] = temp[0].substring(4);
    			temp[1] = temp[1].substring(0, (temp[1].length()-4));
    			int length = Integer.parseInt(temp[0].substring(temp[0].indexOf(":")+1));
    			if(length != temp[1].length()){
    				return null;
    			}
    			return temp[1];
    		}
    		public static String transferTo(String message){
    			message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
    			return message;
    		}
    	}
    
    }
    

      

     序列化对象 

    JBoss Marshalling 序列化

    Java 是面向对象的开发语言。传递的数据如果是 Java 对象,应该是最方便且可靠。 

    /**
     * 1. 双线程组
     * 2. Bootstrap配置启动信息
     * 3. 注册业务处理Handler
     * 4. 绑定服务监听端口并启动服务
     */
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import utils.SerializableFactory4Marshalling;
    
    public class Server4Serializable {
    	   // 监听线程组,监听客户端请求
    		private EventLoopGroup acceptorGroup = null;
    		// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
    		private EventLoopGroup clientGroup = null;
    		// 服务启动相关配置信息
    		private ServerBootstrap bootstrap = null;
    		public Server4Serializable(){
    			init();
    		}
    		
    		private void init(){
    			acceptorGroup = new NioEventLoopGroup();
    			clientGroup = new NioEventLoopGroup();
    			bootstrap = new ServerBootstrap();
    			// 绑定线程组
    			bootstrap.group(acceptorGroup, clientGroup);
    			// 设定通讯模式为NIO
    			bootstrap.channel(NioServerSocketChannel.class);
    			// 设定缓冲区大小
    			bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    			// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
    			bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    				.option(ChannelOption.SO_RCVBUF, 16*1024)
    				.option(ChannelOption.SO_KEEPALIVE, true);
    		}
    		
    		public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
    			bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){ 
    				@Override
    				protected void initChannel(SocketChannel ch) throws Exception {
    					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
    					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
    					ch.pipeline().addLast(acceptorHandlers);
    				}
    			});
    			ChannelFuture future=bootstrap.bind(port).sync();
    			return future;
    		}
    		
    		public void release(){
    			this.acceptorGroup.shutdownGracefully();
    			this.clientGroup.shutdownGracefully();
    		}
    		
    		
    		public static void main(String[] args){
    			ChannelFuture future = null;
    			Server4Serializable server = null;
    			try{
    				server = new Server4Serializable();
    				future = server.doAccept(9999,new Server4SerializableHandler());
    				System.out.println("server started.");
    				
    				future.channel().closeFuture().sync();
    			}catch(InterruptedException e){
    				e.printStackTrace();
    			}finally{
    				if(null != future){
    					try {
    						future.channel().closeFuture().sync();
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    				
    				if(null != server){
    					server.release();
    				}
    			}
    		}	
    		
    }
    

      

    /**
     * @Sharable注解 - 
     *  代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
     *  如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
     *  
     */
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import utils.GzipUtils;
    import utils.RequestMessage;
    import utils.ResponseMessage;
    
    @Sharable
    public class Server4SerializableHandler extends ChannelHandlerAdapter{
    	// 业务处理逻辑
    		@Override
    		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    			System.out.println("from client : ClassName - " + msg.getClass().getName()
    					+ " ; message : " + msg.toString());
    			if(msg instanceof RequestMessage){
    				RequestMessage request = (RequestMessage)msg;
    				//byte[] attachment = GzipUtils.unzip(request.getAttachment());
    				//System.out.println(new String(attachment));
    			}
    			ResponseMessage response = new ResponseMessage(0L, "test response");
    			ctx.writeAndFlush(response);
    		}
    
    		// 异常处理逻辑
    		@Override
    		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    			System.out.println("server exceptionCaught method run...");
    			cause.printStackTrace();
    			ctx.close();
    		}
    }
    

      

    /**
     * 1. 单线程组
     * 2. Bootstrap配置启动信息
     * 3. 注册业务处理Handler
     * 4. connect连接服务,并发起请求
     */
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import utils.GzipUtils;
    import utils.RequestMessage;
    import utils.SerializableFactory4Marshalling;
    
    public class Client4Serializable {
    	// 处理请求和处理服务端响应的线程组
    	private EventLoopGroup group = null;
    	// 服务启动相关配置信息
    	private Bootstrap bootstrap = null;
    
    	public Client4Serializable() {
    		init();
    	}
    
    	private void init() {
    		group = new NioEventLoopGroup();
    		bootstrap = new Bootstrap();
    		// 绑定线程组
    		bootstrap.group(group);
    		// 设定通讯模式为NIO
    		bootstrap.channel(NioSocketChannel.class);
    
    	}
    
    	public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers)
    			throws InterruptedException {
    		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    			@Override
    			protected void initChannel(SocketChannel ch) throws Exception {
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
    				ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
    				ch.pipeline().addLast(handlers);
    			}
    		});
    		ChannelFuture future = this.bootstrap.connect(host, port).sync();
    		return future;
    	}
    	
    	public void release(){
    		this.group.shutdownGracefully();
    	}
    	
    	public static void main(String[] args) {
    		Client4Serializable client = null;
    		ChannelFuture future = null;
    		try{
    			client = new Client4Serializable();
    			future = client.doRequest("localhost", 9999, new Client4SerializableHandler());
    			String attachment = "test attachment";
    			byte[] attBuf = attachment.getBytes();
    			//attBuf = GzipUtils.zip(attBuf);
    			RequestMessage msg = new RequestMessage(new Random().nextLong(), 
    					"test",new byte[0]);
    					//"test", attBuf);
    			future.channel().writeAndFlush(msg);
    			TimeUnit.SECONDS.sleep(1);
    			future.addListener(ChannelFutureListener.CLOSE);
    		}catch(Exception e){
    			e.printStackTrace();
    		}finally{
    			if(null != future){
    				try {
    					future.channel().closeFuture().sync();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if(null != client){
    				client.release();
    			}
    		}
    	}
    }
    

      

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class Client4SerializableHandler extends ChannelHandlerAdapter {
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		System.out.println("from server : ClassName - " + msg.getClass().getName()
    				+ " ; message : " + msg.toString());
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		cause.printStackTrace();
    		ctx.close();
    	}
    }
    

      

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;
    
    public class GzipUtils {
    	public static void main(String[] args) throws Exception {
    		FileInputStream fis = new FileInputStream("D:\3\1.jpg");
    		byte[] temp = new byte[fis.available()];
            int length=fis.read(temp);
            System.out.println("长度 : " + length);
            
            byte[] zipArray = GzipUtils.zip(temp);
    		System.out.println("压缩后的长度 : " + zipArray.length);
    		
    		byte[] unzipArray = GzipUtils.unzip(zipArray);
    		System.out.println("解压缩后的长度 : " + unzipArray.length);
    		
    		FileOutputStream fos = new FileOutputStream("D:\3\101.jpg");
    		fos.write(unzipArray);
    		fos.flush();
    		
    		fos.close();
    		fis.close();
    	}
    	
    	/**
    	 * 解压缩
    	 * @param source 源数据。需要解压的数据。
    	 * @return 解压后的数据。 恢复的数据。
    	 * @throws Exception
    	 */
    	public static  byte[] unzip(byte[] source) throws Exception{
    		ByteArrayOutputStream out = new ByteArrayOutputStream();
    		ByteArrayInputStream in = new ByteArrayInputStream(source);
    		// JDK提供的。 专门用于压缩使用的流对象。可以处理字节数组数据。
    		GZIPInputStream zipIn = new GZIPInputStream(in);
    		byte[] temp=new byte[256];
    		int length = 0;
    		while((length = zipIn.read(temp, 0, temp.length)) != -1){
    			out.write(temp, 0, length);
    		}
    		// 将字节数组输出流中的数据,转换为一个字节数组。
    		byte[] target = out.toByteArray();
    		
    		zipIn.close();
    		out.close();
    		
    		return target;
    	}
    	
    	/**
    	 * 压缩
    	 * @param source 源数据,需要压缩的数据
    	 * @return 压缩后的数据。
    	 * @throws Exception
    	 */
    	public static byte[] zip(byte[] source) throws Exception{
    		ByteArrayOutputStream out = new ByteArrayOutputStream();
    		// 输出流,JDK提供的,提供解压缩功能。
    		GZIPOutputStream zipOut = new GZIPOutputStream(out);
    		// 将压缩信息写入到内存。 写入的过程会实现解压。
    		zipOut.write(source);
    		// 结束。
    		zipOut.finish();
    		byte[] target = out.toByteArray();
    		
    		zipOut.close();
    		
    		return target;
    	}
    }
    

      

    import java.io.Serializable;
    
    public class RequestMessage implements Serializable {
    	private static final long serialVersionUID = 7084843947860990140L;
    	private Long id;
    	private String message;
    	private byte[] attachment;
    	@Override
    	public String toString() {
    		return "RequestMessage [id=" + id + ", message=" + message + "]";
    	}
    	public RequestMessage() {
    		super();
    	}
    	public RequestMessage(Long id, String message, byte[] attachment) {
    		super();
    		this.id = id;
    		this.message = message;
    		this.attachment = attachment;
    	}
    	public Long getId() {
    		return id;
    	}
    	public void setId(Long id) {
    		this.id = id;
    	}
    	public String getMessage() {
    		return message;
    	}
    	public void setMessage(String message) {
    		this.message = message;
    	}
    	public byte[] getAttachment() {
    		return attachment;
    	}
    	public void setAttachment(byte[] attachment) {
    		this.attachment = attachment;
    	}
    }
    

      

    import java.io.Serializable;
    
    public class ResponseMessage implements  Serializable {
    	private static final long serialVersionUID = -8134313953478922076L;
    	private Long id;
    	private String message;
    	@Override
    	public String toString() {
    		return "ResponseMessage [id=" + id + ", message=" + message + "]";
    	}
    	public ResponseMessage() {
    		super();
    	}
    	public ResponseMessage(Long id, String message) {
    		super();
    		this.id = id;
    		this.message = message;
    	}
    	public Long getId() {
    		return id;
    	}
    	public void setId(Long id) {
    		this.id = id;
    	}
    	public String getMessage() {
    		return message;
    	}
    	public void setMessage(String message) {
    		this.message = message;
    	}
    
    }
    

      

    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
    import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
    import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    
    public class SerializableFactory4Marshalling {
    
    	/**
    	 * 创建Jboss Marshalling解码器MarshallingDecoder
    	 * 
    	 * @return MarshallingDecoder
    	 */
    	public static MarshallingDecoder buildMarshallingDecoder() {
    		// 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
    		// jboss-marshalling-serial 包提供
    		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
    		// 创建了MarshallingConfiguration对象,配置了版本号为5
    		final MarshallingConfiguration configuration = new MarshallingConfiguration();
    		// 序列化版本。只要使用JDK5以上版本,version只能定义为5。
    		configuration.setVersion(5);
    		// 根据marshallerFactory和configuration创建provider
    		UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
    		// 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
    		MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
    		return decoder;
    	}
    
        /**
         * 创建Jboss Marshalling编码器MarshallingEncoder
         * @return MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
            //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
            MarshallingEncoder encoder = new MarshallingEncoder(provider);
            return encoder;
        }
    }
    

      定时断线重连 

    客户端断线重连机制。

    客户端数量多,且需要传递的数据量级较大。可以周期性的发送数据的时候,使用。

    要 求对数据的即时性不高的时候,才可使用。

    优点: 可以使用数据缓存。不是每条数据进行一次数据交互。可以定时回收资源,对 资源利用率高。相对来说,即时性可以通过其他方式保证。如: 120 秒自动断线。数据变 化 1000 次请求服务器一次。300 秒中自动发送不足 1000 次的变化数据

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import utils.SerializableFactory4Marshalling;
    
    public class Server4Timer {
    	    // 监听线程组,监听客户端请求
    		private EventLoopGroup acceptorGroup = null;
    		// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
    		private EventLoopGroup clientGroup = null;
    		// 服务启动相关配置信息
    		private ServerBootstrap bootstrap = null;
    		public Server4Timer(){
    			init();
    		}
    		
    		private void init(){
    			acceptorGroup = new NioEventLoopGroup();
    			clientGroup = new NioEventLoopGroup();
    			bootstrap = new ServerBootstrap();
    			// 绑定线程组
    			bootstrap.group(acceptorGroup, clientGroup);
    			// 设定通讯模式为NIO
    			bootstrap.channel(NioServerSocketChannel.class);
    			// 设定缓冲区大小
    			bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    			// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
    			bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
    				.option(ChannelOption.SO_RCVBUF, 16*1024)
    				.option(ChannelOption.SO_KEEPALIVE, true);
    			// 增加日志Handler,日志级别为info
    			// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    		}
    		
    		public ChannelFuture doAccept(int port) throws InterruptedException{
    			
    			bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
    				@Override
    				protected void initChannel(SocketChannel ch) throws Exception {
    					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
    					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
    					// 定义一个定时断线处理器,当多长时间内,没有任何的可读取数据,自动断开连接。
    					// 构造参数,就是间隔时长。 默认的单位是秒。
    					// 自定义间隔时长单位。 new ReadTimeoutHandler(long times, TimeUnit unit);
    					ch.pipeline().addLast(new ReadTimeoutHandler(3));
    					ch.pipeline().addLast(new Server4TimerHandler());
    				}
    			});
    			ChannelFuture future = bootstrap.bind(port).sync();
    			return future;
    		}
    		public void release(){
    			this.acceptorGroup.shutdownGracefully();
    			this.clientGroup.shutdownGracefully();
    		}
    		
    		public static void main(String[] args){
    			ChannelFuture future = null;
    			Server4Timer server = null;
    			try{
    				server = new Server4Timer();
    				future = server.doAccept(9999);
    				System.out.println("server started.");
    				
    				future.channel().closeFuture().sync();
    			}catch(InterruptedException e){
    				e.printStackTrace();
    			}finally{
    				if(null != future){
    					try {
    						future.channel().closeFuture().sync();
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    				
    				if(null != server){
    					server.release();
    				}
    			}
    		}
    }
    

      

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import utils.ResponseMessage;
    
    public class Server4TimerHandler extends ChannelHandlerAdapter {
    	   // 业务处理逻辑
    		@Override
    		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    			System.out.println("from client : ClassName - " + msg.getClass().getName()
    					+ " ; message : " + msg.toString());
    			ResponseMessage response = new ResponseMessage(0L, "test response");
    			ctx.writeAndFlush(response);
    		}
    
    		// 异常处理逻辑
    		@Override
    		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    			System.out.println("server exceptionCaught method run...");
    			// cause.printStackTrace();
    			ctx.close();
    		}
    }
    

      

    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.timeout.WriteTimeoutHandler;
    import utils.RequestMessage;
    import utils.SerializableFactory4Marshalling;
    
    public class Client4Timer {
    	    // 处理请求和处理服务端响应的线程组
    		private EventLoopGroup group = null;
    		// 服务启动相关配置信息
    		private Bootstrap bootstrap = null;
    		private ChannelFuture future = null;
    		
    		public Client4Timer(){
    			init();
    		}
    		
    		private void init(){
    			group = new NioEventLoopGroup();
    			bootstrap = new Bootstrap();
    			// 绑定线程组
    			bootstrap.group(group);
    			// 设定通讯模式为NIO
    			bootstrap.channel(NioSocketChannel.class);
    			// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    		}
    		
    		public void setHandlers() throws InterruptedException{
    			this.bootstrap.handler(new ChannelInitializer<SocketChannel>(){
    				@Override
    				protected void initChannel(SocketChannel ch) throws Exception {
    					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
    					ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
    					// 写操作自定断线。 在指定时间内,没有写操作,自动断线。
    					ch.pipeline().addLast(new WriteTimeoutHandler(3));
    					ch.pipeline().addLast(new Client4TimerHandler());
    				}
    			});
    		}
    		
    		public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{
    			if(future == null){
    				future = this.bootstrap.connect(host, port).sync();
    			}
    			if(!future.channel().isActive()){
    				future = this.bootstrap.connect(host, port).sync();
    			}
    			return future;
    		}
    		
    		public void release(){
    			this.group.shutdownGracefully();
    		}
    		
    		public static void main(String[] args) {
    			Client4Timer client = null;
    			ChannelFuture future = null;
    			try{
    				client = new Client4Timer();
    				client.setHandlers();
    				
    				future = client.getChannelFuture("localhost", 9999);
    				for(int i = 0; i < 3; i++){
    					RequestMessage msg = new RequestMessage(new Random().nextLong(), 
    							"test"+i, new byte[0]);
    					future.channel().writeAndFlush(msg);
    					TimeUnit.SECONDS.sleep(2);
    				}
    				TimeUnit.SECONDS.sleep(5);
    				
    				future = client.getChannelFuture("localhost", 9999);
    				RequestMessage msg = new RequestMessage(new Random().nextLong(), 
    						"test", new byte[0]);
    				future.channel().writeAndFlush(msg);
    			}catch(Exception e){
    				e.printStackTrace();
    			}finally{
    				if(null != future){
    					try {
    						future.channel().closeFuture().sync();
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    				if(null != client){
    					client.release();
    				}
    			}
    		}
    
    }
    

      

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class Client4TimerHandler extends ChannelHandlerAdapter {
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		System.out.println("from server : ClassName - " + msg.getClass().getName()
    				+ " ; message : " + msg.toString());
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		System.out.println("client exceptionCaught method run...");
    		cause.printStackTrace();
    		ctx.close();
    	}
    
    	/**
    	 * 当连接建立成功后,出发的代码逻辑。
    	 * 在一次连接中只运行唯一一次。
    	 * 通常用于实现连接确认和资源初始化的。
    	 */
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		System.out.println("client channel active");
    	}
    }
    

      

  • 相关阅读:
    转!!MySQL中的存储引擎讲解(InnoDB,MyISAM,Memory等各存储引擎对比)
    转!!left join on and 与 left join on where的区别
    swoole WebSocket 消息推送
    基于swoole搭建聊天室程序
    使用php+swoole对client数据实时更新(下)
    使用php+swoole对client数据实时更新(上)
    swoole实现websocket推送
    PHP只显示姓名首尾字符,隐藏中间字符并用*替换
    微信小程序 tp5上传图片
    thinkphp 调用wsdl接口实例化SoapClient抛出异常
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/12256140.html
Copyright © 2011-2022 走看看