Netty是基于JDK NIO的网络框架
简化了NIO编程, 不用程序自己维护selector, 将网络通信和数据处理的部分做了分离
多用于做底层的数据通信, 心跳检测(keepalived)
1. 数据通信
1.1 Hello World
public class Server { public static void main(String[] args) throws Exception { // 1 创建线两个事件循环组 // 一个是用于处理服务器端接收客户端连接的 // 一个是进行网络通信的(网络读写的) EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); // 2 创建辅助工具类ServerBootstrap,用于服务器通道的一系列配置 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) // 绑定俩个线程组 .channel(NioServerSocketChannel.class) // 指定NIO的模式.NioServerSocketChannel对应TCP, NioDatagramChannel对应UDP .option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP缓冲区 .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 设置发送缓冲大小 .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 这是接收缓冲大小 .option(ChannelOption.SO_KEEPALIVE, true) // 保持连接 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //SocketChannel建立连接后的管道 // 3 在这里配置 通信数据的处理逻辑, 可以addLast多个... sc.pipeline().addLast(new ServerHandler()); } }); // 4 绑定端口, bind返回future(异步), 加上sync阻塞在获取连接处 ChannelFuture cf1 = b.bind(8765).sync(); //ChannelFuture cf2 = b.bind(8764).sync(); //可以绑定多个端口 // 5 等待关闭, 加上sync阻塞在关闭请求处 cf1.channel().closeFuture().sync(); //cf2.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
SO_BACKLOG详解:
服务器的TCP内核维护两个队列A和B
客户端向服务端请求connect时, 发送SYN(第一次握手)
服务端收到SYN后, 向客户端发送SYN ACK(第二次握手), TCP内核将连接放入队列A
客户端收到后向服务端发送ACK(第三次握手), TCP内核将连接从A->B, accept返回, 连接完成
A/B队列的长度和即为BACKLOG, 当accept速度跟不上, A/B队列使得BACKLOG满了, 客户端连接就会被TCP内核拒绝
可以调大backlog缓解这一现象, 经验值~100
public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); System.out.println("Server :" + body ); String response = "返回给客户端的响应:" + body ; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); // future完成后触发监听器, 此处是写完即关闭(短连接). 因此需要关闭连接时, 要通过server端关闭. 直接关闭用方法ctx[.channel()].close() //.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("读完了"); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.close(); } }
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync(); //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); //可以使用多个端口 //发送消息, Buffer类型. write需要flush才发送, 可用writeFlush代替 cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes())); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes())); Thread.sleep(2000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes())); //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes())); cf1.channel().closeFuture().sync(); //cf2.channel().closeFuture().sync(); group.shutdownGracefully(); } }
public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); System.out.println("Client :" + body ); } finally { // 记得释放xxxHandler里面的方法的msg参数: 写(write)数据, msg引用将被自动释放不用手动处理; 但只读数据时,!必须手动释放引用数 ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
1.2 拆包粘包问题
TCP/IP确保了包的传送, 包的顺序等, 但编程中还需要解决拆包粘包问题
-> 接收的一连串包中的数据, 处理的分隔在哪里? 基本解决方案:
1)特殊字符作为结束分隔符
2)消息定长. 固定包的长度, 长度不够用空格补全. 接收方需要trim, 效率不高不推荐
3)自定义协议. 在消息头中包含消息总长度的字段. 需要安全性时可以考虑.
特殊字符
public class Server { public static void main(String[] args) throws Exception { EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // 使用DelimiterBasedFrameDecoder设置结尾分隔符$_ ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); // 设置字符串形式的解码. 经过StringDecoder, Handler回调方法中接收的msg的具体类型就是String了(不再是ByteBuffer). 但写时仍需要传入ByteBuffer sc.pipeline().addLast(new StringDecoder()); // 通信数据的处理逻辑 sc.pipeline().addLast(new ServerHandler()); } }); //4 绑定连接 ChannelFuture cf = b.bind(8765).sync(); //等待服务器监听端口关闭 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
/**
 * Server-side handler of the delimiter example: prints each decoded message
 * and answers with a {@code $_}-terminated response.
 */
public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    /**
     * Logs the message and writes back the response text, terminated with the
     * frame delimiter.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server :" + msg);
        // The trailing "$_" is the delimiter appended to every outgoing message.
        byte[] payload = ("服务器响应: " + msg + "$_").getBytes();
        ctx.writeAndFlush(Unpooled.copiedBuffer(payload));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // nothing to do
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
        ctx.close();
    }
}
/**
 * TCP client of the delimiter example: sends two {@code $_}-terminated
 * messages and prints the decoded responses via {@link ClientHandler}.
 */
public class Client {

    /**
     * @param args unused
     * @throws Exception if the connection fails or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            // Same framing as on the server: frames end with "$_".
                            ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                            sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                            sc.pipeline().addLast(new StringDecoder());
                            sc.pipeline().addLast(new ClientHandler());
                        }
                    });

            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

            cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes()));
            cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes()));

            cf.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads even when connect() fails.
            group.shutdownGracefully();
        }
    }
}
/**
 * Client-side handler of the delimiter example: prints every decoded
 * response string.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    /** Prints the response (a {@code String}) and releases the message. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            System.out.println("Client: " + (String) msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // nothing to do
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
定长
public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //设置定长字符串接收, 定长为5, 积累到5个字节才会把数据发出去 sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); //设置字符串形式的解码 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
/**
 * Server-side handler of the fixed-length example: prints each decoded frame
 * and echoes it back unchanged.
 */
public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    /**
     * Echoes the received frame back to the client.
     *
     * @param msg the decoded frame, expected to be a {@code String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String request = (String) msg;
        System.out.println("Server :" + msg);
        ctx.writeAndFlush(Unpooled.copiedBuffer(request.getBytes()));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // nothing to do
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
        // Do not swallow failures: report them and close the broken channel,
        // as the handlers of the other examples do.
        t.printStackTrace();
        ctx.close();
    }
}
/**
 * TCP client of the fixed-length example: sends 3 + 7 bytes, which the
 * 5-byte frame decoder on the receiving side regroups into two frames.
 */
public class Client {

    /**
     * @param args unused
     * @throws Exception if the connection fails or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
                            sc.pipeline().addLast(new StringDecoder());
                            sc.pipeline().addLast(new ClientHandler());
                        }
                    });

            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

            cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaa".getBytes()));
            cf.channel().writeAndFlush(Unpooled.copiedBuffer("bbccccc".getBytes()));

            cf.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads even when connect() fails.
            group.shutdownGracefully();
        }
    }
}
/**
 * Client-side handler of the fixed-length example: prints every decoded
 * response frame.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    /**
     * @param msg the decoded frame, expected to be a {@code String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String response = (String) msg;
        System.out.println("Client: " + response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // nothing to do
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Do not swallow failures: report them and close the broken channel,
        // as the handlers of the other examples do.
        cause.printStackTrace();
        ctx.close();
    }
}
1.3 编解码
即对象序列化技术, 目的是为了实现对象的网络传输和本地持久化
如果使用java的序列化, 码流较大. 因此多用Marshalling, Kryo, Protobuf等序列化框架
下面的例子, 使用编解码传输javabean(Marshalling的javabean需要实现serializable), 并将message进行gzip压缩
自定义编解码器
public final class MarshallingCodeCFactory { // 解码 public static MarshallingDecoder buildMarshallingDecoder() { //创建工厂对象, 参数serial指创建的是java对象序列化的工厂对象 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //创建配置对象,版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根据工厂对象和配置对象创建解码provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //创建解码器对象. 第一个参数是provider, 第二个参数是单个消息序列化后的最大长度, 超过后拒绝处理 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1); return decoder; } // 编码 public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //创建编码器对象. 用于将实现Serializable接口的JavaBean序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
javabean
/**
 * Request JavaBean sent from client to server. It implements
 * {@link Serializable} so that it can be marshalled.
 */
public class Request implements Serializable {

    /**
     * Explicit serial version so that sender and receiver agree on the class
     * version regardless of the compiler that built each side.
     */
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String requestMessage;
    /** Binary attachment carried along with the request. */
    private byte[] attachment;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getRequestMessage() {
        return requestMessage;
    }

    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }

    public byte[] getAttachment() {
        return attachment;
    }

    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }
}
/**
 * Response JavaBean sent from server to client. It implements
 * {@link Serializable} so that it can be marshalled.
 */
public class Response implements Serializable {

    /**
     * Explicit serial version so that sender and receiver agree on the class
     * version regardless of the compiler that built each side.
     */
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String responseMessage;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getResponseMessage() {
        return responseMessage;
    }

    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}
GZip压缩的Util
/**
 * Helpers for compressing and decompressing byte arrays with GZIP.
 */
public class GzipUtils {

    /**
     * Compresses the given bytes with GZIP.
     *
     * @param data the bytes to compress
     * @return the GZIP-compressed representation of {@code data}
     * @throws Exception if compression fails
     */
    public static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(bos);
        try {
            gzip.write(data);
            gzip.finish();
        } finally {
            // Close in finally so the compressor is released even when writing fails.
            gzip.close();
        }
        return bos.toByteArray();
    }

    /**
     * Decompresses GZIP-compressed bytes.
     *
     * @param data GZIP-compressed bytes
     * @return the decompressed bytes
     * @throws Exception if {@code data} is not valid GZIP data or reading fails
     */
    public static byte[] ungzip(byte[] data) throws Exception {
        GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data));
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int num;
            while ((num = gzip.read(buf)) != -1) {
                bos.write(buf, 0, num);
            }
            return bos.toByteArray();
        } finally {
            // Close in finally so the decompressor is released on corrupt input too.
            gzip.close();
        }
    }
}
服务端与客户端
public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //设置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { // 添加编解码. 发送自定义的类型, 而Handler的方法接收的msg参数的实际类型也是相应的自定义类了 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); }
public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request req = (Request)msg; System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage()); byte[] attachment = GzipUtils.ungzip(req.getAttachment()); String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "001.jpg"; FileOutputStream fos = new FileOutputStream(path); fos.write(attachment); fos.close(); Response resp = new Response(); resp.setId(req.getId()); resp.setName("resp" + req.getId()); resp.setResponseMessage("响应内容" + req.getId()); ctx.writeAndFlush(resp);//.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
public class Client { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); for(int i = 0; i < 5; i++){ Request req = new Request(); req.setId("" + i); req.setName("req" + i); req.setRequestMessage("数据信息" + i); String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg"; File file = new File(path); FileInputStream in = new FileInputStream(file); byte[] data = new byte[in.available()]; in.read(data); in.close(); req.setAttachment(GzipUtils.gzip(data)); //压缩 cf.channel().writeAndFlush(req); } cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
/**
 * Client-side handler of the codec example: prints the fields of every
 * received {@code Response}.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // nothing to do
    }

    /** Prints id, name and message of the response, then releases the message. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Response reply = (Response) msg;
            String line = "Client : " + reply.getId() + ", " + reply.getName() + ", " + reply.getResponseMessage();
            System.out.println(line);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // nothing to do
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
1.4 长连接/短连接
1.长连接, 一直保持着连接不主动中断, 实时性强
2.短连接. 数据放在缓存, 一次性批量提交所有数据, 服务端接收后即关闭连接
以上两种根据是否给writeAndFlush返回的ChannelFuture添加ChannelFutureListener.CLOSE监听器实现
3.长连接, 一定时间不活跃则关闭连接. 给SocketChannel添加ReadTimeoutHandler实现. 实例如下:
/**
 * Factory for the JBoss Marshalling based Netty encoder/decoder pair of the
 * read-timeout example.
 */
public final class MarshallingCodeCFactory {

    /** Name passed to the marshaller factory lookup. */
    private static final String PROTOCOL = "serial";

    /**
     * Builds the decoder; 1024 is passed as the decoder's second constructor
     * argument (presumably the maximum size of one serialized message —
     * confirm against the MarshallingDecoder documentation).
     *
     * @return a new decoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(PROTOCOL);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, newConfiguration());
        return new MarshallingDecoder(provider, 1024);
    }

    /**
     * Builds the encoder.
     *
     * @return a new encoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(PROTOCOL);
        MarshallerProvider provider = new DefaultMarshallerProvider(factory, newConfiguration());
        return new MarshallingEncoder(provider);
    }

    /** Creates a marshalling configuration with version number 5. */
    private static MarshallingConfiguration newConfiguration() {
        MarshallingConfiguration cfg = new MarshallingConfiguration();
        cfg.setVersion(5);
        return cfg;
    }
}
/**
 * Request JavaBean of the read-timeout example, sent from client to server.
 */
public class Request implements Serializable {

    /**
     * Explicit serial version so that sender and receiver agree on the class
     * version regardless of the compiler that built each side.
     */
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String requestMessage;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getRequestMessage() {
        return requestMessage;
    }

    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }
}
/**
 * Response JavaBean of the read-timeout example, sent from server to client.
 */
public class Response implements Serializable {

    /**
     * Explicit serial version so that sender and receiver agree on the class
     * version regardless of the compiler that built each side.
     */
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    private String responseMessage;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getResponseMessage() {
        return responseMessage;
    }

    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}
public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //设置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ReadTimeoutHandler(5)); // 时限, 读客户端超时没数据则断开 sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
/**
 * Server-side handler of the read-timeout example: prints each
 * {@code Request} and answers with a matching {@code Response}.
 */
public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // nothing to do
    }

    /** Logs the request and writes back a response carrying the same id. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request incoming = (Request) msg;
        System.out.println("Server : " + incoming.getId() + ", " + incoming.getName() + ", " + incoming.getRequestMessage());
        ctx.writeAndFlush(replyTo(incoming));
    }

    /** Builds the response for the given request. */
    private Response replyTo(Request incoming) {
        Response reply = new Response();
        reply.setId(incoming.getId());
        reply.setName("response" + incoming.getId());
        reply.setResponseMessage("响应内容" + incoming.getId());
        return reply;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // nothing to do
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
public class Client { private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf ; // 单例 private static class SingletonHolder { static final Client instance = new Client(); } public static Client getInstance(){ return SingletonHolder.instance; } private Client(){ group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭通道) sc.pipeline().addLast(new ReadTimeoutHandler(5)); // 时限5s, 读服务端超时没数据则断开 sc.pipeline().addLast(new ClientHandler()); } }); } public void connect(){ try { this.cf = b.connect("127.0.0.1", 8765).sync(); System.out.println("远程服务器已经连接, 可以进行数据交换"); } catch (Exception e) { e.printStackTrace(); } } public ChannelFuture getChannelFuture(){ if(this.cf == null) { //初次连接 this.connect(); } if(!this.cf.channel().isActive()){ //重连 this.connect(); } return this.cf; } public static void main(String[] args) throws Exception{ final Client c = Client.getInstance(); ChannelFuture cf = c.getChannelFuture(); for(int i = 1; i <= 3; i++ ){ Request request = new Request(); request.setId("" + i); request.setName("request" + i); request.setRequestMessage("数据信息" + i); cf.channel().writeAndFlush(request); TimeUnit.SECONDS.sleep(4); //间隔4s发送一次数据 } cf.channel().closeFuture().sync(); //阻塞至超时关闭 // 这里用子线程重连并发送数据一次 new Thread(new Runnable() { @Override public void run() { try { System.out.println("进入子线程重连一次"); ChannelFuture cf = c.getChannelFuture(); assert true == cf.channel().isActive(); //断言 //再次发送数据 Request request = new Request(); request.setId("" + 4); request.setName("request" + 4); request.setRequestMessage("数据信息" + 4); cf.channel().writeAndFlush(request); 
cf.channel().closeFuture().sync(); System.out.println("子线程完成"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); System.out.println("断开连接,主线程结束.."); } }
/**
 * Client-side handler of the read-timeout example: prints the fields of
 * every received {@code Response}.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // nothing to do
    }

    /** Prints the response and releases the message afterwards. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            print((Response) msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    /** Writes id, name and message of the response to stdout. */
    private void print(Response resp) {
        System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // nothing to do
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
1.5 使用UDP (较少使用)
public class Server { public void run(int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioDatagramChannel.class) // UDP: NioDatagramChannel .option(ChannelOption.SO_BROADCAST, true) // 广播 .handler(new ServerHandler()); b.bind(port).sync().channel().closeFuture().await(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new Server().run(8765); } }
public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> { // 谚语列表 private static final String[] DICTIONARY = { "只要功夫深,铁棒磨成针。", "旧时王谢堂前燕,飞入寻常百姓家。", "洛阳亲友如相问,一片冰心在玉壶。", "一寸光阴一寸金,寸金难买寸光阴。", "老骥伏枥,志在千里。烈士暮年,壮心不已!" }; private String nextQuote() { int quoteId = ThreadLocalRandom.current().nextInt(DICTIONARY.length); return DICTIONARY[quoteId]; } @Override public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { String req = packet.content().toString(CharsetUtil.UTF_8); System.out.println(req); if ("谚语字典查询?".equals(req)) { ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("谚语查询结果: " + nextQuote(), CharsetUtil.UTF_8), packet.sender())); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } }
public class Client { public void run(int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new ClientHandler()); Channel ch = b.bind(0).sync().channel(); // 向网段内的所有机器广播UDP消息 ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("谚语字典查询?", CharsetUtil.UTF_8), new InetSocketAddress("255.255.255.255", port))).sync(); if (!ch.closeFuture().await(15000)) { System.out.println("查询超时!"); } } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new Client().run(8765); } }
/**
 * UDP client handler: prints the first proverb answer it receives and then
 * closes the channel.
 */
public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    /** Ignores packets that are not a proverb answer. */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
        String text = msg.content().toString(CharsetUtil.UTF_8);
        if (!text.startsWith("谚语查询结果: ")) {
            return;
        }
        System.out.println(text);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
2. 心跳检测
集群中主服务器需要知道从服务器的状态
因此client每隔5~10秒给server发送心跳包
可通过netty与定时任务来实现
/**
 * Factory for the JBoss Marshalling based Netty encoder/decoder pair of the
 * heartbeat example.
 */
public final class MarshallingCodeCFactory {

    /**
     * Builds the decoder; {@code 1024 * 1024 * 1} is passed as the decoder's
     * second constructor argument (presumably the maximum size of one
     * serialized message — confirm against the MarshallingDecoder
     * documentation).
     *
     * @return a new decoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        return new MarshallingDecoder(
                new DefaultUnmarshallerProvider(lookupFactory(), configure()),
                1024 * 1024 * 1);
    }

    /**
     * Builds the encoder.
     *
     * @return a new encoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        return new MarshallingEncoder(
                new DefaultMarshallerProvider(lookupFactory(), configure()));
    }

    /** Looks up the marshaller factory registered under the name "serial". */
    private static MarshallerFactory lookupFactory() {
        return Marshalling.getProvidedMarshallerFactory("serial");
    }

    /** Creates a marshalling configuration with version number 5. */
    private static MarshallingConfiguration configure() {
        MarshallingConfiguration settings = new MarshallingConfiguration();
        settings.setVersion(5);
        return settings;
    }
}
/**
 * Heartbeat payload: the sender's ip together with its cpu and memory
 * figures, each stored as a map of named values.
 */
public class RequestInfo implements Serializable {

    /**
     * Explicit serial version so that sender and receiver agree on the class
     * version regardless of the compiler that built each side.
     */
    private static final long serialVersionUID = 1L;

    private String ip;
    private HashMap<String, Object> cpuPercMap;
    private HashMap<String, Object> memoryMap;

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public HashMap<String, Object> getCpuPercMap() {
        return cpuPercMap;
    }

    public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
        this.cpuPercMap = cpuPercMap;
    }

    public HashMap<String, Object> getMemoryMap() {
        return memoryMap;
    }

    public void setMemoryMap(HashMap<String, Object> memoryMap) {
        this.memoryMap = memoryMap;
    }
}
public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //设置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHeartBeatHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
public class ServerHeartBeatHandler extends ChannelHandlerAdapter { private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>(); private static final String SUCCESS_KEY = "auth_success_key"; static { AUTH_IP_MAP.put("127.0.0.1", "1234"); } private boolean auth(ChannelHandlerContext ctx, Object msg){ String [] ret = ((String) msg).split(","); String auth = AUTH_IP_MAP.get(ret[0]); if(auth != null && auth.equals(ret[1])){ // 认证成功, 返回确认信息 ctx.writeAndFlush(SUCCESS_KEY); return true; } else { ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE); return false; } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ auth(ctx, msg); } else if (msg instanceof RequestInfo) { RequestInfo info = (RequestInfo) msg; System.out.println("--------------------------------------------"); System.out.println("当前主机ip为: " + info.getIp()); System.out.println("当前主机cpu情况: "); HashMap<String, Object> cpu = info.getCpuPercMap(); System.out.println("总使用率: " + cpu.get("combined")); System.out.println("用户使用率: " + cpu.get("user")); System.out.println("系统使用率: " + cpu.get("sys")); System.out.println("等待率: " + cpu.get("wait")); System.out.println("空闲率: " + cpu.get("idle")); System.out.println("当前主机memory情况: "); HashMap<String, Object> memory = info.getMemoryMap(); System.out.println("内存总量: " + memory.get("total")); System.out.println("当前内存使用量: " + memory.get("used")); System.out.println("当前内存剩余量: " + memory.get("free")); System.out.println("--------------------------------------------"); ctx.writeAndFlush("info received!"); } else { ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE); } } }
/**
 * TCP client of the heartbeat example: connects to 127.0.0.1:8765 and leaves
 * all further work to {@code ClienHeartBeatHandler}.
 */
public class Client {

    /**
     * @param args unused
     * @throws Exception if the connection fails or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            sc.pipeline().addLast(new ClienHeartBeatHandler());
                        }
                    });

            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

            cf.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads even when connect() fails.
            group.shutdownGracefully();
        }
    }
}
public class ClienHeartBeatHandler extends ChannelHandlerAdapter { private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ScheduledFuture<?> heartBeat; //定时任务 //主动向服务器发送认证信息 private InetAddress addr ; private static final String SUCCESS_KEY = "auth_success_key"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { addr = InetAddress.getLocalHost(); //String ip = addr.getHostAddress(); String ip = "127.0.0.1"; String key = "1234"; //证书 String auth = ip + "," + key; // 发送认证 ctx.writeAndFlush(auth); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if(msg instanceof String){ String ret = (String) msg; if(SUCCESS_KEY.equals(ret)){ // 收到认证 确认信息,设置每隔5秒发送心跳消息 this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 5, TimeUnit.SECONDS); System.out.println(msg); } else { // 收到心跳包 确认信息 System.out.println(msg); } } } finally { // 只读, 需要手动释放引用计数 ReferenceCountUtil.release(msg); } } private class HeartBeatTask implements Runnable { private final ChannelHandlerContext ctx; public HeartBeatTask(final ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { try { RequestInfo info = new RequestInfo(); //ip info.setIp(addr.getHostAddress()); Sigar sigar = new Sigar(); //cpu prec CpuPerc cpuPerc = sigar.getCpuPerc(); HashMap<String, Object> cpuPercMap = new HashMap<String, Object>(); cpuPercMap.put("combined", cpuPerc.getCombined()); cpuPercMap.put("user", cpuPerc.getUser()); cpuPercMap.put("sys", cpuPerc.getSys()); cpuPercMap.put("wait", cpuPerc.getWait()); cpuPercMap.put("idle", cpuPerc.getIdle()); // memory Mem mem = sigar.getMem(); HashMap<String, Object> memoryMap = new HashMap<String, Object>(); memoryMap.put("total", mem.getTotal() / 1024L); memoryMap.put("used", mem.getUsed() / 1024L); memoryMap.put("free", mem.getFree() / 1024L); info.setCpuPercMap(cpuPercMap); info.setMemoryMap(memoryMap); ctx.writeAndFlush(info); } 
catch (Exception e) { e.printStackTrace(); } } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 取消定时发送心跳包的任务 if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } } }
3. HTTP
3.1 Hello World
/**
 * Minimal HTTP server. Set the {@code ssl} system property to serve HTTPS with a self-signed
 * certificate; the {@code port} system property overrides the default port (8443 with SSL,
 * 8080 without).
 */
public final class HttpHelloWorldServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));

    public static void main(String[] args) throws Exception {
        final SslContext sslCtx = SSL ? newSelfSignedServerContext() : null;

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            bootstrap.childHandler(new HttpHelloWorldServerInitializer(sslCtx));

            Channel serverChannel = bootstrap.bind(PORT).sync().channel();

            String scheme = SSL ? "https" : "http";
            System.err.println("Open your web browser and navigate to "
                    + scheme + "://127.0.0.1:" + PORT + '/');

            // Block until the server channel is closed.
            serverChannel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /** Builds a server-side SSL context backed by a freshly generated self-signed certificate. */
    private static SslContext newSelfSignedServerContext() throws Exception {
        SelfSignedCertificate certificate = new SelfSignedCertificate();
        return SslContext.newServerContext(certificate.certificate(), certificate.privateKey());
    }
}
public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public HttpHelloWorldServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new HttpServerCodec()); // !使用http通信, HttpRequest和HttpResponse p.addLast(new HttpHelloWorldServerHandler()); } }
public class HttpHelloWorldServerHandler extends ChannelHandlerAdapter { private static final byte[] CONTENT = "HELLO WORLD".getBytes(); @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof HttpRequest) { HttpRequest req = (HttpRequest) msg; if (HttpHeaderUtil.is100ContinueExpected(req)) { ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE)); } boolean keepAlive = HttpHeaderUtil.isKeepAlive(req); // 构造响应 FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT)); response.headers().set(CONTENT_TYPE, "text/plain;charset=UTF-8"); response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes()); if (!keepAlive) { // Request短连接, 写完后直接关闭 ctx.write(response).addListener(ChannelFutureListener.CLOSE); } else { // 长连接, response也设置为KEEP_ALIVE response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE); ctx.write(response); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
3.2 HTTP下载文件
public class HttpDownloadServer { private static final String DEFAULT_URL = "/sources/"; public void run(final int port, final String url) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // addLast的第一项为key, 自定义的 // request解码器 ch.pipeline().addLast("http-decoder", new HttpRequestDecoder()); // response的编码器 ch.pipeline().addLast("http-encoder", new HttpResponseEncoder()); // chunked, 传输文件时分多个response分解地传输文件 ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); // ObjectAggregator, 将多个response合并为一个FullHttpResponse ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536)); // 自定义业务逻辑handler ch.pipeline().addLast("fileServerHandler", new HttpDownoadServerHandler(url)); } }); ChannelFuture future = b.bind("127.0.0.1", port).sync(); System.out.println("HTTP文件目录服务器启动,网址是 : " + "http://localhost:" + port + url); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8765; new HttpDownloadServer().run(port, DEFAULT_URL); } }
// 注意这里继承了SimpleChannelInboundHandler<T>, 含泛型, 即指定了传入参数msg的类型 public class HttpDownoadServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final String url; public HttpDownoadServerHandler(String url) { this.url = url; } @Override public void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { //是否能理解(解码)请求 if (!request.decoderResult().isSuccess()) { // 400 sendError(ctx, BAD_REQUEST); return; } //对请求的方法进行判断:如果不是GET方法则返回异常 if (request.method() != GET) { // 405 sendError(ctx, METHOD_NOT_ALLOWED); return; } //获取请求uri路径 final String uri = request.uri(); //对url进行分析,返回本地路径 final String path = parseURI(uri); //如果 路径构造不合法,则path为null if (path == null) { //403 sendError(ctx, FORBIDDEN); return; } // 创建file对象 File file = new File(path); // 文件隐藏或不存在 if (file.isHidden() || !file.exists()) { // 404 sendError(ctx, NOT_FOUND); return; } // 是文件夹 if (file.isDirectory()) { if (uri.endsWith("/")) { //如果以正常"/"结束 说明是访问的一个文件目录:则进行展示文件列表 sendListing(ctx, file); } else { //如果非"/"结束 则重定向,让客户端补全"/"并再次请求 sendRedirect(ctx, uri + '/'); } return; } // 如果所创建的file对象不是文件类型 if (!file.isFile()) { // 403 sendError(ctx, FORBIDDEN); return; } //随机文件读写对象 RandomAccessFile randomAccessFile = null; try { randomAccessFile = new RandomAccessFile(file, "r");// 以只读的方式打开文件 } catch (FileNotFoundException fnfe) { // 404 sendError(ctx, NOT_FOUND); return; } //获取文件长度 long fileLength = randomAccessFile.length(); //建立响应对象 HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); //设置响应信息 HttpHeaderUtil.setContentLength(response, fileLength); //设置Content-Type setContentTypeHeader(response, file); //设置为KeepAlive if (HttpHeaderUtil.isKeepAlive(request)) { response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE); } //输出response header, HttpObjectAggregator能将其与下面输出整合合并 ctx.write(response); //写出ChunkedFile. 创建ChunkedFile需要使用RandomAccessFile并设置分段. 
这里每次传输8192个字节 ChannelFuture sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0, fileLength, 8192), ctx.newProgressivePromise()); //添加传输监听 sendFileFuture.addListener(new ChannelProgressiveFutureListener() { @Override public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) { if (total < 0) { System.err.println("Transfer progress: " + progress); } else { System.err.println("Transfer progress: " + progress + " / " + total); } } @Override public void operationComplete(ChannelProgressiveFuture future) throws Exception { System.out.println("Transfer complete."); } }); //使用Chunked, 完成时需要发送标记结束的空消息体! ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); //如果当前连接请求非Keep-Alive, 最后一包消息发送完后, 服务器主动关闭连接 if (!HttpHeaderUtil.isKeepAlive(request)) { lastContentFuture.addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (ctx.channel().isActive()) { // 500 sendError(ctx, INTERNAL_SERVER_ERROR); ctx.close(); } } //判断非法URI的正则 private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&"].*"); private String parseURI(String uri) { try { //使用UTF-8字符集 uri = URLDecoder.decode(uri, "UTF-8"); } catch (UnsupportedEncodingException e) { try { //尝试ISO-8859-1 uri = URLDecoder.decode(uri, "ISO-8859-1"); } catch (UnsupportedEncodingException e1) { //抛出预想外异常信息 throw new Error(); } } // 对uri进行细粒度判断:4步验证操作 // step 1 基础验证 if (!uri.startsWith(url)) { return null; } // step 2 基础验证 if (!uri.startsWith("/")) { return null; } // step 3 将文件分隔符替换为本地操作系统的文件路径分隔符 uri = uri.replace('/', File.separatorChar); // step 4 验证路径合法性 if (uri.contains(File.separator + '.') || uri.contains('.' 
+ File.separator) || uri.startsWith(".") || uri.endsWith(".") || INSECURE_URI.matcher(uri).matches()) { return null; } //利用当前工程所在目录 + URI相对路径 构造绝对路径 return System.getProperty("user.dir") + File.separator + uri; } //用正则表达式过滤文件名 private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\.]*"); //文件列表, 拼html文件 private static void sendListing(ChannelHandlerContext ctx, File dir) { // 设置响应对象 FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK); // 响应头 response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8"); // 构造文本内容 StringBuilder ret = new StringBuilder(); String dirPath = dir.getPath(); ret.append("<!DOCTYPE html> "); ret.append("<html><head><title>"); ret.append(dirPath); ret.append(" 目录:"); ret.append("</title></head><body> "); ret.append("<h3>"); ret.append(dirPath).append(" 目录:"); ret.append("</h3> "); ret.append("<ul>"); ret.append("<li>链接:<a href="../">..</a></li> "); // 遍历文件, 生成超链接 for (File f : dir.listFiles()) { //step 1: 跳过隐藏文件和不可读文件 if (f.isHidden() || !f.canRead()) { continue; } String name = f.getName(); //step 2: 跳过正则过滤的文件名 if (!ALLOWED_FILE_NAME.matcher(name).matches()) { continue; } ret.append("<li>链接:<a href=""); ret.append(name); ret.append("">"); ret.append(name); ret.append("</a></li> "); } ret.append("</ul></body></html> "); //构造ByteBuf,写入缓冲区 ByteBuf buffer = Unpooled.copiedBuffer(ret, CharsetUtil.UTF_8); //进行写出操作 response.content().writeBytes(buffer); //重置ByteBuf buffer.release(); //发送完成并主动关闭连接 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } //重定向操作 private static void sendRedirect(ChannelHandlerContext ctx, String newUri) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND); response.headers().set(LOCATION, newUri); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } //错误信息 private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, 
status, Unpooled.copiedBuffer("Failure: " + status.toString()+ " ", CharsetUtil.UTF_8)); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private static void setContentTypeHeader(HttpResponse response, File file) { //使用mime对象获取文件对应的Content-Type MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap(); response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath())); } }
3.3 HTTP上传文件 (较少使用)
实际应用中, 文件上传的服务端已有成熟的框架: FastDFS(适合小文件)和HDFS(适合大文件)
如要实现断点续传, 需要记录上传进度. 参考HTTP头的Range和Content-Range
public final class HttpUploadServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey()); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.handler(new LoggingHandler(LogLevel.INFO)); b.childHandler(new HttpUploadServerInitializer(sslCtx)); Channel ch = b.bind(PORT).sync().channel(); System.err.println("Open your web browser and navigate to " + (SSL ? "https" : "http") + "://127.0.0.1:" + PORT + '/'); ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
public class HttpUploadServerInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public HttpUploadServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast(new HttpRequestDecoder()); pipeline.addLast(new HttpResponseEncoder()); // 压缩 pipeline.addLast(new HttpContentCompressor()); pipeline.addLast(new HttpUploadServerHandler()); } }
public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObject> { private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName()); private HttpRequest request; private boolean readingChunks; private final StringBuilder responseContent = new StringBuilder(); private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); // 大小超过minsize放磁盘上 private HttpPostRequestDecoder decoder; static { DiskFileUpload.deleteOnExitTemporaryFile = true; //退出时是否删除临时文件 DiskFileUpload.baseDirectory = "D:" + File.separatorChar + "aa"; //文件存储路径 DiskAttribute.deleteOnExitTemporaryFile = true; //退出时是否删除临时文件 DiskAttribute.baseDirectory = "D:" + File.separatorChar + "aa"; //文件存储路径 } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (decoder != null) { decoder.cleanFiles(); } } @Override public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception { if (msg instanceof HttpRequest) { // HttpRequest传输头 HttpRequest request = this.request = (HttpRequest) msg; URI uri = new URI(request.uri()); if (!uri.getPath().startsWith("/form")) { // 返回上传菜单 writeMenu(ctx); return; } // 拼接反馈内容 responseContent.setLength(0); responseContent.append("WELCOME TO THE WILD WILD WEB SERVER "); responseContent.append("=================================== "); responseContent.append("VERSION: " + request.protocolVersion().text() + " "); responseContent.append("REQUEST_URI: " + request.uri() + " "); responseContent.append(" "); for (Entry<CharSequence, CharSequence> entry : request.headers()) { responseContent.append("HEADER: " + entry.getKey() + '=' + entry.getValue() + " "); } responseContent.append(" "); Set<Cookie> cookies = null; String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE); if (value == null) { cookies = Collections.emptySet(); } else { cookies = ServerCookieDecoder.decode(value); } for (Cookie cookie : cookies) { 
responseContent.append("COOKIE: " + cookie + " "); } responseContent.append(" "); QueryStringDecoder decoderQuery = new QueryStringDecoder(request.uri()); Map<String, List<String>> uriAttributes = decoderQuery.parameters(); for (Entry<String, List<String>> attr: uriAttributes.entrySet()) { for (String attrVal: attr.getValue()) { responseContent.append("URI: " + attr.getKey() + '=' + attrVal + " "); } } responseContent.append(" "); // GET方法, 就此return if (request.method().equals(HttpMethod.GET)) { responseContent.append(" END OF GET CONTENT "); return; } // POST方法 try { decoder = new HttpPostRequestDecoder(factory, request); } catch (ErrorDataDecoderException e1) { e1.printStackTrace(); responseContent.append(e1.getMessage()); writeResponse(ctx.channel()); ctx.channel().close(); return; } readingChunks = HttpHeaderUtil.isTransferEncodingChunked(request); responseContent.append("Is Chunked: " + readingChunks + " "); responseContent.append("IsMultipart: " + decoder.isMultipart() + " "); if (readingChunks) { responseContent.append("Chunks: "); } } if (decoder != null) { if (msg instanceof HttpContent) { //HttpContent具体传输的内容 // 读取到一个chunk HttpContent chunk = (HttpContent) msg; try { decoder.offer(chunk); } catch (ErrorDataDecoderException e1) { e1.printStackTrace(); responseContent.append(e1.getMessage()); writeResponse(ctx.channel()); ctx.channel().close(); return; } responseContent.append('o'); //每读一个chunk标记一个'o' readHttpDataChunkByChunk(); // 最后一块chunk if (chunk instanceof LastHttpContent) { writeResponse(ctx.channel()); readingChunks = false; reset(); } } } else { writeResponse(ctx.channel()); } } private void reset() { request = null; decoder.destroy(); //释放资源 decoder = null; } private void readHttpDataChunkByChunk() throws Exception { try { while (decoder.hasNext()) { InterfaceHttpData data = decoder.next(); if (data != null) { try { writeHttpData(data); } finally { data.release(); } } } } catch (EndOfDataDecoderException e1) { responseContent.append(" END OF 
CONTENT CHUNK BY CHUNK "); } } private void writeHttpData(InterfaceHttpData data) throws Exception { if (data.getHttpDataType() == HttpDataType.Attribute) { Attribute attribute = (Attribute) data; String value = null; try { value = attribute.getValue(); } catch (IOException e1) { e1.printStackTrace(); responseContent.append(" BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " Error while reading value: " + e1.getMessage() + " "); return; } if (value.length() > 100) { responseContent.append(" BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " data too long "); } else { responseContent.append(" BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute + " "); } } else { responseContent.append(" -----------start-------------" + " "); responseContent.append(" BODY FileUpload: " + data.getHttpDataType().name() + ": " + data + " "); responseContent.append(" ------------end------------" + " "); if (data.getHttpDataType() == HttpDataType.FileUpload) { FileUpload fileUpload = (FileUpload) data; if (fileUpload.isCompleted()) { System.out.println("file name : " + fileUpload.getFilename()); System.out.println("file length: " + fileUpload.length()); System.out.println("file maxSize : " + fileUpload.getMaxSize()); System.out.println("file path :" + fileUpload.getFile().getPath()); System.out.println("file absolutepath :" + fileUpload.getFile().getAbsolutePath()); System.out.println("parent path :" + fileUpload.getFile().getParentFile()); if (fileUpload.length() < 1024 * 1024 * 10) { responseContent.append(" Content of file "); try { responseContent.append(fileUpload.getString(fileUpload.getCharset())); } catch (Exception e1) { e1.printStackTrace(); } responseContent.append(" "); } else { responseContent.append(" File too long to be printed out:" + fileUpload.length() + " "); } fileUpload.renameTo(new File(fileUpload.getFile().getPath())); // 核心操作, 写文件 
decoder.removeHttpDataFromClean(fileUpload); } else { responseContent.append(" File to be continued but should not! "); } } } } private void writeResponse(Channel channel) { ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8); responseContent.setLength(0); // 是否是短连接 boolean close = request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE, true) || request.protocolVersion().equals(HttpVersion.HTTP_1_0) && !request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE, true); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8"); // 最后一次连接不需要Content-Length if (!close) { response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes()); } Set<Cookie> cookies = null; String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE); if (value == null) { cookies = Collections.emptySet(); } else { cookies = ServerCookieDecoder.decode(value); } if (!cookies.isEmpty()) { for (Cookie cookie : cookies) { response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.encode(cookie)); } } ChannelFuture future = channel.writeAndFlush(response); if (close) { future.addListener(ChannelFutureListener.CLOSE); } } //拼接上传页html菜单 private void writeMenu(ChannelHandlerContext ctx) { responseContent.setLength(0); // create Pseudo Menu responseContent.append("<html>"); responseContent.append("<head>"); responseContent.append("<title>Netty Test Form</title> "); responseContent.append("</head> "); responseContent.append("<body bgcolor=white><style>td{font-size: 12pt;}</style>"); responseContent.append("<table border="0">"); responseContent.append("<tr>"); responseContent.append("<td>"); responseContent.append("<h1>Netty Test Form</h1>"); responseContent.append("Choose one FORM"); responseContent.append("</td>"); responseContent.append("</tr>"); 
responseContent.append("</table> "); // GET responseContent.append("<CENTER>GET FORM<HR WIDTH="75%" NOSHADE color="blue"></CENTER>"); responseContent.append("<FORM ACTION="/formget" METHOD="GET">"); responseContent.append("<input type=hidden name=getform value="GET">"); responseContent.append("<table border="0">"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name="info" size=10></td></tr>"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name="secondinfo" size=20>"); responseContent.append("<tr><td>Fill with value: <br> <textarea name="thirdinfo" cols=40 rows=10></textarea>"); responseContent.append("</td></tr>"); responseContent.append("<tr><td><INPUT TYPE="submit" NAME="Send" VALUE="Send"></INPUT></td>"); responseContent.append("<td><INPUT TYPE="reset" NAME="Clear" VALUE="Clear" ></INPUT></td></tr>"); responseContent.append("</table></FORM> "); responseContent.append("<CENTER><HR WIDTH="75%" NOSHADE color="blue"></CENTER>"); // POST responseContent.append("<CENTER>POST FORM<HR WIDTH="75%" NOSHADE color="blue"></CENTER>"); responseContent.append("<FORM ACTION="/formpost" METHOD="POST">"); responseContent.append("<input type=hidden name=getform value="POST">"); responseContent.append("<table border="0">"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name="info" size=10></td></tr>"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name="secondinfo" size=20>"); responseContent.append("<tr><td>Fill with value: <br> <textarea name="thirdinfo" cols=40 rows=10></textarea>"); responseContent.append("<tr><td>Fill with file (only file name will be transmitted): <br> <input type=file name="myfile">"); responseContent.append("</td></tr>"); responseContent.append("<tr><td><INPUT TYPE="submit" NAME="Send" VALUE="Send"></INPUT></td>"); responseContent.append("<td><INPUT TYPE="reset" NAME="Clear" VALUE="Clear" ></INPUT></td></tr>"); responseContent.append("</table></FORM> "); 
responseContent.append("<CENTER><HR WIDTH="75%" NOSHADE color="blue"></CENTER>"); // POST with enctype="multipart/form-data" responseContent.append("<CENTER>POST MULTIPART FORM<HR WIDTH="75%" NOSHADE color="blue"></CENTER>"); responseContent.append("<FORM ACTION="/formpostmultipart" ENCTYPE="multipart/form-data" METHOD="POST">"); responseContent.append("<input type=hidden name=getform value="POST">"); responseContent.append("<table border="0">"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name="info" size=10></td></tr>"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name="secondinfo" size=20>"); responseContent.append("<tr><td>Fill with value: <br> <textarea name="thirdinfo" cols=40 rows=10></textarea>"); responseContent.append("<tr><td>Fill with file: <br> <input type=file name="myfile">"); responseContent.append("</td></tr>"); responseContent.append("<tr><td><INPUT TYPE="submit" NAME="Send" VALUE="Send"></INPUT></td>"); responseContent.append("<td><INPUT TYPE="reset" NAME="Clear" VALUE="Clear" ></INPUT></td></tr>"); responseContent.append("</table></FORM> "); responseContent.append("<CENTER><HR WIDTH="75%" NOSHADE color="blue"></CENTER>"); responseContent.append("</body>"); responseContent.append("</html>"); ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8"); response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes()); ctx.channel().writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.log(Level.WARNING, responseContent.toString(), cause); ctx.channel().close(); } }
3.4 WebSocket(较少使用)
public class WebSocketServer { public void run(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec", new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); pipeline.addLast("handler", new WebSocketServerHandler()); } }); Channel ch = b.bind(port).sync().channel(); System.out.println("Web socket server started at port " + port + '.'); System.out.println("Open your browser and navigate to http://localhost:" + port + '/'); ch.closeFuture().sync(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } public static void main(String[] args) throws Exception { new WebSocketServer().run(8765); } }
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName()); private WebSocketServerHandshaker handshaker; @Override public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { // 传统的HTTP接入 if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, (FullHttpRequest) msg); } // WebSocket接入 else if (msg instanceof WebSocketFrame) { handleWebSocketFrame(ctx, (WebSocketFrame) msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { // 如果HTTP解码失败,返回HTTP异常 if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); return; } // 构造握手响应返回,本机测试 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否是关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判断是否是Ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 本例程仅支持文本消息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); } // 返回应答消息 String request = ((TextWebSocketFrame) frame).text(); if (logger.isLoggable(Level.FINE)) { logger.fine(String.format("%s received %s", ctx.channel(), 
request)); } ctx.channel().write( new TextWebSocketFrame(request + " , 欢迎使用Netty WebSocket服务,现在时刻:" + new java.util.Date().toString())); } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); HttpHeaderUtil.setContentLength(res, res.content().readableBytes()); } // 如果是非Keep-Alive,关闭连接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!HttpHeaderUtil.isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端是网页
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Netty WebSocket 时间服务器</title>
</head>
<body>
<br>
<br>
<script type="text/javascript">
    var socket;
    // Fall back to the prefixed implementation where the standard one is missing.
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8765/websocket");
        // Show every server message in the text area.
        socket.onmessage = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "";
            ta.value = event.data
        };
        socket.onopen = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "打开WebSocket服务正常,浏览器支持WebSocket!";
        };
        socket.onclose = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "";
            ta.value = "WebSocket 关闭!";
        };
    } else {
        alert("抱歉,您的浏览器不支持WebSocket协议!");
    }

    // Sends the message if the connection is open; otherwise tells the user.
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("WebSocket连接没有建立成功!");
        }
    }
</script>
<form onsubmit="return false;">
    <input type="text" name="message" value="Netty最佳实践" />
    <br>
    <br>
    <input type="button" value="发送WebSocket请求消息" onclick="send(this.form.message.value)" />
    <hr color="blue" />
    <h3>服务端返回的应答消息</h3>
    <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
</form>
</body>
</html>