# 精尽 Netty 源码解析 —— Buffer 之 ByteBuf(二)核心子类 # 1. 概述 在 [《精尽 Netty 源码解析 —— ByteBuf(一)之简介》](http://svip.iocoder.cn/Netty/ByteBuf-1-1-ByteBuf-intro/) 中,我们对 ByteBuf 有了整体的认识,特别是核心 API 部分。同时,我们也看到,ByteBuf 有非常非常非常多的子类,那么怎么办呢?实际上,**ByteBuf 有 8 个最最最核心的子类实现**。如下图所示:[![核心子类](36-Netty 源码解析-Buffer 之 ByteBuf(二)核心子类.assets/01.png)](http://static.iocoder.cn/images/Netty/2018_08_04/01.png)核心子类 一共可以按照三个维度来看这 8 个核心子类,刚好是 2 x 2 x 2 = 8 : - 按照 内存类型 分类: - ① 堆内存字节缓冲区( **Heap**ByteBuf ):底层为 JVM 堆内的字节数组,其特点是申请和释放效率较高。但是如果要进行 Socket 的 I/O 读写,需要额外多做一次内存复制,需要将堆内存对应的缓冲区复制到内核 Channel 中,性能可能会有一定程度的损耗。 - ② 直接内存字节缓冲区( **Direct**ByteBuf ):堆外内存,为操作系统内核空间的字节数组,它由操作系统直接管理和操作,其申请和释放的效率会慢于堆缓冲区。但是将它写入或者从 SocketChannel 中读取时,会少一次内存复制,这样可以大大提高 I/O 效率,实现零拷贝。 - 关于这两者的对比,感兴趣的胖友,可以再看看 [《Java NIO direct buffer 的优势在哪儿?》](https://www.zhihu.com/question/60892134) 和 [《JAVA NIO 之 Direct Buffer 与 Heap Buffer的区别?》](http://eyesmore.iteye.com/blog/1133335) - 按照 对象池 分类: - ① 基于对象池( **Pooled**ByteBuf ):基于对象池的 ByteBuf 可以重用 ByteBuf ,也就是说它自己内部维护着一个对象池,当对象释放后会归还给对象池,这样就可以循环地利用创建的 ByteBuf,提升内存的使用率,降低由于高负载导致的频繁 GC。当需要大量且频繁创建缓冲区时,推荐使用该类缓冲区。 - ② 不使用对象池( **Unpooled**ByteBuf ):对象池的管理和维护会比较困难,所以在不需要创建大量缓冲区对象时,推荐使用此类缓冲区。 - 按照 Unsafe 分类: - ① 使用 Unsafe :基于 Java `sun.misc.Unsafe.Unsafe` 的 API ,直接访问内存中的数据。 - ② 不使用 Unsafe : 基于 **Heap**ByteBuf 和 **Direct**ByteBuf 的标准 API ,进行访问对应的数据。 - 关于 Unsafe ,JVM 大佬 R 大在知乎上有个回答:[《为什么 JUC 中大量使用了 sun.misc.Unsafe 这个类,但官方却不建议开发者使用?》](https://www.zhihu.com/question/29266773) 。关于为什么 Unsafe 的性能会更好:”其中一种是嫌 Java 性能不够好,例如说数组访问的边界检查语义,嫌这个开销太大,觉得用 Unsafe 会更快;”。 默认情况下,使用 PooledUnsafeDirectByteBuf 类型。所以,重点重点重点,看 [「2.4 PooledUnsafeDirectByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 。 # 2. PooledByteBuf `io.netty.buffer.PooledByteBuf` ,继承 AbstractReferenceCountedByteBuf 抽象类,**对象池化**的 ByteBuf 抽象基类,为基于**对象池**的 ByteBuf 实现类,提供公用的方法。 关于 `io.netty.util.AbstractReferenceCountedByteBuf` 抽象类,对象引用计数器抽象类。本文暂时不解析,我们会在 [《精尽 Netty 源码解析 —— Buffer 之 ByteBuf(三)内存泄露检测》](http://svip.iocoder.cn/Netty/ByteBuf-1-3-ByteBuf-resource-leak-detector/) 详细解析。 ## 2.1 内部方法 ### 2.1.1 构造方法 ``` /** * Recycler 处理器,用于回收对象 */ private final Recycler.Handle> recyclerHandle; /** * Chunk 对象 */ protected PoolChunk chunk; /** * 从 Chunk 对象中分配的内存块所处的位置 */ protected long handle; /** * 内存空间。具体什么样的数据,通过子类设置泛型。 */ protected T memory; /** * {@link #memory} 开始位置 * * @see #idx(int) */ protected int offset; /** * 容量 * * @see #capacity() */ protected int length; /** * 占用 {@link #memory} 的大小 */ int maxLength; /** * TODO 1013 Chunk */ PoolThreadCache cache; /** * 临时 ByteBuff 对象 * * @see #internalNioBuffer() */ private ByteBuffer tmpNioBuf; /** * ByteBuf 分配器对象 */ private ByteBufAllocator allocator; @SuppressWarnings("unchecked") protected PooledByteBuf(Recycler.Handle> recyclerHandle, int maxCapacity) { super(maxCapacity); this.recyclerHandle = (Handle>) recyclerHandle; } ``` - `recyclerHandle` 属性,Recycler 处理器,用于回收**当前**对象。 - ``` chunk ``` 属性,PoolChunk 对象。在 Netty 中,使用 Jemalloc 算法管理内存,而 Chunk 是里面的一种 内存块 。在这里,我们可以理解 ``` memory ``` 所属的 PoolChunk 对象。 - `handle` 属性,从 Chunk 对象中分配的内存块所处的位置。具体的,胖友后面仔细看看 [《精尽 Netty 源码解析 —— Buffer 之 Jemalloc(二)PoolChunk》](http://svip.iocoder.cn/Netty/ByteBuf-3-2-Jemalloc-chunk/) 和 [《精尽 Netty 源码解析 —— Buffer 之 Jemalloc(三)PoolSubpage》](http://svip.iocoder.cn/Netty/ByteBuf-3-3-Jemalloc-subpage/) 。 - ``` memory ``` 属性,内存空间。具体什么样的数据,通过子类设置泛型( ``` T ``` )。例如:1) PooledDirectByteBuf 和 PooledUnsafeDirectByteBuf 为 ByteBuffer ;2) PooledHeapByteBuf 和 PooledUnsafeHeapByteBuf 为 ``` byte[] ``` 。 - `offset` 属性,使用 `memory` 的开始位置。 - `maxLength` 属性,**最大**使用 `memory` 的长度( 大小 )。 - `length` 属性,**目前**使用 `memory` 的长度( 大小 )。 - 😈 因为 `memory` 属性,可以被**多个** ByteBuf 使用。**每个** ByteBuf 使用范围为 `[offset, maxLength)` 。 - `cache` 属性,TODO 1013 Chunk - `tmpNioBuf` 属性,临时 ByteBuff 对象,通过 `#tmpNioBuf()` 方法生成。详细解析,见 [「2.1.9 internalNioBuffer」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 。 - `allocator` 属性,ByteBuf 分配器。 ### 2.1.2 init0 `#init0(PoolChunk chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache)` 方法,初始化 PooledByteBuf 对象。代码如下: ``` private void init0(PoolChunk chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { assert handle >= 0; assert chunk != null; // From PoolChunk 对象 this.chunk = chunk; memory = chunk.memory; allocator = chunk.arena.parent; // 其他 this.cache = cache; this.handle = handle; this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; } ``` 仔细的胖友,可能会发现,这是一个 `private` 私有方法。目前它被两个方法调用: - ① `#init(PoolChunk chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache)` 方法,一般是基于 **pooled** 的 PoolChunk 对象,初始化 PooledByteBuf 对象。代码如下: ``` void init(PoolChunk chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { init0(chunk, handle, offset, length, maxLength, cache); } ``` - ② `#initUnpooled(PoolChunk chunk, int length)` 方法,基于 **unPoolooled** 的 PoolChunk 对象,初始化 PooledByteBuf 对象。代码如下: ``` void initUnpooled(PoolChunk chunk, int length) { init0(chunk, 0, chunk.offset, length, length, null); } ``` - 例如说 **Huge** 大小的 PoolChunk 对象。 - 注意,传入的给 `#init0(...)` 方法的 `length` 和 `maxLength` 方法参数,**都是** `length` 。 可能胖友读到此处会一脸懵逼。其实,这是很正常的。可以在看完 [《精尽 Netty 源码解析 —— Buffer 之 Jemalloc(二)PoolChunk》](http://svip.iocoder.cn/Netty/ByteBuf-3-2-Jemalloc-chunk/) 后,在回过头来,理解理解。 ### 2.1.3 reuse `#reuse(int maxCapacity)` 方法,每次在重用 PooledByteBuf 对象时,需要调用该方法,重置属性。代码如下: ``` /** * Method must be called before reuse this {@link PooledByteBufAllocator} */ final void reuse(int maxCapacity) { // 设置最大容量 maxCapacity(maxCapacity); // 设置引用数量为 0 setRefCnt(1); // 重置读写索引为 0 setIndex0(0, 0); // 重置读写标记位为 0 discardMarks(); } ``` 也就是说,该方法在 [「2.1.2 init9」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) **之前**就调用了。在下文中,我们会看到,该方法的调用。 ### 2.1.4 capacity `#capacity()` 方法,获得容量。代码如下: ``` @Override public final int capacity() { return length; } ``` **当前**容量的值为 `length` 属性。 但是,要注意的是,`maxLength` 属性,**不是表示最大容量**。`maxCapacity` 属性,才是真正表示最大容量。 那么,`maxLength` 属性有什么用?表示**占用** `memory` 的最大容量( 而不是 PooledByteBuf 对象的最大容量 )。在写入数据超过 `maxLength` 容量时,会进行扩容,但是容量的上限,为 `maxCapacity` 。 ------ `#capacity(int newCapacity)` 方法,调整容量大小。在这个过程中,根据情况,可能对 `memory` 扩容或缩容。代码如下: ``` 1: @Override 2: public final ByteBuf capacity(int newCapacity) { 3: // 校验新的容量,不能超过最大容量 4: checkNewCapacity(newCapacity); 5: 6: // Chunk 内存,非池化 7: // If the request capacity does not require reallocation, just update the length of the memory. 8: if (chunk.unpooled) { 9: if (newCapacity == length) { // 相等,无需扩容 / 缩容 10: return this; 11: } 12: // Chunk 内存,是池化 13: } else { 14: // 扩容 15: if (newCapacity > length) { 16: if (newCapacity <= maxLength) { 17: length = newCapacity; 18: return this; 19: } 20: // 缩容 21: } else if (newCapacity < length) { 22: // 大于 maxLength 的一半 23: if (newCapacity > maxLength >>> 1) { 24: if (maxLength <= 512) { 25: // 因为 Netty SubPage 最小是 16 ,如果小于等 16 ,无法缩容。 26: if (newCapacity > maxLength - 16) { 27: length = newCapacity; 28: // 设置读写索引,避免超过最大容量 29: setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); 30: return this; 31: } 32: } else { // > 512 (i.e. >= 1024) 33: length = newCapacity; 34: // 设置读写索引,避免超过最大容量 35: setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); 36: return this; 37: } 38: } 39: // 相等,无需扩容 / 缩容 40: } else { 41: return this; 42: } 43: } 44: 45: // 重新分配新的内存空间,并将数据复制到其中。并且,释放老的内存空间。 46: // Reallocation required. 47: chunk.arena.reallocate(this, newCapacity, true); 48: return this; 49: } ``` - 第 4 行:调用 `AbstractByteBuf#checkNewCapacity(int newCapacity)` 方法,校验新的容量,不能超过最大容量。代码如下: ``` protected final void checkNewCapacity(int newCapacity) { ensureAccessible(); if (newCapacity < 0 || newCapacity > maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity + " (expected: 0-" + maxCapacity() + ')'); } } ``` - 第 6 至 11 行:对于基于 **unPoolooled** 的 PoolChunk 对象,除非容量不变,否则会扩容或缩容,即【第 47 行】的代码。为什么呢?在 `#initUnpooled(PoolChunk chunk, int length)` 方法中,我们可以看到,`maxLength` 和 `length` 是相等的,所以大于或小于时,需要进行扩容或缩容。 - 第 13 行:对于基于 poolooled 的 PoolChunk 对象,需要根据情况: - 第 39 至 42 行:容量未变,不进行扩容。类似【第 9 至 11 行】的代码。 - 第 14 至 19 行:新容量**大于**当前容量,但是小于 `memory` 最大容量,仅仅修改当前容量,无需进行扩容。否则,第【第 47 行】的代码,进行**扩容**。 - 第 20 至 38 行:新容量 小于 当前容量,但是不到 ``` memory ``` 最大容量的 一半 ,因为缩容 相对 释放不多,无需进行缩容。否则,第【第 47 行】的代码,进行 缩容 。 - 比较神奇的是【第 26 行】的 `newCapacity > maxLength - 16` 代码块。 笔者的理解是,Netty SubPage **最小**是 16 B ,如果小于等 16 ,无法缩容。 - 第 47 行:调用 `PoolArena#reallocate(PooledByteBuf buf, int newCapacity, boolean freeOldMemory)` 方法,**重新分配**新的内存空间,并将数据**复制**到其中。并且,**释放**老的内存空间。详细解析,见 [《TODO 1013 Chunk》](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 中。 ### 2.1.5 order `#order()` 方法,返回字节序为 `ByteOrder.BIG_ENDIAN` 大端。代码如下: ``` @Override public final ByteOrder order() { return ByteOrder.BIG_ENDIAN; } ``` 统一**大端**模式。 > FROM [《深入浅出: 大小端模式》](https://www.bysocket.com/?p=615) > > 在网络上传输数据时,由于数据传输的两端对应不同的硬件平台,采用的存储字节顺序可能不一致。所以在 TCP/IP 协议规定了在网络上必须采用网络字节顺序,也就是大端模式。 ### 2.1.6 unwrap `#unwrap()` 方法,返回空,因为没有被装饰的 ByteBuffer 对象。代码如下: ``` @Override public final ByteBuf unwrap() { return null; } ``` ### 2.1.7 retainedSlice `#retainedSlice()` 方法,代码如下: ``` @Override public final ByteBuf retainedSlice() { final int index = readerIndex(); return retainedSlice(index, writerIndex() - index); } @Override public final ByteBuf retainedSlice(int index, int length) { return PooledSlicedByteBuf.newInstance(this, this, index, length); } ``` - 调用 `PooledSlicedByteBuf#newInstance(AbstractByteBuf unwrapped, ByteBuf wrapped, int index, int length)` 方法,创建**池化的** PooledSlicedByteBuf 对象。 - TODO 1016 派生类 ### 2.1.8 retainedDuplicate `#retainedDuplicate()` 方法,代码如下: ``` @Override public final ByteBuf retainedDuplicate() { return PooledDuplicatedByteBuf.newInstance(this, this, readerIndex(), writerIndex()); } ``` - 调用 `PooledSlicedByteBuf#newInstance(AbstractByteBuf unwrapped, ByteBuf wrapped, int readerIndex, int writerIndex)` 方法,创建**池化的** PooledDuplicatedByteBuf.newInstance 对象。 - TODO 1016 派生类 ### 2.1.9 internalNioBuffer `#internalNioBuffer()` 方法,获得临时 ByteBuf 对象( `tmpNioBuf` ) 。代码如下: ``` protected final ByteBuffer internalNioBuffer() { ByteBuffer tmpNioBuf = this.tmpNioBuf; // 为空,创建临时 ByteBuf 对象 if (tmpNioBuf == null) { this.tmpNioBuf = tmpNioBuf = newInternalNioBuffer(memory); } return tmpNioBuf; } ``` - 当 `tmpNioBuf` 属性为空时,调用 `#newInternalNioBuffer(T memory)` 方法,创建 ByteBuffer 对象。因为 `memory` 的类型不确定,所以该方法定义成**抽象方法**,由子类实现。代码如下: ``` protected abstract ByteBuffer newInternalNioBuffer(T memory); ``` ------ 为什么要有 `tmpNioBuf` 这个属性呢?以 PooledDirectByteBuf 举例子,代码如下: ``` @Override public int setBytes(int index, FileChannel in, long position, int length) throws IOException { checkIndex(index, length); // 获得临时 ByteBuf 对象 ByteBuffer tmpBuf = internalNioBuffer(); index = idx(index); tmpBuf.clear().position(index).limit(index + length); try { // 写入临时 ByteBuf 对象 return in.read(tmpBuf, position); } catch (ClosedChannelException ignored) { return -1; } } private int getBytes(int index, FileChannel out, long position, int length, boolean internal) throws IOException { checkIndex(index, length); if (length == 0) { return 0; } // 获得临时 ByteBuf 对象 ByteBuffer tmpBuf = internal ? internalNioBuffer() : memory.duplicate(); index = idx(index); tmpBuf.clear().position(index).limit(index + length); // 写入到 FileChannel 中 return out.write(tmpBuf, position); } ``` ### 2.1.10 deallocate `#deallocate()` 方法,当引用计数为 0 时,调用该方法,进行内存回收。代码如下: ``` @Override protected final void deallocate() { if (handle >= 0) { // 重置属性 final long handle = this.handle; this.handle = -1; memory = null; tmpNioBuf = null; // 释放内存回 Arena 中 chunk.arena.free(chunk, handle, maxLength, cache); chunk = null; // 回收对象 recycle(); } } private void recycle() { recyclerHandle.recycle(this); // 回收对象 } ``` ### 2.1.11 idx `#idx(int index)` 方法,获得指定位置在 `memory` 变量中的位置。代码如下: ``` protected final int idx(int index) { return offset + index; } ``` ## 2.2 PooledDirectByteBuf `io.netty.buffer.PooledDirectByteBuf` ,实现 PooledByteBuf 抽象类,基于 **ByteBuffer** 的**可重用** ByteBuf 实现类。所以,泛型 `T` 为 ByteBuffer ,即: ``` final class PooledDirectByteBuf extends PooledByteBuf ``` ### 2.2.1 构造方法 ``` private PooledDirectByteBuf(Recycler.Handle recyclerHandle, int maxCapacity) { super(recyclerHandle, maxCapacity); } ``` ### 2.2.2 newInstance `#newInstance(int maxCapacity)` **静态**方法,“创建” PooledDirectByteBuf 对象。代码如下: ``` /** * Recycler 对象 */ private static final Recycler RECYCLER = new Recycler() { @Override protected PooledDirectByteBuf newObject(Handle handle) { return new PooledDirectByteBuf(handle, 0); // 真正创建 PooledDirectByteBuf 对象 } }; static PooledDirectByteBuf newInstance(int maxCapacity) { // 从 Recycler 的对象池中获得 PooledDirectByteBuf 对象 PooledDirectByteBuf buf = RECYCLER.get(); // 重置 PooledDirectByteBuf 的属性 buf.reuse(maxCapacity); return buf; } ``` ### 2.2.3 newInternalNioBuffer `#newInternalNioBuffer(ByteBuffer memory)` 方法,获得临时 ByteBuf 对象( `tmpNioBuf` ) 。代码如下: ``` @Override protected ByteBuffer newInternalNioBuffer(ByteBuffer memory) { return memory.duplicate(); } ``` - 调用 `ByteBuffer#duplicate()` 方法,复制一个 ByteBuffer 对象,**共享**里面的数据。 ### 2.2.4 isDirect `#isDirect()` 方法,获得内部类型是否为 Direct ,返回 `true` 。代码如下: ``` @Override public boolean isDirect() { return true; } ``` ### 2.2.5 读取 / 写入操作 老样子,我们以 Int 类型为例子,来看看它的读取和写入操作的实现代码。代码如下: ``` @Override protected int _getInt(int index) { return memory.getInt(idx(index)); } @Override protected void _setInt(int index, int value) { memory.putInt(idx(index), value); } ``` ### 2.2.6 copy `#copy(int index, int length)` 方法,复制指定范围的数据到新创建的 Direct ByteBuf 对象。代码如下: ``` @Override public ByteBuf copy(int index, int length) { // 校验索引 checkIndex(index, length); // 创建一个 Direct ByteBuf 对象 ByteBuf copy = alloc().directBuffer(length, maxCapacity()); // 写入数据 copy.writeBytes(this, index, length); return copy; } ``` ### 2.2.7 转换 NIO ByteBuffer 操作 #### 2.2.7.1 nioBufferCount `#nioBufferCount()` 方法,返回 ByteBuf 包含 ByteBuffer 数量为 **1** 。代码如下: ``` @Override public int nioBufferCount() { return 1; } ``` #### 2.2.7.2 nioBuffer `#nioBuffer(int index, int length)` 方法,返回 ByteBuf **指定范围**包含的 ByteBuffer 对象( **共享** )。代码如下: ``` @Override public ByteBuffer nioBuffer(int index, int length) { checkIndex(index, length); // memory 中的开始位置 index = idx(index); // duplicate 复制一个 ByteBuffer 对象,共享数据 // position + limit 设置位置和大小限制 // slice 创建 [position, limit] 子缓冲区,共享数据 return ((ByteBuffer) memory.duplicate().position(index).limit(index + length)).slice(); } ``` - 代码比较简单,看具体注释。 #### 2.2.7.3 nioBuffers `#nioBuffers(int index, int length)` 方法,返回 ByteBuf **指定范围**内包含的 ByteBuffer 数组( **共享** )。代码如下: ``` @Override public ByteBuffer[] nioBuffers(int index, int length) { return new ByteBuffer[] { nioBuffer(index, length) }; } ``` - 在 `#nioBuffer(int index, int length)` 方法的基础上,创建大小为 1 的 ByteBuffer 数组。 #### 2.2.7.4 internalNioBuffer `#internalNioBuffer(int index, int length)` 方法,返回 ByteBuf **指定范围**内的 ByteBuffer 对象( **共享** )。代码如下: ``` @Override public ByteBuffer internalNioBuffer(int index, int length) { checkIndex(index, length); // memory 中的开始位置 index = idx(index); // clear 标记清空(不会清理数据) // position + limit 设置位置和大小限制 return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length); } ``` - 代码比较简单,看具体注释。 - 因为是基于 `tmpNioBuf` 属性实现,所以方法在命名上,以 `"internal"` 打头。 ### 2.2.8 Heap 相关方法 不支持 Heap 相关方法。代码如下: ``` @Override public boolean hasArray() { return false; } @Override public byte[] array() { throw new UnsupportedOperationException("direct buffer"); } @Override public int arrayOffset() { throw new UnsupportedOperationException("direct buffer"); } ``` ### 2.2.9 Unsafe 相关方法 不支持 Unsafe 相关方法。代码如下: ``` @Override public boolean hasMemoryAddress() { return false; } @Override public long memoryAddress() { throw new UnsupportedOperationException(); } ``` ## 2.3 PooledHeapByteBuf `io.netty.buffer.PooledHeapByteBuf` ,实现 PooledByteBuf 抽象类,基于 **ByteBuffer** 的**可重用** ByteBuf 实现类。所以,泛型 `T` 为 `byte[]` ,即: ``` class PooledHeapByteBuf extends PooledByteBuf { ``` ### 2.3.1 构造方法 和 [「2.2.1 构造方法」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 相同。 ### 2.3.2 newInstance 和 [「2.2.2 newInstance」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 相同。 ### 2.3.3 newInternalNioBuffer `#newInternalNioBuffer(byte[] memory)` 方法,获得临时 ByteBuf 对象( `tmpNioBuf` ) 。代码如下: ``` @Override protected final ByteBuffer newInternalNioBuffer(byte[] memory) { return ByteBuffer.wrap(memory); } ``` - 调用 `ByteBuffer#wrap(byte[] array)` 方法,创建 ByteBuffer 对象。注意,返回的是 HeapByteBuffer 对象。 ### 2.3.4 isDirect `#isDirect()` 方法,获得内部类型是否为 Direct ,返回 `false` 。代码如下: ``` @Override public boolean isDirect() { return false; } ``` ### 2.3.5 读取 / 写入操作 老样子,我们以 Int 类型为例子,来看看它的读取和写入操作的实现代码。 ① **读取**操作: ``` @Override protected int _getInt(int index) { return HeapByteBufUtil.getInt(memory, idx(index)); } // HeapByteBufUtil.java static int getInt(byte[] memory, int index) { return (memory[index] & 0xff) << 24 | (memory[index + 1] & 0xff) << 16 | (memory[index + 2] & 0xff) << 8 | memory[index + 3] & 0xff; } ``` ② **写入**操作: ``` @Override protected void _setInt(int index, int value) { HeapByteBufUtil.setInt(memory, idx(index), value); } // HeapByteBufUtil.java static void setInt(byte[] memory, int index, int value) { memory[index] = (byte) (value >>> 24); memory[index + 1] = (byte) (value >>> 16); memory[index + 2] = (byte) (value >>> 8); memory[index + 3] = (byte) value; } ``` ### 2.3.6 copy `#copy(int index, int length)` 方法,复制指定范围的数据到新创建的 Heap ByteBuf 对象。代码如下: ``` @Override public ByteBuf copy(int index, int length) { // 校验索引 checkIndex(index, length); // 创建一个 Heap ByteBuf 对象 ByteBuf copy = alloc().heapBuffer(length, maxCapacity()); // 写入数据 copy.writeBytes(this, index, length); return copy; } ``` 和 PooledDirectByteBuf [「2.2.6 copy」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 的差异在于,创建的是 **Heap** ByteBuf 对象。 ### 2.3.7 转换 NIO ByteBuffer 操作 #### 2.3.7.1 nioBufferCount 和 [「2.2.7.1 nioBufferCount」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 一致。 #### 2.3.7.2 nioBuffer `#nioBuffer(int index, int length)` 方法,返回 ByteBuf **指定范围**包含的 ByteBuffer 对象( **共享** )。代码如下: ``` @Override public final ByteBuffer nioBuffer(int index, int length) { checkIndex(index, length); // memory 中的开始位置 index = idx(index); // 创建 ByteBuffer 对象 ByteBuffer buf = ByteBuffer.wrap(memory, index, length); // slice 创建 [position, limit] 子缓冲区 return buf.slice(); } ``` - 代码比较简单,看具体注释。 #### 2.3.7.3 nioBuffers 和 [「2.2.7.3 nioBuffers」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 一致。 #### 2.3.7.4 internalNioBuffer 和 [「2.2.7.4 nioBuffers」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 一致。 ### 2.3.8 Heap 相关方法 ``` @Override public final boolean hasArray() { return true; } @Override public final byte[] array() { ensureAccessible(); return memory; } @Override public final int arrayOffset() { return offset; } ``` ### 2.3.8 Unsafe 相关方法 和 [「2.2.9 Unsafe 相关方法」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 一致。 ## 2.4 PooledUnsafeDirectByteBuf > 老艿艿:它是 [「2.2 PooledDirectByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 对应的基于 Unsafe 版本的实现类。 `io.netty.buffer.PooledUnsafeDirectByteBuf` ,实现 PooledByteBuf 抽象类,基于 **ByteBuffer** + **Unsafe** 的**可重用** ByteBuf 实现类。所以,泛型 `T` 为 `ByteBuffer` ,即: ``` final class PooledUnsafeDirectByteBuf extends PooledByteBuf ``` ### 2.4.1 构造方法 和 [「2.2.1 构造方法」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 相同。 ### 2.4.2 newInstance 和 [「2.2.2 newInstance」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 相同。 ### 2.4.3 初始化 PooledUnsafeDirectByteBuf 重写了初始化相关的方法,代码如下: ``` @Override void init(PoolChunk chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { // 调用父初始化方法 super.init(chunk, handle, offset, length, maxLength, cache); // 初始化内存地址 initMemoryAddress(); // <1> } @Override void initUnpooled(PoolChunk chunk, int length) { // 调用父初始化方法 super.initUnpooled(chunk, length); // 初始化内存地址 initMemoryAddress(); // <2> } ``` - 在 `<1>` 处,增加调用 `#initMemoryAddress()` 方法,初始化内存地址。代码如下: ``` /** * 内存地址 */ private long memoryAddress; private void initMemoryAddress() { memoryAddress = PlatformDependent.directBufferAddress(memory) + offset; // <2> } ``` - 调用 `PlatformDependent#directBufferAddress(ByteBuffer buffer)` 方法,获得 ByteBuffer 对象的起始内存地址。代码如下: ``` // PlatformDependent.java public static long directBufferAddress(ByteBuffer buffer) { return PlatformDependent0.directBufferAddress(buffer); } // PlatformDependent0.java static final Unsafe UNSAFE; static long directBufferAddress(ByteBuffer buffer) { return getLong(buffer, ADDRESS_FIELD_OFFSET); } private static long getLong(Object object, long fieldOffset) { return UNSAFE.getLong(object, fieldOffset); } ``` - 对于 Unsafe 类不熟悉的胖友,可以看看 [《Java Unsafe 类》](https://blog.csdn.net/zhxdick/article/details/52003123) - 注意,`<2>` 处的代码,已经将 `offset` 添加到 `memoryAddress` 中。所以在 `#addr(int index)` 方法中,求指定位置( `index` ) 在内存地址的顺序,不用再添加。代码如下: ``` private long addr(int index) { return memoryAddress + index; } ``` - x ### 2.4.4 newInternalNioBuffer 和 [「2.2.3 newInternalNioBuffer」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 相同。 ### 2.4.5 isDirect 和 [「2.2.4 isDirect」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 相同。 ### 2.4.6 读取 / 写入操作 老样子,我们以 Int 类型为例子,来看看它的读取和写入操作的实现代码。 ① **读取**操作: ``` @Override protected int _getInt(int index) { return UnsafeByteBufUtil.getInt(addr(index)); } // UnsafeByteBufUtil.java static int getInt(long address) { if (UNALIGNED) { int v = PlatformDependent.getInt(address); return BIG_ENDIAN_NATIVE_ORDER ? v : Integer.reverseBytes(v); } return PlatformDependent.getByte(address) << 24 | (PlatformDependent.getByte(address + 1) & 0xff) << 16 | (PlatformDependent.getByte(address + 2) & 0xff) << 8 | PlatformDependent.getByte(address + 3) & 0xff; } // PlatformDependent.java public static int getInt(long address) { return PlatformDependent0.getInt(address); } // PlatformDependent0.java static int getInt(long address) { return UNSAFE.getInt(address); } ``` ② **写入**操作: ``` @Override protected void _setInt(int index, int value) { UnsafeByteBufUtil.setInt(addr(index), value); } // UnsafeByteBufUtil.java static void setInt(long address, int value) { if (UNALIGNED) { PlatformDependent.putInt(address, BIG_ENDIAN_NATIVE_ORDER ? value : Integer.reverseBytes(value)); } else { PlatformDependent.putByte(address, (byte) (value >>> 24)); PlatformDependent.putByte(address + 1, (byte) (value >>> 16)); PlatformDependent.putByte(address + 2, (byte) (value >>> 8)); PlatformDependent.putByte(address + 3, (byte) value); } } // PlatformDependent.java public static void putInt(long address, int value) { PlatformDependent0.putInt(address, value); } // PlatformDependent0.java static void putInt(long address, int value) { UNSAFE.putInt(address, value); } ``` ### 2.4.7 copy `#copy(int index, int length)` 方法,复制指定范围的数据到新创建的 Direct ByteBuf 对象。代码如下: ``` @Override public ByteBuf copy(int index, int length) { return UnsafeByteBufUtil.copy(this, addr(index), index, length); } // UnsafeByteBufUtil.java static ByteBuf copy(AbstractByteBuf buf, long addr, int index, int length) { buf.checkIndex(index, length); // 创建 Direct ByteBuffer 对象 ByteBuf copy = buf.alloc().directBuffer(length, buf.maxCapacity()); if (length != 0) { if (copy.hasMemoryAddress()) { // 使用 Unsafe 操作来复制 PlatformDependent.copyMemory(addr, copy.memoryAddress(), length); copy.setIndex(0, length); } else { copy.writeBytes(buf, index, length); } } return copy; } // PlatformDependent.java public static void copyMemory(long srcAddr, long dstAddr, long length) { PlatformDependent0.copyMemory(srcAddr, dstAddr, length); } // PlatformDependent0.java static void copyMemory(long srcAddr, long dstAddr, long length) { //UNSAFE.copyMemory(srcAddr, dstAddr, length); while (length > 0) { long size = Math.min(length, UNSAFE_COPY_THRESHOLD); UNSAFE.copyMemory(srcAddr, dstAddr, size); length -= size; srcAddr += size; dstAddr += size; } } ``` ### 2.4.8 转换 NIO ByteBuffer 操作 #### 2.4.8.1 nioBufferCount 和 [「2.2.7.1 nioBufferCount」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 一致。 #### 2.4.8.2 nioBuffer 和 [「2.2.7.2 nioBuffer」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 一致。 #### 2.4.8.3 nioBuffers 和 [「2.2.7.3 nioBuffers」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 一致。 #### 2.4.8.4 internalNioBuffer 和 [「2.2.7.4 internalNioBuffer」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 一致。 ### 2.4.9 Heap 相关方法 不支持 Heap 相关方法。 ### 2.4.10 Unsafe 相关方法。 ``` @Override public boolean hasMemoryAddress() { return true; } @Override public long memoryAddress() { ensureAccessible(); return memoryAddress; } ``` ### 2.4.11 newSwappedByteBuf > `#newSwappedByteBuf()` 方法的**重写**,是 Unsafe 类型独有的。 `#newSwappedByteBuf()` 方法,创建 SwappedByteBuf 对象。代码如下: ``` @Override protected SwappedByteBuf newSwappedByteBuf() { if (PlatformDependent.isUnaligned()) { // 支持 // Only use if unaligned access is supported otherwise there is no gain. return new UnsafeDirectSwappedByteBuf(this); } return super.newSwappedByteBuf(); } ``` - 对于 Linux 环境下,一般是支持 unaligned access( 对齐访问 ),所以返回的是 UnsafeDirectSwappedByteBuf 对象。详细解析,见 [《TODO 1016 派生类》](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 。 - 为什么要对齐访问呢?可看 [《什么是字节对齐,为什么要对齐?》](https://www.zhihu.com/question/23791224) 。有趣。 ## 2.5 PooledUnsafeHeapByteBuf `io.netty.buffer.PooledUnsafeHeapByteBuf` ,实现 PooledHeapByteBuf 类,在 [「2.3 PooledHeapByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 的基础上,基于 **Unsafe** 的**可重用** ByteBuf 实现类。所以,泛型 `T` 为 `byte[]` ,即: ``` final class PooledUnsafeHeapByteBuf extends PooledHeapByteBuf ``` 也因此,PooledUnsafeHeapByteBuf 需要实现的方法,灰常少。 ### 2.5.1 构造方法 和 [「2.2.1 构造方法」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 相同。 ### 2.5.2 newInstance 和 [「2.2.2 newInstance」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 相同。 ### 2.5.3 读取 / 写入操作 老样子,我们以 Int 类型为例子,来看看它的读取和写入操作的实现代码。 ① **读取**操作: ``` @Override protected int _getInt(int index) { return UnsafeByteBufUtil.getInt(memory, idx(index)); } // UnsafeByteBufUtil.java static int getInt(byte[] array, int index) { if (UNALIGNED) { int v = PlatformDependent.getInt(array, index); return BIG_ENDIAN_NATIVE_ORDER ? v : Integer.reverseBytes(v); } return PlatformDependent.getByte(array, index) << 24 | (PlatformDependent.getByte(array, index + 1) & 0xff) << 16 | (PlatformDependent.getByte(array, index + 2) & 0xff) << 8 | PlatformDependent.getByte(array, index + 3) & 0xff; } // PlatformDependent.java public static int getInt(byte[] data, int index) { return PlatformDependent0.getInt(data, index); } // PlatformDependent0.java static int getInt(byte[] data, int index) { return UNSAFE.getInt(data, BYTE_ARRAY_BASE_OFFSET + index); } ``` - 基于 Unsafe 操作 `byte[]` 数组。 ② **写入**操作: ``` @Override protected void _setInt(int index, int value) { UnsafeByteBufUtil.setInt(memory, idx(index), value); } // UnsafeByteBufUtil.java static void setInt(byte[] array, int index, int value) { if (UNALIGNED) { PlatformDependent.putInt(array, index, BIG_ENDIAN_NATIVE_ORDER ? value : Integer.reverseBytes(value)); } else { PlatformDependent.putByte(array, index, (byte) (value >>> 24)); PlatformDependent.putByte(array, index + 1, (byte) (value >>> 16)); PlatformDependent.putByte(array, index + 2, (byte) (value >>> 8)); PlatformDependent.putByte(array, index + 3, (byte) value); } } // PlatformDependent.java public static void putInt(byte[] data, int index, int value) { PlatformDependent0.putInt(data, index, value); } // PlatformDependent0.java static void putInt(byte[] data, int index, int value) { UNSAFE.putInt(data, BYTE_ARRAY_BASE_OFFSET + index, value); } ``` ### 2.5.4 newSwappedByteBuf > `#newSwappedByteBuf()` 方法的**重写**,是 Unsafe 类型独有的。 `#newSwappedByteBuf()` 方法,创建 SwappedByteBuf 对象。代码如下: ``` @Override @Deprecated protected SwappedByteBuf newSwappedByteBuf() { if (PlatformDependent.isUnaligned()) { // Only use if unaligned access is supported otherwise there is no gain. return new UnsafeHeapSwappedByteBuf(this); } return super.newSwappedByteBuf(); } ``` - 对于 Linux 环境下,一般是支持 unaligned access( 对齐访问 ),所以返回的是 UnsafeHeapSwappedByteBuf 对象。详细解析,见 [《TODO 1016 派生类》](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 。 # 3. UnpooledByteBuf 😈 不存在 UnpooledByteBuf 这样一个类,主要是为了**聚合**所有 Unpooled 类型的 ByteBuf 实现类。 ## 3.1 UnpooledDirectByteBuf `io.netty.buffer.UnpooledDirectByteBuf` ,实现 AbstractReferenceCountedByteBuf 抽象类,对应 [「2.2 PooledDirectByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 的**非池化** ByteBuf 实现类。 ### 3.1.1 构造方法 ``` /** * ByteBuf 分配器对象 */ private final ByteBufAllocator alloc; /** * 数据 ByteBuffer 对象 */ private ByteBuffer buffer; /** * 临时 ByteBuffer 对象 */ private ByteBuffer tmpNioBuf; /** * 容量 */ private int capacity; /** * 是否需要释放 <1> * * 如果 {@link #buffer} 从外部传入,则需要进行释放,即 {@link #UnpooledDirectByteBuf(ByteBufAllocator, ByteBuffer, int)} 构造方法。 */ private boolean doNotFree; public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { // 设置最大容量 super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; // 创建 Direct ByteBuffer 对象 // 设置数据 ByteBuffer 对象 setByteBuffer(ByteBuffer.allocateDirect(initialCapacity)); } protected UnpooledDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) { // 设置最大容量 super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialBuffer == null) { throw new NullPointerException("initialBuffer"); } if (!initialBuffer.isDirect()) { // 必须是 Direct throw new IllegalArgumentException("initialBuffer is not a direct buffer."); } if (initialBuffer.isReadOnly()) { // 必须可写 throw new IllegalArgumentException("initialBuffer is a read-only buffer."); } // 获得剩余可读字节数,作为初始容量大小 <2> int initialCapacity = initialBuffer.remaining(); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; // 标记为 true 。因为 initialBuffer 是从外部传递进来,释放的工作,不交给当前 UnpooledDirectByteBuf 对象。 doNotFree = true; <1> // slice 切片 // 设置数据 ByteBuffer 对象 setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN)); // 设置写索引 <2> writerIndex(initialCapacity); } ``` - 代码比较简单,主要要理解下 `<1>` 和 `<2>` 两处。 - 调用 `#allocateDirect(int initialCapacity)` 方法,创建 Direct ByteBuffer 对象。代码如下: ``` protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } ``` - 调用 `#setByteBuffer(ByteBuffer buffer)` 方法,设置数据 ByteBuffer 对象。如果有老的**自己的**( 指的是自己创建的 ) `buffer` 对象,需要进行释放。代码如下: ``` private void setByteBuffer(ByteBuffer buffer) { ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { // 标记为 false 。因为设置的 ByteBuffer 对象,是 UnpooledDirectByteBuf 自己创建的 if (doNotFree) { doNotFree = false; } else { // 释放老的 buffer 对象 freeDirect(oldBuffer); // <3> } } // 设置 buffer this.buffer = buffer; // 重置 tmpNioBuf 为 null tmpNioBuf = null; // 设置容量 capacity = buffer.remaining(); } ``` - `<3>` 处,调用 `#freeDirect(ByteBuffer buffer)` 方法,释放**老的** `buffer` 对象。详细解析,见 [「3.1.3 deallocate」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 。 ### 3.1.2 capacity `#capacity()` 方法,获得容量。代码如下: ``` @Override public int capacity() { return capacity; } ``` ------ `#capacity(int newCapacity)` 方法,调整容量大小。在这个过程中,根据情况,可能对 `buffer` 扩容或缩容。代码如下: ``` @SuppressWarnings("Duplicates") @Override public ByteBuf capacity(int newCapacity) { // 校验新的容量,不能超过最大容量 checkNewCapacity(newCapacity); int readerIndex = readerIndex(); int writerIndex = writerIndex(); int oldCapacity = capacity; // 扩容 if (newCapacity > oldCapacity) { ByteBuffer oldBuffer = buffer; // 创建新的 Direct ByteBuffer 对象 ByteBuffer newBuffer = allocateDirect(newCapacity); // 复制数据到新的 buffer 对象 oldBuffer.position(0).limit(oldBuffer.capacity()); newBuffer.position(0).limit(oldBuffer.capacity()); newBuffer.put(oldBuffer); newBuffer.clear(); // 因为读取和写入,使用 readerIndex 和 writerIndex ,所以没关系。 // 设置新的 buffer 对象,并根据条件释放老的 buffer 对象 setByteBuffer(newBuffer); // 缩容 } else if (newCapacity < oldCapacity) { ByteBuffer oldBuffer = buffer; // 创建新的 Direct ByteBuffer 对象 ByteBuffer newBuffer = allocateDirect(newCapacity); if (readerIndex < newCapacity) { // 如果写索引超过新容量,需要重置下,设置为最大容量。否则就越界了。 if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } // 复制数据到新的 buffer 对象 oldBuffer.position(readerIndex).limit(writerIndex); newBuffer.position(readerIndex).limit(writerIndex); newBuffer.put(oldBuffer); newBuffer.clear(); // 因为读取和写入,使用 readerIndex 和 writerIndex ,所以没关系。 } else { // 因为读索引超过新容量,所以写索引超过新容量 // 如果读写索引都超过新容量,需要重置下,都设置为最大容量。否则就越界了。 setIndex(newCapacity, newCapacity); // 这里要注意下,老的数据,相当于不进行复制,因为已经读取完了。 } // 设置新的 buffer 对象,并根据条件释放老的 buffer 对象 setByteBuffer(newBuffer); } return this; } ``` - 虽然代码比较长,实际很简单。胖友自己耐心看下注释进行理解下噢。 ### 3.1.3 deallocate `#deallocate()` 方法,当引用计数为 0 时,调用该方法,进行内存回收。代码如下: ``` @Override protected void deallocate() { ByteBuffer buffer = this.buffer; if (buffer == null) { return; } // 置空 buffer 属性 this.buffer = null; // 释放 buffer 对象 if (!doNotFree) { freeDirect(buffer); } } ``` - `#freeDirect(ByteBuffer buffer)` 方法,释放 `buffer` 对象。代码如下: ``` protected void freeArray(byte[] array) { PlatformDependent.freeDirectBuffer(buffer); } // PlatformDependent.java private static final Cleaner NOOP = new Cleaner() { ... } public static void freeDirectBuffer(ByteBuffer buffer) { CLEANER.freeDirectBuffer(buffer); } ``` - 通过调用 `io.netty.util.internal.Cleaner#freeDirectBuffer(ByteBuffer buffer)` 方法,释放 Direct ByteBuffer 对象。因为 Java 的版本不同,调用的方法,所以 Cleaner 有两个 实现类: - `io.netty.util.internal.CleanerJava9` ,适用于 Java9+ 的版本,通过反射调用 DirectByteBuffer 对象的 `#invokeCleaner()` 方法,进行释放。 - `io.netty.util.internal.CleanerJava6` ,适用于 Java6+ 的版本,通过反射获得 DirectByteBuffer 对象的 `#cleaner()` 方法,从而调用 `sun.misc.Cleaner#clean()` 方法,进行释放。 - 虽然实现略有不同,但是原理是一致的。感兴趣的胖友,自己看下 CleanerJava9 和 CleanerJava6 的实现代码。 ### 3.1.4 其它方法 其他方法,和 [「2.2 PooledDirectByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 基本一致。 ## 3.2 UnpooledHeapByteBuf `io.netty.buffer.UnpooledHeapByteBuf` ,实现 AbstractReferenceCountedByteBuf 抽象类,对应 [「2.3 PooledHeapByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 的**非池化** ByteBuf 实现类。 ### 3.2.1 构造方法 ``` /** * ByteBuf 分配器对象 */ private final ByteBufAllocator alloc; /** * 字节数组 */ byte[] array; /** * 临时 ByteBuff 对象 */ private ByteBuffer tmpNioBuf; public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { // 设置最大容量 super(maxCapacity); checkNotNull(alloc, "alloc"); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; // 创建并设置字节数组 setArray(allocateArray(initialCapacity)); // 设置读写索引 setIndex(0, 0); } protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) { // 设置最大容量 super(maxCapacity); checkNotNull(alloc, "alloc"); checkNotNull(initialArray, "initialArray"); if (initialArray.length > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity)); } this.alloc = alloc; // 设置字节数组 setArray(initialArray); // 设置读写索引 setIndex(0, initialArray.length); } ``` - 第一、二个构造方法的区别,后者字节数组是否从方法参数( `initialArray` )传递进来。 - 调用 `#allocateArray(int initialCapacity)` 方法,创建字节数组。 ``` protected byte[] allocateArray(int initialCapacity) { return new byte[initialCapacity]; } ``` - 调用 `#setArray(byte[] initialArray)` 方法,设置 `array` 属性。代码如下: ``` private void setArray(byte[] initialArray) { array = initialArray; tmpNioBuf = null; } ``` ### 3.2.2 capacity `#capacity()` 方法,获得容量。代码如下: ```Java @Override public int capacity() { return array.length; } ``` - 使用字节数组的大小,作为当前容量上限。 ------ `#capacity(int newCapacity)` 方法,调整容量大小。在这个过程中,根据情况,可能对 `array` 扩容或缩容。代码如下: ``` @Override public ByteBuf capacity(int newCapacity) { // // 校验新的容量,不能超过最大容量 checkNewCapacity(newCapacity); int oldCapacity = array.length; byte[] oldArray = array; // 扩容 if (newCapacity > oldCapacity) { // 创建新数组 byte[] newArray = allocateArray(newCapacity); // 复制【全部】数据到新数组 System.arraycopy(oldArray, 0, newArray, 0, oldArray.length); // 设置数组 setArray(newArray); // 释放老数组 freeArray(oldArray); // 缩容 } else if (newCapacity < oldCapacity) { // 创建新数组 byte[] newArray = allocateArray(newCapacity); int readerIndex = readerIndex(); if (readerIndex < newCapacity) { // 如果写索引超过新容量,需要重置下,设置为最大容量。否则就越界了。 int writerIndex = writerIndex(); if (writerIndex > newCapacity) { writerIndex(writerIndex = newCapacity); } // 只复制【读取段】数据到新数组 System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex); } else { // 因为读索引超过新容量,所以写索引超过新容量 // 如果读写索引都超过新容量,需要重置下,都设置为最大容量。否则就越界了。 setIndex(newCapacity, newCapacity); // 这里要注意下,老的数据,相当于不进行复制,因为已经读取完了。 } // 设置数组 setArray(newArray); // 释放老数组 freeArray(oldArray); } return this; } ``` - 虽然代码比较长,实际很简单。胖友自己耐心看下注释进行理解下噢。😈 和 [「3.1.2 capacity」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 基本一直的。 ### 3.2.3 deallocate `#deallocate()` 方法,当引用计数为 0 时,调用该方法,进行内存回收。代码如下: ``` @Override protected void deallocate() { // 释放老数组 freeArray(array); // 设置为空字节数组 array = EmptyArrays.EMPTY_BYTES; } ``` - `#freeArray(byte[] array)` 方法,释放数组。代码如下: ``` protected void freeArray(byte[] array) { // NOOP } ``` - 字节数组,无引用后,自然就会被 GC 回收。 ### 3.2.4 其它方法 其它方法,和 [「2.3 PooledHeapByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 基本一致。 ## 3.3 UnpooledUnsafeDirectByteBuf `io.netty.buffer.UnpooledUnsafeDirectByteBuf` ,实现 AbstractReferenceCountedByteBuf 抽象类,对应 `「2.4 PooledUnsafeDirectByteBuf」` 的**非池化** ByteBuf 实现类。 - 构造方法、`#capacity(...)` 方法、`#deallocate()` 方法,和 [「3.1 PooledDirectByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 基本一致。 - 其它方法,和 [「2.4 PooledUnsafeDirectByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 基本一致。 另外,UnpooledUnsafeDirectByteBuf 有一个子类 UnpooledUnsafeNoCleanerDirectByteBuf ,用于 `netty-microbench` 模块,进行基准测试。感兴趣的胖友,可以自己看看。 ## 3.4 UnpooledUnsafeHeapByteBuf `io.netty.buffer.UnpooledUnsafeHeapByteBuf` ,实现 AbstractReferenceCountedByteBuf 抽象类,对应 `「2.5 PooledUnsafeHeapByteBuf」` 的**非池化** ByteBuf 实现类。 - 构造方法、`#capacity(...)` 方法、`#deallocate()` 方法,和 [「3.2 PooledHeapByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 基本一致。 - 其它方法,和 [「2.5 PooledUnsafeHeapByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-1-2-ByteBuf-core-impl/#) 基本一致。 ## 3.5 ThreadLocal ByteBuf > 老艿艿:这是本文的拓展内容。 虽然 UnpooledByteBuf 不基于**对象池**实现,但是考虑到 NIO Direct ByteBuffer 申请的成本是比较高的,所以基于 ThreadLocal + Recycler 实现重用。 `ByteBufUtil#threadLocalDirectBuffer()` 方法,创建 ThreadLocal ByteBuf 对象。代码如下: ``` private static final int THREAD_LOCAL_BUFFER_SIZE; static { THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 0); } /** * Returns a cached thread-local direct buffer, if available. * * @return a cached thread-local direct buffer, if available. {@code null} otherwise. */ public static ByteBuf threadLocalDirectBuffer() { if (THREAD_LOCAL_BUFFER_SIZE <= 0) { return null; } if (PlatformDependent.hasUnsafe()) { return ThreadLocalUnsafeDirectByteBuf.newInstance(); } else { return ThreadLocalDirectByteBuf.newInstance(); } } ``` - `THREAD_LOCAL_BUFFER_SIZE` **静态**属性,通过 `"io.netty.threadLocalDirectBufferSize"` 配置,默认为 0 。也就是说,实际 `#threadLocalDirectBuffer()` 方法,返回 `null` ,不开启 ThreadLocal ByteBuf 的功能。 - 根据是否支持 Unsafe 操作,创建 ThreadLocalUnsafeDirectByteBuf 或 ThreadLocalDirectByteBuf 对象。 ### 3.5.1 ThreadLocalUnsafeDirectByteBuf ThreadLocalUnsafeDirectByteBuf ,在 ByteBufUtil 的**内部静态类**,继承 UnpooledUnsafeDirectByteBuf 类。代码如下: ``` static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf { /** * Recycler 对象 */ private static final Recycler RECYCLER = new Recycler() { @Override protected ThreadLocalUnsafeDirectByteBuf newObject(Handle handle) { return new ThreadLocalUnsafeDirectByteBuf(handle); } }; static ThreadLocalUnsafeDirectByteBuf newInstance() { // 从 RECYCLER 中,获得 ThreadLocalUnsafeDirectByteBuf 对象 ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get(); // 初始化 ref 为 1 buf.setRefCnt(1); return buf; } /** * Recycler 处理器 */ private final Handle handle; private ThreadLocalUnsafeDirectByteBuf(Handle handle) { super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE); this.handle = handle; } @Override protected void deallocate() { if (capacity() > THREAD_LOCAL_BUFFER_SIZE) { // <1> // 释放 super.deallocate(); } else { // 清空 clear(); // 回收对象 handle.recycle(this); } } } ``` - 在 `<1>` 处,我们可以看到,只有 ByteBuffer 容量小于 `THREAD_LOCAL_BUFFER_SIZE` 时,才会重用 ByteBuffer 对象。 ### 3.5.2 ThreadLocalDirectByteBuf ThreadLocalUnsafeDirectByteBuf ,在 ByteBufUtil 的**内部静态类**,继承 UnpooledDirectByteBuf 类。代码如下: ``` static final class ThreadLocalDirectByteBuf extends UnpooledDirectByteBuf { /** * Recycler 对象 */ private static final Recycler RECYCLER = new Recycler() { @Override protected ThreadLocalDirectByteBuf newObject(Handle handle) { return new ThreadLocalDirectByteBuf(handle); } }; static ThreadLocalDirectByteBuf newInstance() { // 从 RECYCLER 中,获得 ThreadLocalUnsafeDirectByteBuf 对象 ThreadLocalDirectByteBuf buf = RECYCLER.get(); // 初始化 ref 为 1 buf.setRefCnt(1); return buf; } /** * Recycler 处理器 */ private final Handle handle; private ThreadLocalDirectByteBuf(Handle handle) { super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE); this.handle = handle; } @Override protected void deallocate() { if (capacity() > THREAD_LOCAL_BUFFER_SIZE) { // 释放 super.deallocate(); } else { // 清理 clear(); // 回收 handle.recycle(this); } } } ``` ## 3.6 WrappedUnpooledUnsafeDirectByteBuf > 老艿艿:这是本文的拓展内容。 `io.netty.buffer.WrappedUnpooledUnsafeDirectByteBuf` ,继承 UnpooledUnsafeDirectByteBuf 类,基于 `memoryAddress` 内存地址,创建 Direct ByteBuf 对象。代码如下: ``` final class WrappedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf { // 基于 memoryAddress 内存地址,创建 Direct ByteBuf 对象 WrappedUnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, long memoryAddress, int size, boolean doFree) { super(alloc, PlatformDependent.directBuffer(memoryAddress, size) /** 创建 Direct ByteBuf 对象 **/, size, doFree); } @Override protected void freeDirect(ByteBuffer buffer) { PlatformDependent.freeMemory(memoryAddress); } } ``` > FROM [《Netty源码分析(一) ByteBuf》](https://www.jianshu.com/p/b833254908f7) > > 创建一个指定内存地址的UnpooledUnsafeDirectByteBuf,该内存块可能已被写入数据以减少一步拷贝操作。 # 666. 彩蛋 每次这种 N 多实现类的源码解析,写到 60% 的时候,就特别头疼。不是因为难写,是因为基本是组合排列,不断在啰嗦啰嗦啰嗦的感觉。 嗯嗯,如果有地方写的错乱,烦请指出。默默再 review 几遍。 ------ 推荐阅读文章: - HryReal [《PooledByteBuf源码分析》](https://blog.csdn.net/qq_33394088/article/details/72763305) - 江南白衣 [《Netty之Java堆外内存扫盲贴》](http://calvin1978.blogcn.com/articles/directbytebuffer.html)