0%

golang-sync.Pool解析

sync.Pool是sync包下的一个组件,可以作为临时取还对象的一个 池子.

作用: 对于很多需要重复分配、回收内存的地方,sync.Pool 是一个很好的选择.频繁地分配、回收内存会给 GC 带来一定的负担,严重的时候会引起 CPU 的毛刺,sync.Pool 可以将暂时不用的对象缓存起来m待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存,减轻 GC 的压力,提升系统的性能.

使用场景:

  1. 当多个 goroutine 都需要创建同⼀个对象的时候,如果 goroutine 数过多,导致对象的创建数⽬剧增,进⽽导致 GC 压⼒增大.形成”并发⼤-占⽤内存⼤-GC 缓慢-处理并发能⼒降低-并发更⼤”这样的恶性循环.
  2. 关键思想就是对象的复用,避免重复创建.销毁.

Pool原理详解

type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	localSize uintptr        // size of the local array

	victim     unsafe.Pointer // local from previous cycle
	victimSize uintptr        // size of victims array

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() any
}
  • noCopy:
  • local: 每个P的本地队列,实际类型为 [P]poolLocal
  • localSize:
  • victicm:
  • victicmSize:
  • New: 自定义创建对象的回调函数,当pool中没有都可用对象时会调用

1. noCopy

nocopy:
因为Pool不希望被复制,所以结构体里面有一个noCopy字段, 使用 go vet 工具可以检查用户是否复制了 Pool.

用户只需要实现这样不需要消耗内存的,仅用于静态分析的结构, 保证对象在第一次使用后不会发生复制.

// noCopy may be embedded into structs which must not be copied
// after the first use.
// noCopy 可以被嵌入结构体来保证其第一次使用后不会在被复制.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
// Lock 是一个空操作用来给 `go ve` 的 -copylocks 静态分析
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

2. local

local字段指向 [P]poolLocal 数组(切片)的指针, localSize 则表示 这个数组的大小. 访问时 P 的 id 对应 [P]poolLocal 下标索引, 这样的设计减少了 多个goroutine 的竞争,提升了性能.

2.1 poolLocal

type poolLocal struct {
	poolLocalInternal

	// 将 poolLocal 补齐至两个缓存行的倍数,防止 false sharing,
	// 每个缓存行具有 64 bytes,即 512 bit$$
	// 目前我们的处理器一般拥有 32 * 1024 / 64 = 512 条缓存行
	// 伪共享,仅占位用,防止在 cache line 上分配多个 poolLocalInternal
	// 
	// Prevents false sharing on widespread platforms with$$
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

2.2 poolLocalInternal


// Local per-P Pool appendix.
type poolLocalInternal struct {
	private any       // Can be used only by the respective P.  仅能被各自的 P 获取
	shared  poolChain // Local P can pushHead/popHead; any P can popTail. 本地P可以从头部取, 其他 P 从尾部取
}

2.3 poolChain


// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct {
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *poolChainElt
}



type poolChainElt struct {
	poolDequeue

	// next and prev link to the adjacent poolChainElts in this
	// poolChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *poolChainElt
}

// 
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail 包含一个 32 位的 head 和一个 32 位的 tail 指针. 这两个值都和 len(vals)-1 取模过.
	// tail 是队列中最老的数据,head 指向下一个将要填充的 slot
    // slots 的有效范围是 [tail, head),由 consumers 持有.
	// 
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// 
	// vals 是一个存储 interface{} 的环形队列,它的 size 必须是 2 的幂
	// 如果 slot 为空,则 vals[i].typ 为空;否则,非空.
	// 一个 slot 在这时宣告无效: tail 不指向它了,vals[i].typ 为 nil
	// 由 consumer 设置成 nil,由 producer 读
	// 
	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	vals []eface
}

poolDequeue 被设计成单生产者,多消费者固定长度&&无锁的 双端队列.
producer 可以从head插入和删除. consumer可以从尾部pop 数据.

headTail 指向队头和队尾, 通过位运算, 将 head & tail 存入 headTail中.

Pool结构体

我们看到 Pool 并没有直接使用 poolDequeue,原因是它的大小是固定的,而 Pool 的大小是没有限制的.
因此,在 poolDequeue 之上包装了一下,变成了一个 poolChainElt 的双向链表,可以动态增长.

3. victim

一轮 GC 完成后,victim 和 victimSize 会分别接管 local 和 localSize,victim 的机制用于减少GC后冷启动导致的性能抖动. 让分配对象更加平滑.

victim Cache 本来是计算机架构里面的一个概念,是让CPU硬件处理缓存的一种技术, sync.Pool引入的意图在于 降低GC压力的同时增加缓存命中率.

4. New

当Pool中没有对象可供提供时,会调用 New 生成一个新对象.

2. 源码详解

2.1. Get

// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() any {
	if race.Enabled {
		race.Disable()
	}
	// 将当前的 goroutine 和 P绑定,禁止被强占,返回当前P对应的 localPool & pid
	l, pid := p.pin()
	x := l.private
	l.private = nil
	if x == nil {
		// Try to pop the head of the local shard. We prefer
		// the head over the tail for temporal locality of
		// reuse.
		x, _ = l.shared.popHead()
		if x == nil {
			// 尝试从 qita P 的 shared 双端队列尾部头一个对象出来.
			x = p.getSlow(pid)
		}
	}
	// pool 操作完成之后, 接触非抢占
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}

	// 如果最后还是没有获取到缓存对象,那就直接调用预先设置好的回调函数 `New` 创建一个对象.
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

2.1.1 pin

// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) {
	pid := runtime_procPin()
	// In pinSlow we store to local and then to localSize, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large localSize.
	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
	// 因为可能存在动态的 P(运行时调整 P 的个数)
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	return p.pinSlow()
}

调用方必须在完成取值后,调用 runtime.proc_Unpin() 来取消抢占.

pin 的作用就是将当前 groutine 和 P 绑定在一起,禁止抢占. 并且返回对应的 poolLocal 以及 P 的 id。

如果 G 被抢占,则 G 的状态从 running 变成 runnable,会被放回 P 的 localq 或 globaq,等待下一次调度.
下次再执行时,就不一定是和现在的 P 相结合了. 因为之后会用到 pid,如果被抢占了,有可能接下来使用的 pid 与所绑定的 P 并非同一个.


func (p *Pool) pinSlow() (*poolLocal, int) {
	// Retry under the mutex.
	// Can not lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
	return &local[pid], pid
}

因为有一把大锁 allPoolsMu, 所以函数名带有 slow. 锁粒度越大,竞争越多,就越慢. 不过想要上锁的话,先要解除绑定. 原因是锁越大,被阻塞的概率越大,如果还占着 P, 那就浪费资源.

2.1.2 popHead


func (c *poolChain) pushHead(val any) {
	d := c.head
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}

	if d.pushHead(val) {
		return
	}

	// The current dequeue is full. Allocate a new one of twice
	// the size.
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}

func (c *poolChain) popHead() (any, bool) {
	d := c.head
	for d != nil {
		// 调用 dequeue 的 popHead
		if val, ok := d.popHead(); ok {
			return val, ok
		}
		// There may still be unconsumed elements in the
		// previous dequeue, so try backing up.
		d = loadPoolChainElt(&d.prev)
	}
	return nil, false
}

popHead 只会被 producer调用,首先拿到头结点: ,如果头结点不为空,尝试调用 头结点(poolDequeue)的 popHead().


// 自旋锁的模式,避免加锁.
// 
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (any, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// head 是队头的前一个位置,所以要后移一位.
		// 在读出 slot 的 value 之前就将 head值 -1,取消对这个 slot 的控制.
		// Confirm tail and decrement head. We do this before
		// reading the value to take back ownership of this
		// slot.
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	// Zero the slot. Unlike popTail, this isn't racing with
	// pushHead, so we don't need to be careful here.
	*slot = eface{}
	return val, true
}

通过 自旋锁的模式(for 循环 + CAS) 避免加锁.

2.1.3 getSlow

如果在 shared里面没有获得缓存对象,则继续调用 Pool.getSlow, 尝试从其他 P 的 poolLocal 中偷取.

func (p *Pool) getSlow(pid int) any {
	// See the comment in pin regarding ordering of the loads.
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
	// 尝试从其他P中偷取 对象.
	// Try to steal one element from other procs.
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// 尝试从victim cache中取对象。这发生在尝试从其他 P 的 poolLocal 偷去失败后,
	// 因为这样可以使 victim 中的对象更容易被回收.
	// 
	// Try the victim cache. We do this after attempting to steal
	// from all primary caches because we want objects in the
	// victim cache to age out if at all possible.
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// 清空 victimCache,下次就不用从这里面找了.
	// Mark the victim cache as empty for future gets don't bother
	// with it.
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}
  1. 从索引为 pid + 1 的 poolLocal 开始, 尝试调用shared.popTail 获取缓存对象. 如果没有拿到,从victim中查找. 和 从 poolLocal 的逻辑类似.
  2. 最后 如果还没有找到,就把 victimSize 值 0. 防止后来的人再从 victim中找.
  3. 在 Get 函数的最后,经过这一番操作还是没找到缓存的对象,就调用 New 函数创建一个新的对象.

2.1.4 popTail

func (c *poolChain) popTail() (any, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}

	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}

		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			// 甩掉尾结点.
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func (d *poolDequeue) popTail() (any, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm head and tail (for our speculative check
		// above) and increment tail. If this succeeds, then
		// we own the slot at tail.
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	// We now own slot.
	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We write to val first and then publish that we're done with
	// this slot by atomically writing to typ.
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)
	// At this point pushHead owns the slot.

	return val, true
}

2.2. Put

// Put adds x to the pool.
func (p *Pool) Put(x any) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrandn(4) == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	l, _ := p.pin()
	if l.private == nil {
		l.private = x
		x = nil
	}
	if x != nil {
		l.shared.pushHead(x)
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}
  1. 先绑定 g 和 P. 然后尝试将 x 赋值给 private字段.
  2. 如果失败. 就调用 pushHead() 尝试将其放入 shared字段 维护的双端队列中.

2.2.1 pushHead

func (c *poolChain) pushHead(val any) {
	d := c.head
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}

	if d.pushHead(val) {
		return
	}

	// 当前 poolDequeue 满了. 分配一个 当前 poolDequeue 2倍的一个 poolDequeue
	// The current dequeue is full. Allocate a new one of twice
	// the size.
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

	// 收尾相连. 构成链表
	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}
// // 将 val 添加到双端队列头部。如果队列已满,则返回 false。此函数只能被一个生产者调用
//
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val any) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)

	//队列满了
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// Check if the head slot has been released by popTail.
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// Another goroutine is still cleaning up the tail, so
		// the queue is actually still full.
		return false
	}

	// The head slot is free, so we own it.
	if val == nil {
		val = dequeueNil(nil)
	}
	*(*any)(unsafe.Pointer(slot)) = val

	// Increment head. This passes ownership of slot to popTail
	// and acts as a store barrier for writing the slot.
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}

3. pack & unpack

3. GC


[参考]
深度解密 Go 语言之 sync.Pool
请问sync.Pool有什么缺点?
几个 Go 系统可能遇到的锁问题

欢迎关注我的其它发布渠道