  • Netty Study Notes: Integrating Netty and Protobuf (Part 1)

    Netty Study Notes: Integrating Netty and Protobuf


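    Protobuf handles serialization, and Netty carries the serialized data across the network.

    1. Update the Java package location in the proto file, then run protoc again.

    Exception encountered: the server starts normally, but when the client starts and sends a message, an error is reported: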

    WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
    io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:98)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:313)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:427)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
        at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:102)
        at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:627)
        at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:109)
        at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:69)
        at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:881)
        at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:875)
        at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
        at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:121)
        at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:65)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
        ... 23 more
    
    Dec 03, 2019 11:01:51 AM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
    WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
    io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:98)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
        at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84)
        at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawByte(CodedInputStream.java:1238)
        at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint64SlowPath(CodedInputStream.java:1126)
        at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint32(CodedInputStream.java:1020)
        at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:623)
        at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:109)
        at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:69)
        at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:881)
        at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:875)
        at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
        at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:121)
        at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:65)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
        ... 21 more

    Tracking down the problem:
    1.io.netty.handler.codec.DecoderException: something went wrong during decoding, possibly on the server side
    2.Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    3.While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
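Both messages are raised while the parser reads a field tag (`CodedInputStream.readTag` in the traces above). A Protobuf tag is a varint that packs the field number and the wire type, and field number 0 is never valid. The JDK-only sketch below is illustrative and not part of protobuf-java: `WireTagSketch` and its methods are hypothetical names, and the sample bytes are a hand-encoded `Person` with `name = "abc"` and `age = 22`. It suggests, without proving it for this particular bug, that the parser was handed bytes that did not start at a message boundary.

```java
// Illustrative helper, not part of protobuf-java: splits a Protobuf wire tag
// into its field number and wire type.
final class WireTagSketch {

    /** Reads one base-128 varint starting at pos[0] and advances pos[0]. */
    static int readVarint32(byte[] buf, int[] pos) {
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            if (pos[0] >= buf.length) {
                throw new IllegalStateException("input ended in the middle of a varint");
            }
            byte b = buf[pos[0]++];
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalStateException("malformed varint");
    }

    /** The upper bits of a tag hold the field number. */
    static int fieldNumber(int tag) {
        return tag >>> 3;
    }

    /** The lowest three bits of a tag hold the wire type. */
    static int wireType(int tag) {
        return tag & 0x7;
    }

    /** Hand-encoded Person{name = "abc", age = 22}: 0A 03 'a' 'b' 'c' 10 16. */
    static byte[] samplePerson() {
        return new byte[] {0x0A, 0x03, 'a', 'b', 'c', 0x10, 0x16};
    }

    public static void main(String[] args) {
        int tag = readVarint32(samplePerson(), new int[] {0});
        System.out.println("field=" + fieldNumber(tag) + " wireType=" + wireType(tag));

        // A buffer whose first byte is 0x00 yields field number 0, which a
        // Protobuf parser rejects as an invalid tag.
        int badTag = readVarint32(new byte[] {0x00, 0x0A, 0x03}, new int[] {0});
        System.out.println("field=" + fieldNumber(badTag) + " (invalid)");

        // A varint whose continuation bit promises more bytes than exist
        // corresponds to the "input ended unexpectedly" case.
        try {
            readVarint32(new byte[] {(byte) 0x80}, new int[] {0});
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```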

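    I went through the code, tried a guess, and it worked.

    Alarmingly, the cause turned out to be the order in which the codec handlers were added to the pipeline. The correct order is: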

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
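        // Unlike the earlier examples, only the handlers differ here.
        // The four dedicated Protobuf codec handlers that ship with Netty.
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // The decoder is the key piece: it converts the received bytes into the target message type.
        // Argument: a MessageLite, i.e. the default instance of the message class to decode into.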
        pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());
        pipeline.addLast(new TestServerHandler());
    }
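Why the order matters: in a Netty pipeline, inbound data passes through the inbound handlers from head to tail, while a write issued on the channel passes through the outbound handlers from tail to head. The toy model below does not use Netty; `PipelineOrderSketch` and its methods are hypothetical names, and each handler is reduced to a name plus a direction. It only shows which handlers run, and in what order, for the pipeline configured above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of handler traversal order; it does not use Netty itself.
final class PipelineOrderSketch {

    /** A handler reduced to its name and the direction it participates in. */
    static final class Handler {
        final String name;
        final boolean inbound;

        Handler(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    /** The handlers in the same order as the addLast calls above. */
    static List<Handler> pipeline() {
        List<Handler> p = new ArrayList<>();
        p.add(new Handler("ProtobufVarint32FrameDecoder", true));
        p.add(new Handler("ProtobufDecoder", true));
        p.add(new Handler("ProtobufVarint32LengthFieldPrepender", false));
        p.add(new Handler("ProtobufEncoder", false));
        p.add(new Handler("TestServerHandler", true));
        return p;
    }

    /** Inbound data visits the inbound handlers from head to tail. */
    static List<String> readPath(List<Handler> p) {
        List<String> path = new ArrayList<>();
        for (Handler h : p) {
            if (h.inbound) {
                path.add(h.name);
            }
        }
        return path;
    }

    /** A write on the channel visits the outbound handlers from tail to head. */
    static List<String> writePath(List<Handler> p) {
        List<String> path = new ArrayList<>();
        for (int i = p.size() - 1; i >= 0; i--) {
            if (!p.get(i).inbound) {
                path.add(p.get(i).name);
            }
        }
        return path;
    }

    public static void main(String[] args) {
        System.out.println("read:  " + readPath(pipeline()));
        System.out.println("write: " + writePath(pipeline()));
    }
}
```

On the read path the frame decoder runs before `ProtobufDecoder`, so the decoder only ever sees one complete message. On the write path `ProtobufEncoder` runs first and the length prepender second, which is why the prepender has to be added before the encoder.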

    The complete client and server code follows.
    proto file:

    syntax = "proto2";
    
    package com.dawa.protobuf;
    
    option optimize_for = SPEED;
    option java_package = "com.dawa.netty.sixthexample";
    option java_outer_classname = "MyDataInfo";
    
    message Person{
        required string name = 1;
        optional int32 age = 2;
        optional string address = 3;
    }
    
    message Person2{
        required string name = 1;
        optional int32 age = 2;
        optional string address = 3;
    }

    Server-side code:

    package com.dawa.netty.sixthexample;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * @Title: TestServer
     * @Author: 大娃
     * @Date: 2019/12/3 10:01
     * @Description:
     */
    public class TestServer {
        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new TestServerInitializer());
    
                ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

    package com.dawa.netty.sixthexample;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * @Title: TestServerHandler
     * @Author: 大娃
     * @Date: 2019/12/3 10:09
     * @Description: the handler is generic; its type parameter is the message type to be handled
     */
    public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {
            // Process the received message:
            System.out.println(msg.getName());
            System.out.println(msg.getAge());
            System.out.println(msg.getAddress());
        }
    }

    package com.dawa.netty.sixthexample;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    
    /**
     * @Title: TestServerInitializer
     * @Author: 大娃
     * @Date: 2019/12/3 10:05
     * @Description:
     */
    public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
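            // Unlike the earlier examples, only the handlers differ here.
            // The four dedicated Protobuf codec handlers that ship with Netty.
            pipeline.addLast(new ProtobufVarint32FrameDecoder());
            // The decoder is the key piece: it converts the received bytes into the target message type.
            // Argument: a MessageLite, i.e. the default instance of the message class to decode into.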
            pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
            pipeline.addLast(new ProtobufEncoder());
            pipeline.addLast(new TestServerHandler());
        }
    }

    Client-side code:

    package com.dawa.netty.sixthexample;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * @Title: TestClient
     * @Author: 大娃
     * @Date: 2019/12/3 10:15
     * @Description: the client
     */
    public class TestClient {
        public static void main(String[] args) throws Exception {
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                        .handler(new TestClientInitializer());
                // Note: the client calls connect here, not bind
                ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

    package com.dawa.netty.sixthexample;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    
    /**
     * @Title: TestClientInitializer
     * @Author: 大娃
     * @Date: 2019/12/3 10:43
     * @Description:
     */
    public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
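            // Unlike the earlier examples, only the handlers differ here.
            // The four dedicated Protobuf codec handlers that ship with Netty.
            pipeline.addLast(new ProtobufVarint32FrameDecoder());
            // The decoder is the key piece: it converts the received bytes into the target message type.
            // Argument: a MessageLite, i.e. the default instance of the message class to decode into.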
            pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
            pipeline.addLast(new ProtobufEncoder());
            // Our own handler
            pipeline.addLast(new TestClientHandler());
        }
    }

    package com.dawa.netty.sixthexample;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * @Title: TestClientHandler
     * @Author: 大娃
     * @Date: 2019/12/3 10:44
     * @Description:
     */
    public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {
            // Nothing to do: the server in this example never writes a message back
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // The channel is now active
            MyDataInfo.Person person = MyDataInfo.Person.newBuilder().setName("大娃").setAge(22).setAddress("北京").build();
            ctx.channel().writeAndFlush(person);
        }
    }
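When the client calls `writeAndFlush(person)`, the message is first serialized by `ProtobufEncoder` and then given a length prefix by `ProtobufVarint32LengthFieldPrepender`; on the receiving side `ProtobufVarint32FrameDecoder` uses that prefix to cut the byte stream back into whole messages. The sketch below reproduces the idea with the JDK only. It is not Netty's implementation, `Varint32FramingSketch` and its methods are hypothetical names, and the payloads are arbitrary bytes standing in for serialized messages.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// JDK-only illustration of varint32 length-prefixed framing; not Netty's implementation.
final class Varint32FramingSketch {

    /** Returns the payload preceded by its length encoded as a base-128 varint. */
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int len = payload.length;
        while ((len & ~0x7F) != 0) {
            out.write((len & 0x7F) | 0x80); // low 7 bits, continuation bit set
            len >>>= 7;
        }
        out.write(len);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    /** Extracts every complete payload; a trailing partial frame is left unread. */
    static List<byte[]> unframe(byte[] stream) {
        List<byte[]> payloads = new ArrayList<>();
        int pos = 0;
        while (pos < stream.length) {
            int len = 0;
            int cursor = pos;
            boolean lengthComplete = false;
            for (int shift = 0; shift < 35 && cursor < stream.length; shift += 7) {
                byte b = stream[cursor++];
                len |= (b & 0x7F) << shift;
                if ((b & 0x80) == 0) {
                    lengthComplete = true;
                    break;
                }
            }
            if (!lengthComplete || len < 0 || stream.length - cursor < len) {
                break; // not enough bytes yet (this sketch also stops on a malformed length)
            }
            payloads.add(Arrays.copyOfRange(stream, cursor, cursor + len));
            pos = cursor + len;
        }
        return payloads;
    }

    public static void main(String[] args) {
        byte[] small = {0x0A, 0x03, 'a', 'b', 'c', 0x10, 0x16};
        byte[] large = new byte[200]; // a length of 200 needs a two-byte varint

        // Two frames written back to back, as they could arrive in one TCP read
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        byte[] f1 = frame(small);
        byte[] f2 = frame(large);
        wire.write(f1, 0, f1.length);
        wire.write(f2, 0, f2.length);

        List<byte[]> payloads = unframe(wire.toByteArray());
        System.out.println(payloads.size() + " payloads recovered");
    }
}
```

Without the length prefix the receiver cannot tell where one message ends and the next begins, because TCP delivers a byte stream rather than discrete messages.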

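    This program has flaws: the decoder setup is not general-purpose and is too tightly coupled to a single message type. There are two obvious problems, so how should they be solved?

    A separate post will analyze this: Netty Study Notes: Integrating Netty and Protobuf (Part 2)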

  • Original article: https://www.cnblogs.com/bigbaby/p/11978746.html