简介
linux 网络I/O模型介绍
1)堵塞I/O模型
2)非堵塞I/O模型
3)伪异步I/O模型
4)多路复用select / poll /epoll
5)信号驱动I/O模型
6) 异步I/O
netty入门应用
package com.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class TimeServer { public void bind(int port) throws Exception { // 配置线程组 //用于网络事件的处理 EventLoopGroup bossGroup = new NioEventLoopGroup(); //用于socketChannel的网络读写 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发难度 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); //绑定网络IO事件的处理类 // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port = 8080; if(args != null && args.length > 0){ try { port = Integer.valueOf(args[0]); } catch (Exception e) { //采用默认值 } } new TimeServer().bind(port); } }
package com.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeServerHandler extends ChannelHandlerAdapter{ public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{ ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()];//根据缓冲区可读字节数创建byte数组 buf.readBytes(req); String body = new String(req,"UTF-8"); System.out.println("The time server receive order :"+body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER"; ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{ ctx.flush(); } /* (non-Javadoc) * @see io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel.ChannelHandlerContext, java.lang.Throwable) */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub super.exceptionCaught(ctx, cause); ctx.close(); } }
package com.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient { public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // 发起异步连接操作,等待连接成功 ChannelFuture f = b.connect(host, port).sync(); // 等待客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅的退出 释放NIO线程组 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (Exception e) { // TODO: handle exception } } new TimeClient().connect(port, "127.0.0.1"); } }
package com.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeClientHandler extends ChannelHandlerAdapter { private final ByteBuf firstMessage; public TimeClientHandler() { byte[] req = "QUERY TIME ORDER".getBytes(); firstMessage = Unpooled.buffer(req.length); firstMessage.writeBytes(req); } // 当客户端与服务器TCP建立成功后,Netty的NIO线程会调用channelActive方法 public void channelActive(ChannelHandlerContext ctx) { // 将请求消息发送给服务端 ctx.writeAndFlush(firstMessage); } // 当服务端返回应答消息时,channelRead方法被调用 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("Now is :" + body); } /* * (non-Javadoc) * * @see * io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel. * ChannelHandlerContext, java.lang.Throwable) */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub super.exceptionCaught(ctx, cause); ctx.close(); } }
TCP粘包/拆包问题解决之道
1)消息定长
2)在包尾增加回车换行符进行分割,列如FTP协议
3)将消息分为消息头和消息体,消息头包含消息的总长度
4)更复杂的应用层协议
使用LineBasedFrameDecoder和StringDecoder按行切换的文本编辑器
在client和server的pipeline中添加:
/**
* LineBasedFrameDecoder的工作原理依次遍历ByteBuf中的可读字节,判断是否有“
”和“
”,如果有,以此结束。
* 当1024长度还没发现结束符,则结束掉并抛弃之前读到的异常流码
*
* StringDecoder将接受到的对象转化成字符串,然后继续调用handler
*
* LineBasedFrameDecoder+StringDecoder组合其实就是按行切换的文本编辑器
*/
//判断ByteBuf中的可读字节,判断是否有" "," ",如果有就以此位置为结束位置。 //当最大长度1024字节仍然没有发现换行符,就抛出异常。 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); //将收到的对象转化成字符串 ch.pipeline().addLast(new StringDecoder());
分隔符和定长编码器的应用
对消息区分:
1)消息固定长度
2)将回车换行符作为消息结束符
3)将特殊分隔符作为消息的结束标志
4)通过在消息头中定义长度字段来标识消息的总长度
DelimiterBasedFrameDecoder开发
在client与server的pipeline中添加如下解码器
//创建分隔符缓冲对象 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
FixedLengthFrameDecoder应用开发
/** * 利用FixedLengthFrameDecoder解码器,无论一次性接收多少数据报, * 他都会按照构造函数中设置的固定长度进行解码,如果是半包消息, * FixedLengthFrameDecoder会缓存半包消息并等待下个包到达后进行拼包,直到读取到一个完整的包。 */ ch.pipeline().addLast(new FixedLengthFrameDecoder(20)); ch.pipeline().addLast(new StringDecoder());
编解码技术
java序列化的缺点:
1)无法跨语言
2)码流太大
3)序列化性能不高
编解码框架
MessagePack编解码
特点:
1)编解码高效,性能高。
2)序列化之后的码流小。
3)支持跨语言。
/** * MessagePack编码器的开发 * 负责将Object类型的POJO对象编码成byte数组,然后写入到ByteBuf中 * @author Administrator * */ public class MsgpackEncoder extends MessageToByteEncoder<Object> { @Override protected void encode(ChannelHandlerContext arg0, Object arg1, ByteBuf arg2) throws Exception { MessagePack msgpack = new MessagePack(); // 对象arg1序列化 byte[] raw = msgpack.write(arg1); arg2.writeBytes(raw); } }
/** * MessagePack 解码器开发 * @author Administrator * */ public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext arg0, ByteBuf arg1, List<Object> arg2) throws Exception { final byte[] array; final int length = arg1.readableBytes(); array = new byte[length]; //首先从数据报arg1中获取解码的byte数组 arg1.getBytes(arg1.readerIndex(), array,0,length); MessagePack msgpack = new MessagePack(); //调用read()将其反序列化为object对象,并加入到解码列表中 arg2.add(msgpack.read(array)); } }
在client和server的pipeline中添加编解码处理器:
//LengthFieldBasedFrameDecoder LengthFieldPrepender 解决粘包问题 //MsgpackDecoder MsgpackEncoder 解决对象编码问题 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)); ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
//加了2个字节的消息字段 ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2)); ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
Google Protobuf编解码
JBoss Marshalling编解码 jboss-marshalling-1.3.0.jar 和 jboss-marshalling-serial-1.3.4.jar
/** * 创建Jboss Marshalling解码器marshallingDecoder * @return */ public static MarshallingDecoder buildMarshallingDecoder(){ //获取MarshallerFactory实例 ,参数serial表示创建的java序列化工厂对象, //由jboss-marshalling-serial-1.3.4.jar提供 final MarshallerFactory marshallerFactory = Marshalling.getMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //1024 单个消息序列化最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } public static MarshallingEncoder buildMarshallingEncoder(){ final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); DefaultMarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //将POJO对象序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; }
在client和server的pipeline中添加编解码器: (支持半包和拆包的处理)
ch.pipeline().addLast(MarshallingCodeCFactory
.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory
.buildMarshallingEncoder());