Category 存储

一种应用于特定场景的支持LRU的线程安全的无锁uint32->uint32 cache实现

Category 存储
Tag Go
Posted on
View

1. 前言

几年前给公司前台业务一个QPS很高的接口做了一个优化,主要请求来源是当前在线用户,接口核心逻辑就是从codis中根据一个数字查询对应的用户id(小于1亿),这两个数字的映射关系是不变的,可以理解为codis中有一个map[uint32]uint32的映射表,这个映射表只增不改。

因为接口对codis造成压力很大,因此决定在Go内存中将映射关系缓存下来,但由于这个映射表很大所以不能全部缓存中内存。因此结合业务逻辑决定引入了一个支持LRU淘汰策略的uint32 -> uint32的高性能缓存组件。

调研之后发现市面上Go的各种线程安全还支持LRU的缓存都是有锁的,性能可能受限,因此决定根据应用场景自己搞个特殊的缓存组件。

2. 实现原理

首先还是贴一下源码仓库地址: https://github.com/Orlion/intcache

2.1 结构体定义:

type IntCache struct {
	b          uint8
	buckets    [][8]uint64
	lruBuckets []uint32
}

func New(b uint8) *IntCache {
	cap := 1 << b
	return &IntCache{
		b:          b,
		buckets:    make([][8]uint64, cap),
		lruBuckets: make([]uint32, cap),
	}
}

image.png

如上图所示,一个IntCache2^bbucketlruBucket,一个bucket有8个K-V对,一个K-V对使用uint64来存储,前32bit存储key,后32bit存储value。一个lruBucket有8个lru值,采用uint32存储,每4bit存储bucket对应的每个K-V对的lru值。

这里你可能会很奇怪为什么lru要单独存储,不要急,继续往下看,读流程时我会详细解释。

2.2 写流程

func (c *IntCache) Set(key uint32, value uint32) {
        if key == 0 && value == 0 {
		panic("key and value can't be 0")
	}
	bucketi := key & (1<<c.b - 1)
	for i := 0; i < 8; i++ {
		e := atomic.LoadUint64(&c.buckets[bucketi][i])
		if e == 0 {
			atomic.StoreUint64(&c.buckets[bucketi][i], uint64(key)<<32|uint64(value))
			c.updLru(bucketi, i)
			return
		}

		if uint32(e>>32) == key {
			e = uint64(key)<<32 | uint64(value)
			atomic.StoreUint64(&c.buckets[bucketi][i], e)
			c.updLru(bucketi, i)
			return
		}
	}

	// find the min lru
	lrus := atomic.LoadUint32(&c.lruBuckets[bucketi])
	var (
		minLru uint32
		mini   int
	)

	for i := 0; i < 8; i++ {
		lru := lrus | 0b1111<<uint32(i)
		if lru < minLru {
			minLru = lru
			mini = i
		}
	}

	atomic.StoreUint64(&c.buckets[bucketi][mini], uint64(key)<<32|uint64(value))
	c.updLru(bucketi, mini)
}

写入步骤如下:

  1. key对容量取模,计算出key落到哪个桶里,然后遍历桶中8个槽(K-V对)
  2. 如果遍历槽为0,说明这个槽还没有被占用,写入当前槽,并更新lru值为7,其他槽的lru值-1
  3. 如果遍历槽的key等于当前的key,则更新这个槽的值,并更新lru值为7,其他槽的lru值-1
  4. 如果遍历完后没有空槽也没有命中key,则找到lru值最小的,淘汰掉然后写入新key并更新lru值

2.3 读流程

func (c *IntCache) Get(key uint32) (value uint32, exists bool) {
	bucketi := key & (1<<c.b - 1)
	for i := 0; i < 8; i++ {
		e := atomic.LoadUint64(&c.buckets[bucketi][i])
                if e == 0 {
			break
		}

		if uint32(e>>32) == key {
			value = uint32(e)
			exists = true
			c.updLru(bucketi, i)
			break
		}
	}

	return
}

读取步骤如下:

  1. key对容量取模,计算出key落到哪个桶里,然后遍历桶中8个槽(K-V对)
  2. 如果遍历到的槽为0,说明后面的槽都是没有数据的,无需继续遍历
  3. 如果遍历到的槽的key等于查询的key,则返回value,并更新lru值

3. 总结

3.1 为什么lru要单独存储

每个bucket占用8*8=64B,正好是一个x86 cpu cacheline的大小,刚好填满一个cacheline,这样遍历bucket上8个槽实际只需要读取第一个槽时访问一次内存,后续访问都会直接从cpu cache中读到(当然前提是没有写请求造成cacheline过期),这样可以充分利用cpu缓存。

如果lru值与bucket存储在一起,那么系统中大量的读请求修改lru值就会造成cacheline过期的可能性就会变大,而如果分开存储,读请求不会造成cacheline过期。

你可能会问频繁的写入也会造成cacheline过期影响性能啊,但是我们这是一个典型的读多写少的系统,而且大量的bucket也降低了cacheline过期的几率。

3.2 缺陷

3.2.1 适用场景有限

由于我这个组件用在了在线用户访问的场景中,我将bucket数量设置为日活人数/8,hash冲突的几率还是比较小的,从监控看缓存命中率还是比较可观的。

但是由于不是严格的LRU,因此其他业务场景可能不适用。

3.2.2 value与lru值的更新不是原子的

因为要提高cpu cache命中率,因此value更新与lru更新是分离的,无法做到原子性,这也是很不严谨的,但是我们这个业务场景中不需要严谨的lru,所以可以忽略。

4. 基准测试

与fastcache对比的基准测试代码

func BenchmarkIntcache(b *testing.B) {
	rand.Seed(1)
	var B uint8 = 21
	m := New(B)
	for i := 0; i < b.N; i++ {
		intcacheBenchmarkFn(m)
	}
}

func BenchmarkFastcache(b *testing.B) {
	rand.Seed(1)
	m := fastcache.New(1 << 21 * (64 + 8))
	for i := 0; i < b.N; i++ {
		fastcacheBenchmarkFn(m)
	}
}

func intcacheBenchmarkFn(m *IntCache) {
	wg := &sync.WaitGroup{}
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			for j := 0; j < 300; j++ {
				key := rand.Uint32()
				m.Set(key, key)
				m.Get(key)
			}
			wg.Done()
		}()
	}
	wg.Wait()
}

func fastcacheBenchmarkFn(m *fastcache.Cache) {
	wg := &sync.WaitGroup{}
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			for j := 0; j < 300; j++ {
				key := rand.Uint32()
				b := make([]byte, 4) // uint64的大小为8字节
				binary.LittleEndian.PutUint32(b, key)
				m.Set(b, b)
				r := make([]byte, 4)
				m.Get(r, b)
			}
			wg.Done()
		}()
	}
	wg.Wait()
}

都是1000个协程并发读写300次,结果:

goos: darwin
goarch: arm64
pkg: github.com/Orlion/intcache
BenchmarkIntcache
BenchmarkIntcache-10                  14          78865301 ns/op
BenchmarkFastcache
BenchmarkFastcache-10                 10         113746746 ns/op
PASS
ok      github.com/Orlion/intcache      9.767s

可以看到我们这个实现要快一点。 image.png

...

阅读全文 »

bitcask的设计与实现

Category 存储
Tag LSM
Posted on
View

背景

最近在研究LSM tree,听闻bitcask在LSM tree各种各样的应用中是一个比较简单的实现,所以就以它为突破口,了解下LSM tree真实世界的实现。

bitcask存储模型由Riak提出,github上有各种语言的实现,本人挑选了一个golang版本的实现来进行研究,源码地址是:git.mills.io/prologic/bitcask,学习过程中我添加了一些注释,有需要的同学可以参考下:github.com/Orlion/bitcask

存储模型

与LSM tree的基本思想一样,bitcask中所有增删改操作都是追加写磁盘中的datafile,其数据结构如图: image.png

实际文件中是没有换行的,每个entry都是与前一个entry紧密串联在一起的,这里只是为了体现出来一个一个的entry。

datafile由一个一个的entry组成,每个entry的前4个字节存储key的size,第二个8字节存储value的size,然后顺序写入key和value,再写入校验和和过期时间的unix时间戳

datafile写入完成后可以得到新写入项的offset,然后将该key对应的offset与写入的数据项的size写入到内存的索引中,prologic/bitcask索引使用了artAdaptive Radix Tree(自适应前缀树)作为索引的数据结构,虽然不如hash表查找速度快,但因为是树状结构所以可以支持范围查找。

当datafile写入到一定的大小时会创建一个新的可读可写的datafile,在此之后的新数据会写入到这个新datafile中,老datafile会被设置为只读。因此一个bitcask实例会有多个datafile,所以索引中还必须存储key所在文件的id。

数据结构

image.png

  • path指向bitcask的工作目录,即各种文件的存放目录
  • curr指向当前可读写的datafile,数据都写入到该文件中
  • datafiles 为bitcask持有的所有datafile map,其中key为文件id
  • trie和ttlIndex指向内存中的索引树
  • isMerging标记当前是否在进行Merge

删除/修改key

上面提到bitcask中删除修改数据也是顺序写磁盘,那么写入的是什么样的数据呢?

实际上,bitcask中修改数据与写入数据是同一个api,即都是Put(key, value []byte),所以修改key也是往datafile中追加写一个新entry,不同是会修改索引中key的指向为最新数据项在文件中的位置。

而删除数据其实就是put(key, []byte{})即向datafile写入空字节切片,写完之后会删除索引中的key。

查找key

get(key)时会先从内存的索引树中根据key找到key所在的文件id和offset以及size,然后通过mmap到对应datafile文件中offset处拉取entry,然后根据前12个字节处的key len与value len数据指示解析出key和value,检查下校验和,自此数据检索完成。

Merge过程

由于bitcask中增删改都是追加写文件,不可避免的磁盘占用会越来越多,所以需要在合适的时机执行merge操作,将old entry和deleted entry从磁盘中清理掉。

bitcask Merge的过程如下:

  1. 加写锁,判断当前是否有其他线程在执行merge,如果有则退出,如果没有则标记isMerging为true,解锁继续执行
  2. 加读锁,禁止数据写入,但是可以读
  3. 当前datafile刷盘,然后关闭当前datafile,将当前dafafile创建为只读datafile加到bitcask实例的datafile列表中,再创建一个新的当前可读写datafile,新的当前文件不执行merge
  4. 释放读锁
  5. 在工作目录下创建merge子目录,以merge目录为工作目录创建merge用的bitcask实例:mdb
  6. 遍历当前bitcask实例索引中的所有key,如果k在要merge的datafiles列表中的话则将k/v写入到mdb中,完成后关闭mdb
  7. 加写锁,禁止读写
  8. 关闭当前bitcask实例
  9. 删除当前工作目录中的所有文件
  10. 通过rename将mdb工作目录中的所有文件挪到当前工作目录下
  11. 重新打开实例
  12. isMerging标记为false,释放写锁,此时可进行读写

索引持久化

上文提到索引是存储在内存中的,这样的话进程重启后索引就需要重新构建,如果数据量多的话,可想而知进程启动得多慢。

所以bitcask中会在以下几个时机将内存中的索引持久化到磁盘中:

  1. bitcask实例关闭时
  2. 创建新的datafile之后

索引持久化流程

  1. 在工作目录中创建临时索引文件temp_index
  2. 遍历art索引树将节点的 k/item 写入到文件中
  3. temp_index文件rename为index

索引文件的使用

创建bitcask实例时,会检查工作目录下的索引文件,如果有索引文件,会将索引文件加载到内存中生成art索引树,然后判断索引文件是否是最新的,即索引文件生成后有没有新数据写入,如果不是最新的还需要从最新的datafile中读取数据到索引中。

如果没有索引文件则会遍历所有的datafile,遍历所有数据来构建索引。

总结

可以看到bitcask的实现还是非常简单(lou)的。put(k, v)加了全局锁,锁粒度较粗,并发读写性能应该不是很强。merge的过程要遍历所有的datafile,还要创建新文件,所以对系统的IO压力应该也比较大。

...

阅读全文 »