Oosten Studio

这世界没有一件事情是虚空而生的。站在光里,背后就会有阴影,这深夜一片寂静,是因为你还没有听见声音。

Go

标签

大话golang GC

本文将从宏观方面尽可能用白话阐述golang的GC机制,描述GC的整体流程与一些重要的细节,尽量不深究具体实现,为读者节省研读源码的时间。

1. 并发垃圾回收

1.1 三色标记

golang采用标记-清除的垃圾回收算法,标记过程中会将对象抽象为三色:

  • 黑色:已扫描完成的存活对象
  • 灰色:待扫描的存活对象
  • 白色:未扫描的对象,可能是存活的也可能是垃圾

垃圾收集器开始工作后从根对象开始扫描,将正在扫描的对象引用的对象作为待扫描的对象放到工作池中,工作池中都是所谓的灰色对象,然后垃圾收集器从工作池中不断pop待扫描的对象进行扫描,直到工作池中没有待扫描的对象。

1.2 混合写屏障

为了保证并发正确性,go还引入了混合写屏障技术,往代码中插入类似如下的伪代码:

writePointer(slot, ptr):
    shade(*slot) // 原对象标灰,实际就是扔到工作池中
    shade(ptr) // 新对象标灰
    *slot = ptr

2. GC的触发时机

对于GC来说我们首先必须要知道的是GC何时触发。GC的触发时机有三种,分别是

2.1 距离上次gc已过去2分钟

这里的2分钟是runtime中的变量,目前没有看到有什么命令行参数可以去修改该值,应该仅是用来方便做runtime的单元测试使用的。

golang有个系统监控线程:sysmon,在该线程中会检测检测距离上次gc的时间,然后唤醒runtime.forcegchelper()这个Goroutine去调用runtime.gcStart()开启gc。

2.2 当前堆内存相较上次gc后增长100%时

这里这个100%可以由环境变量GOGC控制,默认是100。

golang中所有堆内存都是通过runtime.mallocgc()分配的,在此函数中会判断当前堆内存增长是否超过这个限制,超过这个限制后会调用runtime.gcStart()开启gc。

需要注意并不是每次申请内存mallocgc()都会进行这个判断,而是遵循如下逻辑:

  • 申请超过32K的对象时一定进行判断
  • 申请小于32K的对象时如果当前线程缓存mcache中没有可用内存需要从中心缓存mcentral或者页堆mheap中申请内存时会进行判断

如果看源码的话你会看到并不是增长达到100%时才触发,而是golang中有一套调步算法,会根据当前内存分配情况来动态调整触发值以尽量达到“增长100%时触发”这个目标。

2.3 手动触发

可以调用runtime.GC()强制进行GC,该函数会阻塞到GC完成才返回。

3. GC的启动

了解了GC的触发时机后下一步我们需要探究的就是GC的过程是怎样的,会对我们的用户程序造成什么影响。

从上面我们可以知道GC实际是通过runtime.gcStart()启动的,这个函数大概流程如下:

func gcStart(trigger gcTrigger) {
	...
	// 开启后台标记worker协程
	gcBgMarkStartWorkers()

	// STW
	systemstack(stopTheWorldWithSema)
	
	// 清理sync.Pool
	clearpools()
	
	// 禁用用户协程的调度
	schedEnableUser(false)

	// GC阶段修改为_GCmark,并开启写屏障
	setGCPhase(_GCmark)

	// 标记线程缓存mcache中的tiny alloc
	gcMarkTinyAllocs()

	// 修改全局变量
	atomic.Store(&gcBlackenEnabled, 1)

	// 关闭STW
	now = startTheWorldWithSema(trace.enabled)
	...
}

3.1 开启后台标记worker协程

gcBgMarkStartWorkers()函数中会为每个P创建一个用于执行标记任务的协程,协程创建后都会调用runtime.gopark()将自己挂起等待被调度器唤醒。

3.2 STW

stopTheWorldWithSema中会先修改全局变量sched.gcwaiting = 1,然后遍历所有P,向每个P绑定的M发抢占信号。M收到抢占信号后,会先判断是否能够安全抢占,如果能则调用runtime.schedule()重新开始调度,schedule()中判断sched.gcwaiting等于1会挂起M等待唤醒,这样所有的用户程序就都暂停执行了。

那么STW中如何确认所有P都暂停了呢?答案是STW中如有如下的代码等待所有P暂停。

for {
	// 等待100us,当P响应抢占信号后判断所有P都暂停时会调用notewakeup唤醒
	if notetsleep(&sched.stopnote, 100*1000) {
		noteclear(&sched.stopnote)
			break
	}
	// 再次抢占
	preemptall()
}

go1.14之前是协作时调度,因此不一定能及时响应抢占,这可能导致STW耗时很长。

4. 标记

当STW关闭后(即start the world后) 调度器就可以调度 “2.2 开启后台标记worker协程”一节中开启的后台标记协程了,那么后台标记协程是如何被调度执行的呢?

调度器runtime.schedule()中会按照如下逻辑调度后台标记协程:

// runtime/proc.go
func schedule() {
	...
	var gp *g
   	...
    	// 如果开启gc则通过gcController尝试获取P上的标记协程
    	if gp == nil && gcBlackenEnabled != 0 {
		gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
		tryWakeP = tryWakeP || gp != nil
	}
	...
	if gp == nil {
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			// 从全局队列获取P
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	if gp == nil {
		// 从本地队列获取P
		gp, inheritTime = runqget(_g_.m.p.ptr())
	}
	if gp == nil {
		// 当垃圾处理器处于标记阶段
		// 并且当前处理器不需要做任何任务时
		// findrunable会返回后台标记协程
		gp, inheritTime = findrunnable()
	}
	...
	// 执行gp
}

用于并发标记对象的工作协程有三种不同的工作模式runtime.gcMarkWorkerMode,这三种模式的Goroutine标记工作时采用不同的策略,垃圾收集控制器会按照需求执行不同的工作协程。

  • gcMarkWorkerDedicatedMode: P专门负责标记对象,不会被调度走
  • gcMarkWorkerFractionalMode: 当垃圾收集的后台CPU使用率达不到目标(默认为25%)启动该类型的工作协程帮助垃圾收集达到目标使用率,可以被调度。
  • gcMarkWorkerIdleMode: 当P上没有可运行的G时,它会执行垃圾收集标记任务

runtime.gcControllerState.findRunnabledGCWorker方法会计算得出Worker的mode。

4.1 辅助标记

因为用户程序与后台标记程序是并发执行的,这就有可能用户程序分配内存的速度大于标记速度,出现这种情况就非常尴尬了😓。

为了防止这种情况的发生,runtime引入了辅助标记。它遵循原则:“分配多少内存就需要标记多少内存”。mallocgc函数中会检查申请内存的协程是否入不敷出,如果是则将当前协程陷入休眠、加入全局的辅助标记队列并等待后台标记任务的唤醒。

4.2 标记是如何进行的

实际上标记任务都是调用runtime.gcDrain()函数进行标记的,在该函数中会先扫描根对象(数据段、BSS段以及协程的栈等)。扫描过程中会产生灰色对象,这些灰色对象都会被放入到工作池中,并且写屏障也会向工作池中添加灰色对象

根对象扫描结束后会不断从工作池中选出待扫描的灰色对象,这个过程又可能产生新的灰色对象,标记扫描器就不停重复这个动作直到工作池中没有待扫描的灰色对象为止。

另外内存分配mallocgc()函数中会判断如果当前GC阶段不是_GCoff会将新分配的对象直接标记为黑色

4.3 标记终止

当所有P的本地任务完成,且不存在剩余的工作协程后,后台标记程序或者辅助标记程序会调用runtime.gcMarkDone()转入标记终止阶段,该函数核心逻辑如下:

func gcMarkDone() {
	...
	systemstack(stopTheWorldWithSema)
	...
	// 禁止辅助gc和后台标记任务运行
	atomic.Store(&gcBlackenEnabled, 0)
    	// 唤醒所有因辅助gc休眠的G
	gcWakeAllAssists()
	...
	// 恢复用户协程的调度
	schedEnableUser(true)
	
	nextTriggerRatio := gcController.endCycle()
	// 进入标记终止
	gcMarkTermination(nextTriggerRatio)

gcMarkTermination中大概会做如下工作

func gcMarkTermination(nextTriggerRatio float64) {
	// 修改当前GC阶段为_GCmarktermination
	setGCPhase(_GCmarktermination)
	...
	// 开始STW中的标记
	gcMark(startTime)
	...
	// 设置当前阶段为_GCoff并禁用写屏障
	setGCPhase(_GCoff)
	// 唤醒后台清扫任务
	gcSweep(work.mode)
	...
	// 关闭STW
 	startTheWorldWithSema(true)
	...

可以看到在标记终止阶段还有STW的过程。

5. 清理

标记结束后会调用gcSweep()清理内存,核心逻辑如下:

func gcSweep(mode gcMode) {
	if sweep.parked {
		sweep.parked = false
		// 唤醒后台清扫任务
		ready(sweep.g, 0, true)
	}
}

可以看到gcSweep是通过唤醒sweep.g来执行清扫任务的,sweep.g是在初始化主协程时调用bgsweep()设置的:

// runtime/proc.go
func main() {
    ...
    gcenable()
    ...
}

// runtime/mgc.go
func gcenable() {
	c := make(chan int, 2)
	go bgsweep(c)
	go bgscavenge(c)
	<-c
	<-c
	memstats.enablegc = true
}

// runtime/mgcsweep.go
func bgsweep(c chan int) {
    	// 设置sweep.g
	sweep.g = getg()
	lockInit(&sweep.lock, lockRankSweep)
	lock(&sweep.lock)
	sweep.parked = true
	c <- 1
	goparkunlock(&sweep.lock, waitReasonGCSweepWait, traceEvGoBlock, 1)
    	// 循环清扫
	for {
		for sweepone() != ^uintptr(0) { // 清扫一个span
			sweep.nbgsweep++
			Gosched() // 进入调度
		}
		for freeSomeWbufs(true) { // 释放一些未使用的标记队列缓存到heap
			Gosched()
		}
		lock(&sweep.lock)
		if !isSweepDone() { // 判断sweepdone标记位是否等于0
			unlock(&sweep.lock)
			continue // 如果未清扫完成则继续清扫
		}
		// 否则让后台清扫任务进入休眠
		sweep.parked = true
		goparkunlock(&sweep.lock, waitReasonGCSweepWait, traceEvGoBlock, 1)
	}
}
阅读全文 »

golang sync.Pool分析

如何使用就不讲了,网上很多文章

1. 结构

type Pool struct {
	noCopy noCopy // 用于保证pool不会被复制
	local     unsafe.Pointer // 实际类型是 [P]poolLocal
	localSize uintptr        // local的size
	victim     unsafe.Pointer // 在新一轮GC来临时接管local,用于减少GC之后冷启动之后的性能抖动
	victimSize uintptr        // 在新一轮GC来临时接管localSize
	New func() interface{} // 当pool中没有对象时会调用这个函数生成一个新的
}

type poolLocal struct {
	poolLocalInternal
	// 避免false sharing问题
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

type poolLocalInternal struct {
    // P的私有缓存区,使用时无需加锁,Put对象时优先放到这里
	private interface{}
	// 公共缓存区,本地P可以pushHead/popHead,其他P只能popTail
	shared  poolChain
}

// 双端队列
type poolChain struct {
	head *poolChainElt
	tail *poolChainElt
}

type poolChainElt struct {
	poolDequeue
	next, prev *poolChainElt
}

// 环形队列
type poolDequeue struct {
	headTail uint64 // 头尾指针,之所以用一个变量持有两个字段大概率是为了方便原子操作一次性修改两个值吧
	vals []eface // 容量从8开始,依次x2,上限为2 ^ 30
}

type eface struct {
	typ, val unsafe.Pointer
}

image.png

2. Get

2.1 主流程

func (p *Pool) Get() interface{} {
    // 当G与P绑定禁止抢占,返回P对应的poolLocal以及P的id
	l, pid := p.pin()
	x := l.private
	l.private = nil
	if x == nil {
	    // 如果private为空则从shared头部pop出一个
		x, _ = l.shared.popHead()
		if x == nil {
		    // 如果shared中也没有则尝试从其他P的shared尾部偷一个
			x = p.getSlow(pid)
		}
	}
	// 解除非抢占
	runtime_procUnpin()
	if x == nil && p.New != nil {
	    // 如果上面的步骤都没有取到则New个出来
		x = p.New()
	}
	return x
}

其中涉及到了一些函数,我们再看下具体实现

2.2 pin

pin的作用是将当前G与P绑定,禁止被抢占。那么为什么要禁止被抢占呢?原因是G被抢占后再恢复执行之后再绑定的可能就不是被抢占之前的P了

func (p *Pool) pin() (*poolLocal, int) {
    // 执行绑定并返回当前pid
	pid := runtime_procPin()
	s := atomic.LoadUintptr(&p.localSize) 
	l := p.local
	if uintptr(pid) < s { // pid<localSize说明已经完成了poolLocal的创建,可以取
		return indexLocal(l, pid), pid
	}
	// pid>=localSize说明poolLocal还没有创建或者用户通过runtime.GOMAXPROCS(X)增加了p的数量,需要先创建
	return p.pinSlow()
}

// 返回local[i]即当前P的poolLocal
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}

// runtime/proc.go
func procPin() int {
	_g_ := getg()
	mp := _g_.m
    // 完成禁止抢占
    // 调度器执行抢占g之前会canPreemptM(mp *m)判断是否可以执行抢占,而canPreemptM有一个条件为m.locks==0
	mp.locks++
	return int(mp.p.ptr().id)
}

2.2.1 pinSlow

pinSlow主要用来在poolLocal还未创建时创建新poolLocal

func (p *Pool) pinSlow() (*poolLocal, int) {
    // 解除禁止抢占
	runtime_procUnpin()
	// 上锁
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	// 禁止抢占
	pid := runtime_procPin()
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
	    // 上锁之前可能其他线程已经进入到pinSlow了,所以再判断一下
		return indexLocal(l, pid), pid
	}
	if p.local == nil {
	    // 说明local第一次初始化,需要将pool加到allPools中
		allPools = append(allPools, p)
	}
	// 获取p的数量
	size := runtime.GOMAXPROCS(0)
	// 创建local
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
	atomic.StoreUintptr(&p.localSize, uintptr(size))
	return &local[pid], pid
}

2.3 poolChain.popHead

我们再看下Get主流程中从shared中通过popHead从shared头部pop出一个对象的实现

func (c *poolChain) popHead() (interface{}, bool) {
	d := c.head
	for d != nil { // 从链表头开始遍历,d的type为*poolDequeue
		if val, ok := d.popHead(); ok {
			return val, ok
		}
		d = loadPoolChainElt(&d.prev)
	}
	return nil, false
}

2.3.1 poolDequeue.popHead

poolDequeue.popHead用来从环形队列头部pop出一个缓存对象

func (d *poolDequeue) popHead() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
		    // 如果头尾指针相等则队列为空
			return nil, false
		}

        // 通过不断重试来实现无锁编程
        // 尝试将head-1然后修改poolDequeue.headTail
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
		    // 取到head对应的槽
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil // 通过3.2.1 poolDequeue.pushHead分析,貌似val不可能为nil
	}
	
	// 清空槽
	*slot = eface{}
	return val, true
}

func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
    // dequeueBits=32
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask) // &mask为了将高32位清零
	tail = uint32(ptrs & mask)
	return
}

func (d *poolDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) |
		uint64(tail&mask)
}

type dequeueNil *struct{}

2.4 getSlow

再看下主流程中的getSlow函数的实现,getSlow用于在当前P缓存中没有时从其他P的共享缓存区偷缓存对象

func (p *Pool) getSlow(pid int) interface{} {
	size := atomic.LoadUintptr(&p.localSize) 
	locals := p.local
	// 从其他P偷
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		// 从其他P的共享区的尾部偷
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

    // 如果没有偷到,则尝试victim cache。我们将在尝试从所有主缓存中偷取之后这样做,因为我们想让victim cache中的对象尽可能的老化
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

    // 走到这里说明victim cache中也没有对象
	// 将victim cache标记为空,下次就不用尝试victim cache了
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}

2.4.1 poolChain.popTail

getSlow中会通过poolChain.popTail从双端队列尾部pop对象,看下具体是如何操作的

func (c *poolChain) popTail() (interface{}, bool) {
    // 先获取双端队列的尾节点,因为这时被偷的P与当前P在并行,所以需要通过原子操作获取
	d := loadPoolChainElt(&c.tail)
	if d == nil {
	    // 尾节点为空说明被偷的P的双端队列为空直接返回即可
		return nil, false
	}

	for {
	    // 在pop tail之前load下一个指针是非常重要的,通常,d可能暂时为空,但是如果
	    // 在pop之前next非nil并且pop失败,那么d永远为空,这是唯一可以安全的从链表
	    // 中删除d的方法
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
		    // next为空遍历终止
			return nil, false
		}
		
		// 走到这里说明当前尾节点为空,并且有下一个节点
		// 此时当前尾节点不可能有新对象被push进来了,可以删除掉了
		// 尝试将环形队列的尾节点指针改成它的下一个节点
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// 走到这里说明赢得了race,清除prev指针以便gc能够收集空dequeue
			// 因此popHead不会在必要时再次备份
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}

要修改poolChain的空尾节点指针为尾节点的下一个节点必须同时满足下面两个条件(即删除当前尾节点) * 当前尾节点环形队列为空 * 当前尾节点必须有下一个节点

我们注意到golang中先获取了当前尾节点的next再popTail,这是为什么呢?如果先popTail再获取next有可能遇到这样的情况:

  1. d的队列为空popTail没有获取到数据
  2. 另外一个线程向d中push了n个对象,此时d不为空,并且生成了下一个节点
  3. 原子获取next,next不为空
  4. 误将还有缓存对象的d删除

2.4.1.1

func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// 说明队列为空
			return nil, false
		}

		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// 修改成功,这个solt就被我们占有了
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

    // 从槽中取值
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)

	return val, true
}

3. Put

3.1 主流程

// sync/pool.go
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	l, _ := p.pin() // 禁止G被抢占,并返回当前P对应的poolLocal
	if l.private == nil {
	    // 如果私有缓存为空则直接放到私有缓存区
		l.private = x
		x = nil
	}
	if x != nil {
	    // 如果私有缓存已经被占了,则放到共享缓存区头
		l.shared.pushHead(x)
	}
	runtime_procUnpin() // 解除禁止抢占
}

接下来我们看下新元素具体是如何放到共享缓冲区头部的

3.2 poolChain.pushHead

func (c *poolChain) pushHead(val interface{}) {
	d := c.head
	if d == nil {
	    // 头结点为空则需要进行初始化
		const initSize = 8 // 第一个节点的环形队列的长度为8
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		// 其他P可能会从尾部偷对象,所以poolChain的tail需要用atomic set,保证对其他P可见
		c.head = d
		storePoolChainElt(&c.tail, d)
	}

    // push到头结点双端队列的头部
	if d.pushHead(val) {
		return
	}

	// 走到这里说明当前头节点的环形队列已经满了,所以申请一个新的节点
	// 新节点环形队列的长度为旧队列的两倍,但如果大于1 << 32 / 4则长度为1 << 32 / 4
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	// 修改双端队列的头节点为新创建的节点
	c.head = d2
	// 其他P可能会用到next所以需要原子store
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}

3.2.1 poolDequeue.pushHead

poolDequeue.pushHead用于将对象放到环形队列上

func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	// dequeueBits = 32
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// 如果环形队列tail加上长度等于head,说明队列实际已经满了
		return false
	}
	// 找到head对应的槽,slot的类型为*eface
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		return false
	}

	if val == nil {
		val = dequeueNil(nil) // 追了下代码调用链貌似val不可能为nil
	}
	// 将val放到槽上
	*(*interface{})(unsafe.Pointer(slot)) = val

	// 增加头指针
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}

4. GC

上面的流程中我们清楚了对象是如何被缓存已经如何被写入和获取的,但是缓存池容量不是无限的,何时清理呢?答案是GC时。

sync/pool.go中有个init函数,在这个函数中注册了GC时如何清理Pool的函数

func init() {
    // 编译器会将poolCleanup赋值给runtime/mgc.go文件中`poolcleanup`变量
    // 在runtime.clearpools()函数中会调用poolcleanup,而在gcStart函数中在开始标记之前会调用clearpools()
	runtime_registerPoolCleanup(poolCleanup)
}

func poolCleanup() {
	for _, p := range oldPools {
	    // 先清除所有旧pool中的victim
	    // 之后gc就能标记清理旧pool中缓存的对象了
		p.victim = nil
		p.victimSize = 0
	}

	for _, p := range allPools {
	    // 用victim接管pool
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	oldPools, allPools = allPools, nil
}

当时看到这里时还有一点疑惑,poolCleanup为什么不写成下面这样:

func poolCleanup() {
	for _, p := range allPools {
	    // 用victim接管pool
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}
}

看起来也能清理缓存队列。但实际有个非常浅而已见的坑,那就是allPools这个切片一直在增长,必须将allPools设置为nil清理下才行,所以就必须引入oldPools。

5. 总结

sync.Pool为每个P搞一个缓存队列,避免所有线程共用同一个队列引发的锁竞争问题。

5.1 Put流程

  1. push到双端队列的头部的环形队列头部,如果环形队列已满则创建一个新的环形队列
  2. 将环形队列作为双端队列的新头部

5.2 Get流程

  1. 先从当前P缓冲区的私有缓存取
  2. 如果私有缓存没有从共享缓存区的双端队列的环形队列的头部pop
  3. 还没获取到则从其他P的共享缓存区的双端队列的环形队列的尾部pop
  4. 还没获取到则从victim cache中取

5.3 总结

总的来说只要清楚了sync.Pool的数据结构基本都理解的大差不差了,还是很简单的。

阅读全文 »

Golang 启动流程图

最近研究了下go的GMP调度模型,总结了下启动流程中GMP调度模型相关的一些核心点,可与一些讲解启动源码的博客一起食用,效果更佳

未命名文件.png

阅读全文 »

golang sync.Map实现原理解析

本文Golang版本为go version go1.14 darwin/amd64

前言

Go中的map不是并发安全的,因此Go提供了sync.Map以在并发编程中使用,sync.Map是针对以下两种场景优化的:

  1. 某个指定的key只会被写入一次,但是会被读取多次,像不断增长的cache
  2. 多个goroutine读、写、覆盖不同的key

对于这两种场景,sync.Map比map配合Mutex/RWMutex可以大大降低锁的竞争。

数据结构

我们先看下sync.Map的定义

type Map struct {
	mu Mutex // 锁
	read atomic.Value // readOnly 读的时候先从read读
	dirty map[interface{}]*entry
	misses int // 当read没读到的时候+1(无论dirty中有没有)
}

read实际是readOnly结构体的一个实例,其定义:

type readOnly struct {
	m       map[interface{}]*entry
	// true表示dirty里存在read没有的key
	// 只有两种情况下amended为false
	// 1. map刚初始化的时候
	// 2. misses>=len(dirty)会将dirty赋值给read,同时dirty置为nil,amended置为false
	amended bool
}

entry定义:

type entry struct {
	p unsafe.Pointer
}

expunged定义:

var expunged = unsafe.Pointer(new(interface{}))

p的状态有以下三种:

状态 说明
nil entry已被删除,且dirty==nil
expunged entry已被删除,但dirty!=nil且dirty不存在该entry
其他 entry有效,并且存在于m.read中,如果m.dirty!=nil,则dirty中也存在该entry

基本原理

  • 通过read和dirty两个字段进行读写分离,read只用来读,将最新写入的数据存在dirty上
  • 读取时会先查询read,没有再去读dirty,写入则只写入dirty
  • 读read不需要加锁,读写dirty需要加锁
  • 通过misses字段来记录read被穿透的次数,穿透一定的次数则将dirty赋值给read
  • 删除为标记删除

写入

我们先看下写入即Store()方法 image.png

func (m *Map) Store(key, value interface{}) {
	// 将m.read原子读出
	read, _ := m.read.Load().(readOnly)
	// 如果read中存在要写入的key,则尝试修改
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		// 如果entry是expunged说明该entry已被删除,则不修改
		// 如果不是expunged则修改为新值,修改成功直接return
		// 此时最新值存在于read中,m.dirty不存在或者不是最新值
		return
	}

	// 走到这有两种可能
	// 1. read中没有我们要修改的key
	// 2. read中有我们要修改的key,但是已被删除
	m.mu.Lock()
	// 重新读取一遍
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		// 如果read中存在key的话,又走到这里的话entry基本是expunged
		// 将entry通过由expunded修改为nil
		if e.unexpungeLocked() {
			// 如果修改成功的话说明entry确实是expunged
			// 说明该dirty!=nil且entry不在dirty中
			// 将此entry加到dirty中
			// 此时dirty[key]与read[key]指向同一个entry
			// 下面修改entry值就会将read和dirty一并修改了
			m.dirty[key] = e
		} 
		// 如果修改失败的话说明在加锁之前entry由expunged被修改为了其他值
		// 可能是在另一个也是修改该key的线程中在加锁前抢先执行了if里面的代码		
		// 所以这时dirty中已经有这个key了

		// 将新值写入entry
		// read与dirty都得到了更新
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
		// 如果read中不存在但dirty中存在的话
		// 将新值写入到dirty中的entry中
		e.storeLocked(&value)
	} else {
		// 如果read和dirty中都没有
		if !read.amended {
			// amended=false有两种可能性:
			// 1. map初始化后还没有写入值
			// 2. misses>=len(dirty),dirty被赋值给了read同时置为了nil
			// 创建新dirty并将read拷贝到新的dirty中
			m.dirtyLocked()
			// dirty拥有read全部数据,更新read.amended为true
			m.read.Store(readOnly{m: read.m, amended: true})
		} // amended=true说明dirty中存在read中没有的key
		// 向dirty中写入一个新entry
		// 此时新entry只存在于dirty中,read中没有
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}

func (e *entry) tryStore(i *interface{}) bool {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == expunged {
			return false
		}
		// 将entry中的值替换为新值
		if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
			return true
		}
	}
}

func (e *entry) unexpungeLocked() (wasExpunged bool) {
	return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

func (e *entry) storeLocked(i *interface{}) {
	atomic.StorePointer(&e.p, unsafe.Pointer(i))
}

// 将read数据复制到dirty中,删除数据除外
func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}

	read, _ := m.read.Load().(readOnly)
	m.dirty = make(map[interface{}]*entry, len(read.m))
	for k, e := range read.m {
		// 如果p是nil修改为expunged
		// 如果p是expunged则不复制到dirty
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

// 如果p是nil的修改为expunged,返回e是否是expunged
func (e *entry) tryExpungeLocked() (isExpunged bool) {
	p := atomic.LoadPointer(&e.p)
	for p == nil {
		if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
			return true
		}
		p = atomic.LoadPointer(&e.p)
	}
	return p == expunged
}

删除Delete()

image.png

func (m *Map) Delete(key interface{}) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		// 如果read中不存在要删除的key,且dirty中存在read中不存在的key
		// 则从dirty中删除key
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			delete(m.dirty, key)
		}
		m.mu.Unlock()
	}
	if ok {
		// 如果read中存在,则将read中key对应的value修改为nil
		e.delete()
	}
}

// 把e.p改成nil,如果p为nil/expunged则不修改并返回false
func (e *entry) delete() (hadValue bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == nil || p == expunged {
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return true
		}
	}
}

查找Load()

image.png

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

func (e *entry) load() (value interface{}, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == nil || p == expunged {
		return nil, false
	}
	return *(*interface{})(p), true
}

Range()和LoadOrStore()方法的逻辑都比较简单这里就不再赘述了,看懂了上面的代码基本就能看懂。

阅读全文 »

浅入深出golang map的实现

为什么要研究map的实现

对于这个问题,我的回答有以下几个方面:

  1. 了解工业级map的另一种实现方案,增加自己的经验,可能对自己日后的工作有借鉴意义
  2. 只有了解了其实现原理才能清楚其性能瓶颈在哪,开发过程中尽量去避免

map的基本原理

map的底层结构是一个哈希表,map中的键值对被分配到了一个bucket数组上,每个bucket包含8个键值对。hash值的二进制低位部分被用来选择bucket,每个bucket存储了hash值的高位部分以用来在单个bucket上做区分。如果有超过8个key被hash到了同一个bucket上,那么就用链表连接到扩展bucket上。

底层数据结构

map 对应的结构体,一个map实际是hmap指针

type hmap struct {
	count     int // map中元素个数,即len(map)
	flags     uint8 // map的标记
	B         uint8 //bucket数量为2^B个
	noverflow uint16 // 溢出bucket的近似数量
	hash0     uint32 // hash seed 每次创建map都会生产一个随机的种子,提高hash碰撞攻击门槛
	buckets    unsafe.Pointer // bucket数组的地址,如果count=0,则有可能是nil
	oldbuckets unsafe.Pointer // 
	nevacuate  uintptr        // 指示扩容进度,小于此地址的buckets迁移完成
	extra *mapextra // optional fields
}

其中flags有以下几种标记:

常量名 对应二进制 说明
iterator 1 00000001 标记有迭代器在迭代buckets
oldIterator 2 00000010 标记有迭代器在迭代oldbuckets
hashWriting 4 00000100 标记当前正在写hashmap
sameSizeGrow 8 00001000 标记此次扩容容量没有变化只是重新分配元素,当扩容是由于溢出桶太多导致时会加此标记

bucket对应的结构体如下,源码中是没有keys、values、overflow这三个字段的,但实际运行过程内存中对应位置会有这三个字段

type bmap struct {
	tophash [8]uint8 // 如果<5表示存的是状态,>=5存的是hash值高8位
	// keys [8]keytype
	// values [8]valuetype // 之所以将key和value单独放在一起是因为内存对齐会浪费很多内存
	// overflow uintptr // 溢出桶地址
}

tophash时存的是状态,共有以下几种状态:

|常量名|值|说明| |——|——|——|——| |emptyRest|0|当前槽是空的,并且接下来的槽也是空的| |emptyOne|1|当前槽是空的,其他槽不一定| |evacuatedX|2|当前槽有效,但是已经被迁移到了扩容后buckets数组的上半区| |evacuatedY|3|当前槽有效,但是已经被迁移到了扩容后buckets数组的下半区| |evacuatedEmpty|4|当前槽是空的,已经被迁移了|

mapextra对应的结构体如下:

type mapextra struct {
	// The indirection allows to store a pointer to the slice in hiter.
	// 如果k/v都不包含指针,并且可以被inline,那么我们标记桶不含指针,这样避免gc扫描整个map
	// 然而bmap结构体的overflow字段是一个指针,为了保证溢出桶不被回收,我们用overflow存储所有hmap.buckets上的所有溢出桶,oldoverflow存储hmap.oldbuckets上的所有溢出桶
	overflow    *[]*bmap
	oldoverflow *[]*bmap

	// 创建bucket数组时会预先分配一些溢出桶出来,nextOverflow为这部分桶的第一个桶
	nextOverflow *bmap
}

它们之间的关系可以用下面这张图来表示: image.png

map的创建

在Go中通常使用make来创建一个map,像这样:

make(map[ktype]vtype, hint)

而在Go源码中提供了三个创建map的函数,它们分别是:

func makemap_small() *hmap
func makemap64(t *maptype, hint int64, h *hmap) *hmap
func makemap(t *maptype, hint int, h *hmap) *hmap
  1. 如果不指定hint或者hint(bucketCnt,即bucket上的元素数)时Go会通过runtime.makemap_small()来创建
  2. 如果hint为int64使用runtime.makemap64()来创建,但里面还是调用的runtime.makemap()
  3. 否则通过调用runtime.makemap()来创建

makemap_small()的代码非常简单,就是new一个hmap对象,然后生成hash种子:

func makemap_small() *hmap {
	h := new(hmap)
	h.hash0 = fastrand()
	return h
}

而makemap()的过程就相对复杂了,步骤为:

  1. 创建hmap结构体,并生成hash seed
  2. 通过hint找到bucket数组的长度,其实就是找到B
  3. 如果B不为0,那么就为hmap预创建bucket数组
  4. 如果B>=4我们认为大概率会有溢出桶,所以会预先分配2^(b-4)个溢出桶出来,然后将hmap.extra.nextOverflow指向预分配的第一个溢出桶

代码:

// t为map的类型信息, hint即make()的第二个参数
func makemap(t *maptype, hint int, h *hmap) *hmap {
	// 计算所需内存
	mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
	if overflow || mem > maxAlloc {
		// 如果溢出则将初始化大小修改为0
		hint = 0
	}

	// 初始化Hmap
	if h == nil {
		h = new(hmap)
	}
	// 生成hash seed, fastrand()这里不做详解,看注释是实现了某篇论文...
	h.hash0 = fastrand()

	// 找到一个 B,使得 map 的负载因子在正常范围内。
	B := uint8(0)
	for overLoadFactor(hint, B) { // overLoadFactor(hint, B) 等价于 hint > 2^B*6.5
		B++
	}
	h.B = B
	
	if h.B != 0 {
		var nextOverflow *bmap
		// 预分配bucket数组
		h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
		if nextOverflow != nil {
			// 如果预分配了溢出桶,则用h.extra.nextOverflow存第一个溢出桶的指针
			h.extra = new(mapextra)
			h.extra.nextOverflow = nextOverflow
		}
	}

	return h
}

makeBucketArray的源码

// dirtyalloc要么是nil要么是之前由makeBucketArray使用相同的t和b创建出来的bucket数组
// 如果dirtyalloc为nil,那么就会创建出一个新数组,如果不为nil则会清空掉之前的dirtyalloc然后重用这部分内存来创建新数组
// 如果b>=4我们认为大概率会有溢出桶,所以会预先分配2^(b-4)个溢出桶出来,nextOverflow即为这部分桶的第一个桶
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
	base := bucketShift(b)
	nbuckets := base
	if b >= 4 {
		// 当b>=4时大概率会有溢出桶,所以这里预先分配一些溢出桶出来
		nbuckets += bucketShift(b - 4)
		sz := t.bucket.size * nbuckets
		up := roundupsize(sz)
		if up != sz {
			nbuckets = up / t.bucket.size
		}
	}

	if dirtyalloc == nil {
		buckets = newarray(t.bucket, int(nbuckets))
	} else {
		buckets = dirtyalloc
		size := t.bucket.size * nbuckets
		// 清空dirtyalloc
		if t.bucket.ptrdata != 0 {
			memclrHasPointers(buckets, size)
		} else {
			memclrNoHeapPointers(buckets, size)
		}
	}

	if base != nbuckets {
		// 进到这里说明上面申请了一些溢出桶
		// nextOverflow为第一个溢出桶的地址
		nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
		// last为最后一个溢出桶的地址
		last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
		// 将最后一个溢出桶的overflow设置为buckets数组的第一个元素
		// 之所以这么搞是为了方便判断h.extra.nextOverflow是不是最后一个预分配的溢出桶
		// 如果没理解的话看下下面的newoverflow函数😄
		last.setoverflow(t, (*bmap)(buckets))
	}
	return buckets, nextOverflow
}

赋值

Go中编译器会针对不同类型的key调用不同的赋值函数,它们分别是:

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer
func mapassign_faststr(t *maptype, h *hmap, s string) unsafe.Pointer
func mapassign_fast32(t *maptype, h *hmap, key uint32) unsafe.Pointer
func mapassign_fast32ptr(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer
func mapassign_fast64(t *maptype, h *hmap, key uint64) unsafe.Pointer
func mapassign_fast64ptr(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer

它们的实现相差不大,我们只理解mapassign()的实现就行了,mapassign()赋值的步骤如下:

  1. 求key的hash值,找到其对应的bucket
  2. 如果正在扩容的话,需要将该bucket迁到新数组中
  3. 遍历bucket及其链接的溢出桶上的所有槽,如果找到了key,则接下来将新值赋到该槽上
  4. 如果没有找到key,则写到第一个空槽上
  5. 如果没有空槽则创建出来一个新的溢出桶出来,将新键值对写到这个新溢出桶上
  6. 如果要没有找到key或者需要创建新溢出桶的话还需要判断是否需要扩容,如果需要的话就先进行扩容,再重复步骤2
  7. 将键值对写到查询到的位置 func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { ... // 求key的hash值 hash := t.hasher(key, uintptr(h.hash0)) ... if h.buckets == nil { // 如果map的bucket数组还没有创建则创建出来 h.buckets = newobject(t.bucket) } again: // 获取hash值的低B位 bucket := hash & bucketMask(h.B) if h.growing() { // 如果map正在扩容则调用growWork迁移bucket growWork(t, h, bucket) } // 根据低B位找到对应的bucket b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize))) // 取hash值的高8位,如果<5则再加上5 top := tophash(hash) var inserti *uint8 var insertk unsafe.Pointer var elem unsafe.Pointer bucketloop: for { for i := uintptr(0); i < bucketCnt; i++ { if b.tophash[i] != top { // tophash不匹配 if b.tophash[i] <= emptyOne && inserti == nil { // 如果槽是空的,则记录下这个位置 inserti = &b.tophash[i] insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) } if b.tophash[i] == emptyRest { // 当前槽是emptyRest,则之后的槽肯定也是空的,就不用往后查了 break bucketloop } continue } // tophash一致的话再判断key是否能匹配上 k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } if !t.key.equal(key, k) { // 如果key没有匹配上则继续查询 continue } // 找到了key,直接更新就行 if t.needkeyupdate() { typedmemmove(t.key, k, key) } elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) // 找到了key直接跳出循环 goto done } // 再遍历溢出桶 ovf := b.overflow(t) if ovf == nil { break } b = ovf } // 走到这里说明,当前map上没有指定的key,所以就需要写入一个新的键值对 // 如果写入新的键值对后负载系数>6.5 或者有太多溢出桶的话,就需要扩容 // 当然了,如果正在扩容中就不用再扩容了 // 溢出桶过多会严重影响查询效率所以需要扩容 if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { hashGrow(t, h) // 扩容后需要重新找一个新的位置 goto again } if inserti == nil { // 当map上所有槽都满的情况下,上面查到的插入位置就是空了 // 所以加一个溢出桶承载新添加的键值对 newb := h.newoverflow(t, b) inserti = &newb.tophash[0] // 槽上k的位置 insertk = add(unsafe.Pointer(newb), dataOffset) // 槽上v的位置 elem = add(insertk, bucketCnt*uintptr(t.keysize)) } // 在插入位置写入新的键值对 if t.indirectkey() { // t.indirectkey()=true说明bucket存储的是key的指针 // 为key申请一块内存并返回该块内存的地址 kmem := newobject(t.key) // 将插入位置的值转为该块内存的地址 *(*unsafe.Pointer)(insertk) = kmem // insertk也指向到这块内存的地址 insertk = kmem } if t.indirectelem() { vmem := newobject(t.elem) *(*unsafe.Pointer)(elem) = vmem } // 将key写入到insertk指向的内存位置 typedmemmove(t.key, insertk, key) *inserti = top // 元素数量+1 h.count++ done: ... if t.indirectelem() { elem = *((*unsafe.Pointer)(elem)) } return elem }

newoverflow的流程:

源码:

func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
	var ovf *bmap
	if h.extra != nil && h.extra.nextOverflow != nil {
		// 如果nextOverflow不为空,说明我们还有空闲的预分配溢出桶
		// 我们将优先使用空闲的预分配溢出桶
		ovf = h.extra.nextOverflow
		if ovf.overflow(t) == nil {
			// 如果该桶没有溢出桶,则将当前nextOverflow设置为它的下一个桶
			h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))
		} else {
			// 如果该桶有overflow,则说明它是最后一个溢出桶了,我们没有可用的预分配溢出桶了
			// 回忆下makeBucketArray函数,我们是不是把最后一个预分配的溢出桶的overflow设置为了buckets数组的第一个元素了😆
			// nextOverflow设置为nil
			ovf.setoverflow(t, nil)
			h.extra.nextOverflow = nil
		}
	} else {
		ovf = (*bmap)(newobject(t.bucket))
	}
	h.incrnoverflow()
	if t.bucket.ptrdata == 0 {
		// 这里如果没懂的话看下上面mapextra结构体的说明
		h.createOverflow() // 其实是初始化h.extra.overflow
		*h.extra.overflow = append(*h.extra.overflow, ovf)
	}
	b.setoverflow(t, ovf)
	return ovf
}
func (h *hmap) createOverflow() {
	if h.extra == nil {
		h.extra = new(mapextra)
	}
	if h.extra.overflow == nil {
		h.extra.overflow = new([]*bmap)
	}
}

扩容

既然上面提到了扩容那就在这里总结下扩容的步骤吧

扩容的条件

  1. 元素数量 / 桶数量 >= 6.5
  2. 溢出桶过多,当B<15(即bucket数量<2^15)时,如果溢出桶总数>=bucket数量则判定溢出桶过多;当B>=15时,如果溢出桶总数>=2^15则判定溢出桶过多

当条件1满足时,我们扩容一倍(将B加1),当条件1不足但条件2满足时说明桶利用率比较低,元素都被分配到了溢出桶上,这时容量不变,只移动bucket来降低空槽率

扩容步骤

  1. 如果负载系数>=6.5说明是由于溢出桶过多进行的扩容,这种扩容会进行等量扩容,不会增加buckets数组大小,这是将给map加上sameSizeGrow标志;否则将B+1,即扩容1倍
  2. 创建一个新buckets数组,并预分配一些溢出桶,清除map的iterator和oldIterator标志,如果当前map正在被迭代,则需要给map加个oldIterator标志
  3. 修改hmap结构体,将新buckets数组赋值给hmap.buckets字段,旧buckets数组赋值给hmap.oldbuckets字段,如果hmap.extra上还有溢出桶,还需要将此溢出桶赋值给hmap.extra.oldoverflow,同时hmap.extra.overflow置为nil
  4. 如果预分配了溢出桶,则将第一个溢出桶赋值给hmap.extra.nextOverflow func hashGrow(t *maptype, h *hmap) { bigger := uint8(1) if !overLoadFactor(h.count+1, h.B) { // 如果负载系数>=6.5说明是由于溢出桶过多进行的扩容,同时也说明了有许多bucket没有利用上 // 这时不扩容,只rehash bigger = 0 h.flags |= sameSizeGrow } oldbuckets := h.buckets // 创建一个新bucket数组 newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) // 将flags中的iterator和oldIterator位清零 flags := h.flags &^ (iterator | oldIterator) if h.flags&iterator != 0 { flags |= oldIterator } // 修改map h.B += bigger h.flags = flags h.oldbuckets = oldbuckets h.buckets = newbuckets h.nevacuate = 0 h.noverflow = 0 if h.extra != nil && h.extra.overflow != nil { if h.extra.oldoverflow != nil { throw("oldoverflow is not nil") } h.extra.oldoverflow = h.extra.overflow h.extra.overflow = nil } if nextOverflow != nil { if h.extra == nil { h.extra = new(mapextra) } h.extra.nextOverflow = nextOverflow } }

可以看到hashGrow中是没有做迁移动作的,桶迁移是调用growWork渐进式进行的。

func growWork(t *maptype, h *hmap, bucket uintptr) {
	// bucket是在新数组上的位置,bucket&h.oldbucketmask()是在旧数组上的位置
	evacuate(t, h, bucket&h.oldbucketmask())

	// 如果还在扩容状态则再多迁移一个oldbucket
	if h.growing() {
		evacuate(t, h, h.nevacuate)
	}
}

evacuate源码

// oldbucket是要迁移的bucket在旧数组上的索引
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
	// 索引找到要迁移的旧bucket的地址
	b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
	newbit := h.noldbuckets()
	if !evacuated(b) {
		// x和y是移动的目标
		// x表示的是新bucket数组的前半部分
		// y表示的是新bucket数组的后半部分
		var xy [2]evacDst
		x := &xy[0]
		// 找到要迁移的bmap的地址
		x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
		// 找到要迁移的k的地址
		x.k = add(unsafe.Pointer(x.b), dataOffset)
		// 找到要迁移的v的地址
		x.e = add(x.k, bucketCnt*uintptr(t.keysize))

		if !h.sameSizeGrow() {
			y := &xy[1]
			y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
			y.k = add(unsafe.Pointer(y.b), dataOffset)
			y.e = add(y.k, bucketCnt*uintptr(t.keysize))
		}

		
		for ; b != nil; b = b.overflow(t) {
			k := add(unsafe.Pointer(b), dataOffset)
			e := add(k, bucketCnt*uintptr(t.keysize))
			for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
				top := b.tophash[i]
				if isEmpty(top) {
					// 空槽不迁移
					b.tophash[i] = evacuatedEmpty
					continue
				}
				if top < minTopHash {
					throw("bad map state")
				}
				k2 := k
				if t.indirectkey() {
					k2 = *((*unsafe.Pointer)(k2))
				}
				var useY uint8
				if !h.sameSizeGrow() {
					// 增量扩容的情况下,计算hash以判断我们的数据是迁移到哪部分bucket
					hash := t.hasher(k2, uintptr(h.hash0))
					if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
						// 为什么要加 reflexivekey 的判断,可以参考这里:
                        			// https://go-review.googlesource.com/c/go/+/1480
                        			// key != key,只有在 float 数的 NaN 时会出现
                        			// 比如:
                        			// n1 := math.NaN()
                        			// n2 := math.NaN()
                        			// fmt.Println(n1, n2)
                        			// fmt.Println(n1 == n2)
                        			// 这种情况下 n1 和 n2 的哈希值也完全不一样
                        			// 这里官方表示这种情况是不可复现的
                        			// 需要在 iterators 参与的情况下才能复现
                        			// 但是对于这种 key 我们也可以随意对其目标进行发配
                        			// 同时 tophash 对于 NaN 也没啥意义
                        			// 还是按正常的情况下算一个随机的 tophash
                        			// 然后公平地把这些 key 平均分布到各 bucket 就好
						useY = top & 1
						top = tophash(hash)
					} else {
						// 假设旧桶数为2,那么newbit就为100(二进制形式)
						// 新桶数为4,bucketMask为111,
						// hash&newbit != 0说明hash形式为xxx1xx,
						// hash & bucketMask肯定是>100,所以肯定在Y半区
						if hash&newbit != 0 {
							useY = 1
						}
					}
				}

				if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
					throw("bad evacuatedN")
				}

				// 修改旧桶tophash为状态值
				b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
				dst := &xy[useY]                 // evacuation destination

				if dst.i == bucketCnt {
					dst.b = h.newoverflow(t, dst.b)
					dst.i = 0
					dst.k = add(unsafe.Pointer(dst.b), dataOffset)
					dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
				}
				dst.b.tophash[dst.i&(bucketCnt-1)] = top
				if t.indirectkey() {
					*(*unsafe.Pointer)(dst.k) = k2 
				} else {
					typedmemmove(t.key, dst.k, k) 
				}
				if t.indirectelem() {
					*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
				} else {
					typedmemmove(t.elem, dst.e, e)
				}
				dst.i++
				
				dst.k = add(dst.k, uintptr(t.keysize))
				dst.e = add(dst.e, uintptr(t.elemsize))
			}
		}
		
		if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
			b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
			ptr := add(b, dataOffset)
			n := uintptr(t.bucketsize) - dataOffset
			memclrHasPointers(ptr, n)
		}
	}

	if oldbucket == h.nevacuate {
		advanceEvacuationMark(h, t, newbit)
	}
}

查找

runtime的map提供了mapaccess1、mapaccess2、mapaccessK三个函数来获取值,取值时编译器会将v, exists = map[k] 转化为mapaccess2,v = map[k]转化为mapaccess1,mapaccessK只用于map遍历时。下面我们以mapaccess1为例看下是如何查找key的。

mapaccess1查找的流程:

  1. 计算key的hash值,hash值的低B位即为key所在bucket在map.buckets数组中的下标,通过hash值的低B位就能找到key所在bucket,如果map在扩容中那就找到key所在的旧bucket,如果旧bucket未迁移,那么接下来从旧bucket中找key
  2. 遍历bucket的每个槽,取hash值的tophash(高8位)与遍历到的槽上的tophash对比,如果不同则先判断下槽上的tophash是否是emptyRest,如果是那说明当前槽即下面的槽都是空槽了,就不用遍历了,如果不是就继续查下一个槽,直到tophash匹配上
  3. 如果上一步找到了匹配的tophash,就继续判断槽上的key是不是我们要找的key,如果不是还得继续遍历,如果是我们要找的key就取槽上的v返回就结束了
  4. 如果上一步没有找到对应的槽,说明map上没有我们要找的key func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { ... if h == nil || h.count == 0 { if t.hashMightPanic() { // 这段代码是因为当从一个非空的map中取值时, // 如果key为nil, 那么会导致一个panic, // 但同样的操作在nil/空 map上就不会发生, 这可能会导致难以发现的bug // 所以这里还是要额外计算下key的hash值 t.hasher(key, 0) // see issue 23734 } return unsafe.Pointer(&zeroVal[0]) } ... // 计算key对应的hash值 // go为不同的type选择不同的hash函数,具体可以看下/cmd/compile/internal/gc/alg.go genhash()函数 hash := t.hasher(key, uintptr(h.hash0)) m := bucketMask(h.B) // 取hash值的低B位,然后找到对应的bucket b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize))) // 如果oldbuckets不为空,说明在扩容中 if c := h.oldbuckets; c != nil { if !h.sameSizeGrow() { m >>= 1 } // 找到旧bucket oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize))) // 如果还未迁移则下面从旧bucket查询key if !evacuated(oldb) { b = oldb } } // 取hash值的高8位(如果<5则再加上5) top := tophash(hash) bucketloop: // 先查找当前bucket的8个元素,再查找溢出桶上的元素 for ; b != nil; b = b.overflow(t) { for i := uintptr(0); i < bucketCnt; i++ { if b.tophash[i] != top { // 如果tophash为emptyRest则说明当前槽和后面的槽都不会有元素了 if b.tophash[i] == emptyRest { break bucketloop } continue } k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } // hash值一致后再判断key是否是我们要查询的key if t.key.equal(key, k) { // 取回元素值 e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) if t.indirectelem() { e = *((*unsafe.Pointer)(e)) } return e } } } return unsafe.Pointer(&zeroVal[0]) }

删除

删除的步骤:

  1. 与查找类似先通过key与hash值在bucket数组上找到对应的槽
  2. 将槽上的k/v清空,并将tophash设置为emptyOne
  3. 如果该槽是最后一个元素则将它及它之前的空槽tophash都设置为emptyRest,标识其后面都是空槽 func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) { ... // 计算key的hash值 alg := t.key.alg hash := alg.hash(key, uintptr(h.hash0)) ... // 根据hash值低B位找到bucket索引 bucket := hash & bucketMask(h.B) if h.growing() { // 如果正在扩容则执行迁移操作 growWork(t, h, bucket) } // 找到对应的bmap b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize))) bOrig := b top := tophash(hash) // 下面与查询类似都是去找到具体所在的槽 search: for ; b != nil; b = b.overflow(t) { for i := uintptr(0); i < bucketCnt; i++ { if b.tophash[i] != top { if b.tophash[i] == emptyRest { break search } continue } k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) k2 := k if t.indirectkey() { k2 = *((*unsafe.Pointer)(k2)) } if !alg.equal(key, k2) { continue } // 如果 key 中是指针,就清空 key 的内容 if t.indirectkey() { *(*unsafe.Pointer)(k) = nil } else if t.key.ptrdata != 0 { memclrHasPointers(k, t.key.size) } e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) if t.indirectelem() { *(*unsafe.Pointer)(e) = nil } else if t.elem.ptrdata != 0 { memclrHasPointers(e, t.elem.size) } else { memclrNoHeapPointers(e, t.elem.size) } b.tophash[i] = emptyOne // 如果该槽不是最后一个元素则跳转到最后 if i == bucketCnt-1 { if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest { goto notLast } } else { if b.tophash[i+1] != emptyRest { goto notLast } } // 如果该槽是最后一个元素则将它及它之前的空槽tophash都设置为emptyRest,标识其后都是空槽 for { b.tophash[i] = emptyRest if i == 0 { // 如果i=0说明遍历到了该bucket上的第一个槽,这时如果遍历到的b是冲突链表上的第一个bucket说明我们已经遍历完了,break if b == bOrig { break } c := b // 从冲突链表上第一个bucket开始找当前bucket的上一个bucket for b = bOrig; b.overflow(t) != c; b = b.overflow(t) { } i = bucketCnt - 1 } else { i-- } if b.tophash[i] != emptyOne { break } } notLast: // 元素数-1 h.count-- break search } } ... }
阅读全文 »

Golang切片与实现原理

本文Golang版本为1.13.4

Slice底层结构

go中切片实际是一个结构体,它位于runtime包的slice.go文件中

type slice struct {
	array unsafe.Pointer
	len   int
	cap   int
}

array是切片用来存储数据的底层数组的指针,len为切片中元素的数量,cap为切片的容量即数组的长度

切片的初始化

创建一个切片有以下几种方式

1. 通过字面量创建

arr1 := [3]int{1,2,3} // 创建一个数组
s1 := []int{1,2,3} // 创建一个len为3,cap为3的切片

上面的创建方式非常容易与数组的另一个创建方式弄混

arr2 := [...]int{1,2,3} // 创建一个数组,数组长度由编译器推断

s1在内存上的结构如下图: image.png

2. 通过make()函数创建

s1 := make([]int, 10) // 创建一个长度为10,容量为10的切片
s2 := make([]int, 5, 10) // 创建一个长度为5,容量为5的切片

s2的内存结构如图: image.png

3. 通过数组/切片创建另一个切片

通过数组/切片创建另一个切片语法为

slice[i:j:k]

其中i表示开始切的位置,包括该位置,如果没有则表示从0开始切;j表示切到的位置,不包括该位置,如果没有j则切到最后;k控制切片的容量切到的位置,不包括该位置,如果没有则切到尾部。下面举几个例子说明:

a := [10]int{0,1,2,3,4,5,6,7,8,9}
s1 := a[2:5:9] // s1结果为[2,3,4], len:3, cap:7
s2 := a[2:5:10] // s2结果为[2,3,4] len:3, cap:8
s3 := a[2:7:10] // s3结果为[2,3,4,5,6] len:5, cap:8
s4 := a[2:] // s4结果为[2,3,4,5,6,7,8,9] len:8, cap:8
s5 := a[:3] // s5结果为[0,1,2] len:3, cap:10
s6 := a[::3] // 编译报错: middle index required in 3-index slice
s7 := a[:] s7结果为[0,1,2,3,4,5,6,7,8,9], len:10, cap:10
s10 := s1[1:3] s10结果为[3,4], len:2, cap: 6。注意s10的cap是6,而不是7!

s1与s10在内存上的结构如图: image.png

由于as1s2s3s4s5s7共享同一个数组,所以其中任意一个变量通过索引修改了底层数组元素的值,相当于修改了以上所有变量:

s2[3] = 30

执行上面的代码后:变量a变成了[0,1,2,30,4,5,6,7,8,9]、s1变成了[2,30,4]、…… s7变成了[0,1,2,30,4,5,6,7,8,9]

nil切片与空切片

var s11 []int
var s12 = make([]int, 0)

上面的s11为nil,s12是空切片,他们在内存上的结构如图: image.png

我写了段代码验证了下:

var s10 = make([]int, 0)
sh10 := (*reflect.SliceHeader)(unsafe.Pointer(&s10))
println(unsafe.Pointer(sh10.Data))
var s11 []int
sh11 := (*reflect.SliceHeader)(unsafe.Pointer(&s11))
println(unsafe.Pointer(sh11.Data))
var s12 = make([]int, 0)
sh12 := (*reflect.SliceHeader)(unsafe.Pointer(&s12))
println(unsafe.Pointer(sh12.Data))
var s13 = make([]int, 0)
sh13 := (*reflect.SliceHeader)(unsafe.Pointer(&s13))
println(unsafe.Pointer(sh13.Data))

打印结果如下:

0xc00006af08
0x0
0xc00006af08
0xc00006af08

根据打印结果可以看到是上面的结构无误

切片创建源码

我们打印下下面代码对应的汇编,看下golang是如何为我们创建出来一个切片的

func main() {
	tttttt := make([]int, 999)
	fmt.Println(tttttt)
}

通过go tool compile -S -l slice.go打印对应汇编(-l是禁止内联),下面只摘取关键部分

"".main STEXT size=181 args=0x0 locals=0x48
	...
	// 栈增加72个字节
        0x0013 00019 (slice.go:5)       SUBQ    $72, SP
	// 将当前栈底地址加载到到当前栈顶地址+64处
        0x0017 00023 (slice.go:5)       MOVQ    BP, 64(SP)
	// 栈底修改为栈顶地址+64
        0x001c 00028 (slice.go:5)       LEAQ    64(SP), BP
        ...
        0x0021 00033 (slice.go:6)       LEAQ    type.int(SB), AX
        ...
	// 下面三行实际是把tuntime.makeslice放到栈上的指定位置
        0x0028 00040 (slice.go:6)       MOVQ    AX, (SP)
        0x002c 00044 (slice.go:6)       MOVQ    $999, 8(SP)
        0x0035 00053 (slice.go:6)       MOVQ    $999, 16(SP)

上面的部分画个图可能更清晰些: image.png

继续看汇编:

	// 调用runtime.makeslice函数
        0x003e 00062 (slice.go:6)       CALL    runtime.makeslice(SB)
        ...
	// 将返回值加载到AX寄存器
        0x0043 00067 (slice.go:6)       MOVQ    24(SP), AX
        ...
	// 下面就是调用fmt.Println函数的代码了
        0x0048 00072 (slice.go:7)       MOVQ    AX, (SP)
        0x004c 00076 (slice.go:7)       MOVQ    $999, 8(SP)
        0x0055 00085 (slice.go:7)       MOVQ    $999, 16(SP)
        0x005e 00094 (slice.go:7)       CALL    runtime.convTslice(SB)
        ...
        0x0063 00099 (slice.go:7)       MOVQ    24(SP), AX
        ...
        0x0068 00104 (slice.go:7)       XORPS   X0, X0
        0x006b 00107 (slice.go:7)       MOVUPS  X0, ""..autotmp_1+48(SP)
        ...
        0x0070 00112 (slice.go:7)       LEAQ    type.[]int(SB), CX
        ...
        0x0077 00119 (slice.go:7)       MOVQ    CX, ""..autotmp_1+48(SP)
        ...
        0x007c 00124 (slice.go:7)       MOVQ    AX, ""..autotmp_1+56(SP)
        ...
        0x0081 00129 (slice.go:7)       LEAQ    ""..autotmp_1+48(SP), AX
        ...
        0x0086 00134 (slice.go:7)       MOVQ    AX, (SP)
        0x008a 00138 (slice.go:7)       MOVQ    $1, 8(SP)
        0x0093 00147 (slice.go:7)       MOVQ    $1, 16(SP)
        0x009c 00156 (slice.go:7)       CALL    fmt.Println(SB)
        0x00a1 00161 (slice.go:8)       MOVQ    64(SP), BP
        0x00a6 00166 (slice.go:8)       ADDQ    $72, SP
        0x00aa 00170 (slice.go:8)       RET
        0x00ab 00171 (slice.go:8)       NOP
        ...
        0x00ab 00171 (slice.go:5)       CALL    runtime.morestack_noctxt(SB)
        ...
        0x00b0 00176 (slice.go:5)       JMP     0

上面出现了一个关键函数,即runtime.makeslice,(在堆上分配时才会调用这个函数)我们看下它的实现:

func makeslice(et *_type, len, cap int) unsafe.Pointer {
	// 这里实际是计算切片所占的内存大小,即元素的大小乘容量
	// mem为所需内存大小,overflow标识是否溢出
	mem, overflow := math.MulUintptr(et.size, uintptr(cap))
	if overflow || mem > maxAlloc || len < 0 || len > cap {
		// 如果溢出或者所需内存大于最大可分配内存或者len、cap不合法则报错
		mem, overflow := math.MulUintptr(et.size, uintptr(len))
		if overflow || mem > maxAlloc || len < 0 {
			panicmakeslicelen()
		}
		panicmakeslicecap()
	}
	// 调用mallocgc从go内存管理器获取一块内存
	return mallocgc(mem, et, true)
}

函数传参

切片作为函数参数传参时实际上是复制了一个runtime.slice结构体,而非是传递的runtime.slice结构体指针,举个栗子:

func main() {
	slice := []int{0,1,2}
	foo(slice)
}
func foo(slice []int) {
	...
}

其实就等价于

type Slice struct {
	ptr *[3]int
        len int
	cap int
}

func main() {
	slice := Slice{&[3]int{1,2,3}, 0, 0}
	foo(slice)
}
func foo(slice Slice) {
	...
}

因为函数的形参与实参共享同一个数组,这就导致当把一个切片作为参数传递到另一个函数时,在函数内修改形参的某个下标的值时也会修改了实参。描述的比较绕,下面看一个实例:

func main() {
	param := []int{0, 1, 2}
	foo(param)
	fmt.Println(param)
}

func foo(arg []int) {
	arg[1] = 10
}

打印结果为[0,10,2],原因是param与arg共享同一个底层数组,函数foo内修改了arg[1]实际是将两者的底层数组下标为1的元素修改为了10,所以main函数中的param[1]也就变成了10。 在foo函数内修改arg的len字段,是不会影响到param的len的,下面我们验证下:

func main() {
	param := []int{0, 1, 2}
	foo(param)
	fmt.Println(param)
	fmt.Println(len(param))
}

func foo(arg []int) {
	arg[1] = 10
	argSlice := (*reflect.SliceHeader)(unsafe.Pointer(&arg))
	argSlice.Len = 10
	fmt.Println(len(arg))
}

打印结果如下:

10
[0 10 2]
3

验证成功。

切片扩容

当通过append函数向切片中添加数据时,如果切片的容量不足,需要进行扩容,实际调用的是runtime包中的growslice()函数

// runtime/slice.go
func growslice(et *_type, old slice, cap int) slice {
	...

	// 下面就是计算新容量的部分了
	newcap := old.cap
	doublecap := newcap + newcap
	if cap > doublecap {
		// 如果所需容量大于当前容量的两倍,则新容量为所需容量
		newcap = cap
	} else {
		// 下面是所需容量<=当前容量两倍的逻辑
		if old.len < 1024 {
			// 如果当前长度<1024则新容量为当前容量x2
			newcap = doublecap
		} else {
			// 下面是当前长度>=1024的逻辑
			// 新容量每次增加自身的1/4,直到超过所需容量
			for 0 < newcap && newcap < cap {
				newcap += newcap / 4
			}
			// 如果溢出则新容量为所需容量
			if newcap <= 0 {
				newcap = cap
			}
		}
	}

	// 此处省略分配内存的代码
	...

	// p为新分配的底层数组的地址
	// 从old.array处拷贝lenmem个字节到p
	memmove(p, old.array, lenmem)
	// 返回新的切片
	return slice{p, old.len, newcap}
}
阅读全文 »

Go源码解析之sync.Mutex锁

本文使用Golang版本为:go1.13.4

Mutex的使用

先通过一段简单代码看下Go中Mutex的用法

func main() {
	a := 1
	m := sync.Mutex{}
	go func(){
		m.Lock()
		b := a
		a = b + 1
		m.Unlock()
	}()

	m.Lock()
	fmt.Println(a)
	m.Unlock()
}

Mutex的设计

在解释Lock()和Unlock()源码之前我们必须先整体了解下Mutex的设计,不然下面的源码很难看懂。

我们首先看下sync.Mutex这个结构体

type Mutex struct {
	state int32 // 锁的当前状态,共三种
	sema  uint32 // 信号量,用于阻塞和唤醒goroutine
}

锁的三个状态,它们使用Mutex.state的低三位来标识

mutexLocked = 1 << iota // 锁定状态,二进制表示即 ...001
mutexWoken // 唤醒状态,二进制表示即 ...010
mutexStarving // 饥饿状态,二进制表示即...100

mutexLocked位于state的第一位,mutexWoken位于state的第二位,mutexStarving位于state的第三位,如下图: image.png

Mutex锁有两种模式:正常模式和饥饿模式。正常模式时,waiter按照先到先得的方式获取锁,一个waiter被唤醒后并不能直接获取到锁,它需要与新到的goroutine抢占锁,但是新到的goroutine已经在CPU上运行了,所以它大概率抢不过新到的goroutine,如果抢不到锁waiter就需要在等待队列队头继续等待,而这可能会导致一个waiter等待很长时间。为了避免waiter等待过久,当waiter超过1ms没有抢到锁时就会将当前锁切换到饥饿模式。

切换到饥饿模式后,锁将从解锁的goroutine切换到等待队列的队头waiter,新来的goroutine不会去尝试获取锁,也不会自旋,它们会排到等待队列的队尾。

如果某waiter获取到了锁,那么在满足以下两个条件之一时,它会将当前锁从饥饿模式切换到正常模式。

  1. 它是最后一个waiter
  2. 它等待锁的时间不到1ms

了解了Mutex的设计后我们再继续看Lock()与Unlock()的实现。

加锁Lock()的实现

func (m *Mutex) Lock() {
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// 这里本有竞争检测的代码,无意义,已被我删除
		return
	}
	m.lockSlow()
}

函数中首先通过CAS操作尝试获得锁,如果m.state为0即当前锁闲置就将它设置为1,如果尝试失败则进入m.lockSlow()

m.lockSlow()的实现

m.lockSlow()中用到了这几个函数:runtime_canSpin()runtime_doSpin()runtime_SemacquireMutex(),我们先挨个解释下这几个函数的作用再看m.lockSlow()的源码。

runtime_canSpin()

该函数的作用是判断能够进入自旋,下面看下源码

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool { // i是当前自旋次数
	if i >= 4|| ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

通过这个函数我们可以看到,runtime层判断能够自旋必须满足以下几个条件

  • 当前自旋次数不能>=4
  • 必须是多核CPU
  • 至少有一个其他正在运行的P
  • 当前P本地G队列为空

这里解释下gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1这个条件: gomaxprocs是进程中P数量上限,sched.npidle是空闲的P的数量、sched.nmspinning是自旋中的M的数量gomaxprocs - sched.npidle - sched.nmspinning=当前运行中的P的数量,当前运行中的P数量-1(当前P) = 其他P的数量,所以这个条件就是至少有一个其他正在运行的P。

runtime_doSpin()

其源码为:

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(30)
}

这里我们仅看下AMD64平台上proyield的实现:

TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX // 将第一个参数即30加载到AX寄存器
again:
	PAUSE // CPU空转,达到占用CPU的效果
	SUBL	$1, AX // AX寄存器-1
	JNZ	again // 如果不为0则继续执行PAUSE指令,否则退出
	RET

到这里可以看出runtime_doSpin()实际就是CPU空转30次。

runtime_SemacquireMutex()

其实现位于runtime包的sema.go文件中

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

semacquire1的实现并非本文重点,这里大概解释下这个函数的作用:

  1. 如果lifo为true,则加到等待队列队头
  2. 如果lifo为false,则加到等待队列队尾
m.lockSlow()

了解了上面几个函数后我们来看下m.lockSlow()中是怎么处理的吧

func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false // 饥饿模式标志
	awoke := false // 唤醒标志
	iter := 0 // 已进行的自旋次数
	old := m.state // 保存当前锁状态
	for {
		// 进入自旋需要满足三个条件
		// 1. 当前锁状态是锁定状态,如果不是锁定状态就退出自旋尝试获取锁
		// 2. 当前不是饥饿状态,原因是饥饿状态时自旋无意义,因为锁会交给等待队列中的第一个waiter
		// 3. runtime_canSpin判断能够自旋
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				// 如果没有唤醒 且 当前锁状态不在唤醒状态
				// 且 当前有等待者则尝试通过CAS将锁状态标记为唤醒
				// 标记为唤醒后,Unlock()中就不会通过信号量唤醒其他锁定的goroutine了
				// 如果CAS成功则标识自己为唤醒
				awoke = true
			}
			// CPU空转30次
			runtime_doSpin()
			// 自旋次数+1
			iter++
			// 更新当前锁状态
			old = m.state
			// 继续尝试自旋
			continue
		}

		// 如果判断不能进入自旋则进入以下逻辑
		// 进到这里有三种情况:
		// 1. 当前已解锁,锁处于正常状态
		// 2. 当前已解锁,锁处于饥饿状态
		// 3. 当前未解锁,锁处于正常状态
		// 4. 当前未解锁,锁处于饥饿状态

		// old是锁的当前状态,new是期望状态,在下面会尝试将锁通过CAS更新为期望状态
		new := old
		if old&mutexStarving == 0 {
			// 如果当前锁是正常状态则尝试获取锁
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			// 等待数+1
			// 如果锁当前处于饥饿状态,当前goroutine不能获取锁,需要进到等待队列队尾排队等待,所以等待数需要+1
			// 如果当前锁处于锁定状态,也需要进到等待队列等待
			new += 1 << mutexWaiterShift
		}
		if starving && old&mutexLocked != 0 {
			// 如果当前处于饥饿模式并且锁定状态
			// 则尝试设置为饥饿状态
			new |= mutexStarving
		}
		if awoke {
			if new&mutexWoken == 0 {
				// 如果当前goroutine抢到了唤醒,但是唤醒标志还为0说明出现了异常情况
				throw("sync: inconsistent mutex state")
			}
			// 如果在自旋时当前goroutine抢到唤醒了,则尝试将锁标记为未唤醒
			new &^= mutexWoken
		}
		// 尝试将锁状态由旧状态修改为期望状态
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// 修改成功
			// 如果旧状态既不是锁定状态也不是饥饿状态
			// 说明了抢到了锁,则退出循环
			if old&(mutexLocked|mutexStarving) == 0 {
				break
			}
			
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				// 记录等待开始时间
				waitStartTime = runtime_nanotime()
			}
			// 通过信号量阻塞当前goroutine
			// 如果waitStartTime为0,则说明当前goroutine是一个新来的goroutine,那么queueLifo=false,意味加到队尾。
			// 如果waitStartTime不为0,意味当前goroutine是一个被唤醒的goroutine,那么queueLifo=true,意味着加到队头
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// 如果等待时间超过了1ms则切换到饥饿模式
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			// 更新当前锁状态
			old = m.state
			// 如果当前锁处于饥饿状态
			if old&mutexStarving != 0 {
				// 如果当前锁处于锁定状态或者唤醒状态或者没有waiter,异常
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// 因为当前goroutine已经获取了锁,delta用于将等待队列-1
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				// 如果当前不是锁定模式或者只有一个waiter
				// 就通过delta -= mutexStarving和atomic.AddInt32操作将锁的饥饿状态位设置为0,表示为正常模式
				if !starving || old>>mutexWaiterShift == 1 {
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
}

同样的,我已将无关代码和注释删除。

解锁Unlock()的实现

func (m *Mutex) Unlock() {
        // 将锁定状态置为0
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
	    // 如果锁上存在等待者或者处于饥饿模式则进入unlockSlow()
		m.unlockSlow(new)
	}
}

Unlock()本身非常简单,下面重点关注下unlockSlow()的实现

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		// 如果解锁一个未锁定的锁则抛出异常
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		// 处于正常模式
		old := new
		for {
			// 如果没有等待者则无需唤醒任何goroutine,另外以下三种情况也无需唤醒
			// 1. 锁处于锁定状态,说明Unlock()解锁后紧接着就被其他goroutine获取,就不用再唤醒了
			// 2. 锁处于唤醒状态,说明有等待的goroutine已经被唤醒了,不用再尝试唤醒了
			// 3. 锁处于饥饿模式,锁会交给等待队列队头的等待者,不能往下进行
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				
				return
			}
			// 流程走到这里说明当前有等待者并且锁处于空闲状态(三个标志位都为0)
			// 说明等待者还没有被唤醒,需要唤醒等待者
			// 通过CAS将等待者数量-1,并且设置为唤醒
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				// 通过信号量唤醒等待者goroutine,然后退出
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			// CAS修改失败,说明锁的状态已经被修改,有以下几种可能性:
			// 1. 有新的等待者进来
			// 2. 锁被其他goroutine获取(Unlokc()中已经解锁了,走到这里可能已经被其他goroutine)
			// 3. 锁进入了饥饿模式
	
			// 更新锁状态,进入到下一个循环
			old = m.state
		}
	} else {
		// 处于饥饿模式则直接通过信号量唤醒等待队列头的goroutine
		// 此时state的mutexLocked还没有加锁,唤醒的goroutine会持有锁
		// 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态,不会抢占锁
		runtime_Semrelease(&m.sema, true, 1)
	}
}

后言

Mutex虽然代码简单,但由于并行的原因导致case太多,所以还是不太好理解了,建议大家代入到具体的场景中去分析。

阅读全文 »

深入理解原子操作的本质

引言

本文以go1.14 darwin/amd64中的原子操作为例,探究原子操作的汇编实现,引出LOCK指令前缀可见性MESI协议Store BufferInvalid Queue内存屏障,通过对CPU体系结构的探究,从而理解以上概念,并在最终给出一些事实。

Go中的原子操作

我们以atomic.CompareAndSwapInt32为例,它的函数原型是:

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

对应的汇编代码为:

// sync/atomic/asm.s 24行
TEXT ·CompareAndSwapInt32(SB),NOSPLIT,$0
	JMP	runtime∕internal∕atomic·Cas(SB)

通过跳转指令JMP跳转到了runtime∕internal∕atomic·Cas(SB),由于架构的不同对应的汇编代码也不同,我们看下amd64平台对应的代码:

// runtime/internal/atomic/asm_amd64.s 17行
TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0-17
	MOVQ	ptr+0(FP), BX // 将函数第一个实参即addr加载到BX寄存器
	MOVL	old+8(FP), AX // 将函数第二个实参即old加载到AX寄存器
	MOVL	new+12(FP), CX // // 将函数第一个实参即new加载到CX寄存器
	LOCK // 本文关键指令,下面会详述
	CMPXCHGL	CX, 0(BX) // 把AX寄存器中的内容(即old)与BX寄存器中地址数据(即addr)指向的数据做比较如果相等则把第一个操作数即CX中的数据(即new)赋值给第二个操作数
	SETEQ	ret+16(FP) // SETEQ与CMPXCHGL配合使用,在这里如果CMPXCHGL比较结果相等则设置本函数返回值为1,否则为0(16(FP)是返回值即swapped的地址)
	RET // 函数返回

从上面代码中可以看到本文的关键:LOCK。它实际是一个指令前缀,它后面必须跟read-modify-write指令,比如:ADD, ADC, AND, BTC, BTR, BTS, CMPXCHG, CMPXCH8B, CMPXCHG16B, DEC, INC, NEG, NOT, OR, SBB, SUB, XOR, XADD, XCHG

LOCK实现原理

在早期CPU上LOCK指令会锁总线,即其他核心不能再通过总线与内存通讯,从而实现该核心对内存的独占。

这种做法虽然解决了问题但是性能太差,所以在Intel P6 CPU(P6是一个架构,并非具体CPU)引入一个优化:如果数据已经缓存在CPU cache中,则锁缓存,否则还是锁总线。

Cache Coherency

CPU Cache与False Sharing 一文中详细介绍了CPU缓存的结构,CPU缓存带来了一致性问题,举个简单的例子:

// 假设CPU0执行了该函数
var a int = 0
go func fnInCpu0() {
    time.Sleep(1 * time.Second)
    a = 1 // 2. 在CPU1加载完a之后CPU0仅修改了自己核心上的cache但是没有同步给CPU1
}()
// CPU1执行了该函数
go func fnInCpu1() {
    fmt.Println(a) // 1. CPU1将a加载到自己的cache,此时a=0
    time.Sleep(3 * time.Second)
    fmt.Println(a) // 3. CPU1从cache中读到a=0,但此时a已经被CPU0修改为0了
}()

上例中由于CPU没有保证缓存的一致性,导致了两个核心之间的同一数据不可见从而程序出现了问题,所以CPU必须保证缓存的一致性,下面将介绍CPU是如何通过MESI协议做到缓存一致的。

MESI是以下四种cacheline状态的简称:

  • M(Modified):此状态为该cacheline被该核心修改,并且保证不会在其他核心的cacheline上
  • E(Exclusive):标识该cacheline被该核心独占,其他核心上没有该行的副本。该核心可直接修改该行而不用通知其他核心。
  • S(Share):该cacheline存在于多个核心上,但是没有修改,当前核心不能直接修改,修改该行必须与其他核心协商。
  • I(Invaild):该cacheline无效,cacheline的初始状态,说明要么不在缓存中,要么内容已过时。

核心之间协商通信需要以下消息机制:

  • Read: CPU发起数据读取请求,请求中包含数据的地址
  • Read Response: Read消息的响应,该消息有可能是内存响应的,有可能是其他核心响应的(即该地址存在于其他核心上cacheline中,且状态为Modified,这时必须返回最新数据)
  • Invalidate: 核心通知其他核心将它们自己核心上对应的cacheline置为Invalid
  • Invalidate ACK: 其他核心对Invalidate通知的响应,将对应cacheline置为Invalid之后发出该确认消息
  • Read Invalidate: 相当于Read消息+Invalidate消息,即当前核心要读取数据并修改该数据。
  • Write Back: 写回,即将Modified的数据写回到低一级存储器中,写回会尽可能地推迟内存更新,只有当替换算法要驱逐更新过的块时才写回到低一级存储器中。

手画状态转移图

image.png

这里有个存疑的地方:CPU从内存中读到数据I状态是转移到S还是E,查资料时两种说法都有。个人认为应该是E,因为这样另外一个核心要加载副本时只需要去当前核心上取就行了不需要读内存,性能会更高些,如果你有不同看法欢迎在评论区交流。

一些规律

  1. CPU在修改cacheline时要求其他持有该cacheline副本的核心失效,并通过Invalidate ACK来接收反馈
  2. cacheline为M意味着内存上的数据不是最新的,最新的数据在该cacheline上
  3. 数据在cacheline时,如果状态为E,则直接修改;如果状态为S则需要广播Invalidate消息,收到Invalidate ACK后修改状态为M;如果状态为I(包括cache miss)则需要发出Read Invalidate

Store Buffer

当CPU要修改一个S状态的数据时需要发出Invalidate消息并等待ACK才写数据,这个过程显然是一个同步过程,但这对于对计算速度要求极高的CPU来说显然是不可接受的,必须对此优化。 因此我们考虑在CPU与cache之间加一个buffer,CPU可以先将数据写入到这个buffer中并发出消息,然后它就可以去做其他事了,待消息响应后再从buffer写入到cache中。但这有个明显的逻辑漏洞,考虑下这段代码:

a = 1
b = a + 1

假设a初始值为0,然后CPU执行a=1,数据被写入Store Buffer还没有落地就紧接着执行了b=a+1,这时由于a还没有修改落地,因此CPU读到的还是0,最终计算出来b=1。

为了解决这个明显的逻辑漏洞,又提出了Store Forwarding:CPU可以把Buffer读出来传递(forwarding)给下面的读取操作,而不用去cache中读。 image.png

这倒是解决了上面的漏洞,但是还存在另外一个问题,我们看下面这段代码:

a = 0
flag = false
func runInCpu0() {
    a = 1
    flag = true
}

func runInCpu1() {
    while (!flag) {
   	continue
    }
    print(a)
}

对于上面的代码我们假设有如下执行步骤:

  1. 假定当前a存在于cpu1的cache中,flag存在于cpu0的cache中,状态均为E。
  2. cpu1先执行while(!flag),由于flag不存在于它的cache中,所以它发出Read flag消息
  3. cpu0执行a=1,它的cache中没有a,因此它将a=1写入Store Buffer,并发出Invalidate a消息
  4. cpu0执行flag=true,由于flag存在于它的cache中并且状态为E,所以将flag=true直接写入到cache,状态修改为M
  5. cpu0接收到Read flag消息,将cache中的flag=true发回给cpu1,状态修改为S
  6. cpu1收到cpu0的Read Response:flat=true,结束while(!flag)循环
  7. cpu1打印a,由于此时a存在于它的cache中a=0,所以打印出来了0
  8. cpu1此时收到Invalidate a消息,将cacheline状态修改为I,但为时已晚
  9. cpu0收到Invalidate ACK,将Store Buffer中的数据a=1刷到cache中

从代码角度看,我们的代码好像变成了

func runInCpu0() {
    flag = true
    a = 1
}

好像是被重新排序了,这其实是一种 伪重排序,必须提出新的办法来解决上面的问题

写屏障

CPU从软件层面提供了 写屏障(write memory barrier) 指令来解决上面的问题,linux将CPU写屏障封装为smp_wmb()函数。写屏障解决上面问题的方法是先将当前Store Buffer中的数据刷到cache后再执行屏障后面的写入操作。

SMP: Symmetrical Multi-Processing,即多处理器。

这里你可能好奇上面的问题是硬件问题,CPU为什么不从硬件上自己解决问题而要求软件开发者通过指令来避免呢?其实很好回答:CPU不能为了这一个方面的问题而抛弃Store Buffer带来的巨大性能提升,就像CPU不能因为分支预测错误会损耗性能增加功耗而放弃分支预测一样。

还是以上面的代码为例,前提保持不变,这时我们加入写屏障:

a = 0
flag = false
func runInCpu0() {
    a = 1
    smp_wmb()
    flag = true
}

func runInCpu1() {
    while (!flag) {
   	continue
    }
    print(a)
}

当cpu0执行flag=true时,由于Store Buffer中有a=1还没有刷到cache上,所以会先将a=1刷到cache之后再执行flag=true,当cpu1读到flag=true时,a也就=1了。

有文章指出CPU还有一种实现写屏障的方法:CPU将当前store buffer中的条目打标,然后将屏障后的“写入操作”也写到Store Buffer中,cpu继续干其他的事,当被打标的条目全部刷到cache中,之后再刷后面的条目。

Invalid Queue

上文通过写屏障解决了伪重排序的问题后,还要思考另一个问题,那就是Store Buffer size是有限的,当Store Buffer满了之后CPU还是要卡住等待Invalidate ACK。Invalidate ACK耗时的主要原因是CPU需要先将自己cacheline状态修改I后才响应ACK,如果一个CPU很繁忙或者处于S状态的副本特别多,可能所有CPU都在等它的ACK。

CPU优化这个问题的方式是搞一个Invalid Queue,CPU先将Invalidate消息放到这个队列中,接着就响应Invalidate ACK。然而这又带来了新的问题,还是以上面的代码为例

a = 0
flag = false
func runInCpu0() {
    a = 1
    smp_wmb()
    flag = true
}

func runInCpu1() {
    while (!flag) {
   	continue
    }
    print(a)
}

我们假设a在CPU0和CPU1中,且状态均为S,flag由CPU0独占

  1. CPU0执行a=1,因为a状态为S,所以它将a=1写入Store Buffer,并发出Invalidate a消息
  2. CPU1执行while(!flag),由于其cache中没有flag,所以它发出Read flag消息
  3. CPU1收到CPU0的Invalidate a消息,并将此消息写入了Invalid Queue,接着就响应了Invlidate ACK
  4. CPU0收到CPU1的Invalidate ACK后将a=1刷到cache中,并将其状态修改为了M
  5. CPU0执行到smp_wmb(),由于Store Buffer此时为空所以就往下执行了
  6. CPU0执行flag=true,因为flag状态为E,所以它直接将flag=true写入到cache,状态被修改为了M
  7. CPU0收到了Read flag消息,因为它cache中有flag,因此它响应了Read Response,并将状态修改为S
  8. CPU1收到Read flag Response,此时flag=true,所以结束了while循环
  9. CPU1打印a,由于a存在于它的cache中且状态为S,所以直接将cache中的a打印出来了,此时a=0,这显然发生了错误。
  10. CPU1这时才处理Invalid Queue中的消息将a状态修改为I,但为时已晚

为了解决上面的问题,CPU提出了读屏障指令,linux将其封装为了smp_rwm()函数。放到我们的代码中就是这样:

...
func runInCpu1() {
    while (!flag) {
   	continue
    }
    smp_rwm()
    print(a)
}

当CPU执行到smp_rwm()时,会将Invalid Queue中的数据处理完成后再执行屏障后面的读取操作,这就解决了上面的问题了。

除了上面提到的读屏障和写屏障外,还有一种全屏障,它其实是读屏障和写屏障的综合体,兼具两种屏障的作用,在linux中它是smp_mb()函数。 文章开始提到的LOCK指令其实兼具了内存屏障的作用。

几个问题

问题1: CPU采用MESI协议实现缓存同步,为什么还要LOCK

答: 1. MESI协议只维护缓存一致性,与可见性有关,与原子性无关。一个非原子性的指令需要加上lock前缀才能保证原子性。

问题2: 一条汇编指令是原子性的吗

  1. read-modify-write 内存的指令不是原子性的,以INC mem_addr为例,我们假设数据已经缓存在了cache上,指令的执行需要先将数据从cache读到执行单元中,再执行+1,然后写回到cache。
  2. 对于没有对齐的内存,读取内存可能需要多次读取,这不是原子性的。(在某些CPU上读取未对齐的内存是不被允许的)
  3. 其他未知原因…

问题3: Go中的原子读

我们看一个读取8字节数据的例子,直接看golang atomic.LoadUint64()汇编:

// uint64 atomicload64(uint64 volatile* addr);
1. TEXT runtime∕internal∕atomic·Load64(SB), NOSPLIT, $0-12
2.	MOVL	ptr+0(FP), AX // 将第一个参数加载到AX寄存器
3.	TESTL	$7, AX // 判断内存是否对齐
4.	JZ	2(PC) // 跳到这条指令的下两条处,即跳转到第6行
5.	MOVL	0, AX // crash with nil ptr deref 引用0x0地址会触发错误
6.	MOVQ	(AX), M0 // 将内存地址指向的数据加载到M0寄存器
7.	MOVQ	M0, ret+4(FP) // 将M0寄存器中数据(即内存指向的位置)给返回值
8.	EMMS // 清除M0寄存器
9.	RET

第3行TESTL指令对两个操作数按位与,如果结果为0,则将ZF设置为1,否则为0。所以这一行其实是判断传进来的内存地址是不是8的整数倍。

第4行JZ指令判断如果ZF即零标志位为1则执行跳转到第二个操作数指定的位置,结合第三行就是如果传入的内存地址是8的整数倍,即内存已对齐,则跳转到第6行,否则继续往下执行。

关于内存对齐可以看下我这篇文章:理解内存对齐

虽然MOV指令是原子性的,但是汇编中貌似没有加入内存屏障,那Golang是怎么实现可见性的呢?我这里也并没有完全的理解,不过大概意思是Golang的atomic会保证顺序一致性,详情可看下这篇文章:Memory Order Guarantees in Go

问题4:Go中的原子写

仍然以写一个8字节数据的操作为例,直接看golang atomic.LoadUint64()汇编:

TEXT runtime∕internal∕atomic·Store64(SB), NOSPLIT, $0-16
	MOVQ	ptr+0(FP), BX
	MOVQ	val+8(FP), AX
	XCHGQ	AX, 0(BX)
	RET

虽然没有LOCK指令,但XCHGQ指令具有LOCK的效果,所以还是原子性而且可见的。

总结

这篇文章花费了我大量的时间与精力,主要原因是刚开始觉得原子性只是个小问题,但是随着不断的深入挖掘,翻阅无数资料,才发现底下潜藏了无数的坑。 s70KdH.png

由于精力原因本文还有一些很重要的点没有讲到,比如acquire/release 语义等等。

另外客观讲本文问题很多,较真的话可能会对您造成一定的困扰,建议您可以将本文作为您研究计算机底层架构的一个契机,自行研究这方面的技术。

参考资料

阅读全文 »

golang unsafe.Pointer与uintptr

先说结论

  • uintptr 是一个地址数值,它不是指针,与地址上的对象没有引用关系,垃圾回收器不会因为有一个uintptr类型的值指向某对象而不回收该对象。
  • unsafe.Pointer是一个指针,类似于C的void *,它与地址上的对象存在引用关系,垃圾回收器因为有一个unsafe.Pointer类型的值指向某对象而不回收该对象。
  • 任何指针都可以转为unsafe.Pointer
  • unsafe.Pointer可以转为任何指针
  • uintptr可以转换为unsafe.Pointer
  • unsafe.Pointer可以转换为uintptr
  • 指针不能直接转换为uintptr

为什么需要uintptr这个类型呢?

理论上说指针不过是一个数值,即一个uint,但实际上在go中unsafe.Pointer是不能通过强制类型转换为一个uint的,只能将unsafe.Pointer强制类型转换为一个uintptr。

var v1 float64 = 1.1
var v2 *float64 = &v1
_ = int(v2) // 这里编译报错:cannot convert unsafe.Pointer(v2) (type unsafe.Pointer) to type uint

但是可以将一个unsafe.Pointer强制类型转换为一个uintptr:

var v1 float64 = 1.1
var v2 *float64 = &v1
var v3 uintptr = uintptr(unsafe.Pointer(v2))
v4 := uint(v3)
fmt.Println(v3, v4) // v3和v4打印出来的值是相同的

可以理解为uintptr是专门用来指针操作的uint。 另外需要指出的是指针不能直接转为uintptr,即

var a float64
uintptr(&a) 这里会报错,不允许将*float64转为uintptr

一个🌰

通过上面的描述如果你还是一头雾水的话,不妨看下下面这个实际案例:

package foo

type Person struct {
	Name string
	age  int
}

上面的代码中我们在foo包中定义了一个结构体Person,只导出了Name字段,而没有导出age字段,就是说在另外的包中我们只能直接操作Person.Name而不能直接操作Person.age,但是利用unsafe包可以绕过这个限制使我们能够操作Person.age

package main

func main() {
	p := &foo.Person{
		Name: "张三",
	}

	fmt.Println(p)
	// *Person是不能直接转换为*string的,所以这里先将*Person转为unsafe.Pointer,再将unsafe.Pointer转为*string
	pName := (*string)(unsafe.Pointer(p)) 
	*pName = "李四"

	// 正常手段是不能操作Person.age的这里先通过uintptr(unsafe.Pointer(pName))得到Person.Name的地址
	// 通过unsafe.Sizeof(p.Name)得到Person.Name占用的字节数
	// Person.Name的地址 + Person.Name占用的字节数就得到了Person.age的地址,然后将地址转为int指针。
	pAge := (*int)(unsafe.Pointer((uintptr(unsafe.Pointer(pName)) + unsafe.Sizeof(p.Name))))
	// 将p的age字段修改为12
	*pAge = 12

	fmt.Println(p)
}

打印结果为:

$ go run main.go
&{张三 0}
&{李四 12}

需要注意的是下面这段代码比较长:

pAge := (*int)(unsafe.Pointer((uintptr(unsafe.Pointer(pName)) + unsafe.Sizeof(p.Name))))

但是尽量不要分成两段代码,像这样:

temp := uintptr(unsafe.Pointer(pName)) + unsafe.Sizeof(p.Name))
pAge := (*int)(unsafe.Pointer(temp)

原因是在第二行语句时,已经没有指针指向p了,这时p可能会回收掉了,这时得到的地址temp就是个野指针了,不知道指向谁了,是比较危险的。

另外一个原因是在当前Go(golang版本:1.14)的内存管理机制中不会迁移内存,但是不保证以后的版本内存管理机制中有迁移内存的操作,一旦发生了内存迁移指针地址发生变更,上面的分段代码就有可能出现严重问题。

关于Go的内存管理可以参看这篇文章:https://draveness.me/golang/docs/part3-runtime/ch07-memory/golang-memory-allocator/,读完这篇文章相信你就能理解上面的内存迁移问题。

除了上面两点外还有一个原因是在Go 1.3上,当栈需要增长时栈可能会发生移动,对于下面的代码:

var obj int
fmt.Println(uintptr(unsafe.Pointer(&obj)))
bigFunc() // bigFunc()增大了栈
fmt.Println(uintptr(unsafe.Pointer(&obj)))

完全有可能打印出来两个地址。

通过上面的例子应该明白了为什么这个包名为unsafe,因为使用起来确实有风险,所以尽量不要使用这个包。

我之所以研究unsafe.Pointer完全是因为我要在多线程的环境中采用原子操作避免竞争问题,所以我用到了atomic.LoadPointer(addr *unsafe.Pointer)。不过我后面发现了atomic包提供了一个atomic.Value结构体,这个结构体提供的方法使我避免显式使用了unsafe.Pointer。所以你也正在使用atomic.LoadPointer()不妨看看atomic.Value是不是可以解决你的问题,这是我一点提醒。

参考资料

阅读全文 »

CPU Cache与False Sharing

一、CPU 缓存架构

现代多核CPU会在每个核心上加上一个较小的SRAM高速缓存存储器称为:L1高速缓存,其中L1缓存由分为dcache数据缓存,icache指令缓存。在L1缓存的下级加一个较大的L2高速缓存, 然后会再L2之下加一个多核共享的L3高速缓存。它们之间的逻辑结构大概是这样的: image.png

相较于访问CPU高速缓存来说访问主存简直太慢了,Jeff Dean曾给出过这样一组数字:

  • L1缓存访问时间 0.5ns
  • 分支预测错误 5ns
  • L2缓存访问时间 7ns
  • 主存访问 100ns

https://colin-scott.github.io/personal_website/research/interactive_latency.html 给出了不同年份这些指标的数字,

1.1 通用的高速缓存存储器结构

告诉缓存被划分为S = 2 ^ s高速缓存组(cache set),每个组含有E个高速缓存行(cache line),每个行又由B = 2 ^ b个字节的数据块(block)和有一个标识该行是否有效(即是否已过期或者已修改)的有效位(valid bit),以及t标记位(tag bit)组成。物理结构大概是下图这样: image.png

高速缓存的大小指的是所有数据块的大小的和,即:B * E * S。

假设当前CPU地址总线有m位,从而主存有M=2^m个地址,所以每个内存地址会有m位。当CPU需要从主存加载地址为A的内存块上的字节时,会先从CPU高速缓存中获取,这时候问题来了,应该到高速缓存的什么位置去拿呢?实际上会将m位地址A划分为以下几段:

image.png

这里的m、t、b、s都是二进制数,不要想成十进制数哦

从而有m = t + b + s CPU会根据A地址中间的s位定位主存地址A映射到了哪个组中,然后根据开头的t位与组内的E个行的标记位挨个比对以确认地址A映射到了哪一行(这里有文章说是并行查询),这时会检查该行的有效位标识该行是否有效,如果有效的话最后根据后b位就直接定位主存地址A映射到了数据块中的哪个字节上了。如果通过以上的步骤没有找到对应的高速缓存地址的话,高速缓存会向主存请求包含A地址的数据块的一个拷贝,CPU必须等待,当被请求的块到达高速缓存中时高速缓存会将这个块放到对应的位置上,然后从数据块中抽取出A地址上的字节返回给CPU。注意:高速缓存从主存中一次请求的是一个数据块而非具体地址上的一个字节,这非常关键!

CSAPP书中提到了为什么选择中间位作为组索引位,其大概意思是选择中间位能够使连续内存映射到不同的组上,提高高速缓存利用率并且减小冲突覆盖的问题,但是个人感觉其解释是按照特定平台来描述的,并没有普适所有平台,这里就不以我昏昏使你昭昭了,待后续查阅更加合理的解释再说。

根据E的不同我们将高速缓存划分为以下三类:

  • 直接映射高速缓存
  • 组相连高速缓存
  • 全相连高速缓存

1.2 直接映射高速缓存

每组只有一行即E=1的高速缓存称为直接映射高速缓存。这种缓存一旦定位到了组后就无需查询对应行了。

1.2.1 直接映射高速缓存的冲突不命中

假设当前计算机b=16即数据块为16个字节,高速缓存中有两个组,s=5即内存地址的第5位决定内存地址映射到哪个组上,有下面的一段golang代码

func foo(x [8]int32, y [8]int32) int32 {
	var sum int32 = 0
	for i := 0; i < 8; i++ {
		sum += x[i] * y[i]
	}

	return sum
}

上面的程序中xy占用了8 * 4 = 2 ^ 5 = 32个字节,假设x被加载到地址为0-31的内存之中,y被加载到32-63之中,sum存在于寄存器中,不占用内存地址。如下图所示 image.png 运行时,第一次循环中,CPU需要先加载x[0],由于高速缓存中一开始并没有,所以会从主存中加载x[0]-x[3]共16个字节(数据块的大小)的数据到高速缓存的组0中再返回给CPU,接下来同样的道理会将y[0]-y[3]加载到高速缓存的组0中。这时候由于每组只有一个行就导致了上一步加载进来的x[0]-x[3]被覆盖了,下一次循环中要加载x[1]时,x[1]就不在高速缓存中了,所以又必须去内存中加载一次,结果从内存中加载会的数据又把第二次加载进来的y[0]-y[3]给覆盖了,之后的每次循环都存在这个问题,导致每次都回冲突不命中。这种高速缓存反复地加载和驱逐相同的高速缓存块的组的情况称为抖动(thrash)

为了解决这个问题我们可以将x和y两个数组的长度定为12,即:

func foo(x [12]int32, y [12]int32) int32

这样的话再看下分布情况: image.png

这样的话由于y[0]-y[3]与x[0]-x[3]不在一个组上就不会出现抖动问题了。

1.3 组相联高速缓存

组相联高速缓存每组中有多个行。

1.4 全相联高速缓存

全相联高速缓存只有一个组。

1.5 写的问题

如果CPU要写一个已经缓存了的字时,有两种方法将该数据写到下层缓存中: 1. 直写,最简单的一种方法,直接将数据写入到下层缓存。但是这种方案每次写都回引起总线流量 2. 写回,为每个行单独维护一个修改位dirty bit,标识这个缓存块被修改过,只有当替换算法要驱逐更新过的块时才将它写入到下一层缓存中。

写不命中通常有两种方法: 1. 写分配,加载低一层的缓存到高速缓存中,然后更新这个数据块。缺点是每次不命中都会导致一个块从低一层传送到高速缓存。 2. 非写分配,避开高速缓存,直接把这个数据写入到低一层中。

直写通常是非写分配的,写会通常是写分配的。

二、伪共享False Sharing

通过上文了解CPU的缓存结构后我们做一个实验来引出伪共享的问题,实验前我们先看下实验机器的一些信息。Mac上通过sysctl -a查看机器信息,这里我过滤了下只拿出来与此实验相关的一些机器指标:

hw.cachelinesize: 64 // Cacheline 64字节
hw.l1icachesize: 32768
hw.l1dcachesize: 32768 // L1数据缓存32K
hw.l2cachesize: 262144 // L2缓存256K
hw.l3cachesize: 6291456 // L3缓存6M
machdep.cpu.core_count: 4 // 4核
machdep.cpu.thread_count: 8

现在我们定义一个程序,有2个线程,两个变量a和b,线程1循环n次执行a++操作,线程2执行n次b++操作,我们用Go来描述:

type SimpleStruct struct {
	n int32
}

type PaddedStruct struct {
	n int32
	_ CacheLinePad
}

type CacheLinePad struct {
	_ [CacheLinePadSize]byte
}

const CacheLinePadSize = 64

const Num = 10000000

func BenchmarkSimple(b *testing.B) {
	structA := SimpleStruct{}
	structB := SimpleStruct{}
	wg := sync.WaitGroup{}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		wg.Add(2)
		go func() { // 为方便下文描述这个线程称为structA线程
			var j int32
			for j = 0; j < Num; j++ {
				structA.n += j
			}
			wg.Done()
		}()
		go func() { // 为方便下文描述这个线程称为structB线程
			var j int32
			for j = 0; j < Num; j++ {
				structB.n += j
			}
			wg.Done()
		}()
		wg.Wait()
	}
}

func BenchmarkSimplePad(b *testing.B) {
	structB := SimpleStruct{}
	structA := PaddedStruct{}
	wg := sync.WaitGroup{}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		wg.Add(2)
		go func() {
			var j int32
			for j = 0; j < Num; j++ {
				structA.n += j
			}
			wg.Done()
		}()
		go func() {
			var j int32
			for j = 0; j < Num; j++ {
				structB.n += j
			}
			wg.Done()
		}()
		wg.Wait()
	}
}

运行benchmark go test -bench=. 得到以下结果 image.png 可以看到我们只在结构体中加入了一个64字节的元素性能就得到了极大的提高,这是为什么呢?

我们看下Simple这个函数的代码,假设structA线程运行在core1上,structB线程运行在core2s上,假设structA线程先执行,它会将structA这个变量与structB一起加载到core1的L1的同一cacheline中,structB线程也会将structA这个变量与structB一起加载到core2的L1的同一cacheline,structA线程修改了structA的值,它会将这个事件同步到core2上,导致core2上cacheline失效,这时就需要从低一层存储中加载最新数据,然后structB又修改了structB,又导致了cacheline失效,循环往复导致了运行效率极低。

而SimplePad这个函数中structA中加入了cachelinesize个字节,使得structA和structB处于不同的cacheline上,也就避免了上面的问题。

2.1 题外话

  1. 关于多核间同步缓存我没有查到特别好的文章,所以我就不妄加解释了,如果你想深入研究的话可以搜索这个关键词:MESI协议
  2. 上面的实验代码来自于【译】CPU 高速缓存原理和应用。最初在我在做这个实验时,写的实验代码是这样的: var a int32 var pad [64]byte{} var b int32 ...

运行benchmark后发现运行时间并没有缩短,后来获取了a、pad、b的地址后才发现go将pad这个变量分配到了堆上,a和b两个变量在内存上还是紧挨着的,你做实验的话可以吸收这个经验:如果加上pad后发现运行时间没有缩短的话确认下a和b是不是真的被分隔到了两个cacheline上。

参考资料

阅读全文 »
   第 1 页    下一页 »
© 2021 Oosten Studio