code-learning/netty/40-Netty 源码解析-Buffer 之 ByteBufAllocator(二)UnpooledByteBufAllocator.md

505 lines
16 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 精尽 Netty 源码解析 —— Buffer 之 ByteBufAllocatorUnpooledByteBufAllocator
# 1. 概述
本文,我们来分享 UnpooledByteBufAllocator **普通**的 ByteBuf 的分配器,**不基于内存池**。
# 2. ByteBufAllocatorMetricProvider
`io.netty.buffer.ByteBufAllocatorMetricProvider` ByteBufAllocator Metric 提供者接口,**用于监控 ByteBuf 的 Heap 和 Direct 占用内存的情况**。代码如下:
```
public interface ByteBufAllocatorMetricProvider {
/**
* Returns a {@link ByteBufAllocatorMetric} for a {@link ByteBufAllocator}.
*/
ByteBufAllocatorMetric metric();
}
```
ByteBufAllocatorMetricProvider 有两个子类UnpooledByteBufAllocator 和 PooledByteBufAllocator 。
# 3. ByteBufAllocatorMetric
`io.netty.buffer.ByteBufAllocatorMetric` ByteBufAllocator Metric 接口。代码如下:
```
public interface ByteBufAllocatorMetric {
/**
* Returns the number of bytes of heap memory used by a {@link ByteBufAllocator} or {@code -1} if unknown.
*
* 已使用 Heap 占用内存大小
*/
long usedHeapMemory();
/**
* Returns the number of bytes of direct memory used by a {@link ByteBufAllocator} or {@code -1} if unknown.
*
* 已使用 Direct 占用内存大小
*/
long usedDirectMemory();
}
```
ByteBufAllocatorMetric 有两个子类UnpooledByteBufAllocatorMetric 和 PooledByteBufAllocatorMetric 。
## 3.1 UnpooledByteBufAllocatorMetric
UnpooledByteBufAllocatorMetric ,在 UnpooledByteBufAllocator 的**内部静态类**,实现 ByteBufAllocatorMetric 接口UnpooledByteBufAllocator Metric 实现类。代码如下:
```
/**
* Direct ByteBuf 占用内存大小
*/
final LongCounter directCounter = PlatformDependent.newLongCounter();
/**
* Heap ByteBuf 占用内存大小
*/
final LongCounter heapCounter = PlatformDependent.newLongCounter();
@Override
public long usedHeapMemory() {
return heapCounter.value();
}
@Override
public long usedDirectMemory() {
return directCounter.value();
}
```
- 比较简单,两个计数器。
- `PlatformDependent#newLongCounter()` 方法,获得 LongCounter 对象。代码如下:
```
/**
* Creates a new fastest {@link LongCounter} implementation for the current platform.
*/
public static LongCounter newLongCounter() {
if (javaVersion() >= 8) {
return new LongAdderCounter();
} else {
return new AtomicLongCounter();
}
}
```
- 也就是说JDK `>=8` 使用 `java.util.concurrent.atomic.LongAdder` JDK `<7` 使用 `java.util.concurrent.atomic.AtomicLong` 。相比来说Metric 写多读少,所以 LongAdder 比 AtomicLong 更合适。对比的解析,可以看看 [《Java并发计数器探秘》](https://www.cnkirito.moe/java-concurrent-counter/) 。
# 4. UnpooledByteBufAllocator
`io.netty.buffer.UnpooledByteBufAllocator` ,实现 ByteBufAllocatorMetricProvider 接口,继承 AbstractByteBufAllocator 抽象类,**普通**的 ByteBuf 的分配器,**不基于内存池**。
## 4.1 构造方法
```
/**
* Metric
*/
private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric();
/**
* 是否禁用内存泄露检测功能
*/
private final boolean disableLeakDetector;
/**
* 不使用 `io.netty.util.internal.Cleaner` 释放 Direct ByteBuf
*
* @see UnpooledUnsafeNoCleanerDirectByteBuf
* @see InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
*/
private final boolean noCleaner;
public UnpooledByteBufAllocator(boolean preferDirect) {
this(preferDirect, false);
}
public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector) {
this(preferDirect, disableLeakDetector, PlatformDependent.useDirectBufferNoCleaner() /** 返回 true **/ );
}
/**
* Create a new instance
*
* @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than
* a heap buffer
* @param disableLeakDetector {@code true} if the leak-detection should be disabled completely for this
* allocator. This can be useful if the user just want to depend on the GC to handle
* direct buffers when not explicit released.
* @param tryNoCleaner {@code true} if we should try to use {@link PlatformDependent#allocateDirectNoCleaner(int)}
* to allocate direct memory.
*/
public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector, boolean tryNoCleaner) {
super(preferDirect);
this.disableLeakDetector = disableLeakDetector;
noCleaner = tryNoCleaner && PlatformDependent.hasUnsafe() /** 返回 true **/
&& PlatformDependent.hasDirectBufferNoCleanerConstructor() /** 返回 true **/ ;
}
```
- `metric` 属性UnpooledByteBufAllocatorMetric 对象。
- ```
disableLeakDetector
```
属性,是否禁用内存泄露检测功能。
- 默认为 `false` 。
- ```
noCleaner
```
属性,是否不使用
```
io.netty.util.internal.Cleaner
```
来释放 Direct ByteBuf 。
- 默认为 `true` 。
- 详细解析,见 [「5.5 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-2-2-ByteBufAllocator-unpooled/#) 。
## 4.2 newHeapBuffer
```
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ?
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
```
- 创建的是以 `"Instrumented"` 的 Heap ByteBuf 对象,因为要结合 Metric 。详细解析,见 [「5. Instrumented ByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-2-2-ByteBufAllocator-unpooled/#) 。
## 4.3 newDirectBuffer
```
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
```
- 创建的是以 `"Instrumented"` 的 Heap ByteBuf 对象,因为要结合 Metric 。详细解析,见 [「5. Instrumented ByteBuf」](https://svip.iocoder.cn/Netty/ByteBuf-2-2-ByteBufAllocator-unpooled/#) 。
- 结合了 `disableLeakDetector` 属性。
## 4.4 compositeHeapBuffer
```
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
CompositeByteBuf buf = new CompositeByteBuf(this, false, maxNumComponents);
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
```
- 结合了 `disableLeakDetector` 属性。
## 4.5 compositeDirectBuffer
```
@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
CompositeByteBuf buf = new CompositeByteBuf(this, true, maxNumComponents);
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
```
- 结合了 `disableLeakDetector` 属性。
## 4.6 isDirectBufferPooled
```
@Override
public boolean isDirectBufferPooled() {
return false;
}
```
## 4.7 Metric 相关操作方法
```
@Override
public ByteBufAllocatorMetric metric() {
return metric;
}
void incrementDirect(int amount) { // 增加 Direct
metric.directCounter.add(amount);
}
void decrementDirect(int amount) { // 减少 Direct
metric.directCounter.add(-amount);
}
void incrementHeap(int amount) { // 增加 Heap
metric.heapCounter.add(amount);
}
void decrementHeap(int amount) { // 减少 Heap
metric.heapCounter.add(-amount);
}
```
# 5. Instrumented ByteBuf
因为要和 Metric 结合,所以通过**继承**的方式,进行增强。
## 5.1 InstrumentedUnpooledUnsafeHeapByteBuf
**Instrumented**UnpooledUnsafeHeapByteBuf ,在 UnpooledByteBufAllocator 的**内部静态类**,继承 UnpooledUnsafeHeapByteBuf 类。代码如下:
```
private static final class InstrumentedUnpooledUnsafeHeapByteBuf extends UnpooledUnsafeHeapByteBuf {
InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected byte[] allocateArray(int initialCapacity) {
byte[] bytes = super.allocateArray(initialCapacity);
// Metric ++
((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length);
return bytes;
}
@Override
protected void freeArray(byte[] array) {
int length = array.length;
super.freeArray(array);
// Metric --
((UnpooledByteBufAllocator) alloc()).decrementHeap(length);
}
}
```
- 在原先的基础上,调用 Metric 相应的增减操作方法,得以记录 Heap 占用内存的大小。
## 5.2 InstrumentedUnpooledHeapByteBuf
**Instrumented**UnpooledHeapByteBuf ,在 UnpooledByteBufAllocator 的**内部静态类**,继承 UnpooledHeapByteBuf 类。代码如下:
```
private static final class InstrumentedUnpooledHeapByteBuf extends UnpooledHeapByteBuf {
InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected byte[] allocateArray(int initialCapacity) {
byte[] bytes = super.allocateArray(initialCapacity);
// Metric ++
((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length);
return bytes;
}
@Override
protected void freeArray(byte[] array) {
int length = array.length;
super.freeArray(array);
// Metric --
((UnpooledByteBufAllocator) alloc()).decrementHeap(length);
}
}
```
- 在原先的基础上,调用 Metric 相应的增减操作方法,得以记录 Heap 占用内存的大小。
## 5.3 InstrumentedUnpooledUnsafeDirectByteBuf
**Instrumented**UnpooledUnsafeDirectByteBuf ,在 UnpooledByteBufAllocator 的**内部静态类**,继承 UnpooledUnsafeDirectByteBuf 类。代码如下:
```
private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
InstrumentedUnpooledUnsafeDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
ByteBuffer buffer = super.allocateDirect(initialCapacity);
// Metric ++
((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
return buffer;
}
@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
super.freeDirect(buffer);
// Metric --
((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
}
}
```
- 在原先的基础上,调用 Metric 相应的增减操作方法,得以记录 Direct 占用内存的大小。
## 5.4 InstrumentedUnpooledDirectByteBuf
**Instrumented**UnpooledDirectByteBuf 的**内部静态类**,继承 UnpooledDirectByteBuf 类。代码如下:
```
private static final class InstrumentedUnpooledDirectByteBuf extends UnpooledDirectByteBuf {
InstrumentedUnpooledDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
ByteBuffer buffer = super.allocateDirect(initialCapacity);
// Metric ++
((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
return buffer;
}
@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
super.freeDirect(buffer);
// Metric --
((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
}
}
```
- 在原先的基础上,调用 Metric 相应的增减操作方法,得以记录 Direct 占用内存的大小。
## 5.5 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
**Instrumented**UnpooledDirectByteBuf 的**内部静态类**,继承 UnpooledUnsafeNoCleanerDirectByteBuf 类。代码如下:
```
private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
extends UnpooledUnsafeNoCleanerDirectByteBuf {
InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
ByteBuffer buffer = super.allocateDirect(initialCapacity);
// Metric ++
((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
return buffer;
}
@Override
ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) {
int capacity = oldBuffer.capacity();
ByteBuffer buffer = super.reallocateDirect(oldBuffer, initialCapacity);
// Metric ++
((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity() - capacity);
return buffer;
}
@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
super.freeDirect(buffer);
// Metric --
((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
}
}
```
- 在原先的基础上,调用 Metric 相应的增减操作方法,得以记录 Heap 占用内存的大小。
### 5.5.1 UnpooledUnsafeNoCleanerDirectByteBuf
`io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf` ,继承 UnpooledUnsafeDirectByteBuf 类。代码如下:
```
class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
// 反射,直接创建 ByteBuffer 对象。并且该对象不带 Cleaner 对象
return PlatformDependent.allocateDirectNoCleaner(initialCapacity);
}
ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) {
return PlatformDependent.reallocateDirectNoCleaner(oldBuffer, initialCapacity);
}
@Override
protected void freeDirect(ByteBuffer buffer) {
// 直接释放 ByteBuffer 对象
PlatformDependent.freeDirectNoCleaner(buffer);
}
@Override
public ByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);
int oldCapacity = capacity();
if (newCapacity == oldCapacity) {
return this;
}
// 重新分配 ByteBuf 对象
ByteBuffer newBuffer = reallocateDirect(buffer, newCapacity);
if (newCapacity < oldCapacity) {
if (readerIndex() < newCapacity) {
// 重置 writerIndex 为 newCapacity ,避免越界
if (writerIndex() > newCapacity) {
writerIndex(newCapacity);
}
} else {
// 重置 writerIndex 和 readerIndex 为 newCapacity ,避免越界
setIndex(newCapacity, newCapacity);
}
}
// 设置 ByteBuf 对象
setByteBuffer(newBuffer, false);
return this;
}
}
```
> FROM [《Netty源码分析 ByteBuf》](https://www.jianshu.com/p/b833254908f7)
>
> 和 UnpooledUnsafeDirectByteBuf 最大区别在于 UnpooledUnsafeNoCleanerDirectByteBuf 在 allocate的时候通过反射构造函数的方式创建DirectByteBuffer这样在DirectByteBuffer中没有对应的Cleaner函数(通过ByteBuffer.allocateDirect的方式会自动生成Cleaner函数Cleaner用于内存回收具体可以看源码)内存回收时UnpooledUnsafeDirectByteBuf通过调用DirectByteBuffer中的Cleaner函数回收而UnpooledUnsafeNoCleanerDirectByteBuf直接使用UNSAFE.freeMemory(address)释放内存地址。
# 666. 彩蛋
😈 小水文一篇铺垫铺垫你懂的