Netty对象池
1.Netty对象池实现思路介绍
- 基于FastThreadLocal实现
- 存储对象的结构为数组
2.使用方式
public class UserCache {
private static final Recycler<User> userRecycler = new Recycler<User>() {
@Override
protected User newObject(Handle<User> handle) {
return new User(handle);
}
};
static final class User {
private String name;
private Recycler.Handle<User> handle;
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
public User(Recycler.Handle<User> handle) {
this.handle = handle;
}
public void recycle() {
// 如果User对象有其他属性要在这把属性都重置为初始值
// 因为对象要被多次使用,如果不重置属性,再拿出来这个对象属性值就不正确了
handle.recycle(this);
}
}
public static void main(String[] args) {
User user1 = userRecycler.get(); // 1、从对象池获取 User 对象
user1.setName("hello"); // 2、设置 User 对象的属性
user1.recycle(); // 3、回收对象到对象池
User user2 = userRecycler.get(); // 4、从对象池获取对象
System.out.println(user2.getName());
System.out.println(user1 == user2);
}
}
3.认识Netty对象池的核心类
介绍
Netty对象池就一个抽象类Recycler,只不过内部包裹了很多静态内部类如:Handle,DefaultHandle,Stack,WeakOrderQueue,Head,Link等
核心类包裹层次
类职责基本认识
Recycler类
职责
整合各种类的对象对外提供对象池的功能,相当于门面模式对外提供简单易用的方法,注意这是一个抽象类
重要属性
- FastThreadLocal
> threadLocal:每个线程对应的Stack也就是对象池 - FastThreadLocal
- 各种配置属性:比如Stack对象池默认大小,Stack对象池最大大小,回收比例等等
重要方法
- get():从对象池中获取一个对象,如果对象池中没有该对象会调用newObject()创建一个被池化对象
- newObject():需要继承Recycler类的类去实现的方法,属于一个模板方法,创建一个被池化对象
Stack类
职责
这是真正的负责保存对象的对象池,内部最重要的属性就是elements,保存在FastThreadLocal中每个线程都会有自己的Stack对象
重要属性
重要方法
DefaultHandle类
职责
包装了要被池化的对象,为什么要包装?因为一个对象要保存在对象池中除了对象自身意外还要记录一些其他的信息例如:该对象是否被回收,该对象所属的对象池是谁,等等
重要属性
重要方法
- recycle(Object obj):把对象释放,重新放回到池子中
4.获取对象源码
流程图
源码
// 对象池内部池化的是Entry对象
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
@Override
protected Entry newObject(Handle<Entry> handle) {
return new Entry(handle);
}
};
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
// 从对象池获取一个Entry
Entry entry = RECYCLER.get();
// 忽略部分代码
return entry;
}
@SuppressWarnings("unchecked")
public final T get() {
// 如果对象池容量是0则不会池化,而是直接返回对象
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
// 从FastThreadLocal中获取当前线程的Stack也就是对象池
Stack<T> stack = threadLocal.get();
// 弹出一个对象
DefaultHandle<T> handle = stack.pop();
// 如果没有
if (handle == null) {
// 创建一个Handle
handle = stack.newHandle();
// 创建对象然后赋值给属性
handle.value = newObject(handle);
}
return (T) handle.value;
}
5.同线程释放对象
介绍
这个很好理解,就是本线程获取了本线程的对象池中的对象使用完成后进行释放
流程图
源码
@Override
public void recycle(Object object) {
// 参数判断
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
// 获得对象池
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
// 放回到对象池
stack.push(this);
}
void push(DefaultHandle<?> item) {
// 获取当前线程
Thread currentThread = Thread.currentThread();
// 判断当前线程和对象池的持有线程是不是一个,如果是直接放回到对象池中即可
if (threadRef.get() == currentThread) {
pushNow(item);
} else {
// 不是则说明是其他线程帮忙回收的,那就放到其他线程对应的WeakOrderQueue对象中即可
// 注意:既然走到这里,也就说明执行当前方法的线程已经是帮忙回收对象的线程了
// 这个currentThread参数就是帮忙回收对象的线程。所以才有了后来每一个帮忙回收对象的线程都有一个WeakOrderQueue对象
pushLater(item, currentThread);
}
}
private void pushNow(DefaultHandle<?> item) {
/*
* 已经回收的判断逻辑
* 条件成立:就是已经回收过了或者正在回收
* 如果线程回收自己对象池中的对象,那么Handle中的这两个属性都还是0,是相等的
* 其中有一个不等于0就说明已经被回收了,或者至少放在WakeOrderQueue了
*/
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
// 走到这里说明这个对象没有回收过
// 把OWN_THREAD_ID赋值给lastRecycledId与recycleId
// 因为是当前线程回收对象所以这个两个值是相等的(recycleId,lastRecycledId)
// 这两个值一致说明当前Handle被回收掉了
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
// 获得对象池中的数量
int size = this.size;
// 如果对象池容量已经达到上限 或者 没有达到回收比例
// 不进行回收了直接返回
if (size >= maxCapacity || dropHandle(item)) {
return;
}
// 对象池数组扩容逻辑,条件是对象池当前数量已经达到了数组最大容量
if (size == elements.length) {
// 扩容至两倍
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
// 放入到对象池中就是回收了
elements[size] = item;
// 更新size
this.size = size + 1;
}
6.异线程回收情况介绍
每个线程都有自己的对象池,但是比如现在有Thread-1与Thread-2两个线程,Thread-2拿到了Thread-1的对象池中的对象,那么此时Thread-2要释放这个对象,肯定是不能释放到自己的对象池中的,应该释放到Thread-1的对象池中,那么在这种情况下Thread-2就成了帮Thread-1回收对象的线程了,具体可以看下图:
Netty的对象池在没有出现这种情况的逻辑是很简单的,但是Netty对象池为了处理这种逻辑做了比较复杂的设计
7.异线程回收核心类
WeakOrderQueue类
为什么设计这个类
帮助释放的问题,Thread-2在释放对象的时候可以直接把对象放到Thread-1的对象池中这是可以的但是会有并发问题要解决要上锁很影响性能,具体如下:
用了这个类之后的变化
帮助回收对象的线程会把被释放的对象存储到i自己的WeakOrderQueue中,当Thread-1线程对应的对象池中为空的时候并且要获取对象的时候则会以它的WeakOrderQueue为起点去遍历其他线程的WeakOrderQueue中的数据然后转移给自己的对象池中,这样就避免了并发冲突问题实现了无锁方式的帮助回收
职责
存储帮助回收的对象,实现了无锁的帮助回收
重要属性
重要方法
WeakOrderQueue内部是如何存储Handle的
Link类
介绍
WeakOrderQueue内部要存储帮助释放的对象,而且要方便记录读写情况,所以它采用了数组存储+记录读写索引的方式来实现
这个类是转移对象的基本单位,请注意这个类继承自AtomicInteger,AtomicInteger的value是写索引
重要属性
- int value:继承自AtomicInteger,用来表示写索引
- int readIndex:读索引
- DefaultHandle>[] elements:存储Handle的数组,长度默认为16不会扩容
- Link next:链表指针,指向下一个Link节点
8.异线程回收情况下的整体结构图
9.异线程释放对象流程
流程图
源码
private void pushLater(DefaultHandle<?> item, Thread thread) {
// FastThreadLocal保存的WeakHashMap
// Key是该线程帮忙回收对象线程所拥有的stack,value就是该线程自己的WeakOrderQueue
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
// 根据Stack获取WeakOrderQueue
WeakOrderQueue queue = delayedRecycled.get(this);
// 第一次帮助这个线程回收对象WeakOrderQueue是NULL就去创建
if (queue == null) {
// 每个线程能帮忙回收的线程数量是由最大值的,超过最大值肯定就不行了
// 怎么知道当前线程目前帮助了几个线程进行回收呢,就是delayedRecycled的size即可,只要size>=maxDelayedQueues就不帮忙回收了
if (delayedRecycled.size() >= maxDelayedQueues) {
// 把stack和一个标识对象添进去
// 这个WeakOrderQueue.DUMMY属性,如果WeakOrderQueue链表中的一个对象是这个,就不会帮助回收对象
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// 走到这里说明当前线程帮助其他线程回收对象的线程数量没有达到最大值
// 创建一个WeakOrderQueue并且把被绑住回收线程的对象池和当前线程传进来
// 这里就真正为帮助回收的线程创建了一个WeakOrderQueue,然后加入到stack中WeakOrderQueue链表的头节点处
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
return;
}
// 放入到Map中
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// 走到这里就意味着queue不为null,但是为WeakOrderQueue.DUMMY
// 这也表示该WeakOrderQueue.DUMMY不会帮助回收对象
return;
}
// 把Handle放到WeakOrderQueue中
queue.add(item);
}
static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
// 首先判断是否还有剩余的可帮助回收的对象的数量,如果数量不够则不创建WeakOrderQueue对象
return Head.reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
? newQueue(stack, thread) : null;
}
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
// 创建一个WeakOrderQueue
final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
//还记得之前说的吗?WeakOrderQueue都是头插法插入到WeakOrderQueue链表中的
//点进去会发现setHead方法是整个类中唯一一个加了锁了方法。因为把WeakOrderQueue对象添加到WeakOrderQueue链表头部
//很可能是多个线程同时在进行,因为一个线程从对象池中获得的多个对象可能需要多个线程帮其回收,每个帮助回收的
//线程都对应着一个WeakOrderQueue对象,自然会出现同时把WeakOrderQueue对象添加到头节点的情况
//这时候就必须用synchronized来防止并发问题的出现
//注意,这里是stack在调用setHead方法
stack.setHead(queue);
return queue;
}
void add(DefaultHandle<?> handle) {
// 这里看到lastRecycledId再次登场了,其实就可以把handle看成要被回收的对象,现在该对象内的lastRecycledId被id赋值了
// 而id就是WeakOrderQueue中的对应的帮助回收的线程的id,这就把我们之前讲的那个给对应上了
// 现在要被回收的对象中的lastRecycledId是id这个值,这就意味着该对象的回收进度已经进行到一半了,它还未完全被回收
// 只是放在了帮助它回收的线程中,等它完全被回收的时候,handle中另一个属性recycleId也会被再次赋值,和lastRecycledId相等了
handle.lastRecycledId = id;
// 找到link链表的尾节点。就像我们上面说的,从尾节点添加被回收对象,从头节点取走
Link tail = this.tail;
// 这个writeIndex其实就可以代表是写指针,表示我们link对象中的数组写到第几位了
int writeIndex;
// 尾巴节点的写索引已经到了16了说明要创建新的Link节点了,新的Link节点插入到链表尾部
if ((writeIndex = tail.get()) == LINK_CAPACITY) {
// 判断可帮助创建的剩余对象数量还够不够,不够就直接返回
if (!head.reserveSpace(LINK_CAPACITY)) {
return;
}
// 创建新的Link然后tail指向它
this.tail = tail = tail.next = new Link();
// 新的tail节点的写索引应该是0
writeIndex = tail.get();
}
// 放到新的Link里面
tail.elements[writeIndex] = handle;
// 这里有个很有意思的地方,就是可以看到handle中的stack会被置为null,我们知道handle中的stack实际上对应的就是原来线程
// 的对象池,而这个stack对象内部其实持有者它对应的线程。所以我们可以通过stack这个属性找到对象池所属的线程。
// 但这里把该属性置为null了。这是考虑到在对象被回收的过程中如果原来对象池对应的线程突然挂掉了,线程挂掉了那对应的
// stack对象池也就不能再被使用了。所以这里把stack设置为null,是为了消除这个强引用,帮助jvm垃圾回收挂掉的线程对应的stack
// 但是如果线程没挂掉呢?这里却置为null了,这说明一定有一个地方还会把handle中的stack还原。没错,就是在该对象被转移回自己的stack时还原了
handle.stack = null;
// 更新写索引
tail.lazySet(writeIndex + 1);
}
10.异线程回收情况下的对象获取流程
@SuppressWarnings({"unchecked", "rawtypes"})
DefaultHandle<T> pop() {
// 对象池中对象的数量
int size = this.size;
// 如果当前对象池中的没有对象了
if (size == 0) {
// 这时候就要从其他帮忙回收对象的线程的WeakOrderQueue对象中看看是否有可以转移回来的对象,如果有就转移回来,没有就返回NULL
if (!scavenge()) {
return null;
}
// 走到这里说明WeakOrderQueue有数据,转移完成之后重新给size赋值
size = this.size;
}
// 要从Stack中获取一个对象所以减一
size--;
// 存储对象的个数减1正好得到数组存储的最后一个对象的数组下标,取出该下标的对象
DefaultHandle ret = elements[size];
// 把数组原位置置为null
elements[size] = null;
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
// 刚从对象池拿出来的时候这两个属性都设置为0
ret.recycleId = 0;
ret.lastRecycledId = 0;
// 取走了一个对象,要重新给size赋值
this.size = size;
return ret;
}
boolean scavenge() {
// 这一步就是从其他线程的WeakOrderQueue对象中把属于自己对象池的对象移回到自己的stack中
if (scavengeSome()) {
return true;
}
// 如果所有WeakOrderQueue节点中都没有对象,就把当前节点和前驱节点重新初始化,因为在查找的过程中,这两个值已经发生改变了
prev = null;
cursor = head;
return false;
}
boolean scavengeSome() {
// 前驱节点
WeakOrderQueue prev;
// 也定义好当前节点,这时候当前节点还是null
WeakOrderQueue cursor = this.cursor;
// 这里会有一个判断,看当前节点是否为null,如果为null,说明是第一次扫描WeakOrderQueue链表
if (cursor == null) {
// 设置前驱节点也为NULL,一般这个时候也是NULL
prev = null;
// 获取WeakOrderQueue的head,也就是链表头
cursor = head;
// 判断head是不是NULL,是NULL说明没有其他线程帮它回收对象,return false代表转移失败
if (cursor == null) {
return false;
}
} else {
prev = this.prev;
}
// 走到这里cursor一定不为NULL,这里先是当作head看
// 成功标记
boolean success = false;
do {
// 调用head节点的transfer方法,传递的对象stack
// 这个方法之前讲过了吧,可以看之前的逻辑,就是把WeakOrderQueue对象中的
// 数据转移到当前stack中,一次限定转移一个link中数组的容量,也就是16
if (cursor.transfer(this)) {
// 转移成功标记
success = true;
// 结束
break;
}
// 走到这里意味着刚才的那个当前节点中没有可被回收的对象,这时候就找下一个WeakOrderQueue节点查看
// 所以先得到当前节点的下一个节点
WeakOrderQueue next = cursor.next;
// 这里要判断一下当前节点所对应的线程是不是已经挂掉了
// 注意,这里当前节点对应的是帮助回收对象的线程
if (cursor.owner.get() == null) {
// 如果真的挂掉了,就判断当前节点中是否还有未被转移到stack的数据
// 内部逻辑就是判断读写所以是否不一样,只要读写索引不一样就还有数据
if (cursor.hasFinalData()) {
// 自旋转移这个WeakOrderQueue的所有数据
for (; ; ) {
// 如果有,就把所有数据都回收一下,这里是循环回收,并不是只回收一次
if (cursor.transfer(this)) {
// 只要转移了就设置成功标记
success = true;
} else {
break;
}
}
}
if (prev != null) {
prev.setNext(next);
}
} else {
// 走到这里说明帮助回收对象的线程没有挂掉,把WeakOrderQueue当前节点保存到prev中
prev = cursor;
}
// 把next节点保存至cursor
cursor = next;
// 退出条件
} while (cursor != null && !success);
// 转移完了之后,给改变了的当前节点和前驱节点赋值
this.prev = prev;
this.cursor = cursor;
return success;
}
// 尽可能多地将队列中的项目转移到堆栈中,如果有项目转移,返回true
@SuppressWarnings("rawtypes")
boolean transfer(Stack<?> dst) {
// 获取当前WeakOrderQueue的Link的头节点
Link head = this.head.link;
// 如果Link链表是NULL说明这个里面根本没有帮忙回收的对象返回false即可
if (head == null) {
return false;
}
// 如果这个节点的读索引已经是16了,说明这个Link节点读取完了应该读取下一个Link节点
if (head.readIndex == LINK_CAPACITY) {
// 如果没有下一个节点
if (head.next == null) {
return false;
}
// 走到这里说明有下一个Link节点,这里获得下一个Link节点并且赋值给head,说明以前的Link都被读取过了作废了!
this.head.link = head = head.next;
// 更新availableSharedCapacity,加16,所有线程帮助回收的对象的总量增加了16
this.head.reclaimSpace(LINK_CAPACITY);
}
// 获取这个head的读索引
final int srcStart = head.readIndex;
// 获取这个Link的写索引
int srcEnd = head.get();
// 写索引-读索引,是还有多少个可以读
final int srcSize = srcEnd - srcStart;
// 如果这个Link内部没有可读的了则返回false
if (srcSize == 0) {
return false;
}
// 获取Stack中的对象数量
final int dstSize = dst.size;
// 未来增加后的Stack的对象数量
final int expectedCapacity = dstSize + srcSize;
// 如果计算后的数量 > 对象池的容量,这种判断可以猜测到是数组扩容逻辑
if (expectedCapacity > dst.elements.length) {
// 具体的扩容逻辑不用看,也没啥看的必要,扫一眼即可,确保没有其他重要的逻辑即可,如果没有忽略,这种属于工具方法没啥看的必要
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
// Link中对象已读数量 + (Stack对象池扩容后的容量 - 当前的Stack中的对象数量 = 剩余的可写容量) = 该Link节点中一共转移走的对象个数综合
// Link写索引也就是Link中存储元素的个数
// 这里代表最终可以转移走的对象的数量,不能超过16,说实话有点没太看懂 后面回头再看
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
// Link中对象已读数量(已经被转移走的对象数量) != 最终可以转移走的对象数量
// 读索引 != 写索引,反过来思考他俩相等说明什么?没有可读取的数据
// 反过来就是什么?有可读取的数据
if (srcStart != srcEnd) {
// 获取当前Link的所有Handle
final DefaultHandle[] srcElems = head.elements;
// 获取Stack里面的所有Handle
final DefaultHandle[] dstElems = dst.elements;
// Stack中的对象数量
int newDstSize = dstSize;
// 遍历Link的可读部分
for (int i = srcStart; i < srcEnd; i++) {
// 获取当前的Handle
DefaultHandle element = srcElems[i];
// 如果这个是0说明该对象确实还没有回收到Stack
if (element.recycleId == 0) {
// 更新recycleId为lastRecycledId
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
// 该Handle被回收了
throw new IllegalStateException("recycled already");
}
// 帮助GC
srcElems[i] = null;
// 判断是否达到回收比例
if (dst.dropHandle(element)) {
// 没有达到就丢弃
continue;
}
// 赋值Stack,在Handle放到线程的WeakOrderQueue的时候Handle内部关联的Stack被清除了,在这里被重新赋值
element.stack = dst;
// 放入到Stack中
dstElems[newDstSize++] = element;
}
/*
* 前提条件:走到这里已经转移完了这个Link的所有可读数据
* 条件1:当前Link写完了,也就是当前Link数据都转移走了
* 条件2:不是最后一个Link
*/
if (srcEnd == LINK_CAPACITY && head.next != null) {
// 这里就可以增加可以帮助回收对象的数量,然后把下一个link节点设置成头节点
// 并且帮助垃圾回收
this.head.reclaimSpace(LINK_CAPACITY);
this.head.link = head.next;
}
// 把写指针的值赋给读指针,说明link节点的数据全都读取完了
head.readIndex = srcEnd;
// 这里是判断,经过转移后,创建对象的线程的对象池容量有没有变化,是不是还是旧的容量,也就是存储的元素个数没变
if (dst.size == newDstSize) {
return false;
}
// 有变化就给size重新赋值
dst.size = newDstSize;
return true;
} else {
return false;
}
}