文章目录
- 本系列
- 前言
- 设计
- 索引和数组
- 怎么判断是否被覆盖
- 其他问题
- 源码走读
- 数据结构
- set
- get
- 总结
本系列
- 本地缓存库分析(一):golang-lru
- 本地缓存库分析(二):bigcache
- 本地缓存库分析(三):freecache
- 本地缓存库分析(四):fastcache(本文)
- 本地缓存库分析(五):groupcache(未完待续)
前言
什么是fastcache?是一款go实现的本地缓存库,具有以下特点:
- 支持高并发
- 支持存储大量数据,但对GC压力小
- 严格内存限制
- 代码非常简单,易于理解
本文将剖析其设计及代码实现
源码:https://github.com/VictoriaMetrics/fastcache,版本:v1.12.2
设计
索引和数组
和bigcache的设计很类似,fastcache将整个缓存分为512个bucket,每个bucket都自己的锁,单独控制并发
完整的KV数据放在bucket的底层数组中,用 idx
记录下次要从哪个位置开始写
底层数组可以被循环使用,写满之后从头开始写,这样被覆盖的部分就失效了
字段 gen
代表当前是从头开始写的第几轮。每次写满了要从头开始写时,gen++
]
每个bucket用 map[uint64]uint64
类型作为索引,key为真实key的hash值,value的高24位存储被写入时,bucket的gen值,低40位存储在底层数组中的位置
为啥要存gen值?用于判断是否被覆盖,下一节会详细说明
底层数据被划分成N个chunk,每个chunk的大小 固定为64KB
为啥要切分成很多小数组,而不像bigCache,freeacache一样用一个大数组装下所有数据呢?
官方说法是能减少内存碎片。我理解是能减少外部碎片,如果都是一次性分配一片很大的空间,最后会因为要分配N个字节连续的空间,剩下的总空间够用,大于N,但没有连续的一片空间,导致内存无法分配的窘境,而每次分配一片小空间能解决外部碎片的问题
但这么做带来的代价就是指针数量增加,但增加的个数有限,为chunk的数量。根据官方的说法,64GB 的数据会多100万(64GB/64KB=100万)个指针,而同样的数据规模,用常规的map[string][]byte做缓存需要10亿个指针,因此这点开销还是比较小的
现在已知idx,怎么定位底层数据entry到在哪个chunk呢?
chunkIdx := idx / chunkSize
计算在该chunk的哪个位置:
idx % chunkSize
entry在底层数据中的内存布局为:2个字节key长度,2个字节val长度,然后是key,value
如果一行chunk剩余空间不够写,会从下一行chunk开头再写
怎么判断是否被覆盖
一旦最后一个chunk被写满后,就会从idx=0位置,也就是第0个chunk的开头位置开始写。这里可以看出 内存淘汰方式为FIFO
那么在Get时就需要有机制知道某个key是否被覆盖了,以免获取到非法的数据
bigcache,freecache在覆盖老数据时,用了特殊的技巧知道老entry的开始位置,也就能去删除该entry的索引
但fastcache中并没有这么做,而是在Get时惰性判断,并巧妙地利用了在写entry那一时刻的 bucket.gen
信息
以下2种情况,entry是可用的:
entry和bucket的gen相同,且entry的idx < bucket的idx
,说明entry和bucket在同一轮,肯定没被覆盖entry的gen比b的gen小1,且entry的idx >= bucket的idx
,说明bucket比entry大一轮,已经发生了重写,但还没覆盖到entry所在的位置,该entry也是合法的
其他任何情况entry都是非法的
entry和bucket的gen相同:合法
entry和bucket的gen有的相同,有的不同,就看entry的idx在bucket的idx之前还是之后:
还有个问题,entry中的gen只有24位,如果bucket的gen一直增加,发生溢出后,和entry中存的gen相同了咋办?那上面的判定岂不是会出错
fastcache考虑到了这个问题,在每一轮开始时,都会检查一遍所有KV,将已失效的从索引m中删除
其他问题
-
hash冲突:fastcache没有解决hash冲突的问题,一旦遇到hash冲突,老的数据就会被覆盖
- 不过不用太担心,因为hash值有64位,hash冲突的概率非常低
-
过期时间:fastcahce不支设置过期时间。用户可以将过期时间塞到value中,在get时手动检查
源码走读
上面对fastcache的整体设计进行了详细分析,下面进行源码走读
数据结构
type bucket struct {mu sync.RWMutex// 一堆chunk,每个chunk大小固定为64KB// 当一条KV在chunk剩余空间放不下时,会从新的chunk开始写,有一定空间浪费chunks [][]byte// key:hash值,value:写入chunks的idx,和gen的或值m map[uint64]uint64// 指向下一个要写入的位置idx uint64// 表示chunks写满后,从头开始循环的次数gen uint64
}
初始化bucket,就是计算需要多少个chunk,并分配空间:
const chunkSize = 64 * 1024func (b *bucket) Init(maxBytes uint64) {// 根据maxBytes计算需要多少个chunkmaxChunks := (maxBytes + chunkSize - 1) / chunkSizeb.chunks = make([][]byte, maxChunks)b.m = make(map[uint64]uint64)b.Reset()
}
set
- 从idx开始写
- 如果当前chunk满了,从下个chunk开始写
- 如果最后一个chunk也满了,从头开始写,并递增bucket.gen
func (b *bucket) Set(k, v []byte, h uint64) {atomic.AddUint64(&b.setCalls, 1)// KV的长度都必须在2个字节能装下// 也就是value最多64kbif len(k) >= (1<<16) || len(v) >= (1<<16) {return}var kvLenBuf [4]bytekvLenBuf[0] = byte(uint16(len(k)) >> 8)kvLenBuf[1] = byte(len(k))kvLenBuf[2] = byte(uint16(len(v)) >> 8)kvLenBuf[3] = byte(len(v))// KV的长度 + K的长度 + V的长度,必须 < 64KBkvLen := uint64(len(kvLenBuf) + len(k) + len(v))if kvLen >= chunkSize {return}chunks := b.chunksneedClean := falseb.mu.Lock()idx := b.idxidxNew := idx + kvLen// 现在idx在哪个chunkchunkIdx := idx / chunkSize// 写完后idx在哪个chunkchunkIdxNew := idxNew / chunkSize// 用了新的chunkif chunkIdxNew > chunkIdx {// 到头了,要从第一个chunk开始写if chunkIdxNew >= uint64(len(chunks)) {idx = 0idxNew = kvLenchunkIdx = 0// 新的一轮b.gen++// 如果gen超过了24位,需要++,也就是在24位中不能有0if b.gen&((1<<genSizeBits)-1) == 0 {b.gen++}needClean = true} else {// 新的一行开始位置idx = chunkIdxNew * chunkSize// 写完后idxNew在哪idxNew = idx + kvLenchunkIdx = chunkIdxNew}chunks[chunkIdx] = chunks[chunkIdx][:0]}// 惰性初始化空间chunk := chunks[chunkIdx]if chunk == nil {chunk = getChunk()chunk = chunk[:0]}// 写KV的长度chunk = append(chunk, kvLenBuf[:]...)// 写kchunk = append(chunk, k...)// 写vchunk = append(chunk, v...)chunks[chunkIdx] = chunk// 将idx和当前的gen组合成索引的valueb.m[h] = idx | (b.gen << bucketSizeBits)b.idx = idxNewif needClean {b.cleanLocked()}b.mu.Unlock()
}
当开始新的一轮后,需要清除所有失效KV的索引:
func (b *bucket) cleanLocked() {bGen := b.gen & ((1 << genSizeBits) - 1)bIdx := b.idxbm := b.mnewItems := 0for _, v := range bm {// v的gengen := v >> bucketSizeBits// v的idxidx := v & ((1 << bucketSizeBits) - 1)// b的gen比v的gen大1,且v的idx在b的idx后面,说明v还没被覆盖if (gen+1 == bGen || gen == maxGen && bGen == 1) && idx >= bIdx ||// v的gen和b的gen相同,且v的dix小于b的idx,也说明v还没被覆盖gen == bGen && idx < bIdx {newItems++}}// 到这说明有item被覆盖了if newItems < len(bm) {bmNew := make(map[uint64]uint64, newItems)for k, v := range bm {// v的gengen := v >> bucketSizeBits// v的idxidx := v & ((1 << bucketSizeBits) - 1)// 将没有被覆盖的KV重新建个索引,并替换原有索引,这样失效的KV就没索引,再也找不到了if (gen+1 == bGen || gen == maxGen && bGen == 1) && idx >= bIdx ||gen == bGen && idx < bIdx {bmNew[k] = v}}b.m = bmNew}
}
get
- 从索引中,根据key的hash值拿到entry的idx和gen
- 根据idx个gen判断是否是否被覆盖,如果没被覆盖,去chunks找到数据返回
func (b *bucket) Get(dst, k []byte, h uint64, returnDst bool) ([]byte, bool) {atomic.AddUint64(&b.getCalls, 1)found := falsechunks := b.chunksb.mu.RLock()v := b.m[h]// bucket的genbGen := b.gen & ((1 << genSizeBits) - 1)if v > 0 {// v的gen// v的高24位:gen,低40位:indexgen := v >> bucketSizeBitsidx := v & ((1 << bucketSizeBits) - 1)// v和b的gen相同,且v的idx < b的idx,说明是合法的if gen == bGen && idx < b.idx ||// v的gen比b的gen小1,且v的idx >= b的idx,说明还没覆盖到v,v也是合法的gen+1 == bGen && idx >= b.idx ||// 此时也是:v的gen比b的gen小1,且v的idx >= b的idx,说明还没覆盖到v,v也是合法的gen == maxGen && bGen == 1 && idx >= b.idx {// 在第几个chunkchunkIdx := idx / chunkSize// 定位到chunkchunk := chunks[chunkIdx]// 定位到在该chunk的哪个位置idx %= chunkSize// 接下来4个字节是kv的长度kvLenBuf := chunk[idx : idx+4]keyLen := (uint64(kvLenBuf[0]) << 8) | uint64(kvLenBuf[1])valLen := (uint64(kvLenBuf[2]) << 8) | uint64(kvLenBuf[3])idx += 4// key值匹配if string(k) == string(chunk[idx:idx+keyLen]) {idx += keyLenif returnDst {// 读取value,返回dst = append(dst, chunk[idx:idx+valLen]...)}found = true} else {atomic.AddUint64(&b.collisions, 1)}}}b.mu.RUnlock()if !found {atomic.AddUint64(&b.misses, 1)}return dst, found
}
总结
最后看看fastcache解决了哪些原生缓存的问题:
问题 | 解决 |
---|---|
锁竞争严重 | 解决,用了多个segment |
大量缓存写入,导致gc标记阶段占用cpu多 | 解决,指针数量约等于 总数据量/64KB 个 |
内存占用不可控 | 解决,底层数组占用空间是固定的。索引占用的不固定,但索引占用空间小 |
不支持缓存按时效性淘汰 | 按FIFO淘汰 |
不支持缓存过期 | 不支持 |
缓存数据可以被污染 | 解决,使用序列化后的字节数组 |