  • Netty Intermediate (2)

     

    Previous post: Netty Basics (1)

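    I. Encoding and Decoding Techniques

    Criteria for evaluating a codec technology:

    • Whether it works across languages, and how many languages it supports
    • The size of the encoded byte stream, which affects transmission speed
    • Encoding and decoding performance, i.e. time cost
    • Whether the library is well crafted and the API convenient
    • How hard it is to use

    1. Drawbacks of Java serialization

    Java ships with its own serialization mechanism, but in production engineering it has the following drawbacks:

    • It cannot cross language boundaries
    • The serialized byte stream is too large
    • Serialization performance is too poor

    Let's test these problems with JDK serialization.

    Create a test class, UserInfo: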

    import java.io.Serializable;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    /**
     * @author Administrator
     * @version 1.0
     * @date 2014-02-23
     */
    public class UserInfo implements Serializable {

        /**
         * Default serial version UID
         */
        private static final long serialVersionUID = 1L;

        private String userName;

        private int userID;

        public UserInfo buildUserName(String userName) {
            this.userName = userName;
            return this;
        }

        public UserInfo buildUserID(int userID) {
            this.userID = userID;
            return this;
        }

        /**
         * @return the userName
         */
        public final String getUserName() {
            return userName;
        }

        /**
         * @param userName the userName to set
         */
        public final void setUserName(String userName) {
            this.userName = userName;
        }

        /**
         * @return the userID
         */
        public final int getUserID() {
            return userID;
        }

        /**
         * @param userID the userID to set
         */
        public final void setUserID(int userID) {
            this.userID = userID;
        }

        /**
         * Encode this object into a byte[] using a freshly allocated buffer.
         *
         * @return the encoded bytes
         */
        public byte[] codeC() {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // Write the length of userName, then its bytes
            // (explicit charset so the result does not depend on the platform default)
            byte[] value = this.userName.getBytes(StandardCharsets.UTF_8);
            buffer.putInt(value.length);
            buffer.put(value);
            // Write the ID directly
            buffer.putInt(this.userID);
            buffer.flip();
            byte[] result = new byte[buffer.remaining()];
            buffer.get(result);
            return result;
        }

        /**
         * Same encoding as codeC(), but reuses a caller-supplied buffer.
         */
        public byte[] codeC(ByteBuffer buffer) {
            buffer.clear();
            byte[] value = this.userName.getBytes(StandardCharsets.UTF_8);
            buffer.putInt(value.length);
            buffer.put(value);
            buffer.putInt(this.userID);
            buffer.flip();
            byte[] result = new byte[buffer.remaining()];
            buffer.get(result);
            return result;
        }
    }
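
    The layout that codeC writes is a 4-byte length, the userName bytes, then a 4-byte userID. The post does not show a decoder, so the following is only a minimal sketch of the reverse direction. The class UserInfoWire and its methods are hypothetical, and UTF-8 is an assumption:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original post: mirrors the
// layout written by UserInfo.codeC() so that it can be read back.
final class UserInfoWire {

    // Same layout as codeC(): [int nameLength][name bytes][int userID]
    static byte[] encode(String userName, int userID) {
        byte[] name = userName.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + name.length + 4);
        buffer.putInt(name.length);
        buffer.put(name);
        buffer.putInt(userID);
        return buffer.array();
    }

    // Reads the fields back in the order they were written.
    // Returns {userName (String), userID (Integer)}.
    static Object[] decode(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        byte[] name = new byte[buffer.getInt()];
        buffer.get(name);
        int userID = buffer.getInt();
        return new Object[] {new String(name, StandardCharsets.UTF_8), userID};
    }

    public static void main(String[] args) {
        byte[] data = encode("Welcome to Netty", 100);
        Object[] fields = decode(data);
        System.out.println(data.length + " bytes: " + fields[0] + ", " + fields[1]);
    }
}
```

    For the 16-character name "Welcome to Netty" this layout takes 4 + 16 + 4 = 24 bytes.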

    The codeC method is the most naive way to encode the object; let's compare JDK serialization against it.

    Size comparison:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    
    /**
     * @author Administrator
     * @version 1.0
     * @date 2014-02-23
     */
    public class TestUserInfo {
    
        /**
         * @param args
         * @throws IOException
         */
        public static void main(String[] args) throws IOException {
            UserInfo info = new UserInfo();
            info.buildUserID(100).buildUserName("Welcome to Netty");
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream os = new ObjectOutputStream(bos);
            os.writeObject(info);
            os.flush();
            os.close();
            byte[] b = bos.toByteArray();
            System.out.println("The jdk serializable length is : " + b.length);
            bos.close();
            System.out.println("-------------------------------------");
            System.out.println("The byte array serializable length is : "
                    + info.codeC().length);
    
        }
    
    }

    The result is hard to accept: even for such a small object, the JDK output is nearly 5 times larger:

    The jdk serializable length is : 117
    -------------------------------------
    The byte array serializable length is : 24
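
    Part of the gap comes from metadata: a JDK serialization stream starts with a magic header and carries a class descriptor, including the class name and field names, alongside the field values. A minimal sketch to make that visible follows. JdkStreamPeek and its nested Sample class are hypothetical stand-ins, not the UserInfo class above, so the total size it prints will differ from 117:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamConstants;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: inspects what ObjectOutputStream writes.
final class JdkStreamPeek {

    // Stand-in for UserInfo with the same two fields.
    static final class Sample implements Serializable {
        private static final long serialVersionUID = 1L;
        String userName = "Welcome to Netty";
        int userID = 100;
    }

    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream os = new ObjectOutputStream(bos)) {
            os.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // True if the stream begins with STREAM_MAGIC (0xACED).
    static boolean startsWithMagic(byte[] bytes) {
        int magic = ((bytes[0] & 0xFF) << 8) | (bytes[1] & 0xFF);
        return magic == (ObjectStreamConstants.STREAM_MAGIC & 0xFFFF);
    }

    // True if the given ASCII text (e.g. a field name) appears in the raw stream.
    static boolean containsText(byte[] bytes, String text) {
        return new String(bytes, StandardCharsets.ISO_8859_1).contains(text);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new Sample());
        System.out.println("total bytes: " + bytes.length);
        System.out.println("magic header: " + startsWithMagic(bytes));
        System.out.println("contains field name: " + containsText(bytes, "userName"));
        System.out.println("contains class name: " + containsText(bytes, "Sample"));
    }
}
```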

    Now compare the time:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.nio.ByteBuffer;
    
    /**
     * @author Administrator
     * @version 1.0
     * @date 2014-02-23
     */
    public class PerformTestUserInfo {
    
        /**
         * @param args
         * @throws IOException
         */
        public static void main(String[] args) throws IOException {
            UserInfo info = new UserInfo();
            info.buildUserID(100).buildUserName("Welcome to Netty");
            int loop = 1000000;
            ByteArrayOutputStream bos = null;
            ObjectOutputStream os = null;
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < loop; i++) {
                bos = new ByteArrayOutputStream();
                os = new ObjectOutputStream(bos);
                os.writeObject(info);
                os.flush();
                os.close();
                byte[] b = bos.toByteArray();
                bos.close();
            }
            long endTime = System.currentTimeMillis();
            System.out.println("The jdk serializable cost time is  : "
                    + (endTime - startTime) + " ms");
    
            System.out.println("-------------------------------------");
    
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            startTime = System.currentTimeMillis();
            for (int i = 0; i < loop; i++) {
                byte[] b = info.codeC(buffer);
            }
            endTime = System.currentTimeMillis();
            System.out.println("The byte array serializable cost time is : "
                    + (endTime - startTime) + " ms");
    
        }
    
    }

    The result: JDK serialization is more than 10 times slower.

    The jdk serializable cost time is  : 1928 ms
    -------------------------------------
    The byte array serializable cost time is : 164 ms

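    2. Overview of mainstream codec frameworks

    • Google's Protobuf
    • Facebook's Thrift
    • JBoss Marshalling

    Only these three are covered here; there are other well-known ones such as Kryo.

    Google Protobuf

    Protobuf has been battle-tested inside Google. It describes data structures in .proto files, and a code generation tool produces the corresponding POJO classes together with the Protobuf-related methods and properties.

    Features:

    •   A structured data storage format
    •   High performance
    •   Language-neutral, platform-neutral, and extensible
    •   Official support for Java, C++, and Python

    (1) Protobuf uses a binary encoding rather than XML. XML is readable and extensible, but its cost in space and time is too high for a high-performance framework.

    (2) Another attraction of Protobuf is its data description files and code generation mechanism.

    The original post included two charts here to show why so many people choose Google's Protobuf:

    [Figure: performance comparison]

    [Figure: encoded size comparison]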

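    Facebook's Thrift

    Facebook built Thrift to handle high-volume data transfer between its systems, so it supports many languages: C++, C#, Cocoa, Erlang, Haskell, Java, Perl, PHP, Python, Ruby, and Smalltalk.

    • Thrift can serve as high-performance communication middleware, supporting data serialization and many kinds of RPC services.
    • It suits static data exchange, where the data structure is fixed in advance; when the structure changes, the IDL file must be edited again and the code regenerated and recompiled.
    • It has a clear advantage over XML and JSON in both performance and payload size.

    Thrift consists of five main parts:

    (1) The language system and IDL compiler: generates interface code in the target language from a user-supplied IDL file;

    (2) TProtocol: the RPC protocol layer, which can use different serialization formats such as Binary and JSON;

    (3) TTransport: the RPC transport layer, which likewise has several implementations, such as socket, NIO, and MemoryBuffer;

    (4) TProcessor: the link between the protocol layer and the user's service implementation; it invokes the service implementation's interface;

    (5) TServer: aggregates the TProtocol, TTransport, and TProcessor objects.

    For the wire protocol, the relevant layer is TProtocol, which supports three typical encodings:

    • Generic binary
    • Compact binary
    • A compact encoding optimized for optional fields

    [Figure: encode/decode times under identical test conditions]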

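    JBoss Marshalling

    Used inside JBoss. It is not cross-language and can be seen as an evolved version of JDK serialization. Its advantages:

    • Pluggable class resolvers and more convenient class-loading customization, done by implementing a single interface;
    • Pluggable object replacement, with no need for inheritance;
    • A pluggable predefined class cache table, which shrinks the serialized byte array and speeds up serialization of commonly used types;
    • Objects can be serialized without implementing java.io.Serializable;
    • Caching is used to improve performance.

    II. MessagePack Encoding and Decoding

    2.1 Introduction

    MessagePack is efficient, fast, cross-language, and produces a small byte stream. Supported languages include Java, Python, Ruby, Haskell, C#, OCaml, Lua, Go, C, and C++.

    The pom dependencies follow; guava is an optional extra.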

     <!-- https://mvnrepository.com/artifact/org.msgpack/msgpack -->
            <dependency>
                <groupId>org.msgpack</groupId>
                <artifactId>msgpack</artifactId>
                <version>0.6.11</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>20.0</version>
            </dependency>

    Java API

    import com.google.common.collect.Lists;
    import org.msgpack.MessagePack;
    import org.msgpack.template.Templates;
    
    import java.util.List;
    
    /**
     * Created by carl.yu on 2016/12/15.
     */
    public class ApiDemo {
        public static void main(String[] args) throws Exception {
            // Build the source list with guava
            List<String> src = Lists.newArrayList("msgpack", "kumofs", "viver");
            MessagePack msgpack = new MessagePack();
            // Serialize
            byte[] raw = msgpack.write(src);
            // Deserialize
            List<String> dst1 = msgpack.read(raw, Templates.tList(Templates.TString));
            System.out.println(dst1);
        }
    }

    2.2 Writing the Encoder and Decoder

    Note: to use MessagePack, the entity class must be annotated with @Message.

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    import org.msgpack.MessagePack;
    
    import java.util.List;
    
    /**
     * Created by carl.yu on 2016/12/15.
     */
    public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            // Copy the readable bytes of msg into array
            System.out.println("Start decoding...");
            final byte[] array;
            final int length = msg.readableBytes();
            array = new byte[length];
            msg.getBytes(msg.readerIndex(), array, 0, length);
            MessagePack msgpack = new MessagePack();
            Object result = msgpack.read(array);
            out.add(result);
        }
    }

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    import org.msgpack.MessagePack;

    /**
     * Created by carl.yu on 2016/12/15.
     */
    public class MsgpackEncoder extends MessageToByteEncoder<Object> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            // Encode the POJO into a byte array. A failure propagates to the
            // pipeline instead of being swallowed, so raw is never null below.
            MessagePack msgpack = new MessagePack();
            byte[] raw = msgpack.write(msg);
            out.writeBytes(raw);
        }
    }

    Both classes delegate the actual encoding and decoding to MessagePack.

    2.3 Writing the Server and ServerHandler

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;

    /**
     * Created by carl.yu on 2016/12/15.
     */
    public class EchoServer {
        public void bind(int port) throws Exception {
            // Configure the server NIO thread groups
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 100)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch)
                                    throws Exception {
                                // Inbound: decoders applied when reading data
                                ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                                ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
                                // Outbound: encoders applied when writing data
                                ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                                ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
                                // Business logic handler
                                ch.pipeline().addLast(new EchoServerHandler());
                            }
                        });

                // Bind the port and wait synchronously for success
                ChannelFuture f = b.bind(port).sync();

                // Wait until the server's listening port is closed
                f.channel().closeFuture().sync();
            } finally {
                // Shut down gracefully and release thread pool resources
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    // Fall back to the default port
                }
            }
            new EchoServer().bind(port);
        }
    }

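    The key parts are the two encoder/decoder pairs.

    A LengthFieldPrepender is added in front of the MessagePack encoder; it prepends a 2-byte message length to each outgoing ByteBuf.

    On the receiving side, LengthFieldBasedFrameDecoder then splits the stream into frames according to that length. [Figure: how LengthFieldBasedFrameDecoder works]

    This way the next handler always receives complete messages, which neatly solves the annoying half-packet problem.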
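
    The idea behind the two handlers can be sketched without Netty. This is a simplified stand-alone illustration, not Netty's implementation, and the class LengthFraming with its methods is hypothetical. prepend plays the role of LengthFieldPrepender(2); decode mimics LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2) by emitting only complete frames with the 2-byte length stripped, and leaves any partial frame in the buffer until more bytes arrive.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone illustration of 2-byte length-prefixed framing.
final class LengthFraming {

    // Like LengthFieldPrepender(2): [2-byte length][payload].
    static byte[] prepend(byte[] payload) {
        if (payload.length > 65535) {
            throw new IllegalArgumentException("payload too large for a 2-byte length");
        }
        ByteBuffer out = ByteBuffer.allocate(2 + payload.length);
        out.putShort((short) payload.length);
        out.put(payload);
        return out.array();
    }

    // Extracts every complete frame from the accumulated bytes.
    // The buffer must be in read mode; bytes of an incomplete frame stay unread.
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 2) {
            in.mark();
            int length = in.getShort() & 0xFFFF; // unsigned 2-byte length
            if (in.remaining() < length) {
                in.reset(); // half packet: wait for more bytes
                break;
            }
            byte[] frame = new byte[length];
            in.get(frame);
            frames.add(frame);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] a = prepend("hello".getBytes(StandardCharsets.UTF_8));
        byte[] b = prepend("netty".getBytes(StandardCharsets.UTF_8));
        // Simulate TCP delivering the first frame plus part of the second.
        ByteBuffer in = ByteBuffer.allocate(64);
        in.put(a).put(b, 0, 3);
        in.flip();
        List<byte[]> frames = decode(in);
        System.out.println(frames.size() + " frame(s), " + in.remaining() + " byte(s) pending");
    }
}
```

    In the simulated read, only the first message is complete, so one frame is emitted and the three bytes of the second message stay pending.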

    2.4 Writing the Client and ClientHandler

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    
    /**
     * Created by carl.yu on 2016/12/15.
     */
    public class EchoClient {
    
        public void connect(int port, String host) throws Exception {
            // Configure the client NIO thread group
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch)
                                    throws Exception {
                                // Inbound: decoders applied when reading data
                                ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                                ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
                                // Outbound: encoders applied when writing data
                                ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                                ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
    
                                ch.pipeline().addLast(new EchoClientHandler(100));
                            }
                        });
    
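                // Start the asynchronous connect and wait for it to complete
                ChannelFuture f = b.connect(host, port).sync();

                // Wait for the client channel to close
                f.channel().closeFuture().sync();
            } finally {
                // Shut down gracefully and release the NIO thread group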
                group.shutdownGracefully();
            }
        }
    
        /**
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    // Fall back to the default port
                }
            }
            new EchoClient().connect(port, "127.0.0.1");
        }
    }

    import demo.codec.serializable.UserInfo;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * Created by carl.yu on 2016/12/15.
     */
    public class EchoClientHandler extends ChannelInboundHandlerAdapter {
        private final int sendNumber;
    
        public EchoClientHandler(int sendNumber) {
            this.sendNumber = sendNumber;
        }
    
        private UserInfo[] userInfo() {
            UserInfo[] userInfos = new UserInfo[sendNumber];
            UserInfo userInfo = null;
            for (int i = 0; i < sendNumber; i++) {
                userInfo = new UserInfo();
                userInfos[i] = userInfo;
                userInfo.setUserID(i);
                userInfo.setUserName("ABDCEFG-->" + i);
            }
            return userInfos;
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
           /* UserInfo userInfo = new UserInfo();
            userInfo.setUserID(0);
            userInfo.setUserName("ABDCEFG-->" + 0);*/
            UserInfo[] userInfos = userInfo();
            for (int i = 0; i < userInfos.length; i++) {
                ctx.writeAndFlush(userInfos[i]);
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("Client received message: " + msg);
    //        ctx.write(msg);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    //        ctx.flush();
    //        ctx.close();
        }
    }

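    LengthFieldPrepender and LengthFieldBasedFrameDecoder will be explained in more detail later; for now it is enough to know that they solve the half-packet problem.

    III. Google Protobuf

    3.1 Testing Google Protobuf

    Prepare the environment:

    SubscribeReq.proto: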

    package netty;
    option java_package="demo.codec.protobuf";
    option java_outer_classname="SubscribeReqProto";
    
    message SubscribeReq{
        required int32 subReqID = 1;
        required string userName = 2;
        required string productName = 3;
        repeated string address = 4;
    }

    SubscribeResp.proto

    package netty;
    option java_package="demo.codec.protobuf";
    option java_outer_classname="SubscribeRespProto";
    
    message SubscribeResp{
        required int32 subReqID = 1;
        required int32 respCode = 2;
        required string desc = 3;
    }

    The Protobuf syntax is not covered in detail here; see https://developers.google.com/protocol-buffers/docs/proto?hl=zh-CN
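
    To give a feel for why the encoded form is compact, here is a hand-rolled sketch of how the proto2 wire format lays out a SubscribeResp. Each field is a key byte, (field number << 3) | wire type, followed by a varint value or a length-delimited payload. The class WireSketch is hypothetical and only handles values below 128 so that every varint fits in one byte; real code should use the generated classes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the Protobuf wire layout; not generated code.
final class WireSketch {
    static final int WIRETYPE_VARINT = 0;
    static final int WIRETYPE_LENGTH_DELIMITED = 2;

    // Field key: field number in the upper bits, wire type in the low 3 bits.
    static int key(int fieldNumber, int wireType) {
        return (fieldNumber << 3) | wireType;
    }

    // Lays out SubscribeResp{subReqID = 1, respCode = 2, desc = 3}.
    // All numbers and the desc length must be in [0, 127] so that each
    // varint is a single byte; that restriction belongs to this sketch only.
    static byte[] encodeSubscribeResp(int subReqID, int respCode, String desc) {
        byte[] text = desc.getBytes(StandardCharsets.UTF_8);
        if (subReqID < 0 || subReqID > 127 || respCode < 0 || respCode > 127 || text.length > 127) {
            throw new IllegalArgumentException("sketch only supports one-byte varints");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(key(1, WIRETYPE_VARINT));            // subReqID
        out.write(subReqID);
        out.write(key(2, WIRETYPE_VARINT));            // respCode
        out.write(respCode);
        out.write(key(3, WIRETYPE_LENGTH_DELIMITED));  // desc
        out.write(text.length);
        out.write(text, 0, text.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeSubscribeResp(1, 0, "ok")) {
            hex.append(String.format("%02X ", b & 0xFF));
        }
        System.out.println(hex.toString().trim());
    }
}
```

    For subReqID 1, respCode 0 and desc "ok" the layout is 8 bytes: 08 01 10 00 1A 02 6F 6B. No field names or class names travel on the wire, only the field numbers from the .proto file.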

    build.bat

    protoc ./proto/*.proto --java_out=../main/java
    
    pause

    The Maven dependency for Protobuf:

    <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>2.5.0</version>
            </dependency>

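    Running build.bat generates the Java classes SubscribeReqProto and SubscribeRespProto in the demo.codec.protobuf package.

    The following code shows how Protobuf is used: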

    import com.google.protobuf.InvalidProtocolBufferException;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author Administrator
     * @version 1.0
     * @date 2014-02-23
     */
    public class TestSubscribeReqProto {
    
        // Encode: Object -> byte[]
        private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
            return req.toByteArray();
        }
    
        // Decode: byte[] -> Object
        private static SubscribeReqProto.SubscribeReq decode(byte[] body)
                throws InvalidProtocolBufferException {
            return SubscribeReqProto.SubscribeReq.parseFrom(body);
        }
    
        /**
         * Create an instance
         *
         * @return
         */
        private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
            // (1) Builder pattern
            SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq
                    .newBuilder();
            builder.setSubReqID(1);
            builder.setUserName("Lilinfeng");
            builder.setProductName("Netty Book");
            List<String> address = new ArrayList<>();
            address.add("NanJing YuHuaTai");
            address.add("BeiJing LiuLiChang");
            address.add("ShenZhen HongShuLin");
            builder.addAllAddress(address);
            return builder.build();
        }
    
        /**
         * @param args
         * @throws InvalidProtocolBufferException
         */
        public static void main(String[] args)
                throws InvalidProtocolBufferException {
            SubscribeReqProto.SubscribeReq req = createSubscribeReq();
            System.out.println("Before encode : " + req.toString());
            SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
            System.out.println("After decode : " + req2.toString());
            System.out.println("Assert equal : --> " + req2.equals(req));
    
        }
    
    }

    3.2 Developing the book-ordering server

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * @author lilinfeng
     * @version 1.0
     * @date 2014-02-14
     */
    public class SubReqServer {
        public void bind(int port) throws Exception {
            // Configure the server NIO thread groups
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 100)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) {
                                ch.pipeline().addLast(
                                        new ProtobufVarint32FrameDecoder());
                                ch.pipeline().addLast(
                                        new ProtobufDecoder(
                                                SubscribeReqProto.SubscribeReq
                                                        .getDefaultInstance()));
                                ch.pipeline().addLast(
                                        new ProtobufVarint32LengthFieldPrepender());
                                ch.pipeline().addLast(new ProtobufEncoder());
                                ch.pipeline().addLast(new SubReqServerHandler());
                            }
                        });
    
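                // Bind the port and wait synchronously for success
                ChannelFuture f = b.bind(port).sync();

                // Wait until the server's listening port is closed
                f.channel().closeFuture().sync();
            } finally {
                // Shut down gracefully and release thread pool resources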
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    // Fall back to the default port
                }
            }
            new SubReqServer().bind(port);
        }
    }

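    Note the order of the codecs:

    (1) ProtobufVarint32FrameDecoder: inbound, handles half packets by splitting the stream into frames

    (2) ProtobufDecoder: inbound, decodes each frame into a message

    (3) ProtobufVarint32LengthFieldPrepender: outbound, prepends the length so that the peer can handle half packets

    (4) ProtobufEncoder: outbound, encodes the message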
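
    The "Varint32" in these handler names refers to Protobuf's base-128 varint encoding, used here for the length prefix: 7 payload bits per byte, least-significant group first, with the high bit set on every byte except the last. A minimal stand-alone sketch of that encoding follows. The class Varint32 is hypothetical and is not Netty's code, and its decode method assumes non-negative lengths.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical illustration of base-128 varint encoding for a length prefix.
final class Varint32 {

    // Encodes an int as a varint: 7 bits per byte, low group first.
    static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(5);
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // high bit set: more bytes follow
            value >>>= 7;
        }
        out.write(value); // last byte, high bit clear
        return out.toByteArray();
    }

    // Decodes a varint from the start of data.
    // Returns -1 if the bytes end before the varint is complete (a half
    // packet) or if no terminating byte appears within 5 bytes.
    static int decode(byte[] data) {
        int result = 0;
        for (int i = 0; i < data.length && i < 5; i++) {
            result |= (data[i] & 0x7F) << (7 * i);
            if ((data[i] & 0x80) == 0) {
                return result;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 127, 128, 300}) {
            byte[] encoded = encode(n);
            System.out.println(n + ": " + encoded.length + " byte(s), decoded " + decode(encoded));
        }
    }
}
```

    Lengths up to 127 cost a single byte, so short messages carry less framing overhead than with a fixed 2-byte or 4-byte length field.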

    As a result, the business handler can work with the generated classes directly:

    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    /**
     * @author lilinfeng
     * @version 1.0
     * @date 2014-02-14
     */
    @Sharable
    public class SubReqServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            // ProtobufDecoder has already turned the frame into a SubscribeReq
            SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
            if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
                System.out.println("Service accept client subscribe req : ["
                        + req.toString() + "]");
                ctx.writeAndFlush(resp(req.getSubReqID()));
            }
        }

        private SubscribeRespProto.SubscribeResp resp(int subReqID) {
            SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp
                    .newBuilder();
            builder.setSubReqID(subReqID);
            builder.setRespCode(0);
            builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
            return builder.build();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close(); // Close the channel when an exception occurs
        }
    }

    3.3 Developing the book-ordering client

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    
    /**
     * @author lilinfeng
     * @version 1.0
     * @date 2014-02-14
     */
    public class SubReqClient {
    
        public void connect(int port, String host) throws Exception {
            // Configure the client NIO thread group
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch)
                                    throws Exception {
                                ch.pipeline().addLast(
                                        new ProtobufVarint32FrameDecoder());
                                ch.pipeline().addLast(
                                        new ProtobufDecoder(
                                                SubscribeRespProto.SubscribeResp
                                                        .getDefaultInstance()));
                                ch.pipeline().addLast(
                                        new ProtobufVarint32LengthFieldPrepender());
                                ch.pipeline().addLast(new ProtobufEncoder());
                                ch.pipeline().addLast(new SubReqClientHandler());
                            }
                        });
    
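                // Start the asynchronous connect and wait for it to complete
                ChannelFuture f = b.connect(host, port).sync();

                // Wait for the client channel to close
                f.channel().closeFuture().sync();
            } finally {
                // Shut down gracefully and release the NIO thread group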
                group.shutdownGracefully();
            }
        }
    
        /**
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            int port = 8080;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    // Fall back to the default port
                }
            }
            new SubReqClient().connect(port, "127.0.0.1");
        }
    }

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author lilinfeng
     * @version 1.0
     * @date 2014-02-14
     */
    public class SubReqClientHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * Creates a client-side handler.
         */
        public SubReqClientHandler() {
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            for (int i = 0; i < 10; i++) {
                ctx.write(subReq(i));
            }
            ctx.flush();
        }
    
        private SubscribeReqProto.SubscribeReq subReq(int i) {
            SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq
                    .newBuilder();
            builder.setSubReqID(i);
            builder.setUserName("Lilinfeng");
            builder.setProductName("Netty Book For Protobuf");
            List<String> address = new ArrayList<>();
            address.add("NanJing YuHuaTai");
            address.add("BeiJing LiuLiChang");
            address.add("ShenZhen HongShuLin");
            builder.addAllAddress(address);
            return builder.build();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            System.out.println("Receive server response : [" + msg + "]");
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }

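    Even without knowing much about how Protobuf is implemented or used, we can support Google Protobuf encoding with little effort.

    3.4 Caveats

    ProtobufDecoder only decodes, so a decoder that can handle half packets must be placed in front of it. There are three options:

    (1) Use Netty's ProtobufVarint32FrameDecoder, which handles half-packet messages;

    (2) Extend Netty's general-purpose half-packet decoder, LengthFieldBasedFrameDecoder;

    (3) Extend ByteToMessageDecoder and handle half packets yourself.

    The half-packet problem must be solved; otherwise the server cannot work correctly.

    IV. JBoss Marshalling Encoding and Decoding

    Omitted for now. See "A First Look at Netty (2)" for reference.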

  • Original post: https://www.cnblogs.com/carl10086/p/6183687.html