0%

freecache源码解析

代码仓库地址

freeCache 相比较 golang 的原生map实现缓存,可以通过减少指针的数量避免 GC压力,无论存储了多少数据,内部只会占用 512个指针,
数据集 通过 hash(key) 被分片256个 segment,每个 segment 有两个指针,

  • 一个存储键和值的唤醒缓冲区
  • 另一个是用于查找索引条目的索引切片
    每个 segment 都有自己的 sync.Mutex,所以支持多线程访问.

[TOC]

特性:

  • 存储百万的 entrys
  • Zero GC overhead
  • 线程安全的并发访问
  • 纯Golang实现
  • 支持数据过期
  • LRU缓存替换策略
  • 严格限制内存使用
  • 附带一个 demo server,支持 一些带有管道(pipeline)的 redis命令
  • 迭代支持

1. 数据结构

datastructure

将 缓存分为 256 个段 segment, 每个 segment 分为 256 个slot. 采用 开发寻址发 解决 hash冲突问题.

  • 每个segment 一把锁, 减小锁粒度
  • 每个 slot 存储数据的索引(在 ringBuffer中的偏移量)
  • rinbBuffer 循环数组,实际存储数据.

Cache

// Cache is a freecache instance.
type Cache struct {
	locks    [segmentCount]sync.Mutex // 减小锁粒度, 每个 segment 一把锁
	segments [segmentCount]segment
}

segment

// a segment contains 256 slots, a slot is an array of entry pointers ordered by hash16 value
// the entry can be looked up by hash value of the key.
type segment struct {
	rb    RingBuf // ring buffer that stores data 环形缓冲区 存储数据
	segId int
	_     uint32

	// 一些 统计信息
	missCount     int64
	hitCount      int64
	entryCount    int64
	totalCount    int64 // number of entries in ring buffer, including deleted entries.
	totalTime     int64 // used to calculate least recent used entry.
	totalEvacuate int64 // used for debug
	totalExpired  int64 // used for debug
	overwrites    int64 // used for debug
	touched       int64 // used for debug

	timer     Timer      // Timer giving current time 用于计算过期时间
	vacuumLen int64      // up to vacuumLen, new data can be written without overwriting old data.
	slotLens  [256]int32 // The actual length for every slot.   - (每个 slot 容纳的 entryPtr  数量 len)
	slotCap   int32      // max number of entry pointers a slot can hold. - (每个 slot 可以容纳的 entryPtr  cap)
	slotsData []entryPtr // shared by all 256 slots - (用来解决 hash冲突, 每个 slot 是一个 []entryPtr, 是直接寻址法)
}

Entry


// entry pointer struct points to an entry in ring buffer, 
type entryPtr struct {
	offset   int64  // entry offset in ring buffer
	hash16   uint16 // entries are ordered by hash16 in a slot.
	keyLen   uint16 // used to compare a key
	reserved uint32
}

// entry header struct in ring buffer, followed by key and value.
type entryHdr struct {
	accessTime uint32
	expireAt   uint32
	keyLen     uint16
	hash16     uint16
	valLen     uint32
	valCap     uint32
	deleted    bool
	slotId     uint8
	reserved   uint16
}

entryPtr: slot 中存储的数据结构
entryHdr: ringBuffer 中存储的 entry 的 Header 结构

每个slot 中存储的 entryPtr. entryPtr.offset 指向 entry 在 ringBuffer 中的偏移量.

ringBuffer 中 entry的数据结构

ringBuffer

ringBuffer 固定大小, 当超过 容量时, 新数据会覆盖老数据.
数据 存储在 data[begin] -> data[end] 之间.

// Ring buffer has a fixed size, when data exceeds the
// size, old data will be overwritten by new data.
// It only contains the data in the stream from begin to end
type RingBuf struct {
	begin int64 // beginning offset of the data stream.
	end   int64 // ending offset of the data stream.
	data  []byte
	index int //range from '0' to 'len(rb.data)-1'
}

2. 常用操作:

# go doc freecache Cache

package freecache // import "github.com/coocood/freecache"

type Cache struct {
        // Has unexported fields.
}
    Cache is a freecache instance.

func NewCache(size int) (cache *Cache)
func NewCacheCustomTimer(size int, timer Timer) (cache *Cache)
func (cache *Cache) AverageAccessTime() int64
func (cache *Cache) Clear()
func (cache *Cache) Del(key []byte) (affected bool)
func (cache *Cache) DelInt(key int64) (affected bool)
func (cache *Cache) EntryCount() (entryCount int64)
func (cache *Cache) EvacuateCount() (count int64)
func (cache *Cache) ExpiredCount() (count int64)
func (cache *Cache) Get(key []byte) (value []byte, err error)
func (cache *Cache) GetFn(key []byte, fn func([]byte) error) (err error)
func (cache *Cache) GetInt(key int64) (value []byte, err error)
func (cache *Cache) GetIntWithExpiration(key int64) (value []byte, expireAt uint32, err error)
func (cache *Cache) GetOrSet(key, value []byte, expireSeconds int) (retValue []byte, err error)
func (cache *Cache) GetWithBuf(key, buf []byte) (value []byte, err error)
func (cache *Cache) GetWithExpiration(key []byte) (value []byte, expireAt uint32, err error)
func (cache *Cache) HitCount() (count int64)
func (cache *Cache) HitRate() float64
func (cache *Cache) LookupCount() int64
func (cache *Cache) MissCount() (count int64)
func (cache *Cache) NewIterator() *Iterator
func (cache *Cache) OverwriteCount() (overwriteCount int64)
func (cache *Cache) Peek(key []byte) (value []byte, err error)
func (cache *Cache) PeekFn(key []byte, fn func([]byte) error) (err error)
func (cache *Cache) ResetStatistics()
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error)
func (cache *Cache) SetAndGet(key, value []byte, expireSeconds int) (retValue []byte, found bool, err error)
func (cache *Cache) SetInt(key int64, value []byte, expireSeconds int) (err error)
func (cache *Cache) TTL(key []byte) (timeLeft uint32, err error)
func (cache *Cache) Touch(key []byte, expireSeconds int) (err error)
func (cache *Cache) TouchedCount() (touchedCount int64)

1. Set

// Set sets a key, value and expiration for a cache entry and stores it in the cache.
// If the key is larger than 65535 or value is larger than 1/1024 of the cache size,
// the entry will not be written to the cache. expireSeconds <= 0 means no expire,
// but it can be evicted when cache is full.
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
	hashVal := hashFunc(key)
	segID := hashVal & segmentAndOpVal // 通过 位运算 计算取余操作
	cache.locks[segID].Lock()
	err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
	cache.locks[segID].Unlock()
	return
}
# 取余操作. 对于 2的幂次方取余 可以通过位运算的方式进行处理:
x % 256 = x & (2^8 -1)
x % 512 = x & (2^9 -1)

拿到 segment 后,

func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
	// param check
	...
	
	now := seg.timer.Now()
	expireAt := uint32(0)
	if expireSeconds > 0 {
		expireAt = now + uint32(expireSeconds)
	}

	// uint64     32     16      8       8
	// hashValue          hash16 slotId
	slotId := uint8(hashVal >> 8)
	hash16 := uint16(hashVal >> 16)

	// slot 中每个元素是按照  hash16 升序排序的, 因此查询操作可以利用二分查找
	//从 segment 中查找数据
	slot := seg.getSlot(slotId)
	idx, match := seg.lookup(slot, hash16, key) // idx 要插入的位置,  match 是否找到  对应的 entry

	var hdrBuf [ENTRY_HDR_SIZE]byte
	hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
	// 进行替换操作
	if match {
		matchedPtr := &slot[idx]
		seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
		hdr.slotId = slotId
		hdr.hash16 = hash16
		hdr.keyLen = uint16(len(key))
		originAccessTime := hdr.accessTime
		hdr.accessTime = now
		hdr.expireAt = expireAt
		hdr.valLen = uint32(len(value))
		if hdr.valCap >= hdr.valLen {
			//in place overwrite
			atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
			seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
			seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
			atomic.AddInt64(&seg.overwrites, 1)
			return
		}
		// avoid unnecessary memory copy.
		seg.delEntryPtr(slotId, slot, idx)
		match = false
		// increase capacity and limit entry len.
		// 进行扩容
		for hdr.valCap < hdr.valLen {
			hdr.valCap *= 2
		}
		if hdr.valCap > uint32(maxKeyValLen-len(key)) {
			hdr.valCap = uint32(maxKeyValLen - len(key))
		}
	} else {
		hdr.slotId = slotId
		hdr.hash16 = hash16
		hdr.keyLen = uint16(len(key))
		hdr.accessTime = now
		hdr.expireAt = expireAt
		hdr.valLen = uint32(len(value))
		hdr.valCap = uint32(len(value))
		if hdr.valCap == 0 { // avoid infinite loop when increasing capacity.
			hdr.valCap = 1
		}
	}

	entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap)
	slotModified := seg.evacuate(entryLen, slotId, now)
	if slotModified {
		// the slot has been modified during evacuation, we need to looked up for the 'idx' again.
		// otherwise there would be index out of bound error.
		slot = seg.getSlot(slotId)
		idx, match = seg.lookup(slot, hash16, key)
		// assert(match == false)
	}
	newOff := seg.rb.End()
	// 将 偏移量(data在 ringbuffer 中的偏移量)写入  slot
	seg.insertEntryPtr(slotId, hash16, newOff, idx, hdr.keyLen)
	// 写入 ringBuffer
	...
	return
}
  1. 通过 hash(key) 找到对应的 segment
  2. 通过 hash(key) 找到对应的 slot
    • slotId := uint8(hashVal >> 8)
    • hash16 := uint16(hashVal >> 16)
  3. 找到 kv 在 slot 中要插入的 位置. idx
  • 如果对应的 key 存在,就进行替换. (如果 该位置的容量 < 要插入的kv, 就将该位置标记为 deleted, 往后面append)
  • 如果 对应的key不存在,就在 slot 后面进行 append.(如果 空间不足就扩容)

总结:
set 操作为什么高效?

  • 通过 hash(key) 计算 bucket 使用 位运算操作
  • 通过 二分查找 的方式 在 slot中查询 entry
  • key 不存在的场景:
    • 如果 ringBuffer容量充足, 就直接在 环尾部 append entry. 时间复杂度 是 O(1)
    • 如果 ringBuffer容量不足,需要将一些 key 移除掉. freeCache 通过一定的措施,保证移除key的操作时间复杂度为 O(1). entry追加操作的时间复杂度也是O(1)
  • key存在的场景(match) 找到entry索引:
    • 如果原来预留的entry容量充足.那么直接更新原来的entryHdr 和 value. 时间复杂度是 O(1)
    • 如果原来预留的entry容量不足: freecache 为了避免底层移动数组数据. 不直接对原来的entry进行扩容,而是将原来的entry标记为删除(懒删除).然后在环形缓冲区默认append 新的entry. 时间复杂度是 O(1).

2. Get

/ Get returns the value or not found error.
func (cache *Cache) Get(key []byte) (value []byte, err error) {
	hashVal := hashFunc(key)
	segID := hashVal & segmentAndOpVal
	cache.locks[segID].Lock()
	value, _, err = cache.segments[segID].get(key, nil, hashVal, false)
	cache.locks[segID].Unlock()
	return
}



func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) {
	hdr, ptr, err := seg.locate(key, hashVal, peek)
	if err != nil {
		return
	}
	expireAt = hdr.expireAt
	if cap(buf) >= int(hdr.valLen) {
		value = buf[:hdr.valLen]
	} else {
		value = make([]byte, hdr.valLen)
	}

	seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
	if !peek {
		atomic.AddInt64(&seg.hitCount, 1)
	}
	return
}
  1. 找到 segment, 然后 找到 对应的 slot
  2. seg.locate() 找到 对应的 entryPtr,以及 对应的 entryHdr

要读取的数据在 ringBuffer中的偏移量是 ptr.offset + ENTRY_HDR_SIZR + keyLen

3. 过期 与删除

3.1. key 过期

对于过期的数据,freecache会让它继续存储在RingBuf中,RingBuf从一开始初始化之后,就固定不变了. 是否删掉数据,对RingBuf的实际占用空间不会产生影响.
当get到一个过期缓存时,freecache会删掉缓存的entry索引(但是不会将缓存从RingBuf中移除), 然后对外报ErrNotFound错误. 当RingBuf的容量不足时,会从环头开始遍历,如果key已经过期,这时才会将它删除掉. 如果一个key已经过期时,在它被freecache删除之前,如果又重新set进来(过期不会主动删除entry索引,理论上有被重新set的可能),过期的entry容量充足的情况下,则会重新复用这个entry.
freecache这种过期机制,一方面减少了维护过期数据的工作,另一方面,freecache底层存储是采用数组来实现,要求缓存数据必须连续,缓存过期的剔除会带来空间碎片,挪动数组来维持缓存数据的连续性不是一个很好的选择.

3.2 key 删除

freecache有一下两种情况会进行删除key操作:

  • 外部主动调用del接口删除key.
  • set缓存时,发现key已经存在,但是为entry预留的cap不足时,会选择将旧的数据删掉,然后再环尾追加新的数据.

freecache的删除机制也是懒删除,删除缓存时,只会删掉entry索引,但是缓存还是会继续保留在RingBuf中,只是被标记为删除,等到RingBuf容量不足需要置换缓存时,才会对标记为删除的缓存数据做最后的删除工作. freecache删除一个key,需要搜索entry索引和标记缓存数据.

4. RingBuffer

// go doc RingBuf
package freecache // import "."

type RingBuf struct {
        // Has unexported fields.
}
    Ring buffer has a fixed size, when data exceeds the size, old data will be
    overwritten by new data. It only contains the data in the stream from begin
    to end

func NewRingBuf(size int, begin int64) (rb RingBuf)
func (rb *RingBuf) Begin() int64
func (rb *RingBuf) Dump() []byte
func (rb *RingBuf) End() int64
func (rb *RingBuf) EqualAt(p []byte, off int64) bool
func (rb *RingBuf) Evacuate(off int64, length int) (newOff int64)
func (rb *RingBuf) ReadAt(p []byte, off int64) (n int, err error)
func (rb *RingBuf) Reset(begin int64)
func (rb *RingBuf) Resize(newSize int)
func (rb *RingBuf) Size() int64
func (rb *RingBuf) Skip(length int64)
func (rb *RingBuf) Slice(off, length int64) ([]byte, error)
func (rb *RingBuf) String() string
func (rb *RingBuf) Write(p []byte) (n int, err error)
func (rb *RingBuf) WriteAt(p []byte, off int64) (n int, err error)

实现一个 ringBuffer


freeCache zeroGC 的Go Cache
添加注释的代码

欢迎关注我的其它发布渠道