zoukankan      html  css  js  c++  java
  • Netty实例

    Netty是基于JDK NIO的网络框架

    简化了NIO编程, 不用程序自己维护selector, 将网络通信和数据处理的部分做了分离

    多用于做底层的数据通信, 心跳检测(keepalived)

    1. 数据通信

    1.1 Hello World

    复制代码
    public class Server {
    
        public static void main(String[] args) throws Exception {
            // 1 创建线两个事件循环组
            // 一个是用于处理服务器端接收客户端连接的
            // 一个是进行网络通信的(网络读写的)
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
    
            // 2 创建辅助工具类ServerBootstrap,用于服务器通道的一系列配置
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup) // 绑定俩个线程组
                    .channel(NioServerSocketChannel.class) // 指定NIO的模式.NioServerSocketChannel对应TCP, NioDatagramChannel对应UDP
                    .option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP缓冲区
                    .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 设置发送缓冲大小
                    .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 这是接收缓冲大小
                    .option(ChannelOption.SO_KEEPALIVE, true) // 保持连接
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {  //SocketChannel建立连接后的管道
                            // 3 在这里配置 通信数据的处理逻辑, 可以addLast多个...
                            sc.pipeline().addLast(new ServerHandler());
                        }
                    });
    
            // 4 绑定端口, bind返回future(异步), 加上sync阻塞在获取连接处
            ChannelFuture cf1 = b.bind(8765).sync();
            //ChannelFuture cf2 = b.bind(8764).sync();   //可以绑定多个端口
            // 5 等待关闭, 加上sync阻塞在关闭请求处
            cf1.channel().closeFuture().sync();
            //cf2.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
    复制代码

    SO_BACKLOG详解:
    服务器的TCP内核维护两个队列A和B
    客户端向服务端请求connect时, 发送SYN(第一次握手)
    服务端收到SYN后, 向客户端发送SYN ACK(第二次握手),  TCP内核将连接放入队列A
    客户端收到后向服务端发送ACK(第三次握手),  TCP内核将连接从A->B, accept返回, 连接完成
    A/B队列的长度和即为BACKLOG, 当accept速度跟不上, A/B队列使得BACKLOG满了, 客户端连接就会被TCP内核拒绝
    可以调大backlog缓解这一现象, 经验值~100

    复制代码
    public class ServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("server channel active... ");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = (ByteBuf) msg;
                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);
                String body = new String(req, "utf-8");
                System.out.println("Server :" + body );
                String response = "返回给客户端的响应:" + body ;
                ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
                // future完成后触发监听器, 此处是写完即关闭(短连接). 因此需要关闭连接时, 要通过server端关闭. 直接关闭用方法ctx[.channel()].close()
                //.addListener(ChannelFutureListener.CLOSE);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx)
                throws Exception {
            System.out.println("读完了");
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
                throws Exception {
            ctx.close();
        }
    }
    复制代码
    复制代码
    public class Client {
    
        public static void main(String[] args) throws Exception {
            
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception { 
                    sc.pipeline().addLast(new ClientHandler());
                }
            });
            
            ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
            //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();  //可以使用多个端口
            //发送消息, Buffer类型. write需要flush才发送, 可用writeFlush代替
            cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
            cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
            Thread.sleep(2000);
            cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
            //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes()));
            
            cf1.channel().closeFuture().sync();
            //cf2.channel().closeFuture().sync();
            group.shutdownGracefully();
        }
    }
    复制代码
    复制代码
    public class ClientHandler extends ChannelHandlerAdapter{
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                ByteBuf buf = (ByteBuf) msg;
                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);
                String body = new String(req, "utf-8");
                System.out.println("Client :" + body );
            } finally {
                // 记得释放xxxHandler里面的方法的msg参数: 写(write)数据, msg引用将被自动释放不用手动处理; 但只读数据时,!必须手动释放引用数
                 ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            ctx.close();
        }
    }
    复制代码

    1.2 拆包粘包问题

    TCP/IP确保了包的传送, 包的顺序等, 但编程中还需要解决拆包粘包问题

    -> 接收的一连串包中的数据, 处理的分隔在哪里?  基本解决方案:

    1)特殊字符作为结束分隔符

    2)消息定长. 固定包的长度, 长度不够用空格补全. 接收方需要trim, 效率不高不推荐

    3)自定义协议. 在消息头中包含消息总长度的字段. 需要安全性时可以考虑.

    特殊字符

    复制代码
    public class Server {
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             .option(ChannelOption.SO_SNDBUF, 32*1024)
             .option(ChannelOption.SO_RCVBUF, 32*1024)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    // 使用DelimiterBasedFrameDecoder设置结尾分隔符$_
                    ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                    sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                    // 设置字符串形式的解码.  经过StringDecoder, Handler回调方法中接收的msg的具体类型就是String了(不再是ByteBuffer). 但写时仍需要传入ByteBuffer
                    sc.pipeline().addLast(new StringDecoder());
                    // 通信数据的处理逻辑
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
            //4 绑定连接
            ChannelFuture cf = b.bind(8765).sync();
            
            //等待服务器监听端口关闭
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
    复制代码
    复制代码
    public class ServerHandler extends ChannelHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(" server channel active... ");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("Server :" + msg);
            String response = "服务器响应: " + msg + "$_";
            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
            ctx.close();
        }
    }
    复制代码
    复制代码
    public class Client {
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                    sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                    sc.pipeline().addLast(new StringDecoder()); 
                    sc.pipeline().addLast(new ClientHandler());
                }
            });
            
            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
            
            cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes()));
            cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes()));
            
            cf.channel().closeFuture().sync();
            group.shutdownGracefully();
            
        }
    }
    复制代码
    复制代码
    public class ClientHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channel active... ");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                String response = (String) msg;
                System.out.println("Client: " + response);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    复制代码

    定长

    复制代码
    public class Server {
    
        public static void main(String[] args) throws Exception{
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             .option(ChannelOption.SO_SNDBUF, 32*1024)
             .option(ChannelOption.SO_RCVBUF, 32*1024)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    //设置定长字符串接收, 定长为5, 积累到5个字节才会把数据发出去
                    sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
                    //设置字符串形式的解码
                    sc.pipeline().addLast(new StringDecoder());
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
            
            ChannelFuture cf = b.bind(8765).sync();
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
    复制代码
    复制代码
    public class ServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(" server channel active... ");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String request = (String)msg;
            System.out.println("Server :" + msg);
            String response =  request ;
            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
        }
    }
    复制代码
    复制代码
    public class Client {
    
        public static void main(String[] args) throws Exception {
            
            EventLoopGroup group = new NioEventLoopGroup();
            
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); 
                    sc.pipeline().addLast(new StringDecoder());
                    sc.pipeline().addLast(new ClientHandler());
                }
            });
            
            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
            
            cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaa".getBytes()));
            cf.channel().writeAndFlush(Unpooled.copiedBuffer("bbccccc".getBytes()));
            
            cf.channel().closeFuture().sync();
            group.shutdownGracefully();
        }
    }
    复制代码
    复制代码
    public class ClientHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channel active... ");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String response = (String) msg;
            System.out.println("Client: " + response);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        }
    }
    复制代码

    1.3 编解码

    即对象序列化技术, 目的是为了实现对象的网络传输和本地持久化
    如果使用java的序列化, 码流较大. 因此多用Marshalling, Kyro(基于Protobuf)

    下面的例子, 使用编解码传输javabean(Marshalling的javabean需要实现serializable), 并将message进行gzip压缩

    自定义编解码器

    复制代码
    public final class MarshallingCodeCFactory {
        // 解码
        public static MarshallingDecoder buildMarshallingDecoder() {
            //创建工厂对象, 参数serial指创建的是java对象序列化的工厂对象
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            //创建配置对象,版本号为5 
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            //根据工厂对象和配置对象创建解码provider
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
            //创建解码器对象. 第一个参数是provider, 第二个参数是单个消息序列化后的最大长度, 超过后拒绝处理
            MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
            return decoder;
        }
    
        // 编码
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
            //创建编码器对象. 用于将实现Serializable接口的JavaBean序列化为二进制数组
            MarshallingEncoder encoder = new MarshallingEncoder(provider);
            return encoder;
        }
    }
    复制代码

    javabean

    复制代码
    public class Request implements Serializable {  // 标记Serializable接口
    
        private String id ;
        private String name ;
        private String requestMessage ;
        private byte[] attachment;
        
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getRequestMessage() {
            return requestMessage;
        }
        public void setRequestMessage(String requestMessage) {
            this.requestMessage = requestMessage;
        }
        public byte[] getAttachment() {
            return attachment;
        }
        public void setAttachment(byte[] attachment) {
            this.attachment = attachment;
        }
    }
    复制代码
    复制代码
    public class Response implements Serializable { // 标记Serializable接口
        
        private String id;
        private String name;
        private String responseMessage;
        
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getResponseMessage() {
            return responseMessage;
        }
        public void setResponseMessage(String responseMessage) {
            this.responseMessage = responseMessage;
        }
    }
    复制代码

    GZip压缩的Util

    复制代码
    public class GzipUtils {
    
        public static byte[] gzip(byte[] data) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            GZIPOutputStream gzip = new GZIPOutputStream(bos);
            gzip.write(data);
            gzip.finish();
            gzip.close();
            byte[] ret = bos.toByteArray();
            bos.close();
            return ret;
        }
        
        public static byte[] ungzip(byte[] data) throws Exception{
            ByteArrayInputStream bis = new ByteArrayInputStream(data);
            GZIPInputStream gzip = new GZIPInputStream(bis);
            byte[] buf = new byte[1024];
            int num = -1;
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            while((num = gzip.read(buf)) != -1 ){
                bos.write(buf, 0, num);
            }
            gzip.close();
            bis.close();
            byte[] ret = bos.toByteArray();
            bos.close();
            return ret;
        }
    }
    复制代码

    服务端与客户端

    复制代码
    public class Server {
        public static void main(String[] args) throws Exception{
            
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             //设置日志
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel sc) throws Exception {
                    // 添加编解码. 发送自定义的类型, 而Handler的方法接收的msg参数的实际类型也是相应的自定义类了
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
            
            ChannelFuture cf = b.bind(8765).sync();
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    复制代码
    复制代码
    public class ServerHandler extends ChannelHandlerAdapter{
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Request req = (Request)msg;
            System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
            byte[] attachment = GzipUtils.ungzip(req.getAttachment());
            
            String path = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "001.jpg";
            FileOutputStream fos = new FileOutputStream(path);
            fos.write(attachment);
            fos.close();
            
            Response resp = new Response();
            resp.setId(req.getId());
            resp.setName("resp" + req.getId());
            resp.setResponseMessage("响应内容" + req.getId());
            ctx.writeAndFlush(resp);//.addListener(ChannelFutureListener.CLOSE);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    复制代码
    复制代码
    public class Client {
        public static void main(String[] args) throws Exception{
            
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ClientHandler());
                }
            });
            
            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
            
            for(int i = 0; i < 5; i++){
                Request req = new Request();
                req.setId("" + i);
                req.setName("req" + i);
                req.setRequestMessage("数据信息" + i);    
                String path = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "001.jpg";
                File file = new File(path);
                FileInputStream in = new FileInputStream(file);  
                byte[] data = new byte[in.available()];  
                in.read(data);  
                in.close(); 
                req.setAttachment(GzipUtils.gzip(data)); //压缩
                cf.channel().writeAndFlush(req);
            }
    
            cf.channel().closeFuture().sync();
            group.shutdownGracefully();
        }
    }
    复制代码
    复制代码
    public class ClientHandler extends ChannelHandlerAdapter{
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                Response resp = (Response) msg;
                System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());            
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    复制代码

    1.4 长连接/短连接

    1.长连接, 一致保持着连接不主动中断, 实时性强
    2.短连接. 数据放在缓存, 一次性批量提交所有数据, 服务端接收后即关闭连接
    以上两种根据是否给ChannelHandlerContext添加ChannelFutureListener.ClOSE监听器实现

    3.长连接, 一定时间不活跃则关闭连接. 给SocketChannel添加ReadTimeoutHandler实现. 实例如下:

    复制代码
    public final class MarshallingCodeCFactory {
        public static MarshallingDecoder buildMarshallingDecoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
            MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
            return decoder;
        }
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
            MarshallingEncoder encoder = new MarshallingEncoder(provider);
            return encoder;
        }
    }
    复制代码
    复制代码
    public class Request implements Serializable{
        private String id ;
        private String name ;
        private String requestMessage ;
        
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getRequestMessage() {
            return requestMessage;
        }
        public void setRequestMessage(String requestMessage) {
            this.requestMessage = requestMessage;
        }
    }
    复制代码
    复制代码
    public class Response implements Serializable{
        private String id;
        private String name;
        private String responseMessage;
        
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getResponseMessage() {
            return responseMessage;
        }
        public void setResponseMessage(String responseMessage) {
            this.responseMessage = responseMessage;
        }
    }
    复制代码
    复制代码
    public class Server {
    
        public static void main(String[] args) throws Exception{
            
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             //设置日志
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ReadTimeoutHandler(5));  // 时限, 读客户端超时没数据则断开
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
            
            ChannelFuture cf = b.bind(8765).sync();
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
    复制代码
    复制代码
    public class ServerHandler extends ChannelHandlerAdapter{
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Request request = (Request) msg;
            System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
            Response response = new Response();
            response.setId(request.getId());
            response.setName("response" + request.getId());
            response.setResponseMessage("响应内容" + request.getId());
            ctx.writeAndFlush(response);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    复制代码
    复制代码
    public class Client {
        private EventLoopGroup group;
        private Bootstrap b;
        private ChannelFuture cf ;
    
        // 单例
        private static class SingletonHolder { 
            static final Client instance = new Client();
        }
        public static Client getInstance(){
            return SingletonHolder.instance;
        }
        
        private Client(){
                group = new NioEventLoopGroup();
                b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭通道)
                            sc.pipeline().addLast(new ReadTimeoutHandler(5));   // 时限5s, 读服务端超时没数据则断开
                            sc.pipeline().addLast(new ClientHandler());
                        }
                });
        }
        
        public void connect(){
            try {
                this.cf = b.connect("127.0.0.1", 8765).sync();
                System.out.println("远程服务器已经连接, 可以进行数据交换");                
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        public ChannelFuture getChannelFuture(){
            if(this.cf == null) {   //初次连接
                this.connect();
            }
            if(!this.cf.channel().isActive()){  //重连
                this.connect();
            }
            return this.cf;
        }
        
        public static void main(String[] args) throws Exception{
            final Client c = Client.getInstance();
            
            ChannelFuture cf = c.getChannelFuture();
            for(int i = 1; i <= 3; i++ ){
                Request request = new Request();
                request.setId("" + i);
                request.setName("request" + i);
                request.setRequestMessage("数据信息" + i);
                cf.channel().writeAndFlush(request);
                TimeUnit.SECONDS.sleep(4);  //间隔4s发送一次数据
            }
    
            cf.channel().closeFuture().sync(); //阻塞至超时关闭
            
            // 这里用子线程重连并发送数据一次
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("进入子线程重连一次");
                        ChannelFuture cf = c.getChannelFuture();
                        assert true == cf.channel().isActive(); //断言
                        //再次发送数据
                        Request request = new Request();
                        request.setId("" + 4);
                        request.setName("request" + 4);
                        request.setRequestMessage("数据信息" + 4);
                        cf.channel().writeAndFlush(request);                    
                        cf.channel().closeFuture().sync();
                        System.out.println("子线程完成");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            
            System.out.println("断开连接,主线程结束..");
        }
        
    }
    复制代码
    复制代码
    public class ClientHandler extends ChannelHandlerAdapter{
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                Response resp = (Response) msg;
                System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());            
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    复制代码

    1.5 使用UDP (较少使用)

    复制代码
    public class Server {
        public void run(int port) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioDatagramChannel.class)  // UDP: NioDatagramChannel
                    .option(ChannelOption.SO_BROADCAST, true) // 广播
                    .handler(new ServerHandler());
                b.bind(port).sync().channel().closeFuture().await();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            new Server().run(8765);
        }
    }
    复制代码
    复制代码
    public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
    
        // 谚语列表
        private static final String[] DICTIONARY = { 
            "只要功夫深,铁棒磨成针。",
            "旧时王谢堂前燕,飞入寻常百姓家。", 
            "洛阳亲友如相问,一片冰心在玉壶。",
            "一寸光阴一寸金,寸金难买寸光阴。",
            "老骥伏枥,志在千里。烈士暮年,壮心不已!"
        };
    
        private String nextQuote() {
            int quoteId = ThreadLocalRandom.current().nextInt(DICTIONARY.length);
            return DICTIONARY[quoteId];
        }
    
        @Override
        public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
            String req = packet.content().toString(CharsetUtil.UTF_8);
            System.out.println(req);
            if ("谚语字典查询?".equals(req)) {
                ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("谚语查询结果: " + nextQuote(), CharsetUtil.UTF_8), packet.sender()));
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
            ctx.close();
            cause.printStackTrace();
        }
    }
    复制代码
    复制代码
    public class Client {
    
        public void run(int port) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new ClientHandler());
                Channel ch = b.bind(0).sync().channel();
                // 向网段内的所有机器广播UDP消息
                ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("谚语字典查询?", CharsetUtil.UTF_8), new InetSocketAddress("255.255.255.255", port))).sync();
                if (!ch.closeFuture().await(15000)) {
                    System.out.println("查询超时!");
                }
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            new Client().run(8765);
        }
    }
    复制代码
    复制代码
    public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
    
        @Override
        public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
            String response = msg.content().toString(CharsetUtil.UTF_8);
            if (response.startsWith("谚语查询结果: ")) {
                System.out.println(response);
                ctx.close();
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    复制代码

    2. 心跳检测

    集群中主服务器需要知道从服务器的状态
    因此client每隔5~10秒给server发送心跳包

    可通过netty与定时任务来实现

    复制代码
    public final class MarshallingCodeCFactory {
        public static MarshallingDecoder buildMarshallingDecoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
            MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
            return decoder;
        }
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
            MarshallingEncoder encoder = new MarshallingEncoder(provider);
            return encoder;
        }
    }
    复制代码
    复制代码
    public class RequestInfo implements Serializable {
        private String ip ;
        private HashMap<String, Object> cpuPercMap ;
        private HashMap<String, Object> memoryMap;
        
        public String getIp() {
            return ip;
        }
        public void setIp(String ip) {
            this.ip = ip;
        }
        public HashMap<String, Object> getCpuPercMap() {
            return cpuPercMap;
        }
        public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
            this.cpuPercMap = cpuPercMap;
        }
        public HashMap<String, Object> getMemoryMap() {
            return memoryMap;
        }
        public void setMemoryMap(HashMap<String, Object> memoryMap) {
            this.memoryMap = memoryMap;
        }
    }
    复制代码
    复制代码
    public class Server {
    
        public static void main(String[] args) throws Exception{
            
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             //设置日志
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ServerHeartBeatHandler());
                }
            });
            
            ChannelFuture cf = b.bind(8765).sync();
            cf.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
    复制代码
    复制代码
    public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
        
        private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
        private static final String SUCCESS_KEY = "auth_success_key";
        
        static {
            AUTH_IP_MAP.put("127.0.0.1", "1234");
        }
        
        private boolean auth(ChannelHandlerContext ctx, Object msg){
            String [] ret = ((String) msg).split(",");
            String auth = AUTH_IP_MAP.get(ret[0]);
            if(auth != null && auth.equals(ret[1])){
                // 认证成功, 返回确认信息
                ctx.writeAndFlush(SUCCESS_KEY);
                return true;
            } else {
                ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
                return false;
            }
        }
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if(msg instanceof String){
                auth(ctx, msg);
            } else if (msg instanceof RequestInfo) {
                RequestInfo info = (RequestInfo) msg;
                System.out.println("--------------------------------------------");
                System.out.println("当前主机ip为: " + info.getIp());
                System.out.println("当前主机cpu情况: ");
                HashMap<String, Object> cpu = info.getCpuPercMap();
                System.out.println("总使用率: " + cpu.get("combined"));
                System.out.println("用户使用率: " + cpu.get("user"));
                System.out.println("系统使用率: " + cpu.get("sys"));
                System.out.println("等待率: " + cpu.get("wait"));
                System.out.println("空闲率: " + cpu.get("idle"));
                
                System.out.println("当前主机memory情况: ");
                HashMap<String, Object> memory = info.getMemoryMap();
                System.out.println("内存总量: " + memory.get("total"));
                System.out.println("当前内存使用量: " + memory.get("used"));
                System.out.println("当前内存剩余量: " + memory.get("free"));
                System.out.println("--------------------------------------------");
                
                ctx.writeAndFlush("info received!");
            } else {
                ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
            }
        }
    }
    复制代码
    复制代码
    public class Client {
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ClienHeartBeatHandler());
                }
            });
            
            ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
            cf.channel().closeFuture().sync();
            group.shutdownGracefully();
        }
    }
    复制代码
    复制代码
    public class ClienHeartBeatHandler extends ChannelHandlerAdapter {
    
        private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        private ScheduledFuture<?> heartBeat;  //定时任务
        //主动向服务器发送认证信息
        private InetAddress addr ;
        private static final String SUCCESS_KEY = "auth_success_key";
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            addr = InetAddress.getLocalHost();
            //String ip = addr.getHostAddress();
            String ip = "127.0.0.1";
            String key = "1234";
            //证书
            String auth = ip + "," + key;
            // 发送认证
            ctx.writeAndFlush(auth);
        }
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                if(msg instanceof String){
                    String ret = (String) msg;
                    if(SUCCESS_KEY.equals(ret)){
                        // 收到认证 确认信息,设置每隔5秒发送心跳消息
                        this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 5, TimeUnit.SECONDS);
                        System.out.println(msg);                
                    } else {  
                        // 收到心跳包 确认信息
                        System.out.println(msg);
                    }
                }
            } finally {
                // 只读, 需要手动释放引用计数
                ReferenceCountUtil.release(msg);
            }
        }
    
        private class HeartBeatTask implements Runnable {
            private final ChannelHandlerContext ctx;
            public HeartBeatTask(final ChannelHandlerContext ctx) {
                this.ctx = ctx;
            }
            @Override
            public void run() {
                try {
                    RequestInfo info = new RequestInfo();
                    //ip
                    info.setIp(addr.getHostAddress());
                    Sigar sigar = new Sigar();
                    //cpu prec
                    CpuPerc cpuPerc = sigar.getCpuPerc();
                    HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
                    cpuPercMap.put("combined", cpuPerc.getCombined());
                    cpuPercMap.put("user", cpuPerc.getUser());
                    cpuPercMap.put("sys", cpuPerc.getSys());
                    cpuPercMap.put("wait", cpuPerc.getWait());
                    cpuPercMap.put("idle", cpuPerc.getIdle());
                    // memory
                    Mem mem = sigar.getMem();
                    HashMap<String, Object> memoryMap = new HashMap<String, Object>();
                    memoryMap.put("total", mem.getTotal() / 1024L);
                    memoryMap.put("used", mem.getUsed() / 1024L);
                    memoryMap.put("free", mem.getFree() / 1024L);
                    info.setCpuPercMap(cpuPercMap);
                    info.setMemoryMap(memoryMap);
                    
                    ctx.writeAndFlush(info);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                cause.printStackTrace();
                // 取消定时发送心跳包的任务
                if (heartBeat != null) {
                    heartBeat.cancel(true);
                    heartBeat = null;
                }
                ctx.fireExceptionCaught(cause);
            }
        }
    }
    复制代码

    3. HTTP

    3.1 Hello World

    复制代码
    public final class HttpHelloWorldServer {
      
          static final boolean SSL = System.getProperty("ssl") != null;
          static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
      
          public static void main(String[] args) throws Exception {
              final SslContext sslCtx;
              if (SSL) {
                  SelfSignedCertificate ssc = new SelfSignedCertificate();
                  sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
              } else {
                  sslCtx = null;
              }
      
              EventLoopGroup bossGroup = new NioEventLoopGroup();
              EventLoopGroup workerGroup = new NioEventLoopGroup();
              try {
                  ServerBootstrap b = new ServerBootstrap();
                  b.option(ChannelOption.SO_BACKLOG, 1024);
                  b.group(bossGroup, workerGroup)
                   .channel(NioServerSocketChannel.class)
                   .handler(new LoggingHandler(LogLevel.INFO))
                   .childHandler(new HttpHelloWorldServerInitializer(sslCtx));
      
                  Channel ch = b.bind(PORT).sync().channel();
      
                  System.err.println("Open your web browser and navigate to " +
                          (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
      
                  ch.closeFuture().sync();
              } finally {
                  bossGroup.shutdownGracefully();
                  workerGroup.shutdownGracefully();
              }
          }
    }
    复制代码
    复制代码
    public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {
    
        private final SslContext sslCtx;
    
        public HttpHelloWorldServerInitializer(SslContext sslCtx) {
            this.sslCtx = sslCtx;
        }
    
        @Override
        public void initChannel(SocketChannel ch) {
            ChannelPipeline p = ch.pipeline();
            if (sslCtx != null) {
                p.addLast(sslCtx.newHandler(ch.alloc()));
            }
            p.addLast(new HttpServerCodec());   // !使用http通信, HttpRequest和HttpResponse
            p.addLast(new HttpHelloWorldServerHandler());
        }
    }
    复制代码
    复制代码
    public class HttpHelloWorldServerHandler extends ChannelHandlerAdapter {
        private static final byte[] CONTENT = "HELLO WORLD".getBytes();
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            if (msg instanceof HttpRequest) {
                HttpRequest req = (HttpRequest) msg;
    
                if (HttpHeaderUtil.is100ContinueExpected(req)) {
                    ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
                }
                boolean keepAlive = HttpHeaderUtil.isKeepAlive(req);
                // 构造响应
                FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT));
                response.headers().set(CONTENT_TYPE, "text/plain;charset=UTF-8");
                response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
    
                if (!keepAlive) {
                    // Request短连接, 写完后直接关闭
                    ctx.write(response).addListener(ChannelFutureListener.CLOSE);
                } else {
                    // 长连接, response也设置为KEEP_ALIVE
                    response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
                    ctx.write(response);
                }
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
    复制代码

    3.2 HTTP下载文件

    复制代码
    public class HttpDownloadServer {
    
        private static final String DEFAULT_URL = "/sources/";
    
        public void run(final int port, final String url) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // addLast的第一项为key, 自定义的
                        // request解码器
                        ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
                        // response的编码器
                        ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
                        // chunked, 传输文件时分多个response分解地传输文件
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        // ObjectAggregator, 将多个response合并为一个FullHttpResponse
                        ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
                        // 自定义业务逻辑handler
                        ch.pipeline().addLast("fileServerHandler", new HttpDownoadServerHandler(url));
                    }
                    });
                ChannelFuture future = b.bind("127.0.0.1", port).sync();
                System.out.println("HTTP文件目录服务器启动,网址是 : " + "http://localhost:"  + port + url);
                future.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8765;
            new HttpDownloadServer().run(port, DEFAULT_URL);
        }
    }
    复制代码
    复制代码
    // 注意这里继承了SimpleChannelInboundHandler<T>, 含泛型, 即指定了传入参数msg的类型
    public class HttpDownoadServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
        
        private final String url;
    
        public HttpDownoadServerHandler(String url) {
            this.url = url;
        }
    
        @Override
        public void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
            //是否能理解(解码)请求
            if (!request.decoderResult().isSuccess()) {
                // 400
                sendError(ctx, BAD_REQUEST);
                return;
            }
            //对请求的方法进行判断:如果不是GET方法则返回异常
            if (request.method() != GET) {
                // 405
                sendError(ctx, METHOD_NOT_ALLOWED);
                return;
            }
            //获取请求uri路径
            final String uri = request.uri();
            //对url进行分析,返回本地路径
            final String path = parseURI(uri);
            //如果 路径构造不合法,则path为null
            if (path == null) {
                //403
                sendError(ctx, FORBIDDEN);
                return;
            }
            
            // 创建file对象
            File file = new File(path);
            // 文件隐藏或不存在
            if (file.isHidden() || !file.exists()) {
                // 404 
                sendError(ctx, NOT_FOUND);
                return;
            }
            // 是文件夹
            if (file.isDirectory()) {
                if (uri.endsWith("/")) {
                    //如果以正常"/"结束 说明是访问的一个文件目录:则进行展示文件列表
                    sendListing(ctx, file);
                } else {
                    //如果非"/"结束 则重定向,让客户端补全"/"并再次请求
                    sendRedirect(ctx, uri + '/');
                }
                return;
            }
            // 如果所创建的file对象不是文件类型
            if (!file.isFile()) {
                // 403
                sendError(ctx, FORBIDDEN);
                return;
            }
            
            //随机文件读写对象
            RandomAccessFile randomAccessFile = null;
            try {
                randomAccessFile = new RandomAccessFile(file, "r");// 以只读的方式打开文件
            } catch (FileNotFoundException fnfe) {
                // 404
                sendError(ctx, NOT_FOUND);
                return;
            }
            
            //获取文件长度
            long fileLength = randomAccessFile.length();
            //建立响应对象
            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
            //设置响应信息
            HttpHeaderUtil.setContentLength(response, fileLength);
            //设置Content-Type
            setContentTypeHeader(response, file);
            //设置为KeepAlive
            if (HttpHeaderUtil.isKeepAlive(request)) {
                response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            }
            //输出response header, HttpObjectAggregator能将其与下面输出整合合并
            ctx.write(response);
            
            //写出ChunkedFile. 创建ChunkedFile需要使用RandomAccessFile并设置分段. 这里每次传输8192个字节
            ChannelFuture sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0, fileLength, 8192), ctx.newProgressivePromise());
            //添加传输监听
            sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
                @Override
                public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
                    if (total < 0) { 
                        System.err.println("Transfer progress: " + progress);
                    } else {
                        System.err.println("Transfer progress: " + progress + " / " + total);
                    }
                }
                @Override
                public void operationComplete(ChannelProgressiveFuture future) throws Exception {
                    System.out.println("Transfer complete.");
                }
            });
            
            //使用Chunked, 完成时需要发送标记结束的空消息体!
            ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            //如果当前连接请求非Keep-Alive, 最后一包消息发送完后, 服务器主动关闭连接
            if (!HttpHeaderUtil.isKeepAlive(request)) {
                lastContentFuture.addListener(ChannelFutureListener.CLOSE);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (ctx.channel().isActive()) {
               // 500
                sendError(ctx, INTERNAL_SERVER_ERROR);
                ctx.close();
            }
        }
    
        //判断非法URI的正则
        private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&"].*");
        private String parseURI(String uri) {
            try {
                //使用UTF-8字符集
                uri = URLDecoder.decode(uri, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                try {
                    //尝试ISO-8859-1
                    uri = URLDecoder.decode(uri, "ISO-8859-1");
                } catch (UnsupportedEncodingException e1) {
                    //抛出预想外异常信息
                    throw new Error();
                }
            }
            // 对uri进行细粒度判断:4步验证操作
            // step 1 基础验证
            if (!uri.startsWith(url)) {
                return null;
            }
            // step 2 基础验证
            if (!uri.startsWith("/")) {
                return null;
            }
            // step 3 将文件分隔符替换为本地操作系统的文件路径分隔符
            uri = uri.replace('/', File.separatorChar);
            // step 4 验证路径合法性
            if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || 
                    uri.startsWith(".") || uri.endsWith(".") || 
                    INSECURE_URI.matcher(uri).matches()) {
                return null;
            }
            //利用当前工程所在目录 + URI相对路径 构造绝对路径 
            return System.getProperty("user.dir") + File.separator + uri;
        }
        
        //用正则表达式过滤文件名
        private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\.]*");
        //文件列表, 拼html文件
        private static void sendListing(ChannelHandlerContext ctx, File dir) {
            // 设置响应对象
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
            // 响应头
            response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
            // 构造文本内容
            StringBuilder ret = new StringBuilder();
            String dirPath = dir.getPath();
            ret.append("<!DOCTYPE html>
    ");
            ret.append("<html><head><title>");
            ret.append(dirPath);
            ret.append(" 目录:");
            ret.append("</title></head><body>
    ");
            ret.append("<h3>");
            ret.append(dirPath).append(" 目录:");
            ret.append("</h3>
    ");
            ret.append("<ul>");
            ret.append("<li>链接:<a href="../">..</a></li>
    ");
            
            // 遍历文件, 生成超链接
            for (File f : dir.listFiles()) {
                //step 1: 跳过隐藏文件和不可读文件 
                if (f.isHidden() || !f.canRead()) {
                    continue;
                }
                String name = f.getName();
                //step 2: 跳过正则过滤的文件名
                if (!ALLOWED_FILE_NAME.matcher(name).matches()) {
                    continue;
                }
                ret.append("<li>链接:<a href="");
                ret.append(name);
                ret.append("">");
                ret.append(name);
                ret.append("</a></li>
    ");
            }
            ret.append("</ul></body></html>
    ");
            //构造ByteBuf,写入缓冲区
            ByteBuf buffer = Unpooled.copiedBuffer(ret, CharsetUtil.UTF_8);
            //进行写出操作
            response.content().writeBytes(buffer);
            //重置ByteBuf
            buffer.release();
            //发送完成并主动关闭连接
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    
        //重定向操作
        private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
            response.headers().set(LOCATION, newUri);
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    
        //错误信息
        private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString()+ "
    ", CharsetUtil.UTF_8));
            response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    
        private static void setContentTypeHeader(HttpResponse response, File file) {
            //使用mime对象获取文件对应的Content-Type
            MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
            response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
        }
    }
    复制代码

    3.3 HTTP上传文件 (较少使用)

    实际应用中文件上传服务端有成熟的框架fastDFS(小文件)和HDFS(大文件)

    如要实现断点续传, 需要记录上传进度. 参考HTTP头的Range和Content-Range

    复制代码
    public final class HttpUploadServer {
    
        static final boolean SSL = System.getProperty("ssl") != null;
        static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
    
        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                SelfSignedCertificate ssc = new SelfSignedCertificate();
                sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
            } else {
                sslCtx = null;
            }
    
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup);
                b.channel(NioServerSocketChannel.class);
                b.handler(new LoggingHandler(LogLevel.INFO));
                b.childHandler(new HttpUploadServerInitializer(sslCtx));
    
                Channel ch = b.bind(PORT).sync().channel();
                System.err.println("Open your web browser and navigate to " + (SSL ? "https" : "http") + "://127.0.0.1:" + PORT + '/');
    
                ch.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    复制代码
    复制代码
    public class HttpUploadServerInitializer extends ChannelInitializer<SocketChannel> {
    
        private final SslContext sslCtx;
    
        public HttpUploadServerInitializer(SslContext sslCtx) {
            this.sslCtx = sslCtx;
        }
    
        @Override
        public void initChannel(SocketChannel ch) {
            ChannelPipeline pipeline = ch.pipeline();
            if (sslCtx != null) {
                pipeline.addLast(sslCtx.newHandler(ch.alloc()));
            }
            pipeline.addLast(new HttpRequestDecoder());
            pipeline.addLast(new HttpResponseEncoder());
            // 压缩
            pipeline.addLast(new HttpContentCompressor());
            pipeline.addLast(new HttpUploadServerHandler());
        }
    }
    复制代码
    复制代码
    public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    
        private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName());
    
        private HttpRequest request;
    
        private boolean readingChunks;
    
        private final StringBuilder responseContent = new StringBuilder();
    
        private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); // 大小超过minsize放磁盘上
    
        private HttpPostRequestDecoder decoder;
        
        static {
            DiskFileUpload.deleteOnExitTemporaryFile = true; //退出时是否删除临时文件
            DiskFileUpload.baseDirectory = "D:" + File.separatorChar + "aa";  //文件存储路径
            
            DiskAttribute.deleteOnExitTemporaryFile = true; //退出时是否删除临时文件
            DiskAttribute.baseDirectory = "D:" + File.separatorChar + "aa"; //文件存储路径
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            if (decoder != null) {
                decoder.cleanFiles();
            }
        }
    
        @Override
        public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
            if (msg instanceof HttpRequest) {           // HttpRequest传输头
                HttpRequest request = this.request = (HttpRequest) msg;
                URI uri = new URI(request.uri());
                if (!uri.getPath().startsWith("/form")) {
                    // 返回上传菜单
                    writeMenu(ctx);
                    return;
                }
                // 拼接反馈内容
                responseContent.setLength(0);
                responseContent.append("WELCOME TO THE WILD WILD WEB SERVER
    ");
                responseContent.append("===================================
    ");
                responseContent.append("VERSION: " + request.protocolVersion().text() + "
    ");
                responseContent.append("REQUEST_URI: " + request.uri() + "
    
    ");
                responseContent.append("
    
    ");
    
                for (Entry<CharSequence, CharSequence> entry : request.headers()) {
                    responseContent.append("HEADER: " + entry.getKey() + '=' + entry.getValue() + "
    ");
                }
                responseContent.append("
    
    ");
    
                Set<Cookie> cookies = null;
                String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE);
                if (value == null) {
                    cookies = Collections.emptySet();
                } else {
                    cookies = ServerCookieDecoder.decode(value);
                }
                for (Cookie cookie : cookies) {
                    responseContent.append("COOKIE: " + cookie + "
    ");
                }
                responseContent.append("
    
    ");
    
                QueryStringDecoder decoderQuery = new QueryStringDecoder(request.uri());
                Map<String, List<String>> uriAttributes = decoderQuery.parameters();
                for (Entry<String, List<String>> attr: uriAttributes.entrySet()) {
                    for (String attrVal: attr.getValue()) {
                        responseContent.append("URI: " + attr.getKey() + '=' + attrVal + "
    ");
                    }
                }
                responseContent.append("
    
    ");
    
                // GET方法, 就此return
                if (request.method().equals(HttpMethod.GET)) {
                    responseContent.append("
    
    END OF GET CONTENT
    ");
                    return;
                }
                
                // POST方法
                try {
                    decoder = new HttpPostRequestDecoder(factory, request);
                } catch (ErrorDataDecoderException e1) {
                    e1.printStackTrace();
                    responseContent.append(e1.getMessage());
                    writeResponse(ctx.channel());
                    ctx.channel().close();
                    return;
                }
    
                readingChunks = HttpHeaderUtil.isTransferEncodingChunked(request);
                responseContent.append("Is Chunked: " + readingChunks + "
    ");
                responseContent.append("IsMultipart: " + decoder.isMultipart() + "
    ");
                if (readingChunks) {
                    responseContent.append("Chunks: ");
                }
            }
    
            if (decoder != null) {
                if (msg instanceof HttpContent) {    //HttpContent具体传输的内容
                    // 读取到一个chunk
                    HttpContent chunk = (HttpContent) msg;
                    try {
                        decoder.offer(chunk);
                    } catch (ErrorDataDecoderException e1) {
                        e1.printStackTrace();
                        responseContent.append(e1.getMessage());
                        writeResponse(ctx.channel());
                        ctx.channel().close();
                        return;
                    }
                    responseContent.append('o'); //每读一个chunk标记一个'o'
                    readHttpDataChunkByChunk();
                    // 最后一块chunk
                    if (chunk instanceof LastHttpContent) {
                        writeResponse(ctx.channel());
                        readingChunks = false;
                        reset();
                    }
                }
            } else {
                writeResponse(ctx.channel());
            }
        }
    
        private void reset() {
            request = null;
            decoder.destroy(); //释放资源
            decoder = null;
        }
    
        private void readHttpDataChunkByChunk() throws Exception {
            try {
                while (decoder.hasNext()) {
                    InterfaceHttpData data = decoder.next();
                    if (data != null) {
                        try {
                            writeHttpData(data);
                        } finally {
                            data.release();
                        }
                    }
                }
            } catch (EndOfDataDecoderException e1) {
                responseContent.append("
    
    END OF CONTENT CHUNK BY CHUNK
    
    ");
            }
        }
    
        private void writeHttpData(InterfaceHttpData data) throws Exception {
            if (data.getHttpDataType() == HttpDataType.Attribute) {
                Attribute attribute = (Attribute) data;
                String value = null;
                try {
                    value = attribute.getValue();
                } catch (IOException e1) {
                    e1.printStackTrace();
                    responseContent.append("
    BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " Error while reading value: " + e1.getMessage() + "
    ");
                    return;
                }
                if (value.length() > 100) {
                    responseContent.append("
    BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " data too long
    ");
                } else {
                    responseContent.append("
    BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute + "
    ");
                }
            } else {
                responseContent.append("
     -----------start-------------" + "
    ");
                responseContent.append("
    BODY FileUpload: " + data.getHttpDataType().name() + ": " + data + "
    ");
                responseContent.append("
     ------------end------------" + "
    ");
                if (data.getHttpDataType() == HttpDataType.FileUpload) {
                    FileUpload fileUpload = (FileUpload) data;
                    if (fileUpload.isCompleted()) {
                        System.out.println("file name : " + fileUpload.getFilename());
                        System.out.println("file length: " + fileUpload.length());
                        System.out.println("file maxSize : " + fileUpload.getMaxSize());
                        System.out.println("file path :" + fileUpload.getFile().getPath());
                        System.out.println("file absolutepath :" + fileUpload.getFile().getAbsolutePath());
                        System.out.println("parent path :" + fileUpload.getFile().getParentFile());
                        
                        if (fileUpload.length() < 1024 * 1024 * 10) {
                            responseContent.append("	Content of file
    ");
                            try {
                                responseContent.append(fileUpload.getString(fileUpload.getCharset()));
                            } catch (Exception e1) {
                                e1.printStackTrace();
                            }
                            responseContent.append("
    ");
                        } else {
                            responseContent.append("	File too long to be printed out:" + fileUpload.length() + "
    ");
                        }
                        fileUpload.renameTo(new File(fileUpload.getFile().getPath())); // 核心操作, 写文件
                        decoder.removeHttpDataFromClean(fileUpload); 
                    } else {
                        responseContent.append("	File to be continued but should not!
    ");
                    }
                }
            }
        }
    
        private void writeResponse(Channel channel) {
            ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8);
            responseContent.setLength(0);
    
            // 是否是短连接
            boolean close = request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE, true)
                    || request.protocolVersion().equals(HttpVersion.HTTP_1_0)
                    && !request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE, true);
    
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
            // 最后一次连接不需要Content-Length
            if (!close) {
                response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
            }
    
            Set<Cookie> cookies = null;
            String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE);
            if (value == null) {
                cookies = Collections.emptySet();
            } else {
                cookies = ServerCookieDecoder.decode(value);
            }
            if (!cookies.isEmpty()) {
                for (Cookie cookie : cookies) {
                    response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.encode(cookie));
                }
            }
            ChannelFuture future = channel.writeAndFlush(response);
            if (close) {
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
        
        //拼接上传页html菜单
        private void writeMenu(ChannelHandlerContext ctx) {
            responseContent.setLength(0);
    
            // create Pseudo Menu
            responseContent.append("<html>");
            responseContent.append("<head>");
            responseContent.append("<title>Netty Test Form</title>
    ");
            responseContent.append("</head>
    ");
            responseContent.append("<body bgcolor=white><style>td{font-size: 12pt;}</style>");
    
            responseContent.append("<table border="0">");
            responseContent.append("<tr>");
            responseContent.append("<td>");
            responseContent.append("<h1>Netty Test Form</h1>");
            responseContent.append("Choose one FORM");
            responseContent.append("</td>");
            responseContent.append("</tr>");
            responseContent.append("</table>
    ");
    
            // GET
            responseContent.append("<CENTER>GET FORM<HR WIDTH="75%" NOSHADE color="blue"></CENTER>");
            responseContent.append("<FORM ACTION="/formget" METHOD="GET">");
            responseContent.append("<input type=hidden name=getform value="GET">");
            responseContent.append("<table border="0">");
            responseContent.append("<tr><td>Fill with value: <br> <input type=text name="info" size=10></td></tr>");
            responseContent.append("<tr><td>Fill with value: <br> <input type=text name="secondinfo" size=20>");
            responseContent.append("<tr><td>Fill with value: <br> <textarea name="thirdinfo" cols=40 rows=10></textarea>");
            responseContent.append("</td></tr>");
            responseContent.append("<tr><td><INPUT TYPE="submit" NAME="Send" VALUE="Send"></INPUT></td>");
            responseContent.append("<td><INPUT TYPE="reset" NAME="Clear" VALUE="Clear" ></INPUT></td></tr>");
            responseContent.append("</table></FORM>
    ");
            responseContent.append("<CENTER><HR WIDTH="75%" NOSHADE color="blue"></CENTER>");
    
            // POST
            responseContent.append("<CENTER>POST FORM<HR WIDTH="75%" NOSHADE color="blue"></CENTER>");
            responseContent.append("<FORM ACTION="/formpost" METHOD="POST">");
            responseContent.append("<input type=hidden name=getform value="POST">");
            responseContent.append("<table border="0">");
            responseContent.append("<tr><td>Fill with value: <br> <input type=text name="info" size=10></td></tr>");
            responseContent.append("<tr><td>Fill with value: <br> <input type=text name="secondinfo" size=20>");
            responseContent.append("<tr><td>Fill with value: <br> <textarea name="thirdinfo" cols=40 rows=10></textarea>");
            responseContent.append("<tr><td>Fill with file (only file name will be transmitted): <br> <input type=file name="myfile">");
            responseContent.append("</td></tr>");
            responseContent.append("<tr><td><INPUT TYPE="submit" NAME="Send" VALUE="Send"></INPUT></td>");
            responseContent.append("<td><INPUT TYPE="reset" NAME="Clear" VALUE="Clear" ></INPUT></td></tr>");
            responseContent.append("</table></FORM>
    ");
            responseContent.append("<CENTER><HR WIDTH="75%" NOSHADE color="blue"></CENTER>");
    
            // POST with enctype="multipart/form-data"
            responseContent.append("<CENTER>POST MULTIPART FORM<HR WIDTH="75%" NOSHADE color="blue"></CENTER>");
            responseContent.append("<FORM ACTION="/formpostmultipart" ENCTYPE="multipart/form-data" METHOD="POST">");
            responseContent.append("<input type=hidden name=getform value="POST">");
            responseContent.append("<table border="0">");
            responseContent.append("<tr><td>Fill with value: <br> <input type=text name="info" size=10></td></tr>");
            responseContent.append("<tr><td>Fill with value: <br> <input type=text name="secondinfo" size=20>");
            responseContent.append("<tr><td>Fill with value: <br> <textarea name="thirdinfo" cols=40 rows=10></textarea>");
            responseContent.append("<tr><td>Fill with file: <br> <input type=file name="myfile">");
            responseContent.append("</td></tr>");
            responseContent.append("<tr><td><INPUT TYPE="submit" NAME="Send" VALUE="Send"></INPUT></td>");
            responseContent.append("<td><INPUT TYPE="reset" NAME="Clear" VALUE="Clear" ></INPUT></td></tr>");
            responseContent.append("</table></FORM>
    ");
            responseContent.append("<CENTER><HR WIDTH="75%" NOSHADE color="blue"></CENTER>");
    
            responseContent.append("</body>");
            responseContent.append("</html>");
    
            ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8);
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
    
            ctx.channel().writeAndFlush(response);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.log(Level.WARNING, responseContent.toString(), cause);
            ctx.channel().close();
        }
    }
    复制代码

    3.4 WebSocket(较少使用)

    复制代码
    public class WebSocketServer {
        public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast("http-codec", new HttpServerCodec());
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                    ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                    pipeline.addLast("handler", new WebSocketServerHandler());
                }
         });
    
        Channel ch = b.bind(port).sync().channel();
        System.out.println("Web socket server started at port " + port + '.');
        System.out.println("Open your browser and navigate to http://localhost:" + port + '/');
    
        ch.closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        }
    
        public static void main(String[] args) throws Exception {
            new WebSocketServer().run(8765);
        }
    }
    复制代码
    复制代码
    public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
        private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName());
    
        private WebSocketServerHandshaker handshaker;
    
        @Override
        public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 传统的HTTP接入
            if (msg instanceof FullHttpRequest) {
                handleHttpRequest(ctx, (FullHttpRequest) msg);
            }
            // WebSocket接入
            else if (msg instanceof WebSocketFrame) {
                handleWebSocketFrame(ctx, (WebSocketFrame) msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
            // 如果HTTP解码失败,返回HTTP异常
            if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
                return;
            }
        
            // 构造握手响应返回,本机测试
            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false);
            handshaker = wsFactory.newHandshaker(req);
            
            if (handshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                handshaker.handshake(ctx.channel(), req);
            }
        }
    
        private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
    
            // 判断是否是关闭链路的指令
            if (frame instanceof CloseWebSocketFrame) {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
                return;
            }
            // 判断是否是Ping消息
            if (frame instanceof PingWebSocketFrame) {
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            // 本例程仅支持文本消息,不支持二进制消息
            if (!(frame instanceof TextWebSocketFrame)) {
                throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
            }
        
            // 返回应答消息
            String request = ((TextWebSocketFrame) frame).text();
            if (logger.isLoggable(Level.FINE)) {
                logger.fine(String.format("%s received %s", ctx.channel(), request));
            }
            ctx.channel().write(
                new TextWebSocketFrame(request + " , 欢迎使用Netty WebSocket服务,现在时刻:" + new java.util.Date().toString()));
        }
    
        private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
            // 返回应答给客户端
            if (res.status().code() != 200) {
                ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
                res.content().writeBytes(buf);
                buf.release();
                HttpHeaderUtil.setContentLength(res, res.content().readableBytes());
            }
        
            // 如果是非Keep-Alive,关闭连接
            ChannelFuture f = ctx.channel().writeAndFlush(res);
            if (!HttpHeaderUtil.isKeepAlive(req) || res.status().code() != 200) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    复制代码

    客户端是网页

    复制代码
    <!DOCTYPE html>
    <html>
    <head>
    <meta charset="UTF-8">
    Netty WebSocket 时间服务器
    </head>
    <br>
    <body>
        <br>
        <script type="text/javascript">
            var socket;
            if (!window.WebSocket) {
                window.WebSocket = window.MozWebSocket;
            }
            if (window.WebSocket) {
                socket = new WebSocket("ws://localhost:8765/websocket");
                socket.onmessage = function(event) {
                    var ta = document.getElementById('responseText');
                    ta.value = "";
                    ta.value = event.data
                };
                socket.onopen = function(event) {
                    var ta = document.getElementById('responseText');
                    ta.value = "打开WebSocket服务正常,浏览器支持WebSocket!";
                };
                socket.onclose = function(event) {
                    var ta = document.getElementById('responseText');
                    ta.value = "";
                    ta.value = "WebSocket 关闭!";
                };
            } else {
                alert("抱歉,您的浏览器不支持WebSocket协议!");
            }
    
            function send(message) {
                if (!window.WebSocket) {
                    return;
                }
                if (socket.readyState == WebSocket.OPEN) {
                    socket.send(message);
                } else {
                    alert("WebSocket连接没有建立成功!");
                }
            }
        </script>
        <form onsubmit="return false;">
            <input type="text" name="message" value="Netty最佳实践" /> <br>
            <br> <input type="button" value="发送WebSocket请求消息"
                onclick="send(this.form.message.value)" />
            <hr color="blue" />
            <h3>服务端返回的应答消息</h3>
            <textarea id="responseText" style=" 500px; height: 300px;"></textarea>
        </form>
    </body>
    </html>
    复制代码
  • 相关阅读:
    常见排序算法总结(一)
    27.移除元素
    556. 下一个更大元素 III
    503. 下一个更大元素 II
    496.下一个更大元素Ⅰ
    汇编基础
    SQL回顾
    Pandas整理
    爬取中公网新闻时政
    Python合并Excel表格
  • 原文地址:https://www.cnblogs.com/jswang/p/9054857.html
Copyright © 2011-2022 走看看