code-learning/netty/61-Netty 源码解析-Util 之 Recycler.md

394 lines
14 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 精尽 Netty 源码解析 —— Util 之 Recycler
笔者先把 Netty 主要的内容写完,所以关于 Recycler 的分享,先放在后续的计划里。
> 老艿艿:其实是因为,自己想去研究下 Service Mesh ,所以先简单收个小尾。
当然,良心如我,还是为对这块感兴趣的胖友,先准备好了一篇不错的文章:
- 沧行 [《Netty之Recycler》](https://www.jianshu.com/p/4eab8450560c)
为避免可能 [《Netty之Recycler》](https://www.jianshu.com/p/4eab8450560c) 被作者删除,笔者这里先复制一份作为备份。
# 666. 备份
Recycler用来实现对象池其中对应堆内存和直接内存的池化实现分别是PooledHeapByteBuf和PooledDirectByteBuf。Recycler主要提供了3个方法
- get():获取一个对象。
- recycle(T, Handle):回收一个对象T为对象泛型。
- newObject(Handle):当没有可用对象时创建对象的实现方法。
Recycler的UML图如下
[![img](61-Netty 源码解析-Util 之 Recycler.assets/jpeg.webp)](https://upload-images.jianshu.io/upload_images/3751588-916c6baab07fa863.png?imageMogr2/auto-orient/strip|imageView2/2/w/1000/format/jpeg)img
Recycler.png
Recycler关联了4个核心类
- DefaultHandle:对象的包装类在Recycler中缓存的对象都会包装成DefaultHandle类。
- Stack:存储本线程回收的对象。对象的获取和回收对应Stack的pop和push即获取对象时从Stack中pop出1个DefaultHandle回收对象时将对象包装成DefaultHandle push到Stack中。Stack会与线程绑定即每个用到Recycler的线程都会拥有1个Stack在该线程中获取对象都是在该线程的Stack中pop出一个可用对象。
- WeakOrderQueue:存储其它线程回收到本线程stack的对象当某个线程从Stack中获取不到对象时会从WeakOrderQueue中获取对象。每个线程的Stack拥有1个WeakOrderQueue链表链表每个节点对应1个其它线程的WeakOrderQueue其它线程回收到该Stack的对象就存储在这个WeakOrderQueue里。
- Link: WeakOrderQueue中包含1个Link链表回收对象存储在链表某个Link节点里当Link节点存储的回收对象满了时会新建1个Link放在Link链表尾。
整个Recycler回收对象存储结构如下图所示
[![img](61-Netty 源码解析-Util 之 Recycler.assets/jpeg-17199762893821.webp)](https://upload-images.jianshu.io/upload_images/3751588-63236f0c4e59328d.png?imageMogr2/auto-orient/strip|imageView2/2/w/799/format/jpeg)img
Recycler.png
下面分析下源码首先看下Recycler.recycle(T, Handle)方法用于回收1个对象
```
public final boolean recycle(T o, Handle handle) {
if (handle == NOOP_HANDLE) {
return false;
}
DefaultHandle h = (DefaultHandle) handle;
if (h.stack.parent != this) {
return false;
}
if (o != h.value) {
throw new IllegalArgumentException("o does not belong to handle");
}
h.recycle();
return true;
}
```
回收1个对象会调用该对象DefaultHandle.recycle()方法,如下:
```
public void recycle() {
stack.push(this);
}
```
回收1个对象DefaultHandle就是把该对象push到stack中。
```
void push(DefaultHandle item) {
Thread currentThread = Thread.currentThread();
if (thread == currentThread) {
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
/**
* 如果该stack就是本线程的stack那么直接把DefaultHandle放到该stack的数组里
*/
pushNow(item);
} else {
// The current Thread is not the one that belongs to the Stack, we need to signal that the push
// happens later.
/**
* 如果该stack不是本线程的stack那么把该DefaultHandle放到该stack的WeakOrderQueue中
*/
pushLater(item, currentThread);
}
}
```
这里分为两种情况当stack是当前线程对应的stack时执行pushNow(item)方法直接把对象放到该stack的DefaultHandle数组中如下
```
/**
* 直接把DefaultHandle放到stack的数组里如果数组满了那么扩展该数组为当前2倍大小
* @param item
*/
private void pushNow(DefaultHandle item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
if (size == elements.length) {
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
elements[size] = item;
this.size = size + 1;
}
```
当stack是其它线程的stack时执行pushLater(item, currentThread)方法将对象放到WeakOrderQueue中如下
```
private void pushLater(DefaultHandle item, Thread thread) {
/**
* Recycler有1个stack->WeakOrderQueue映射每个stack会映射到1个WeakOrderQueue这个WeakOrderQueue是该stack关联的其它线程WeakOrderQueue链表的head WeakOrderQueue。
* 当其它线程回收对象到该stack时会创建1个WeakOrderQueue中并加到stack的WeakOrderQueue链表中。
*/
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
/**
* 如果delayedRecycled满了那么将1个伪造的WeakOrderQueueDUMMY放到delayedRecycled中并丢弃该对象DefaultHandle
*/
if (delayedRecycled.size() >= maxDelayedQueues) {
// Add a dummy queue so we know we should drop the object
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
/**
* 创建1个WeakOrderQueue
*/
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}
/**
* 将对象放入到该stack对应的WeakOrderQueue中
*/
queue.add(item);
}
static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
// We allocated a Link so reserve the space
/**
* 如果该stack的可用共享空间还能再容下1个WeakOrderQueue那么创建1个WeakOrderQueue否则返回null
*/
return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
? new WeakOrderQueue(stack, thread) : null;
}
```
WeakOrderQueue的构造函数如下WeakOrderQueue实现了多线程环境下回收对象的机制当由其它线程回收对象到stack时会为该stack创建1个WeakOrderQueue这些由其它线程创建的WeakOrderQueue会在该stack中按链表形式串联起来每次创建1个WeakOrderQueue会把该WeakOrderQueue作为该stack的head WeakOrderQueue
```
private WeakOrderQueue(Stack<?> stack, Thread thread) {
head = tail = new Link();
owner = new WeakReference<Thread>(thread);
/**
* 每次创建WeakOrderQueue时会更新WeakOrderQueue所属的stack的head为当前WeakOrderQueue 当前WeakOrderQueue的next为stack的之前head
* 这样把该stack的WeakOrderQueue通过链表串起来了当下次stack中没有可用对象需要从WeakOrderQueue中转移对象时从WeakOrderQueue链表的head进行scavenge转移到stack的对DefaultHandle数组。
*/
synchronized (stack) {
next = stack.head;
stack.head = this;
}
availableSharedCapacity = stack.availableSharedCapacity;
}
```
下面再看Recycler.get()方法:
```
public final T get() {
if (maxCapacity == 0) {
return newObject(NOOP_HANDLE);
}
Stack<T> stack = threadLocal.get();
DefaultHandle handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
```
取出该线程对应的stack从stack中pop出1个DefaultHandle返回该DefaultHandle的真正对象。
下面看stack.pop()方法:
```
DefaultHandle pop() {
int size = this.size;
if (size == 0) {
if (!scavenge()) {
return null;
}
size = this.size;
}
size --;
DefaultHandle ret = elements[size];
elements[size] = null;
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}
```
如果该stack的DefaultHandle数组中还有对象可用那么从该DefaultHandle数组中取出1个可用对象返回如果该DefaultHandle数组没有可用的对象了那么执行scavenge()方法将head WeakOrderQueue中的head Link中的DefaultHandle数组转移到stack的DefaultHandle数组scavenge方法如下
```
boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) {
return true;
}
// reset our scavenge cursor
prev = null;
cursor = head;
return false;
}
```
具体执行了scavengeSome()方法清理WeakOrderQueue中部分DefaultHandle到stack每次尽可能清理head WeakOrderQueue的head Link的全部DefaultHandle如下
```
boolean scavengeSome() {
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
cursor = head;
if (cursor == null) {
return false;
}
}
boolean success = false;
WeakOrderQueue prev = this.prev;
do {
/**
* 将当前WeakOrderQueue的head Link的DefaultHandle数组转移到stack的DefaultHandle数组中
*/
if (cursor.transfer(this)) {
success = true;
break;
}
WeakOrderQueue next = cursor.next;
if (cursor.owner.get() == null) {
if (cursor.hasFinalData()) {
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}
if (prev != null) {
prev.next = next;
}
} else {
prev = cursor;
}
cursor = next;
} while (cursor != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}
```
WeakOrderQueue.transfer()方法如下将WeakOrderQueue的head Link中的DefaultHandle数组迁移到stack中
```
boolean transfer(Stack<?> dst) {
Link head = this.head;
if (head == null) {
return false;
}
/**
* 如果head Link的readIndex到达了Link的容量LINK_CAPACITY说明该Link已经被scavengge完了。
* 这时需要把下一个Link作为新的head Link。
*/
if (head.readIndex == LINK_CAPACITY) {
if (head.next == null) {
return false;
}
this.head = head = head.next;
}
final int srcStart = head.readIndex;
/**
* head Link的回收对象数组的最大位置
*/
int srcEnd = head.get();
/**
* head Link可以scavenge的DefaultHandle的数量
*/
final int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false;
}
final int dstSize = dst.size;
/**
* 每次会尽可能scavenge整个head Link如果head Link的DefaultHandle数组能全部迁移到stack中stack的DefaultHandle数组预期容量
*/
final int expectedCapacity = dstSize + srcSize;
/**
* 如果预期容量大于stack的DefaultHandle数组最大长度说明本次无法将head Link的DefaultHandle数组全部迁移到stack中
*/
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
if (srcStart != srcEnd) {
/**
* head Link的DefaultHandle数组
*/
final DefaultHandle[] srcElems = head.elements;
/**
* stack的DefaultHandle数组
*/
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
/**
* 迁移head Link的DefaultHandle数组到stack的DefaultHandle数组
*/
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle element = srcElems[i];
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
srcElems[i] = null;
if (dst.dropHandle(element)) {
// Drop the object.
continue;
}
element.stack = dst;
dstElems[newDstSize ++] = element;
}
/**
* 当head节点的对象全都转移给stack后取head下一个节点作为head下次转移的时候再从新的head转移回收的对象
*/
if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
reclaimSpace(LINK_CAPACITY);
this.head = head.next;
}
/**
* 迁移完成后更新原始head Link的readIndex
*/
head.readIndex = srcEnd;
if (dst.size == newDstSize) {
return false;
}
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
return false;
}
}
```