# 精尽 Netty 源码解析 —— Buffer 之 Jemalloc(六)PoolThreadCache # 1. 概述 在 [《精尽 Netty 源码解析 —— Buffer 之 Jemalloc(五)PoolArena》](http://svip.iocoder.cn/Netty/ByteBuf-3-5-Jemalloc-Arena) 一文中,我们看到 PoolArena 在分配( `#allocate(...)` )和释放( `#free(...)` )内存的过程中,无可避免会出现 `synchronized` 的身影。虽然锁的粒度不是很大,但是如果一个 PoolArena 如果被**多个**线程引用,带来的线程锁的同步和竞争。并且,如果在锁竞争的过程中,申请 Direct ByteBuffer ,那么带来的线程等待就可能是**几百毫秒**的时间。 那么该如何解决呢?如下图红框所示: > FROM [《jemalloc源码解析-内存管理》](http://brionas.github.io/2015/01/31/jemalloc源码解析-内存管理/) > > [![大体结构](47-Netty 源码解析-Buffer 之 Jemalloc(六)PoolThreadCache.assets/01.png)](http://static.iocoder.cn/images/Netty/2018_09_16/01.png)大体结构 给**每个**线程引入其**独有**的 tcache 线程缓存。 - 在释放已分配的内存块时,不放回到 Chunk 中,而是缓存到 tcache 中。 - 在分配内存块时,优先从 tcache 获取。无法获取到,再从 Chunk 中分配。 通过这样的方式,尽可能的避免多线程的同步和竞争。 # 2. PoolThreadCache `io.netty.buffer.PoolThreadCache` ,Netty 对 Jemalloc tcache 的实现类,内存分配的线程缓存。 ## 2.1 构造方法 ``` /** * 对应的 Heap PoolArena 对象 */ final PoolArena heapArena; /** * 对应的 Direct PoolArena 对象 */ final PoolArena directArena; // Hold the caches for the different size classes, which are tiny, small and normal. /** * Heap 类型的 tiny Subpage 内存块缓存数组 */ private final MemoryRegionCache[] tinySubPageHeapCaches; /** * Heap 类型的 small Subpage 内存块缓存数组 */ private final MemoryRegionCache[] smallSubPageHeapCaches; /** * Heap 类型的 normal 内存块缓存数组 */ private final MemoryRegionCache[] normalHeapCaches; /** * Direct 类型的 tiny Subpage 内存块缓存数组 */ private final MemoryRegionCache[] tinySubPageDirectCaches; /** * Direct 类型的 small Subpage 内存块缓存数组 */ private final MemoryRegionCache[] smallSubPageDirectCaches; /** * Direct 类型的 normal 内存块缓存数组 */ private final MemoryRegionCache[] normalDirectCaches; // Used for bitshifting when calculate the index of normal caches later /** * 用于计算请求分配的 normal 类型的内存块,在 {@link #normalDirectCaches} 数组中的位置 * * 默认为 log2(pageSize) = log2(8192) = 13 */ private final int numShiftsNormalDirect; /** * 用于计算请求分配的 normal 类型的内存块,在 {@link #normalHeapCaches} 数组中的位置 * * 默认为 log2(pageSize) = log2(8192) = 13 */ private final int numShiftsNormalHeap; /** * 分配次数 */ private int allocations; /** * {@link #allocations} 到达该阀值,释放缓存 * * 默认为 8192 次 * * @see #free() */ private final int freeSweepAllocationThreshold; 1: PoolThreadCache(PoolArena heapArena, PoolArena directArena, 2: int tinyCacheSize, int smallCacheSize, int normalCacheSize, 3: int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { 4: if (maxCachedBufferCapacity < 0) { 5: throw new IllegalArgumentException("maxCachedBufferCapacity: " 6: + maxCachedBufferCapacity + " (expected: >= 0)"); 7: } 8: this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; 9: this.heapArena = heapArena; 10: this.directArena = directArena; 11: 12: // 初始化 Direct 类型的内存块缓存 13: if (directArena != null) { 14: // 创建 tinySubPageDirectCaches 15: tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); 16: // 创建 smallSubPageDirectCaches 17: smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); 18: 19: // 计算 numShiftsNormalDirect 20: numShiftsNormalDirect = log2(directArena.pageSize); 21: // 创建 normalDirectCaches 22: normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena); 23: 24: // 增加 directArena 的线程引用计数 25: directArena.numThreadCaches.getAndIncrement(); 26: } else { 27: // No directArea is configured so just null out all caches 28: tinySubPageDirectCaches = null; 29: smallSubPageDirectCaches = null; 30: normalDirectCaches = null; 31: numShiftsNormalDirect = -1; 32: } 33: // 初始化 Heap 类型的内存块缓存。同上面部分。 34: if (heapArena != null) { 35: // Create the caches for the heap allocations 36: tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); 37: smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small); 38: 39: numShiftsNormalHeap = log2(heapArena.pageSize); 40: normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena); 41: 42: heapArena.numThreadCaches.getAndIncrement(); 43: } else { 44: // No heapArea is configured so just null out all caches 45: tinySubPageHeapCaches = null; 46: smallSubPageHeapCaches = null; 47: normalHeapCaches = null; 48: numShiftsNormalHeap = -1; 49: } 50: 51: // 校验参数,保证 PoolThreadCache 可缓存内存块。 52: // Only check if there are caches in use. 53: if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null 54: || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null) 55: && freeSweepAllocationThreshold < 1) { 56: throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)"); 57: } 58: } ``` - 虽然代码比较多,主要分为 Heap 和 Direct 两种内存。 - Direct 相关 - `directArena` 属性,对应的 Heap PoolArena 对象。 - ``` tinySubPageDirectCaches ``` 属性,Direct 类型的 tiny Subpage 内存块缓存数组。 - 默认情况下,数组大小为 512 。 - 在【第 15 行】的代码,调用 `#createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass)` 方法,创建 MemoryRegionCache 数组。详细解析,见 [「2.2 createSubPageCaches」](https://svip.iocoder.cn/Netty/ByteBuf-3-6-Jemalloc-ThreadCache/#) 。 - ``` smallSubPageDirectCaches ``` 属性,Direct 类型的 small Subpage 内存块缓存数组。 - 默认情况下,数组大小为 256 。 - 在【第 17 行】的代码,调用 `#createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass)` 方法,创建 MemoryRegionCache 数组。详细解析,见 [「2.2 createSubPageCaches」](https://svip.iocoder.cn/Netty/ByteBuf-3-6-Jemalloc-ThreadCache/#) 。 - ``` normalDirectCaches ``` 属性,Direct 类型的 normal Page 内存块缓存数组。 - 默认情况下,数组大小为 64 。 - 在【第 22 行】的代码,调用 `#createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena area)` 方法,创建 MemoryRegionCache 数组。详细解析,见 [「2.3 createNormalCaches」](https://svip.iocoder.cn/Netty/ByteBuf-3-6-Jemalloc-ThreadCache/#) 。 - ``` numShiftsNormalDirect ``` 属性,用于计算请求分配的 normal 类型的内存块,在 ``` normalDirectCaches ``` 数组中的位置。 - 默认情况下,数值为 13 。 - 在【第 20 行】的代码,调用 `#log2(int pageSize)` 方法, `log2(pageSize) = log2(8192) = 13` 。 - 在【第 25 行】的代码,增加 `directArena` 的线程引用计数。通过这样的方式,我们能够知道,一个 PoolArena 对象,被多少线程所引用。 - Heap 相关,和【Direct 相关】基本**类似**。 - ``` allocations ``` 属性,分配次数计数器。每次分配时,该计数器 + 1 。 - `freeSweepAllocationThreshold` 属性,当 `allocations` 到达该阀值时,调用 `#free()` 方法,释放缓存。同时,会重置 `allocations` 计数器为 0 。 ## 2.2 createSubPageCaches `#createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass)` 方法,创建 Subpage 内存块缓存数组。代码如下: ``` // tiny 类型,默认 cacheSize = PooledByteBufAllocator.DEFAULT_TINY_CACHE_SIZE = 512 , numCaches = PoolArena.numTinySubpagePools = 512 >>> 4 = 32 // small 类型,默认 cacheSize = PooledByteBufAllocator.DEFAULT_SMALL_CACHE_SIZE = 256 , numCaches = pageSize - 9 = 13 - 9 = 4 private static MemoryRegionCache[] createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0 && numCaches > 0) { @SuppressWarnings("unchecked") MemoryRegionCache[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { // TODO: maybe use cacheSize / cache.length cache[i] = new SubPageMemoryRegionCache(cacheSize, sizeClass); } return cache; } else { return null; } } ``` - 创建的 Subpage 内存块缓存数组,实际和 ``` PoolArena.tinySubpagePools ``` 和 ``` PoolArena.smallSubpagePools ``` 数组 大小保持一致 。从而实现,相同大小的内存,能对应相同的数组下标。 - `sizeClass` = `tiny` 时, 默认 `cacheSize` = `PooledByteBufAllocator.DEFAULT_TINY_CACHE_SIZE = 512` , `numCaches` = `PoolArena.numTinySubpagePools = 512 >>> 4 = 32` 。 - `sizeClass` = `small` 时,默认 `cacheSize` = `PooledByteBufAllocator.DEFAULT_SMALL_CACHE_SIZE = 256` , `numCaches` = `pageSize - 9 = 13 - 9 = 4` 。 - 创建的数组,每个元素的类型为 SubPageMemoryRegionCache 。详细解析,见 [「3.X.1 SubPageMemoryRegionCache」](https://svip.iocoder.cn/Netty/ByteBuf-3-6-Jemalloc-ThreadCache/#) 。 ## 2.3 createNormalCaches `#createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass)` 方法,创建 Normal Page 内存块缓存数组。代码如下: ``` // normal 类型,默认 cacheSize = PooledByteBufAllocator.DEFAULT_NORMAL_CACHE_SIZE = 64 , maxCachedBufferCapacity = PoolArena.DEFAULT_MAX_CACHED_BUFFER_CAPACITY = 32 * 1024 = 32KB private static MemoryRegionCache[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena area) { if (cacheSize > 0 && maxCachedBufferCapacity > 0) { // <1> 计算数组大小 int max = Math.min(area.chunkSize, maxCachedBufferCapacity); int arraySize = Math.max(1, log2(max / area.pageSize) + 1); @SuppressWarnings("unchecked") MemoryRegionCache[] cache = new MemoryRegionCache[arraySize]; for (int i = 0; i < cache.length; i++) { cache[i] = new NormalMemoryRegionCache(cacheSize); } return cache; } else { return null; } } ``` - `maxCachedBufferCapacity` 属性,缓存的 Normal 内存块的最大容量,避免过大的 Normal 内存块被缓存,占用过多通过。默认情况下,`maxCachedBufferCapacity = PoolArena.DEFAULT_MAX_CACHED_BUFFER_CAPACITY = 32 * 1024 = 32KB` 。也就说,在 `<1>` 处,`arraySize` 的计算**数组大小**的结果为 3 。刚好是 `cache[0] = 8KB`、`cache[1] = 16KB`、`cache[2] = 32KB` 。那么,如果申请的 Normal 内存块大小为 `64KB` ,超过了数组大小,所以无法被缓存。😈 是不是和原先自己认为的 `maxCachedBufferCapacity` 实现最大容量的想法,有点不同。 - 创建的数组,每个元素的类型为 SubPageMemoryRegionCache 。详细解析,见 [「3.X.2 NormalMemoryRegionCache」](https://svip.iocoder.cn/Netty/ByteBuf-3-6-Jemalloc-ThreadCache/#) 。 ## 2.4 cache ``` private MemoryRegionCache cacheForTiny(PoolArena area, int normCapacity) { // 获得数组下标 int idx = PoolArena.tinyIdx(normCapacity); if (area.isDirect()) { return cache(tinySubPageDirectCaches, idx); } return cache(tinySubPageHeapCaches, idx); } private MemoryRegionCache cacheForSmall(PoolArena area, int normCapacity) { // 获得数组下标 int idx = PoolArena.smallIdx(normCapacity); if (area.isDirect()) { return cache(smallSubPageDirectCaches, idx); } return cache(smallSubPageHeapCaches, idx); } private MemoryRegionCache cacheForNormal(PoolArena area, int normCapacity) { if (area.isDirect()) { // 获得数组下标 int idx = log2(normCapacity >> numShiftsNormalDirect); return cache(normalDirectCaches, idx); } // 获得数组下标 int idx = log2(normCapacity >> numShiftsNormalHeap); return cache(normalHeapCaches, idx); } ``` - 三个方法,分别获取内存容量对应所在的 MemoryRegionCache 对象。通过调用 `#cache(MemoryRegionCache[] cache, int idx)` 方法,代码如下: ``` private static MemoryRegionCache cache(MemoryRegionCache[] cache, int idx) { // 不在范围内,说明不缓存该容量的内存块 if (cache == null || idx > cache.length - 1) { return null; } // 获得 MemoryRegionCache 对象 return cache[idx]; } ``` ------ 当然,考虑到使用便利,封装了 `#cache(PoolArena area, int normCapacity, SizeClass sizeClass)` 方法,支持获取对应内存类型的 MemoryRegionCache 对象。代码如下: ``` private MemoryRegionCache cache(PoolArena area, int normCapacity, SizeClass sizeClass) { switch (sizeClass) { case Normal: return cacheForNormal(area, normCapacity); case Small: return cacheForSmall(area, normCapacity); case Tiny: return cacheForTiny(area, normCapacity); default: throw new Error(); } } ``` ## 2.5 add `#add(PoolArena area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass)` 方法,添加内存块到 PoolThreadCache 的指定 MemoryRegionCache 的队列中,进行缓存。并且,返回是否添加成功。代码如下: ``` /** * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room. * Returns {@code true} if it fit into the cache {@code false} otherwise. */ @SuppressWarnings({ "unchecked", "rawtypes" }) boolean add(PoolArena area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) { // 获得对应的 MemoryRegionCache 对象 MemoryRegionCache cache = cache(area, normCapacity, sizeClass); if (cache == null) { return false; } // 添加到 MemoryRegionCache 内存块中 return cache.add(chunk, handle); } ``` - 代码比较简单,胖友自己看注释。 - 在 `PoolArea#free(PoolChunk chunk, long handle, int normCapacity, PoolThreadCache cache)` 中,调用该方法。所以,可以结合 [《精尽 Netty 源码解析 —— Buffer 之 Jemalloc(五)PoolArena》](http://svip.iocoder.cn/Netty/ByteBuf-3-5-Jemalloc-Arena) 的 [「2.6 free」](https://svip.iocoder.cn/Netty/ByteBuf-3-6-Jemalloc-ThreadCache/#) 一起看看罗。 ## 2.6 allocate ``` /** * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ boolean allocateTiny(PoolArena area, PooledByteBuf buf, int reqCapacity, int normCapacity) { return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); } /** * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ boolean allocateSmall(PoolArena area, PooledByteBuf buf, int reqCapacity, int normCapacity) { return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity); } /** * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */ boolean allocateNormal(PoolArena area, PooledByteBuf buf, int reqCapacity, int normCapacity) { return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity); } ``` - 三个方法,从缓存中分别获取不同容量大小的内存块,初始化到 PooledByteBuf 对象中。通过调用 `#allocate(MemoryRegionCache cache, PooledByteBuf buf, int reqCapacity)` 方法,代码如下: ``` 1: private boolean allocate(MemoryRegionCache cache, PooledByteBuf buf, int reqCapacity) { 2: if (cache == null) { 3: // no cache found so just return false here 4: return false; 5: } 6: // 分配内存块,并初始化到 MemoryRegionCache 中 7: boolean allocated = cache.allocate(buf, reqCapacity); 8: // 到达阀值,整理缓存 9: if (++ allocations >= freeSweepAllocationThreshold) { 10: allocations = 0; 11: trim(); 12: } 13: // 返回是否分配成功 14: return allocated; 15: } ``` - 第 7 行:调用 `MemoryRegionCache#allocate(buf, reqCapacity)` 方法,从缓存中分配内存块,并初始化到 MemoryRegionCache 中。 - 第 8 至 12 行:增加 `allocations` 计数。若到达阀值( `freeSweepAllocationThreshold` ),重置计数,并调用 `#trim()` 方法,整理缓存。详细解析,见 [「2.7 trim」](https://svip.iocoder.cn/Netty/ByteBuf-3-6-Jemalloc-ThreadCache/#) 。 - 第 14 行:返回是否分配成功。如果从缓存中分配失败,后续就从 PoolArena 中获取内存块。 ## 2.7 free `#trim()` 方法,整理缓存,释放使用**频度**较少的内存块缓存。代码如下: ``` private static int free(MemoryRegionCache cache) { if (cache == null) { return 0; } return cache.free(); } void trim() { trim(tinySubPageDirectCaches); trim(smallSubPageDirectCaches); trim(normalDirectCaches); trim(tinySubPageHeapCaches); trim(smallSubPageHeapCaches); trim(normalHeapCaches); } private static void trim(MemoryRegionCache[] caches) { if (caches == null) { return; } for (MemoryRegionCache c: caches) { trim(c); } } private static void trim(MemoryRegionCache cache) { if (cache == null) { return; } cache.trim(); } ``` - 会调用所有 MemoryRegionCache 的 `#trim()` 方法,整理每个内存块缓存。详细解析,见 [「3.6 trim」](https://svip.iocoder.cn/Netty/ByteBuf-3-6-Jemalloc-ThreadCache/#) 。 ## 2.8 finalize `#finalize()` 方法,对象销毁时,清空缓存等等。代码如下: ``` /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner. @Override protected void finalize() throws Throwable { try { // <1> 调用父 finalize super.finalize(); } finally { // 清空缓存 free(); } } /** * Should be called if the Thread that uses this cache is about to exist to release resources out of the cache */ void free() { // <2> 清空缓存 int numFreed = free(tinySubPageDirectCaches) + free(smallSubPageDirectCaches) + free(normalDirectCaches) + free(tinySubPageHeapCaches) + free(smallSubPageHeapCaches) + free(normalHeapCaches); if (numFreed > 0 && logger.isDebugEnabled()) { logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, Thread.currentThread().getName()); } // <3.1> 减小 directArena 的线程引用计数 if (directArena != null) { directArena.numThreadCaches.getAndDecrement(); } // <3.2> 减小 heapArena 的线程引用计数 if (heapArena != null) { heapArena.numThreadCaches.getAndDecrement(); } } private static int free(MemoryRegionCache[] caches) { if (caches == null) { return 0; } int numFreed = 0; for (MemoryRegionCache c: caches) { numFreed += free(c); } return numFreed; } ``` - 代码比较简单,胖友自己看。主要是 `<1>`、`<2>`、`<3.1>/<3.2>` 三个点。 # 3. MemoryRegionCache MemoryRegionCache ,是 PoolThreadCache 的内部静态类,**内存块缓存**。在其内部,有一个**队列**,存储缓存的内存块。如下图所示:[![MemoryRegionCache](47-Netty 源码解析-Buffer 之 Jemalloc(六)PoolThreadCache.assets/02.png)](http://static.iocoder.cn/images/Netty/2018_09_16/02.png)MemoryRegionCache ## 3.1 构造方法 ``` private abstract static class MemoryRegionCache { /** * {@link #queue} 队列大小 */ private final int size; /** * 队列。里面存储内存块 */ private final Queue> queue; /** * 内存类型 */ private final SizeClass sizeClass; /** * 分配次数计数器 */ private int allocations; MemoryRegionCache(int size, SizeClass sizeClass) { this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); queue = PlatformDependent.newFixedMpscQueue(this.size); // <1> MPSC this.sizeClass = sizeClass; } // ... 省略其它方法 } ``` - `sizeClass` 属性,内存类型。 - `queue` 属性,队列,里面存储内存块。每个元素为 Entry 对象,对应一个内存块。代码如下: ``` static final class Entry { /** * Recycler 处理器,用于回收 Entry 对象 */ final Handle> recyclerHandle; /** * PoolChunk 对象 */ PoolChunk chunk; /** * 内存块在 {@link #chunk} 的位置 */ long handle = -1; Entry(Handle> recyclerHandle) { this.recyclerHandle = recyclerHandle; } void recycle() { // 置空 chunk = null; handle = -1; // 回收 Entry 对象 recyclerHandle.recycle(this); } } ``` - 通过 `chunk` 和 `handle` 属性,可以唯一确认一个内存块。 - `recyclerHandle` 属性,用于回收 Entry 对象,用于 `#recycle()` 方法中。 - `size` 属性,队列大小。 - `allocations` 属性,分配次数计数器。 - 在 ``` <1> ``` 处理,我们可以看到创建的 ``` queue ``` 属性,类型为 MPSC( Multiple Producer Single Consumer ) 队列,即 多个 生产者 单一 消费者。为什么使用 MPSC 队列呢? - 多个生产者,指的是多个线程,移除( 释放 )内存块出队列。 - 单个消费者,指的是单个线程,添加( 缓存 )内存块到队列。 ## 3.2 newEntry `#newEntry(PoolChunk chunk, long handle)` 方法,创建 Entry 对象。代码如下: ``` @SuppressWarnings("rawtypes") private static Entry newEntry(PoolChunk chunk, long handle) { // 从 Recycler 对象中,获得 Entry 对象 Entry entry = RECYCLER.get(); // 初始化属性 entry.chunk = chunk; entry.handle = handle; return entry; } @SuppressWarnings("rawtypes") private static final Recycler RECYCLER = new Recycler() { @SuppressWarnings("unchecked") @Override protected Entry newObject(Handle handle) { return new Entry(handle); // 创建 Entry 对象 } }; ``` ## 3.3 add `#add(PoolChunk chunk, long handle)` 方法,添加( 缓存 )内存块到队列,并返回是否添加成功。代码如下: ``` /** * Add to cache if not already full. */ @SuppressWarnings("unchecked") public final boolean add(PoolChunk chunk, long handle) { // 创建 Entry 对象 Entry entry = newEntry(chunk, handle); // 添加到队列 boolean queued = queue.offer(entry); // 若添加失败,说明队列已满,回收 Entry 对象 if (!queued) { // If it was not possible to cache the chunk, immediately recycle the entry entry.recycle(); } return queued; // 是否添加成功 } ``` ## 3.4 allocate `#allocate(PooledByteBuf buf, int reqCapacity)` 方法,从队列中获取缓存的内存块,初始化到 PooledByteBuf 对象中,并返回是否分配成功。代码如下: ``` /** * Allocate something out of the cache if possible and remove the entry from the cache. */ public final boolean allocate(PooledByteBuf buf, int reqCapacity) { // 获取并移除队列首个 Entry 对象 Entry entry = queue.poll(); // 获取失败,返回 false if (entry == null) { return false; } // <1> 初始化内存块到 PooledByteBuf 对象中 initBuf(entry.chunk, entry.handle, buf, reqCapacity); // 回收 Entry 对象 entry.recycle(); // 增加 allocations 计数。因为分配总是在相同线程,所以不需要考虑线程安全的问题 // allocations is not thread-safe which is fine as this is only called from the same thread all time. ++ allocations; return true; // 返回 true ,分配成功 } ``` - 代码比较简单,胖友自己看注释。 - 在 `<1>` 处,调用 `#initBuf(PoolChunk chunk, long handle, PooledByteBuf buf, int reqCapacity)` **抽象**方法,初始化内存块到 PooledByteBuf 对象中。代码如下: ``` /** * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions. */ protected abstract void initBuf(PoolChunk chunk, long handle, PooledByteBuf buf, int reqCapacity); ``` - 该**抽象**方法需要子类 SubPageMemoryRegionCache 和 NormalMemoryRegionCache 来实现。并且,这也是 MemoryRegionCache 的**唯一**的抽象方法。 ## 3.5 free `#free(...)` 方法,清除队列。代码如下: ``` /** * 清除队列中的全部 * * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s. */ public final int free() { return free(Integer.MAX_VALUE); } // 清除队列中的指定数量元素 private int free(int max) { int numFreed = 0; for (; numFreed < max; numFreed++) { // 获取并移除首元素 Entry entry = queue.poll(); if (entry != null) { // 释放缓存的内存块回 Chunk 中 freeEntry(entry); <1> } else { // all cleared return numFreed; } } return numFreed; } ``` - 代码比较简单,胖友自己看注释。 - `<1>` 处, 释放缓存的内存块回 Chunk 中。代码如下: ``` private void freeEntry(Entry entry) { PoolChunk chunk = entry.chunk; long handle = entry.handle; // 回收 Entry 对象 // recycle now so PoolChunk can be GC'ed. entry.recycle(); // 释放缓存的内存块回 Chunk 中 chunk.arena.freeChunk(chunk, handle, sizeClass); } ``` ## 3.6 trim 这块当时没太看懂,后来读了 [《自顶向下深入分析Netty(十)–PoolThreadCache》](https://www.jianshu.com/p/9177b7dabd37) 文章后,看懂了 `#trim()` 方法。引用如下: > 在分配过程还有一个`trim()`方法,当分配操作达到一定阈值(Netty默认8192)时,没有被分配出去的缓存空间都要被释放,以防止内存泄漏,核心代码如下: ``` // 内部类MemoryRegionCache public final void trim() { // allocations 表示已经重新分配出去的ByteBuf个数 int free = size - allocations; allocations = 0; // 在一定阈值内还没被分配出去的空间将被释放 if (free > 0) { free(free); // 释放队列中的节点 } } ``` > 也就是说,期望一个 MemoryRegionCache **频繁**进行回收-分配,这样 `allocations` > `size` ,将不会释放队列中的任何一个节点表示的内存空间; > > 但如果长时间没有分配,则应该释放这一部分空间,防止内存占据过多。Tiny请求缓存512 个节点,由此可知当使用率超过 `512 / 8192 = 6.25%` 时就不会释放节点。 ## 3.X1 SubPageMemoryRegionCache SubPageMemoryRegionCache ,是 PoolThreadCache 的内部静态类,继承 MemoryRegionCache 抽象类,**Subpage** MemoryRegionCache 实现类。代码如下: ``` /** * Cache used for buffers which are backed by TINY or SMALL size. */ private static final class SubPageMemoryRegionCache extends MemoryRegionCache { SubPageMemoryRegionCache(int size, SizeClass sizeClass) { super(size, sizeClass); } @Override protected void initBuf(PoolChunk chunk, long handle, PooledByteBuf buf, int reqCapacity) { // 初始化 Subpage 内存块到 PooledByteBuf 对象中 chunk.initBufWithSubpage(buf, handle, reqCapacity); } } ``` ## 3.X2 NormalMemoryRegionCache NormalMemoryRegionCache ,是 PoolThreadCache 的内部静态类,继承 MemoryRegionCache 抽象类,**Page** MemoryRegionCache 实现类。代码如下: ``` /** * Cache used for buffers which are backed by NORMAL size. */ private static final class NormalMemoryRegionCache extends MemoryRegionCache { NormalMemoryRegionCache(int size) { super(size, SizeClass.Normal); } @Override protected void initBuf(PoolChunk chunk, long handle, PooledByteBuf buf, int reqCapacity) { // 初始化 Page 内存块到 PooledByteBuf 对象中 chunk.initBuf(buf, handle, reqCapacity); } } ``` # 666. 彩蛋 嘿嘿,比想象中简单蛮多的一篇文章。 推荐阅读文章: - Hypercube [《自顶向下深入分析Netty(十)–PoolThreadCache》](https://www.jianshu.com/p/9177b7dabd37)