Abell Studio

这世界没有一件事情是虚空而生的。站在光里,背后就会有阴影,这深夜一片寂静,是因为你还没有听见声音。

golang sync.Map实现原理解析

本文Golang版本为go version go1.14 darwin/amd64

前言

Go中的map不是并发安全的,因此Go提供了sync.Map以在并发编程中使用,sync.Map是针对以下两种场景优化的:

  1. 某个指定的key只会被写入一次,但是会被读取多次,像不断增长的cache
  2. 多个goroutine读、写、覆盖不同的key

对于这两种场景,sync.Map比map配合Mutex/RWMutex可以大大降低锁的竞争。

数据结构

我们先看下sync.Map的定义

type Map struct {
	mu Mutex // 锁
	read atomic.Value // readOnly 读的时候先从read读
	dirty map[interface{}]*entry
	misses int // 当read没读到的时候+1(无论dirty中有没有)
}

read实际是readOnly结构体的一个实例,其定义:

type readOnly struct {
	m       map[interface{}]*entry
	// true表示dirty里存在read没有的key
	// 只有两种情况下amended为false
	// 1. map刚初始化的时候
	// 2. misses>=len(dirty)会将dirty赋值给read,同时dirty置为nil,amended置为false
	amended bool
}

entry定义:

type entry struct {
	p unsafe.Pointer
}

expunged定义:

var expunged = unsafe.Pointer(new(interface{}))

p的状态有以下三种:

状态 说明
nil entry已被删除,且dirty==nil
expunged entry已被删除,但dirty!=nil且dirty不存在该entry
其他 entry有效,并且存在于m.read中,如果m.dirty!=nil,则dirty中也存在该entry

基本原理

  • 通过read和dirty两个字段进行读写分离,read只用来读,将最新写入的数据存在dirty上
  • 读取时会先查询read,没有再去读dirty,写入则只写入dirty
  • 读read不需要加锁,读写dirty需要加锁
  • 通过misses字段来记录read被穿透的次数,穿透一定的次数则将dirty赋值给read
  • 删除为标记删除

写入

我们先看下写入即Store()方法 image.png

func (m *Map) Store(key, value interface{}) {
	// 将m.read原子读出
	read, _ := m.read.Load().(readOnly)
	// 如果read中存在要写入的key,则尝试修改
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		// 如果entry是expunged说明该entry已被删除,则不修改
		// 如果不是expunged则修改为新值,修改成功直接return
		// 此时最新值存在于read中,m.dirty不存在或者不是最新值
		return
	}

	// 走到这有两种可能
	// 1. read中没有我们要修改的key
	// 2. read中有我们要修改的key,但是已被删除
	m.mu.Lock()
	// 重新读取一遍
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		// 如果read中存在key的话,又走到这里的话entry基本是expunged
		// 将entry通过由expunded修改为nil
		if e.unexpungeLocked() {
			// 如果修改成功的话说明entry确实是expunged
			// 说明该dirty!=nil且entry不在dirty中
			// 将此entry加到dirty中
			// 此时dirty[key]与read[key]指向同一个entry
			// 下面修改entry值就会将read和dirty一并修改了
			m.dirty[key] = e
		} 
		// 如果修改失败的话说明在加锁之前entry由expunged被修改为了其他值
		// 可能是在另一个也是修改该key的线程中在加锁前抢先执行了if里面的代码		
		// 所以这时dirty中已经有这个key了

		// 将新值写入entry
		// read与dirty都得到了更新
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
		// 如果read中不存在但dirty中存在的话
		// 将新值写入到dirty中的entry中
		e.storeLocked(&value)
	} else {
		// 如果read和dirty中都没有
		if !read.amended {
			// amended=false有两种可能性:
			// 1. map初始化后还没有写入值
			// 2. misses>=len(dirty),dirty被赋值给了read同时置为了nil
			// 创建新dirty并将read拷贝到新的dirty中
			m.dirtyLocked()
			// dirty拥有read全部数据,更新read.amended为true
			m.read.Store(readOnly{m: read.m, amended: true})
		} // amended=true说明dirty中存在read中没有的key
		// 向dirty中写入一个新entry
		// 此时新entry只存在于dirty中,read中没有
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}

func (e *entry) tryStore(i *interface{}) bool {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == expunged {
			return false
		}
		// 将entry中的值替换为新值
		if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
			return true
		}
	}
}

func (e *entry) unexpungeLocked() (wasExpunged bool) {
	return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

func (e *entry) storeLocked(i *interface{}) {
	atomic.StorePointer(&e.p, unsafe.Pointer(i))
}

// 将read数据复制到dirty中,删除数据除外
func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}

	read, _ := m.read.Load().(readOnly)
	m.dirty = make(map[interface{}]*entry, len(read.m))
	for k, e := range read.m {
		// 如果p是nil修改为expunged
		// 如果p是expunged则不复制到dirty
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

// 如果p是nil的修改为expunged,返回e是否是expunged
func (e *entry) tryExpungeLocked() (isExpunged bool) {
	p := atomic.LoadPointer(&e.p)
	for p == nil {
		if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
			return true
		}
		p = atomic.LoadPointer(&e.p)
	}
	return p == expunged
}

删除Delete()

image.png

func (m *Map) Delete(key interface{}) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		// 如果read中不存在要删除的key,且dirty中存在read中不存在的key
		// 则从dirty中删除key
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			delete(m.dirty, key)
		}
		m.mu.Unlock()
	}
	if ok {
		// 如果read中存在,则将read中key对应的value修改为nil
		e.delete()
	}
}

// 把e.p改成nil,如果p为nil/expunged则不修改并返回false
func (e *entry) delete() (hadValue bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == nil || p == expunged {
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return true
		}
	}
}

查找Load()

image.png

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

func (e *entry) load() (value interface{}, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == nil || p == expunged {
		return nil, false
	}
	return *(*interface{})(p), true
}

Range()和LoadOrStore()方法的逻辑都比较简单这里就不再赘述了,看懂了上面的代码基本就能看懂。

阅读全文 »

浅入深出golang map的实现

为什么要研究map的实现

对于这个问题,我的回答有以下几个方面:

  1. 了解工业级map的另一种实现方案,增加自己的经验,可能对自己日后的工作有借鉴意义
  2. 只有了解了其实现原理才能清楚其性能瓶颈在哪,开发过程中尽量去避免

map的基本原理

map的底层结构是一个哈希表,map中的键值对被分配到了一个bucket数组上,每个bucket包含8个键值对。hash值的二进制低位部分被用来选择bucket,每个bucket存储了hash值的高位部分以用来在单个bucket上做区分。如果有超过8个key被hash到了同一个bucket上,那么就用链表连接到扩展bucket上。

底层数据结构

map 对应的结构体,一个map实际是hmap指针

type hmap struct {
	count     int // map中元素个数,即len(map)
	flags     uint8 // map的标记
	B         uint8 //bucket数量为2^B个
	noverflow uint16 // 溢出bucket的近似数量
	hash0     uint32 // hash seed 每次创建map都会生产一个随机的种子,提高hash碰撞攻击门槛
	buckets    unsafe.Pointer // bucket数组的地址,如果count=0,则有可能是nil
	oldbuckets unsafe.Pointer // 
	nevacuate  uintptr        // 指示扩容进度,小于此地址的buckets迁移完成
	extra *mapextra // optional fields
}

其中flags有以下几种标记:

常量名 对应二进制 说明
iterator 1 00000001 标记有迭代器在迭代buckets
oldIterator 2 00000010 标记有迭代器在迭代oldbuckets
hashWriting 4 00000100 标记当前正在写hashmap
sameSizeGrow 8 00001000 标记此次扩容容量没有变化只是重新分配元素,当扩容是由于溢出桶太多导致时会加此标记

bucket对应的结构体如下,源码中是没有keys、values、overflow这三个字段的,但实际运行过程内存中对应位置会有这三个字段

type bmap struct {
	tophash [8]uint8 // 如果<5表示存的是状态,>=5存的是hash值高8位
	// keys [8]keytype
	// values [8]valuetype // 之所以将key和value单独放在一起是因为内存对齐会浪费很多内存
	// overflow uintptr // 溢出桶地址
}

tophash时存的是状态,共有以下几种状态:

|常量名|值|说明| |——|——|——|——| |emptyRest|0|当前槽是空的,并且接下来的槽也是空的| |emptyOne|1|当前槽是空的,其他槽不一定| |evacuatedX|2|当前槽有效,但是已经被迁移到了扩容后buckets数组的上半区| |evacuatedY|3|当前槽有效,但是已经被迁移到了扩容后buckets数组的下半区| |evacuatedEmpty|4|当前槽是空的,已经被迁移了|

mapextra对应的结构体如下:

type mapextra struct {
	// The indirection allows to store a pointer to the slice in hiter.
	// 如果k/v都不包含指针,并且可以被inline,那么我们标记桶不含指针,这样避免gc扫描整个map
	// 然而bmap结构体的overflow字段是一个指针,为了保证溢出桶不被回收,我们用overflow存储所有hmap.buckets上的所有溢出桶,oldoverflow存储hmap.oldbuckets上的所有溢出桶
	overflow    *[]*bmap
	oldoverflow *[]*bmap

	// 创建bucket数组时会预先分配一些溢出桶出来,nextOverflow为这部分桶的第一个桶
	nextOverflow *bmap
}

它们之间的关系可以用下面这张图来表示: image.png

map的创建

在Go中通常使用make来创建一个map,像这样:

make(map[ktype]vtype, hint)

而在Go源码中提供了三个创建map的函数,它们分别是:

func makemap_small() *hmap
func makemap64(t *maptype, hint int64, h *hmap) *hmap
func makemap(t *maptype, hint int, h *hmap) *hmap
  1. 如果不指定hint或者hint(bucketCnt,即bucket上的元素数)时Go会通过runtime.makemap_small()来创建
  2. 如果hint为int64使用runtime.makemap64()来创建,但里面还是调用的runtime.makemap()
  3. 否则通过调用runtime.makemap()来创建

makemap_small()的代码非常简单,就是new一个hmap对象,然后生成hash种子:

func makemap_small() *hmap {
	h := new(hmap)
	h.hash0 = fastrand()
	return h
}

而makemap()的过程就相对复杂了,步骤为:

  1. 创建hmap结构体,并生成hash seed
  2. 通过hint找到bucket数组的长度,其实就是找到B
  3. 如果B不为0,那么就为hmap预创建bucket数组
  4. 如果B>=4我们认为大概率会有溢出桶,所以会预先分配2^(b-4)个溢出桶出来,然后将hmap.extra.nextOverflow指向预分配的第一个溢出桶 // t为map的类型信息, hint即make()的第二个参数 func makemap(t *maptype, hint int, h *hmap) *hmap { // 计算所需内存 mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size) if overflow || mem > maxAlloc { // 如果溢出则将初始化大小修改为0 hint = 0 } // 初始化Hmap if h == nil { h = new(hmap) } // 生成hash seed, fastrand()这里不做详解,看注释是实现了某篇论文... h.hash0 = fastrand() // 找到一个 B,使得 map 的负载因子在正常范围内。 B := uint8(0) for overLoadFactor(hint, B) { // overLoadFactor(hint, B) 等价于 hint > 2^B*6.5 B++ } h.B = B if h.B != 0 { var nextOverflow *bmap // 预分配bucket数组 h.buckets, nextOverflow = makeBucketArray(t, h.B, nil) if nextOverflow != nil { // 如果预分配了溢出桶,则用h.extra.nextOverflow存第一个溢出桶的指针 h.extra = new(mapextra) h.extra.nextOverflow = nextOverflow } } return h }

makeBucketArray的源码

// dirtyalloc要么是nil要么是之前由makeBucketArray使用相同的t和b创建出来的bucket数组
// 如果dirtyalloc为nil,那么就会创建出一个新数组,如果不为nil则会清空掉之前的dirtyalloc然后重用这部分内存来创建新数组
// 如果b>=4我们认为大概率会有溢出桶,所以会预先分配2^(b-4)个溢出桶出来,nextOverflow即为这部分桶的第一个桶
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
	base := bucketShift(b)
	nbuckets := base
	if b >= 4 {
		// 当b>=4时大概率会有溢出桶,所以这里预先分配一些溢出桶出来
		nbuckets += bucketShift(b - 4)
		sz := t.bucket.size * nbuckets
		up := roundupsize(sz)
		if up != sz {
			nbuckets = up / t.bucket.size
		}
	}

	if dirtyalloc == nil {
		buckets = newarray(t.bucket, int(nbuckets))
	} else {
		buckets = dirtyalloc
		size := t.bucket.size * nbuckets
		// 清空dirtyalloc
		if t.bucket.ptrdata != 0 {
			memclrHasPointers(buckets, size)
		} else {
			memclrNoHeapPointers(buckets, size)
		}
	}

	if base != nbuckets {
		// 进到这里说明上面申请了一些溢出桶
		// nextOverflow为第一个溢出桶的地址
		nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
		// last为最后一个溢出桶的地址
		last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
		// 将最后一个溢出桶的overflow设置为buckets数组的第一个元素
		// 之所以这么搞是为了方便判断h.extra.nextOverflow是不是最后一个预分配的溢出桶
		// 如果没理解的话看下下面的newoverflow函数😄
		last.setoverflow(t, (*bmap)(buckets))
	}
	return buckets, nextOverflow
}

赋值

Go中编译器会针对不同类型的key调用不同的赋值函数,它们分别是:

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer
func mapassign_faststr(t *maptype, h *hmap, s string) unsafe.Pointer
func mapassign_fast32(t *maptype, h *hmap, key uint32) unsafe.Pointer
func mapassign_fast32ptr(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer
func mapassign_fast64(t *maptype, h *hmap, key uint64) unsafe.Pointer
func mapassign_fast64ptr(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer

它们的实现相差不大,我们只理解mapassign()的实现就行了,mapassign()赋值的步骤如下:

  1. 求key的hash值,找到其对应的bucket
  2. 如果正在扩容的话,需要将该bucket迁到新数组中
  3. 遍历bucket及其链接的溢出桶上的所有槽,如果找到了key,则接下来将新值赋到该槽上
  4. 如果没有找到key,则写到第一个空槽上
  5. 如果没有空槽则创建出来一个新的溢出桶出来,将新键值对写到这个新溢出桶上
  6. 如果要没有找到key或者需要创建新溢出桶的话还需要判断是否需要扩容,如果需要的话就先进行扩容,再重复步骤2
  7. 将键值对写到查询到的位置 func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { ... // 求key的hash值 hash := t.hasher(key, uintptr(h.hash0)) ... if h.buckets == nil { // 如果map的bucket数组还没有创建则创建出来 h.buckets = newobject(t.bucket) } again: // 获取hash值的低B位 bucket := hash & bucketMask(h.B) if h.growing() { // 如果map正在扩容则调用growWork迁移bucket growWork(t, h, bucket) } // 根据低B位找到对应的bucket b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize))) // 取hash值的高8位,如果<5则再加上5 top := tophash(hash) var inserti *uint8 var insertk unsafe.Pointer var elem unsafe.Pointer bucketloop: for { for i := uintptr(0); i < bucketCnt; i++ { if b.tophash[i] != top { // tophash不匹配 if b.tophash[i] <= emptyOne && inserti == nil { // 如果槽是空的,则记录下这个位置 inserti = &b.tophash[i] insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) } if b.tophash[i] == emptyRest { // 当前槽是emptyRest,则之后的槽肯定也是空的,就不用往后查了 break bucketloop } continue } // tophash一致的话再判断key是否能匹配上 k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } if !t.key.equal(key, k) { // 如果key没有匹配上则继续查询 continue } // 找到了key,直接更新就行 if t.needkeyupdate() { typedmemmove(t.key, k, key) } elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) // 找到了key直接跳出循环 goto done } // 再遍历溢出桶 ovf := b.overflow(t) if ovf == nil { break } b = ovf } // 走到这里说明,当前map上没有指定的key,所以就需要写入一个新的键值对 // 如果写入新的键值对后负载系数>6.5 或者有太多溢出桶的话,就需要扩容 // 当然了,如果正在扩容中就不用再扩容了 // 溢出桶过多会严重影响查询效率所以需要扩容 if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { hashGrow(t, h) // 扩容后需要重新找一个新的位置 goto again } if inserti == nil { // 当map上所有槽都满的情况下,上面查到的插入位置就是空了 // 所以加一个溢出桶承载新添加的键值对 newb := h.newoverflow(t, b) inserti = &newb.tophash[0] // 槽上k的位置 insertk = add(unsafe.Pointer(newb), dataOffset) // 槽上v的位置 elem = add(insertk, bucketCnt*uintptr(t.keysize)) } // 在插入位置写入新的键值对 if t.indirectkey() { // t.indirectkey()=true说明bucket存储的是key的指针 // 为key申请一块内存并返回该块内存的地址 kmem := newobject(t.key) // 将插入位置的值转为该块内存的地址 *(*unsafe.Pointer)(insertk) = kmem // insertk也指向到这块内存的地址 insertk = kmem } if t.indirectelem() { vmem := newobject(t.elem) *(*unsafe.Pointer)(elem) = vmem } // 将key写入到insertk指向的内存位置 typedmemmove(t.key, insertk, key) *inserti = top // 元素数量+1 h.count++ done: ... if t.indirectelem() { elem = *((*unsafe.Pointer)(elem)) } return elem }

newoverflow的流程:

源码:

func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
	var ovf *bmap
	if h.extra != nil && h.extra.nextOverflow != nil {
		// 如果nextOverflow不为空,说明我们还有空闲的预分配溢出桶
		// 我们将优先使用空闲的预分配溢出桶
		ovf = h.extra.nextOverflow
		if ovf.overflow(t) == nil {
			// 如果该桶没有溢出桶,则将当前nextOverflow设置为它的下一个桶
			h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))
		} else {
			// 如果该桶有overflow,则说明它是最后一个溢出桶了,我们没有可用的预分配溢出桶了
			// 回忆下makeBucketArray函数,我们是不是把最后一个预分配的溢出桶的overflow设置为了buckets数组的第一个元素了😆
			// nextOverflow设置为nil
			ovf.setoverflow(t, nil)
			h.extra.nextOverflow = nil
		}
	} else {
		ovf = (*bmap)(newobject(t.bucket))
	}
	h.incrnoverflow()
	if t.bucket.ptrdata == 0 {
		// 这里如果没懂的话看下上面mapextra结构体的说明
		h.createOverflow() // 其实是初始化h.extra.overflow
		*h.extra.overflow = append(*h.extra.overflow, ovf)
	}
	b.setoverflow(t, ovf)
	return ovf
}
func (h *hmap) createOverflow() {
	if h.extra == nil {
		h.extra = new(mapextra)
	}
	if h.extra.overflow == nil {
		h.extra.overflow = new([]*bmap)
	}
}

扩容

既然上面提到了扩容那就在这里总结下扩容的步骤吧

扩容的条件

  1. 元素数量 / 桶数量 >= 6.5
  2. 溢出桶过多,当B<15(即bucket数量<2^15)时,如果溢出桶总数>=bucket数量则判定溢出桶过多;当B>=15时,如果溢出桶总数>=2^15则判定溢出桶过多

当条件1满足时,我们扩容一倍(将B加1),当条件1不足但条件2满足时说明桶利用率比较低,元素都被分配到了溢出桶上,这时容量不变,只移动bucket来降低空槽率

扩容步骤

  1. 如果负载系数>=6.5说明是由于溢出桶过多进行的扩容,这种扩容会进行等量扩容,不会增加buckets数组大小,这是将给map加上sameSizeGrow标志;否则将B+1,即扩容1倍
  2. 创建一个新buckets数组,并预分配一些溢出桶,清除map的iterator和oldIterator标志,如果当前map正在被迭代,则需要给map加个oldIterator标志
  3. 修改hmap结构体,将新buckets数组赋值给hmap.buckets字段,旧buckets数组赋值给hmap.oldbuckets字段,如果hmap.extra上还有溢出桶,还需要将此溢出桶赋值给hmap.extra.oldoverflow,同时hmap.extra.overflow置为nil
  4. 如果预分配了溢出桶,则将第一个溢出桶赋值给hmap.extra.nextOverflow func hashGrow(t *maptype, h *hmap) { bigger := uint8(1) if !overLoadFactor(h.count+1, h.B) { // 如果负载系数>=6.5说明是由于溢出桶过多进行的扩容,同时也说明了有许多bucket没有利用上 // 这时不扩容,只rehash bigger = 0 h.flags |= sameSizeGrow } oldbuckets := h.buckets // 创建一个新bucket数组 newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) // 将flags中的iterator和oldIterator位清零 flags := h.flags &^ (iterator | oldIterator) if h.flags&iterator != 0 { flags |= oldIterator } // 修改map h.B += bigger h.flags = flags h.oldbuckets = oldbuckets h.buckets = newbuckets h.nevacuate = 0 h.noverflow = 0 if h.extra != nil && h.extra.overflow != nil { if h.extra.oldoverflow != nil { throw("oldoverflow is not nil") } h.extra.oldoverflow = h.extra.overflow h.extra.overflow = nil } if nextOverflow != nil { if h.extra == nil { h.extra = new(mapextra) } h.extra.nextOverflow = nextOverflow } }

可以看到hashGrow中是没有做迁移动作的,桶迁移是调用growWork渐进式进行的。

func growWork(t *maptype, h *hmap, bucket uintptr) {
	// bucket是在新数组上的位置,bucket&h.oldbucketmask()是在旧数组上的位置
	evacuate(t, h, bucket&h.oldbucketmask())

	// 如果还在扩容状态则再多迁移一个oldbucket
	if h.growing() {
		evacuate(t, h, h.nevacuate)
	}
}

evacuate源码

// oldbucket是要迁移的bucket在旧数组上的索引
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
	// 索引找到要迁移的旧bucket的地址
	b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
	newbit := h.noldbuckets()
	if !evacuated(b) {
		// x和y是移动的目标
		// x表示的是新bucket数组的前半部分
		// y表示的是新bucket数组的后半部分
		var xy [2]evacDst
		x := &xy[0]
		// 找到要迁移的bmap的地址
		x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
		// 找到要迁移的k的地址
		x.k = add(unsafe.Pointer(x.b), dataOffset)
		// 找到要迁移的v的地址
		x.e = add(x.k, bucketCnt*uintptr(t.keysize))

		if !h.sameSizeGrow() {
			y := &xy[1]
			y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
			y.k = add(unsafe.Pointer(y.b), dataOffset)
			y.e = add(y.k, bucketCnt*uintptr(t.keysize))
		}

		
		for ; b != nil; b = b.overflow(t) {
			k := add(unsafe.Pointer(b), dataOffset)
			e := add(k, bucketCnt*uintptr(t.keysize))
			for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
				top := b.tophash[i]
				if isEmpty(top) {
					// 空槽不迁移
					b.tophash[i] = evacuatedEmpty
					continue
				}
				if top < minTopHash {
					throw("bad map state")
				}
				k2 := k
				if t.indirectkey() {
					k2 = *((*unsafe.Pointer)(k2))
				}
				var useY uint8
				if !h.sameSizeGrow() {
					// 增量扩容的情况下,计算hash以判断我们的数据是迁移到哪部分bucket
					hash := t.hasher(k2, uintptr(h.hash0))
					if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
						// 为什么要加 reflexivekey 的判断,可以参考这里:
                        			// https://go-review.googlesource.com/c/go/+/1480
                        			// key != key,只有在 float 数的 NaN 时会出现
                        			// 比如:
                        			// n1 := math.NaN()
                        			// n2 := math.NaN()
                        			// fmt.Println(n1, n2)
                        			// fmt.Println(n1 == n2)
                        			// 这种情况下 n1 和 n2 的哈希值也完全不一样
                        			// 这里官方表示这种情况是不可复现的
                        			// 需要在 iterators 参与的情况下才能复现
                        			// 但是对于这种 key 我们也可以随意对其目标进行发配
                        			// 同时 tophash 对于 NaN 也没啥意义
                        			// 还是按正常的情况下算一个随机的 tophash
                        			// 然后公平地把这些 key 平均分布到各 bucket 就好
						useY = top & 1
						top = tophash(hash)
					} else {
						// 假设旧桶数为2,那么newbit就为100(二进制形式)
						// 新桶数为4,bucketMask为111,
						// hash&newbit != 0说明hash形式为xxx1xx,
						// hash & bucketMask肯定是>100,所以肯定在Y半区
						if hash&newbit != 0 {
							useY = 1
						}
					}
				}

				if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
					throw("bad evacuatedN")
				}

				// 修改旧桶tophash为状态值
				b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
				dst := &xy[useY]                 // evacuation destination

				if dst.i == bucketCnt {
					dst.b = h.newoverflow(t, dst.b)
					dst.i = 0
					dst.k = add(unsafe.Pointer(dst.b), dataOffset)
					dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
				}
				dst.b.tophash[dst.i&(bucketCnt-1)] = top
				if t.indirectkey() {
					*(*unsafe.Pointer)(dst.k) = k2 
				} else {
					typedmemmove(t.key, dst.k, k) 
				}
				if t.indirectelem() {
					*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
				} else {
					typedmemmove(t.elem, dst.e, e)
				}
				dst.i++
				
				dst.k = add(dst.k, uintptr(t.keysize))
				dst.e = add(dst.e, uintptr(t.elemsize))
			}
		}
		
		if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
			b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
			ptr := add(b, dataOffset)
			n := uintptr(t.bucketsize) - dataOffset
			memclrHasPointers(ptr, n)
		}
	}

	if oldbucket == h.nevacuate {
		advanceEvacuationMark(h, t, newbit)
	}
}

查找

runtime的map提供了mapaccess1、mapaccess2、mapaccessK三个函数来获取值,取值时编译器会将v, exists = map[k] 转化为mapaccess2,v = map[k]转化为mapaccess1,mapaccessK只用于map遍历时。下面我们以mapaccess1为例看下是如何查找key的。

mapaccess1查找的流程:

  1. 计算key的hash值,hash值的低B位即为key所在bucket在map.buckets数组中的下标,通过hash值的低B位就能找到key所在bucket,如果map在扩容中那就找到key所在的旧bucket,如果旧bucket未迁移,那么接下来从旧bucket中找key
  2. 遍历bucket的每个槽,取hash值的tophash(高8位)与遍历到的槽上的tophash对比,如果不同则先判断下槽上的tophash是否是emptyRest,如果是那说明当前槽即下面的槽都是空槽了,就不用遍历了,如果不是就继续查下一个槽,直到tophash匹配上
  3. 如果上一步找到了匹配的tophash,就继续判断槽上的key是不是我们要找的key,如果不是还得继续遍历,如果是我们要找的key就取槽上的v返回就结束了
  4. 如果上一步没有找到对应的槽,说明map上没有我们要找的key func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { ... if h == nil || h.count == 0 { if t.hashMightPanic() { // 这段代码是因为当从一个非空的map中取值时, // 如果key为nil, 那么会导致一个panic, // 但同样的操作在nil/空 map上就不会发生, 这可能会导致难以发现的bug // 所以这里还是要额外计算下key的hash值 t.hasher(key, 0) // see issue 23734 } return unsafe.Pointer(&zeroVal[0]) } ... // 计算key对应的hash值 // go为不同的type选择不同的hash函数,具体可以看下/cmd/compile/internal/gc/alg.go genhash()函数 hash := t.hasher(key, uintptr(h.hash0)) m := bucketMask(h.B) // 取hash值的低B位,然后找到对应的bucket b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize))) // 如果oldbuckets不为空,说明在扩容中 if c := h.oldbuckets; c != nil { if !h.sameSizeGrow() { m >>= 1 } // 找到旧bucket oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize))) // 如果还未迁移则下面从旧bucket查询key if !evacuated(oldb) { b = oldb } } // 取hash值的高8位(如果<5则再加上5) top := tophash(hash) bucketloop: // 先查找当前bucket的8个元素,再查找溢出桶上的元素 for ; b != nil; b = b.overflow(t) { for i := uintptr(0); i < bucketCnt; i++ { if b.tophash[i] != top { // 如果tophash为emptyRest则说明当前槽和后面的槽都不会有元素了 if b.tophash[i] == emptyRest { break bucketloop } continue } k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } // hash值一致后再判断key是否是我们要查询的key if t.key.equal(key, k) { // 取回元素值 e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) if t.indirectelem() { e = *((*unsafe.Pointer)(e)) } return e } } } return unsafe.Pointer(&zeroVal[0]) }

删除

删除的步骤:

  1. 与查找类似先通过key与hash值在bucket数组上找到对应的槽
  2. 将槽上的k/v清空,并将tophash设置为emptyOne
  3. 如果该槽是最后一个元素则将它及它之前的空槽tophash都设置为emptyRest,标识其后面都是空槽 func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) { ... // 计算key的hash值 alg := t.key.alg hash := alg.hash(key, uintptr(h.hash0)) ... // 根据hash值低B位找到bucket索引 bucket := hash & bucketMask(h.B) if h.growing() { // 如果正在扩容则执行迁移操作 growWork(t, h, bucket) } // 找到对应的bmap b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize))) bOrig := b top := tophash(hash) // 下面与查询类似都是去找到具体所在的槽 search: for ; b != nil; b = b.overflow(t) { for i := uintptr(0); i < bucketCnt; i++ { if b.tophash[i] != top { if b.tophash[i] == emptyRest { break search } continue } k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) k2 := k if t.indirectkey() { k2 = *((*unsafe.Pointer)(k2)) } if !alg.equal(key, k2) { continue } // 如果 key 中是指针,就清空 key 的内容 if t.indirectkey() { *(*unsafe.Pointer)(k) = nil } else if t.key.ptrdata != 0 { memclrHasPointers(k, t.key.size) } e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) if t.indirectelem() { *(*unsafe.Pointer)(e) = nil } else if t.elem.ptrdata != 0 { memclrHasPointers(e, t.elem.size) } else { memclrNoHeapPointers(e, t.elem.size) } b.tophash[i] = emptyOne // 如果该槽不是最后一个元素则跳转到最后 if i == bucketCnt-1 { if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest { goto notLast } } else { if b.tophash[i+1] != emptyRest { goto notLast } } // 如果该槽是最后一个元素则将它及它之前的空槽tophash都设置为emptyRest,标识其后都是空槽 for { b.tophash[i] = emptyRest if i == 0 { // 如果i=0说明遍历到了该bucket上的第一个槽,这时如果遍历到的b是冲突链表上的第一个bucket说明我们已经遍历完了,break if b == bOrig { break } c := b // 从冲突链表上第一个bucket开始找当前bucket的上一个bucket for b = bOrig; b.overflow(t) != c; b = b.overflow(t) { } i = bucketCnt - 1 } else { i-- } if b.tophash[i] != emptyOne { break } } notLast: // 元素数-1 h.count-- break search } } ... }
阅读全文 »

Golang切片与实现原理

本文Golang版本为1.13.4

Slice底层结构

go中切片实际是一个结构体,它位于runtime包的slice.go文件中

type slice struct {
	array unsafe.Pointer
	len   int
	cap   int
}

array是切片用来存储数据的底层数组的指针,len为切片中元素的数量,cap为切片的容量即数组的长度

切片的初始化

创建一个切片有以下几种方式

1. 通过字面量创建

arr1 := [3]int{1,2,3} // 创建一个数组
s1 := []int{1,2,3} // 创建一个len为3,cap为3的切片

上面的创建方式非常容易与数组的另一个创建方式弄混

arr2 := [...]int{1,2,3} // 创建一个数组,数组长度由编译器推断

s1在内存上的结构如下图: image.png

2. 通过make()函数创建

s1 := make([]int, 10) // 创建一个长度为10,容量为10的切片
s2 := make([]int, 5, 10) // 创建一个长度为5,容量为5的切片

s2的内存结构如图: image.png

3. 通过数组/切片创建另一个切片

通过数组/切片创建另一个切片语法为

slice[i:j:k]

其中i表示开始切的位置,包括该位置,如果没有则表示从0开始切;j表示切到的位置,不包括该位置,如果没有j则切到最后;k控制切片的容量切到的位置,不包括该位置,如果没有则切到尾部。下面举几个例子说明:

a := [10]int{0,1,2,3,4,5,6,7,8,9}
s1 := a[2:5:9] // s1结果为[2,3,4], len:3, cap:7
s2 := a[2:5:10] // s2结果为[2,3,4] len:3, cap:8
s3 := a[2:7:10] // s3结果为[2,3,4,5,6] len:5, cap:8
s4 := a[2:] // s4结果为[2,3,4,5,6,7,8,9] len:8, cap:8
s5 := a[:3] // s5结果为[0,1,2] len:3, cap:10
s6 := a[::3] // 编译报错: middle index required in 3-index slice
s7 := a[:] s7结果为[0,1,2,3,4,5,6,7,8,9], len:10, cap:10
s10 := s1[1:3] s10结果为[3,4], len:2, cap: 6。注意s10的cap是6,而不是7!

s1与s10在内存上的结构如图: image.png

由于as1s2s3s4s5s7共享同一个数组,所以其中任意一个变量通过索引修改了底层数组元素的值,相当于修改了以上所有变量:

s2[3] = 30

执行上面的代码后:变量a变成了[0,1,2,30,4,5,6,7,8,9]、s1变成了[2,30,4]、…… s7变成了[0,1,2,30,4,5,6,7,8,9]

nil切片与空切片

var s11 []int
var s12 = make([]int, 0)

上面的s11为nil,s12是空切片,他们在内存上的结构如图: image.png

我写了段代码验证了下:

var s10 = make([]int, 0)
sh10 := (*reflect.SliceHeader)(unsafe.Pointer(&s10))
println(unsafe.Pointer(sh10.Data))
var s11 []int
sh11 := (*reflect.SliceHeader)(unsafe.Pointer(&s11))
println(unsafe.Pointer(sh11.Data))
var s12 = make([]int, 0)
sh12 := (*reflect.SliceHeader)(unsafe.Pointer(&s12))
println(unsafe.Pointer(sh12.Data))
var s13 = make([]int, 0)
sh13 := (*reflect.SliceHeader)(unsafe.Pointer(&s13))
println(unsafe.Pointer(sh13.Data))

打印结果如下:

0xc00006af08
0x0
0xc00006af08
0xc00006af08

根据打印结果可以看到是上面的结构无误

切片创建源码

我们打印下下面代码对应的汇编,看下golang是如何为我们创建出来一个切片的

func main() {
	tttttt := make([]int, 999)
	fmt.Println(tttttt)
}

通过go tool compile -S -l slice.go打印对应汇编(-l是禁止内联),下面只摘取关键部分

"".main STEXT size=181 args=0x0 locals=0x48
	...
	// 栈增加72个字节
        0x0013 00019 (slice.go:5)       SUBQ    $72, SP
	// 将当前栈底地址加载到到当前栈顶地址+64处
        0x0017 00023 (slice.go:5)       MOVQ    BP, 64(SP)
	// 栈底修改为栈顶地址+64
        0x001c 00028 (slice.go:5)       LEAQ    64(SP), BP
        ...
        0x0021 00033 (slice.go:6)       LEAQ    type.int(SB), AX
        ...
	// 下面三行实际是把tuntime.makeslice放到栈上的指定位置
        0x0028 00040 (slice.go:6)       MOVQ    AX, (SP)
        0x002c 00044 (slice.go:6)       MOVQ    $999, 8(SP)
        0x0035 00053 (slice.go:6)       MOVQ    $999, 16(SP)

上面的部分画个图可能更清晰些: image.png

继续看汇编:

	// 调用runtime.makeslice函数
        0x003e 00062 (slice.go:6)       CALL    runtime.makeslice(SB)
        ...
	// 将返回值加载到AX寄存器
        0x0043 00067 (slice.go:6)       MOVQ    24(SP), AX
        ...
	// 下面就是调用fmt.Println函数的代码了
        0x0048 00072 (slice.go:7)       MOVQ    AX, (SP)
        0x004c 00076 (slice.go:7)       MOVQ    $999, 8(SP)
        0x0055 00085 (slice.go:7)       MOVQ    $999, 16(SP)
        0x005e 00094 (slice.go:7)       CALL    runtime.convTslice(SB)
        ...
        0x0063 00099 (slice.go:7)       MOVQ    24(SP), AX
        ...
        0x0068 00104 (slice.go:7)       XORPS   X0, X0
        0x006b 00107 (slice.go:7)       MOVUPS  X0, ""..autotmp_1+48(SP)
        ...
        0x0070 00112 (slice.go:7)       LEAQ    type.[]int(SB), CX
        ...
        0x0077 00119 (slice.go:7)       MOVQ    CX, ""..autotmp_1+48(SP)
        ...
        0x007c 00124 (slice.go:7)       MOVQ    AX, ""..autotmp_1+56(SP)
        ...
        0x0081 00129 (slice.go:7)       LEAQ    ""..autotmp_1+48(SP), AX
        ...
        0x0086 00134 (slice.go:7)       MOVQ    AX, (SP)
        0x008a 00138 (slice.go:7)       MOVQ    $1, 8(SP)
        0x0093 00147 (slice.go:7)       MOVQ    $1, 16(SP)
        0x009c 00156 (slice.go:7)       CALL    fmt.Println(SB)
        0x00a1 00161 (slice.go:8)       MOVQ    64(SP), BP
        0x00a6 00166 (slice.go:8)       ADDQ    $72, SP
        0x00aa 00170 (slice.go:8)       RET
        0x00ab 00171 (slice.go:8)       NOP
        ...
        0x00ab 00171 (slice.go:5)       CALL    runtime.morestack_noctxt(SB)
        ...
        0x00b0 00176 (slice.go:5)       JMP     0

上面出现了一个关键函数,即runtime.makeslice,我们看下它的实现:

func makeslice(et *_type, len, cap int) unsafe.Pointer {
	// 这里实际是计算切片所占的内存大小,即元素的大小乘容量
	// mem为所需内存大小,overflow标识是否溢出
	mem, overflow := math.MulUintptr(et.size, uintptr(cap))
	if overflow || mem > maxAlloc || len < 0 || len > cap {
		// 如果溢出或者所需内存大于最大可分配内存或者len、cap不合法则报错
		mem, overflow := math.MulUintptr(et.size, uintptr(len))
		if overflow || mem > maxAlloc || len < 0 {
			panicmakeslicelen()
		}
		panicmakeslicecap()
	}
	// 调用mallocgc从go内存管理器获取一块内存
	return mallocgc(mem, et, true)
}

函数传参

切片作为函数参数传参时实际上是复制了一个runtime.slice结构体,而非是传递的runtime.slice结构体指针,举个栗子:

func main() {
	slice := []int{0,1,2}
	foo(slice)
}
func foo(slice []int) {
	...
}

其实就等价于

type Slice struct {
	ptr *[3]int
        len int
	cap int
}

func main() {
	slice := Slice{&[3]int{1,2,3}, 0, 0}
	foo(slice)
}
func foo(slice Slice) {
	...
}

因为函数的形参与实参共享同一个数组,这就导致当把一个切片作为参数传递到另一个函数时,在函数内修改形参的某个下标的值时也会修改了实参。描述的比较绕,下面看一个实例:

func main() {
	param := []int{0, 1, 2}
	foo(param)
	fmt.Println(param)
}

func foo(arg []int) {
	arg[1] = 10
}

打印结果为[0,10,2],原因是param与arg共享同一个底层数组,函数foo内修改了arg[1]实际是将两者的底层数组下标为1的元素修改为了10,所以main函数中的param[1]也就变成了10。 在foo函数内修改arg的len字段,是不会影响到param的len的,下面我们验证下:

func main() {
	param := []int{0, 1, 2}
	foo(param)
	fmt.Println(param)
	fmt.Println(len(param))
}

func foo(arg []int) {
	arg[1] = 10
	argSlice := (*reflect.SliceHeader)(unsafe.Pointer(&arg))
	argSlice.Len = 10
	fmt.Println(len(arg))
}

打印结果如下:

10
[0 10 2]
3

验证成功。

切片扩容

当通过append函数向切片中添加数据时,如果切片的容量不足,需要进行扩容,实际调用的是runtime包中的growslice()函数

// runtime/slice.go
func growslice(et *_type, old slice, cap int) slice {
	...

	// 下面就是计算新容量的部分了
	newcap := old.cap
	doublecap := newcap + newcap
	if cap > doublecap {
		// 如果所需容量大于当前容量的两倍,则新容量为所需容量
		newcap = cap
	} else {
		// 下面是所需容量<=当前容量两倍的逻辑
		if old.len < 1024 {
			// 如果当前长度<1024则新容量为当前容量x2
			newcap = doublecap
		} else {
			// 下面是当前长度>=1024的逻辑
			// 新容量每次增加自身的1/4,直到超过所需容量
			for 0 < newcap && newcap < cap {
				newcap += newcap / 4
			}
			// 如果溢出则新容量为所需容量
			if newcap <= 0 {
				newcap = cap
			}
		}
	}

	// 此处省略分配内存的代码
	...

	// p为新分配的底层数组的地址
	// 从old.array处拷贝lenmem个字节到p
	memmove(p, old.array, lenmem)
	// 返回新的切片
	return slice{p, old.len, newcap}
}
阅读全文 »

Go源码解析之sync.Mutex锁

本文使用Golang版本为:go1.13.4

Mutex的使用

先通过一段简单代码看下Go中Mutex的用法

func main() {
	a := 1
	m := sync.Mutex{}
	go func(){
		m.Lock()
		b := a
		a = b + 1
		m.Unlock()
	}()

	m.Lock()
	fmt.Println(a)
	m.Unlock()
}

Mutex的设计

在解释Lock()和Unlock()源码之前我们必须先整体了解下Mutex的设计,不然下面的源码很难看懂。

我们首先看下sync.Mutex这个结构体

type Mutex struct {
	state int32 // 锁的当前状态,共三种
	sema  uint32 // 信号量,用于阻塞和唤醒goroutine
}

锁的三个状态,它们使用Mutex.state的低三位来标识

mutexLocked = 1 << iota // 锁定状态,二进制表示即 ...001
mutexWoken // 唤醒状态,二进制表示即 ...010
mutexStarving // 饥饿状态,二进制表示即...100

mutexLocked位于state的第一位,mutexWoken位于state的第二位,mutexStarving位于state的第三位,如下图: image.png

Mutex锁有两种模式:正常模式和饥饿模式。正常模式时,waiter按照先到先得的方式获取锁,一个waiter被唤醒后并不能直接获取到锁,它需要与新到的goroutine抢占锁,但是新到的goroutine已经在CPU上运行了,所以它大概率抢不过新到的goroutine,如果抢不到锁waiter就需要在等待队列队头继续等待,而这可能会导致一个waiter等待很长时间。为了避免waiter等待过久,当waiter超过1ms没有抢到锁时就会将当前锁切换到饥饿模式。

切换到饥饿模式后,锁将从解锁的goroutine切换到等待队列的队头waiter,新来的goroutine不会去尝试获取锁,也不会自旋,它们会排到等待队列的队尾。

如果某waiter获取到了锁,那么在满足以下两个条件之一时,它会将当前锁从饥饿模式切换到正常模式。

  1. 它是最后一个waiter
  2. 它等待锁的时间不到1ms

了解了Mutex的设计后我们再继续看Lock()与Unlock()的实现。

加锁Lock()的实现

func (m *Mutex) Lock() {
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// 这里本有竞争检测的代码,无意义,已被我删除
		return
	}
	m.lockSlow()
}

函数中首先通过CAS操作尝试获得锁,如果m.state为0即当前锁闲置就将它设置为1,如果尝试失败则进入m.lockSlow()

m.lockSlow()的实现

m.lockSlow()中用到了这几个函数:runtime_canSpin()runtime_doSpin()runtime_SemacquireMutex(),我们先挨个解释下这几个函数的作用再看m.lockSlow()的源码。

runtime_canSpin()

该函数的作用是判断能够进入自旋,下面看下源码

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool { // i是当前自旋次数
	if i >= 4|| ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

通过这个函数我们可以看到,runtime层判断能够自旋必须满足以下几个条件

  • 当前自旋次数不能>=4
  • 必须是多核CPU
  • 至少有一个其他正在运行的P
  • 当前P本地G队列为空

这里解释下gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1这个条件: gomaxprocs是进程中P数量上限,sched.npidle是空闲的P的数量、sched.nmspinning是自旋中的M的数量gomaxprocs - sched.npidle - sched.nmspinning=当前运行中的P的数量,当前运行中的P数量-1(当前P) = 其他P的数量,所以这个条件就是至少有一个其他正在运行的P。

runtime_doSpin()

其源码为:

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(30)
}

这里我们仅看下AMD64平台上proyield的实现:

TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX // 将第一个参数即30加载到AX寄存器
again:
	PAUSE // CPU空转,达到占用CPU的效果
	SUBL	$1, AX // AX寄存器-1
	JNZ	again // 如果不为0则继续执行PAUSE指令,否则退出
	RET

到这里可以看出runtime_doSpin()实际就是CPU空转30次。

runtime_SemacquireMutex()

其实现位于runtime包的sema.go文件中

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

semacquire1的实现并非本文重点,这里大概解释下这个函数的作用:

  1. 如果lifo为true,则加到等待队列队头
  2. 如果lifo为false,则加到等待队列队尾
m.lockSlow()

了解了上面几个函数后我们来看下m.lockSlow()中是怎么处理的吧

func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false // 饥饿模式标志
	awoke := false // 唤醒标志
	iter := 0 // 已进行的自旋次数
	old := m.state // 保存当前锁状态
	for {
		// 进入自旋需要满足三个条件
		// 1. 当前锁状态是锁定状态,如果不是锁定状态就退出自旋尝试获取锁
		// 2. 当前不是饥饿状态,原因是饥饿状态时自旋无意义,因为锁会交给等待队列中的第一个waiter
		// 3. runtime_canSpin判断能够自旋
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				// 如果没有唤醒 且 当前锁状态不在唤醒状态
				// 且 当前有等待者则尝试通过CAS将锁状态标记为唤醒
				// 标记为唤醒后,Unlock()中就不会通过信号量唤醒其他锁定的goroutine了
				// 如果CAS成功则标识自己为唤醒
				awoke = true
			}
			// CPU空转30次
			runtime_doSpin()
			// 自旋次数+1
			iter++
			// 更新当前锁状态
			old = m.state
			// 继续尝试自旋
			continue
		}

		// 如果判断不能进入自旋则进入以下逻辑
		// 进到这里有三种情况:
		// 1. 当前已解锁,锁处于正常状态
		// 2. 当前已解锁,锁处于饥饿状态
		// 3. 当前未解锁,锁处于正常状态
		// 4. 当前未解锁,锁处于饥饿状态

		// old是锁的当前状态,new是期望状态,在下面会尝试将锁通过CAS更新为期望状态
		new := old
		if old&mutexStarving == 0 {
			// 如果当前锁是正常状态则尝试获取锁
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			// 等待数+1
			// 如果锁当前处于饥饿状态,当前goroutine不能获取锁,需要进到等待队列队尾排队等待,所以等待数需要+1
			// 如果当前锁处于锁定状态,也需要进到等待队列等待
			new += 1 << mutexWaiterShift
		}
		if starving && old&mutexLocked != 0 {
			// 如果当前处于饥饿模式并且锁定状态
			// 则尝试设置为饥饿状态
			new |= mutexStarving
		}
		if awoke {
			if new&mutexWoken == 0 {
				// 如果当前goroutine抢到了唤醒,但是唤醒标志还为0说明出现了异常情况
				throw("sync: inconsistent mutex state")
			}
			// 如果在自旋时当前goroutine抢到唤醒了,则尝试将锁标记为未唤醒
			new &^= mutexWoken
		}
		// 尝试将锁状态由旧状态修改为期望状态
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// 修改成功
			// 如果旧状态既不是锁定状态也不是饥饿状态
			// 说明了抢到了锁,则退出循环
			if old&(mutexLocked|mutexStarving) == 0 {
				break
			}
			
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				// 记录等待开始时间
				waitStartTime = runtime_nanotime()
			}
			// 通过信号量阻塞当前goroutine
			// 如果waitStartTime为0,则说明当前goroutine是一个新来的goroutine,那么queueLifo=false,意味加到队尾。
			// 如果waitStartTime不为0,意味当前goroutine是一个被唤醒的goroutine,那么queueLifo=true,意味着加到队头
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// 如果等待时间超过了1ms则切换到饥饿模式
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			// 更新当前锁状态
			old = m.state
			// 如果当前锁处于饥饿状态
			if old&mutexStarving != 0 {
				// 如果当前锁处于锁定状态或者唤醒状态或者没有waiter,异常
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// 因为当前goroutine已经获取了锁,delta用于将等待队列-1
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				// 如果当前不是锁定模式或者只有一个waiter
				// 就通过delta -= mutexStarving和atomic.AddInt32操作将锁的饥饿状态位设置为0,表示为正常模式
				if !starving || old>>mutexWaiterShift == 1 {
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
}

同样的,我已将无关代码和注释删除。

解锁Unlock()的实现

func (m *Mutex) Unlock() {
        // 将锁定状态置为0
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
	    // 如果锁上存在等待者或者处于饥饿模式则进入unlockSlow()
		m.unlockSlow(new)
	}
}

Unlock()本身非常简单,下面重点关注下unlockSlow()的实现

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		// 如果解锁一个未锁定的锁则抛出异常
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		// 处于正常模式
		old := new
		for {
			// 如果没有等待者则无需唤醒任何goroutine,另外以下三种情况也无需唤醒
			// 1. 锁处于锁定状态,说明Unlock()解锁后紧接着就被其他goroutine获取,就不用再唤醒了
			// 2. 锁处于唤醒状态,说明有等待的goroutine已经被唤醒了,不用再尝试唤醒了
			// 3. 锁处于饥饿模式,锁会交给等待队列队头的等待者,不能往下进行
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				
				return
			}
			// 流程走到这里说明当前有等待者并且锁处于空闲状态(三个标志位都为0)
			// 说明等待者还没有被唤醒,需要唤醒等待者
			// 通过CAS将等待者数量-1,并且设置为唤醒
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				// 通过信号量唤醒等待者goroutine,然后退出
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			// CAS修改失败,说明锁的状态已经被修改,有以下几种可能性:
			// 1. 有新的等待者进来
			// 2. 锁被其他goroutine获取(Unlokc()中已经解锁了,走到这里可能已经被其他goroutine)
			// 3. 锁进入了饥饿模式
	
			// 更新锁状态,进入到下一个循环
			old = m.state
		}
	} else {
		// 处于饥饿模式则直接通过信号量唤醒等待队列头的goroutine
		// 此时state的mutexLocked还没有加锁,唤醒的goroutine会持有锁
		// 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态,不会抢占锁
		runtime_Semrelease(&m.sema, true, 1)
	}
}

后言

Mutex虽然代码简单,但由于并行的原因导致case太多,所以还是不太好理解了,建议大家代入到具体的场景中去分析。

阅读全文 »

深入理解原子操作的本质

引言

本文以go1.14 darwin/amd64中的原子操作为例,探究原子操作的汇编实现,引出LOCK指令前缀可见性MESI协议Store BufferInvalid Queue内存屏障,通过对CPU体系结构的探究,从而理解以上概念,并在最终给出一些事实。

Go中的原子操作

我们以atomic.CompareAndSwapInt32为例,它的函数原型是:

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

对应的汇编代码为:

// sync/atomic/asm.s 24行
TEXT ·CompareAndSwapInt32(SB),NOSPLIT,$0
	JMP	runtime∕internal∕atomic·Cas(SB)

通过跳转指令JMP跳转到了runtime∕internal∕atomic·Cas(SB),由于架构的不同对应的汇编代码也不同,我们看下amd64平台对应的代码:

// runtime/internal/atomic/asm_amd64.s 17行
TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0-17
	MOVQ	ptr+0(FP), BX // 将函数第一个实参即addr加载到BX寄存器
	MOVL	old+8(FP), AX // 将函数第二个实参即old加载到AX寄存器
	MOVL	new+12(FP), CX // // 将函数第一个实参即new加载到CX寄存器
	LOCK // 本文关键指令,下面会详述
	CMPXCHGL	CX, 0(BX) // 把AX寄存器中的内容(即old)与BX寄存器中地址数据(即addr)指向的数据做比较如果相等则把第一个操作数即CX中的数据(即new)赋值给第二个操作数
	SETEQ	ret+16(FP) // SETEQ与CMPXCHGL配合使用,在这里如果CMPXCHGL比较结果相等则设置本函数返回值为1,否则为0(16(FP)是返回值即swapped的地址)
	RET // 函数返回

从上面代码中可以看到本文的关键:LOCK。它实际是一个指令前缀,它后面必须跟read-modify-write指令,比如:ADD, ADC, AND, BTC, BTR, BTS, CMPXCHG, CMPXCH8B, CMPXCHG16B, DEC, INC, NEG, NOT, OR, SBB, SUB, XOR, XADD, XCHG

LOCK实现原理

在早期CPU上LOCK指令会锁总线,即其他核心不能再通过总线与内存通讯,从而实现该核心对内存的独占。

这种做法虽然解决了问题但是性能太差,所以在Intel P6 CPU(P6是一个架构,并非具体CPU)引入一个优化:如果数据已经缓存在CPU cache中,则锁缓存,否则还是锁总线。

Cache Coherency

CPU Cache与False Sharing 一文中详细介绍了CPU缓存的结构,CPU缓存带来了一致性问题,举个简单的例子:

// 假设CPU0执行了该函数
var a int = 0
go func fnInCpu0() {
    time.Sleep(1 * time.Second)
    a = 1 // 2. 在CPU1加载完a之后CPU0仅修改了自己核心上的cache但是没有同步给CPU1
}()
// CPU1执行了该函数
go func fnInCpu1() {
    fmt.Println(a) // 1. CPU1将a加载到自己的cache,此时a=0
    time.Sleep(3 * time.Second)
    fmt.Println(a) // 3. CPU1从cache中读到a=0,但此时a已经被CPU0修改为0了
}()

上例中由于CPU没有保证缓存的一致性,导致了两个核心之间的同一数据不可见从而程序出现了问题,所以CPU必须保证缓存的一致性,下面将介绍CPU是如何通过MESI协议做到缓存一致的。

MESI是以下四种cacheline状态的简称:

  • M(Modified):此状态为该cacheline被该核心修改,并且保证不会在其他核心的cacheline上
  • E(Exclusive):标识该cacheline被该核心独占,其他核心上没有该行的副本。该核心可直接修改该行而不用通知其他核心。
  • S(Share):该cacheline存在于多个核心上,但是没有修改,当前核心不能直接修改,修改该行必须与其他核心协商。
  • I(Invaild):该cacheline无效,cacheline的初始状态,说明要么不在缓存中,要么内容已过时。

核心之间协商通信需要以下消息机制:

  • Read: CPU发起数据读取请求,请求中包含数据的地址
  • Read Response: Read消息的响应,该消息有可能是内存响应的,有可能是其他核心响应的(即该地址存在于其他核心上cacheline中,且状态为Modified,这时必须返回最新数据)
  • Invalidate: 核心通知其他核心将它们自己核心上对应的cacheline置为Invalid
  • Invalidate ACK: 其他核心对Invalidate通知的响应,将对应cacheline置为Invalid之后发出该确认消息
  • Read Invalidate: 相当于Read消息+Invalidate消息,即当前核心要读取数据并修改该数据。
  • Write Back: 写回,即将Modified的数据写回到低一级存储器中,写回会尽可能地推迟内存更新,只有当替换算法要驱逐更新过的块时才写回到低一级存储器中。

手画状态转移图

image.png

这里有个存疑的地方:CPU从内存中读到数据I状态是转移到S还是E,查资料时两种说法都有。个人认为应该是E,因为这样另外一个核心要加载副本时只需要去当前核心上取就行了不需要读内存,性能会更高些,如果你有不同看法欢迎在评论区交流。

一些规律

  1. CPU在修改cacheline时要求其他持有该cacheline副本的核心失效,并通过Invalidate ACK来接收反馈
  2. cacheline为M意味着内存上的数据不是最新的,最新的数据在该cacheline上
  3. 数据在cacheline时,如果状态为E,则直接修改;如果状态为S则需要广播Invalidate消息,收到Invalidate ACK后修改状态为M;如果状态为I(包括cache miss)则需要发出Read Invalidate

Store Buffer

当CPU要修改一个S状态的数据时需要发出Invalidate消息并等待ACK才写数据,这个过程显然是一个同步过程,但这对于对计算速度要求极高的CPU来说显然是不可接受的,必须对此优化。 因此我们考虑在CPU与cache之间加一个buffer,CPU可以先将数据写入到这个buffer中并发出消息,然后它就可以去做其他事了,待消息响应后再从buffer写入到cache中。但这有个明显的逻辑漏洞,考虑下这段代码:

a = 1
b = a + 1

假设a初始值为0,然后CPU执行a=1,数据被写入Store Buffer还没有落地就紧接着执行了b=a+1,这时由于a还没有修改落地,因此CPU读到的还是0,最终计算出来b=1。

为了解决这个明显的逻辑漏洞,又提出了Store Forwarding:CPU可以把Buffer读出来传递(forwarding)给下面的读取操作,而不用去cache中读。 image.png

这倒是解决了上面的漏洞,但是还存在另外一个问题,我们看下面这段代码:

a = 0
flag = false
func runInCpu0() {
    a = 1
    flag = true
}

func runInCpu1() {
    while (!flag) {
   	continue
    }
    print(a)
}

对于上面的代码我们假设有如下执行步骤:

  1. 假定当前a存在于cpu1的cache中,flag存在于cpu0的cache中,状态均为E。
  2. cpu1先执行while(!flag),由于flag不存在于它的cache中,所以它发出Read flag消息
  3. cpu0执行a=1,它的cache中没有a,因此它将a=1写入Store Buffer,并发出Invalidate a消息
  4. cpu0执行flag=true,由于flag存在于它的cache中并且状态为E,所以将flag=true直接写入到cache,状态修改为M
  5. cpu0接收到Read flag消息,将cache中的flag=true发回给cpu1,状态修改为S
  6. cpu1收到cpu0的Read Response:flat=true,结束while(!flag)循环
  7. cpu1打印a,由于此时a存在于它的cache中a=0,所以打印出来了0
  8. cpu1此时收到Invalidate a消息,将cacheline状态修改为I,但为时已晚
  9. cpu0收到Invalidate ACK,将Store Buffer中的数据a=1刷到cache中

从代码角度看,我们的代码好像变成了

func runInCpu0() {
    flag = true
    a = 1
}

好像是被重新排序了,这其实是一种 伪重排序,必须提出新的办法来解决上面的问题

写屏障

CPU从软件层面提供了 写屏障(write memory barrier) 指令来解决上面的问题,linux将CPU写屏障封装为smp_wmb()函数。写屏障解决上面问题的方法是先将当前Store Buffer中的数据刷到cache后再执行屏障后面的写入操作。

SMP: Symmetrical Multi-Processing,即多处理器。

这里你可能好奇上面的问题是硬件问题,CPU为什么不从硬件上自己解决问题而要求软件开发者通过指令来避免呢?其实很好回答:CPU不能为了这一个方面的问题而抛弃Store Buffer带来的巨大性能提升,就像CPU不能因为分支预测错误会损耗性能增加功耗而放弃分支预测一样。

还是以上面的代码为例,前提保持不变,这时我们加入写屏障:

a = 0
flag = false
func runInCpu0() {
    a = 1
    smp_wmb()
    flag = true
}

func runInCpu1() {
    while (!flag) {
   	continue
    }
    print(a)
}

当cpu0执行flag=true时,由于Store Buffer中有a=1还没有刷到cache上,所以会先将a=1刷到cache之后再执行flag=true,当cpu1读到flag=true时,a也就=1了。

有文章指出CPU还有一种实现写屏障的方法:CPU将当前store buffer中的条目打标,然后将屏障后的“写入操作”也写到Store Buffer中,cpu继续干其他的事,当被打标的条目全部刷到cache中,之后再刷后面的条目。

Invalid Queue

上文通过写屏障解决了伪重排序的问题后,还要思考另一个问题,那就是Store Buffer size是有限的,当Store Buffer满了之后CPU还是要卡住等待Invalidate ACK。Invalidate ACK耗时的主要原因是CPU需要先将自己cacheline状态修改I后才响应ACK,如果一个CPU很繁忙或者处于S状态的副本特别多,可能所有CPU都在等它的ACK。

CPU优化这个问题的方式是搞一个Invalid Queue,CPU先将Invalidate消息放到这个队列中,接着就响应Invalidate ACK。然而这又带来了新的问题,还是以上面的代码为例

a = 0
flag = false
func runInCpu0() {
    a = 1
    smp_wmb()
    flag = true
}

func runInCpu1() {
    while (!flag) {
   	continue
    }
    print(a)
}

我们假设a在CPU0和CPU1中,且状态均为S,flag由CPU0独占

  1. CPU0执行a=1,因为a状态为S,所以它将a=1写入Store Buffer,并发出Invalidate a消息
  2. CPU1执行while(!flag),由于其cache中没有flag,所以它发出Read flag消息
  3. CPU1收到CPU0的Invalidate a消息,并将此消息写入了Invalid Queue,接着就响应了Invlidate ACK
  4. CPU0收到CPU1的Invalidate ACK后将a=1刷到cache中,并将其状态修改为了M
  5. CPU0执行到smp_wmb(),由于Store Buffer此时为空所以就往下执行了
  6. CPU0执行flag=true,因为flag状态为E,所以它直接将flag=true写入到cache,状态被修改为了M
  7. CPU0收到了Read flag消息,因为它cache中有flag,因此它响应了Read Response,并将状态修改为S
  8. CPU1收到Read flag Response,此时flag=true,所以结束了while循环
  9. CPU1打印a,由于a存在于它的cache中且状态为S,所以直接将cache中的a打印出来了,此时a=0,这显然发生了错误。
  10. CPU1这时才处理Invalid Queue中的消息将a状态修改为I,但为时已晚

为了解决上面的问题,CPU提出了读屏障指令,linux将其封装为了smp_rwm()函数。放到我们的代码中就是这样:

...
func runInCpu1() {
    while (!flag) {
   	continue
    }
    smp_rwm()
    print(a)
}

当CPU执行到smp_rwm()时,会将Invalid Queue中的数据处理完成后再执行屏障后面的读取操作,这就解决了上面的问题了。

除了上面提到的读屏障和写屏障外,还有一种全屏障,它其实是读屏障和写屏障的综合体,兼具两种屏障的作用,在linux中它是smp_mb()函数。 文章开始提到的LOCK指令其实兼具了内存屏障的作用。

几个问题

问题1: CPU采用MESI协议实现缓存同步,为什么还要LOCK

答: 1. MESI协议只维护缓存一致性,与可见性有关,与原子性无关。一个非原子性的指令需要加上lock前缀才能保证原子性。

问题2: 一条汇编指令是原子性的吗

  1. read-modify-write 内存的指令不是原子性的,以INC mem_addr为例,我们假设数据已经缓存在了cache上,指令的执行需要先将数据从cache读到执行单元中,再执行+1,然后写回到cache。
  2. 对于没有对齐的内存,读取内存可能需要多次读取,这不是原子性的。(在某些CPU上读取未对齐的内存是不被允许的)
  3. 其他未知原因…

问题3: Go中的原子读

我们看一个读取8字节数据的例子,直接看golang atomic.LoadUint64()汇编:

// uint64 atomicload64(uint64 volatile* addr);
1. TEXT runtime∕internal∕atomic·Load64(SB), NOSPLIT, $0-12
2.	MOVL	ptr+0(FP), AX // 将第一个参数加载到AX寄存器
3.	TESTL	$7, AX // 判断内存是否对齐
4.	JZ	2(PC) // 跳到这条指令的下两条处,即跳转到第6行
5.	MOVL	0, AX // crash with nil ptr deref 引用0x0地址会触发错误
6.	MOVQ	(AX), M0 // 将内存地址指向的数据加载到M0寄存器
7.	MOVQ	M0, ret+4(FP) // 将M0寄存器中数据(即内存指向的位置)给返回值
8.	EMMS // 清除M0寄存器
9.	RET

第3行TESTL指令对两个操作数按位与,如果结果为0,则将ZF设置为1,否则为0。所以这一行其实是判断传进来的内存地址是不是8的整数倍。

第4行JZ指令判断如果ZF即零标志位为1则执行跳转到第二个操作数指定的位置,结合第三行就是如果传入的内存地址是8的整数倍,即内存已对齐,则跳转到第6行,否则继续往下执行。

关于内存对齐可以看下我这篇文章:理解内存对齐

虽然MOV指令是原子性的,但是汇编中貌似没有加入内存屏障,那Golang是怎么实现可见性的呢?我这里也并没有完全的理解,不过大概意思是Golang的atomic会保证顺序一致性,详情可看下这篇文章:Memory Order Guarantees in Go

问题4:Go中的原子写

仍然以写一个8字节数据的操作为例,直接看golang atomic.LoadUint64()汇编:

TEXT runtime∕internal∕atomic·Store64(SB), NOSPLIT, $0-16
	MOVQ	ptr+0(FP), BX
	MOVQ	val+8(FP), AX
	XCHGQ	AX, 0(BX)
	RET

虽然没有LOCK指令,但XCHGQ指令具有LOCK的效果,所以还是原子性而且可见的。

总结

这篇文章花费了我大量的时间与精力,主要原因是刚开始觉得原子性只是个小问题,但是随着不断的深入挖掘,翻阅无数资料,才发现底下潜藏了无数的坑。 s70KdH.png

由于精力原因本文还有一些很重要的点没有讲到,比如acquire/release 语义等等。

另外客观讲本文问题很多,较真的话可能会对您造成一定的困扰,建议您可以将本文作为您研究计算机底层架构的一个契机,自行研究这方面的技术。

参考资料

阅读全文 »

golang unsafe.Pointer与uintptr

先说结论

  • uintptr 是一个地址数值,它不是指针,与地址上的对象没有引用关系,垃圾回收器不会因为有一个uintptr类型的值指向某对象而不回收该对象。
  • unsafe.Pointer是一个指针,类似于C的void *,它与地址上的对象存在引用关系,垃圾回收器因为有一个unsafe.Pointer类型的值指向某对象而不回收该对象。
  • 任何指针都可以转为unsafe.Pointer
  • unsafe.Pointer可以转为任何指针
  • uintptr可以转换为unsafe.Pointer
  • unsafe.Pointer可以转换为uintptr
  • 指针不能直接转换为uintptr

为什么需要uintptr这个类型呢?

理论上说指针不过是一个数值,即一个uint,但实际上在go中unsafe.Pointer是不能通过强制类型转换为一个uint的,只能将unsafe.Pointer强制类型转换为一个uintptr。

var v1 float64 = 1.1
var v2 *float64 = &v1
_ = int(v2) // 这里编译报错:cannot convert unsafe.Pointer(v2) (type unsafe.Pointer) to type uint

但是可以将一个unsafe.Pointer强制类型转换为一个uintptr:

var v1 float64 = 1.1
var v2 *float64 = &v1
var v3 uintptr = uintptr(unsafe.Pointer(v2))
v4 := uint(v3)
fmt.Println(v3, v4) // v3和v4打印出来的值是相同的

可以理解为uintptr是专门用来指针操作的uint。 另外需要指出的是指针不能直接转为uintptr,即

var a float64
uintptr(&a) 这里会报错,不允许将*float64转为uintptr

一个🌰

通过上面的描述如果你还是一头雾水的话,不妨看下下面这个实际案例:

package foo

type Person struct {
	Name string
	age  int
}

上面的代码中我们在foo包中定义了一个结构体Person,只导出了Name字段,而没有导出age字段,就是说在另外的包中我们只能直接操作Person.Name而不能直接操作Person.age,但是利用unsafe包可以绕过这个限制使我们能够操作Person.age

package main

func main() {
	p := &foo.Person{
		Name: "张三",
	}

	fmt.Println(p)
	// *Person是不能直接转换为*string的,所以这里先将*Person转为unsafe.Pointer,再将unsafe.Pointer转为*string
	pName := (*string)(unsafe.Pointer(p)) 
	*pName = "李四"

	// 正常手段是不能操作Person.age的这里先通过uintptr(unsafe.Pointer(pName))得到Person.Name的地址
	// 通过unsafe.Sizeof(p.Name)得到Person.Name占用的字节数
	// Person.Name的地址 + Person.Name占用的字节数就得到了Person.age的地址,然后将地址转为int指针。
	pAge := (*int)(unsafe.Pointer((uintptr(unsafe.Pointer(pName)) + unsafe.Sizeof(p.Name))))
	// 将p的age字段修改为12
	*pAge = 12

	fmt.Println(p)
}

打印结果为:

$ go run main.go
&{张三 0}
&{李四 12}

需要注意的是下面这段代码比较长:

pAge := (*int)(unsafe.Pointer((uintptr(unsafe.Pointer(pName)) + unsafe.Sizeof(p.Name))))

但是尽量不要分成两段代码,像这样:

temp := uintptr(unsafe.Pointer(pName)) + unsafe.Sizeof(p.Name))
pAge := (*int)(unsafe.Pointer(temp)

原因是在第二行语句时,已经没有指针指向p了,这时p可能会回收掉了,这时得到的地址temp就是个野指针了,不知道指向谁了,是比较危险的。

另外一个原因是在当前Go(golang版本:1.14)的内存管理机制中不会迁移内存,但是不保证以后的版本内存管理机制中有迁移内存的操作,一旦发生了内存迁移指针地址发生变更,上面的分段代码就有可能出现严重问题。

关于Go的内存管理可以参看这篇文章:https://draveness.me/golang/docs/part3-runtime/ch07-memory/golang-memory-allocator/,读完这篇文章相信你就能理解上面的内存迁移问题。

除了上面两点外还有一个原因是在Go 1.3上,当栈需要增长时栈可能会发生移动,对于下面的代码:

var obj int
fmt.Println(uintptr(unsafe.Pointer(&obj)))
bigFunc() // bigFunc()增大了栈
fmt.Println(uintptr(unsafe.Pointer(&obj)))

完全有可能打印出来两个地址。

通过上面的例子应该明白了为什么这个包名为unsafe,因为使用起来确实有风险,所以尽量不要使用这个包。

我之所以研究unsafe.Pointer完全是因为我要在多线程的环境中采用原子操作避免竞争问题,所以我用到了atomic.LoadPointer(addr *unsafe.Pointer)。不过我后面发现了atomic包提供了一个atomic.Value结构体,这个结构体提供的方法使我避免显式使用了unsafe.Pointer。所以你也正在使用atomic.LoadPointer()不妨看看atomic.Value是不是可以解决你的问题,这是我一点提醒。

参考资料

阅读全文 »

数据库隔离级别以及Mysql实操

1. 事务的ACID

ACID表示原子性(atomicity)、一致性(consistency)、隔离性(isolation)和持久性(durability),一个健壮的事务处理系统必须满足这四个特性。

  • 原子性 一个事务必须是一个不可分割的最小执行单元,事务中的所有操作要么都成功,要么失败回滚所有操作。
  • 一致性 数据库总是从一个一致性的状态转移到另一个一致性的状态,事务只要没有提交那么其中的所做的所有修改都不会落地到数据库。
  • 隔离性 一般来说一个事务未提交之前,它所做的操作对其他事务是不可见的。不同的隔离级别不可见的部分是不同的。
  • 持久性 事务一旦提交,其所做所有修改都会落地到数据库

2. 隔离级别

SQL标准中定义了四种隔离级别,隔离级别定义了在一个事务中所做的修改,哪些在事务内和事务间是可见的。高级的隔离级别实现起来更复杂,带来的开销也更高,支持的并发也更低。

每种存储引擎实现的隔离级别可能是不同的,可能会在较低的隔离级别上解决该级别的某些问题,从而具有了较高隔离级别的某些能力。例如InnoDB引擎在可重复读的级别上解决了幻读的问题。

  • READ UNCOMMITTED 未提交读 在未提交读级别,可以读到未提交事务中的修改,也被称为脏读。从性能上说该级别不会比其他级别高太多,所以一般不用。
  • READ COMMITTED 提交读 事务未提交的修改其他事务是读不到的,不存在脏读的问题,但是存在不可重复读的问题,即同样的一条查询两次读取读到的数据可能是不同的。
  • REPEATABLE READ 可重复读 可重复读不存在不可重复读的问题,即同样一条查询两次读取读的数据肯定是相同的,但是理论上存在幻读的问题,幻读是指同样一条查询第二次读取可能会读到另外一个事务刚刚新增的记录。不过InnoDB引擎在此级别通过MVCC(多版本并发控制,Multiversion Concurrency Control)解决了幻读的问题。Mysql默认的隔离级别即为该级别。
  • SERIALIZABLE可串行化 可串行化是最高的隔离级别,它通过强制事务串行化执行避免了幻读的问题,性能很差实际很少用。

3. Mysql实操

Mysql版本:Server version: 8.0.18 MySQL Community Server - GPL

3.1 查看mysql当前隔离级别

mysql> select @@transaction_isolation;
+-------------------------+
| @@transaction_isolation |
+-------------------------+
| REPEATABLE-READ         |
+-------------------------+

可以看到当前隔离级别为可重复读

3.2 修改mysql隔离级别

SET [SESSION | GLOBAL] TRANSACTION ISOLATION LEVEL {READ UNCOMMITTED | READ COMMITTED | REPEATABLE READ | SERIALIZABLE}

如果指定了SESSION则只在该对话中生效,指定了GLOBAL则全局修改隔离级别。下面我们将隔离级别修改为未提交读

mysql> set session transaction isolation level READ UNCOMMITTED;
Query OK, 0 rows affected (0.00 sec)

mysql> select @@transaction_isolation;
+-------------------------+
| @@transaction_isolation |
+-------------------------+
| READ-UNCOMMITTED        |
+-------------------------+

可以看到隔离级别成功被设置为未提交读,下面我们在未提交读的隔离级别下观察下脏读的问题。

3.3 观察脏读问题

我们保持未提交读的隔离级别,然后创建一张实验表,写入两条数据

mysql> CREATE TABLE `t` (
    ->     `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
    ->     `age` INT(11) NOT NULL,
    ->     `name` varchar(255) NOT NULL,
    ->     PRIMARY KEY (`id`)
    -> ) ENGINE = InnoDB;
Query OK, 0 rows affected, 2 warnings (0.21 sec)

insert into `t`(age,name) values(10,'n1');
insert into `t`(age,name) values(11,'n2');

mysql> select * from t;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | n1   |
|  2 |  11 | n2   |
+----+-----+------+
2 rows in set (0.00 sec)

这时我们开启事务A,然后修改id为1的记录的name为’o1’,但是不要提交事务:

mysql> begin;
Query OK, 0 rows affected (0.00 sec)

mysql> update t set name='o1' where id=1;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

此时我们新开一个窗口,查询下id=1的数据:

mysql> select @@transaction_isolation;
+-------------------------+
| @@transaction_isolation |
+-------------------------+
| REPEATABLE-READ         |
+-------------------------+
1 row in set (0.00 sec)

mysql> select * from t where id=1;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | n1   |
+----+-----+------+
1 row in set (0.00 sec)

在默认可重复读的隔离级别下读不到事务A的修改。

我们修改隔离级别为未提交读,再查下:

mysql> set session transaction isolation level READ UNCOMMITTED;
Query OK, 0 rows affected (0.00 sec)

mysql> select * from t where id=1;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | o1   |
+----+-----+------+
1 row in set (0.00 sec)

可以看到事务A没有提交,但是我们仍然读到了修改,这就是脏读。

3.4 观察不可重复读问题

我们将事务隔离级别修改为提交读:

mysql> select @@transaction_isolation;
+-------------------------+
| @@transaction_isolation |
+-------------------------+
| READ-COMMITTED          |
+-------------------------+
1 row in set (0.00 sec)

然后开启事务A,执行一条查询sql:

mysql> begin;
Query OK, 0 rows affected (0.00 sec)

mysql> select * from t where id =1;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | n1   |
+----+-----+------+
1 row in set (0.00 sec)

然后我们新开一个窗口,修改id=1的记录:

mysql> update t set name='o1' where id=1;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

然后我们回到事务A,然后重新执行上一条查询:

mysql> select * from t where id =1;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | o1   |
+----+-----+------+
1 row in set (0.00 sec)

可以看到在一个事务中两次相同查询查到的结果是不同的,这就是不可重复读问题。

3.5 验证不可重复读隔离级别下是否解决了脏读问题

当前表数据为:

mysql> select * from t;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | o1   |
|  2 |  11 | n2   |
+----+-----+------+
2 rows in set (0.00 sec)

然后开启一个事务将id=1的记录的name改为’n1’,但是不要提交:

mysql> begin;
Query OK, 0 rows affected (0.00 sec)

mysql> update t set name='n1' where id=1;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

这时在另外一个窗口中查下:

mysql> select * from t;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | o1   |
|  2 |  11 | n2   |
+----+-----+------+

可以看到此时没有查询到未提交的事务中的修改,就是说提交读隔离级别解决了脏读问题。

3.6 验证可重复读隔离级别是否解决了不可重复读问题

首先将隔离级别修改为可重复读

mysql> select @@transaction_isolation;
+-------------------------+
| @@transaction_isolation |
+-------------------------+
| REPEATABLE-READ         |
+-------------------------+

然后我们开启一个事务A,查询下id=1的记录:

mysql> begin;
Query OK, 0 rows affected (0.00 sec)

mysql> select * from t where id=1;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | o1   |
+----+-----+------+

然后再另一个窗口中修改name为’n1’:

mysql> update t set name='n1' where id =1;
Query OK, 1 row affected (0.01 sec)

这时回到事务A中重新查询下id=1的记录:

mysql> select * from t where id=1;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | o1   |
+----+-----+------+

可以看到在一个事务中两次读到的是相同的,不可重复读问题已解决。

3.7 观察幻读问题

隔离级别保持为可重复读,将表t的存储引擎修改为MyISAM(因为InnoDB引擎解决了幻读的问题,所以我们这里需要将存储引擎修改为MyISAM才能观察到)

mysql> alter table t ENGINE=MyISAM;
Query OK, 2 rows affected (0.19 sec)
Records: 2  Duplicates: 0  Warnings: 0

mysql> show create table t;
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                                                     |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| t     | CREATE TABLE `t` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `age` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

开启一个事务A,查询下所有记录:

mysql> begin;
Query OK, 0 rows affected (0.00 sec)

mysql> select * from t;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | n1   |
|  2 |  11 | n2   |
+----+-----+------+

然后我们新开一个窗口插入一行新记录:

mysql> insert into t(age,name) values (12, 't3');

然后回到事务A中重新查询下:

mysql> select * from t;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | n1   |
|  2 |  11 | n2   |
|  3 |  12 | t3   |
+----+-----+------+

可以看到第二次查询查到了新增的记录,两次查询结果是不同的,这就是幻读问题。

3.8 验证下InnoDB引擎是否解决了幻读问题

我们将表的存储引擎修改为InnoDB:

mysql> alter table t ENGINE=InnoDB;
Query OK, 3 rows affected (11.52 sec)
Records: 3  Duplicates: 0  Warnings: 0

mysql> show create table t;
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                                                     |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| t     | CREATE TABLE `t` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `age` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

这时我们开启事务A,查询下所有表记录:

mysql> begin;
Query OK, 0 rows affected (0.00 sec)

mysql> select * from t;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | n1   |
|  2 |  11 | n2   |
|  3 |  12 | t3   |
+----+-----+------+

然后这时在另外一个窗口中新增一条记录:

mysql> insert into t(age,name) value (1, 't10');

执行完成后回到事务A,重新查一下:

mysql> select * from t;
+----+-----+------+
| id | age | name |
+----+-----+------+
|  1 |  10 | n1   |
|  2 |  11 | n2   |
|  3 |  12 | t3   |
+----+-----+------+

可以看到第二次查询跟第一次查询结果是相同的,就是说InnoDB解决了幻读问题。

阅读全文 »

Redis部署方案的演进

一、前言

多年前曾看到过一篇讲解Redis的文章,文章以单节点部署存在的不足开始,一步一步寻找解决方案来提高Redis服务的可用性,最终引出了Redis Cluster与Codis两种不同的集群方案,并给出了两种集群方案的优劣,文章质量非常高。
当时虽然理解了但后面就基本忘了差不多了,不如今天用自己的语言按照这篇文章的思路尝试自己描述一遍加深记忆与理解。

二、Redis部署方案的演进

1. 单点部署

image.png 系统中只有一个redis服务器,所有请求都打到这一台机器上。 随着业务发展,整个系统对redis读的请求量逐渐增加,一台机器逐渐扛不住,所以我们增加了两台从库来分担主库读压力,所以又有了主从架构

2. 简单主从

image.png 写的请求全部打到Master节点,读的请求分担到Slave节点,Slave是readonly的。 好了,我们现在抗住了较大的读请求,但是这个系统跟上面的单点系统都存在一个问题:Master节点挂掉后,整个系统不可写(因为Slave节点还存活所以系统还可以支撑部分读的请求),导致系统不可用。 虽然可以在发现故障后手动切换Slave节点为Master,但是人工操作还是需要消耗一段时间的,还是不可接受的。我们还需要优化架构提高系统可用性,因为我们引入哨兵机制,使得Master挂点后由Slave节点能够自动切换为Master继续提供服务。

3. 哨兵模式

Redis中提供了Sentinel的能力,Sentinel以一个单独进程的形式存在,它可以监控Master节点,一旦master节点挂点会立即选出一个Slave节点切换为新Master。因为Sentinel也存在单点故障的隐患,所以Sentinel通常也是一个集群形式。 image.png

Sentinel会监听Master节点与Slave节点,同时它们之间也会互相监听运行状态并交换节点检测的状态。

一个Sentinel检测到一个实例超过阈值时间没有回复PING,那这个实例会被该Sentinel标记为主观下线。如果Master被标记主观下线则所有Sentinel都要以每秒1次频率判断该Master是否下线,当超过一定数量的Sentinel都认为Master下线,则Master会被标记为客观下线,然后协商出来一个Slave作为Master节点。

到此为止我们已经实现了Redis的高可用,不过这时我们业务进入了一个高速发展的阶段,key的数量达到了一个非常高的量级,redis内存不断告急,运维不断的扩容,RDB文件这时变得特别大,主从同步也变得非常缓慢,另外这时写的请求量也上来了,单Master已经扛不住了,这时就需要分片存储了,将key均匀的分布到多个Master上,减小单台redis内存,分担单个Master压力。

4. Redis Cluster

Redis Cluster 是redis官方提供的分布式方案,它虚拟出16384个槽,通过crc16(key) % 16384计算出key映射到了哪个槽上,集群中的每个节点维护其中一部分槽,节点间会互相通信告诉其他节点自己维护了哪些槽。

客户端一开始会随机选择一个节点连接,然后发送自己要操作的key,该节点通过crc16(key) % 16384计算出key所在的槽,如果该槽由自己维护那就直接返回操作结果了,如果不是由它维护的槽它会返回一个MOVED操作,客户端根据MOVED操作提供的信息转向正确的节点。

image.png

5. Codis

Codis是豌豆荚开源的Redis分布式方案,Codis分为1024个槽,key到槽的算法为crc32(key) % 1024 槽位与节点的映射关系存储在CodisProxy上,因为CodisProxy也存在单点故障隐患,所以CodisProxy也要做集群。redis客户端连接到CodisProxy上而非真实的redis节点。
CodisProxy实际是利用Zookeeper来存储映射关系,不同CodisProxy用Zookeeper来同步映射。
Codis中还有一个codis-ha (ha:High Availability)的组件,用来监控CodisProxy的状态,同时替代了哨兵用来执行节点的主从切换,从而实现高可用。 image.png

三、参考资料

阅读全文 »

x64架构下Linux系统函数调用

一、 函数调用相关指令

关于栈可以看下我之前的这篇文章x86 CPU与IA-32架构

在开始函数调用约定之前我们需要先了解一下几个相关的指令

1.1 push

pushq 立即数 # q/l是后缀,表示操作对象的大小
pushl 寄存器

push指令将数据压栈。具体就是将esp(stack pointer)寄存器减去压栈数据的大小,再将数据存储到esp寄存器所指向的地址。

1.2 pop

popq 寄存器
popl 寄存器

pop指令将数据出栈并写入寄存器。具体就是将数据从esp寄存器所指向的地址加载到指令的目标寄存器中,再将esp寄存器加上出栈的数据的大小。

1.3 call

call 立即数
call 寄存器
call 内存

call指令会调用由操作数所代表的地址指向的函数,一般都是call一个符号。call指令会将当前指令寄存器中的内容(即这条call指令下一条指令的地址,也就是函数执行完的返回地址)入栈,然后跳到函数对应的地址开始执行。

1.4 ret

ret指令用于从子函数中返回,ret指令会先弹出当前栈顶的数据,这个数据就是先前调用这个函数的call指令压入的“下一条指令的地址”,然后跳转到这个地址执行。

1.5 leave

leave相当于执行了movq %rbp, %rsp; popq %rbp,即释放栈帧。

二、 函数调用约定

函数调用约定约定了caller如何传参即将实参放到何处,应该按照何种顺序保存,以及callee如何返回返回值即将返回值放到何处。

x86的32位机器之上C语言一般是通过栈来传递参数,且一般都是倒序push,即先push最后一个参数再push倒数第二个参数,并通过ax寄存器返回结果,这称为cdecl调用约定(C有三种调用约定,linux系统中使用cdecl),Go与之类似但是区别在于Go通过栈来返回结果,所以Go支持多个返回值。

x64架构中增加了8个通用寄存器,C语言采用了寄存器来传递参数,如果参数超过。在x64系统默认有System V AMD64Microsoft x64两种C语言函数调用约定,System V AMD64实际是System V AMD64 ABI文档的一部分,类UNIX系统多采用System V的调用约定。

System V AMD64 ABI文档地址https://software.intel.com/sites/default/files/article/402129/mpx-linux64-abi.pdf

本文主要讨论x64架构下Linux系统的函数调用约定即System V AMD64调用约定。

三、 x64架构下Linux系统函数调用

3.1 如何传递参数

System V AMD64调用约定规定了caller将第1-6个整型参数分别保存到rdirsirdxrcxr8r9寄存器中,第7个及之后的整型参数从右往左倒序的压入栈中。前8个浮点类型的参数放到xmm0-xmm7寄存器中,之后的浮点类型的参数从右往左倒序的压入栈中。

3.2 如何返回返回值

对于整型返回值要保存到rax寄存器中,浮点型返回值保存到xmm0寄存器中。

3.3 栈的对齐问题

System V AMD64要求栈必须按照16字节对齐,就是说在通过call指令调用目标函数之前栈顶指针即rsp指针必须是16的倍数。之所以要按照16字节对齐是因为x64架构引入了SSE和AVX指令,这些指令要求必须从16的整数倍地址取数,为了兼顾这些指令所以就要求了16字节对齐。

3.4 变长参数

这部分没看懂,待后续发掘。

四、 实际案例分析

4.1 案例1

看下下面这段C代码

unsigned long long foo(unsigned long long param1, unsigned long long param2) {
    unsigned long long sum = param1 + param2;
    return sum;
}

int main(void) {
    unsigned long long sum = foo(8589934593, 8589934597);
    return 0;
}

uname -a: Linux xxx 3.10.0-514.26.2.el7.x86_64 #1 SMP Tue Jul 4 15:04:05 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux gcc -v: gcc 版本 4.8.5 20150623 (Red Hat 4.8.5-39) (GCC)

转为汇编代码,gcc -S call.c

    .file   "call.c"
    .text
    .globl  foo
    .type   foo, @function
foo:
.LFB0:
    .cfi_startproc
    pushq   %rbp
    .cfi_def_cfa_offset 16
    .cfi_offset 6, -16
    movq    %rsp, %rbp
    .cfi_def_cfa_register 6
    movq    %rdi, -24(%rbp)
    movq    %rsi, -32(%rbp)
    movq    -32(%rbp), %rax
    movq    -24(%rbp), %rdx
    addq    %rdx, %rax
    movq    %rax, -8(%rbp)
    movq    -8(%rbp), %rax
    popq    %rbp
    .cfi_def_cfa 7, 8
    ret
    .cfi_endproc
.LFE0:
    .size   foo, .-foo
    .globl  main
    .type   main, @function
main:
.LFB1:
    .cfi_startproc
    pushq   %rbp
    .cfi_def_cfa_offset 16
    .cfi_offset 6, -16
    movq    %rsp, %rbp
    .cfi_def_cfa_register 6
    subq    $16, %rsp
    movabsq $8589934597, %rsi
    movabsq $8589934593, %rdi
    call    foo
    movq    %rax, -8(%rbp)
    movl    $0, %eax
    leave
    .cfi_def_cfa 7, 8
    ret
    .cfi_endproc
.LFE1:
    .size   main, .-main
    .ident  "GCC: (GNU) 4.8.5 20150623 (Red Hat 4.8.5-39)"
    .section    .note.GNU-stack,"",@progbits

我们先看main函数的汇编代码,main函数中首先执行了三条指令:

pushq   %rbp # 将当前栈基底地址压入栈中
movq    %rsp, %rbp # 将栈基底地址修改为栈顶地址
subq    $16, %rsp # 栈顶地址-16,栈扩容,这里没搞懂为什么要扩容,有懂的同学欢迎评论区指点下

这三条指令是用来分配栈帧的,执行完成后栈变成下方的样子: image.png 继续往下看:

movabsq $8589934597, %rsi # 先将第二个参数保存到rsi寄存器
movabsq $8589934593, %rdi # 再将第一个参数保存到rdi寄存器
call foo # 调用foo函数,这一步会将下一条指令的地址压到栈上

执行完call foo指令后,栈的情况如下: image.png

然后我们跳到foo函数中看下:

pushq   %rbp # 将当前栈基底地址压入栈中
movq    %rsp, %rbp # 将栈基底地址修改为栈顶地址

开头仍然是建立栈帧的指令,执行完成后,此时栈帧的样子如下: image.png

继续往下看:

movq    %rdi, -24(%rbp)
movq    %rsi, -32(%rbp)
movq    -32(%rbp), %rax # 将第二个参数保存到rax寄存器
movq    -24(%rbp), %rdx # 将第一个参数保存到rdx寄存器
addq    %rdx, %rax # 执行加法并将结果保存在rax寄存器
movq    %rax, -8(%rbp) 
movq    -8(%rbp), %rax # 将返回值保存到rax寄存器

这里没搞懂为什么需要先挪到内存中再保存到rax寄存器上,可能是编译器实现起来比较方便吧,有懂的同学欢迎评论区指点下

此时栈情况: image.png foo函数最后执行了以下两条指令:

popq    %rbp # 将栈顶值pop出来保存到rbp寄存器,即修改栈基底地址为当前栈顶值,同时栈顶指针-8
ret # 从子函数中返回到main函数中

最终结果如图: image.png

4.2 案例2

我们修改下函数foo,使它接收9个参数验证下上面的理论。

unsigned long long foo(unsigned long long param1, unsigned long long param2, unsigned long long param3, unsigned long long param4, unsigned long long param5, unsigned long long param6, unsigned long long param7, unsigned long long param8, unsigned long long param9) {
    unsigned long long sum = param1 + param2;
    return sum;
}

int main(void) {
    unsigned long long sum = foo(8589934593, 8589934597, 3, 4,5,6,7,8,9);
    return 0;
}

编译为汇编后:

foo:
.LFB0:
    .cfi_startproc
    pushq   %rbp
    .cfi_def_cfa_offset 16
    .cfi_offset 6, -16
    movq    %rsp, %rbp
    .cfi_def_cfa_register 6
    movq    %rdi, -24(%rbp)
    movq    %rsi, -32(%rbp)
    movq    %rdx, -40(%rbp)
    movq    %rcx, -48(%rbp)
    movq    %r8, -56(%rbp)
    movq    %r9, -64(%rbp)
    movq    -32(%rbp), %rax
    movq    -24(%rbp), %rdx
    addq    %rdx, %rax
    movq    %rax, -8(%rbp)
    movq    -8(%rbp), %rax
    popq    %rbp
    .cfi_def_cfa 7, 8
    ret
    .cfi_endproc
.LFE0:
    .size   foo, .-foo
    .globl  main
    .type   main, @function
main:
.LFB1:
    .cfi_startproc
    pushq   %rbp
    .cfi_def_cfa_offset 16
    .cfi_offset 6, -16
    movq    %rsp, %rbp
    .cfi_def_cfa_register 6
    subq    $40, %rsp
    movq    $9, 16(%rsp) # 后6个参数放到栈上
    movq    $8, 8(%rsp)
    movq    $7, (%rsp)
    movl    $6, %r9d # 前6个参数分别使用rdi rsi rdx ecx r8 r9寄存器
    movl    $5, %r8d
    movl    $4, %ecx
    movl    $3, %edx
    movabsq $8589934597, %rsi
    movabsq $8589934593, %rdi 
    call    foo
    movq    %rax, -8(%rbp)
    movl    $0, %eax
    leave
    .cfi_def_cfa 7, 8
    ret

五、 参考资料

阅读全文 »

IEEE754标准浮点数表示与舍入

友情提示:本文排版不太好,但内容简单,请耐心观看,总会搞懂的。

1. 定点数

对于一个无符号二进制小数,例如101.111,如果我们要用2个字节即16位来存储它,我们可以约定用高8位存储小数点前的数字,用低8位存储小数点后的数字,这样的话它在存储空间中就是这样的:00000101.11100000。这种存储方式中小数点的位置是固定的,这称为定点数。这种存储方式有个问题那就是存储的数值是有上限的即11111111.11111111 = 2^7^+2^6^+2^5^+2^4^+2^3^+2^2^+2^1^+2^0^+2^-1^+2^-2^+2^-3^+2^-4^+2^-5^+2^-6^+2^-7^+2^-8^。如果我们要存储1111111111111111.这个数的话,用这个存储方式是无法存储的,但是实际上对于这个数来说16位的存储空间是够用的,就是说定点数存在空间浪费的缺点。

基于这个缺点,计算机中通常用浮点数来表示一个小数。

2. 浮点数

IEEE754标准使用V = (-1)^s^ × M × 2^E^表示浮点数,符号位(sign)s 决定该数是正数(s=0)还是负数(s=1),尾数(significand)M是一个二进制小数,阶码(exponent) E。

单精度浮点数中,s占用1位,M占用23位,E占用8位,总共32位,双精度浮点数s占1位,M占52位,E占11位,总共64位,这两种分别对应C中的float和double,另外还有一个扩展双精度它占用80位。

image.png

根据E值,浮点数有三种情况,

2.1 规格化的:E所有位既不全为0也不全为1。

在这种情况中,阶码被解释为以偏置(biased)形式表示的有符号整数,这时E的值表示为E=e-Bias,其中e为E所占位所表示的无符号整数,Bias=2^E所占位数^-1。举个单精度浮点数的🌰,假设当前E为00001010那么E = (00001010所对应的无符号整数) - (2^8^ - 1) = 10 - 127 = -117。

这种情况中M用来表示小数,其二进制表示为1.f~-1~f~-2~f~-3~……f~n~。举个单精度的例子,假设当前M为01100000000000000000100,那么M=1 + (2^-2^ + 2^-3^ + 2^-21^)。

2.2 非规格化的:E所有位都为0

在这种情况中,阶码值E=1-Bias,而尾数M二进制表示为0.f~-1~f~-2~f~-3~……f~n~,没有规格化值前面的1。
非规格化值有两个用途。首先规格化值M始终>1,所以没法表示0,所以+0.0的浮点表示的位模式为全0:符号位0,阶码字段全为0(表明是一个非规格化值),尾数都是0就得到M=0.0。如果符号位为1,我们就得到了-0.0。其次非规格值的另外一个用途是表示那些非常接近0.0的数。

2.3 特殊值:E所有位都为1,这时又有两种以下两种情况

  1. 无穷大:M所有位全为0,当符号位为0是就是正无穷,当符号位为1时就表示负无穷。当我们把两个特别大的数相乘或者除0的时候无穷能表示溢出的结果。
  2. NaN(Not a Number):M不全为0,如果一些运算的结果不能是实数或者无穷,比如对-1开平方根时就会返回NaN。

经过上面的讲解后我们思考下十进制数15.3203125使用单精度浮点数来表示的话其二进制形式应该是什么呢?我们首先将它转为二进制数,即:1111.0101001 = 1.1110101001 × 2^3^,即M=1.1110101001,E=3。

3. 浮点数舍入

浮点数并不能表示所有的实数,比如十进制的2.1没有完全对应的二进制数,浮点数只能近似的表示一些实数,为了尽量精确的表示这个实数就只能尽量增加二进制的位数,但是数据类型的位数是有限的,比如C中float只有32位。

关于十进制小数如何转二进制不清楚的同学可以自行搜索下相关文章,很简单,这里就不详述了。

这里举个例子:将十进制的2.1用单精度浮点数表示。首先小数点前的2转为二进制是10,然后我们将小数点后的0.1转为2进制,它是这个样子的:0.000110011001100110011001100110011001100110011001100110011...(后面是0011无限循环)所以2.1转为二进制就是:10.000110011001100110011001100110011001100110011001100110011...,转为IEEE标准表达方式就是 1.0000110011001100110011001100110011001100110011001100110011… × 2^1^,即M=0.0000110011001100110011001100110011001100110011001100110011… + 1,但单精度浮点数位数只有23位,这样就面临一个问题00001100110011001100110(这里是23)01100110011001100110011001100110011...这一长串23位之后的数字怎么办?直接舍去后面的位的话意味着计算机中所有小数都小于等于它的实际值,进1的话意味着计算机中所有小数都大于等于它的实际值,四舍五入看起来不错,但是由于中间的5会进位,所以仍然会使计算系统中的小数整体偏大。在进行一些大量数据的统计时,这三种方式都回累计一个相当大的误差。

IEEE浮点格式定义了四种不同的舍入方式,下面以十进制的小数舍入只保留小数点后0位为例:

方式 1.40 1.60 1.50 2.50 -1.50
向偶数舍入 1 2 2 2 -2
向零舍入 1 1 1 2 -1
向下舍入 1 1 1 2 -2
向上舍入 2 2 2 2 -1

向偶数舍入这个方式乍看可能没看懂,它其实是使舍入后的值的最低有效数字是偶数。1.5舍入有两个选择:1和2,但由于2是偶数所以就舍入到2,同样2.5舍入有两个选择:2和3,但由于3是奇数,所以还是舍入到2。

向偶数舍入的方式使得在大多数情况下,5舍去还是进位的概率是差不多的,在进行一些大量数据的统计时产生的偏差相较其他方式小一些。

4. 二进制舍入的🌰与规则总结

好多中文资料一般到这里就戛然而止了,CSAPP书中讲到这也没有给到一个二进制的例子,相信大部分读者看完了上面也不知道二进制里是怎么处理的,所以下面给个二进制舍入的例子。

假设我们要求只保留小数点后三位,有以下例子:

  1. 1.001 011 舍入后: 1.001 原因: 1.001 011舍入有两个选择:1.0011.010|1.001 011 - 1.001| = 0.000 011|1.001 011 - 1.010| = 0.000 101,显然0.000 011 < 0.000 101,所以1.0011.010更接近原值1.001 011,所以舍入到了1.001
  2. 1.001 101 舍入后: 1.010 原因: 1.001 101舍入有两个选择:1.0011.010|1.001 101 - 1.001| = 0.000 101|1.001 101 - 1.010| = 0.000 011,显然0.000 101 > 0.000 011所以舍入到后者。
  3. 1.001 100 舍入后: 1.010 原因: 1.001 100舍入有两个选择:1.0011.010|1.001 100 - 1.001| = 0.000 100|1.001 100 - 1.010| = 0.000 100,两种选择的差值是相同的,这时使用向偶数舍入的方式,1.010是偶数(0偶1奇),所以舍入到1.010

根据上面的例子我们总结出以下规律: 我们用RR…RDD…D来表示一个二进制小数,R表示保留位,D表示舍去位,那么有以下规则: 1. DD…D < 10…0 直接舍去 2. DD…D > 10…0 向上舍入 3. DD…D = 10…0 向偶数舍入,细则: 1. RR…R = XX…0,直接舍去 2. RR…R = XX…1,向上舍入

5. 代码验证下

最后,我们写一段C代码,看下到底是不是按照IEEE754标准存的浮点数,代码如下:

int main(void) {
    float a = 2.1;
    float b = a + 3;
    return 0;
}

gcc编译下:

$ gcc -O0 -g float.c // -O0禁用优化,-g以下面使用gdb调试

gdb调试下:

$ gdb ./a.out

进入gdb后,输入start再输入layout asm查看反汇编结果: image.png 可以看到a的值被存入了寄存器eax,在gdb中通过i r eax查看eax寄存器中的值: image.png 可以看到eax寄存器中保存的值是0x400666666,转为二进制:01000000000001100110011001100110,套入IEEE754标准表示法: 0 10000000 00001100110011001100110,即符号位为0,M = 1.00001100110011001100110,E = 2^7^ - (2^7^ - 1) = 1

参考资料

阅读全文 »

CPU Cache与False Sharing

一、CPU 缓存架构

现代多核CPU会在每个核心上加上一个较小的SRAM高速缓存存储器称为:L1高速缓存,其中L1缓存由分为dcache数据缓存,icache指令缓存。在L1缓存的下级加一个较大的L2高速缓存, 然后会再L2之下加一个多核共享的L3高速缓存。它们之间的逻辑结构大概是这样的: image.png

相较于访问CPU高速缓存来说访问主存简直太慢了,Jeff Dean曾给出过这样一组数字:

  • L1缓存访问时间 0.5ns
  • 分支预测错误 5ns
  • L2缓存访问时间 7ns
  • 主存访问 100ns

https://colin-scott.github.io/personal_website/research/interactive_latency.html 给出了不同年份这些指标的数字,

1.1 通用的高速缓存存储器结构

告诉缓存被划分为S = 2 ^ s高速缓存组(cache set),每个组含有E个高速缓存行(cache line),每个行又由B = 2 ^ b个字节的数据块(block)和有一个标识该行是否有效(即是否已过期或者已修改)的有效位(valid bit),以及t标记位(tag bit)组成。物理结构大概是下图这样: image.png

高速缓存的大小指的是所有数据块的大小的和,即:B * E * S。

假设当前CPU地址总线有m位,从而主存有M=2^m个地址,所以每个内存地址会有m位。当CPU需要从主存加载地址为A的内存块上的字节时,会先从CPU高速缓存中获取,这时候问题来了,应该到高速缓存的什么位置去拿呢?实际上会将m位地址A划分为以下几段:

image.png

这里的m、t、b、s都是二进制数,不要想成十进制数哦

从而有m = t + b + s CPU会根据A地址中间的s位定位主存地址A映射到了哪个组中,然后根据开头的t位与组内的E个行的标记位挨个比对以确认地址A映射到了哪一行(这里有文章说是并行查询),这时会检查该行的有效位标识该行是否有效,如果有效的话最后根据后b位就直接定位主存地址A映射到了数据块中的哪个字节上了。如果通过以上的步骤没有找到对应的高速缓存地址的话,高速缓存会向主存请求包含A地址的数据块的一个拷贝,CPU必须等待,当被请求的块到达高速缓存中时高速缓存会将这个块放到对应的位置上,然后从数据块中抽取出A地址上的字节返回给CPU。注意:高速缓存从主存中一次请求的是一个数据块而非具体地址上的一个字节,这非常关键!

CSAPP书中提到了为什么选择中间位作为组索引位,其大概意思是选择中间位能够使连续内存映射到不同的组上,提高高速缓存利用率并且减小冲突覆盖的问题,但是个人感觉其解释是按照特定平台来描述的,并没有普适所有平台,这里就不以我昏昏使你昭昭了,待后续查阅更加合理的解释再说。

根据E的不同我们将高速缓存划分为以下三类:

  • 直接映射高速缓存
  • 组相连高速缓存
  • 全相连高速缓存

1.2 直接映射高速缓存

每组只有一行即E=1的高速缓存称为直接映射高速缓存。这种缓存一旦定位到了组后就无需查询对应行了。

1.2.1 直接映射高速缓存的冲突不命中

假设当前计算机b=16即数据块为16个字节,高速缓存中有两个组,s=5即内存地址的第5位决定内存地址映射到哪个组上,有下面的一段golang代码

func foo(x [8]int32, y [8]int32) int32 {
	var sum int32 = 0
	for i := 0; i < 8; i++ {
		sum += x[i] * y[i]
	}

	return sum
}

上面的程序中xy占用了8 * 4 = 2 ^ 5 = 32个字节,假设x被加载到地址为0-31的内存之中,y被加载到32-63之中,sum存在于寄存器中,不占用内存地址。如下图所示 image.png 运行时,第一次循环中,CPU需要先加载x[0],由于高速缓存中一开始并没有,所以会从主存中加载x[0]-x[3]共16个字节(数据块的大小)的数据到高速缓存的组0中再返回给CPU,接下来同样的道理会将y[0]-y[3]加载到高速缓存的组0中。这时候由于每组只有一个行就导致了上一步加载进来的x[0]-x[3]被覆盖了,下一次循环中要加载x[1]时,x[1]就不在高速缓存中了,所以又必须去内存中加载一次,结果从内存中加载会的数据又把第二次加载进来的y[0]-y[3]给覆盖了,之后的每次循环都存在这个问题,导致每次都回冲突不命中。这种高速缓存反复地加载和驱逐相同的高速缓存块的组的情况称为抖动(thrash)

为了解决这个问题我们可以将x和y两个数组的长度定为12,即:

func foo(x [12]int32, y [12]int32) int32

这样的话再看下分布情况: image.png

这样的话由于y[0]-y[3]与x[0]-x[3]不在一个组上就不会出现抖动问题了。

1.3 组相联高速缓存

组相联高速缓存每组中有多个行。

1.4 全相联高速缓存

全相联高速缓存只有一个组。

1.5 写的问题

如果CPU要写一个已经缓存了的字时,有两种方法将该数据写到下层缓存中: 1. 直写,最简单的一种方法,直接将数据写入到下层缓存。但是这种方案每次写都回引起总线流量 2. 写回,为每个行单独维护一个修改位dirty bit,标识这个缓存块被修改过,只有当替换算法要驱逐更新过的块时才将它写入到下一层缓存中。

写不命中通常有两种方法: 1. 写分配,加载低一层的缓存到高速缓存中,然后更新这个数据块。缺点是每次不命中都会导致一个块从低一层传送到高速缓存。 2. 非写分配,避开高速缓存,直接把这个数据写入到低一层中。

直写通常是非写分配的,写会通常是写分配的。

二、伪共享False Sharing

通过上文了解CPU的缓存结构后我们做一个实验来引出伪共享的问题,实验前我们先看下实验机器的一些信息。Mac上通过sysctl -a查看机器信息,这里我过滤了下只拿出来与此实验相关的一些机器指标:

hw.cachelinesize: 64 // Cacheline 64字节
hw.l1icachesize: 32768
hw.l1dcachesize: 32768 // L1数据缓存32K
hw.l2cachesize: 262144 // L2缓存256K
hw.l3cachesize: 6291456 // L3缓存6M
machdep.cpu.core_count: 4 // 4核
machdep.cpu.thread_count: 8

现在我们定义一个程序,有2个线程,两个变量a和b,线程1循环n次执行a++操作,线程2执行n次b++操作,我们用Go来描述:

type SimpleStruct struct {
	n int32
}

type PaddedStruct struct {
	n int32
	_ CacheLinePad
}

type CacheLinePad struct {
	_ [CacheLinePadSize]byte
}

const CacheLinePadSize = 64

const Num = 10000000

func BenchmarkSimple(b *testing.B) {
	structA := SimpleStruct{}
	structB := SimpleStruct{}
	wg := sync.WaitGroup{}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		wg.Add(2)
		go func() { // 为方便下文描述这个线程称为structA线程
			var j int32
			for j = 0; j < Num; j++ {
				structA.n += j
			}
			wg.Done()
		}()
		go func() { // 为方便下文描述这个线程称为structB线程
			var j int32
			for j = 0; j < Num; j++ {
				structB.n += j
			}
			wg.Done()
		}()
		wg.Wait()
	}
}

func BenchmarkSimplePad(b *testing.B) {
	structB := SimpleStruct{}
	structA := PaddedStruct{}
	wg := sync.WaitGroup{}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		wg.Add(2)
		go func() {
			var j int32
			for j = 0; j < Num; j++ {
				structA.n += j
			}
			wg.Done()
		}()
		go func() {
			var j int32
			for j = 0; j < Num; j++ {
				structB.n += j
			}
			wg.Done()
		}()
		wg.Wait()
	}
}

运行benchmark go test -bench=. 得到以下结果 image.png 可以看到我们只在结构体中加入了一个64字节的元素性能就得到了极大的提高,这是为什么呢?

我们看下Simple这个函数的代码,假设structA线程运行在core1上,structB线程运行在core2s上,假设structA线程先执行,它会将structA这个变量与structB一起加载到core1的L1的同一cacheline中,structB线程也会将structA这个变量与structB一起加载到core2的L1的同一cacheline,structA线程修改了structA的值,它会将这个事件同步到core2上,导致core2上cacheline失效,这时就需要从低一层存储中加载最新数据,然后structB又修改了structB,又导致了cacheline失效,循环往复导致了运行效率极低。

而SimplePad这个函数中structA中加入了cachelinesize个字节,使得structA和structB处于不同的cacheline上,也就避免了上面的问题。

2.1 题外话

  1. 关于多核间同步缓存我没有查到特别好的文章,所以我就不妄加解释了,如果你想深入研究的话可以搜索这个关键词:MESI协议
  2. 上面的实验代码来自于【译】CPU 高速缓存原理和应用。最初在我在做这个实验时,写的实验代码是这样的: var a int32 var pad [64]byte{} var b int32 ...

运行benchmark后发现运行时间并没有缩短,后来获取了a、pad、b的地址后才发现go将pad这个变量分配到了堆上,a和b两个变量在内存上还是紧挨着的,你做实验的话可以吸收这个经验:如果加上pad后发现运行时间没有缩短的话确认下a和b是不是真的被分隔到了两个cacheline上。

参考资料

阅读全文 »

理解内存对齐

相信大家都听说过内存对齐的概念,不过这里还是通过一个现象来引出本篇话题。

一、求一个结构体的size

猜下下面这个结构体会占用多少字节

type S struct {
    B byte  // Go中一个byte占1字节,int32占4个字节,int64占8个字节
    I64 int64
    I32 int32
}

是不是以为是1+8+4 = 13个字节?写段代码验证下:

type S struct {
	B   byte
	I64 int64
	I32 int32
}

func main() {
	s := S{}
	fmt.Printf("s size:%d\n", unsafe.Sizeof(s))
}

输出:

s size:24

与预想显然不同,这是为什么呢?答案是编译器替我们做了内存对齐。

二、什么是内存对齐

要理解这个问题需要先了解一下字长的概念以及内存的物理结构

2.1 字长

在计算器领域,对于某种特定的计算机设计而言,字(word)是用于表示其自然的数据单位的术语。在这个特定计算机中,字是其用来一次性处理事务的一个固定长度的位(bit)组。一个字的位数即为字长

字长在计算机结构和操作的多个方面均有体现,计算机中大多数寄存器(这里应该是指通用寄存器)的大小是一个字长

上面这段话可能太过于概念化不太好理解,那么请看下面的这段64位机器上的GUN汇编器语法的汇编代码:

movq (%ecx) %eax

这段汇编代码是将eax这个寄存器中的数据作为地址访问内存,并将内存中的数据加载到eax寄存器中。

我们可以看到mov指令的后缀是q,意味着该指令将加载一个64位的数据到eax寄存器中,这样一条指令可以操作一个64位的数据,说明该机器的字长为64位,同时这段代码能够执行则说明我们机器上的CPU中的eax寄存器必定是64位的,而一条指令能够从内存中加载一个64位的数据也说明了数据总线的位宽也为64位,说明了我们的CPU可以一次从内存中加载8个字节的数据。

2.2 64位内存物理结构

内存是由若干个黑色颗粒组成的,每个内存颗粒叫做一个chip,每个chip是由8个bank组成,每个bank是二维平面上的矩阵,矩阵中的每个元素保存1个字节也就是8个bit。

对于内存中连续的8个字节比如0x0000-0x0007,并非位于一个bank上,而是位于8个bank上,每个bank保存一个字节,8个bank像是垂直叠在一起,物理上它们并不是连续的。 image.png 之所以这样设计是基于电路工作效率考虑,这样的设计可以并行取8个字节的数据,如果想取址0x0000-0x0007,每个bank只需要工作一次就可以取到,IO效率比较高,如果这8个字节在同一个bank上则需要串行读取该bank8次才能取到。

结合上面的结构图可以看到0x0000-0x0007是一列,0x0008-0x000F是另外一列,如果从内存中取8-15字节的数据也可以一次取出来,但如果我们要取1-9的数据就需要先取0-7的数据,再取8-15的数据然后拼起来,这样的话就会产生两次内存IO。所以基于性能的考虑某些CPU会强制只能读取8的倍数的内存,而这也导致了编译器再此类平台上编译时必须做内存对齐。

2.3 Cacheline

CPU通常会将Cacheline size个字节一次加载到高速缓存中(即L1、L2、L3缓存)。 这部分内容我后续会写一篇博客专门介绍下CPU高速缓存结构。

2.4 再来看结构体size的问题

以下均以64位平台,即:64位宽内存以及64位cpu(数据总线64位,寄存器64位)、cacheline size=64byte为前提

type S struct {
    B byte
    I64 int64
    I32 int32
}

在不了解内存对齐前我们可能会简单以为结构体在内存中可能是这样排列的: image.png 总共占用13个字节。我们可以看到 I64 这个字段的内存地址是1-8,而在64位平台上为了将这个字段加载到寄存器中,CPU需要两次内存IO。
但做内存对齐后: image.png 总共占用20个字节,I64这个字段的内存地址是8-15,为了将这个字段加载到寄存器中,只需要一次内存IO即可。 我们写段代码验证下是否真的占用了20个字节:

type S struct {
	B   byte
	I64 int64
	I32 int32
}

func main() {
	s := S{}
	fmt.Printf("s size: %d, align: %d\n", unsafe.Sizeof(s), unsafe.Alignof(s))
}

输出:

s size: 24, align: 8

程序输出了24,而非上面我们以为的20,这是怎么回事呢?原因是结构体本身也必须要做对齐,它必须在后面再额外占用4个字节以使自己的size为8的倍数。

上面的结构体如果后面跟一个4字节的变量的话理论上说不用对齐也能保证一次内存IO就可加载,所以结构体对齐的根本原因目前我还不是特别能理解,可能为编译器做的优化,了解的同学欢迎在评论区指点一下

我们再调整下结构体的声明:

type S struct {
    B byte
    I32 int32
    I64 int64
}

再做内存对齐的话该结构体在内存中应该就是下面这个样子了: image.png 这时总共占用16个字节,相比较上面我们节省了8个字节。 写段代码验证下:

type S struct {
	B   byte
	I32 int32
	I64 int64
}
func main() {
	s := S{}
	fmt.Printf("s size:%v, s.B地址:%v, s.I32地址:%v, s.I64地址:%v\n", unsafe.Sizeof(s), &s.B, &s.I32, &s.I64)
}

输出结果:

s size:16, s.B地址:0xc0000b4010, s.I32地址:0xc0000b4014, s.I64地址:0xc0000b4018

确实占用了16字节,但貌似I32这个字段跟我们预想的不太一样,它被对齐到了4的倍数地址上,而非紧跟在B后边,这大概是编译器编译一套代码可以运行在32位又可以运行在64位平台上吧,目前没有查到相关资料姑且这么认为吧。

参考资料

字 (计算机))

带你深入理解内存对齐最底层原理

阅读全文 »

x86 CPU与IA-32架构

x86 CPU

现代计算机使用的CPU大部分都是x86CPU,包括现在牙膏厂的酷睿。x86系列CPU的原型是Intel 1978年推出的8086 CPU

32位CPU

368是x86系列第一款32位CPU,Pentium4是Intel第一款64位CPU。”xx位CPU”的定位比较模糊,但一般要满足以下两个条件: 1. 具备n位宽的通用寄存器 2. 具备n位以上的地址空间

“通用寄存器”是寄存器中用于整数运算等的通用的寄存器,地址空间是指进程虚拟地址的全体范围。

指令集

多种多样的CPU有着不同的架构和速度,存在很大的差异,但尽管有这些差异一般386和Core 2都可以统称为x86CPU,这是因为386和Core 2能够执行相同的机器语言的指令。如果只是使用386指令编写的程序,在386和Core 2上都是可以跑的。像这样不同的CPU都能解释的机器语言的体系称为 指令集架构(ISA, Instruction Set Architecture) ,简称 指令集 。 Intel将x86系列CPU之中的32位CPU的指令集架构称为IA-32。IA是“Intel Architecture”。

IA-32的变迁

随着CPU技术的不同发展,CPU支持的指令越来越多,IA-32中指令增加的非常多。 首先486中增加了非常重要的指令。从486的486DX型号开始加入了 浮点数运算单元(FPU,Floating Point number Processing Unit) 支持浮点数计算。486DX所支持的浮点数运算指令称为 x87FPU指令(x87 FPU instuctions)。 386也能够支持浮点数运算,但必须添加名为387的FPU。也就是说配置有387的机器与没有配置387的机器支持的指令是不同的。 所添加的其他重要的指令还有 MMX和SSE(Streaming SIMD Extensions) 。两者都是为了支持并行处理多条数据的扩展指令。例如用通常的IA-32指令进行加法运算时一次只能执行一次加法运算,但使用MMX和SSE的加法指令可以同时执行多个运算。

IA-32的64位扩展: AMD64

AMD曾先于Intel提出x86系列的64位扩展,并推出了相应的产品。由AMD设计的x86位指令集架构称为AMD64。
Intel随后在自己的CPU中加入了和AMD64几乎相同的名为Intel64的指令集。Pentium4后期的版本和Core 2的后续产品都是基于Intel64指令集架构的。
要统称AMD64和Intel64时可以试用独立于公司名称的用语:x86-64。另外,Windows中将AMD64对应的架构称为x64。
Intel曾与HP一起开发名为IA-64的指令集架构,IA-64与IA-32架构完全不兼容。Intel推出的Itanium处理器是基于IA-64架构的。

IA-32的概要

IA-32中主要寄存器如下图:

image.png

通用寄存器 (generic register)是编程时使用频率最高的寄存器,宽度为32位的通用寄存器有eax、ebx、ecx、edx、esi、esp、ebp共8个,用于整数运算和指针处理。

指令指针 (instruction pointer) 是存放下一条要执行的代码的地址的寄存器,IA-32的指令指针为32位,称为eip。

标志寄存器 (flag register) 用于保存CPU的运行模式及表示运算状态等的标志的寄存器。 浮点数寄存器 (floating point number register) 是存放浮点数的寄存器,用于浮点数的计算。IA-32中从st0到st7有8个宽度为80位的浮点数寄存器。

MMX寄存器 (MMX register) 是MMX指令用的寄存器。MMX Pentium以及Pentiunm Ⅱ之后的CPU中有从mm0到mm7共8个64位的寄存器。但实际上MMX寄存器和浮点数寄存器是共用的,即无法同时使用浮点数寄存器和MMX寄存器。

XMM寄存器 (XMM register) 是SSE指令指令用的寄存器。Pentium Ⅲ以及之后的CPU中提供了xmm0到xmm7共8个128位宽的XMM寄存器。XMM寄存器和MMX寄存器不同,是独立的寄存器不和浮点数寄存器共用。另外 mxcsr寄存器 是表示SSE指令的运算状态的寄存器。

除上述寄存器外还有写OS内核时用到的 系统寄存器 和debug时用到的 debug寄存器 以及32位环境下用不到的段寄存器。

通用寄存器

名称由来 | 寄存器 | 名称的由来 | 翻译 | | —— | —— | —— | | eax | accumulator | 累加器,很多加法乘法指令的缺省寄存器 | | ebx | base regiter | 基底寄存器,在内存寻址时存放基地址 | | ecx | count register | 计数寄存器,是重复(REP)前缀指令和LOOP指令的内定计数器 | | edx | data register | 数据暂存寄存器,总是被用来放整数除法产生的余数 | | esi | source index | 源索引寄存器 | | edi | destination index | 目标索引寄存器 | | ebp | base point | 基址指针,经常被用作高级语言函数调用的frame pointer | | esp | stack pointer | 用作堆栈指针,称为栈顶指针 |

ebp和esp寄存器一般用来实现机器栈,其他寄存器原则上可以随便用。

通用寄存器的宽度都为32位,它们的一部分可以当做16位/8位寄存器使用。例如可以当eax寄存器中的低16位当做16位寄存器ax来访问,还可以将ax寄存器的高8位当做ah寄存器,低8位当做al寄存器。 image.png

IA-32中各进程的一部分地址空间被当做栈来使用,主要用于保存函数的临时变量和参数。栈的位置因OS而已,IA-32 Linux平台上,栈位于各进程地址空间中靠近3GB位置。即栈是从高地址向低地址进行延伸image.png

IA-32中用栈指针(stack pointer)来表示栈,栈指针(esp寄存器)是存放栈顶地址的寄存器

栈的操作

举个例子如果我们要向栈中压一个4字节的整数17,整个操作步骤就是先将esp寄存器-4(栈从高地址向低地址进行延伸的),然后将整数保存到esp寄存器指向的内存地址中。 image.png

出栈则正好相反,首先从esp寄存器指向的内存地址中将数据加载出来,并将esp寄存器+4。 image.png

栈帧

栈并不是连续的一整块,栈是根据每一个函数分开管理的,我们将管理单个函数数据的栈的领域称为栈帧(stack frame)。如果有这样一个程序:main函数调用函数f,f调用函数g,那么这个程序在执行g时的栈就会是下图这样:

image.png

ebp寄存器总是指向当前函数栈的栈底,栈帧的顶部与当前进程的栈顶是相同的,esp寄存器总是指向栈帧的顶部。其他架构中一般将具有和基址指针相同功能的指针称为帧指针(frame pointer)。

一个栈帧中通常保存一下信息: * 临时变量 * 源函数执行中的代码地址(返回地址) * 函数的参数 在每个栈帧上存储上述信息的具体步骤是由函数的调用约定(calling convention)决定的,各个CPU、操作系统的函数调用约定是不同的。

指令指针

指令指针(instruction pointer)是存放下一条要执行的指令的地址的寄存器。CPU从该寄存器所指向的内存地址中获取下一条指令并执行,同时将指令指针推到下一条指令,可以通过跳转指令来改变指令指针的值。

根据架构的不同,有时将指令指针称为程序计数器(program counter, pc)。

标志寄存器

eflags是32位寄存器,CPU的运行模式以及运算相关的信息等都以1个bit的形式存在该寄存器中。 image.png

标志有以下三类: 1. 表示运算结果的状态标志(status flag) 2. 用于运算控制的控制标志(control flag) 3. 用于控制计算器整体运行的系统标志(system flag)

一般程序中可用的只有状态标志和控制标志,系统标志再写OS时会用到,用户模式的进程不能修改系统标志,否则会报没有权限的错误。

状态标志的具体含义如下: 简称 | 标志的正式名称 | 含义 – | – | – CF | carry flag | 运算结果中发生进位或借位 PF | parity flag | 运算结果中的奇偶标志位 AF | auxiliary carry flag | 运算结果中低4位向高4位发生进位或借位 ZF | zero flag | 比较结果为0的时候被置为1 SF | sign flag | 运算结果为负数时被置为1 OF | overflow flag | 运算结果越过了正/负的界限

这些标志位一般与跳转指令配合使用。

字节序

32位即4个字节数据的二进制表现形式如下: image.png

MSB(Most Significant Bit)指向最高位,LSB(Least Significant Bit)指向最低位。而在内存中先放MSB所在的字节还是先放LSB所在的字节是由CPU的类型决定的,先放MSB所在字节的架构称为大端(big endian),先放LSB所在字节的架构称为小端(little endian)。通过网络传输超过2个字节数据时一般使用大端的方式,所以大端也被称为网络字节序(network byte order)

本文摘自

How to develop a compiler

阅读全文 »

PHP实现Bitmap的探索 - GMP扩展使用

一、背景

公司当前有一个用户群的系统,核心功能是根据不同的条件组去不同的业务线中get符合条件的uid列表,然后存到redis中的bitmap中。

举个🌰,如果一个用户群中有两个用户: 3和7,即[3,7],用bitmap表示那就是:00010001

最后利用redis提供的bitOp命令: bitOp AND \ bitOp XOR \ bitOp OR对各个条件组对应的uid列表bitmap做交并差集计算,得出最终的用户群并存储到redis bitmap中。

二、问题

对于上面描述的系统,如果用户群人数较多的那我们就需要执行较多次的setBit {uid} 1命令,而且如果用户群中的第一个uid是一个特别大的值比如10亿的话,就可能会一次malloc 1000000000/1024/1024/8 ~= 120M的内存,这可能会导致redis卡住一段时间,在高并发的redis实例上执行这个操作是相当危险的。而且可以预想到对于两个较大的bitmap key执行bitOp也是非常消耗CPU的,应该尽量避免在存储型的redis实例中做这种十分消耗CPU的计算操作。

实际上内存最少只能申请一个字节,即8位,所以上面的计算方式稍有出入,但相差并不大。

三、解决方案

针对上述的问题,可以将bitmap的计算挪到应用程序中来,只将最终统计出来的bitmap存储到redis中即可。
如果最终结果用户群中的第一个uid是一个特别大的值的话,可以先set 1K再设置2K..3K…这样缓存的增加bitmap的大小避免redis卡住。

四、PHP实现Bitmap

由于该系统目前是使用的PHP,所以下面记录下PHP实现Bitmap的”心路历程“。

由于要操作PHP变量的某一位,所以就要借助位运算来实现,但是又由于PHP的位运算只能作用在整型数上,所以我们无法使用字符串或者浮点数来实现,所以最先考虑的就是使用整型数组来实现。

为什么是数组呢?因为在64位机器上一个整型变量最多只能使用64位,又由于PHP的整型是有符号的,所以最高位无法供我们使用,所以一个整型变量能存储的最大的uid就是63,这真是太鸡肋了-_-||,所以只能搞个用多个整型变量了实现了。

OK,到此为止貌似找到一个看起来不错的解决方案。但是我们再思考这样一个问题:假设我们系统中最大的uid是63x100万=3.6千万(*对主流互联网公司来说这很正常吧😸*),那为了存储所有uid,我们需要1百万个整数才行,即我们需要一个拥有1百万个元素的数组,那么如果我在进程中制造了一个这样的数组会占用多少内存呢?会是64 * 1百万 / 1024 / 1024 / 8 ~= 7.6M吗?答案是否定的,因为php数组是由HashTable实现的,这是一个复杂的结构体,除了数组元素占用的内存外,还有其他的占用。(这里先不做展开,有兴趣可以自行查看下php数组的实现) 眼见为实:

<?php
ini_set('memory_limit','4G');
$arr = [];
for ($i = 0; $i < 64 * 1000000; $i++)
{
    $arr[] = PHP_INT_MAX;
}

echo "done\n";
while(1){
}

查看内存占用

image.png

可以看到大概是1.5G,比我们上面预计的大的多,这太可怕了,必须优化下我们的内存占用,才能真正在生产环境中使用。

这里需要提一句,我的机器只有8G,所以程序可能会用到swap分区,而ps命令结果中的RSS不统计swap分区的占用,在我实际实现中发现ps结果中RSS一列显示占用的内存会随着时间慢慢减少,但是我的程序中arr变量占用的内存是不可能被回收的,所以推测是物理内存中占用的部分内存被置换到了swap分区中。如果你要进行这个实验的话建议关闭swap分区,这样你能得到一个更准确的结果。

五、继续优化

基于上面的经验,如果我们要占用尽可能小的内存,那我们必须能够操作一段近乎无限长的内存且不能产生其他额外占用才可以。幸运的是PHP给我们提供了这样一个扩展:GMP,这个扩展可以让我们使用一个任意长度的整数。OK现在我们拥有了获得一块连续的内存而不会产生其他额外占用的手段,再写一段代码使用下并验证下内存占用情况:

<?php

$gmp = gmp_init(0);
gmp_setbit($gmp, 64 * 1000000, true);
echo "done\n";
while(1){}

image.png

Awesome,这次只使用了15M的内存。更加兴奋的是这个扩展提供了诸如:gmp_andgmp_orgmp_xor这样进行位运算的函数,极大的方便了我们的使用。

到此为止我们似乎找到了一个完美的解决方案,但是真的完美吗?No!其实还可以再优化一下,想象下如果我们有一个用户群,里面只有一个uid:64000000(表示为数组的话就是:[64000000]),为了存储这个用户我们需要占用7.6M内存,而这个用户群中仅仅只有一个元素,这真是极大的浪费啊!

为了优化这个问题可以拥抱上面被我们唾弃的数组😸,一个大的bitmap拆分为一个个小bitmap的数组,这一个个小的bitmap我们限制大小为1Kw位。 image.png

回到上面的问题,如果我们要存储[64000000]这个用户群的话只需要在数组的第6个元素中设置一个little bitmap: 1即可。这样我们就由一开始的占用7.6M内存优化为了占用1位内存。

OK,到此为止我们找到一个还不错的解决方案😸。

后言

为了在Mac中安装GMP扩展又耗费了很多时间,当然,这又是另外一个故事了。有时间我会分享Mac中安装GMP扩展的过程中我遇到的问题。

参考资料

  1. GNU Multiple Precision
  2. Process Memory Management in Linux
  3. 从源码看 PHP 7 数组的实现
阅读全文 »

LR分析中shift/reduce reduce/reduce冲突解决方案SLR(1)与LR(1)

此篇文章要求读者对编译原理前端部分有一定了解 此篇文章中,我们以大写英文作为非终结符,小写英文作为终结符

1. LR(0)分析法简述

LR分析法从左至右移进输入的终结符(词法分析器的输出实际是token,但在语法分析阶段会代表是一个终结符),并将终结符压入到堆栈,称为shift。如果当前栈上的符号恰好符合某个非终结符的生成式,则此时进行归约操作:将这些符号弹出栈,然后将规约后的非终结符压入堆栈,这一步就称为reduce。然后继续上面的步骤,直到没有输入。
如果最终栈上只有一个非终结符,且该非终结符就是目标符号,那证明识别成功,否则识别失败。
名称LR得名于:从左(Left)到右扫描(L),反向(Reverse)最右推导(R)。

2. LR(0)分析法的不足

上面描述的算法存在一个问题,我们以下面的语法为例说明:

// 例1
B : A c
A : b d
  | b

对于上面的语法,当语法分析器遇到终结符b时,面临着两个选择,一个是继续移进下一个终结符,一个是使用生成式A : b进行归约。这种情况称为shift/reduce冲突
继续看下面一个例子:

// 例2
A : b
C : b
D : A a
E : C d

对于上面的语法,当语法分析器遇到终结符b时,面临着两个选择,一个是根据A : b,归约为A,另一个选择是使用生成式C : b进行归约。这种情况称为reduce/reduce冲突

因为这两种冲突的存在导致了LR(0)分析法在实际语法分析中基本不可用,必须找到解决这两种冲突的方案才行,那么如何这两种冲突呢?

3. SLR(1)

对于这两种冲突,我们首先先看一种简单的解决方案:SLR(1) (Simple LR)分析法。
SLR(1)分析法首先求出所有非终结符的Follow Set,即 跟在非终结符之后的所有终结符的集合,然后前瞻一个符号(即从词法分析器中预先读入下一个终结符),如果该前瞻符号在一个非终结符的Follow Set中,就根据此非终结符的生成式进行归约。

我们以上面的例2为例,SLR(1)分析器先求出A的Follow Set为{a},C的Follow Set为{b},假设当前输入为b a,输入b之后,语法分析器面临选择:归约到A or 归约到C,此时分析器前瞻一个符号即c,由于c属于A的Follow Set,所以分析器选择归约到A。

上面的例1也可以通过此算法解决shift/reduce冲突。

遗憾的是SLR(1)依然存在问题,这里举个例子就清楚了:

// 例3
T : S
S : aAd
S : bAc
S : aec
S : bed
A : e

首先求出各个非终结符的Follow Set:

Follow(T) = {}
Follow(S) = {}
Follow(A) = {d, c}

我们假设当前的输入为a e c, 当输入e时,SLR(1)分析器面临两个选择:继续移进下一个符号 or 根据A : e归约到A,此时SLR(1)分析器前瞻符号c,c存在于Follow(A)中,但此时又可以选择移进c,所以SLR(1)此时又面临着冲突了。

SLR(1)不足之处在于Follow Set太宽泛,处于Follow Set中的前瞻符号不一定能合法的跟在非终结符之后。实际上SLR(1)忽略了分析的上下文,针对SLR(1)的不足由提出了LR(1)分析法。

4. LR(1)

LR(1)的基本原理就是只要前瞻符号能合法跟在归约的非终结符之后就可以进行归约,LR(1)会为每个生成式绑定一个** LookAhead Set**,只有前瞻符号处于这个集合之中才进行归约,它是Follow Set的子集。那么LookAhead Set如何生成呢?

4.1 LookAhead Set生成

我们将生成式一般化为下面的样子:

s -> α .x β, C 
x -> . r

其中 s,x都是非终结符,α β r可以是终结符也可以是非终结符,C 为生成式的LookAhead Set。

x的LookAhead Set = First(β C),即β的FirstSet与C串起来之后的First集

First Set可以理解为非终结符所有生成式中第一个终结符的集合

5. Merak

我将LR(1)分析算法封装成了一个Golang Parser库:Merak,并且用它实现了一个面向对象语言的Parser: Mizar。对此有兴趣的同学可以试用下,它将为你省略手写语法分析器的过程,节省宝贵的时间投入到更加有趣的编译器后端工作中。

阅读全文 »

《我的第一个面向需求的Haskell程序》续

前言

上一篇《我的第一个面向需求的Haskell程序》文章中的Haskell程序还存在一个问题: 程序只打印出了文件中有没有重复的元素但是并没有告知是哪一个元素重复了,重复了几次也没有打印出来。 所以我继续优化下上篇文章中的Haskell程序,现在这段程序变成了下面这样

代码

module Main where

import Data.List.Split
import Data.List
import System.IO
import System.Environment

main = do
    args <- getArgs
    check args

check::[String] -> IO ()
check [filename] = do
    contents <- readFile filename
    mapM_ printRepeat $ fmap (\(x:xs) -> (x, 1 + length xs)) $ group $ splitOn "\r\n" contents
    putStrLn "check done"

check x = do
    putStrLn "请输入文件名"

printRepeat::(String, Int) -> IO()
printRepeat (word, num)
    | num > 1 = putStrLn $ word ++ " repeated " ++ (show num) ++ " times."
    | otherwise = return ()

使用

$ cabal build
$ ./dist-newstyle/build/x86_64-osx/ghc-8.8.4/repeat-0.1.0.0/x/repeat/build/repeat/repeat test.txt
joM2qWfjOJc repeated 2 times.
check done

解释

首先我们使用split包提供的splitOn 函数按照换行符将文件内容切分为[String],现在我们有了:

["abc", "abc", "def", "ghi", "def"]

然后使用group函数聚合下这个List,得到:

[["abc", "abc", "abc"], ["def", "def"], ["ghi"]]

再通过fmap (\(x:xs) -> (x, 1 + length xs))即map一个lambda表达式到这个List上,将这个List中的每个元素转为元组,得到:

[("abc", 3), ("def", 2), ("ghi", 1)]

至此我们实际做了一个WordCount程序…

接下来调用printRepeat函数打印出来结果就OK了

阅读全文 »

GNU 汇编器的语法

学习汇编语法的目的

为什么要学习汇编语法呢?原因是我最近在做一个面向对象语言的编译器(地址:https://github.com/Orlion/Mizar),目前已经完成了parser部分,即已经生成了AST,下一步要做的就是语义分析了,而语义分析之后要做的就是生成AT&T汇编代码了,所以有必要提前了解下汇编语法看在语义分析的实现阶段能否有所指导。

先看一段代码

首先我们有这样一段c语言代码:

#include <stdio.h>

char msg[14] = "Hello,world!\n";
 
int main(void)
{
    puts(msg);
    return 0;
}

运行 gcc -S -Os hello.c

    .file   "hello.c"
    .section    .text.startup,"ax",@progbits
    .globl  main
    .type   main, @function
main:
.LFB0:
    .cfi_startproc
    pushq   %rax
    .cfi_def_cfa_offset 16
    movl    $msg, %edi
    call    puts
    xorl    %eax, %eax
    popq    %rdx
    .cfi_def_cfa_offset 8
    ret
    .cfi_endproc
.LFE0:
    .size   main, .-main
    .globl  msg
    .data
    .align 8
    .type   msg, @object
    .size   msg, 14
msg:
    .string "Hello,world!\n"
    .ident  "GCC: (GNU) 4.8.5 20150623 (Red Hat 4.8.5-39)"
    .section    .note.GNU-stack,"",@progbits

接下来解释下AT&T汇编的语法

指令

指令是直接由CPU负责处理的命令,不以.开头的行首缩进的行都是指令行。

    movl    $msg, %edi
    call    puts
    xorl    %eax, %eax

指令由操作符和作为参数的操作数组成,以 movl $msg, %edi 为例,movl 为操作符, $msg%edi 为操作数,操作数以逗号来间隔。

汇编伪操作

. 开头末尾没有:的行都是汇编伪操作。例如,.file "hello.c", .globl main。汇编伪操作是由汇编器而非CPU处理的指令。一般用于在目标文件中记录元数据(meta data)或者设定指定的属性等。例如 .string 是用来定义字符串常量的汇编伪操作。

标签(labal)

以冒号: 结尾的行都是标签行,例如:.LFB0:,main:。 标签具有为汇编伪操作生成的数据或者指令命名(标上符号)的功能,这样就可以在其他地方调用通过标签定义的符号。 标签可以以.开头

注释

支持两种注释:

# xxx

/* xxx
xxx */

助记符后缀

刚才提到的movlsubl为助记符,更准确的说movsub为助记符,末尾的l是后缀,llong的缩写,表示操作对象的数据大小。类似这样的后缀还有b,w,l

|后缀|操作对象的大小|缩写| |-|-|-| |b|8位|byte| |w|16位|word| |l|32位|long|

操作数

操作数有四种: 1. 立即数 2. 寄存器 3. 直接内存引用 4. 间接内存引用

1. 立即数

立即数就是C语言中的字面量,机器语言中立即数以整数的形式出现,能高速访问。像$27这样,立即数用$来标识,如果漏掉了$就成了直接内存引用了。立即数有8位,16位,32位。

2. 寄存器

GUN汇编器规定寄存器以%开头,例如eax寄存器写作%eax

3. 直接内存引用

直接访问固定内存地址的方式。GNC汇编器会将任何整数解释为内存地址并访问。比起使用数字,更常用符号(symbol)直接访问内存。例如.LFE0就是访问符号.LFE0所指向的地址。符号在汇编和链接的过程中会被置换为实际内存地址。

4. 间接内存引用

是将寄存器的值作为内存地址访问的方式。间接内存引用中最通用的就是下方的形式:

disp(base, index, scale)

其中任何一者都可以省略。

上述指令访问disp + (base + index * scale)的地址。 下面详细讲解,首先最简单的间接引用的形式如下:

(%eax)

即只指定基地址(base)的形式。上述表达式将eax寄存器中的值作为内存地址访问。 接着带有disp的形式如下。disp是displacement(偏移)的简称。

4(%eax)

上述就是访问 4 + (eax寄存器中值) 这个内存地址。在C语言中用来访问如下结构体中成员y的情况:

struct point {
    int x; // x占4个字节,即4个内存地址
    int y;
}

最后使用index和scale的情况如下所示:

(%ebx, %eax, 4)

上面访问的就是(ebx寄存器中的值 + eax寄存器中的值 * 4)内存地址。在C语言中用来访问数组,例如访问元素大小为4字节(例如int)的数组中元素的第%ebx个元素时就可以用这种方式。当并非所有的数组访问都可以只靠间接内存引用来表示,因为scale只能是1、2、4、8之一。

2020-09-22更

突然意识到如果要将一个复杂工程的AST编译为汇编代码必须具备能够用汇编实现这个复杂工程的能力才行,这太难了… 所以暂时放弃吧,先编译到了LLVM IR再说,嗯。

2020-10-07更

不如趁此机会学习下汇编以及后续的链接装载,有助于建立宏观的了解。所以还是继续学习吧。 😸

未完待续…

阅读全文 »

对Haskell惰性求值的理解

全文均为伪代码,没有验证,只可意会

doIf::Bool -> a -> Maybe a
doIf cond action = if cond then (Maybe action) else Nothing

我们声明了一个doIf函数,它接收两个参数:cond与action,它干一件事:如果cond == true 就调用action,并包在Maybe中返回。 如果是strict的语言的话,在调用doIf函数的之时action就会被执行了,而Haskell默认是non-strict的,所以在调用doIf时,action无需求值,只有在cond为true时才需要对action求值,如果cond为false的话action根本不会被执行,这就是non-strict与strict的区别。

阅读全文 »

我的第一个面向需求的Haskell程序

背景

上周五(20年8月28日)的时候,公司测试同学需要测试我的一个提测需求,其中有个测试用例是需要检查下下后台导出的兑换口令列表文件中是否有重复的口令。

由于导出的口令有数百万之多,肯定是不能用眼去看了,原本是打算用excel来检查的,但是我一想:ei(二声)~,最近不是正好在搞Haskell吗?正好拿来练练手,用Haskell写个检测程序。

Why is Haskell

因为这个程序写出来是要交给测试同学使用的,如果用java或者php这种解释型语言来写,还需要测试同学先去安装个java/php的解释器才行,显然是有点扯的,所以用编译型语言写完后直接build出一个可执行文件才比较方便。

当然可以将java/php的程序打包成一个可执行文件,但是又要花费我一些不必要的时间了。

编译型语言中我常用的有golang和Haskell。不可否认Go面对这个需求写起来可能更快,但是我其实还是想用Haskell练练手。

那? 开始吧!

首先,使用cabal创建一个项目

$ mkdir repeat && cd repeat
$ cabal init

导出的口令文件是以\r\n换行的,haskell的lines函数无法切分,所以需要通过cabal引入一个包:split,我的repeat.cabal文件就变成了下面这样了:

cabal-version:       >=1.10
-- Initial package description 'repeat.cabal' generated by 'cabal init'.
-- For further documentation, see http://haskell.org/cabal/users-guide/

name:                repeat
version:             0.1.0.0
-- synopsis:
-- description:
-- bug-reports:
-- license:
license-file:        LICENSE
author:              wangdongdong
maintainer:          wangdongdong@smzdm.com
-- copyright:
-- category:
build-type:          Simple
extra-source-files:  CHANGELOG.md

executable repeat
  main-is:             Main.hs
  -- other-modules:
  -- other-extensions:
  build-depends:       base >=4.13 && <4.14, split
  -- hs-source-dirs:
  default-language:    Haskell2010

编辑Main.hs

module Main where

import Data.List.Split
import Data.List
import System.IO
import System.Environment

main = do
    args <- getArgs
    check args

-- 通过模式匹配获取命令行参数中的文件名
check::[String] -> IO ()
check [filename] = do
    contents <- readFile filename
    -- 暴力通过去重后的list length对比来判重,不可取
    if (length $ mylines contents) /= (length $ nub $ mylines contents)
        then putStrLn "有重复元素" 
        else putStrLn "没有重复元素"

check x = putStrLn "请输入文件名"

-- 通过split库的splitOn函数以\r\n为切割符将文件内容切分为list
mylines contents = splitOn "\r\n" contents

最后编译为可执行文件

$ cabal build

编译结果在dist-newstype文件夹之中

交付使用

$ ./repeat keywords.txt

能够满足需求!

后续优化请看

《我的第一个面向需求的Haskell程序》续

阅读全文 »

go mod 提示 unknown revision问题

通过go mod download下载公司gitlab仓库代码时提示unknown revision 由于是私有仓库且回车执行命令后并没有输入密码的提示,所以猜测是go mod download时git clone 没有输入密码提示

一番搜索后发现解决方案如下:

// 设置永久存储账号密码
git config credential.helper store
// git pull过程中允许输入用户名密码
export GIT_TERMINAL_PROMPT=1
阅读全文 »