zoukankan      html  css  js  c++  java
  • Netty权威指南 第2版

    第一章  Java的I/O演进之路

    1.I/O基础入门

      1.Linux网络I/O模型简介

        1.阻塞I/O模型:最常用的I/O模型,缺省情况下, 所有文件操作都是阻塞的

        2.非阻塞I/O模型:recvform从应用层到内核的时候,轮询检查是否有数据到来

     

     

        3.I/O复用模型

        4.信号渠道I/O模型

        5.异步I/O:告知内核启动某个操作,并让内核在整个操作完成后通知我们

      2.I/O多路复用技术

        1.应用场景:服务器需要同时处理多个处于监听状态或者多个连接状态的套接字;服务器需要同时处理多种网络协议的套接字

        2.epoll的改进

          1.支持一个进行打开的socket描述符不受限制

          2.I/O效率不会随着FD数目的增加而线性下降

          3.使用mmap加速内核与用户空间的消息传递

          4.epoll API更加简单

    2.Java的I/O演进

      1.Java的I/O发展简史

    第二章  NIO入门

    1.传统的BIO编程

      1.BIO通信模型

      2.同步阻塞式I/O创建的TimeServer源码分析

        1.TimeServer

    public class TimeServer {
        public static void main(String[] args) throws IOException {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
    
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(port);
                System.out.println("服务器启动,端口=" + port);
                Socket socket = null;
                while (true) {
                    socket = serverSocket.accept();//阻塞等待客户端连接
                    new Thread(new TimeServerHandler(socket)).start();
                }
            } finally {
                if (serverSocket != null) {
                    System.out.println("服务器关闭");
                    serverSocket.close();
                    serverSocket = null;
                }
            }
        }
    }

        2.TimeServerHandler

    public class TimeServerHandler implements Runnable {
        private Socket socket;
    
        public TimeServerHandler(Socket socket) {
            this.socket = socket;
        }
    
        @Override
        public void run() {
            BufferedReader in = null;
            PrintWriter out = null;
            try {
                in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
                out = new PrintWriter(this.socket.getOutputStream(), true);
                String currentTime = null;
                String body = null;
                while (true) {
                    body = in.readLine();
                    if (body == null) {
                        break;
                    }
                    System.out.println("收到客户端请求:" + body);
                    if ("QUERY TIME ORDER".equalsIgnoreCase(body)) {
                        currentTime = String.valueOf(System.currentTimeMillis());
                    } else {
                        currentTime = "BAD ORDER";
                    }
                    out.println(currentTime);//发送消息给客户端
                }
            } catch (Exception e) {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
                if (out != null) {
                    out.close();
                    out = null;
                }
                if (this.socket != null) {
                    try {
                        this.socket.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                    this.socket = null;
                }
            }
        }
    }

        3.NpeCheck

    public class NpeCheck {
        public static boolean checkArray(String[] args) {
            if (args != null && args.length > 0) {
                return true;
            } else {
                return false;
            }
        }
    }

      3.同步阻塞式I/O创建的TimerClient源码分析

     

    public class TimeClient {
        public static void main(String[] args) {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
    
            Socket socket = null;
            BufferedReader in = null;
            PrintWriter out = null;
            try {
                socket = new Socket("127.0.0.1", port);
                in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                out = new PrintWriter(socket.getOutputStream(), true);
                out.println("QUERY TIME ORDER");//发送请求到服务器
                System.out.println("发送请求到服务器");
                String resp = in.readLine();//读取服务器的响应
                System.out.println("当前时间=" + resp);
            } catch (Exception e) {
                //不需要处理
            } finally {
                if (out != null) {
                    out.close();
                    out = null;
                }
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    in = null;
                }
    
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    socket = null;
                }
            }
        }
    }

    2.伪异步I/O编程

      1.伪异步I/O模型图

        当新的客户端接入时,将客户端的socket封装成一个Task投递到后端的线程池中进行处理,jdk的线程池维护一个消息队列和N个活跃线程,对消息队列中的任务进行处理。

      2.伪异步I/O创建的TimerServer源码分析

        1.TimerServer

     

    public class TimeServer {
        public static void main(String[] args) throws IOException {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
    
            ServerSocket server = null;
            try {
                server = new ServerSocket(port);
                System.out.println("服务器启动,端口=" + port);
                Socket socket = null;
                TimeServerHandlerExecutePool singleExecutor=new TimeServerHandlerExecutePool(50,100);//创建I/O任务线程池
                while (true) {
                    socket = server.accept();//阻塞等待客户端连接
                    singleExecutor.execute(new TimeServerHandler(socket));//线程池自动调用线程执行
                }
            } finally {
                if (server != null) {
                    System.out.println("服务器关闭");
                    server.close();
                    server = null;
                }
            }
        }
    }

     

        2.TimeServerHandlerExecutePool线程池

    public class TimeServerHandlerExecutePool {
        private ExecutorService executor;
    
        //创建线程池
        public TimeServerHandlerExecutePool(int maxPoolSize, int queueSiez) {
            executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSiez));
        }
    
        //执行任务
        public void execute(Runnable task) {
            executor.execute(task);
        }
    }

     

      3.伪异步I/O弊端分析:通信对方返回应答时间过长会引起的级联故障

        1.服务端处理换么,返回应答消息耗时60s,平时只需10ms

        2.采用伪异步I/O的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,它将会被同步阻塞60s

        3.加入所有的可用线程都被故障服务器阻塞,那后续所有的I/O消息都将在队列中排队

        4.由于线程池采用阻塞队列实现,当队列积满后,后续入队列的操作将被阻塞

        5.由于前端只有一个Accptor线程接收客户端接入,它被阻塞在线程池的同步阻塞队列之后,新的客户端请求消息将被拒绝,客户端会发生大量的连接超时

        6.由于几乎所有的连接都会超时,调用者任务系统已经崩溃,无法接收新的请求消息

     

    3.NIO编程

      1.NIO类库简介

        1.缓冲区Buffer,保护一些要写入或者读出的数据,实质是一个字节数组,提供了对数据的结构化访问以及维护读写位置等信息

        2.通道Channel:双向的,可以用于读、写或者二者同时进行,网络读写SelectableChannel和文件操作FileChannel

        3.多路复用器Selector:提供已经就绪的任务

      2.NIO服务端序列图

      3.NIO创建的TimerServer源码分析

        1.TimeServer

    public class TimeServer {
        public static void main(String[] args) throws IOException {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
    
            //创建多路复用类,负责轮询多路复用器Selector,可以处理多个客户端的并发接入
            MultiplexerTimeServer timeServer=new MultiplexerTimeServer(port);
            new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
        }
    }

        2.MultiplexerTimeServer多路复用器

    import org.apache.commons.lang3.StringUtils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class MultiplexerTimeServer implements Runnable {
    
        private Selector selector;
    
        private ServerSocketChannel serverChannel;
    
        private volatile boolean stop;
    
        public MultiplexerTimeServer(int port) {
            try {
                selector = Selector.open();//3.1创建Reactor线程
                serverChannel = ServerSocketChannel.open();//1.打开ServerSocketChannel,用于监听客户端的连接,是所有客户端连接的父管道
                serverChannel.configureBlocking(false);//2.2设置连接为非阻塞模式
                serverChannel.socket().bind(new InetSocketAddress(port), 1024);//2.1绑定监听端口,设置连接为非阻塞模式
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);//4.将打开ServerSocketChannel注册到Reactor线程的多路复用器selector上,监听ACCEPT事件
                System.out.println("服务器启动,端口=" + port);
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);//若端口占用,退出程序
            }
        }
    
        public void stop() {
            this.stop = true;
        }
    
        @Override
        public void run() {
            while (!stop) {
                try {
                    //5.多路复用器在线程run方法的无限循环体内轮询准备就绪的key
                    selector.select(1000);//1秒轮询1次
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> it = selectionKeys.iterator();
                    SelectionKey key;
                    while (it.hasNext()) {
                        key = it.next();
                        it.remove();
                        try {
                            handleInput(key);
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
    
            //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会自动去注册并关闭,所以不需要重复释放资源
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        //处理新接入的请求消息
        private void handleInput(SelectionKey key) throws IOException {
            if (key.isValid()) {
                //处理新接入的请求消息
                if (key.isAcceptable()) {
                    //接收新的连接
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    //6.多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路
                    SocketChannel sc = ssc.accept();
                    //7.设置客户端链路为非阻塞模式
                    sc.configureBlocking(false);
                    //8.将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,读取客户端发送的网络消息
                    sc.register(selector, SelectionKey.OP_READ);
                }
    
                if (key.isReadable()) {
                    //读取数据
                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    //9.异步读取客户端请求消息到缓冲区
                    int readBytes = sc.read(readBuffer);
                    if (readBytes > 0) {//读取到字节,对字节进行编解码
                        readBuffer.flip();//刷新指针
                        //10.对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将界面成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排
                        byte[] bytes = new byte[readBuffer.remaining()];//创建可读的字节数组
                        readBuffer.get(bytes);//复制到新创建的数组中
                        String body = new String(bytes, "UTF-8");//创建请求消息体
                        System.out.println("接收请求=" + body);
                        String currentTime;
                        if ("QUERY TIME ORDER".equalsIgnoreCase(body)) {
                            currentTime = String.valueOf(System.currentTimeMillis());
                        } else {
                            currentTime = "BAD ORDER";
                        }
                        doWrite(sc, currentTime);
                    } else if (readBytes < 0) {//链路已经关闭,需要关闭SocketChannel,释放资源
                        key.cancel();
                        sc.close();
                    } else {
                        //没有读取到字节,属于正常场景,忽略
                    }
                }
            }
        }
    
        //将应答消息异步发送给客户端
        private void doWrite(SocketChannel channel, String response) throws IOException {
            if (StringUtils.isNotBlank(response)) {
                byte[] bytes = response.getBytes();//将字符串编码成字节数组
                ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//根据字节数组的容量创建ByteBuffer
                writeBuffer.put(bytes);//将字节数组复制到缓冲区
                writeBuffer.flip();//刷新缓冲区
                //11.将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发给客户端
                //如果发送区TCP缓冲区满,会导致写半包,此时,需要注册监听写操作位,循环写,知道整包消息写入TCP缓冲区
                channel.write(writeBuffer);
            }
        }
    }

      4.NIO客户端序列图

      5.NIO创建的TimerClient源码分析

        1.TimeClient

    public class TimeClient {
        public static void main(String[] args) {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClient-001").start();
        }
    }

        2.TimeClientHandle

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class TimeClientHandle implements Runnable {
        private String host;
        private int port;
        private Selector selector;
        private SocketChannel socketChannel;
        private volatile boolean stop;
    
        public TimeClientHandle(String host, int port) {
            this.host = host == null ? "127.0.0.1" : host;
            this.port = port;
            try {
                selector = Selector.open();//6.1创建Reactor线程
                socketChannel = SocketChannel.open();//1.打开SocketChannel,绑定客户端本地地址
                socketChannel.configureBlocking(false);//2.设置SocketChannel为非阻塞模式,同时设置客户端连接的TCP参数
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    
        @Override
        public void run() {
            try {
                doConnect();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
            while (!stop) {
                try {
                    //7.多路复用器在线程的run方法的无限循环体内轮询准备就绪的key
                    selector.select(1000);
                    Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                    Iterator<SelectionKey> it = selectionKeySet.iterator();
                    SelectionKey key;
                    while (it.hasNext()) {
                        key = it.next();
                        it.remove();
                        try {
                            handleInput(key);
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.exit(1);
                }
            }
    
            //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会自动去注册并关闭,所以不需要重复释放资源
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        //处理新接入的请求消息
        private void handleInput(SelectionKey key) throws IOException {
            if (key.isValid()) {
                //判断是否连接成功
                SocketChannel sc = (SocketChannel) key.channel();
                if (key.isConnectable()) {//8.接收连接事件进行处理
                    if (sc.finishConnect()) {//9.判断连接结果,如果连接成功,注册读事件到多路复用器
                        sc.register(selector, SelectionKey.OP_READ);//10.注册读事件到多路复用器
                        doWrite(sc);
                    } else {
                        System.exit(1);//连接失败,进程退出
                    }
                }
                if (key.isReadable()) {
                    //11.异步读服务器的应答消息到缓冲区
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readByte = sc.read(readBuffer);
                    if (readByte > 0) {
                        //12.对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将界面成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String body = new String(bytes, "UTF-8");
                        System.out.println("读取时间=" + body);
                        this.stop = true;
                    } else if (readByte < 0) {
                        //对端链路关闭
                        key.cancel();
                        sc.close();
                    } else {
                        //没有读取到字节,属于正常场景,忽略
                    }
                }
            }
        }
    
        private void doConnect() throws IOException {
            //如果连接成功,则注册到多路复用器上,发送请求消息,读应答
            if (socketChannel.connect(new InetSocketAddress(host, port))) {//3.异步连接服务器
                socketChannel.register(selector, SelectionKey.OP_READ);//4.注册读状态到多路复用器
                doWrite(socketChannel);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);//5.向Reactor线程的多路复用器注册连接状态位,监听TCPACK应答
            }
        }
    
        //将应答消息异步发送给客户端
        private void doWrite(SocketChannel sc) throws IOException {
            byte[] req = "QUERY TIME ORDER".getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
            writeBuffer.put(req);
            writeBuffer.flip();
            sc.write(writeBuffer);//13.将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发给客户端
            if (!writeBuffer.hasRemaining()) {//判断消息是否全部发送完成
                System.out.println("请求服务器成功");
            }
        }
    }

    4.AIO编程

      1.AIO创建的TimerServer源码分析

        1.TimeServer

     

    import com.example.demo.util.NpeCheck;
    
    import java.io.IOException;
    
    public class TimeServer {
        public static void main(String[] args) throws IOException {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
    
            //创建多路复用类,负责轮询多路复用器Selector,可以处理多个客户端的并发接入
            AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
            new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();//3.2创建多路复用器并启动线程
        }
    }

     

        2.AsyncTimeServerHandler

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.util.concurrent.CountDownLatch;
    
    public class AsyncTimeServerHandler implements Runnable {
        private int port;
    
        CountDownLatch latch;
        AsynchronousServerSocketChannel asynchronousServerSocketChannel;
    
        public AsyncTimeServerHandler(int port) {
            this.port = port;
            try {
                asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
                asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
                System.out.println("服务器启动,端口号=" + port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            latch = new CountDownLatch(1);//完成一组正在执行的操作之前,允许当前的线程一直阻塞
            doAccept();
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        private void doAccept() {
            asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());//接收客户端连接
        }
    }

        3.AcceptCompletionHandler

    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    
    //作为handler来接收通知消息
    public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {
    
        @Override
        public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
            attachment.asynchronousServerSocketChannel.accept(attachment, this);//接口客户端连接
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            result.read(buffer, buffer, new ReadCompletionHandler(result));
        }
    
        @Override
        public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
            exc.printStackTrace();
            attachment.latch.countDown();
        }
    }

     

      2.AIO创建的TimerClient源码分析

        1.TimeClient

     

    import com.example.demo.util.NpeCheck;
    
    public class TimeClient {
        public static void main(String[] args) {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClient-001").start();//6.2创建多路复用器并启动线程
        }
    }

     

        2.AsyncTimeClientHandler

     

    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.CountDownLatch;
    
    public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
    
        private AsynchronousSocketChannel client;
        private String host;
        private int port;
        private CountDownLatch latch;
    
        public AsyncTimeClientHandler(String host, int port) {
            this.host = host;
            this.port = port;
            try {
                client = AsynchronousSocketChannel.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            latch = new CountDownLatch(1);//防止异步操作没有执行完成线程就退出
            //发起异步操作,attachment用于回调通知时作为入参被传递,handler异步回调通知接口
            client.connect(new InetSocketAddress(host, port), this, this);
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void completed(Void result, AsyncTimeClientHandler attachment) {
            byte[] req = "QUERY TIME ORDER".getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
            writeBuffer.put(req);
            writeBuffer.flip();
            client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                    if (buffer.hasRemaining()) {//未发送完成继续异步发送
                        client.write(buffer, buffer, this);
                    } else {//发送完成,异步读取
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                            @Override
                            public void completed(Integer result, ByteBuffer buffer) {
                                buffer.flip();
                                byte[] bytes = new byte[buffer.remaining()];
                                buffer.get(bytes);
                                String body;
                                try {
                                    body = new String(bytes, "UTF-8");
                                    System.out.println("读取时间=" + body);
                                    latch.countDown();
                                } catch (UnsupportedEncodingException e) {
                                    e.printStackTrace();
                                }
                            }
    
                            @Override
                            public void failed(Throwable exc, ByteBuffer attachment) {
                                try {
                                    client.close();//关闭链路
                                    latch.countDown();//让线程执行完毕
                                } catch (IOException e) {
                                    //忽略
                                }
                            }
                        });
                    }
                }
    
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        client.close();
                        latch.countDown();
                    } catch (IOException e) {
                        //忽略
                    }
                }
            });
        }
    
        @Override
        public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
            try {
                client.close();
                latch.countDown();
            } catch (IOException e) {
                //忽略
            }
        }
    }

     

        3.ReadCompletionHandler

     

    import org.apache.commons.lang3.StringUtils;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    
    public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
        private AsynchronousSocketChannel channel;//用于读取半包消息和发送应答
    
        public ReadCompletionHandler(AsynchronousSocketChannel channel) {
            if (this.channel == null) {
                this.channel = channel;
            }
        }
    
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            attachment.flip();
            byte[] body = new byte[attachment.remaining()];
            attachment.get(body);
            try {
                String req = new String(body, "UTF-8");
                System.out.println("收到请求:" + req);
                String currentTime;
                if ("QUERY TIME ORDER".equalsIgnoreCase(req)) {
                    currentTime = String.valueOf(System.currentTimeMillis());
                } else {
                    currentTime = "BAD ORDER";
                }
                doWrite(currentTime);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    
        private void doWrite(String currentTime) {
            if (StringUtils.isNotBlank(currentTime)) {
                byte[] bytes = (currentTime).getBytes();
                ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
                writeBuffer.put(bytes);
                writeBuffer.flip();
                channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer buffer) {
                        //如果没有发送完成,继续发送
                        if (buffer.hasRemaining()) {
                            channel.write(buffer, buffer, this);
                        }
                    }
    
                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        try {
                            channel.close();
                        } catch (IOException e) {
                            //忽略
                        }
                    }
                });
            }
        }
    
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                this.channel.close();
            } catch (IOException e) {
                //忽略
            }
        }
    }

      3.AIO版本时间服务器运行结果

        异步SocketChannel是被动执行对象,我们不需要像NIO编程那样创建一个独立的I/O线程来处理读写操作。对应AsynchronousServerSocketChannel和AsynchronousSocketChannel都是JDK底层的线程池负责回调并驱动读写操作,AIO编程比NIO编程更为简单。

    5.4种I/O对比

      1.概念澄清

        1.异步非阻塞I/O,NIO不是真正意义的异步非阻塞I/O,是非阻塞I/O

        2.多路复用器Selector

        3.伪异步I/O,通过线程池做缓冲区实现

      2.不同I/O模型对比

    6.选择Netty的理由

      1.不选择Java元素NIO编程的原因

      2.为什么选择Netty

    第3章  Netty入门应用

    3.1Netty开发环境的搭建

      3.1.1下载Netty的软件包

      3.1.2搭建Netty应用工程

    3.2Netty服务端开发

      3.2.1步骤

     

    3.3Netty客户端开发

      3.3.1TimeClient

     

    import com.example.demo.util.NpeCheck;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class TimeClient {
        public void connect(int port, String host) throws Exception {
            //配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY,true)
                        .handler(new ChannelInitializer<SocketChannel>() {//处理网络I/O事件
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new TimeClientHandler());
                            }
                        });
                //发起异步连接操作
                ChannelFuture f=b.connect(host,port).sync();
    
                //等待客户端链路关闭
                f.channel().closeFuture().sync();
            }finally {
                //优雅退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new TimeClient().connect(port,"127.0.0.1");
        }
    }

     

      3.3.2TimeClientHandler

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.logging.Logger;
    
    public class TimeClientHandler extends ChannelHandlerAdapter {
    
        private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
    
        private final ByteBuf firstMessage;
    
        public TimeClientHandler() {
            byte[] req = "QUERY TIME ORDER".getBytes();
            firstMessage = Unpooled.buffer(req.length);
            firstMessage.writeBytes(req);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(firstMessage);//连接成功后,发送请求
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;//读取应答消息
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("当前时间=" + body);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //释放资源
            logger.warning("异常=" + cause.getMessage());
            ctx.close();
        }
    }

     

    3.4运行和调试

      3.4.1服务器和客户端的运行

      3.4.2打包和部署

    第4章  TCP粘包/拆包问题的解决之道

     

     4.1TCP粘包/拆包

      一个完整的包可能会被TCP拆分多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送

      4.1.1TCO粘包/拆包问题说明

        1.示意图

        2.分析

      4.1.2TCP粘包/拆包发生的原因

      4.1.3粘包问题的解决策略

    4.2未考虑TCP粘包导致功能异常案例

      4.2.1TimeServer的改造

      4.2.2TimeClient的改造

      4.2.3运行结果

    4.3理由LineBasedFrameDecoder解决TCP粘包问题

      4.3.1支持TCP粘包的TimeServer

        1.TimeServer

    import com.example.demo.util.NpeCheck;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class TimeServer {
        public void bind(int port) throws Exception {
            //配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();//用于接收客户端的线程组
            EventLoopGroup workerGroup = new NioEventLoopGroup();//用于用于网络读写的线程组
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)//设置通道
                        .option(ChannelOption.SO_BACKLOG, 1024)//设置TCP参数
                        .childHandler(new ChildChannelHandler());//用于处理网络I/O事件(记录日志、对消息编解码)
                ChannelFuture f = b.bind(port).sync();//绑定端口,同步等待成功
    
                f.channel().closeFuture().sync();//等待服务端监听端口关闭,才退出main函数
            } finally {
                //优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new TimeServerHandler());
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new TimeServer().bind(port);
        }
    }

        2.TimeServerHandler

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.Date;
    
    public class TimeServerHandler extends ChannelHandlerAdapter {
    
        private int counter;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;//接收到的是删除回车换行符后的请求消息
            System.out.println("接收到请求命令:" + body + ";次数="+ ++counter);
            String currentTime;
            if ("QUERY TIME ORDER".equals(body)) {
                currentTime = new Date(System.currentTimeMillis()).toString();
            } else {
                currentTime = "参数错误";
            }
            currentTime += System.getProperty("line.separator");
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            ctx.writeAndFlush(resp);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

      4.3.2支持TCP粘包的TimeClient

        1.TimeClient

    import com.example.demo.util.NpeCheck;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class TimeClient {
        public void connect(int port, String host) throws Exception {
            //配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY,true)
                        .handler(new ChannelInitializer<SocketChannel>() {//处理网络I/O事件
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new TimeClientHandler());
                            }
                        });
                //发起异步连接操作
                ChannelFuture f=b.connect(host,port).sync();
    
                //等待客户端链路关闭
                f.channel().closeFuture().sync();
            }finally {
                //优雅退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new TimeClient().connect(port,"127.0.0.1");
        }
    }

        2.TimeClientHandler

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.logging.Logger;
    
    public class TimeClientHandler extends ChannelHandlerAdapter {
    
        private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
        private int counter;
        private byte[] req;
    
        public TimeClientHandler() {
            req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ByteBuf message;
            for (int i = 0; i < 100; i++) {
                message = Unpooled.buffer(req.length);
                message.writeBytes(req);
                ctx.writeAndFlush(message);
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            System.out.println("当前时间=" + body + ";次数=" + ++counter);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //释放资源
            logger.warning("异常=" + cause.getMessage());
            ctx.close();
        }
    }

      4.3.3运行支持TCP粘包的时间服务器程序

      4.3.4LineBasedFrameDecoder和StringDecoder的原理分析

    第5章  分隔符和定长解码器的应用

    5.1DelimiterBasedFrameDecoder应用开发

      可以自动完成以分割服作为码流结束标识的消息的解码

      5.1.1DelimiterBasedFrameDecoder服务端开发

        1.EchoServer

     

    import com.example.demo.util.NpeCheck;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    public class EchoServer {
        public void bind(int port) throws Exception {
            //配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();//用于接收客户端的线程组
            EventLoopGroup workerGroup = new NioEventLoopGroup();//用于用于网络读写的线程组
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)//设置通道
                        .option(ChannelOption.SO_BACKLOG, 1024)//设置TCP参数
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());//创建分割符缓存对象
                                //1024标识单条消息的最大长度,当达到该长度后仍然没有查找到分割符,就抛出异常,防止异常码流缺失分隔符导致的内存溢出,是netty解码器的可靠性保护
                                ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new EchoServerHandler());
                            }
                        });
                ChannelFuture f = b.bind(port).sync();//绑定端口,同步等待成功
    
                f.channel().closeFuture().sync();//等待服务端监听端口关闭,才退出main函数
            } finally {
                //优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new EchoServer().bind(port);
        }
    }

     

        2.EchoServerHandler

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class EchoServerHandler extends ChannelHandlerAdapter {
    
        int counter = 0;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;//接收到的是删除回车换行符后的请求消息
            System.out.println("接收到请求命令:" + body + ";次数=" + ++counter);
            body += "$_";//加上分隔符
            ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
            ctx.writeAndFlush(echo);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

     

      5.1.2DelimiterBasedFrameDecoder客户端开发

        1.EchoClient

     

    import com.example.demo.util.NpeCheck;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class EchoClient {
        public void connect(int port, String host) throws Exception {
            //配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY,true)
                        .handler(new ChannelInitializer<SocketChannel>() {//处理网络I/O事件
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());//创建分割符缓存对象
                                //1024标识单条消息的最大长度,当达到该长度后仍然没有查找到分割符,就抛出异常,防止异常码流缺失分隔符导致的内存溢出,是netty解码器的可靠性保护
                                ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new EchoClientHandler());
                            }
                        });
                //发起异步连接操作
                ChannelFuture f=b.connect(host,port).sync();
    
                //等待客户端链路关闭
                f.channel().closeFuture().sync();
            }finally {
                //优雅退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new EchoClient().connect(port,"127.0.0.1");
        }
    }

     

        2.EchoClientHandler

    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class EchoClientHandler extends ChannelHandlerAdapter {
    
        private int counter;
    
        static final String ECHO_REQ = "hello netty.$_";
    
    
        public EchoClientHandler() {
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            for (int i = 0; i < 10; i++) {
                ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            System.out.println("接收的消息=" + body + ";次数=" + ++counter);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

      5.1.3运行DelimiterBasedFrameDecoder服务端和客户端

    5.2FixedLengthFrameDecoder应用开发

      固定长度解码器:按照指定的长度对消息进行自动解码,不需要考虑粘包/拆包问题

      5.2.1FixedLengthFrameDecoder服务端开发

        1.EchoServer

     

    import com.example.demo.util.NpeCheck;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    public class EchoServer {
        public void bind(int port) throws Exception {
            //配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();//用于接收客户端的线程组
            EventLoopGroup workerGroup = new NioEventLoopGroup();//用于用于网络读写的线程组
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)//设置通道
                        .option(ChannelOption.SO_BACKLOG, 1024)//设置TCP参数
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new EchoServerHandler());
                            }
                        });
                ChannelFuture f = b.bind(port).sync();//绑定端口,同步等待成功
    
                f.channel().closeFuture().sync();//等待服务端监听端口关闭,才退出main函数
            } finally {
                //优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new EchoServer().bind(port);
        }
    }

     

        2.EchoServerHandler

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class EchoServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("收到客户端请求:" + msg);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

      5.2.2利用telnet命令行测试EchoServer服务端

    第6章  编解码技术

    6.1Java序列化的缺点

      6.1.1无法跨语言

      6.1.2序列化后的码流太大

      6.1.3序列化性能太低

    6.2业界主流的编解码框架

      6.2.1Google的Protobuf

      6.2.2Facebook的Thrift

        适用于静态的数据交换、适用于搭建大型数据交换及存储的通用工具

      6.2.3JBoss Marshalling

    第7章  MessagePack编解码

    7.1MessagePack介绍

      7.1.1MessagePack多语言支持

      7.1.2MessagePack Java API介绍

        1.导入

     

        compile group: 'org.msgpack', name: 'msgpack', version: '0.6.12'

     

        2.使用

     

        public static void main(String[] args) throws IOException {
            List<String> list=new ArrayList<>();
            list.add("a");
            list.add("b");
            list.add("c");
            MessagePack messagePack=new MessagePack();
            byte[] data = messagePack.write(list);
            List<String> read = messagePack.read(data, Templates.tList(Templates.TString));
            for(String s:read){
                System.out.println(s);
            }
        }

     

      7.1.3MessagePack开发包下载

    7.2MessagePack编码器和解码器开发

      7.2.1MessagePack编码器开发

        1.MsgpackEncoder

     

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    import org.msgpack.MessagePack;
    
    //编码器
    public class MsgpackEncoder extends MessageToByteEncoder<Object> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            MessagePack messagePack = new MessagePack();
            byte[] write = messagePack.write(msg);//将POJO对象编码为byte数组
            out.writeBytes(write);
        }
    }

     

      7.2.2MessagePack解码器开发

        1.MsgpackDecoder

     

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    import org.msgpack.MessagePack;
    import java.util.List;
    
    //解码器
    public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            final byte[] array;
            final int length = msg.readableBytes();
            array = new byte[length];
            msg.getBytes(msg.readerIndex(), array, 0, length);//获取需要解码的byte数组
            MessagePack messagePack = new MessagePack();
            out.add(messagePack.read(array));//反序列化为Objec对象,添加到解码列表
        }
    }

     

      7.2.3功能测试

    7.3粘包/半包支持

      1.EchoClient

     

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    
    public class EchoClient {
        private final String host;
        private final int port;
        private final int sendNumber;
    
        public EchoClient(String host, int port, int sendNumber) {
            this.host = host;
            this.port = port;
            this.sendNumber = sendNumber;
        }
    
        public void run() throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
                                ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
                                ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                                ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
                                ch.pipeline().addLast(new EchoClientHandler(sendNumber));
                            }
                        });
                //发起异步连接操作
                ChannelFuture f = b.connect(host, port).sync();
    
                //等待客户端链路关闭
                f.channel().closeFuture().sync();
            } finally {
                //优雅退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new EchoClient("127.0.0.1",port,100).run();
        }
    }

     

      2.EchoClientHandler

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class EchoClientHandler extends ChannelHandlerAdapter {
        private final int sendNumber;
    
        public EchoClientHandler(int sendNumber) {
            this.sendNumber = sendNumber;
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            UserInfo[] userInfos = UserInfo();
            for (UserInfo userInfo : userInfos) {
                ctx.write(userInfo);
            }
            ctx.flush();
        }
    
        private UserInfo[] UserInfo() {
            UserInfo[] userInfos = new UserInfo[sendNumber];
            UserInfo userInfo;
            System.out.println(sendNumber);
            for (int i = 0; i < sendNumber; i++) {
                userInfo = new UserInfo();
                userInfo.setAge(i);
                userInfo.setName("---->" + i);
                userInfos[i] = userInfo;
                System.out.println(i);
            }
            return userInfos;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("客户端接收消息 = " + msg);
            ctx.write(msg);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
    }

      3.EchoServer

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    public class EchoServer {
        public void bind(int port) throws Exception {
            //配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();//用于接收客户端的线程组
            EventLoopGroup workerGroup = new NioEventLoopGroup();//用于用于网络读写的线程组
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)//设置通道
                        .option(ChannelOption.SO_BACKLOG, 1024)//设置TCP参数
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
                                ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
                                ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                                ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
                                ch.pipeline().addLast(new EchoServerHandler());
                            }
                        });
                ChannelFuture f = b.bind(port).sync();//绑定端口,同步等待成功
    
                f.channel().closeFuture().sync();//等待服务端监听端口关闭,才退出main函数
            } finally {
                //优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new EchoServer().bind(port);
        }
    }

      4.EchoServerHandler

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class EchoServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("收到客户端请求:" + msg);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

    第8章  Google Protobuf编解码

    8.1Protobuf的入门

      8.1.1Protobuf开发环境搭建

      8.1.2Protobuf编解码开发

        1.SubscribeReqProto

     

    public final class SubscribeReqProto {
      private SubscribeReqProto() {}
      public static void registerAllExtensions(
          com.google.protobuf.ExtensionRegistry registry) {
      }
      public interface SubscribeReqOrBuilder
          extends com.google.protobuf.MessageOrBuilder {
    
        // required int32 subReqID = 1;
        /**
         * <code>required int32 subReqID = 1;</code>
         */
        boolean hasSubReqID();
        /**
         * <code>required int32 subReqID = 1;</code>
         */
        int getSubReqID();
    
        // required string userName = 2;
        /**
         * <code>required string userName = 2;</code>
         */
        boolean hasUserName();
        /**
         * <code>required string userName = 2;</code>
         */
        String getUserName();
        /**
         * <code>required string userName = 2;</code>
         */
        com.google.protobuf.ByteString
            getUserNameBytes();
    
        // required string productName = 3;
        /**
         * <code>required string productName = 3;</code>
         */
        boolean hasProductName();
        /**
         * <code>required string productName = 3;</code>
         */
        String getProductName();
        /**
         * <code>required string productName = 3;</code>
         */
        com.google.protobuf.ByteString
            getProductNameBytes();
    
        // repeated string address = 4;
        /**
         * <code>repeated string address = 4;</code>
         */
        java.util.List<String>
        getAddressList();
        /**
         * <code>repeated string address = 4;</code>
         */
        int getAddressCount();
        /**
         * <code>repeated string address = 4;</code>
         */
        String getAddress(int index);
        /**
         * <code>repeated string address = 4;</code>
         */
        com.google.protobuf.ByteString
            getAddressBytes(int index);
      }
      /**
       * Protobuf type {@code netty.SubscribeReq}
       */
      public static final class SubscribeReq extends
          com.google.protobuf.GeneratedMessage
          implements SubscribeReqOrBuilder {
        // Use SubscribeReq.newBuilder() to construct.
        private SubscribeReq(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
          super(builder);
          this.unknownFields = builder.getUnknownFields();
        }
        private SubscribeReq(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
    
        private static final SubscribeReq defaultInstance;
        public static SubscribeReq getDefaultInstance() {
          return defaultInstance;
        }
    
        public SubscribeReq getDefaultInstanceForType() {
          return defaultInstance;
        }
    
        private final com.google.protobuf.UnknownFieldSet unknownFields;
        @Override
        public final com.google.protobuf.UnknownFieldSet
            getUnknownFields() {
          return this.unknownFields;
        }
        private SubscribeReq(
            com.google.protobuf.CodedInputStream input,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws com.google.protobuf.InvalidProtocolBufferException {
          initFields();
          int mutable_bitField0_ = 0;
          com.google.protobuf.UnknownFieldSet.Builder unknownFields =
              com.google.protobuf.UnknownFieldSet.newBuilder();
          try {
            boolean done = false;
            while (!done) {
              int tag = input.readTag();
              switch (tag) {
                case 0:
                  done = true;
                  break;
                default: {
                  if (!parseUnknownField(input, unknownFields,
                                         extensionRegistry, tag)) {
                    done = true;
                  }
                  break;
                }
                case 8: {
                  bitField0_ |= 0x00000001;
                  subReqID_ = input.readInt32();
                  break;
                }
                case 18: {
                  bitField0_ |= 0x00000002;
                  userName_ = input.readBytes();
                  break;
                }
                case 26: {
                  bitField0_ |= 0x00000004;
                  productName_ = input.readBytes();
                  break;
                }
                case 34: {
                  if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
                    address_ = new com.google.protobuf.LazyStringArrayList();
                    mutable_bitField0_ |= 0x00000008;
                  }
                  address_.add(input.readBytes());
                  break;
                }
              }
            }
          } catch (com.google.protobuf.InvalidProtocolBufferException e) {
            throw e.setUnfinishedMessage(this);
          } catch (java.io.IOException e) {
            throw new com.google.protobuf.InvalidProtocolBufferException(
                e.getMessage()).setUnfinishedMessage(this);
          } finally {
            if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
              address_ = new com.google.protobuf.UnmodifiableLazyStringList(address_);
            }
            this.unknownFields = unknownFields.build();
            makeExtensionsImmutable();
          }
        }
        public static final com.google.protobuf.Descriptors.Descriptor
            getDescriptor() {
          return SubscribeReqProto.internal_static_netty_SubscribeReq_descriptor;
        }
    
        protected FieldAccessorTable
            internalGetFieldAccessorTable() {
          return SubscribeReqProto.internal_static_netty_SubscribeReq_fieldAccessorTable
              .ensureFieldAccessorsInitialized(
                  SubscribeReq.class, Builder.class);
        }
    
        public static com.google.protobuf.Parser<SubscribeReq> PARSER =
            new com.google.protobuf.AbstractParser<SubscribeReq>() {
          public SubscribeReq parsePartialFrom(
              com.google.protobuf.CodedInputStream input,
              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
              throws com.google.protobuf.InvalidProtocolBufferException {
            return new SubscribeReq(input, extensionRegistry);
          }
        };
    
        @Override
        public com.google.protobuf.Parser<SubscribeReq> getParserForType() {
          return PARSER;
        }
    
        private int bitField0_;
        // required int32 subReqID = 1;
        public static final int SUBREQID_FIELD_NUMBER = 1;
        private int subReqID_;
        /**
         * <code>required int32 subReqID = 1;</code>
         */
        public boolean hasSubReqID() {
          return ((bitField0_ & 0x00000001) == 0x00000001);
        }
        /**
         * <code>required int32 subReqID = 1;</code>
         */
        public int getSubReqID() {
          return subReqID_;
        }
    
        // required string userName = 2;
        public static final int USERNAME_FIELD_NUMBER = 2;
        private Object userName_;
        /**
         * <code>required string userName = 2;</code>
         */
        public boolean hasUserName() {
          return ((bitField0_ & 0x00000002) == 0x00000002);
        }
        /**
         * <code>required string userName = 2;</code>
         */
        public String getUserName() {
          Object ref = userName_;
          if (ref instanceof String) {
            return (String) ref;
          } else {
            com.google.protobuf.ByteString bs = 
                (com.google.protobuf.ByteString) ref;
            String s = bs.toStringUtf8();
            if (bs.isValidUtf8()) {
              userName_ = s;
            }
            return s;
          }
        }
        /**
         * <code>required string userName = 2;</code>
         */
        public com.google.protobuf.ByteString
            getUserNameBytes() {
          Object ref = userName_;
          if (ref instanceof String) {
            com.google.protobuf.ByteString b = 
                com.google.protobuf.ByteString.copyFromUtf8(
                    (String) ref);
            userName_ = b;
            return b;
          } else {
            return (com.google.protobuf.ByteString) ref;
          }
        }
    
        // required string productName = 3;
        public static final int PRODUCTNAME_FIELD_NUMBER = 3;
        private Object productName_;
        /**
         * <code>required string productName = 3;</code>
         */
        public boolean hasProductName() {
          return ((bitField0_ & 0x00000004) == 0x00000004);
        }
        /**
         * <code>required string productName = 3;</code>
         */
        public String getProductName() {
          Object ref = productName_;
          if (ref instanceof String) {
            return (String) ref;
          } else {
            com.google.protobuf.ByteString bs = 
                (com.google.protobuf.ByteString) ref;
            String s = bs.toStringUtf8();
            if (bs.isValidUtf8()) {
              productName_ = s;
            }
            return s;
          }
        }
        /**
         * <code>required string productName = 3;</code>
         */
        public com.google.protobuf.ByteString
            getProductNameBytes() {
          Object ref = productName_;
          if (ref instanceof String) {
            com.google.protobuf.ByteString b = 
                com.google.protobuf.ByteString.copyFromUtf8(
                    (String) ref);
            productName_ = b;
            return b;
          } else {
            return (com.google.protobuf.ByteString) ref;
          }
        }
    
        // repeated string address = 4;
        public static final int ADDRESS_FIELD_NUMBER = 4;
        private com.google.protobuf.LazyStringList address_;
        /**
         * <code>repeated string address = 4;</code>
         */
        public java.util.List<String>
            getAddressList() {
          return address_;
        }
        /**
         * <code>repeated string address = 4;</code>
         */
        public int getAddressCount() {
          return address_.size();
        }
        /**
         * <code>repeated string address = 4;</code>
         */
        public String getAddress(int index) {
          return address_.get(index);
        }
        /**
         * <code>repeated string address = 4;</code>
         */
        public com.google.protobuf.ByteString
            getAddressBytes(int index) {
          return address_.getByteString(index);
        }
    
        private void initFields() {
          subReqID_ = 0;
          userName_ = "";
          productName_ = "";
          address_ = com.google.protobuf.LazyStringArrayList.EMPTY;
        }
        private byte memoizedIsInitialized = -1;
        public final boolean isInitialized() {
          byte isInitialized = memoizedIsInitialized;
          if (isInitialized != -1) return isInitialized == 1;
    
          if (!hasSubReqID()) {
            memoizedIsInitialized = 0;
            return false;
          }
          if (!hasUserName()) {
            memoizedIsInitialized = 0;
            return false;
          }
          if (!hasProductName()) {
            memoizedIsInitialized = 0;
            return false;
          }
          memoizedIsInitialized = 1;
          return true;
        }
    
        public void writeTo(com.google.protobuf.CodedOutputStream output)
                            throws java.io.IOException {
          getSerializedSize();
          if (((bitField0_ & 0x00000001) == 0x00000001)) {
            output.writeInt32(1, subReqID_);
          }
          if (((bitField0_ & 0x00000002) == 0x00000002)) {
            output.writeBytes(2, getUserNameBytes());
          }
          if (((bitField0_ & 0x00000004) == 0x00000004)) {
            output.writeBytes(3, getProductNameBytes());
          }
          for (int i = 0; i < address_.size(); i++) {
            output.writeBytes(4, address_.getByteString(i));
          }
          getUnknownFields().writeTo(output);
        }
    
        private int memoizedSerializedSize = -1;
        public int getSerializedSize() {
          int size = memoizedSerializedSize;
          if (size != -1) return size;
    
          size = 0;
          if (((bitField0_ & 0x00000001) == 0x00000001)) {
            size += com.google.protobuf.CodedOutputStream
              .computeInt32Size(1, subReqID_);
          }
          if (((bitField0_ & 0x00000002) == 0x00000002)) {
            size += com.google.protobuf.CodedOutputStream
              .computeBytesSize(2, getUserNameBytes());
          }
          if (((bitField0_ & 0x00000004) == 0x00000004)) {
            size += com.google.protobuf.CodedOutputStream
              .computeBytesSize(3, getProductNameBytes());
          }
          {
            int dataSize = 0;
            for (int i = 0; i < address_.size(); i++) {
              dataSize += com.google.protobuf.CodedOutputStream
                .computeBytesSizeNoTag(address_.getByteString(i));
            }
            size += dataSize;
            size += 1 * getAddressList().size();
          }
          size += getUnknownFields().getSerializedSize();
          memoizedSerializedSize = size;
          return size;
        }
    
        private static final long serialVersionUID = 0L;
        @Override
        protected Object writeReplace()
            throws java.io.ObjectStreamException {
          return super.writeReplace();
        }
    
        public static SubscribeReq parseFrom(
            com.google.protobuf.ByteString data)
            throws com.google.protobuf.InvalidProtocolBufferException {
          return PARSER.parseFrom(data);
        }
        public static SubscribeReq parseFrom(
            com.google.protobuf.ByteString data,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws com.google.protobuf.InvalidProtocolBufferException {
          return PARSER.parseFrom(data, extensionRegistry);
        }
        public static SubscribeReq parseFrom(byte[] data)
            throws com.google.protobuf.InvalidProtocolBufferException {
          return PARSER.parseFrom(data);
        }
        public static SubscribeReq parseFrom(
            byte[] data,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws com.google.protobuf.InvalidProtocolBufferException {
          return PARSER.parseFrom(data, extensionRegistry);
        }
        public static SubscribeReq parseFrom(java.io.InputStream input)
            throws java.io.IOException {
          return PARSER.parseFrom(input);
        }
        public static SubscribeReq parseFrom(
            java.io.InputStream input,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws java.io.IOException {
          return PARSER.parseFrom(input, extensionRegistry);
        }
        public static SubscribeReq parseDelimitedFrom(java.io.InputStream input)
            throws java.io.IOException {
          return PARSER.parseDelimitedFrom(input);
        }
        public static SubscribeReq parseDelimitedFrom(
            java.io.InputStream input,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws java.io.IOException {
          return PARSER.parseDelimitedFrom(input, extensionRegistry);
        }
        public static SubscribeReq parseFrom(
            com.google.protobuf.CodedInputStream input)
            throws java.io.IOException {
          return PARSER.parseFrom(input);
        }
        public static SubscribeReq parseFrom(
            com.google.protobuf.CodedInputStream input,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws java.io.IOException {
          return PARSER.parseFrom(input, extensionRegistry);
        }
    
        public static Builder newBuilder() { return Builder.create(); }
        public Builder newBuilderForType() { return newBuilder(); }
        public static Builder newBuilder(SubscribeReq prototype) {
          return newBuilder().mergeFrom(prototype);
        }
        public Builder toBuilder() { return newBuilder(this); }
    
        @Override
        protected Builder newBuilderForType(
            BuilderParent parent) {
          Builder builder = new Builder(parent);
          return builder;
        }
        /**
         * Protobuf type {@code netty.SubscribeReq}
         */
        public static final class Builder extends
            com.google.protobuf.GeneratedMessage.Builder<Builder>
           implements SubscribeReqOrBuilder {
          public static final com.google.protobuf.Descriptors.Descriptor
              getDescriptor() {
            return SubscribeReqProto.internal_static_netty_SubscribeReq_descriptor;
          }
    
          protected FieldAccessorTable
              internalGetFieldAccessorTable() {
            return SubscribeReqProto.internal_static_netty_SubscribeReq_fieldAccessorTable
                .ensureFieldAccessorsInitialized(
                    SubscribeReq.class, Builder.class);
          }
    
          // Construct using com.example.demo.protobuf.SubscribeReqProto.SubscribeReq.newBuilder()
          private Builder() {
            maybeForceBuilderInitialization();
          }
    
          private Builder(
              BuilderParent parent) {
            super(parent);
            maybeForceBuilderInitialization();
          }
          private void maybeForceBuilderInitialization() {
            if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
            }
          }
          private static Builder create() {
            return new Builder();
          }
    
          public Builder clear() {
            super.clear();
            subReqID_ = 0;
            bitField0_ = (bitField0_ & ~0x00000001);
            userName_ = "";
            bitField0_ = (bitField0_ & ~0x00000002);
            productName_ = "";
            bitField0_ = (bitField0_ & ~0x00000004);
            address_ = com.google.protobuf.LazyStringArrayList.EMPTY;
            bitField0_ = (bitField0_ & ~0x00000008);
            return this;
          }
    
          public Builder clone() {
            return create().mergeFrom(buildPartial());
          }
    
          public com.google.protobuf.Descriptors.Descriptor
              getDescriptorForType() {
            return SubscribeReqProto.internal_static_netty_SubscribeReq_descriptor;
          }
    
          public SubscribeReq getDefaultInstanceForType() {
            return SubscribeReq.getDefaultInstance();
          }
    
          public SubscribeReq build() {
            SubscribeReq result = buildPartial();
            if (!result.isInitialized()) {
              throw newUninitializedMessageException(result);
            }
            return result;
          }
    
          public SubscribeReq buildPartial() {
            SubscribeReq result = new SubscribeReq(this);
            int from_bitField0_ = bitField0_;
            int to_bitField0_ = 0;
            if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
              to_bitField0_ |= 0x00000001;
            }
            result.subReqID_ = subReqID_;
            if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
              to_bitField0_ |= 0x00000002;
            }
            result.userName_ = userName_;
            if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
              to_bitField0_ |= 0x00000004;
            }
            result.productName_ = productName_;
            if (((bitField0_ & 0x00000008) == 0x00000008)) {
              address_ = new com.google.protobuf.UnmodifiableLazyStringList(
                  address_);
              bitField0_ = (bitField0_ & ~0x00000008);
            }
            result.address_ = address_;
            result.bitField0_ = to_bitField0_;
            onBuilt();
            return result;
          }
    
          public Builder mergeFrom(com.google.protobuf.Message other) {
            if (other instanceof SubscribeReq) {
              return mergeFrom((SubscribeReq)other);
            } else {
              super.mergeFrom(other);
              return this;
            }
          }
    
          public Builder mergeFrom(SubscribeReq other) {
            if (other == SubscribeReq.getDefaultInstance()) return this;
            if (other.hasSubReqID()) {
              setSubReqID(other.getSubReqID());
            }
            if (other.hasUserName()) {
              bitField0_ |= 0x00000002;
              userName_ = other.userName_;
              onChanged();
            }
            if (other.hasProductName()) {
              bitField0_ |= 0x00000004;
              productName_ = other.productName_;
              onChanged();
            }
            if (!other.address_.isEmpty()) {
              if (address_.isEmpty()) {
                address_ = other.address_;
                bitField0_ = (bitField0_ & ~0x00000008);
              } else {
                ensureAddressIsMutable();
                address_.addAll(other.address_);
              }
              onChanged();
            }
            this.mergeUnknownFields(other.getUnknownFields());
            return this;
          }
    
          public final boolean isInitialized() {
            if (!hasSubReqID()) {
              
              return false;
            }
            if (!hasUserName()) {
              
              return false;
            }
            if (!hasProductName()) {
              
              return false;
            }
            return true;
          }
    
          public Builder mergeFrom(
              com.google.protobuf.CodedInputStream input,
              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
              throws java.io.IOException {
            SubscribeReq parsedMessage = null;
            try {
              parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
            } catch (com.google.protobuf.InvalidProtocolBufferException e) {
              parsedMessage = (SubscribeReq) e.getUnfinishedMessage();
              throw e;
            } finally {
              if (parsedMessage != null) {
                mergeFrom(parsedMessage);
              }
            }
            return this;
          }
          private int bitField0_;
    
          // required int32 subReqID = 1;
          private int subReqID_ ;
          /**
           * <code>required int32 subReqID = 1;</code>
           */
          public boolean hasSubReqID() {
            return ((bitField0_ & 0x00000001) == 0x00000001);
          }
          /**
           * <code>required int32 subReqID = 1;</code>
           */
          public int getSubReqID() {
            return subReqID_;
          }
          /**
           * <code>required int32 subReqID = 1;</code>
           */
          public Builder setSubReqID(int value) {
            bitField0_ |= 0x00000001;
            subReqID_ = value;
            onChanged();
            return this;
          }
          /**
           * <code>required int32 subReqID = 1;</code>
           */
          public Builder clearSubReqID() {
            bitField0_ = (bitField0_ & ~0x00000001);
            subReqID_ = 0;
            onChanged();
            return this;
          }
    
          // required string userName = 2;
          private Object userName_ = "";
          /**
           * <code>required string userName = 2;</code>
           */
          public boolean hasUserName() {
            return ((bitField0_ & 0x00000002) == 0x00000002);
          }
          /**
           * <code>required string userName = 2;</code>
           */
          public String getUserName() {
            Object ref = userName_;
            if (!(ref instanceof String)) {
              String s = ((com.google.protobuf.ByteString) ref)
                  .toStringUtf8();
              userName_ = s;
              return s;
            } else {
              return (String) ref;
            }
          }
          /**
           * <code>required string userName = 2;</code>
           */
          public com.google.protobuf.ByteString
              getUserNameBytes() {
            Object ref = userName_;
            if (ref instanceof String) {
              com.google.protobuf.ByteString b = 
                  com.google.protobuf.ByteString.copyFromUtf8(
                      (String) ref);
              userName_ = b;
              return b;
            } else {
              return (com.google.protobuf.ByteString) ref;
            }
          }
          /**
           * <code>required string userName = 2;</code>
           */
          public Builder setUserName(
              String value) {
            if (value == null) {
        throw new NullPointerException();
      }
      bitField0_ |= 0x00000002;
            userName_ = value;
            onChanged();
            return this;
          }
          /**
           * <code>required string userName = 2;</code>
           */
          public Builder clearUserName() {
            bitField0_ = (bitField0_ & ~0x00000002);
            userName_ = getDefaultInstance().getUserName();
            onChanged();
            return this;
          }
          /**
           * <code>required string userName = 2;</code>
           */
          public Builder setUserNameBytes(
              com.google.protobuf.ByteString value) {
            if (value == null) {
        throw new NullPointerException();
      }
      bitField0_ |= 0x00000002;
            userName_ = value;
            onChanged();
            return this;
          }
    
          // required string productName = 3;
          private Object productName_ = "";
          /**
           * <code>required string productName = 3;</code>
           */
          public boolean hasProductName() {
            return ((bitField0_ & 0x00000004) == 0x00000004);
          }
          /**
           * <code>required string productName = 3;</code>
           */
          public String getProductName() {
            Object ref = productName_;
            if (!(ref instanceof String)) {
              String s = ((com.google.protobuf.ByteString) ref)
                  .toStringUtf8();
              productName_ = s;
              return s;
            } else {
              return (String) ref;
            }
          }
          /**
           * <code>required string productName = 3;</code>
           */
          public com.google.protobuf.ByteString
              getProductNameBytes() {
            Object ref = productName_;
            if (ref instanceof String) {
              com.google.protobuf.ByteString b = 
                  com.google.protobuf.ByteString.copyFromUtf8(
                      (String) ref);
              productName_ = b;
              return b;
            } else {
              return (com.google.protobuf.ByteString) ref;
            }
          }
          /**
           * <code>required string productName = 3;</code>
           */
          public Builder setProductName(
              String value) {
            if (value == null) {
        throw new NullPointerException();
      }
      bitField0_ |= 0x00000004;
            productName_ = value;
            onChanged();
            return this;
          }
          /**
           * <code>required string productName = 3;</code>
           */
          public Builder clearProductName() {
            bitField0_ = (bitField0_ & ~0x00000004);
            productName_ = getDefaultInstance().getProductName();
            onChanged();
            return this;
          }
          /**
           * <code>required string productName = 3;</code>
           */
          public Builder setProductNameBytes(
              com.google.protobuf.ByteString value) {
            if (value == null) {
        throw new NullPointerException();
      }
      bitField0_ |= 0x00000004;
            productName_ = value;
            onChanged();
            return this;
          }
    
          // repeated string address = 4;
          private com.google.protobuf.LazyStringList address_ = com.google.protobuf.LazyStringArrayList.EMPTY;
          private void ensureAddressIsMutable() {
            if (!((bitField0_ & 0x00000008) == 0x00000008)) {
              address_ = new com.google.protobuf.LazyStringArrayList(address_);
              bitField0_ |= 0x00000008;
             }
          }
          /**
           * <code>repeated string address = 4;</code>
           */
          public java.util.List<String>
              getAddressList() {
            return java.util.Collections.unmodifiableList(address_);
          }
          /**
           * <code>repeated string address = 4;</code>
           */
          public int getAddressCount() {
            return address_.size();
          }
          /**
           * <code>repeated string address = 4;</code>
           */
          public String getAddress(int index) {
            return address_.get(index);
          }
          /**
           * <code>repeated string address = 4;</code>
           */
          public com.google.protobuf.ByteString
              getAddressBytes(int index) {
            return address_.getByteString(index);
          }
          /**
           * <code>repeated string address = 4;</code>
           */
          public Builder setAddress(
              int index, String value) {
            if (value == null) {
        throw new NullPointerException();
      }
      ensureAddressIsMutable();
            address_.set(index, value);
            onChanged();
            return this;
          }
          /**
           * <code>repeated string address = 4;</code>
           */
          public Builder addAddress(
              String value) {
            if (value == null) {
        throw new NullPointerException();
      }
      ensureAddressIsMutable();
            address_.add(value);
            onChanged();
            return this;
          }
          /**
           * <code>repeated string address = 4;</code>
           */
          public Builder addAllAddress(
              Iterable<String> values) {
            ensureAddressIsMutable();
            super.addAll(values, address_);
            onChanged();
            return this;
          }
          /**
           * <code>repeated string address = 4;</code>
           */
          public Builder clearAddress() {
            address_ = com.google.protobuf.LazyStringArrayList.EMPTY;
            bitField0_ = (bitField0_ & ~0x00000008);
            onChanged();
            return this;
          }
          /**
           * <code>repeated string address = 4;</code>
           */
          public Builder addAddressBytes(
              com.google.protobuf.ByteString value) {
            if (value == null) {
        throw new NullPointerException();
      }
      ensureAddressIsMutable();
            address_.add(value);
            onChanged();
            return this;
          }
    
          // @@protoc_insertion_point(builder_scope:netty.SubscribeReq)
        }
    
        static {
          defaultInstance = new SubscribeReq(true);
          defaultInstance.initFields();
        }
    
        // @@protoc_insertion_point(class_scope:netty.SubscribeReq)
      }
    
      private static com.google.protobuf.Descriptors.Descriptor
        internal_static_netty_SubscribeReq_descriptor;
      private static
        com.google.protobuf.GeneratedMessage.FieldAccessorTable
          internal_static_netty_SubscribeReq_fieldAccessorTable;
    
      public static com.google.protobuf.Descriptors.FileDescriptor
          getDescriptor() {
        return descriptor;
      }
      private static com.google.protobuf.Descriptors.FileDescriptor
          descriptor;
      static {
        String[] descriptorData = {
          "
    22SubscribeReq.proto2205netty"X
    14Subscribe" +
          "Req2220
    10subReqID3001 02(052220
    10userName3002 02(	22" +
          "23
    13productName3003 02(	2217
    07address3004 03(	B.
    " +
          "31com.example.demo.protobufB21SubscribeReq" +
          "Proto"
        };
        com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
          new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
            public com.google.protobuf.ExtensionRegistry assignDescriptors(
                com.google.protobuf.Descriptors.FileDescriptor root) {
              descriptor = root;
              internal_static_netty_SubscribeReq_descriptor =
                getDescriptor().getMessageTypes().get(0);
              internal_static_netty_SubscribeReq_fieldAccessorTable = new
                com.google.protobuf.GeneratedMessage.FieldAccessorTable(
                  internal_static_netty_SubscribeReq_descriptor,
                  new String[] { "SubReqID", "UserName", "ProductName", "Address", });
              return null;
            }
          };
        com.google.protobuf.Descriptors.FileDescriptor
          .internalBuildGeneratedFileFrom(descriptorData,
            new com.google.protobuf.Descriptors.FileDescriptor[] {
            }, assigner);
      }
    
      // @@protoc_insertion_point(outer_class_scope)
    }

     

        2.SubscribeRespProto

     

    public final class SubscribeRespProto {
      private SubscribeRespProto() {}
      public static void registerAllExtensions(
          com.google.protobuf.ExtensionRegistry registry) {
      }
      public interface SubscribeRespOrBuilder
          extends com.google.protobuf.MessageOrBuilder {
    
        // required int32 subReqID = 1;
        /**
         * <code>required int32 subReqID = 1;</code>
         */
        boolean hasSubReqID();
        /**
         * <code>required int32 subReqID = 1;</code>
         */
        int getSubReqID();
    
        // required int32 respCode = 2;
        /**
         * <code>required int32 respCode = 2;</code>
         */
        boolean hasRespCode();
        /**
         * <code>required int32 respCode = 2;</code>
         */
        int getRespCode();
    
        // required string desc = 3;
        /**
         * <code>required string desc = 3;</code>
         */
        boolean hasDesc();
        /**
         * <code>required string desc = 3;</code>
         */
        String getDesc();
        /**
         * <code>required string desc = 3;</code>
         */
        com.google.protobuf.ByteString
            getDescBytes();
      }
      /**
       * Protobuf type {@code netty.SubscribeResp}
       */
      public static final class SubscribeResp extends
          com.google.protobuf.GeneratedMessage
          implements SubscribeRespOrBuilder {
        // Use SubscribeResp.newBuilder() to construct.
        private SubscribeResp(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
          super(builder);
          this.unknownFields = builder.getUnknownFields();
        }
        private SubscribeResp(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
    
        private static final SubscribeResp defaultInstance;
        public static SubscribeResp getDefaultInstance() {
          return defaultInstance;
        }
    
        public SubscribeResp getDefaultInstanceForType() {
          return defaultInstance;
        }
    
        private final com.google.protobuf.UnknownFieldSet unknownFields;
        @Override
        public final com.google.protobuf.UnknownFieldSet
            getUnknownFields() {
          return this.unknownFields;
        }
        private SubscribeResp(
            com.google.protobuf.CodedInputStream input,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws com.google.protobuf.InvalidProtocolBufferException {
          initFields();
          int mutable_bitField0_ = 0;
          com.google.protobuf.UnknownFieldSet.Builder unknownFields =
              com.google.protobuf.UnknownFieldSet.newBuilder();
          try {
            boolean done = false;
            while (!done) {
              int tag = input.readTag();
              switch (tag) {
                case 0:
                  done = true;
                  break;
                default: {
                  if (!parseUnknownField(input, unknownFields,
                                         extensionRegistry, tag)) {
                    done = true;
                  }
                  break;
                }
                case 8: {
                  bitField0_ |= 0x00000001;
                  subReqID_ = input.readInt32();
                  break;
                }
                case 16: {
                  bitField0_ |= 0x00000002;
                  respCode_ = input.readInt32();
                  break;
                }
                case 26: {
                  bitField0_ |= 0x00000004;
                  desc_ = input.readBytes();
                  break;
                }
              }
            }
          } catch (com.google.protobuf.InvalidProtocolBufferException e) {
            throw e.setUnfinishedMessage(this);
          } catch (java.io.IOException e) {
            throw new com.google.protobuf.InvalidProtocolBufferException(
                e.getMessage()).setUnfinishedMessage(this);
          } finally {
            this.unknownFields = unknownFields.build();
            makeExtensionsImmutable();
          }
        }
        public static final com.google.protobuf.Descriptors.Descriptor
            getDescriptor() {
          return SubscribeRespProto.internal_static_netty_SubscribeResp_descriptor;
        }
    
        protected FieldAccessorTable
            internalGetFieldAccessorTable() {
          return SubscribeRespProto.internal_static_netty_SubscribeResp_fieldAccessorTable
              .ensureFieldAccessorsInitialized(
                  SubscribeResp.class, Builder.class);
        }
    
        public static com.google.protobuf.Parser<SubscribeResp> PARSER =
            new com.google.protobuf.AbstractParser<SubscribeResp>() {
          public SubscribeResp parsePartialFrom(
              com.google.protobuf.CodedInputStream input,
              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
              throws com.google.protobuf.InvalidProtocolBufferException {
            return new SubscribeResp(input, extensionRegistry);
          }
        };
    
        @Override
        public com.google.protobuf.Parser<SubscribeResp> getParserForType() {
          return PARSER;
        }
    
        private int bitField0_;
        // required int32 subReqID = 1;
        public static final int SUBREQID_FIELD_NUMBER = 1;
        private int subReqID_;
        /**
         * <code>required int32 subReqID = 1;</code>
         */
        public boolean hasSubReqID() {
          return ((bitField0_ & 0x00000001) == 0x00000001);
        }
        /**
         * <code>required int32 subReqID = 1;</code>
         */
        public int getSubReqID() {
          return subReqID_;
        }
    
        // required int32 respCode = 2;
        public static final int RESPCODE_FIELD_NUMBER = 2;
        private int respCode_;
        /**
         * <code>required int32 respCode = 2;</code>
         */
        public boolean hasRespCode() {
          return ((bitField0_ & 0x00000002) == 0x00000002);
        }
        /**
         * <code>required int32 respCode = 2;</code>
         */
        public int getRespCode() {
          return respCode_;
        }
    
        // required string desc = 3;
        public static final int DESC_FIELD_NUMBER = 3;
        private Object desc_;
        /**
         * <code>required string desc = 3;</code>
         */
        public boolean hasDesc() {
          return ((bitField0_ & 0x00000004) == 0x00000004);
        }
        /**
         * <code>required string desc = 3;</code>
         */
        public String getDesc() {
          Object ref = desc_;
          if (ref instanceof String) {
            return (String) ref;
          } else {
            com.google.protobuf.ByteString bs = 
                (com.google.protobuf.ByteString) ref;
            String s = bs.toStringUtf8();
            if (bs.isValidUtf8()) {
              desc_ = s;
            }
            return s;
          }
        }
        /**
         * <code>required string desc = 3;</code>
         */
        public com.google.protobuf.ByteString
            getDescBytes() {
          Object ref = desc_;
          if (ref instanceof String) {
            com.google.protobuf.ByteString b = 
                com.google.protobuf.ByteString.copyFromUtf8(
                    (String) ref);
            desc_ = b;
            return b;
          } else {
            return (com.google.protobuf.ByteString) ref;
          }
        }
    
        private void initFields() {
          subReqID_ = 0;
          respCode_ = 0;
          desc_ = "";
        }
        private byte memoizedIsInitialized = -1;
        public final boolean isInitialized() {
          byte isInitialized = memoizedIsInitialized;
          if (isInitialized != -1) return isInitialized == 1;
    
          if (!hasSubReqID()) {
            memoizedIsInitialized = 0;
            return false;
          }
          if (!hasRespCode()) {
            memoizedIsInitialized = 0;
            return false;
          }
          if (!hasDesc()) {
            memoizedIsInitialized = 0;
            return false;
          }
          memoizedIsInitialized = 1;
          return true;
        }
    
        public void writeTo(com.google.protobuf.CodedOutputStream output)
                            throws java.io.IOException {
          getSerializedSize();
          if (((bitField0_ & 0x00000001) == 0x00000001)) {
            output.writeInt32(1, subReqID_);
          }
          if (((bitField0_ & 0x00000002) == 0x00000002)) {
            output.writeInt32(2, respCode_);
          }
          if (((bitField0_ & 0x00000004) == 0x00000004)) {
            output.writeBytes(3, getDescBytes());
          }
          getUnknownFields().writeTo(output);
        }
    
        private int memoizedSerializedSize = -1;
        public int getSerializedSize() {
          int size = memoizedSerializedSize;
          if (size != -1) return size;
    
          size = 0;
          if (((bitField0_ & 0x00000001) == 0x00000001)) {
            size += com.google.protobuf.CodedOutputStream
              .computeInt32Size(1, subReqID_);
          }
          if (((bitField0_ & 0x00000002) == 0x00000002)) {
            size += com.google.protobuf.CodedOutputStream
              .computeInt32Size(2, respCode_);
          }
          if (((bitField0_ & 0x00000004) == 0x00000004)) {
            size += com.google.protobuf.CodedOutputStream
              .computeBytesSize(3, getDescBytes());
          }
          size += getUnknownFields().getSerializedSize();
          memoizedSerializedSize = size;
          return size;
        }
    
        private static final long serialVersionUID = 0L;
        @Override
        protected Object writeReplace()
            throws java.io.ObjectStreamException {
          return super.writeReplace();
        }
    
        public static SubscribeResp parseFrom(
            com.google.protobuf.ByteString data)
            throws com.google.protobuf.InvalidProtocolBufferException {
          return PARSER.parseFrom(data);
        }
        public static SubscribeResp parseFrom(
            com.google.protobuf.ByteString data,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws com.google.protobuf.InvalidProtocolBufferException {
          return PARSER.parseFrom(data, extensionRegistry);
        }
        public static SubscribeResp parseFrom(byte[] data)
            throws com.google.protobuf.InvalidProtocolBufferException {
          return PARSER.parseFrom(data);
        }
        public static SubscribeResp parseFrom(
            byte[] data,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws com.google.protobuf.InvalidProtocolBufferException {
          return PARSER.parseFrom(data, extensionRegistry);
        }
        public static SubscribeResp parseFrom(java.io.InputStream input)
            throws java.io.IOException {
          return PARSER.parseFrom(input);
        }
        public static SubscribeResp parseFrom(
            java.io.InputStream input,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws java.io.IOException {
          return PARSER.parseFrom(input, extensionRegistry);
        }
        public static SubscribeResp parseDelimitedFrom(java.io.InputStream input)
            throws java.io.IOException {
          return PARSER.parseDelimitedFrom(input);
        }
        public static SubscribeResp parseDelimitedFrom(
            java.io.InputStream input,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws java.io.IOException {
          return PARSER.parseDelimitedFrom(input, extensionRegistry);
        }
        public static SubscribeResp parseFrom(
            com.google.protobuf.CodedInputStream input)
            throws java.io.IOException {
          return PARSER.parseFrom(input);
        }
        public static SubscribeResp parseFrom(
            com.google.protobuf.CodedInputStream input,
            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
            throws java.io.IOException {
          return PARSER.parseFrom(input, extensionRegistry);
        }
    
        public static Builder newBuilder() { return Builder.create(); }
        public Builder newBuilderForType() { return newBuilder(); }
        public static Builder newBuilder(SubscribeResp prototype) {
          return newBuilder().mergeFrom(prototype);
        }
        public Builder toBuilder() { return newBuilder(this); }
    
        @Override
        protected Builder newBuilderForType(
            BuilderParent parent) {
          Builder builder = new Builder(parent);
          return builder;
        }
        /**
         * Protobuf type {@code netty.SubscribeResp}
         */
        public static final class Builder extends
            com.google.protobuf.GeneratedMessage.Builder<Builder>
           implements SubscribeRespOrBuilder {
          public static final com.google.protobuf.Descriptors.Descriptor
              getDescriptor() {
            return SubscribeRespProto.internal_static_netty_SubscribeResp_descriptor;
          }
    
          protected FieldAccessorTable
              internalGetFieldAccessorTable() {
            return SubscribeRespProto.internal_static_netty_SubscribeResp_fieldAccessorTable
                .ensureFieldAccessorsInitialized(
                    SubscribeResp.class, Builder.class);
          }
    
          // Construct using com.example.demo.protobuf.SubscribeRespProto.SubscribeResp.newBuilder()
          private Builder() {
            maybeForceBuilderInitialization();
          }
    
          private Builder(
              BuilderParent parent) {
            super(parent);
            maybeForceBuilderInitialization();
          }
          private void maybeForceBuilderInitialization() {
            if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
            }
          }
          private static Builder create() {
            return new Builder();
          }
    
          public Builder clear() {
            super.clear();
            subReqID_ = 0;
            bitField0_ = (bitField0_ & ~0x00000001);
            respCode_ = 0;
            bitField0_ = (bitField0_ & ~0x00000002);
            desc_ = "";
            bitField0_ = (bitField0_ & ~0x00000004);
            return this;
          }
    
          public Builder clone() {
            return create().mergeFrom(buildPartial());
          }
    
          public com.google.protobuf.Descriptors.Descriptor
              getDescriptorForType() {
            return SubscribeRespProto.internal_static_netty_SubscribeResp_descriptor;
          }
    
          public SubscribeResp getDefaultInstanceForType() {
            return SubscribeResp.getDefaultInstance();
          }
    
          public SubscribeResp build() {
            SubscribeResp result = buildPartial();
            if (!result.isInitialized()) {
              throw newUninitializedMessageException(result);
            }
            return result;
          }
    
          public SubscribeResp buildPartial() {
            SubscribeResp result = new SubscribeResp(this);
            int from_bitField0_ = bitField0_;
            int to_bitField0_ = 0;
            if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
              to_bitField0_ |= 0x00000001;
            }
            result.subReqID_ = subReqID_;
            if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
              to_bitField0_ |= 0x00000002;
            }
            result.respCode_ = respCode_;
            if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
              to_bitField0_ |= 0x00000004;
            }
            result.desc_ = desc_;
            result.bitField0_ = to_bitField0_;
            onBuilt();
            return result;
          }
    
          public Builder mergeFrom(com.google.protobuf.Message other) {
            if (other instanceof SubscribeResp) {
              return mergeFrom((SubscribeResp)other);
            } else {
              super.mergeFrom(other);
              return this;
            }
          }
    
          public Builder mergeFrom(SubscribeResp other) {
            if (other == SubscribeResp.getDefaultInstance()) return this;
            if (other.hasSubReqID()) {
              setSubReqID(other.getSubReqID());
            }
            if (other.hasRespCode()) {
              setRespCode(other.getRespCode());
            }
            if (other.hasDesc()) {
              bitField0_ |= 0x00000004;
              desc_ = other.desc_;
              onChanged();
            }
            this.mergeUnknownFields(other.getUnknownFields());
            return this;
          }
    
          public final boolean isInitialized() {
            if (!hasSubReqID()) {
              
              return false;
            }
            if (!hasRespCode()) {
              
              return false;
            }
            if (!hasDesc()) {
              
              return false;
            }
            return true;
          }
    
          public Builder mergeFrom(
              com.google.protobuf.CodedInputStream input,
              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
              throws java.io.IOException {
            SubscribeResp parsedMessage = null;
            try {
              parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
            } catch (com.google.protobuf.InvalidProtocolBufferException e) {
              parsedMessage = (SubscribeResp) e.getUnfinishedMessage();
              throw e;
            } finally {
              if (parsedMessage != null) {
                mergeFrom(parsedMessage);
              }
            }
            return this;
          }
          private int bitField0_;
    
          // required int32 subReqID = 1;
          private int subReqID_ ;
          /**
           * <code>required int32 subReqID = 1;</code>
           */
          public boolean hasSubReqID() {
            return ((bitField0_ & 0x00000001) == 0x00000001);
          }
          /**
           * <code>required int32 subReqID = 1;</code>
           */
          public int getSubReqID() {
            return subReqID_;
          }
          /**
           * <code>required int32 subReqID = 1;</code>
           */
          public Builder setSubReqID(int value) {
            bitField0_ |= 0x00000001;
            subReqID_ = value;
            onChanged();
            return this;
          }
          /**
           * <code>required int32 subReqID = 1;</code>
           */
          public Builder clearSubReqID() {
            bitField0_ = (bitField0_ & ~0x00000001);
            subReqID_ = 0;
            onChanged();
            return this;
          }
    
          // required int32 respCode = 2;
          private int respCode_ ;
          /**
           * <code>required int32 respCode = 2;</code>
           */
          public boolean hasRespCode() {
            return ((bitField0_ & 0x00000002) == 0x00000002);
          }
          /**
           * <code>required int32 respCode = 2;</code>
           */
          public int getRespCode() {
            return respCode_;
          }
          /**
           * <code>required int32 respCode = 2;</code>
           */
          public Builder setRespCode(int value) {
            bitField0_ |= 0x00000002;
            respCode_ = value;
            onChanged();
            return this;
          }
          /**
           * <code>required int32 respCode = 2;</code>
           */
          public Builder clearRespCode() {
            bitField0_ = (bitField0_ & ~0x00000002);
            respCode_ = 0;
            onChanged();
            return this;
          }
    
          // required string desc = 3;
          private Object desc_ = "";
          /**
           * <code>required string desc = 3;</code>
           */
          public boolean hasDesc() {
            return ((bitField0_ & 0x00000004) == 0x00000004);
          }
          /**
           * <code>required string desc = 3;</code>
           */
          public String getDesc() {
            Object ref = desc_;
            if (!(ref instanceof String)) {
              String s = ((com.google.protobuf.ByteString) ref)
                  .toStringUtf8();
              desc_ = s;
              return s;
            } else {
              return (String) ref;
            }
          }
          /**
           * <code>required string desc = 3;</code>
           */
          public com.google.protobuf.ByteString
              getDescBytes() {
            Object ref = desc_;
            if (ref instanceof String) {
              com.google.protobuf.ByteString b = 
                  com.google.protobuf.ByteString.copyFromUtf8(
                      (String) ref);
              desc_ = b;
              return b;
            } else {
              return (com.google.protobuf.ByteString) ref;
            }
          }
          /**
           * <code>required string desc = 3;</code>
           */
          public Builder setDesc(
              String value) {
            if (value == null) {
        throw new NullPointerException();
      }
      bitField0_ |= 0x00000004;
            desc_ = value;
            onChanged();
            return this;
          }
          /**
           * <code>required string desc = 3;</code>
           */
          public Builder clearDesc() {
            bitField0_ = (bitField0_ & ~0x00000004);
            desc_ = getDefaultInstance().getDesc();
            onChanged();
            return this;
          }
          /**
           * <code>required string desc = 3;</code>
           */
          public Builder setDescBytes(
              com.google.protobuf.ByteString value) {
            if (value == null) {
        throw new NullPointerException();
      }
      bitField0_ |= 0x00000004;
            desc_ = value;
            onChanged();
            return this;
          }
    
          // @@protoc_insertion_point(builder_scope:netty.SubscribeResp)
        }
    
        static {
          defaultInstance = new SubscribeResp(true);
          defaultInstance.initFields();
        }
    
        // @@protoc_insertion_point(class_scope:netty.SubscribeResp)
      }
    
      private static com.google.protobuf.Descriptors.Descriptor
        internal_static_netty_SubscribeResp_descriptor;
      private static
        com.google.protobuf.GeneratedMessage.FieldAccessorTable
          internal_static_netty_SubscribeResp_fieldAccessorTable;
    
      public static com.google.protobuf.Descriptors.FileDescriptor
          getDescriptor() {
        return descriptor;
      }
      private static com.google.protobuf.Descriptors.FileDescriptor
          descriptor;
      static {
        String[] descriptorData = {
          "
    23SubscribeResp.proto2205netty"A
    
    Subscrib" +
          "eResp2220
    10subReqID3001 02(052220
    10respCode3002 02(" +
          "052214
    04desc3003 02(	B/
    31com.example.demo.prot" +
          "obufB22SubscribeRespProto"
        };
        com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
          new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
            public com.google.protobuf.ExtensionRegistry assignDescriptors(
                com.google.protobuf.Descriptors.FileDescriptor root) {
              descriptor = root;
              internal_static_netty_SubscribeResp_descriptor =
                getDescriptor().getMessageTypes().get(0);
              internal_static_netty_SubscribeResp_fieldAccessorTable = new
                com.google.protobuf.GeneratedMessage.FieldAccessorTable(
                  internal_static_netty_SubscribeResp_descriptor,
                  new String[] { "SubReqID", "RespCode", "Desc", });
              return null;
            }
          };
        com.google.protobuf.Descriptors.FileDescriptor
          .internalBuildGeneratedFileFrom(descriptorData,
            new com.google.protobuf.Descriptors.FileDescriptor[] {
            }, assigner);
      }
    
      // @@protoc_insertion_point(outer_class_scope)
    }

     

        3.TestSubscribeReqProto

     

    import com.example.demo.protobuf.SubscribeReqProto;
    import com.google.protobuf.InvalidProtocolBufferException;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestSubscribeReqProto {
        private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
            return req.toByteArray();//编码
        }
    
        private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws InvalidProtocolBufferException {
            return SubscribeReqProto.SubscribeReq.parseFrom(body);//解码
        }
    
        private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
            SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
    
            //设置属性
            builder.setSubReqID(1);
            builder.setUserName("plxz");
            builder.setProductName("netty");
    
            List<String> address = new ArrayList<>();
            address.add("GuangZhou xiaomanyao");
            address.add("BeiJing tiananmen");
            address.add("ChengDu chunxilu");
            builder.addAllAddress(address);//将集合对象设置到对应的属性中
            return builder.build();
        }
    
        public static void main(String[] args) throws InvalidProtocolBufferException {
            SubscribeReqProto.SubscribeReq req = createSubscribeReq();
            System.out.println("解码前 : " + req.toString());
            SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
            System.out.println("解码后 : " + req2.toString());
            System.out.println("比较结果 : " + req2.equals(req));
        }
    }

     

      8.1.3运行Protobuf例程

    8.2Netty的Protobuf服务端开发

      8.2.1Protobuf版本的图书订购服务端开发

        1.SubReqServer

     

    import com.example.demo.protobuf.SubscribeReqProto;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    public class SubReqServer {
        public void bind(int port) throws Exception {
            //配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();//用于接收客户端的线程组
            EventLoopGroup workerGroup = new NioEventLoopGroup();//用于用于网络读写的线程组
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)//设置通道
                        .option(ChannelOption.SO_BACKLOG, 1024)//设置TCP参数
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());//用于半包消息的解码器
                                ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));//指定解码类型,不支持读半包
                                ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                                ch.pipeline().addLast(new ProtobufEncoder());
                                ch.pipeline().addLast(new SubReqServerHandler());
                            }
                        });
                ChannelFuture f = b.bind(port).sync();//绑定端口,同步等待成功
    
                f.channel().closeFuture().sync();//等待服务端监听端口关闭,才退出main函数
            } finally {
                //优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new SubReqServer().bind(port);
        }
    }

     

        2.SubReqServerHandler

     

    import com.example.demo.protobuf.SubscribeReqProto;
    import com.example.demo.protobuf.SubscribeRespProto;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class SubReqServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            SubscribeReqProto.SubscribeReq req= (SubscribeReqProto.SubscribeReq) msg;//经过了ProtobufDecoder解码后可以直接使用消息
            if("plxz".equalsIgnoreCase(req.getUserName())){//验证用户名
                System.out.println("收到客户端请求 : "+req.toString());
                //使用了ProtobufEncoder自动编码,不需要对回送消息手工编码
                ctx.writeAndFlush(resp(req.getSubReqID()));
            }
        }
    
        private Object resp(int subReqID) {
            SubscribeRespProto.SubscribeResp.Builder builder=SubscribeRespProto.SubscribeResp.newBuilder();
            builder.setSubReqID(subReqID);
            builder.setRespCode(0);
            builder.setDesc("netty buy success");
            return builder.build();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

     

      8.2.2Protobuf版本的图书订购客务端开发

        1.SubReqClient

     

    import com.example.demo.protobuf.SubscribeRespProto;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    
    public class SubReqClient {
        public void connect(String host, int port) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());//用于半包处理后添加ProtobufDecoder解码器
                                ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));//指定解码类型
                                ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                                ch.pipeline().addLast(new ProtobufEncoder());
                                ch.pipeline().addLast(new SubReqClientHandler());
                            }
                        });
                //发起异步连接操作
                ChannelFuture f = b.connect(host, port).sync();
    
                //等待客户端链路关闭
                f.channel().closeFuture().sync();
            } finally {
                //优雅退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new SubReqClient().connect("127.0.0.1", port);
        }
    }

     

     

        2.SubReqClientHandler

     

    import com.example.demo.protobuf.SubscribeReqProto;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class SubReqClientHandler extends ChannelHandlerAdapter {
        public SubReqClientHandler() {
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            for (int i = 0; i < 10; i++) {
                ctx.write(subReq(i));
            }
            ctx.flush();
        }
    
        private SubscribeReqProto.SubscribeReq subReq(int i) {
            SubscribeReqProto.SubscribeReq.Builder builder=SubscribeReqProto.SubscribeReq.newBuilder();
            builder.setSubReqID(i);
            builder.setUserName("plxz");
            builder.setProductName("netty");
            List<String> address = new ArrayList<>();
            address.add("GuangZhou xiaomanyao");
            address.add("BeiJing tiananmen");
            address.add("ChengDu chunxilu");
            builder.addAllAddress(address);//将集合对象设置到对应的属性中
            return builder.build();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("收到服务端响应消息 = " + msg);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

     

      8.2.3Protobuf版本的图书订购程序功能测试

    8.3Protobuf使用注意事项

     

    第9章  JBoss Marshalling编解码

    9.1Marshalling开发环境准备

    9.2Netty的Marshalling服务端开发

      1.SubReqServer

     

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    public class SubReqServer {
        public void bind(int port) throws Exception {
            //配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();//用于接收客户端的线程组
            EventLoopGroup workerGroup = new NioEventLoopGroup();//用于用于网络读写的线程组
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)//设置通道
                        .option(ChannelOption.SO_BACKLOG, 1024)//设置TCP参数
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());//创建解码器
                                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());//创建编码器
                                ch.pipeline().addLast(new SubReqServerHandler());
                            }
                        });
                ChannelFuture f = b.bind(port).sync();//绑定端口,同步等待成功
    
                f.channel().closeFuture().sync();//等待服务端监听端口关闭,才退出main函数
            } finally {
                //优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new SubReqServer().bind(port);
        }
    }

     

      2.MarshallingCodeCFactory

     

    import io.netty.handler.codec.marshalling.*;
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
    public class MarshallingCodeCFactory {
        //解码器
        public static MarshallingDecoder buildMarshallingDecoder() {
            //获取MarshallerFactory,serial表示创建的是java序列号工厂对象
            final MarshallerFactory marshallingCodeCFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();//创建MarshallingConfiguration对象
            configuration.setVersion(5);//设置版本号
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallingCodeCFactory, configuration);
            //1024表示单个消息序列化后的最大长度
            MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
            return decoder;
        }
    
        //编码器
        public static MarshallingEncoder buildMarshallingEncoder() {
            final MarshallerFactory marshallingCodeCFactory = Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallingCodeCFactory, configuration);
            //将实现序列化接口的POJO对象序列化为二进制数组
            MarshallingEncoder encoder = new MarshallingEncoder(provider);
            return encoder;
        }
    }

     

    9.3Netty的Marshalling客户端开发

      1.SubReqClient

     

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class SubReqClient {
        public void connect(String host, int port) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());//创建解码器
                                ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());//创建编码器
                                ch.pipeline().addLast(new SubReqClientHandler());
                            }
                        });
                //发起异步连接操作
                ChannelFuture f = b.connect(host, port).sync();
    
                //等待客户端链路关闭
                f.channel().closeFuture().sync();
            } finally {
                //优雅退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);//设置监听端口
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new SubReqClient().connect("127.0.0.1", port);
        }
    }

     

    第10章  HTTP协议开发应用

    10.1HTTP协议介绍

      10.1.1HTTP协议的URL

      10.1.2HTTP请求消息

      10.1.3HTTP响应消息

    10.2Netty HTTP服务端入门开发

      10.2.1HTTP服务端例程场景描述

      10.2.2HTTP服务端开发

        1.HttpFileServer

     

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    public class HttpFileServer {
        public static final String DEFAULT_URL = "/src/main/java/com/example/demo/resources/";
    
        public void run(final int port, final String url) throws Exception {
            //配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();//用于接收客户端的线程组
            EventLoopGroup workerGroup = new NioEventLoopGroup();//用于用于网络读写的线程组
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)//设置通道
                        .option(ChannelOption.SO_BACKLOG, 1024)//设置TCP参数
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());//创建解码器
                                ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));//将多个消息合并
                                ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());//创建编码器
                                ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());//支持异步发送大的码流,但不占用过多内存
                                ch.pipeline().addLast("fileServerHandler", new HttpFileServerHandler(url));//创建编码器
                            }
                        });
                ChannelFuture f = b.bind("127.0.0.1", port).sync();//绑定端口,同步等待成功
    
                f.channel().closeFuture().sync();//等待服务端监听端口关闭,才退出main函数
            } finally {
                //优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (NpeCheck.checkArray(args)) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new HttpFileServer().run(port, DEFAULT_URL);
        }
    }

     

        2.HttpFileServerHandler

     

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.stream.ChunkedFile;
    import io.netty.util.CharsetUtil;
    
    import javax.activation.MimetypesFileTypeMap;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.RandomAccessFile;
    import java.net.URLDecoder;
    import java.util.regex.Pattern;
    
    import static io.netty.handler.codec.http.HttpHeaderNames.*;
    import static io.netty.handler.codec.http.HttpHeaderUtil.isKeepAlive;
    import static io.netty.handler.codec.http.HttpHeaderUtil.setContentLength;
    import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
    import static io.netty.handler.codec.http.HttpMethod.GET;
    import static io.netty.handler.codec.http.HttpResponseStatus.*;
    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    
    public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
        private final String url;
        private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\.]*");
        private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&"].*");
    
        public HttpFileServerHandler(String url) {
            this.url = url;
        }
    
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
            //消息解码
            if (!request.decoderResult().isSuccess()) {
                sendError(ctx, BAD_REQUEST);
                return;
            }
            //是否get请求
            if (request.method() != GET) {
                sendError(ctx, METHOD_NOT_ALLOWED);
                return;
            }
            final String uri = request.uri();
            final String path = sanitizeUri(uri);
            //是否有路径
            if (path == null) {
                sendError(ctx, FORBIDDEN);
                return;
            }
            File file = new File(path);
            //是否隐藏或存在
            if (file.isHidden() || !file.exists()) {
                sendError(ctx, NOT_FOUND);
                return;
            }
            //是否存在目录
            if (file.isDirectory()) {
                if (uri.endsWith("/")) {
                    senfListing(ctx, file);
                } else {
                    sendRedirect(ctx, uri + "/");
                }
                return;
            }
            //是否文件
            if (!file.isFile()) {
                sendError(ctx, FORBIDDEN);
                return;
            }
            RandomAccessFile randomAccessFile;
            try {
                randomAccessFile = new RandomAccessFile(file, "r");//以只读的方式打开
            } catch (FileNotFoundException fnfe) {
                sendError(ctx, NOT_FOUND);
                return;
            }
            long fileLength = randomAccessFile.length();//获取文件长度
            HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, OK);
            setContentLength(response, fileLength);//设置内容长度
            setContentTypeHeader(response, file);//设置内容类型
            //是否长连进
            if (isKeepAlive(request)) {
                response.headers().set(CONNECTION, KEEP_ALIVE);
            }
            ctx.write(response);//发送响应消息
            ChannelFuture sendFileFutrue;
            //将文件写入缓冲区
            sendFileFutrue = ctx.write(new ChunkedFile(randomAccessFile, 0, fileLength, 8192), ctx.newProgressivePromise());
            sendFileFutrue.addListener(new ChannelProgressiveFutureListener() {
                public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
                    if (total < 0) {
                        System.err.println("progress:" + progress);
                    } else {
                        System.err.println("progress:" + progress + "/" + total); //打印进度
                    }
                }
    
                public void operationComplete(ChannelProgressiveFuture future) {
                    System.err.println("complete");//发送完成
                }
            });
    
            ChannelFuture lastChannelFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);//标识消息发送完毕
            if (!isKeepAlive(request)) {
                lastChannelFuture.addListener(ChannelFutureListener.CLOSE);//关闭连接
            }
        }
    
        private void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus status) {
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString() + "
    ", CharsetUtil.UTF_8));
            response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
            channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    
        //url包装
        public String sanitizeUri(String uri) {
            //解码
            try {
                uri = URLDecoder.decode(uri, "UTF-8");
            } catch (Exception e) {
                try {
                    uri = URLDecoder.decode(uri, "ISO-8859-1");
                } catch (Exception ew) {
                    ew.printStackTrace();
                }
            }
            //合法性判断
            if (!uri.startsWith(url)) {
                return null;
            }
            if (!uri.startsWith("/")) {
                return null;
            }
            //替换为本地操作系统的文件路径分隔符
            uri = uri.replace('/', File.separatorChar);
            //合法性判断
            if (uri.contains(File.separator + '.') || uri.startsWith(".") || uri.endsWith(".") || INSECURE_URI.matcher(uri).matches()) {
                return null;
            }
            return System.getProperty("user.dir") + File.separator + uri;//返回绝对路径
        }
    
        //设置消息类型
        private void setContentTypeHeader(HttpResponse httpResponse, File file) {
            MimetypesFileTypeMap mimetypesFileTypeMap = new MimetypesFileTypeMap();
            httpResponse.headers().set(CONTENT_TYPE, mimetypesFileTypeMap.getContentType(file.getPath()));
        }
    
        //发送目录给浏览器
        private void senfListing(ChannelHandlerContext channelHandlerContext, File dir) {
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
            response.headers().set(CONTENT_TYPE, "text/html;charset=UTF-8");//设置消息头
            StringBuilder builder = new StringBuilder();//构造响应消息体
            String dirPath = dir.getPath();
            builder.append("<!DOCTYPE html> 
    ");
            builder.append("<html><head><title>");
            builder.append(dirPath);
            builder.append("目录:");
            builder.append("</title></head><body>
    ");
            builder.append("<h3>");
            builder.append(dirPath).append("目录:");
            builder.append("</h3>
    ");
            builder.append("<ul>");
            builder.append("<li>链接:<a href="../">..</a></li>
    ");
            //展示目录及其文件夹
            for (File f : dir.listFiles()) {
                if (f.isHidden() || !f.canRead()) {
                    continue;
                }
                String fname = f.getName();
                if (!ALLOWED_FILE_NAME.matcher(fname).matches()) {
                    continue;
                }
                builder.append("<li>链接:<a href=" ");
                builder.append(fname);
                builder.append("" >");
                builder.append(fname);
                builder.append("</a></li>
    ");
            }
            builder.append("</ul></body></html>
    ");
            ByteBuf byteBuf = Unpooled.copiedBuffer(builder, CharsetUtil.UTF_8);//分配对应消息的缓冲区对象
            response.content().writeBytes(byteBuf);//将缓冲区消息放入响应消息中
            byteBuf.release();//释放缓冲区
            channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);//将响应消息发送到缓冲区病刷新到socket
        }
    
        //设置重定向
        private void sendRedirect(ChannelHandlerContext channelHandlerContext, String newUri) {
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
            response.headers().set(LOCATION, newUri);
            channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    }

    10.3Netty HTTP+XML协议栈开发

      10.3.1开发场景介绍

      10.3.2HTTP+XML协议栈设计

      10.3.3高效的XML绑定框架JiBx

      10.3.4HTTP+XML编解码框架开发

        1.请求消息编码类

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.http.DefaultFullHttpRequest;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.HttpHeaders;
    import io.netty.handler.codec.http.HttpMethod;
    import io.netty.handler.codec.http.HttpVersion;
    import java.net.InetAddress;
    import java.util.List;
    
    //请求消息编码类
    public class HttpXmlRequestEncoder extends AbstractHttpXmlEncoder<HttpXmlRequest> {
        @Override
        protected void encode(ChannelHandlerContext ctx, HttpXmlRequest msg, List<Object> out) throws Exception {
            ByteBuf body = encode0(ctx, msg.getBody());//调用父类解码器将pojo对象解码成xml
            FullHttpRequest request = msg.getRequest();
            if (request == null) {
                //构造新的消息头
                request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/do", body);
                HttpHeaders headers = request.headers();
                headers.set(HttpHeaders.Names.HOST, InetAddress.getLocalHost().getHostAddress());
                headers.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
                headers.set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP.toString() + ',' + HttpHeaders.Values.DEFLATE.toString());
                headers.set(HttpHeaders.Names.ACCEPT_CHARSET, "ISO-8859-1,utf-8;q=0.7,*;q=0.7");
                headers.set(HttpHeaders.Names.ACCEPT_LANGUAGE, "zh");
                headers.set(HttpHeaders.Names.USER_AGENT, "Netty xml Http Client side");
                headers.set(HttpHeaders.Names.ACCEPT, "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8");
            }
            HttpHeaders.setContentLength(request, body.readableBytes());//设置消息体长度
            out.add(request);
        }
    }

        2.请求消息编码基类

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageEncoder;
    import java.io.StringWriter;
    import java.nio.charset.Charset;
    import org.jibx.runtime.BindingDirectory;
    import org.jibx.runtime.IBindingFactory;
    import org.jibx.runtime.IMarshallingContext;
    
    //请求消息编码基类
    public abstract class AbstractHttpXmlEncoder<T> extends MessageToMessageEncoder<T> {
        IBindingFactory factory = null;
        StringWriter writer = null;
        final static String CHARSET_NAME = "UTF-8";
        final static Charset UTF_8 = Charset.forName(CHARSET_NAME);
        
        //将pojo对象解码成xml
        protected ByteBuf encode0(ChannelHandlerContext ctx, Object body) throws Exception {
            factory = BindingDirectory.getFactory(body.getClass());
            writer = new StringWriter();
            IMarshallingContext mctx = factory.createMarshallingContext();
            mctx.setIndent(2);
            mctx.marshalDocument(body, CHARSET_NAME, null, writer);
            String xmlStr = writer.toString();
            writer.close();
            writer = null;
            ByteBuf encodeBuf = Unpooled.copiedBuffer(xmlStr, UTF_8);
            return encodeBuf;
        }
    
        @Skip
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 释放资源
            if (writer != null) {
                writer.close();
                writer = null;
            }
        }
    }

        3.请求消息类

    import io.netty.handler.codec.http.FullHttpRequest;
    
    //请求消息
    public class HttpXmlRequest {
    
        private FullHttpRequest request;
        private Object body;//编码对象
    
        public HttpXmlRequest(FullHttpRequest request, Object body) {
        this.request = request;
        this.body = body;
        }
    
        public final FullHttpRequest getRequest() {
        return request;
        }
    
        public final void setRequest(FullHttpRequest request) {
        this.request = request;
        }
    
        public final Object getBody() {
        return body;
        }
    
        public final void setBody(Object body) {
        this.body = body;
        }
    
        @Override
        public String toString() {
        return "HttpXmlRequest [request=" + request + ", body =" + body + "]";
        }
    }

        4.请求消息解码

    import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.util.CharsetUtil;
    import java.util.List;
    //请求消息解码
    public class HttpXmlRequestDecoder extends AbstractHttpXmlDecoder<FullHttpRequest> {
    
        public HttpXmlRequestDecoder(Class<?> clazz) {
            this(clazz, false);
        }
    
        public HttpXmlRequestDecoder(Class<?> clazz, boolean isPrint) {
            super(clazz, isPrint);
        }
    
        @Override
        protected void decode(ChannelHandlerContext arg0, FullHttpRequest arg1, List<Object> arg2) throws Exception {
            //请求消息本身是否解码成功
            if (!arg1.getDecoderResult().isSuccess()) {
                sendError(arg0, BAD_REQUEST);
                return;
            }
            HttpXmlRequest request = new HttpXmlRequest(arg1, decode0(arg0, arg1.content()));
            arg2.add(request);//添加到解码结果列表
        }
    
        //构造处理结果异常的http应答消息
        private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString() + "
    ", CharsetUtil.UTF_8));
            response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    }

        5.请求消息解码基类

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    
    import java.io.StringReader;
    import java.nio.charset.Charset;
    
    import org.jibx.runtime.BindingDirectory;
    import org.jibx.runtime.IBindingFactory;
    import org.jibx.runtime.IUnmarshallingContext;
    
    //请求消息解码基类
    public abstract class AbstractHttpXmlDecoder<T> extends MessageToMessageDecoder<T> {
    
        private IBindingFactory factory;
        private StringReader reader;
        private Class<?> clazz;
        private boolean isPrint;
        private final static String CHARSET_NAME = "UTF-8";
        private final static Charset UTF_8 = Charset.forName(CHARSET_NAME);
    
        protected AbstractHttpXmlDecoder(Class<?> clazz) {
            this(clazz, false);
        }
    
        protected AbstractHttpXmlDecoder(Class<?> clazz, boolean isPrint) {
            this.clazz = clazz;
            this.isPrint = isPrint;
        }
    
        //将xml转换成pojo对象
        protected Object decode0(ChannelHandlerContext arg0, ByteBuf body) throws Exception {
            factory = BindingDirectory.getFactory(clazz);
            String content = body.toString(UTF_8);
            if (isPrint) {
                System.out.println("The body is : " + content);
            }
            reader = new StringReader(content);
            IUnmarshallingContext uctx = factory.createUnmarshallingContext();
            Object result = uctx.unmarshalDocument(reader);
            reader.close();
            reader = null;
            return result;
        }
    
        @Skip
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 释放资源
            if (reader != null) {
                reader.close();
                reader = null;
            }
        }
    }

        6.应答消息

    import io.netty.handler.codec.http.FullHttpResponse;
    
    //应答消息
    public class HttpXmlResponse {
        private FullHttpResponse httpResponse;
        private Object result;//pojo对象
    
        public HttpXmlResponse(FullHttpResponse httpResponse, Object result) {
            this.httpResponse = httpResponse;
            this.result = result;
        }
    
        public final FullHttpResponse getHttpResponse() {
            return httpResponse;
        }
    
        public final void setHttpResponse(FullHttpResponse httpResponse) {
            this.httpResponse = httpResponse;
        }
    
        public final Object getResult() {
            return result;
        }
    
        public final void setResult(Object result) {
            this.result = result;
        }
    
        @Override
        public String toString() {
            return "HttpXmlResponse [httpResponse=" + httpResponse + ", result=" + result + "]";
        }
    }

        7.应答消息编码类

    import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
    import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    import static io.netty.handler.codec.http.HttpResponseStatus.OK;
    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpResponse;
    
    import java.util.List;
    
    //应答消息编码类
    public class HttpXmlResponseEncoder extends
        AbstractHttpXmlEncoder<HttpXmlResponse> {
    
        protected void encode(ChannelHandlerContext ctx, HttpXmlResponse msg, List<Object> out) throws Exception {
        ByteBuf body = encode0(ctx, msg.getResult());
        FullHttpResponse response = msg.getHttpResponse();
        //构造新的http应答消息
        if (response == null) {
            response = new DefaultFullHttpResponse(HTTP_1_1, OK, body);
        } else {
            response = new DefaultFullHttpResponse(msg.getHttpResponse().getProtocolVersion(), msg.getHttpResponse().getStatus(), body);
        }
        response.headers().set(CONTENT_TYPE, "text/xml");//设置消息体内容格式
        setContentLength(response, body.readableBytes());
        out.add(response);//添加到解码列表
        }
    }

        8.应答消息解码类

    //应答消息解码类
    public class HttpXmlResponseDecoder extends
            AbstractHttpXmlDecoder<DefaultFullHttpResponse> {
    
        public HttpXmlResponseDecoder(Class<?> clazz) {
            this(clazz, false);
        }
    
        public HttpXmlResponseDecoder(Class<?> clazz, boolean isPrintlog) {
            super(clazz, isPrintlog);
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, DefaultFullHttpResponse msg, List<Object> out) throws Exception {
            HttpXmlResponse resHttpXmlResponse = new HttpXmlResponse(msg, decode0(ctx, msg.content()));
            out.add(resHttpXmlResponse);
        }
    }

         9.客户端启动类

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpRequestEncoder;
    import io.netty.handler.codec.http.HttpResponseDecoder;
    
    import java.net.InetSocketAddress;
    
    import com.phei.netty.protocol.http.xml.codec.HttpXmlRequestEncoder;
    import com.phei.netty.protocol.http.xml.codec.HttpXmlResponseDecoder;
    import com.phei.netty.protocol.http.xml.pojo.Order;
    
    //客户端启动类
    public class HttpXmlClient {
    
        public void connect(int port) throws Exception {
            // 配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast("http-decoder", new HttpResponseDecoder());//将二进制解码成http应答消息
                                ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));//合并成完整的http消息
                                ch.pipeline().addLast("xml-decoder", new HttpXmlResponseDecoder(Order.class, true));
                                ch.pipeline().addLast("http-encoder", new HttpRequestEncoder());
                                ch.pipeline().addLast("xml-encoder", new HttpXmlRequestEncoder());
                                ch.pipeline().addLast("xmlClientHandler", new HttpXmlClientHandle());
                            }
                        });
    
                // 发起异步连接操作
                ChannelFuture f = b.connect(new InetSocketAddress(port)).sync();
    
                // 当代客户端链路关闭
                f.channel().closeFuture().sync();
            } finally {
                // 优雅退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    // 采用默认值
                }
            }
            new HttpXmlClient().connect(port);
        }
    }

        10.客户端业务逻辑类

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    import com.phei.netty.protocol.http.xml.codec.HttpXmlRequest;
    import com.phei.netty.protocol.http.xml.codec.HttpXmlResponse;
    import com.phei.netty.protocol.http.xml.pojo.OrderFactory;
    
    //客户端业务逻辑类
    public class HttpXmlClientHandle extends
            SimpleChannelInboundHandler<HttpXmlResponse> {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            HttpXmlRequest request = new HttpXmlRequest(null, OrderFactory.create(123));
            ctx.writeAndFlush(request);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, HttpXmlResponse msg) throws Exception {
            System.out.println("The client receive response of http header is : " + msg.getHttpResponse().headers().names());
            System.out.println("The client receive response of http body is : " + msg.getResult());
        }
    }

        11.订购对象工厂

    //订购对象工厂
    public class OrderFactory {
        public static Order create(long orderID) {
            Order order = new Order();
            order.setOrderNumber(orderID);
            order.setTotal(9999.999f);
            Address address = new Address();
            address.setCity("南京市");
            address.setCountry("中国");
            address.setPostCode("123321");
            address.setState("江苏省");
            address.setStreet1("龙眠大道");
            order.setBillTo(address);
            Customer customer = new Customer();
            customer.setCustomerNumber(orderID);
            customer.setFirstName("李");
            customer.setLastName("林峰");
            order.setCustomer(customer);
            order.setShipping(Shipping.INTERNATIONAL_MAIL);
            order.setShipTo(address);
            return order;
        }
    }

        12.服务端主程序

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpRequestDecoder;
    import io.netty.handler.codec.http.HttpResponseEncoder;
    
    import java.net.InetSocketAddress;
    
    import com.phei.netty.protocol.http.xml.codec.HttpXmlRequestDecoder;
    import com.phei.netty.protocol.http.xml.codec.HttpXmlResponseEncoder;
    import com.phei.netty.protocol.http.xml.pojo.Order;
    
    //服务端主程序
    public class HttpXmlServer {
        public void run(final int port) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
                                ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
                                ch.pipeline().addLast("xml-decoder", new HttpXmlRequestDecoder(Order.class, true));
                                ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
                                ch.pipeline().addLast("xml-encoder", new HttpXmlResponseEncoder());
                                ch.pipeline().addLast("xmlServerHandler", new HttpXmlServerHandler());
                            }
                        });
                ChannelFuture future = b.bind(new InetSocketAddress(port)).sync();
                System.out.println("HTTP订购服务器启动,网址是 : " + "http://localhost:" + port);
                future.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args.length > 0) {
                try {
                    port = Integer.parseInt(args[0]);
                } catch (NumberFormatException e) {
                    e.printStackTrace();
                }
            }
            new HttpXmlServer().run(port);
        }
    }

        13.服务端处理类

    import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
    import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpRequest;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.util.CharsetUtil;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import com.phei.netty.protocol.http.xml.codec.HttpXmlRequest;
    import com.phei.netty.protocol.http.xml.codec.HttpXmlResponse;
    import com.phei.netty.protocol.http.xml.pojo.Address;
    import com.phei.netty.protocol.http.xml.pojo.Order;
    
    //服务端处理类
    public class HttpXmlServerHandler extends SimpleChannelInboundHandler<HttpXmlRequest> {
    
        @Override
        public void messageReceived(final ChannelHandlerContext ctx, HttpXmlRequest xmlRequest) throws Exception {
            HttpRequest request = xmlRequest.getRequest();
            Order order = (Order) xmlRequest.getBody();
            System.out.println("Http server receive request : " + order);
            dobusiness(order);//处理订购消息
            //发送应答消息
            ChannelFuture future = ctx.writeAndFlush(new HttpXmlResponse(null, order));
            if (!isKeepAlive(request)) {
                future.addListener(new GenericFutureListener<Future<? super Void>>() {
                    public void operationComplete(Future future) throws Exception {
                        ctx.close();
                    }
                });
            }
        }
    
        private void dobusiness(Order order) {
            order.getCustomer().setFirstName("狄");
            order.getCustomer().setLastName("仁杰");
            List<String> midNames = new ArrayList<String>();
            midNames.add("李元芳");
            order.getCustomer().setMiddleNames(midNames);
            Address address = order.getBillTo();
            address.setCity("洛阳");
            address.setCountry("大唐");
            address.setState("河南道");
            address.setPostCode("123456");
            order.setBillTo(address);
            order.setShipTo(address);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            if (ctx.channel().isActive()) {
                sendError(ctx, INTERNAL_SERVER_ERROR);
            }
        }
    
        private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("失败: " + status.toString() + "
    ", CharsetUtil.UTF_8));
            response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    }

     第11章   WebSocket协议开发

    11.1HTTP协议的弊端

    11.2WebSocket入门

      11.2.1WebSocket背景

      11.2.2WebSocket连接建立

      11.2.3WebSocket生命周期

      11.2.4WebSocket连接关闭

    11.3WebSocket协议开发

      11.3.1WebSocket服务端功能介绍

      11.3.2WebSocket服务端开发

        1.服务端启动类

     

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    public class WebSocketServer {
        public void run(int port) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("http-codec", new HttpServerCodec());//将请求消息编解码为http消息
                                pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//组合成完整的http消息
                                ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());//向客户端发送和html5文件
                                pipeline.addLast("handler", new WebSocketServerHandler());
                            }
                        });
    
                Channel ch = b.bind(port).sync().channel();
                System.out.println("Web socket server started at port " + port + '.');
                System.out.println("Open your browser and navigate to http://localhost:" + port + '/');
                ch.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args.length > 0) {
                try {
                    port = Integer.parseInt(args[0]);
                } catch (NumberFormatException e) {
                    e.printStackTrace();
                }
            }
            new WebSocketServer().run(port);
        }
    }

     

        2.服务端处理类

    import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
    import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
    import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
    import io.netty.util.CharsetUtil;
    
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
        private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName());
    
        private WebSocketServerHandshaker handshaker;
    
        @Override
        public void messageReceived(ChannelHandlerContext ctx, Object msg) {
            // 传统的HTTP接入
            if (msg instanceof FullHttpRequest) {
                handleHttpRequest(ctx, (FullHttpRequest) msg);
            } else if (msg instanceof WebSocketFrame) { // WebSocket接入
                handleWebSocketFrame(ctx, (WebSocketFrame) msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
    
        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
            // 如果HTTP解码失败,返回HHTP异常
            if (!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
                return;
            }
    
            // 构造握手响应返回,本机测试
            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false);
            handshaker = wsFactory.newHandshaker(req);
            if (handshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
            } else {
                handshaker.handshake(ctx.channel(), req);
            }
        }
    
        private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
            // 判断是否是关闭链路的指令
            if (frame instanceof CloseWebSocketFrame) {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
                return;
            }
            // 判断是否是Ping消息
            if (frame instanceof PingWebSocketFrame) {
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            // 本例程仅支持文本消息,不支持二进制消息
            if (!(frame instanceof TextWebSocketFrame)) {
                throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
            }
    
            // 返回应答消息
            String request = ((TextWebSocketFrame) frame).text();
            if (logger.isLoggable(Level.FINE)) {
                logger.fine(String.format("%s received %s", ctx.channel(), request));
            }
            ctx.channel().write(new TextWebSocketFrame(request + " , 欢迎使用Netty WebSocket服务,现在时刻:" + new java.util.Date().toString()));
        }
    
        private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
            // 返回应答给客户端
            if (res.getStatus().code() != 200) {
                ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
                res.content().writeBytes(buf);
                buf.release();
                setContentLength(res, res.content().readableBytes());
            }
    
            // 如果是非Keep-Alive,关闭连接
            ChannelFuture f = ctx.channel().writeAndFlush(res);
            if (!isKeepAlive(req) || res.getStatus().code() != 200) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

        3.页面

    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        Netty WebSocket 时间服务器
    </head>
    <br>
    <body>
    <br>
    <script type="text/javascript">
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://localhost:8080/websocket");
            socket.onmessage = function (event) {
                var ta = document.getElementById('responseText');
                ta.value = "";
                ta.value = event.data
            };
            socket.onopen = function (event) {
                var ta = document.getElementById('responseText');
                ta.value = "打开WebSocket服务正常,浏览器支持WebSocket!";
            };
            socket.onclose = function (event) {
                var ta = document.getElementById('responseText');
                ta.value = "";
                ta.value = "WebSocket 关闭!";
            };
        } else {
            alert("抱歉,您的浏览器不支持WebSocket协议!");
        }
    
        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("WebSocket连接没有建立成功!");
            }
        }
    </script>
    <form onsubmit="return false;">
        <input type="text" name="message" value="Netty最佳实践"/>
        <br><br>
        <input type="button" value="发送WebSocket请求消息" onclick="send(this.form.message.value)"/>
        <hr color="blue"/>
        <h3>服务端返回的应答消息</h3>
        <textarea id="responseText" style="500px;height:300px;"></textarea>
    </form>
    </body>
    </html>

    第12章  私有协议栈开发

    12.1私有协议介绍

    12.2Netty协议栈功能设计

      12.2.1网络拓扑图

      12.2.2协议栈功能描述

      12.2.3通信模型

      12.2.4消息定义

      12.2.5协议支持的字段类型

      12.2.6编解码规范

        1.编码

        2.解码

      12.2.7链路的建立

      12.2.8链路的关闭

      12.2.9可靠性设计

        1.心跳机制

        2.重连机制

        3.重复登录保护

        4.消息缓存重发

      12.2.10安全性设计

      12.2.11可扩展性设计

    12.3Netty协议栈开发

      12.3.1数据库结构定义

        1.NettyMessage类定义

     

    public final class NettyMessage {
        private Header header;//消息头
        private Object body;//消息体
    
        public final Header getHeader() {
            return header;
        }
    
        public final void setHeader(Header header) {
            this.header = header;
        }
    
        public final Object getBody() {
            return body;
        }
    
        public final void setBody(Object body) {
            this.body = body;
        }
    
        @Override
        public String toString() {
            return "NettyMessage [header=" + header + "]";
        }
    }

     

        2.消息头

     

    import java.util.HashMap;
    import java.util.Map;
    
    public final class Header {
        private int crcCode = 0xabef0101;
        private int length;// 消息长度
        private long sessionID;// 会话ID
        private byte type;// 消息类型
        private byte priority;// 消息优先级
        private Map<String, Object> attachment = new HashMap<String, Object>(); // 附件
    
        public final int getCrcCode() {
            return crcCode;
        }
    
        public final void setCrcCode(int crcCode) {
            this.crcCode = crcCode;
        }
    
        public final int getLength() {
            return length;
        }
    
        public final void setLength(int length) {
            this.length = length;
        }
    
        public final long getSessionID() {
            return sessionID;
        }
    
        public final void setSessionID(long sessionID) {
            this.sessionID = sessionID;
        }
    
        public final byte getType() {
            return type;
        }
    
        public final void setType(byte type) {
            this.type = type;
        }
    
        public final byte getPriority() {
            return priority;
        }
    
        public final void setPriority(byte priority) {
            this.priority = priority;
        }
    
        public final Map<String, Object> getAttachment() {
            return attachment;
        }
    
        public final void setAttachment(Map<String, Object> attachment) {
            this.attachment = attachment;
        }
    
        @Override
        public String toString() {
            return "Header [crcCode=" + crcCode + ", length=" + length + ", sessionID=" + sessionID + ", type=" + type + ", priority=" + priority + ", attachment=" + attachment + "]";
        }
    }

     

      12.3.2消息编解码

        1.编码器

     

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    import java.io.IOException;
    import java.util.Map;
    
    import com.phei.netty.protocol.netty.struct.NettyMessage;
    
    public final class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {
    
        MarshallingEncoder marshallingEncoder;
    
        public NettyMessageEncoder() throws IOException {
            this.marshallingEncoder = new MarshallingEncoder();
        }
    
        @Override
        protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf sendBuf) throws Exception {
            if (msg == null || msg.getHeader() == null) {
                throw new Exception("The encode message is null");
            }
            sendBuf.writeInt((msg.getHeader().getCrcCode()));
            sendBuf.writeInt((msg.getHeader().getLength()));
            sendBuf.writeLong((msg.getHeader().getSessionID()));
            sendBuf.writeByte((msg.getHeader().getType()));
            sendBuf.writeByte((msg.getHeader().getPriority()));
            sendBuf.writeInt((msg.getHeader().getAttachment().size()));
            String key;
            byte[] keyArray;
            Object value;
            for (Map.Entry<String, Object> param : msg.getHeader().getAttachment().entrySet()) {
                key = param.getKey();
                keyArray = key.getBytes("UTF-8");
                sendBuf.writeInt(keyArray.length);
                sendBuf.writeBytes(keyArray);
                value = param.getValue();
                marshallingEncoder.encode(value, sendBuf);
            }
            if (msg.getBody() != null) {
                marshallingEncoder.encode(msg.getBody(), sendBuf);
            } else
                sendBuf.writeInt(0);
            sendBuf.setInt(4, sendBuf.readableBytes() - 8);
        }
    }

     

        2.编码消息工具类

     

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandler.Sharable;
    
    import java.io.IOException;
    
    import org.jboss.marshalling.Marshaller;
    
    @Sharable
    public class MarshallingEncoder {
    
        private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
        Marshaller marshaller;
    
        public MarshallingEncoder() throws IOException {
            marshaller = MarshallingCodecFactory.buildMarshalling();
        }
    
        protected void encode(Object msg, ByteBuf out) throws Exception {
            try {
                int lengthPos = out.writerIndex();
                out.writeBytes(LENGTH_PLACEHOLDER);
                ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
                marshaller.start(output);
                marshaller.writeObject(msg);
                marshaller.finish();
                out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);
            } finally {
                marshaller.close();
            }
        }
    }

        3.解码器

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import com.phei.netty.protocol.netty.struct.Header;
    import com.phei.netty.protocol.netty.struct.NettyMessage;
    
    public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
    
        MarshallingDecoder marshallingDecoder;
    
        public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) throws IOException {
            super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
            marshallingDecoder = new MarshallingDecoder();
        }
    
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            ByteBuf frame = (ByteBuf) super.decode(ctx, in);
            if (frame == null) {
                return null;//是半包消息
            }
    
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setCrcCode(frame.readInt());
            header.setLength(frame.readInt());
            header.setSessionID(frame.readLong());
            header.setType(frame.readByte());
            header.setPriority(frame.readByte());
    
            int size = frame.readInt();
            if (size > 0) {
                Map<String, Object> attch = new HashMap<>(size);
                int keySize = 0;
                byte[] keyArray;
                String key;
                for (int i = 0; i < size; i++) {
                    keySize = frame.readInt();
                    keyArray = new byte[keySize];
                    frame.readBytes(keyArray);
                    key = new String(keyArray, "UTF-8");
                    attch.put(key, marshallingDecoder.decode(frame));
                }
                header.setAttachment(attch);
            }
            if (frame.readableBytes() > 4) {
                message.setBody(marshallingDecoder.decode(frame));
            }
            message.setHeader(header);
            return message;
        }
    }

        4.消息解码工具类

     

    import io.netty.buffer.ByteBuf;
    
    import java.io.IOException;
    import org.jboss.marshalling.ByteInput;
    import org.jboss.marshalling.Unmarshaller;
    
    public class MarshallingDecoder {
    
        private final Unmarshaller unmarshaller;
    
        public MarshallingDecoder() throws IOException {
            unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
        }
    
        protected Object decode(ByteBuf in) throws Exception {
            int objectSize = in.readInt();
            ByteBuf buf = in.slice(in.readerIndex(), objectSize);
            ByteInput input = new ChannelBufferByteInput(buf);
            try {
                unmarshaller.start(input);
                Object obj = unmarshaller.readObject();
                unmarshaller.finish();
                in.readerIndex(in.readerIndex() + objectSize);
                return obj;
            } finally {
                unmarshaller.close();
            }
        }
    }

      12.3.3握手和安全验证

        1.消息枚举

     

    public enum MessageType {
    
        SERVICE_REQ((byte) 0), 
        SERVICE_RESP((byte) 1), 
        ONE_WAY((byte) 2), 
        LOGIN_REQ((byte) 3), 
        LOGIN_RESP((byte) 4), 
        HEARTBEAT_REQ((byte) 5), 
        HEARTBEAT_RESP((byte) 6);
    
        private byte value;
    
        private MessageType(byte value) {
            this.value = value;
        }
    
        public byte value() {
            return this.value;
        }
    }

     

        2.握手认证客户端

     

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import com.phei.netty.protocol.netty.MessageType;
    import com.phei.netty.protocol.netty.struct.Header;
    import com.phei.netty.protocol.netty.struct.NettyMessage;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    public class LoginAuthReqHandler extends ChannelHandlerAdapter {
    
        private static final Log LOG = LogFactory.getLog(LoginAuthReqHandler.class);
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.writeAndFlush(buildLoginReq());
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            NettyMessage message = (NettyMessage) msg;
    
            // 如果是握手应答消息,需要判断是否认证成功
            if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
                byte loginResult = (byte) message.getBody();
                if (loginResult != (byte) 0) {
                    // 握手失败,关闭连接
                    ctx.close();
                } else {
                    LOG.info("Login is ok : " + message);
                    ctx.fireChannelRead(msg);
                }
            } else
                ctx.fireChannelRead(msg);
        }
    
        //构造握手请求消息
        private NettyMessage buildLoginReq() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.LOGIN_REQ.value());
            message.setHeader(header);
            return message;
        }
    
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.fireExceptionCaught(cause);
        }
    }

     

        3.握手认证服务端

     

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.net.InetSocketAddress;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    import com.phei.netty.protocol.netty.MessageType;
    import com.phei.netty.protocol.netty.struct.Header;
    import com.phei.netty.protocol.netty.struct.NettyMessage;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    public class LoginAuthRespHandler extends ChannelHandlerAdapter {
    
        private final static Log LOG = LogFactory.getLog(LoginAuthRespHandler.class);
    
        private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<>();//重复登录保护
        private String[] whitekList = {"127.0.0.1", "192.168.1.104"};//ip认证白名单列表
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            NettyMessage message = (NettyMessage) msg;
    
            // 如果是握手请求消息,处理,其它消息透传
            if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_REQ.value()) {
                String nodeIndex = ctx.channel().remoteAddress().toString();
                NettyMessage loginResp;
                // 重复登陆,拒绝
                if (nodeCheck.containsKey(nodeIndex)) {
                    loginResp = buildResponse((byte) -1);
                } else {
                    //白名单验证
                    InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
                    String ip = address.getAddress().getHostAddress();
                    boolean isOK = false;
                    for (String WIP : whitekList) {
                        if (WIP.equals(ip)) {
                            isOK = true;
                            break;
                        }
                    }
                    loginResp = isOK ? buildResponse((byte) 0) : buildResponse((byte) -1);
                    if (isOK) {
                        nodeCheck.put(nodeIndex, true);
                    }
                }
                LOG.info("The login response is : " + loginResp + " body [" + loginResp.getBody() + "]");
                ctx.writeAndFlush(loginResp);
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        private NettyMessage buildResponse(byte result) {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.LOGIN_RESP.value());
            message.setHeader(header);
            message.setBody(result);
            return message;
        }
    
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            nodeCheck.remove(ctx.channel().remoteAddress().toString());// 删除缓存
            ctx.close();
            ctx.fireExceptionCaught(cause);
        }
    }

      12.3.4心跳检测机制

        1.客户端

     

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    
    import com.phei.netty.protocol.netty.MessageType;
    import com.phei.netty.protocol.netty.struct.Header;
    import com.phei.netty.protocol.netty.struct.NettyMessage;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    public class HeartBeatReqHandler extends ChannelHandlerAdapter {
    
        private static final Log LOG = LogFactory.getLog(HeartBeatReqHandler.class);
    
        private volatile ScheduledFuture<?> heartBeat;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            NettyMessage message = (NettyMessage) msg;
            // 握手成功,主动发送心跳消息
            if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
                heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
            } else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {
                LOG.info("Client receive server heart beat message : ---> " + message);
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        private class HeartBeatTask implements Runnable {
            private final ChannelHandlerContext ctx;
    
            public HeartBeatTask(final ChannelHandlerContext ctx) {
                this.ctx = ctx;
            }
    
            @Override
            public void run() {
                NettyMessage heatBeat = buildHeatBeat();
                LOG.info("Client send heart beat messsage to server : ---> " + heatBeat);
                ctx.writeAndFlush(heatBeat);
            }
    
            private NettyMessage buildHeatBeat() {
                NettyMessage message = new NettyMessage();
                Header header = new Header();
                header.setType(MessageType.HEARTBEAT_REQ.value());
                message.setHeader(header);
                return message;
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            if (heartBeat != null) {
                heartBeat.cancel(true);
                heartBeat = null;
            }
            ctx.fireExceptionCaught(cause);
        }
    }

     

        2.服务端

     

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import com.phei.netty.protocol.netty.MessageType;
    import com.phei.netty.protocol.netty.struct.Header;
    import com.phei.netty.protocol.netty.struct.NettyMessage;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    public class HeartBeatRespHandler extends ChannelHandlerAdapter {
    
        private static final Log LOG = LogFactory.getLog(HeartBeatRespHandler.class);
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            NettyMessage message = (NettyMessage) msg;
            // 返回心跳应答消息
            if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_REQ.value()) {
                LOG.info("Receive client heart beat message : ---> " + message);
                NettyMessage heartBeat = buildHeatBeat();
                LOG.info("Send heart beat response message to client : ---> " + heartBeat);
                ctx.writeAndFlush(heartBeat);
            } else
                ctx.fireChannelRead(msg);
        }
    
        private NettyMessage buildHeatBeat() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_RESP.value());
            message.setHeader(header);
            return message;
        }
    }

     

      12.3.5断连和重连

     

                // 所有资源释放完成之后,清空资源,再次发起重连操作
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            TimeUnit.SECONDS.sleep(1);
                            try {
                                connect(NettyConstant.PORT, NettyConstant.REMOTEIP);// 发起重连操作
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });

     

      12.3.6客户端

     

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    
    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    import com.phei.netty.protocol.netty.NettyConstant;
    import com.phei.netty.protocol.netty.codec.NettyMessageDecoder;
    import com.phei.netty.protocol.netty.codec.NettyMessageEncoder;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    public class NettyClient {
    
        private static final Log LOG = LogFactory.getLog(NettyClient.class);
    
        private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    
        EventLoopGroup group = new NioEventLoopGroup();
    
        public void connect(int port, String host) throws Exception {
            // 配置客户端NIO线程组
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));//为了防止由于单条消息过大导致的内存溢出或者畸形码流导致解码错位引起内存分配失败,加上了消息长度限制
                                ch.pipeline().addLast("MessageEncoder", new NettyMessageEncoder());//消息自动编码
                                ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));//读超时
                                ch.pipeline().addLast("LoginAuthHandler", new LoginAuthReqHandler());//握手请求
                                ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler());//心跳消息
                            }
                        });
                // 发起异步连接操作,服务端重复登录保护
                ChannelFuture future = b.connect(new InetSocketAddress(host, port), new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT)).sync();
                // 当对应的channel关闭的时候,就会返回对应的channel。
                // Returns the ChannelFuture which will be notified when this channel is closed. This method always returns the same future instance.
                future.channel().closeFuture().sync();
            } finally {
                // 所有资源释放完成之后,清空资源,再次发起重连操作
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            TimeUnit.SECONDS.sleep(1);
                            try {
                                connect(NettyConstant.PORT, NettyConstant.REMOTEIP);// 发起重连操作
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    
        public static void main(String[] args) throws Exception {
            new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
        }
    }

     

      12.3.7服务端

     

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    
    import java.io.IOException;
    
    import com.phei.netty.protocol.netty.NettyConstant;
    import com.phei.netty.protocol.netty.codec.NettyMessageDecoder;
    import com.phei.netty.protocol.netty.codec.NettyMessageEncoder;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    public class NettyServer {
    
        private static final Log LOG = LogFactory.getLog(NettyServer.class);
    
        public void bind() throws Exception {
            // 配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws IOException {
                            ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
                            ch.pipeline().addLast(new NettyMessageEncoder());
                            ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
                            ch.pipeline().addLast(new LoginAuthRespHandler());
                            ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler());
                        }
                    });
    
            // 绑定端口,同步等待成功
            b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
            LOG.info("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
        }
    
        public static void main(String[] args) throws Exception {
            new NettyServer().bind();
        }
    }

     

    12.4运行协议栈

      12.4.1正常场景

      12.4.2服务器宕机重启

      12.4.3客户端宕机重启

    第13章  服务端创建

    13.1原生NIO类库的复杂性

    13.2Netty服务端创建源码分析

      13.2.1Netty服务端创建时序图

      13.2.2Netty服务端创建源码分析

      13.2.3客户端接入源码分析

    第14章  客户端创建

    14.1Netty客户端创建流程分析

      14.1.1时序图

      14.1.2流程分析

    14.2源码分析

      14.2.1连接辅助类Bootstrap

      14.2.2连接操作

      14.2.3结果通知

      14.2.4连接超时机制

    第15章  ByteBuf和相关辅助类

    15.1ByteBuf功能说明

      15.1.1ByteBuf功能原理

      15.1.2ByteBuf的功能介绍

        1.顺序读操作

        2.顺序写操作

        3.readIndex和writeIndex

        4.Discardable bytes

        5.Readable bytes(可读空间)和Writable bytes(可写空间)

        6.Clear操作

        7.Mask和Rest

        8.查找操作

          indexOf、bytesBefore、forEachByte、forEachDesc

        9.Derived buffers

        10.转换成标准的ByteBuffer

        11.随机读写

    15.2ByteBuf源码分析

      15.2.1ByteBuf主要继承关系

      15.2.2AbstractByteBuf源码分析

        1.主要成员变量

        2.读操作簇

        3.写操作簇

        4.操作索引

        5.重用缓冲区

        6.skipBytes

      15.2.3AbstractReferenceCountedByteBuf源码分析

        1.主要成员变量

        2.对象计数器

      15.2.4UnpooledHeapByteBuf源码分析

        1.成员变量

        2.动态扩展缓冲区

        3.字节数组复制

        4.转换成JDK ByteBuffer

        5.子类实现相关的方法

      15.2.5PooledByteBuf内存池原理分析

        1.PoolArena

        2.PoolChunk

        3.PoolSubpage

        4.内存回收策略

      15.2.6PooledDirectByteBuf源码分析

        1.创建字节缓冲区实例

        2.复制新的字节缓冲区实例

     

     

     

        3.子类实现相关的方法

    15.3ByteBuf相关的辅助类介绍

      15.3.1ByteBufHolder

      15.3.2ByteBufAllocator字节缓冲区分配器

      15.3.3CompositeByteBuf

      15.3.4ByteBufUtil

    第16章  Channel和Unsafe

    16.1Channel功能说明

      16.1.1Channel的工作原理

      16.1.2Channel的功能介绍

        1.网络I/O操作

        2.其他常用的API功能说明

    16.2Channel源码分析

      16.2.1Channel的主要继承关系图

      16.2.2AbstractChannel源码分析

        1.成员变量定义

        2.核心API源码分析

      16.2.3AbstractNioChannel源码分析

        1.成员变量定义

        2.核心API源码分析

      16.2.4AbstractNioByteChannel源码分析

      16.2.5AbstractNioMessageChannel源码分析

      16.2.6AbstractNioMessageServerChannel源码分析

      16.2.7NioServerSocketChannel源码分析

      16.2.8NioSocketChannel源码分析

        1.连接操作

     

        2.写半包

        3.读写操作

    16.3Unsafe功能说明

    16.4Unsafe源码分析

      16.4.1Unsafe继承关系图

      16.4.2AbstractUnsafe源码分析

        1.register将channal注册到EventLoop的多路复用器

        2.bind方法用于绑定指定端口

        3.disconnect

        4.close方法

        5.write方法

     

        6.flush方法

     

      16.4.3AbstractNioUnsafe源码分析

        1.connect

        2.finishConnect

      16.4.4NioByteUnsafe源码分析

    第17章  ChannelPipeline和ChannelHandler

    17.1ChannelPipeline的功能说明

      17.1.1ChannelPipeline的事件处理

     

      17.1.2自定义拦截器

      17.1.3构建pipeline

      17.1.4ChannelPipeline的主要特性

    17.2ChannelPipeline源码分析

      17.2.1ChannelPipeline的类继承关系图

      17.2.2ChannelPipeline对ChannelHandler的管理

      17.2.3ChannelPipeline的inbound事件

      17.2.4ChannelPipeline的outbound事件

    17.3ChannelHandler功能说明

      17.3.1ChannelHandlerAdapter功能说明

      17.3.2ByteToMessageDecoder功能说明

      17.3.3MessageToMessageDecoder功能说明

      17.3.4LengthFieldBasedFrameDecoder功能说明

      17.3.5MessageToByteEncoder功能说明

      17.3.6MessageToMessageEncoder功能说明

      17.3.7LengthFieldPrepender功能说明

    17.4ChannelHandler源码分析

      17.4.1ChannelHandler的类继承关系

     

      17.4.2ByteToMessageDecoder源码分析

      17.4.3MessageToMessageDecoder源码分析

      17.4.4LengthFieldBasedFrameDecoder源码分析

      17.4.5MessageToByteEncoder源码分析

      17.4.6MessageToMessageEncoder源码分析

      17.4.7LengthFieldPrepender源码分析

    第18 章  EventLoop和EventLoopGroup

    18.1Netty的线程模型

      18.1.1Reactor单线程模型

      18.1.2Reactor多线程模型

      18.1.3主从Reactor多线程模型

      18.1.4Netty的线程模型

      18.1.5最佳实践

    18.2EventLoop源码分析

      18.2.1EventLoop设计原理

      18.2.2继承关系图

      18.2.3EventLoop

    第19章  Future和Promise

    19.1Future的功能

      1.ChannelFuture功能介绍

    19.2ChannelFuture源码分析

      1.AbstractFuture

    19.3Promise功能介绍

    19.4Promise源码分析

      19.4.1Promise继承关系图

      19.4.2DefaultPromise

    第20章  Netty架构剖析

    20.1Netty逻辑架构

      20.1.1Reactor通信层调度

      20.1.2职责链ChannelPipeline

      20.1.3业务逻辑编排

    20.2关键架构质量属性

      20.2.1高性能

      20.2.2可靠性

        1.链路有效性检测

     

        2.内存保护机制

        3.优雅停机

     

      20.2.3可定制型

      20.2.4可扩展性

     

    第21章  Java多线程在Netty中的应用

    21.1Java内存模型与多线程编程

      21.1.1硬件的发展和多任务处理

      21.1.2Java内存模型

        1.工作内存和主内存

        2.Java内存交互协议

        3.Java多线程

    21.2Netty的并发编程实践

      21.2.1对共享的可变数据进行正确的同步

      21.2.2正确使用锁

      21.2.3volatile的正确使用

      21.2.4CAS指令和原子类

      21.2.5线程安全类的应用

      21.2.6读写锁的应用

      21.2.7线程安全性文档说明

      21.2.8不用依赖线程优先级

    第22章  高性能之道

    22.1RPC调用性能模型分析

      22.1.1传统RPC调用性能差的原因

        1.网络传输方式问题

        2.序列化性能差

        3.线程模型问题

      22.1.2I/O通信性能原则

    22.2Netty高性能之道

      22.2.1异步非阻塞通信

      22.2.2高效的Reactor线程模型

      22.2.3无锁化差串行设计

      22.2.4高效的并发编程

      22.2.5高性能的序列化框架

      22.2.6零拷贝

      22.2.7内存池

      22.2.8灵活的TCP参数配置

    22.3主流NIO框架性能对比

    第23章  可靠性

    23.1可靠性需求

      23.1.1宕机的代价

      23.1.2Netty可靠性需求

    23.2Netty可靠性需求设计

      23.2.1网络通信类故障

        1.客户端连接超时

        2.通信对端强制关闭连接

        3.链路关闭

        4.定制I/O故障

      23.2.2链路的有效性检测

      23.2.3Reactor线程的保护

        1.异常处理要谨慎

        2.规避NIO BUG

      23.2.4内存保护

        1.缓冲区的内存泄漏保护

        2.缓冲区溢出保护

      23.2.5流量整形

     

        1.全局流量整形

        2.链路级流量整形

      23.2.6优雅停机接口

    23.3优化建议

      23.3.1发送队列容量上限控制

      23.3.2回推发送失败的消息

    第24章  安全性

    24.1严峻的安全形势

      24.1.1OpenSSL Heart bleed漏洞

      24.1.2安全漏洞的代价

      24.1.3Netty面临的安全风险

    24.2Netty SSL安全特性

      24.2.1SSL单向认证

      24.2.2SSL双向认证

      24.2.3第三方CA认证

    24.3Netty SSL源码分析

      24.3.1客户端

      24.3.2服务端

      24.3.3消息读取

      24.3.4消息发送

    24.4Netty扩展的安全性

      24.4.1IP地址黑名单机制

      24.4.2接入认证

    第25章  Netty未来展望

    25.1应用范围

    25.2技术演进

    25.3社区活跃度

    25.4Road Map

  • 相关阅读:
    矩阵树(Matrix-Tree)定理精华
    【面试题flex】关于弹性盒子中 flex: 0 1 auto 的一系列问题
    纯函数-Javascript
    uniapp 低功耗蓝牙使用(经典蓝牙用native.js)
    uniapp截屏
    unicloud数据聚合处理
    uniapp的post请求失效问题
    unicloud链表查询
    uni-id的使用
    坐标转换
  • 原文地址:https://www.cnblogs.com/plxz/p/9910493.html
Copyright © 2011-2022 走看看