zoukankan      html  css  js  c++  java
  • Netty浅谈

    1.Netty的介绍

    1. Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github 上的独立项目。
    2. Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
    3. Netty 主要针对在 TCP 协议下,面向 Client 端的高并发应用,或者 Peer-to-Peer 场景下的大量数据持续传输的应用。
    4. Netty 本质是一个 NIO 框架,适用于服务器通讯相关的多种应用场景。
    5. 要透彻理解 Netty,需要先学习 NIO,这样我们才能阅读 Netty 的源码。

     

    2.原生NIO存在的问题

    1. NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 SelectorServerSocketChannelSocketChannelByteBuffer等。
    2. 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
    3. 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。4. JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU100%。直到 JDK1.7 版本该问题仍旧存在,没有被根本解决。

    3.Netty的官网说明

    官网:https://netty.io/

    Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

    4.Netty的优点

    Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了上述问题。

    1. 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型-单线程,一个或多个线程池。
    2. 使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK5(Netty3.x)或 6(Netty4.x)就足够了。
    3. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
    4. 安全:完整的 SSL/TLS 和 StartTLS 支持。
    5. 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。

    5.Netty的模型

    5.1 Netty工作原理示意图

    Netty 主要基于主从 Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor

     5.2 对上图的说明

    1. Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接,WorkerGroup 专门负责网络的读写
    2. BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
    3. NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是 NioEventLoop
    4. NioEventLoop 表示一个不断循环的执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket 的网络通讯
    5. NioEventLoopGroup 可以有多个线程,即可以含有多个 NioEventLoop
    6. 每个 BossNioEventLoop 循环执行的步骤有 3 步
      • 轮询 accept 事件
      • 处理 accept 事件,与 client 建立连接,生成 NioScocketChannel,并将其注册到某个 worker NIOEventLoop 上的 Selector
      • 处理任务队列的任务,即 runAllTasks
    7. 每个 Worker NIOEventLoop 循环执行的步骤
      • 轮询 readwrite 事件
      • 处理 I/O 事件,即 readwrite 事件,在对应 NioScocketChannel 处理
      • 处理任务队列的任务,即 runAllTasks
    8. 每个 Worker NIOEventLoop 处理业务时,会使用 pipeline(管道),pipeline 中包含了 channel,即通过 pipeline 可以获取到对应通道,管道中维护了很多的处理器

    6.Netty入门实例--TCP服务

    实例要求:使用 IDEA 创建 Netty 项目

    1. Netty 服务器在 9000 端口监听,客户端能发送消息给服务器16进制报文
    2. 服务器可以回复消息给客户端发送的16进制报文
    3. 目的:对 Netty 线程模型有一个初步认识,便于理解 Netty 模型理论

    6.1 引入Netty依赖

            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
            </dependency>

    6.2 服务端

    public class Myserver {
    
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
    
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    //保持连接数
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 保持连接
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() { //设置客户端连接socket参数
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new DecodeUtil());
                            pipeline.addLast(new EncodeUtil());
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            ChannelFuture future = null;
            try {
                future = serverBootstrap.bind(9000).sync();
                if (future.isSuccess()) {
                    System.out.println("Netty服务端启动成功");
                } else {
                    System.out.println("Netty服务端启动失败");
                }
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }

    6.3 解码器

    /**
     * 解码器
     */
    public class DecodeUtil extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
            try {
                //byteBuf的长度
                int bufNum = byteBuf.readableBytes();
                //byteBuf当前的读索引
                int readerIndex = byteBuf.readerIndex();
                byte[] bytes = new byte[2];
                if (bufNum >= 4) {   //byteBuf的长度大于4,
                   //查看前两个字节判断消息头
                    for (int index = 0; index < 2; index++) {
                        bytes[index] = byteBuf.getByte(readerIndex);
                        readerIndex++;
                    }
                    //将前2个字节转换为16进制
                    String header = ConvertCode.receiveHexToString(bytes);
                    int length = 0;
                    if (header.toUpperCase().equals("AAF5")) {
                        //获取包长度
                        bytes = new byte[2];
                        bytes[0] = byteBuf.getByte(2);
                        bytes[1] = byteBuf.getByte(3);
                        length = ConvertCode.getShort(bytes, 0);
                    } else {
                        return;
                    }
                    if (bufNum >= length) {
                        bytes = new byte[length];
                        byteBuf.readBytes(bytes);
                        list.add(bytes);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }

    6.4 编码器

    /**
     * 编码器
     */
    public class EncodeUtil extends MessageToByteEncoder {
    
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
            // netty需要用ByteBuf传输
            ByteBuf buff = Unpooled.buffer();
            // 对接需要16进制
            buff.writeBytes(ConvertCode.hexString2Bytes(o.toString()));
            channelHandlerContext.writeAndFlush(buff);
        }
    }

    6.5 业务处理的handler

    public class ServerHandler extends SimpleChannelInboundHandler<byte[]> {
    
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("客户端" + ctx.channel().remoteAddress() + "连接了");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("客户端" + ctx.channel().remoteAddress() + "离开了");
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] msg) {
            try {
                System.out.println("收到消息:" + ConvertCode.receiveHexToString(msg));
                //报文原样返回
                channelHandlerContext.writeAndFlush(ConvertCode.receiveHexToString(msg));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            super.channelReadComplete(ctx);
        }
    
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    6.5 工具类

    public class ConvertCode {
        /**
         * @Title:bytes2HexString
         * @Description:字节数组转16进制字符串
         * @param b
         *            字节数组
         * @return 16进制字符串
         * @throws
         */
        public static String bytes2HexString(byte[] b) {
            StringBuffer result = new StringBuffer();
            String hex;
            for (int i = 0; i < b.length; i++) {
                hex = Integer.toHexString(b[i] & 0xFF);
                if (hex.length() == 1) {
                    hex = '0' + hex;
                }
                result.append(hex.toUpperCase());
            }
            return result.toString();
        }
        /**
         * @Title:hexString2Bytes
         * @Description:16进制字符串转字节数组
         * @param src  16进制字符串
         * @return 字节数组
         */
        public static byte[] hexString2Bytes(String src) {
            int l = src.length() / 2;
            byte[] ret = new byte[l];
            for (int i = 0; i < l; i++) {
                ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
            }
            return ret;
        }
        /**
         * @Title:string2HexString
         * @Description:字符串转16进制字符串
         * @param strPart  字符串
         * @return 16进制字符串
         */
        public static String string2HexString(String strPart) {
            StringBuffer hexString = new StringBuffer();
            for (int i = 0; i < strPart.length(); i++) {
                int ch = (int) strPart.charAt(i);
                String strHex = Integer.toHexString(ch);
                hexString.append(strHex);
            }
            return hexString.toString();
        }
        /**
         * @Title:hexString2String
         * @Description:16进制字符串转字符串
         * @param src
         *            16进制字符串
         * @return 字节数组
         * @throws
         */
        public static String hexString2String(String src) {
            String temp = "";
            for (int i = 0; i < src.length() / 2; i++) {
                //System.out.println(Integer.valueOf(src.substring(i * 2, i * 2 + 2),16).byteValue());
                temp = temp+ (char)Integer.valueOf(src.substring(i * 2, i * 2 + 2),16).byteValue();
            }
            return temp;
        }
    
        /**
         * @Title:char2Byte
         * @Description:字符转成字节数据char-->integer-->byte
         * @param src
         * @return
         * @throws
         */
        public static Byte char2Byte(Character src) {
            return Integer.valueOf((int)src).byteValue();
        }
    
        /**
         * @Title:intToHexString
         * @Description:10进制数字转成16进制
         * @param a 转化数据
         * @param len 占用字节数
         * @return
         * @throws
         */
        public static String intToHexString(int a,int len){
            len<<=1;
            String hexString = Integer.toHexString(a);
            int b = len -hexString.length();
            if(b>0){
                for(int i=0;i<b;i++)  {
                    hexString = "0" + hexString;
                }
            }
            return hexString;
        }
    
    
        /**
         * 将16进制的2个字符串进行异或运算
         * http://blog.csdn.net/acrambler/article/details/45743157
         * @param strHex_X
         * @param strHex_Y
         * 注意:此方法是针对一个十六进制字符串一字节之间的异或运算,如对十五字节的十六进制字符串异或运算:1312f70f900168d900007df57b4884
        先进行拆分:13 12 f7 0f 90 01 68 d9 00 00 7d f5 7b 48 84
        13 xor 12-->1
        1 xor f7-->f6
        f6 xor 0f-->f9
        ....
        62 xor 84-->e6
        即,得到的一字节校验码为:e6
         * @return
         */
        public static String xor(String strHex_X,String strHex_Y){
            //将x、y转成二进制形式
            String anotherBinary=Integer.toBinaryString(Integer.valueOf(strHex_X,16));
            String thisBinary=Integer.toBinaryString(Integer.valueOf(strHex_Y,16));
            String result = "";
            //判断是否为8位二进制,否则左补零
            if(anotherBinary.length() != 8){
                for (int i = anotherBinary.length(); i <8; i++) {
                    anotherBinary = "0"+anotherBinary;
                }
            }
            if(thisBinary.length() != 8){
                for (int i = thisBinary.length(); i <8; i++) {
                    thisBinary = "0"+thisBinary;
                }
            }
            //异或运算
            for(int i=0;i<anotherBinary.length();i++){
                //如果相同位置数相同,则补0,否则补1
                if(thisBinary.charAt(i)==anotherBinary.charAt(i))
                    result+="0";
                else{
                    result+="1";
                }
            }
            return Integer.toHexString(Integer.parseInt(result, 2));
        }
    
    
        /**
         *  Convert byte[] to hex string.这里我们可以将byte转换成int
         * @param src byte[] data
         * @return hex string
         */
        public static String bytes2Str(byte[] src){
            StringBuilder stringBuilder = new StringBuilder("");
            if (src == null || src.length <= 0) {
                return null;
            }
            for (int i = 0; i < src.length; i++) {
                int v = src[i] & 0xFF;
                String hv = Integer.toHexString(v);
                if (hv.length() < 2) {
                    stringBuilder.append(0);
                }
                stringBuilder.append(hv);
            }
            return stringBuilder.toString();
        }
        /**
         * @return 接收字节数据并转为16进制字符串
         */
        public static String receiveHexToString(byte[] by) {
            try {
                /*io.netty.buffer.WrappedByteBuf buf = (WrappedByteBuf)msg;
                ByteBufInputStream is = new ByteBufInputStream(buf);
                byte[] by = input2byte(is);*/
                String str = bytes2Str(by);
                str = str.toUpperCase();
                return str;
            } catch (Exception ex) {
                ex.printStackTrace();
                System.out.println("接收字节数据并转为16进制字符串异常");
            }
            return null;
        }
    
    
    
        /**
         * "7dd",4,'0'==>"07dd"
         * @param input 需要补位的字符串
         * @param size 补位后的最终长度
         * @param symbol 按symol补充 如'0'
         * @return
         * N_TimeCheck中用到了
         */
        public static String fill(String input, int size, char symbol) {
            while (input.length() < size) {
                input = symbol + input;
            }
            return input;
        }
    //    public static void main(String args[]) {
    //        String productNo = "3030303032383838";
    //        System.out.println(hexString2String(productNo));
    //        productNo = "04050103000001070302050304";
    //        System.out.println(hexString2String(productNo));
    //    }
    
        /**
         * 获取short,小端
         *
         * @param src
         * @param index
         * @return
         */
        public static short getShort(byte[] src, int index) {
            return (short) (((src[index + 1] << 8) | src[index] & 0xff));
        }
    
    
    
    }

    6.6 服务运行演示

    6.6.1 使用TCP Socket测试工具建立连接,发送报文:

    AAF56E0010026A0000000000363130313133303032373030303031000000000000000000000000000000000000A851000001006400000001010000010A02ED0B000020210414180000FF00000000000000000000000000000000000000000000000000000035C901000000000024

     6.6.2 我们看到服务器收到了客户端的报文,并将报文原样返回给了客户端

    完毕,有什么问题,请大家指正!

  • 相关阅读:
    趋势or过渡,量子点屏幕真的优于OLED?
    文件打开方式设置
    学Arduino 需要做哪些准备?(引自"知乎用户:郑兴芳,DhP"的回答)
    Arduino扫盲(持续添加中)
    订购一套Arduino UNO r3入门套件
    第一次接触Arduino
    关于移动端的事件委托问题
    ASDas
    CentOS利用source命令导入sql文件
    CentOS-LAMP
  • 原文地址:https://www.cnblogs.com/wiliamzhao/p/14779662.html
Copyright © 2011-2022 走看看