zoukankan      html  css  js  c++  java
  • netty 详解(五)netty 使用 protobuf 序列化

    录:

    1、编码和解码
    2、Google Protobuf 介绍
    3、案例--netty 使用 protobuf 序列化
        3.1、编写 .proto 文件
        3.2、自动生成代码
        3.3、netty 通过 Protobuf 传递消息
    4、netty 使用 protobuf 传输多种类型对象

    1、编码和解码    <--返回目录

      编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码。

      codec(编解码器) 的组成部分有两个:decoder(解码器) 和 encoder(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据。

      netty 提供的 StringEncoder/StringDecoder 是对字符串数据进行编解码;ObjectEncoder/ObjectDecoder 是对 Java 对象进行编解码。

      ObjectEncoder/ObjectDecoder 可以用来实现 POJO 对象或各种业务对象的编解码,底层使用的是 Java 序列化技术,而 Java 序列化技术本身效率不高,并存在如下问题:

    • 无法跨语言
    • 序列化后的体积太大,是二进制编码的 5 倍多
    • 序列化性能低

      所以,引出新的解决方案:Google 的 Protobuf。

    2、Google Protobuf 介绍    <--返回目录

      参考文档:https://developers.google.com/protocol-buffers/docs/proto

      Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化或者说序列化。它很适合做数据存储或 RPC 数据交换格式。

      支持跨平台/跨语言,支持绝大数语言,例如 c++,c#, Java, python 等。‘

      Protobuf 自动生成代码:

    • 使用 Protobuf 编译器自动生成代码,Protobuf 是将类的定义使用 .proto 文件进行描述。在 IDEA 中编写 .proto 文件时,会自动提示是否下载 .protot 编写插件(protobuf support 插件),可以让语法高亮。
    • 然后通过 protoc.exe 编译器根据 .proto 自动生成 .java 文件

      自动生成 .java 文件 参考:(注意 .proto 文件放在 src/main/proto 目录下)

    3、案例--netty 使用 protobuf 序列化    <--返回目录

      需求:

      1)客户端可以发送一个 User POJO 对象到服务器(通过 protobuf 编码);

      2)服务端能接收 User POJO 对象,并显示信息(通过 protobuf 解码);

    3.1、编写 .proto 文件    <--返回目录

      src/main/proto/User.proto

    syntax = "proto3";
    option java_package = "com.oy.protobuf";
    option java_outer_classname = "UserModel";// 生成的外部类名,同时也是文件名
    
    // protobuf 使用 message 管理数据
    message User { // 会在 UserModel 里面生成一个内部类 User,即是真正发送的 POJO 对象
        int32 id = 1;
        string name = 2;
    }

    3.2、自动生成代码    <--返回目录

      pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>netty-helloworld</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <java.version>1.8</java.version>
            <grpc.version>1.14.0</grpc.version>
            <protobuf.version>3.3.0</protobuf.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.22.Final</version>
            </dependency>
    
            <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-netty</artifactId>
                <version>${grpc.version}</version>
            </dependency>
            <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-protobuf</artifactId>
                <version>${grpc.version}</version>
            </dependency>
            <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-stub</artifactId>
                <version>${grpc.version}</version>
            </dependency>
    
            <!--<dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>${protobuf.version}</version>
            </dependency>-->
        </dependencies>
    
        <build>
            <extensions>
                <extension>
                    <groupId>kr.motd.maven</groupId>
                    <artifactId>os-maven-plugin</artifactId>
                    <version>1.5.0.Final</version>
                </extension>
            </extensions>
    
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>8</source>
                        <target>8</target>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>org.xolstice.maven.plugins</groupId>
                    <artifactId>protobuf-maven-plugin</artifactId>
                    <version>0.5.1</version>
                    <configuration>
                        <protocArtifact>com.google.protobuf:protoc:3.5.1:exe:${os.detected.classifier}</protocArtifact>
                        <pluginId>grpc-java</pluginId>
                        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.14.0:exe:${os.detected.classifier}</pluginArtifact>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>compile-custom</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    View Code

      执行 mvn clean compile 命令

    3.3、netty 通过 Protobuf 传递消息    <--返回目录

       Server

    package com.oy.protobuf;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.util.CharsetUtil;
    
    import java.net.InetSocketAddress;
    import java.util.Date;
    
    public class Server {
        private int port;
    
        public static void main(String[] args) {
            new Server(8003).start();
        }
    
        public Server(int port) {
            this.port = port;
        }
    
        public void start() {
            EventLoopGroup boss = new NioEventLoopGroup(1);
            EventLoopGroup work = new NioEventLoopGroup();
    
            try {
                ServerBootstrap server = new ServerBootstrap()
                        .group(boss, work)
                        .channel(NioServerSocketChannel.class)
                        //.localAddress(new InetSocketAddress(port))
                        //.option(ChannelOption.SO_BACKLOG, 128)
                        //.childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("encoder", new ProtobufEncoder()); // protobuf 编码器
                                // 需要指定要对哪种对象进行解码
                                pipeline.addLast("decoder", new ProtobufDecoder(UserModel.User.getDefaultInstance()));
                                pipeline.addLast(new NettyServerHandler());
                            }
                        });
    
                // 绑定端口
                ChannelFuture future = server.bind(port).sync();
                System.out.println("server started and listen " + port);
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                work.shutdownGracefully();
            }
        }
    
        public static class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("HelloWorldServerHandler active");
            }
    
            @Override
            public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("server channelRead...");
                // 读取客户端发送的数据 UserMOdel.User
                UserModel.User user = (UserModel.User) msg;
                System.out.println("客户端发送的数据: " + user.getId() + "--" + user.getName());
            }
    
            /**
             * 数据读取完毕
             */
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                UserModel.User user = UserModel.User.newBuilder().setId(20).setName("服务器").build();
                ctx.writeAndFlush(user);
            }
    
            /**
             * 处理异常,关闭通道
             */
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                ctx.channel().close();
            }
        }
    }

      Client

    package com.oy.protobuf;
    
    import com.oy.helloworld.NettyClient;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.util.CharsetUtil;
    
    public class Client {
        private static final String HOST = "127.0.0.1";
        private static final int PORT = 8003;
    
        public static void main(String[] args) {
            new Client().start(HOST, PORT);
        }
    
        public void start(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap client = new Bootstrap()
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("encoder", new ProtobufEncoder()); // protobuf 编码器
                                // 需要指定要对哪种对象进行解码
                                pipeline.addLast("decoder", new ProtobufDecoder(UserModel.User.getDefaultInstance()));
                                pipeline.addLast();
                                pipeline.addLast(new NettyClientHandler());
                            }
                        });
    
                ChannelFuture future = client.connect(host, port).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static class NettyClientHandler extends ChannelInboundHandlerAdapter {
            /**
             * 通道就绪触发该方法
             */
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("HelloWorldClientHandler Active");
                // 发送 User POJO 对象到服务器
                UserModel.User user = UserModel.User.newBuilder().setId(10).setName("客户端张三").build();
                ctx.writeAndFlush(user);
            }
    
            /**
             * 当通道有读取事件时触发该方法
             */
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                // 读取服务器发送的数据 UserMOdel.User
                UserModel.User user = (UserModel.User) msg;
                System.out.println("收到服务器响应: " + user.getId() + "--" + user.getName());
            }
        }
    }

      启动服务端和客户端程序:

     

    4、netty 使用 protobuf 传输多种类型对象    <--返回目录

      MyDataInfo.proto

    • 使用 message 管理其他的 message
    • 定义的 MyMessage 的第一个参数是枚举类型,标识传递的是 Student 还是 Teacher;第二个参数是 Student 或 Teacher 中的一个;
    syntax = "proto3";
    option optimize_for = SPEED; // 加快解析
    option java_package = "com.oy.protobuf2";
    option java_outer_classname = "MyDataInfo";// 生成的外部类名,同时也是文件名
    
    // protobuf 可以使用 message 管理其他的 message
    message MyMessage {
        // 定义一个枚举类型
        enum DataType {
            teacherType = 0;
            studentType = 1;
        }
        // 用 data_type 来标识传的是哪一个枚举类型
        DataType data_type = 1;
        // 表示每次枚举类型最多只能出现其中一个,节省空间
        oneof dataBody {
            Teacher teacher = 2;
            Student student = 3;
        }
    }
    
    // protobuf 使用 message 管理数据
    message Teacher {
        int32 id = 1;
        string name = 2;
    }
    message Student {
        int32 id = 1;
        string name = 2;
    }

      Server

    package com.oy.protobuf2;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    
    public class Server {
        private int port;
    
        public static void main(String[] args) {
            new Server(8004).start();
        }
    
        public Server(int port) {
            this.port = port;
        }
    
        public void start() {
            EventLoopGroup boss = new NioEventLoopGroup(1);
            EventLoopGroup work = new NioEventLoopGroup();
    
            try {
                ServerBootstrap server = new ServerBootstrap()
                        .group(boss, work)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("encoder", new ProtobufEncoder()); // protobuf 编码器
                                // 需要指定要对哪种对象进行解码
                                pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
                                pipeline.addLast(new NettyServerHandler());
                            }
                        });
    
                // 绑定端口
                ChannelFuture future = server.bind(port).sync();
                System.out.println("server started and listen " + port);
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                work.shutdownGracefully();
            }
        }
    
        public static class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("HelloWorldServerHandler active");
            }
    
            @Override
            public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("server channelRead...");
                // 读取客户端发送的数据 Student POJO
                MyDataInfo.MyMessage myMessage = (MyDataInfo.MyMessage) msg;
                System.out.println("客户端发送的数据: " + myMessage.getDataType() + "--"
                        + myMessage.getStudent().getId() + "--" + myMessage.getStudent().getName());
            }
    
            /**
             * 数据读取完毕
             */
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                // 服务端返回 Teacher POJO
                MyDataInfo.Teacher teacher = MyDataInfo.Teacher.newBuilder().setId(222).setName("老师").build();
                MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder()
                        .setDataType(MyDataInfo.MyMessage.DataType.teacherType).setTeacher(teacher)
                        .build();
                ctx.writeAndFlush(myMessage);
            }
    
            /**
             * 处理异常,关闭通道
             */
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                ctx.channel().close();
            }
        }
    }

      Client

    package com.oy.protobuf2;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    
    public class Client {
        private static final String HOST = "127.0.0.1";
        private static final int PORT = 8004;
    
        public static void main(String[] args) {
            new Client().start(HOST, PORT);
        }
    
        public void start(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap client = new Bootstrap()
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("encoder", new ProtobufEncoder()); // protobuf 编码器
                                // 需要指定要对哪种对象进行解码
                                pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
                                pipeline.addLast();
                                pipeline.addLast(new NettyClientHandler());
                            }
                        });
    
                ChannelFuture future = client.connect(host, port).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static class NettyClientHandler extends ChannelInboundHandlerAdapter {
            /**
             * 通道就绪触发该方法
             */
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("HelloWorldClientHandler Active");
                // 发送 Student POJO 对象到服务器
                MyDataInfo.Student student = MyDataInfo.Student.newBuilder().setId(111).setName("学生").build();
                MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder()
                        .setDataType(MyDataInfo.MyMessage.DataType.studentType).setStudent(student)
                        .build();
                ctx.writeAndFlush(myMessage);
            }
    
            /**
             * 当通道有读取事件时触发该方法
             */
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                // 读取服务器发送的数据,服务器发送的是 Teacher POJO
                MyDataInfo.MyMessage myMessage = (MyDataInfo.MyMessage) msg;
                System.out.println("服务器返回的数据: " + myMessage.getDataType() + "--"
                        + myMessage.getTeacher().getId() + "--" + myMessage.getTeacher().getName());
            }
        }
    }

      测试:客户端给服务器发送的是 Student POJO,服务器返回的是 Teacher POJO

     ---

  • 相关阅读:
    linux查看系统类型和版本
    javascript 中的继承实现, call,apply,prototype,构造函数
    redis原理分析
    HashTable 简述
    算法之 快速排序
    react js 之生命周期
    Java源代码编译过程
    Java字节码文件结构---概述
    Java程序运行时内存划分
    数据结构--汉诺塔--借助栈实现非递归---Java
  • 原文地址:https://www.cnblogs.com/xy-ouyang/p/12825132.html
Copyright © 2011-2022 走看看