介绍:
- 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码
- codec(编解码器) 的组成部分有两个:decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据
示意图:
Netty自身提供的一些编解码器:
- Netty 提供的编码器
StringEncoder,对字符串数据进行编码
ObjectEncoder,对 Java 对象进行编码
... - Netty 提供的解码器
StringDecoder, 对字符串数据进行解码
ObjectDecoder,对 Java 对象进行解码
Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题
- 无法跨语言
- 序列化后的体积太大,是二进制编码的 5 倍多。
- 序列化性能太低
可以使用Google 的Protobuf 技术解决上述问题, 这个后面介绍 先说一下 编解码器的调用机制
用StringEncoder(编码) 和 StringDecoder(解码) 为例:
public class StringEncoder extends MessageToMessageEncoder<CharSequence>
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter
查看源码得知 StringEncoder 间接继承了 ChannelOutboundHandlerAdapter类,所以 他实际上是一个outboundHandler(出站),很好理解,信息往外发送 当时是编码了,以此推断StringDecoder 是一个inboundHandler(入站),读取数据时解码.
编码解码器都是以Handler 处理器注册在pipeLine的调用链中 在读取 发送数据 时进行编解码
自定义编码解码器
服务端:
注册自定义的编码解码器
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//入站的handler进行解码 MyByteToLongDecoder
pipeline.addLast(new MyByteToLongDecoder());
// pipeline.addLast(new MyByteToLongDecoder2());
//出站的handler进行编码
pipeline.addLast(new MyLongToByteEncoder());
//自定义的handler 处理业务逻辑
pipeline.addLast(new MyServerHandler());
System.out.println("xx");
}
}); //自定义一个初始化类
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
自定义的 解码器 继承 ByteToMessageDecoder 会在自定义Handler类之前执行
在解码器 进行数据解码时,需要判断 缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果会期望结果可能不一致(** 继承ReplayingDecoder<S> 则不需要进行判断 这个类自动检测 数据是否符合 泛型S **):
读取通道中的数据 并调用 该Handler的 decode方法, ByteBuf则为读取到的数据,进行自定义的解码,并添加到List中, 方法结束后 会遍历List中的数据 多次执行下一个Handler
//源码
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
*
* @param ctx 上下文对象
* @param in 入站的 ByteBuf
* @param out List 集合,将解码后的数据传给下一个handler . 下一个Handler 接受的数据就为List中的数据
向下一个Handler执行的代码 如果List 数据为多个 则传递多次
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//如果读取的数据 等于 16 则会 分两次执行下一个Handler
System.out.println("MyByteToLongDecoder 被调用");
//因为 long 8个字节, 需要判断有8个字节,才能读取一个long
if(in.readableBytes() >= 8) {
out.add(in.readLong());
}
}
}
自定义编码器 继承MessageToByteEncoder类 泛型为 处理的数据 如果发送的数据不为 Long 则不会调用该方法 :
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
//编码方法
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MyLongToByteEncoder encode 被调用");
System.out.println("msg=" + msg);
out.writeLong(msg);
}
}
客户端:
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一个出站的handler 对数据进行一个编码
pipeline.addLast(new MyLongToByteEncoder());
//这时一个入站的解码器(入站handler )
pipeline.addLast(new MyByteToLongDecoder());
// pipeline.addLast(new MyByteToLongDecoder2());
//加入一个自定义的handler , 处理业务
pipeline.addLast(new MyClientHandler());
}
}); //自定义一个初始化类
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
客户端Handler 通道建立 发送信息 只有发送Long类型的数据才会调用编码器
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器的ip=" + ctx.channel().remoteAddress());
System.out.println("收到服务器消息=" + msg);
}
//重写channelActive 发送数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
ctx.writeAndFlush(123456L); //发送的是一个long
// ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd",CharsetUtil.UTF_8));
}
}
Netty自带的解码器:
-
LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符( 或者 )作为分隔符来解析数据。
-
DelimiterBasedFrameDecoder:使用自定义的 特殊字符作为消息的分隔符。
-
HttpObjectDecoder:一个HTTP数据的解码器
-
LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。
Protobuf技术
上文中说到,在客户端和服务端传输数据的格式问题上 有着或多或少的问题,而Google Protobuf 就是解决这些问题的. 官方文档
工作机制:
使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto 文件进行描述。然后通过 protoc.exe 编译器根据.proto 自动生成.java 文件.
原理分析:
通过一定的规范 构建你想要表达的业务含义,只要解码端能够支持这种规范,就能还原出对应的 该语言代码(跨语言),而它本身也对序列化过程进行优化,极尽所能的压榨每一寸空间和性能.如netty的 支持此规范的 编码器中.
各个语言的字段表达含义:
protobuf 使用示意图:
简单的小案例
实例代码:
syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名
//protobuf 使用message 管理数据
message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象
int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
string name = 2;
}
使用protobuf编译器解析这个文件,命令protoc.exe --java_out=. Student.proto
生成的java类伪代码:
class StudentPOJO{
class Student{
int id;
String name;
}
}
使用Netty 包装发送此对象
服务端 注册ProtobufDecoder
解码器:
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
// .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline加入ProtoBufDecoder
//指定对哪种对象进行解码
pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println(".....服务器 is ready...");
//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//给cf 注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端Handler:
public class NettyServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {
//读取数据实际(这里我们可以读取客户端发送的消息)
/*
1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
2. Object msg: 就是客户端发送的数据 默认Object
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
//读取从客户端发送的StudentPojo.Student
System.out.println("客户端发送的数据 id=" + msg.getId() + " 名字=" + msg.getName());
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端 发送数据给服务端 添加 ProtoBufEncoder
编码器:
public class NettyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入 ProtoBufEncoder
pipeline.addLast("encoder", new ProtobufEncoder());
pipeline.addLast(new NettyClientHandler()); //加入自己的处理器
}
});
System.out.println("客户端 ok..");
//启动客户端去连接服务器端
//关于 ChannelFuture 要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
客户端Handler 构建Student类的过程看代码:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发生一个Student 对象到服务器
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("智多星 吴用").build();
//Teacher , Member ,Message
ctx.writeAndFlush(student);
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
这个案例有个很大问题 ,就是已经对发送和接受数据的类型定死,下面进行改造:
proto 定义一个包含 Student 或者 Worker的类 发送接受时 根据 枚举字段 获取:
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package="com.atguigu.netty.codec2"; //指定生成到哪个包下
option java_outer_classname="MyDataInfo"; // 外部类名, 文件名
//protobuf 可以使用message 管理其他的message
message MyMessage {
//定义一个枚举类型( 内部类 ))
enum DataType {
StudentType = 0; //在proto3 要求enum的编号从0开始
WorkerType = 1;
}
//用data_type 来标识传的是哪一个枚举类型
DataType data_type = 1;
//表示每次枚举类型最多只能出现其中的一个, 节省空间 ,这两个属性只能赋值一个 赋值的后一个会替换前一个
oneof dataBody {
Student student = 2;
Worker worker = 3;
}
}
message Student {
int32 id = 1;//Student类的属性
string name = 2; //
}
message Worker {
string name=1;
int32 age=2;
}
java伪代码:
class MyDataInfo{
class MyMessage{
enum DataType{
StudentType,WorkerType
}
DataType data_type;
//Student 和Worker 只可存在一个 设置后一个会替换前一个
Student student;
Worker worker;
}
class Student{
int id;
String name;
}
class Worker{
String name;
int age;
}
}
改造上述案例的 客户端发送信息的方法 channelActive (随机发送一个对象) :
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//随机的发送Student 或者 Workder 对象
int random = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if(0 == random) { //发送Student 对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType).setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build()).build();
} else { // 发送一个Worker 对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType).setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build();
}
ctx.writeAndFlush(myMessage);
}
改造服务端 添加的 解码器 指定的是包含 Student 和Worker的 MyMessage对象:
...
//指定对哪种对象进行解码(注册解码器)
pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
...
改造 服务端接受数据的Handler 指定接受数据类型的泛型也为MyMessage对象 ,并根据 DataType字段 获取相应的对象:
//处理Handler类 指定的对象为 MyMessage 类型
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
//读取数据实际(这里我们可以读取客户端发送的消息)
/*
1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
2. Object msg: 就是客户端发送的数据 默认Object
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
//根据dataType 来显示不同的信息
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if(dataType == MyDataInfo.MyMessage.DataType.StudentType) {
MyDataInfo.Student student = msg.getStudent();
System.out.println("学生id=" + student.getId() + " 学生名字=" + student.getName());
} else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
MyDataInfo.Worker worker = msg.getWorker();
System.out.println("工人的名字=" + worker.getName() + " 年龄=" + worker.getAge());
} else {
System.out.println("传输的类型不正确");
}
}