Netty的ByteBuf
数据容器 ByteBuf
注意:通过索引来访问ByteBuf时并不会改变真实的读索引与写索引,我们可以通过ByteBuf的readerIndex(int i)与WriterIndex(int i)方法分别直接修改读索引与写索引
Netty ByteByf所提供的3种缓冲区类型
- heap buffer
- direct buffer
- composite buffer ,复合缓冲区,可以被认为是一个容器/列表,容器的里面可以承载多个heap buffer与direct buffer,所容纳的缓冲区类型可以是一样的也可以是不一样的
Heap Buffer (堆缓冲区)
这是最常用的类型,ByteBuf将数据存储到JVM的堆空间中,并且将实际的数据存放到byte array中来实现
优点:由于数据是存储在JVM的堆中,因此可以快速的创建与快速的释放,并且它提供了直接访问内部字节数组的方法。
缺点:每次读写数据时,都需要先将数据复制到直接缓冲区中再进行网络传输。
Direct Buffer (直接缓冲区)
在堆之外直接分配内存空间,直接缓冲区并不会占用堆的容量空间,因为它是由操作系统在本地内存进行的数据分配。
优点:在使用Socket进行数据传递时,性能非常好,因为数据直接位于操作系统的本地内存中,所以不需要从JVM中将数据复制到直接缓冲区中,减少了一次数据拷贝。
缺点:因为Direct Buffer是直接在操作系统内存中的,所以内存空间的分配与释放要比堆空间更加复杂,而且速度要慢一些
Netty通过提供内存池的方式来解决这个问题(分配,释放),直接缓冲区并不支持通过字节数组来访问数据
重点:对于后端的业务消息来编解码来说,推荐使用HeapByteBuf,对于I/O通信线程在读写缓冲区时,推荐使用DirectByteBuf
在通过socket将数据发送给网络上的其它机器的时候,一定都是通过直接缓冲区发送的,不可能通过堆缓冲区。如果是堆缓冲区那么它一定会先复制到直接缓冲区然后再进行数据的传递
Composite Buffer (复合缓冲区)
比如有些情况我们就需要将两个或多个buffer合并起来,然后以一致的方式进行操作和传递
JDK的ByteBuffer与Netty的ByteBuf之间的差异对比
- Netty的ByteBuf采用了读写索引分离的策略(readerIndex与writerIndex),一个初始化(里面尚未有任何数据时)的ByteBuf的readerIndex与writerIndex的值都为0
- 当读索引与写索引处于同一个位置时,如果我们继续读取,那么就会抛出IndexOutOfBoundsException
- 对于ByteBuf的任何读写操作都会分别单独维护读索引与写索引
- ByteBuf可以自动扩容大小,可以指定maxCapacity来设置最大的扩容大小,maxCapacity最大容量默认的限制就是Integer.MA_VALUE (就和Java的ArrayList的性质一样)
程序示例
程序1 读写ByteBuf
public class ByteBufTest1 {
public static void main(String[] args) {
//创建一个长度为10的ByteBuf,Unpooled表示非池化的,用完就释放掉
ByteBuf buffer = Unpooled.buffer(10);
for (int i = 0; i < 10; i++) {
//向ByteBuf中写入数据
buffer.writeByte(i);
}
//绝对方法 不会改变readerIndex
for (int i = 0; i < buffer.capacity(); i++) {
System.out.print(buffer.getByte(i));
}
System.out.println("
-----------------------");
//相对方法 会改变readerIndex 每读一次readerIndex 自增1 进入下一个位置上
for (int i = 0; i < buffer.capacity(); i++) {
System.out.print(buffer.readByte());
}
}
}
程序2 字符串创建一个ByteBuf
使用一个字符串创建一个ByteBuf 并通过不同的方式进行获取ByteBuf里面的内容
public class ByteBufTest2 {
public static void main(String[] args) {
//使用copiedBuffer的重载方法 (CharSequence string, Charset charset);生成ByteBuf
ByteBuf byteBuf = Unpooled.copiedBuffer("张hello world", Charset.forName("UTF-8"));
//判断这个byteBuf是否是一个heapByteBuf(堆上buffer,底层由一个字节数组维护)
//如果是则可以直接通过array()获取到这个底层存放数据的字节数组
if(byteBuf.hasArray()){
byte[] content = byteBuf.array();
System.out.println(new String(content,Charset.forName("UTF-8")));
System.out.println(byteBuf);
System.out.println(byteBuf.arrayOffset());
System.out.println(byteBuf.readerIndex());
System.out.println(byteBuf.writerIndex());
System.out.println(byteBuf.capacity());
int length = byteBuf.readableBytes();
System.out.println("可读字节长度 = " + length);
for (int i = 0; i < byteBuf.readableBytes(); i++) {
System.out.println((char)byteBuf.getByte(i));
}
//getCharSequence(起始索引,读取几个字节,编码方式) 返回一个字符
System.out.println(byteBuf.getCharSequence(0,4,Charset.forName("UTF-8")));
System.out.println(byteBuf.getCharSequence(4,6,Charset.forName("UTF-8")));
}
}
}
程序3 Netty复合缓冲区的使用
public class ByteBufTest3 {
public static void main(String[] args) {
//创建一个非池化的,不包含任何buffer的复合缓冲区
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
ByteBuf heapBuf = Unpooled.buffer(10);
ByteBuf directBuf = Unpooled.directBuffer(8);
compositeByteBuf.addComponents(heapBuf,directBuf);
//移除下标为0的ByteBuf组件
//compositeByteBuf.removeComponent(0);
Iterator<ByteBuf> iterator = compositeByteBuf.iterator();
while (iterator.hasNext()){
System.out.println(iterator.next());
}
System.out.println("-----------------------");
compositeByteBuf.forEach(System.out::println);
}
}
程序4 AtomicIntegerFieldUpdater的使用
AtomicIntegerFieldUpdater<T
基于反射的功能类,它可以实现对于指定类的指定了volatile的int属性/字段的原子更新。
此类设计用于原子数据结构,在该原子数据结构中,同一节点的多个字段/成员变量将独立进行原子更新。
注意的保证compareAndSet这一类方法比其他原子类弱。 因为此类不能确保该领域的所有应用都适合于原子访问目的,它可以保证原子只能相对于其他调用compareAndSet和set在同一个更新。
请注意,此类中的compareAndSet()方法的保证要弱于其他原子类。因为此类不能确保对字段的所有使用都适合于原子访问,
所以它可以仅针对同一更新程序上的其他compareAndSet()和set()调用)保证原子性。
它只能在其它地方也使用compareAndSet()或set()更新时,才能确保原子性
T 拥有待更新字段的对象的类型
public class AtomicUpdaterTest {
public static void main(String[] args) {
Person person = new Person();
// for (int i = 0; i < 10; i++) {
// Thread thread = new Thread(() -> {
// try {
// Thread.sleep(20);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//
// System.out.println(person.age++);
// });
// thread.start();
// }
AtomicIntegerFieldUpdater<Person> integerFieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(integerFieldUpdater.getAndIncrement(person));
});
thread.start();
}
}
}
class Person{
volatile int age = 1;
}