zoukankan      html  css  js  c++  java
  • Netty入门

    简介

    linux 网络I/O模型介绍

    1)堵塞I/O模型

    2)非堵塞I/O模型

    3)伪异步I/O模型

    4)多路复用select / poll /epoll

    5)信号驱动I/O模型

    6) 异步I/O

    netty入门应用

    package com.netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class TimeServer {
        public void bind(int port) throws Exception {
            // 配置线程组
            //用于网络事件的处理
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            //用于socketChannel的网络读写
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                //netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发难度
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChildChannelHandler());  //绑定网络IO事件的处理类
                // 绑定端口,同步等待成功
                ChannelFuture f = b.bind(port).sync();
                // 等待服务端监听端口关闭
                f.channel().closeFuture().sync();
            } finally {
                // 优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    
            @Override
            protected void initChannel(SocketChannel arg0) throws Exception {
                arg0.pipeline().addLast(new TimeServerHandler());
    
            }
    
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if(args != null && args.length > 0){
                try {
                    port = Integer.valueOf(args[0]);
                } catch (Exception e) {
                    //采用默认值
                }
            }
            new TimeServer().bind(port);
        }
    }
    View Code
    package com.netty;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class TimeServerHandler extends ChannelHandlerAdapter{
    
        public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];//根据缓冲区可读字节数创建byte数组
            buf.readBytes(req);
            String body = new String(req,"UTF-8");
            System.out.println("The time server receive order :"+body);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER";
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            ctx.write(resp);
        }
        
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
            ctx.flush();
        }
    
        /* (non-Javadoc)
         * @see io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel.ChannelHandlerContext, java.lang.Throwable)
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // TODO Auto-generated method stub
            super.exceptionCaught(ctx, cause);
            ctx.close();
        }
        
    }
    View Code
    package com.netty;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class TimeClient {
        public void connect(int port, String host) throws Exception {
            // 配置客户端NIO线程组
            NioEventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new TimeClientHandler());
    
                            }
    
                        });
                // 发起异步连接操作,等待连接成功
                ChannelFuture f = b.connect(host, port).sync();
                // 等待客户端链路关闭
                f.channel().closeFuture().sync();
            } finally {
                // 优雅的退出 释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (Exception e) {
                    // TODO: handle exception
                }
            }
            new TimeClient().connect(port, "127.0.0.1");
        }
    
    }
    View Code
    package com.netty;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class TimeClientHandler extends ChannelHandlerAdapter {
    
        private final ByteBuf firstMessage;
    
        public TimeClientHandler() {
            byte[] req = "QUERY TIME ORDER".getBytes();
            firstMessage = Unpooled.buffer(req.length);
            firstMessage.writeBytes(req);
        }
    
        // 当客户端与服务器TCP建立成功后,Netty的NIO线程会调用channelActive方法
        public void channelActive(ChannelHandlerContext ctx) {
            // 将请求消息发送给服务端
            ctx.writeAndFlush(firstMessage);
        }
    
        // 当服务端返回应答消息时,channelRead方法被调用
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("Now is :" + body);
        }
    
        /*
         * (non-Javadoc)
         * 
         * @see
         * io.netty.channel.ChannelHandlerAdapter#exceptionCaught(io.netty.channel.
         * ChannelHandlerContext, java.lang.Throwable)
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // TODO Auto-generated method stub
            super.exceptionCaught(ctx, cause);
            ctx.close();
        }
    
    }
    View Code

    TCP粘包/拆包问题解决之道

     1)消息定长

    2)在包尾增加回车换行符进行分割,列如FTP协议

    3)将消息分为消息头和消息体,消息头包含消息的总长度

    4)更复杂的应用层协议

     使用LineBasedFrameDecoder和StringDecoder按行切换的文本编辑器

    在client和server的pipeline中添加:

    /**
    * LineBasedFrameDecoder的工作原理依次遍历ByteBuf中的可读字节,判断是否有“ ”和“ ”,如果有,以此结束。
    * 当1024长度还没发现结束符,则结束掉并抛弃之前读到的异常流码
    *
    * StringDecoder将接受到的对象转化成字符串,然后继续调用handler
    *
    * LineBasedFrameDecoder+StringDecoder组合其实就是按行切换的文本编辑器
    */

    //判断ByteBuf中的可读字节,判断是否有"
    ","
    ",如果有就以此位置为结束位置。
    //当最大长度1024字节仍然没有发现换行符,就抛出异常。
    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
    //将收到的对象转化成字符串
    ch.pipeline().addLast(new StringDecoder());

    分隔符和定长编码器的应用

    对消息区分:

    1)消息固定长度

    2)将回车换行符作为消息结束符

    3)将特殊分隔符作为消息的结束标志

    4)通过在消息头中定义长度字段来标识消息的总长度

     DelimiterBasedFrameDecoder开发

    在client与server的pipeline中添加如下解码器

    //创建分隔符缓冲对象
    ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
    ch.pipeline().addLast(new StringDecoder());

    FixedLengthFrameDecoder应用开发

    /**
    * 利用FixedLengthFrameDecoder解码器,无论一次性接收多少数据报,
    * 他都会按照构造函数中设置的固定长度进行解码,如果是半包消息,
    * FixedLengthFrameDecoder会缓存半包消息并等待下个包到达后进行拼包,直到读取到一个完整的包。
    */
    ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
    ch.pipeline().addLast(new StringDecoder());

    编解码技术

    java序列化的缺点:

    1)无法跨语言

    2)码流太大

    3)序列化性能不高

    编解码框架

    MessagePack编解码  

    特点:

    1)编解码高效,性能高。

    2)序列化之后的码流小。

    3)支持跨语言。

    /**
     * MessagePack编码器的开发
     * 负责将Object类型的POJO对象编码成byte数组,然后写入到ByteBuf中
     * @author Administrator
     *
     */
    public class MsgpackEncoder extends MessageToByteEncoder<Object> {
    
        @Override
        protected void encode(ChannelHandlerContext arg0, Object arg1, ByteBuf arg2) throws Exception {
            MessagePack msgpack = new MessagePack();
            // 对象arg1序列化
            byte[] raw = msgpack.write(arg1);
            arg2.writeBytes(raw);
        }
    
    }
    /**
     * MessagePack 解码器开发
     * @author Administrator
     *
     */
    public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
    
        @Override
        protected void decode(ChannelHandlerContext arg0, ByteBuf arg1, List<Object> arg2) throws Exception {
            final byte[] array;
            final int length = arg1.readableBytes();
            array = new byte[length];
            //首先从数据报arg1中获取解码的byte数组
            arg1.getBytes(arg1.readerIndex(), array,0,length);
            MessagePack msgpack = new MessagePack();
            //调用read()将其反序列化为object对象,并加入到解码列表中
            arg2.add(msgpack.read(array));
        }
    
    }

    在client和server的pipeline中添加编解码处理器:

    //LengthFieldBasedFrameDecoder  LengthFieldPrepender  解决粘包问题  
                                //MsgpackDecoder MsgpackEncoder  解决对象编码问题
                                ch.pipeline().addLast("frameDecoder",
                                        new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                                ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
    //加了2个字节的消息字段 ch.pipeline().addLast(
    "frameEncoder",new LengthFieldPrepender(2)); ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());

    Google Protobuf编解码

    JBoss Marshalling编解码    jboss-marshalling-1.3.0.jar 和 jboss-marshalling-serial-1.3.4.jar

    /**
         * 创建Jboss Marshalling解码器marshallingDecoder
         * @return
         */
        public static MarshallingDecoder buildMarshallingDecoder(){
            //获取MarshallerFactory实例 ,参数serial表示创建的java序列化工厂对象,
            //由jboss-marshalling-serial-1.3.4.jar提供
            final MarshallerFactory marshallerFactory =
                    Marshalling.getMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            UnmarshallerProvider provider = 
                    new DefaultUnmarshallerProvider(marshallerFactory, configuration);
            //1024 单个消息序列化最大长度
            MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
            return decoder;
        }
        
        public static MarshallingEncoder buildMarshallingEncoder(){
            final MarshallerFactory marshallerFactory = 
                    Marshalling.getProvidedMarshallerFactory("serial");
            final MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            DefaultMarshallerProvider provider =
                    new DefaultMarshallerProvider(marshallerFactory, configuration);
            //将POJO对象序列化为二进制数组
            MarshallingEncoder encoder = new MarshallingEncoder(provider);
            return encoder;
        }

    在client和server的pipeline中添加编解码器: (支持半包和拆包的处理)

    ch.pipeline().addLast(MarshallingCodeCFactory
                .buildMarshallingDecoder());
    ch.pipeline().addLast(MarshallingCodeCFactory
                .buildMarshallingEncoder());
  • 相关阅读:
    VUE动态组件component以及<keep-alive>
    git flow工作流
    vue组件通讯
    webpack加载器和自动打包工具
    webpack的插件使用,以及引入vue文件的注意事项
    webpack起步以及手动配置config文件
    git 基本操作
    C++中静态成员变量的可以在类内初始化吗?
    python 环境变量设置PYTHONPATH
    vector中resize和reserve的区别
  • 原文地址:https://www.cnblogs.com/gaojy/p/6694766.html
Copyright © 2011-2022 走看看