Google Protobuf
编码和解码的基本介绍
- GitHub
- 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码
- codec(编解码器) 的组成部分有两个:encoder(编码器) 和 decoder(解码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据
Netty 本身的编码解码的机制和问题分析
- Netty 自身提供了一些 codec(编解码器)
- Netty 提供的编码器
StringEncoder
,对字符串数据进行编码ObjectEncoder
,对 Java 对象进行编码- ...
- Netty 提供的解码器
StringDecoder
, 对字符串数据进行解码ObjectDecoder
,对 Java 对象进行解码- ...
- Netty 本身自带的
ObjectDecoder
和ObjectEncoder
可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题:- 无法跨语言
- 序列化后的体积太大,是二进制编码的 5 倍多。
- 序列化性能太低
- 引出 新的解决方案 [Google 的 Protobuf]
Protobuf
Protobuf基本介绍和使用示意图
-
Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC[远程过程调用 remote procedure call ] 数据交换格式 。
目前很多公司 http+json → tcp+protobuf
-
Protobuf 是以 message 的方式来管理数据的.
-
支持跨平台、跨语言,即[客户端和服务器端可以是不同的语言编写的] (支持目前绝大多数语言,例如 C++、C#、Java、python 等)
-
高性能,高可靠性
-
使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用 .proto 文件进行描述。说明,在idea 中编写 .proto 文件时,会自动提示是否下载 .ptotot 编写插件. 可以让语法高亮。
-
然后通过 protoc.exe 编译器根据 .proto 自动生成 .java 文件
-
protobuf 使用示意图
-
字段类型对应
.proto Type Notes C++ Type Java Type double double double float float float int32 使用可变长度编码, 对负数编码效率低下,如果您的字段可能有负值, 则使用sint32代替. int32 int int64 使用可变长度编码, 对负数编码效率低下,如果您的字段可能有负值, 则使用sint64代替. int64 long uint32 使用可变长度编码 uint32 int uint64 使用可变长度编码 uint64 long sint32 使用可变长度编码,有符号的int值这些编码比常规int32更有效地编码负数 uint32 int sint64 使用可变长度编码,有符号的int值这些编码比常规int64更有效地编码负数 int64 long fixed32 四个字节, 如果值通常大于2的28次方, 则比uint32更有效 uint32 int fixed64 四个字节, 如果值通常大于2的56次方, 则比uint64更有效 uint64 long sfixed32 四个字节 int32 int sfixed64 四个字节 int64 long bool bool boolean string 字符串必须始终包含UTF-8编码或7位ASCII文本 string String bytes 字符串必须始终包含UTF-8编码或7位ASCII文本 string ByteString
Protobuf快速入门实例
编写程序,使用Protobuf完成如下功能:
- 客户端可以发送一个Student PoJo 对象到服务器 (通过 Protobuf 编码)
- 服务端能接收Student PoJo 对象,并显示信息(通过 Protobuf 解码)
Student.proto
syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名
//protobuf 使用message 管理数据
message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象
int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
string name = 2;
}
使用 protobuf 生成 Java 文件
protoc.exe --java_out=. Student.proto
服务端
@Slf4j
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline加入ProtoBufDecoder
//指定对哪种对象进行解码
pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
});
log.info(".....服务器 is ready...");
ChannelFuture cf = bootstrap.bind(6668).sync();
//给cf 注册监听器,监控我们关心的事件
cf.addListener((ChannelFutureListener) future -> {
if (cf.isSuccess()) {
log.info("监听端口 6668 成功");
} else {
log.info("监听端口 6668 失败");
}
});
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端 Handler
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {
@Override
public void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
log.info("客户端发送的数据 id=" + msg.getId() + " 名字=" + msg.getName());
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端
@Slf4j
public class NettyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入 ProtoBufEncoder
pipeline.addLast("encoder", new ProtobufEncoder());
pipeline.addLast(new NettyClientHandler());
}
});
log.info("客户端 ok..");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
客户端 Handler
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发生一个Student 对象到服务器
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("智多星 吴用").build();
ctx.writeAndFlush(student);
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.info("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
log.info("服务器的地址: " + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Protobuf快速入门实例2
编写程序,使用Protobuf完成如下功能
- 客户端可以随机发送Student PoJo/ Worker PoJo 对象到服务器 (通过 Protobuf 编码)
- 服务端能接收Student PoJo/ Worker PoJo 对象(需要判断是哪种类型),并显示信息(通过 Protobuf 解码)
Student.proto
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package = "com.atguigu.netty.codec2"; //指定生成到哪个包下
option java_outer_classname = "MyDataInfo"; // 外部类名, 文件名
//protobuf 可以使用message 管理其他的message
message MyMessage {
//定义一个枚举类型
enum DataType {
StudentType = 0; //在proto3 要求enum的编号从0开始
WorkerType = 1;
}
//用data_type 来标识传的是哪一个枚举类型
DataType data_type = 1;
//表示每次枚举类型最多只能出现其中的一个, 节省空间
oneof dataBody {
Student student = 2;
Worker worker = 3;
}
}
message Student {
int32 id = 1;//Student类的属性
string name = 2; //
}
message Worker {
string name = 1;
int32 age = 2;
}
服务端
@Slf4j
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline加入ProtoBufDecoder
//指定对哪种对象进行解码
pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
});
log.info(".....服务器 is ready...");
ChannelFuture cf = bootstrap.bind(6668).sync();
cf.addListener((ChannelFutureListener) future -> {
if (cf.isSuccess()) {
log.info("监听端口 6668 成功");
} else {
log.info("监听端口 6668 失败");
}
});
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端 Handler
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
@Override
public void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
//根据dataType 来显示不同的信息
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if (dataType == MyDataInfo.MyMessage.DataType.StudentType) {
MyDataInfo.Student student = msg.getStudent();
log.info("学生id=" + student.getId() + " 学生名字=" + student.getName());
} else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
MyDataInfo.Worker worker = msg.getWorker();
log.info("工人的名字=" + worker.getName() + " 年龄=" + worker.getAge());
} else {
log.info("传输的类型不正确");
}
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端
@Slf4j
public class NettyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入 ProtoBufEncoder
pipeline.addLast("encoder", new ProtobufEncoder());
pipeline.addLast(new NettyClientHandler());
}
});
log.info("客户端 ok..");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
客户端 Handler
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//随机的发送Student 或者 Workder 对象
int second = LocalTime.now().getSecond();
log.info("LocalTime.now().getSecond() = {}", second);
long random = second % 2;
MyDataInfo.MyMessage myMessage = null;
if (0 == random) {
//发送Student 对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType).setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build()).build();
} else {
// 发送一个Worker 对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType).setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build();
}
ctx.writeAndFlush(myMessage);
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.info("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
log.info("服务器的地址: " + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}