网络编程中的tips

Category 计算机网络
Posted on
View

Connection reset by peer

网络编程中某一端可能会产生Connection reset by peer的报错,这是因为收到了对端发送的RST包。RST包是在tcp异常关闭时发出的,产生的情形很多。 我在写cat-agent 也遇到了这个报错,经排查发现是客户端发送数据后没有读取服务端的响应而直接关闭了连接(之所以不读取就关闭是因为客户端为提高发送效率不care服务端返回,后来为避免大量的RST包选择服务端不回包来解决)

EPOLL_CLOEXEC

epoll_create1() flags中有个EPOLL_CLOEXEC,类似的还有open()函数的O_CLOEXEC,它们的用处在于:当父进程fork出子进程之后,子进程会继承父进程的文件描述符,当子进程执行exec系统调用之后保存文件描述符的变量就不存在了,这些继承过来的文件描述符将无法关闭。 xxx_CLOEXEC选项就可以标识当子进程执行exec系统调用之后就内核自动关闭继承过来的文件描述符。

syscall.Forklock

在go标准库和其他一些网络库创建socket通常会有如下的代码: syscall.ForkLock.RLock() s, err = socketFunc(family, sotype, proto) if err == nil { syscall.CloseOnExec(s) } syscall.ForkLock.RUnlock() syscall.CloseOnExec(s)与上一节的xxx_CLOEXEC选项作用的相同的。这里ForkLock的作用是保证fork操作时socket的创建与设置CloseOnExec原子性。

持续更新中

...

阅读全文 »

与世界分享我刚编的转发ntunnel_mysql.php的工具

Category 杂文
Tag 杂文
Posted on
View

背景

先说背景吧,我司目前是通过phpMyAdmin来操作mysql,phpMyAdmin每次都要输入账号密码,而且我们的数据库实例又非常多,每次登录还要先搜索一番。虽然搞了一些chrome插件来简化这个过程,但是个人还是不太满意,所以考虑使用桌面客户端来代替。 然而测试环境的mysql是无法直连的,虽然有些桌面客户端(比如navicat)提供了ssh隧道或者http代理来连接,但是我们开发机只开放了http server的端口,没有开放ssh(只能通过跳板机连接)。所以一些桌面客户端直连和通过ssh隧道来连接是不可能了,http代理方式倒是可以连接,但是貌似只有navicat提供了这个功能,而navicat是收费的。得,所有常规道路都不通了。 只能靠骚操作了

方案

1、内网穿透!把内网ssh服务暴露到公网去!

image.png 如上图 1. 首先开发机上agent进程同时连接代理服务器proxy server与本地ssh,做双向转发 2. 然后代理服务器proxy server监听来自我的mac上的桌面客户端的ssh隧道连接,做该连接和proxy server <=> agent这个tcp连接的双向转发 3. 这样mac上的桌面客户端就能建立到内网开发机的ssh隧道了 很好!我们现在就能通过ssh隧道连接到测试环境mysql了! 不过,过不了多久我们可能会遇到下面的情况↓↓↓

2、用http隧道代理tcp连接进而在80端口上提供ssh服务

先简单提下http隧道的原理吧: 1. 首先客户端会发放CONNECT请求给代理服务器,告诉代理服务器它要连接的目标ip+port 2. 代理服务器建立与目标ip+port的tcp连接然后返回给客户端HTTP 200 Connection Established报文 3. 这时候就建立起了连接,后续客户端到代理服务器的所有tcp报文都会被代理服务器转发到目标ip+port的tcp连接上 > 详情可以参考这篇文章:HTTP 隧道代理原理和实现 这个方案存在以下两个问题: 第一、80端口被nginx监听,而nginx不支持http tunnel,虽然有些插件可以实现,但是考虑到开发机许多同学都在使用,添加插件还是有风险的。 第二、与方案1同样的,偷开放ssh服务这件事就不能干🙈

3、做一个mysql server,解析客户端请求转发到http代理

先看图: image.png 在我的mac上起一个服务:hersql,“伪装”成mysql server,接收来自桌面客户端的连接,按照mysql协议从报文中解析出请求的dbsql,然后请求开发机上的navicat提供的代理工具:ntunnel_mysql.phpntunnel_mysql.php连接到db并执行sql,将结果返回给hersqlhersql将查询结果按照mysql协议包装成报文返回给桌面客户端。 完美!

hersql

在方案3中我实现了hersql并开源到了github: github.com/Orlion/hersql,供大家借鉴

...

阅读全文 »

golang sync.Pool分析

Category Golang
Tag Go
Posted on
View

如何使用就不讲了,网上很多文章

1. 结构

type Pool struct {
	noCopy noCopy // 用于保证pool不会被复制
	local     unsafe.Pointer // 实际类型是 [P]poolLocal
	localSize uintptr        // local的size
	victim     unsafe.Pointer // 在新一轮GC来临时接管local,用于减少GC之后冷启动之后的性能抖动
	victimSize uintptr        // 在新一轮GC来临时接管localSize
	New func() interface{} // 当pool中没有对象时会调用这个函数生成一个新的
}

type poolLocal struct {
	poolLocalInternal
	// 避免false sharing问题
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

type poolLocalInternal struct {
    // P的私有缓存区,使用时无需加锁,Put对象时优先放到这里
	private interface{}
	// 公共缓存区,本地P可以pushHead/popHead,其他P只能popTail
	shared  poolChain
}

// 双端队列
type poolChain struct {
	head *poolChainElt
	tail *poolChainElt
}

type poolChainElt struct {
	poolDequeue
	next, prev *poolChainElt
}

// 环形队列
type poolDequeue struct {
	headTail uint64 // 头尾指针,之所以用一个变量持有两个字段大概率是为了方便原子操作一次性修改两个值吧
	vals []eface // 容量从8开始,依次x2,上限为2 ^ 30
}

type eface struct {
	typ, val unsafe.Pointer
}

image.png

2. Get

2.1 主流程

func (p *Pool) Get() interface{} {
    // 当G与P绑定禁止抢占,返回P对应的poolLocal以及P的id
	l, pid := p.pin()
	x := l.private
	l.private = nil
	if x == nil {
	    // 如果private为空则从shared头部pop出一个
		x, _ = l.shared.popHead()
		if x == nil {
		    // 如果shared中也没有则尝试从其他P的shared尾部偷一个
			x = p.getSlow(pid)
		}
	}
	// 解除非抢占
	runtime_procUnpin()
	if x == nil && p.New != nil {
	    // 如果上面的步骤都没有取到则New个出来
		x = p.New()
	}
	return x
}

其中涉及到了一些函数,我们再看下具体实现

2.2 pin

pin的作用是将当前G与P绑定,禁止被抢占。那么为什么要禁止被抢占呢?原因是G被抢占后再恢复执行之后再绑定的可能就不是被抢占之前的P了

func (p *Pool) pin() (*poolLocal, int) {
    // 执行绑定并返回当前pid
	pid := runtime_procPin()
	s := atomic.LoadUintptr(&p.localSize) 
	l := p.local
	if uintptr(pid) < s { // pid<localSize说明已经完成了poolLocal的创建,可以取
		return indexLocal(l, pid), pid
	}
	// pid>=localSize说明poolLocal还没有创建或者用户通过runtime.GOMAXPROCS(X)增加了p的数量,需要先创建
	return p.pinSlow()
}

// 返回local[i]即当前P的poolLocal
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}

// runtime/proc.go
func procPin() int {
	_g_ := getg()
	mp := _g_.m
    // 完成禁止抢占
    // 调度器执行抢占g之前会canPreemptM(mp *m)判断是否可以执行抢占,而canPreemptM有一个条件为m.locks==0
	mp.locks++
	return int(mp.p.ptr().id)
}

2.2.1 pinSlow

pinSlow主要用来在poolLocal还未创建时创建新poolLocal

func (p *Pool) pinSlow() (*poolLocal, int) {
    // 解除禁止抢占
	runtime_procUnpin()
	// 上锁
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	// 禁止抢占
	pid := runtime_procPin()
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
	    // 上锁之前可能其他线程已经进入到pinSlow了,所以再判断一下
		return indexLocal(l, pid), pid
	}
	if p.local == nil {
	    // 说明local第一次初始化,需要将pool加到allPools中
		allPools = append(allPools, p)
	}
	// 获取p的数量
	size := runtime.GOMAXPROCS(0)
	// 创建local
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
	atomic.StoreUintptr(&p.localSize, uintptr(size))
	return &local[pid], pid
}

2.3 poolChain.popHead

我们再看下Get主流程中从shared中通过popHead从shared头部pop出一个对象的实现

func (c *poolChain) popHead() (interface{}, bool) {
	d := c.head
	for d != nil { // 从链表头开始遍历,d的type为*poolDequeue
		if val, ok := d.popHead(); ok {
			return val, ok
		}
		d = loadPoolChainElt(&d.prev)
	}
	return nil, false
}

2.3.1 poolDequeue.popHead

poolDequeue.popHead用来从环形队列头部pop出一个缓存对象

func (d *poolDequeue) popHead() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
		    // 如果头尾指针相等则队列为空
			return nil, false
		}

        // 通过不断重试来实现无锁编程
        // 尝试将head-1然后修改poolDequeue.headTail
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
		    // 取到head对应的槽
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil // 通过3.2.1 poolDequeue.pushHead分析,貌似val不可能为nil
	}
	
	// 清空槽
	*slot = eface{}
	return val, true
}

func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
    // dequeueBits=32
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask) // &mask为了将高32位清零
	tail = uint32(ptrs & mask)
	return
}

func (d *poolDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) |
		uint64(tail&mask)
}

type dequeueNil *struct{}

2.4 getSlow

再看下主流程中的getSlow函数的实现,getSlow用于在当前P缓存中没有时从其他P的共享缓存区偷缓存对象

func (p *Pool) getSlow(pid int) interface{} {
	size := atomic.LoadUintptr(&p.localSize) 
	locals := p.local
	// 从其他P偷
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		// 从其他P的共享区的尾部偷
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

    // 如果没有偷到,则尝试victim cache。我们将在尝试从所有主缓存中偷取之后这样做,因为我们想让victim cache中的对象尽可能的老化
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

    // 走到这里说明victim cache中也没有对象
	// 将victim cache标记为空,下次就不用尝试victim cache了
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}

2.4.1 poolChain.popTail

getSlow中会通过poolChain.popTail从双端队列尾部pop对象,看下具体是如何操作的

func (c *poolChain) popTail() (interface{}, bool) {
    // 先获取双端队列的尾节点,因为这时被偷的P与当前P在并行,所以需要通过原子操作获取
	d := loadPoolChainElt(&c.tail)
	if d == nil {
	    // 尾节点为空说明被偷的P的双端队列为空直接返回即可
		return nil, false
	}

	for {
	    // 在pop tail之前load下一个指针是非常重要的,通常,d可能暂时为空,但是如果
	    // 在pop之前next非nil并且pop失败,那么d永远为空,这是唯一可以安全的从链表
	    // 中删除d的方法
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
		    // next为空遍历终止
			return nil, false
		}
		
		// 走到这里说明当前尾节点为空,并且有下一个节点
		// 此时当前尾节点不可能有新对象被push进来了,可以删除掉了
		// 尝试将环形队列的尾节点指针改成它的下一个节点
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// 走到这里说明赢得了race,清除prev指针以便gc能够收集空dequeue
			// 因此popHead不会在必要时再次备份
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}

要修改poolChain的空尾节点指针为尾节点的下一个节点必须同时满足下面两个条件(即删除当前尾节点) * 当前尾节点环形队列为空 * 当前尾节点必须有下一个节点

我们注意到golang中先获取了当前尾节点的next再popTail,这是为什么呢?如果先popTail再获取next有可能遇到这样的情况:

  1. d的队列为空popTail没有获取到数据
  2. 另外一个线程向d中push了n个对象,此时d不为空,并且生成了下一个节点
  3. 原子获取next,next不为空
  4. 误将还有缓存对象的d删除

2.4.1.1

func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// 说明队列为空
			return nil, false
		}

		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// 修改成功,这个solt就被我们占有了
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

    // 从槽中取值
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)

	return val, true
}

3. Put

3.1 主流程

// sync/pool.go
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	l, _ := p.pin() // 禁止G被抢占,并返回当前P对应的poolLocal
	if l.private == nil {
	    // 如果私有缓存为空则直接放到私有缓存区
		l.private = x
		x = nil
	}
	if x != nil {
	    // 如果私有缓存已经被占了,则放到共享缓存区头
		l.shared.pushHead(x)
	}
	runtime_procUnpin() // 解除禁止抢占
}

接下来我们看下新元素具体是如何放到共享缓冲区头部的

3.2 poolChain.pushHead

func (c *poolChain) pushHead(val interface{}) {
	d := c.head
	if d == nil {
	    // 头结点为空则需要进行初始化
		const initSize = 8 // 第一个节点的环形队列的长度为8
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		// 其他P可能会从尾部偷对象,所以poolChain的tail需要用atomic set,保证对其他P可见
		c.head = d
		storePoolChainElt(&c.tail, d)
	}

    // push到头结点双端队列的头部
	if d.pushHead(val) {
		return
	}

	// 走到这里说明当前头节点的环形队列已经满了,所以申请一个新的节点
	// 新节点环形队列的长度为旧队列的两倍,但如果大于1 << 32 / 4则长度为1 << 32 / 4
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	// 修改双端队列的头节点为新创建的节点
	c.head = d2
	// 其他P可能会用到next所以需要原子store
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}

3.2.1 poolDequeue.pushHead

poolDequeue.pushHead用于将对象放到环形队列上

func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	// dequeueBits = 32
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// 如果环形队列tail加上长度等于head,说明队列实际已经满了
		return false
	}
	// 找到head对应的槽,slot的类型为*eface
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		return false
	}

	if val == nil {
		val = dequeueNil(nil) // 追了下代码调用链貌似val不可能为nil
	}
	// 将val放到槽上
	*(*interface{})(unsafe.Pointer(slot)) = val

	// 增加头指针
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}

4. GC

上面的流程中我们清楚了对象是如何被缓存已经如何被写入和获取的,但是缓存池容量不是无限的,何时清理呢?答案是GC时。

sync/pool.go中有个init函数,在这个函数中注册了GC时如何清理Pool的函数

func init() {
    // 编译器会将poolCleanup赋值给runtime/mgc.go文件中`poolcleanup`变量
    // 在runtime.clearpools()函数中会调用poolcleanup,而在gcStart函数中在开始标记之前会调用clearpools()
	runtime_registerPoolCleanup(poolCleanup)
}

func poolCleanup() {
	for _, p := range oldPools {
	    // 先清除所有旧pool中的victim
	    // 之后gc就能标记清理旧pool中缓存的对象了
		p.victim = nil
		p.victimSize = 0
	}

	for _, p := range allPools {
	    // 用victim接管pool
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	oldPools, allPools = allPools, nil
}

当时看到这里时还有一点疑惑,poolCleanup为什么不写成下面这样:

func poolCleanup() {
	for _, p := range allPools {
	    // 用victim接管pool
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}
}

看起来也能清理缓存队列。但实际有个非常浅而已见的坑,那就是allPools这个切片一直在增长,必须将allPools设置为nil清理下才行,所以就必须引入oldPools。

5. 总结

sync.Pool为每个P搞一个缓存队列,避免所有线程共用同一个队列引发的锁竞争问题。

5.1 Put流程

  1. push到双端队列的头部的环形队列头部,如果环形队列已满则创建一个新的环形队列
  2. 将环形队列作为双端队列的新头部

5.2 Get流程

  1. 先从当前P缓冲区的私有缓存取
  2. 如果私有缓存没有从共享缓存区的双端队列的环形队列的头部pop
  3. 还没获取到则从其他P的共享缓存区的双端队列的环形队列的尾部pop
  4. 还没获取到则从victim cache中取

5.3 总结

总的来说只要清楚了sync.Pool的数据结构基本都理解的大差不差了,还是很简单的。

...

阅读全文 »

bitcask的设计与实现

Category 存储
Tag LSM
Posted on
View

背景

最近在研究LSM tree,听闻bitcask在LSM tree各种各样的应用中是一个比较简单的实现,所以就以它为突破口,了解下LSM tree真实世界的实现。

bitcask存储模型由Riak提出,github上有各种语言的实现,本人挑选了一个golang版本的实现来进行研究,源码地址是:git.mills.io/prologic/bitcask,学习过程中我添加了一些注释,有需要的同学可以参考下:github.com/Orlion/bitcask

存储模型

与LSM tree的基本思想一样,bitcask中所有增删改操作都是追加写磁盘中的datafile,其数据结构如图: image.png

实际文件中是没有换行的,每个entry都是与前一个entry紧密串联在一起的,这里只是为了体现出来一个一个的entry。

datafile由一个一个的entry组成,每个entry的前4个字节存储key的size,第二个8字节存储value的size,然后顺序写入key和value,再写入校验和和过期时间的unix时间戳

datafile写入完成后可以得到新写入项的offset,然后将该key对应的offset与写入的数据项的size写入到内存的索引中,prologic/bitcask索引使用了artAdaptive Radix Tree(自适应前缀树)作为索引的数据结构,虽然不如hash表查找速度快,但因为是树状结构所以可以支持范围查找。

当datafile写入到一定的大小时会创建一个新的可读可写的datafile,在此之后的新数据会写入到这个新datafile中,老datafile会被设置为只读。因此一个bitcask实例会有多个datafile,所以索引中还必须存储key所在文件的id。

数据结构

image.png

  • path指向bitcask的工作目录,即各种文件的存放目录
  • curr指向当前可读写的datafile,数据都写入到该文件中
  • datafiles 为bitcask持有的所有datafile map,其中key为文件id
  • trie和ttlIndex指向内存中的索引树
  • isMerging标记当前是否在进行Merge

删除/修改key

上面提到bitcask中删除修改数据也是顺序写磁盘,那么写入的是什么样的数据呢?

实际上,bitcask中修改数据与写入数据是同一个api,即都是Put(key, value []byte),所以修改key也是往datafile中追加写一个新entry,不同是会修改索引中key的指向为最新数据项在文件中的位置。

而删除数据其实就是put(key, []byte{})即向datafile写入空字节切片,写完之后会删除索引中的key。

查找key

get(key)时会先从内存的索引树中根据key找到key所在的文件id和offset以及size,然后通过mmap到对应datafile文件中offset处拉取entry,然后根据前12个字节处的key len与value len数据指示解析出key和value,检查下校验和,自此数据检索完成。

Merge过程

由于bitcask中增删改都是追加写文件,不可避免的磁盘占用会越来越多,所以需要在合适的时机执行merge操作,将old entry和deleted entry从磁盘中清理掉。

bitcask Merge的过程如下:

  1. 加写锁,判断当前是否有其他线程在执行merge,如果有则退出,如果没有则标记isMerging为true,解锁继续执行
  2. 加读锁,禁止数据写入,但是可以读
  3. 当前datafile刷盘,然后关闭当前datafile,将当前dafafile创建为只读datafile加到bitcask实例的datafile列表中,再创建一个新的当前可读写datafile,新的当前文件不执行merge
  4. 释放读锁
  5. 在工作目录下创建merge子目录,以merge目录为工作目录创建merge用的bitcask实例:mdb
  6. 遍历当前bitcask实例索引中的所有key,如果k在要merge的datafiles列表中的话则将k/v写入到mdb中,完成后关闭mdb
  7. 加写锁,禁止读写
  8. 关闭当前bitcask实例
  9. 删除当前工作目录中的所有文件
  10. 通过rename将mdb工作目录中的所有文件挪到当前工作目录下
  11. 重新打开实例
  12. isMerging标记为false,释放写锁,此时可进行读写

索引持久化

上文提到索引是存储在内存中的,这样的话进程重启后索引就需要重新构建,如果数据量多的话,可想而知进程启动得多慢。

所以bitcask中会在以下几个时机将内存中的索引持久化到磁盘中:

  1. bitcask实例关闭时
  2. 创建新的datafile之后

索引持久化流程

  1. 在工作目录中创建临时索引文件temp_index
  2. 遍历art索引树将节点的 k/item 写入到文件中
  3. temp_index文件rename为index

索引文件的使用

创建bitcask实例时,会检查工作目录下的索引文件,如果有索引文件,会将索引文件加载到内存中生成art索引树,然后判断索引文件是否是最新的,即索引文件生成后有没有新数据写入,如果不是最新的还需要从最新的datafile中读取数据到索引中。

如果没有索引文件则会遍历所有的datafile,遍历所有数据来构建索引。

总结

可以看到bitcask的实现还是非常简单(lou)的。put(k, v)加了全局锁,锁粒度较粗,并发读写性能应该不是很强。merge的过程要遍历所有的datafile,还要创建新文件,所以对系统的IO压力应该也比较大。

...

阅读全文 »

keynote中插入高亮代码的方法

Category 杂文
Tag 杂文
Posted on
View

直接复制vscode中的代码到keynote中效果惨不忍睹,经过一番搜索,发现了一种效果很好的方法,而且keynote中可编辑

1. 安装highlight

$ brew install highlight

2. 复制要插入的代码

3. 执行以下命令

$ pbpaste | highlight --syntax=go --style=github -k "Fira Code" -K 36 -u "utf-8" -t 4 -O rtf | pbcopy
  • –syntax 指定代码语法格式
  • -u 指定编码,否则中文会乱码
  • –style 指定高亮的样式
  • -K 指定代码的字大小

实际效果

image.png

...

阅读全文 »

brk与mmap

Posted on
View

1. 前言

glibc的malloc函数在申请大于128K的内存时使用mmap分配内存,mmap会从堆区和栈区中间的部分划分内存,而在申请小于128K的内存时使用brk从堆上划分内存。

2. brk/sbrk

brk是linux上一个系统调用,而sbrk是一个C库函数

2.1 brk函数原型

int brk(void *addr);

参数

参数 解释
addr 要调整到的内存地址

返回值

返回增加的大小

2.2 sbrk函数原型

void *sbrk(intptr_t increment);

参数

参数 解释
increment 增加的内存大小

返回值

返回增加之后的program break内存地址

2.3 brk的原理

brk实际是通过改变program break来实现的,这个program break可以理解为“堆顶指针”,意味着程序堆内存使用到了该位置。

image.png

而这种内存的分配方式有个问题,看下下图:

image.png

假设B已经被free了,但是由于上面有一个C对象,所以program break指针不能简单的向下移动来释放内存。那怎么办呢?

实际上free后不能立即归还内存,只是将这块内存标记为空闲,后续再申请内存时可以复用这块内存。并且两块连续的空闲内存可以合并为一块空闲内存,当最高地址空间的空闲内存大于128K时(可以通过M_TRIM_THRESHOLD选项调节),执行内存紧缩。对于上图来说,如果C也被free了,program break就可以指向A的结束地址了,就能释放一部分内存了。

难道这就是传说中的线性内存分配

3. mmap

3.1 mmap函数原型

void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);

参数

参数 解释
addr 映射的起始地址,通常设置为NULL,由内核来分配
len 指定将文件映射到内存的部分的长度
prot 映射区域的保护方式,通常是下面几个选项的组合
flags 映射区的特性标志位,常用的有以下两个选项
fd 要映射到内存中的文件描述符
offset 文件映射的偏移量,必须是操作系统内存页大小的整数倍

返回值

返回映射区的起始地址

3.2 munmap函数原型

// 解除映射
int munmap(void *start, size_t length);

参数

参数 解释
start mmap返回的映射区的起始地址
length 映射区的大小

返回值

解除成功返回0,失败返回-1

3.3 mmap原理

linxu内核使用vm_area_struct结构来表示一个独立的虚拟内存区域(比如堆、栈、bss段、代码段等),一个进程会有多个vm_area_struct结构体,各个vm_area_struct使用树结构或者链表连接。

vm_area_struct结构包含了该区域的起止地址和其他信息以及一个vm_ops指针,vm_ops指针内部引出所有针对这个区域可用的系统调用函数。

mmap就是创建一个新vm_area_struct结构,并将其与文件磁盘地址做映射。

image.png

mmap申请的内存可以通过munmap释放。

4. 总结

方式 内存碎片 适用场景
brk 多,因为不可释放 申请小内存
mmap 无,因为可以直接释放 申请大内存,如果用来申请小内存的话就会创建非常多的vm_area_struct结构体,不划算

5. 参考资料

...

阅读全文 »

事务隔离级别实现原理

Category Mysql
Tag Mysql
Posted on
View

1. 前言

数据库隔离级别以及Mysql实操 一文中,我描述了为了解决并发事务间的冲突,实现事务的隔离性,SQL标椎定义了四种隔离级别,今天就通过这篇文章来看下SQL标准中每种隔离级别的实现原理以及InnoDB引擎又是如何实现的。

2. 标准SQL事务隔离级别实现原理

解决并发问题最直觉的方法就是加锁了,而标准SQL事务隔离级别的实现就是依赖于锁的。

隔离级别 实现
未提交读 事务对当前读取到的数据不加锁;事务在更新的瞬间对其加行级共享锁(读锁),直到事务结束才释放。 更新时加共享锁,会阻塞其他事务的更新,但是不会阻塞读。 由于在更新时没有加排他锁(写锁)并且其他事务读的时候也没有尝试加锁,导致其他事务是可以读到修改的,即脏读。
提交读 事务对当前读到的数据加行级共享锁,一旦读完该行就释放锁;事务在更新的瞬间对其加行级排他锁(写锁),直到事务结束才释放。 由于更新时加了排他锁,所以当前事务提交前,其他事务是读不到修改的,这就解决了脏读。 由于读完数据后就释放了锁,所以之后另外一个事务还能修改该行,修改后再读到就是修改之后的数据,这就造成一个事务内读取两次读到的数据是不同的了,即不可重复读。
可重复读 事务开始读取时,对其加行级共享锁,事务结束后才释放;事务在更新的瞬间对其加行级排他锁(写锁),直到事务结束才释放。 由于直到事务结束后才释放读锁,所以在事务结束前,其他事务无法修改该行,所以一个事务多次读取到的数据肯定是相同的,就不会存在不可重复读的问题了。 但是这个隔离级别下,由于只能锁住已存在的行,对insert进来的新数据,还是能读到的,即幻读。
串行化 事务在读取时,加表级共享锁,事务结束后才释放;事务在修改数据时,加表级排他锁。 这个级别下由于加了表锁,所以事务提交前就写不进来新数据,就不存在幻读的问题了。

3. MVCC(Multi-Version Concurrency Control)

通过锁虽然能实现事务间的隔离,但是开销还是太大了,系统性能肯定是扛不起高并发的,为了优化这个问题,尽量避免使用锁,提出了MVCC方式来解决事务并发问题。

3.1 InnoDB的MVCC实现

MVCC在InnoDB中是通过两个隐式字段undo logRead View实现的。

3.1.1 隐式字段

InnoDB会在每一行加上两个隐式字段:

  • DB_TRX_ID: 6bytes,最近修改事务的ID,记录这行记录最后一次修改的事务的ID
  • DB_ROLL_PTR: 7bytes,回滚指针,指向这条记录的上一个版本(存储于rollback segment中)

实际上还有两个字段,但是与MVCC无关。

  • DB_ROW_ID: 隐藏的自增ID(隐藏主键),如果没有主键,则InnoDB会自动以DB_ROW_ID产生一个聚簇索引
  • 一个隐藏的删除flag字段

image.png

3.1.2 undo log

undo log分为两种:

  • insert undo log: 事务在insert时产生,事务提交后可以立即丢弃
  • update undo log:事务在update/delete时产生,不仅在回滚时需要,快照读时也需要,不能随便删除,只有在快照读或者事务不涉及的时候才由purge线程去清除。

purge:为了实现MVCC,删除只是设置下记录的deleted_bit,并不真正删除,InnoDB 有专门的purge线程来回收标记删除的记录,为了不影响MVCC的工作,purge线程也维护一个自己的read view,如果某个记录的DB_TRX_ID相对于purge线程read view可见,那么这条记录就能被安全的删除。

执行流程如下:

1> 比如数据库中当前有一条记录:

name age DB_ROW_ID DB_TRX_ID DB_ROLL_PTR
n1 11 1 1 null

2> 新来一个事务 2修改了记录:update name=n2 where age = 11,流程如下:

  • 事务1修改改行记录时,InnoDB先对改行加排他锁
  • 把当前记录拷贝到undo log中,作为旧记录
  • 拷贝完了后修改name为n2,并且修改记录的DB_TRX_ID为当前事务的id,即:2。DR_ROLL_ID指向undo log中的旧记录,即它的上一个版本
  • 事务提交后,释放锁

image.png

3> 又来一个事务 3修改记录:update name=n3 where age=11,流程如下:

  • 事务1修改改行记录时,InnoDB先对改行加排他锁
  • 把当前记录拷贝到undo log中,作为旧记录,由于该行记录已经有undo log了,那么最新的旧记录作为链表头,插在undo log最前面
  • 拷贝完了后修改name为n3,并且修改记录的DB_TRX_ID为当前事务的id,即:3。DR_ROLL_ID指向undo log中的旧记录,即它的上一个版本
  • 事务提交后,释放锁

image.png

3.1.3 ReadView 读视图

ReadView中有四个比较重要的内容:

  • creator_trx_id: 表示生成该ReadView的事务ID。 (只有在执行insert、update、delete时才会分配事务ID,在一个只读的事务中事务id默认为0)
  • m_ids: 在生成ReadView时所有活跃的事务id集合,活跃事务是指开启还未提交的事务。
  • min_trx_id: m_ids最小值。
  • max_trx_id: 生成ReadView时系统应该分配的下一个事务ID,并非m_ids最大值。

有了这个ReadView,就可以这样判断一条记录是否对该事务可见:

  • 如果被访问版本的trx_id等于creator_trx_id,说明生成该版本的事务就是当前事务,所以可见
  • 如果被访问版本的trx_id小于min_trx_id,说明生成该版本的事务在当前事务生成ReadView前已提交,所以该版本可见
  • 如果被访问版本的trx_id大于等于max_trx_id,表示生成该版本的事务在当前事务生成ReadView之后才开启,所以不可见
  • 如果被访问版本trx_id在min_trx_id与max_trx_id之间,则判断是否在m_ids之中,如果在,说明创建ReadView时生成该版本的事务还活跃,所以不可见;如果不在m_ids中,则说明事务已提交所以可见。

如果某个版本的记录不可见就顺着版本链寻找下一个版本,依次判断是否可见,直到遍历到最后。

3.1.4 MVCC的实现

现在我们已经了解了undo log与ReadView,那么就来看下MVCC到底是如何实操的。

我们假设当前数据结构如下:

image.png

假设 事务20 与 事务30 并发执行,那么对于事务20,它的ReadView中m_ids=[20,30],min_trx_id=20,max_trx_id=31,creator_trx_id=20,对于事务30,它的ReadView 中m_ids=[20,30],min_trx_id=20,max_trx_id=31,creator_trx_id=30

如果此时 事务20 去读取数据,当前版本链中,数据最新版本的DB_TRX_ID为10,它小于 事务20 ReadView的min_trx_id,所以这个版本对 事务20 是可见的。

接着 事务30 修改了这行记录,数据结构就变成了下面这样:

image.png

这时 事务 20 再去读这行记录,当前版本链中,数据最新版本的DB_TRX_ID为30,30在 事务20 的m_ids中,所以这个版本数据对 事务20 不可见,继续顺着版本链读上一个版本,上一个版本DB_TRX_ID为10,可见,所以 事务20 就读到了 上一个版本的数据。

4. 几个概念

在了解InnoDB四种隔离级别的实现之前,我们先明确几个概念

4.1 锁定读和一致性非锁定读

  • 锁定读:在一个事务中主动给读加锁,eg. select … for update(排他锁)、select … lock in share mode(共享锁)
  • 一致性非锁定度:InnoDB通过MVCC向事务提供数据库某个时间点的快照,查询时只能查到当前事务开始前提交的修改,查不到该事务开始之后的修改。就是说事务开始后,事务看到的数据就是事务开始时的数据,后续其他事务的修改在当前事务不可见。

一致性非锁定读是InnoDB在RC和RR两个级别处理SELECT的默认模式,这个过程不用加锁,所以其他事务可以并发修改和读取。

4.2 当前读和快照读

  • 当前读:像update、delete、insert、select … for update、select … lock in share mode,读到的都是当前版本数据,读取时要保证其他并发事务不能修改当前记录,还要加锁
  • 快照读:读到的是快照版本,不加锁的select就是快照读,不加锁。前提是隔离级别不是未提交读和串行化,因为未提交读所有读都是当前读,串行化会对表加锁。

4.3 隐式锁定与显示锁定

  • 隐式锁定 InnoDB在事务执行过程中采用两阶段锁协议,InnoDB根据隔离级别在需要的时候自动加锁,直到事务提交或回滚之后才释放锁,所有的锁都在同一时刻释放。

  • 显示锁定 通过特定的语句显式锁定:

    select ... for update
    select ... lock in share mode
    

5. InnoDB隔离级别实现

InnoDB中,RC与RR两个隔离级别生成ReadView时机是不同的 * RC - 每次读取记录前都生成一个ReadView,而这就导致不可重复读问题 * RR - 在第一次读取时生成一个ReadView,这就解决了可重复读问题

事务隔离级别 实现
未提交读 事务对读都不加锁,都是当前读; 事务在更新的瞬间对其加行级共享锁(读锁),直到事务结束才释放。
提交读 事务对读不加锁,都是快照读;事务在更新的瞬间对其加行级排他锁(写锁),直到事务结束才释放。
可重复读 事务读不加锁,都是快照读;事务在更新时,加Next-Key Lock直到事务结束才释放
串行化读 事务在读取时,加表级共享锁,直到事务结束才释放,都是当前读;写入时加表级排他锁,直到事务结束才释放

我们再思考两个问题:

5.1 RC级别就是快照读了,那还存在不可重复读的问题吗?

答案是仍然存在,原因是InnoDB在这个级别每次读取记录前都生成一个ReadView。

5.2 很多文章提到InnoDB在RR级别就通过MVCC解决了幻读问题,真的吗?

我们先运行一个例子:

事务A 事务B
begin;
select * from users;

Empty set (0.00 sec)
begin;
insert into users(name,age) values('n1', 1);
commit;
select * from users;

Empty set (0.00 sec)

OK,看起来是解决了,这个例子中事务B的ID>=事务A的ReadView的max_trx_id,所以事务B写入的数据对事务A是不可见的。

不过先别着急下结论,再看下下面的这个例子:

事务A 事务B
begin;
select * from users;

Empty set (0.00 sec)
begin;
insert into users(name,age) values(‘n1’, 1);
commit;
update users set name=‘n2’ where id=1;
select * from users;

+—-+——+——+
| id | name | age |
+—-+——+——+
| 1 | n2 | 1 |
+—-+——+——+
1 row in set (0.00 sec)

这个例子中第二次查询给查出来了,原因在于update是当前读,执行update后生成了一个新的快照,而这个快照对事务A是可见的,所以给查出来了。

如果想第二次select查询结果跟第一次一致,还依赖间隙锁(Gap Lock),事务A的第一个

select * from users;

要显式加锁,即:

select * from users lock in share mode;

这样事务B在执行insert语句时会被阻塞住直到事务A提交。

那么什么是间隙锁呢?

5.3 Gap Lock

举个例子,age字段有普通索引,对于如下sql:

update users set name='n3' where age = 30;

不止会锁住30这一行记录,而且还会锁住两侧的区间(10,30]和(30,positive infinity)

( 表示包括这个, [ 表示不包括这个,间隙锁遵循前开后闭原则,就是说update … age=10,insert age=30的话是不会撞到锁的。

image.png

注意,如果age没有索引,那么会给所有行上一个Gap Lock!但是如果age为唯一索引,就只锁一行了。

5.4 Next-Key Lock

Record Lock与Gap Lock的结合,既锁住行也锁住索引之间的间隙。

参考资料

...

阅读全文 »

Golang切片与实现原理

Category Golang
Tag Go
Posted on
View

本文Golang版本为1.13.4

Slice底层结构

go中切片实际是一个结构体,它位于runtime包的slice.go文件中

type slice struct {
	array unsafe.Pointer
	len   int
	cap   int
}

array是切片用来存储数据的底层数组的指针,len为切片中元素的数量,cap为切片的容量即数组的长度

切片的初始化

创建一个切片有以下几种方式

1. 通过字面量创建

arr1 := [3]int{1,2,3} // 创建一个数组
s1 := []int{1,2,3} // 创建一个len为3,cap为3的切片

上面的创建方式非常容易与数组的另一个创建方式弄混

arr2 := [...]int{1,2,3} // 创建一个数组,数组长度由编译器推断

s1在内存上的结构如下图: image.png

2. 通过make()函数创建

s1 := make([]int, 10) // 创建一个长度为10,容量为10的切片
s2 := make([]int, 5, 10) // 创建一个长度为5,容量为5的切片

s2的内存结构如图: image.png

3. 通过数组/切片创建另一个切片

通过数组/切片创建另一个切片语法为

slice[i:j:k]

其中i表示开始切的位置,包括该位置,如果没有则表示从0开始切;j表示切到的位置,不包括该位置,如果没有j则切到最后;k控制切片的容量切到的位置,不包括该位置,如果没有则切到尾部。下面举几个例子说明:

a := [10]int{0,1,2,3,4,5,6,7,8,9}
s1 := a[2:5:9] // s1结果为[2,3,4], len:3, cap:7
s2 := a[2:5:10] // s2结果为[2,3,4] len:3, cap:8
s3 := a[2:7:10] // s3结果为[2,3,4,5,6] len:5, cap:8
s4 := a[2:] // s4结果为[2,3,4,5,6,7,8,9] len:8, cap:8
s5 := a[:3] // s5结果为[0,1,2] len:3, cap:10
s6 := a[::3] // 编译报错: middle index required in 3-index slice
s7 := a[:] s7结果为[0,1,2,3,4,5,6,7,8,9], len:10, cap:10
s10 := s1[1:3] s10结果为[3,4], len:2, cap: 6。注意s10的cap是6,而不是7!

s1与s10在内存上的结构如图: image.png

由于as1s2s3s4s5s7共享同一个数组,所以其中任意一个变量通过索引修改了底层数组元素的值,相当于修改了以上所有变量:

s2[3] = 30

执行上面的代码后:变量a变成了[0,1,2,30,4,5,6,7,8,9]、s1变成了[2,30,4]、…… s7变成了[0,1,2,30,4,5,6,7,8,9]

nil切片与空切片

var s11 []int
var s12 = make([]int, 0)

上面的s11为nil,s12是空切片,他们在内存上的结构如图: image.png

我写了段代码验证了下:

var s10 = make([]int, 0)
sh10 := (*reflect.SliceHeader)(unsafe.Pointer(&s10))
println(unsafe.Pointer(sh10.Data))
var s11 []int
sh11 := (*reflect.SliceHeader)(unsafe.Pointer(&s11))
println(unsafe.Pointer(sh11.Data))
var s12 = make([]int, 0)
sh12 := (*reflect.SliceHeader)(unsafe.Pointer(&s12))
println(unsafe.Pointer(sh12.Data))
var s13 = make([]int, 0)
sh13 := (*reflect.SliceHeader)(unsafe.Pointer(&s13))
println(unsafe.Pointer(sh13.Data))

打印结果如下:

0xc00006af08
0x0
0xc00006af08
0xc00006af08

根据打印结果可以看到是上面的结构无误

切片创建源码

我们打印下下面代码对应的汇编,看下golang是如何为我们创建出来一个切片的

func main() {
	tttttt := make([]int, 999)
	fmt.Println(tttttt)
}

通过go tool compile -S -l slice.go打印对应汇编(-l是禁止内联),下面只摘取关键部分

"".main STEXT size=181 args=0x0 locals=0x48
	...
	// 栈增加72个字节
        0x0013 00019 (slice.go:5)       SUBQ    $72, SP
	// 将当前栈底地址加载到到当前栈顶地址+64处
        0x0017 00023 (slice.go:5)       MOVQ    BP, 64(SP)
	// 栈底修改为栈顶地址+64
        0x001c 00028 (slice.go:5)       LEAQ    64(SP), BP
        ...
        0x0021 00033 (slice.go:6)       LEAQ    type.int(SB), AX
        ...
	// 下面三行实际是把tuntime.makeslice放到栈上的指定位置
        0x0028 00040 (slice.go:6)       MOVQ    AX, (SP)
        0x002c 00044 (slice.go:6)       MOVQ    $999, 8(SP)
        0x0035 00053 (slice.go:6)       MOVQ    $999, 16(SP)

上面的部分画个图可能更清晰些: image.png

继续看汇编:

	// 调用runtime.makeslice函数
        0x003e 00062 (slice.go:6)       CALL    runtime.makeslice(SB)
        ...
	// 将返回值加载到AX寄存器
        0x0043 00067 (slice.go:6)       MOVQ    24(SP), AX
        ...
	// 下面就是调用fmt.Println函数的代码了
        0x0048 00072 (slice.go:7)       MOVQ    AX, (SP)
        0x004c 00076 (slice.go:7)       MOVQ    $999, 8(SP)
        0x0055 00085 (slice.go:7)       MOVQ    $999, 16(SP)
        0x005e 00094 (slice.go:7)       CALL    runtime.convTslice(SB)
        ...
        0x0063 00099 (slice.go:7)       MOVQ    24(SP), AX
        ...
        0x0068 00104 (slice.go:7)       XORPS   X0, X0
        0x006b 00107 (slice.go:7)       MOVUPS  X0, ""..autotmp_1+48(SP)
        ...
        0x0070 00112 (slice.go:7)       LEAQ    type.[]int(SB), CX
        ...
        0x0077 00119 (slice.go:7)       MOVQ    CX, ""..autotmp_1+48(SP)
        ...
        0x007c 00124 (slice.go:7)       MOVQ    AX, ""..autotmp_1+56(SP)
        ...
        0x0081 00129 (slice.go:7)       LEAQ    ""..autotmp_1+48(SP), AX
        ...
        0x0086 00134 (slice.go:7)       MOVQ    AX, (SP)
        0x008a 00138 (slice.go:7)       MOVQ    $1, 8(SP)
        0x0093 00147 (slice.go:7)       MOVQ    $1, 16(SP)
        0x009c 00156 (slice.go:7)       CALL    fmt.Println(SB)
        0x00a1 00161 (slice.go:8)       MOVQ    64(SP), BP
        0x00a6 00166 (slice.go:8)       ADDQ    $72, SP
        0x00aa 00170 (slice.go:8)       RET
        0x00ab 00171 (slice.go:8)       NOP
        ...
        0x00ab 00171 (slice.go:5)       CALL    runtime.morestack_noctxt(SB)
        ...
        0x00b0 00176 (slice.go:5)       JMP     0

上面出现了一个关键函数,即runtime.makeslice,(在堆上分配时才会调用这个函数)我们看下它的实现:

func makeslice(et *_type, len, cap int) unsafe.Pointer {
	// 这里实际是计算切片所占的内存大小,即元素的大小乘容量
	// mem为所需内存大小,overflow标识是否溢出
	mem, overflow := math.MulUintptr(et.size, uintptr(cap))
	if overflow || mem > maxAlloc || len < 0 || len > cap {
		// 如果溢出或者所需内存大于最大可分配内存或者len、cap不合法则报错
		mem, overflow := math.MulUintptr(et.size, uintptr(len))
		if overflow || mem > maxAlloc || len < 0 {
			panicmakeslicelen()
		}
		panicmakeslicecap()
	}
	// 调用mallocgc从go内存管理器获取一块内存
	return mallocgc(mem, et, true)
}

函数传参

切片作为函数参数传参时实际上是复制了一个runtime.slice结构体,而非是传递的runtime.slice结构体指针,举个栗子:

func main() {
	slice := []int{0,1,2}
	foo(slice)
}
func foo(slice []int) {
	...
}

其实就等价于

type Slice struct {
	ptr *[3]int
        len int
	cap int
}

func main() {
	slice := Slice{&[3]int{1,2,3}, 0, 0}
	foo(slice)
}
func foo(slice Slice) {
	...
}

因为函数的形参与实参共享同一个数组,这就导致当把一个切片作为参数传递到另一个函数时,在函数内修改形参的某个下标的值时也会修改了实参。描述的比较绕,下面看一个实例:

func main() {
	param := []int{0, 1, 2}
	foo(param)
	fmt.Println(param)
}

func foo(arg []int) {
	arg[1] = 10
}

打印结果为[0,10,2],原因是param与arg共享同一个底层数组,函数foo内修改了arg[1]实际是将两者的底层数组下标为1的元素修改为了10,所以main函数中的param[1]也就变成了10。 在foo函数内修改arg的len字段,是不会影响到param的len的,下面我们验证下:

func main() {
	param := []int{0, 1, 2}
	foo(param)
	fmt.Println(param)
	fmt.Println(len(param))
}

func foo(arg []int) {
	arg[1] = 10
	argSlice := (*reflect.SliceHeader)(unsafe.Pointer(&arg))
	argSlice.Len = 10
	fmt.Println(len(arg))
}

打印结果如下:

10
[0 10 2]
3

验证成功。

切片扩容

当通过append函数向切片中添加数据时,如果切片的容量不足,需要进行扩容,实际调用的是runtime包中的growslice()函数

// runtime/slice.go
func growslice(et *_type, old slice, cap int) slice {
	...

	// 下面就是计算新容量的部分了
	newcap := old.cap
	doublecap := newcap + newcap
	if cap > doublecap {
		// 如果所需容量大于当前容量的两倍,则新容量为所需容量
		newcap = cap
	} else {
		// 下面是所需容量<=当前容量两倍的逻辑
		if old.len < 1024 {
			// 如果当前长度<1024则新容量为当前容量x2
			newcap = doublecap
		} else {
			// 下面是当前长度>=1024的逻辑
			// 新容量每次增加自身的1/4,直到超过所需容量
			for 0 < newcap && newcap < cap {
				newcap += newcap / 4
			}
			// 如果溢出则新容量为所需容量
			if newcap <= 0 {
				newcap = cap
			}
		}
	}

	// 此处省略分配内存的代码
	...

	// p为新分配的底层数组的地址
	// 从old.array处拷贝lenmem个字节到p
	memmove(p, old.array, lenmem)
	// 返回新的切片
	return slice{p, old.len, newcap}
}

...

阅读全文 »

Go源码解析之sync.Mutex锁

Category Golang
Tag Go
Posted on
View

本文使用Golang版本为:go1.13.4

Mutex的使用

先通过一段简单代码看下Go中Mutex的用法

func main() {
	a := 1
	m := sync.Mutex{}
	go func(){
		m.Lock()
		b := a
		a = b + 1
		m.Unlock()
	}()

	m.Lock()
	fmt.Println(a)
	m.Unlock()
}

Mutex的设计

在解释Lock()和Unlock()源码之前我们必须先整体了解下Mutex的设计,不然下面的源码很难看懂。

我们首先看下sync.Mutex这个结构体

type Mutex struct {
	state int32 // 锁的当前状态,共三种
	sema  uint32 // 信号量,用于阻塞和唤醒goroutine
}

锁的三个状态,它们使用Mutex.state的低三位来标识

mutexLocked = 1 << iota // 锁定状态,二进制表示即 ...001
mutexWoken // 唤醒状态,二进制表示即 ...010
mutexStarving // 饥饿状态,二进制表示即...100

mutexLocked位于state的第一位,mutexWoken位于state的第二位,mutexStarving位于state的第三位,如下图: image.png

Mutex锁有两种模式:正常模式和饥饿模式。正常模式时,waiter按照先到先得的方式获取锁,一个waiter被唤醒后并不能直接获取到锁,它需要与新到的goroutine抢占锁,但是新到的goroutine已经在CPU上运行了,所以它大概率抢不过新到的goroutine,如果抢不到锁waiter就需要在等待队列队头继续等待,而这可能会导致一个waiter等待很长时间。为了避免waiter等待过久,当waiter超过1ms没有抢到锁时就会将当前锁切换到饥饿模式。

切换到饥饿模式后,锁将从解锁的goroutine切换到等待队列的队头waiter,新来的goroutine不会去尝试获取锁,也不会自旋,它们会排到等待队列的队尾。

如果某waiter获取到了锁,那么在满足以下两个条件之一时,它会将当前锁从饥饿模式切换到正常模式。

  1. 它是最后一个waiter
  2. 它等待锁的时间不到1ms

了解了Mutex的设计后我们再继续看Lock()与Unlock()的实现。

加锁Lock()的实现

func (m *Mutex) Lock() {
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// 这里本有竞争检测的代码,无意义,已被我删除
		return
	}
	m.lockSlow()
}

函数中首先通过CAS操作尝试获得锁,如果m.state为0即当前锁闲置就将它设置为1,如果尝试失败则进入m.lockSlow()

m.lockSlow()的实现

m.lockSlow()中用到了这几个函数:runtime_canSpin()runtime_doSpin()runtime_SemacquireMutex(),我们先挨个解释下这几个函数的作用再看m.lockSlow()的源码。

runtime_canSpin()

该函数的作用是判断能够进入自旋,下面看下源码

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool { // i是当前自旋次数
	if i >= 4|| ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

通过这个函数我们可以看到,runtime层判断能够自旋必须满足以下几个条件

  • 当前自旋次数不能>=4
  • 必须是多核CPU
  • 至少有一个其他正在运行的P
  • 当前P本地G队列为空

这里解释下gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1这个条件: gomaxprocs是进程中P数量上限,sched.npidle是空闲的P的数量、sched.nmspinning是自旋中的M的数量gomaxprocs - sched.npidle - sched.nmspinning=当前运行中的P的数量,当前运行中的P数量-1(当前P) = 其他P的数量,所以这个条件就是至少有一个其他正在运行的P。

runtime_doSpin()

其源码为:

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(30)
}

这里我们仅看下AMD64平台上proyield的实现:

TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX // 将第一个参数即30加载到AX寄存器
again:
	PAUSE // CPU空转,达到占用CPU的效果
	SUBL	$1, AX // AX寄存器-1
	JNZ	again // 如果不为0则继续执行PAUSE指令,否则退出
	RET

到这里可以看出runtime_doSpin()实际就是CPU空转30次。

runtime_SemacquireMutex()

其实现位于runtime包的sema.go文件中

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

semacquire1的实现并非本文重点,这里大概解释下这个函数的作用:

  1. 如果lifo为true,则加到等待队列队头
  2. 如果lifo为false,则加到等待队列队尾
m.lockSlow()

了解了上面几个函数后我们来看下m.lockSlow()中是怎么处理的吧

func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false // 饥饿模式标志
	awoke := false // 唤醒标志
	iter := 0 // 已进行的自旋次数
	old := m.state // 保存当前锁状态
	for {
		// 进入自旋需要满足三个条件
		// 1. 当前锁状态是锁定状态,如果不是锁定状态就退出自旋尝试获取锁
		// 2. 当前不是饥饿状态,原因是饥饿状态时自旋无意义,因为锁会交给等待队列中的第一个waiter
		// 3. runtime_canSpin判断能够自旋
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				// 如果没有唤醒 且 当前锁状态不在唤醒状态
				// 且 当前有等待者则尝试通过CAS将锁状态标记为唤醒
				// 标记为唤醒后,Unlock()中就不会通过信号量唤醒其他锁定的goroutine了
				// 如果CAS成功则标识自己为唤醒
				awoke = true
			}
			// CPU空转30次
			runtime_doSpin()
			// 自旋次数+1
			iter++
			// 更新当前锁状态
			old = m.state
			// 继续尝试自旋
			continue
		}

		// 如果判断不能进入自旋则进入以下逻辑
		// 进到这里有三种情况:
		// 1. 当前已解锁,锁处于正常状态
		// 2. 当前已解锁,锁处于饥饿状态
		// 3. 当前未解锁,锁处于正常状态
		// 4. 当前未解锁,锁处于饥饿状态

		// old是锁的当前状态,new是期望状态,在下面会尝试将锁通过CAS更新为期望状态
		new := old
		if old&mutexStarving == 0 {
			// 如果当前锁是正常状态则尝试获取锁
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			// 等待数+1
			// 如果锁当前处于饥饿状态,当前goroutine不能获取锁,需要进到等待队列队尾排队等待,所以等待数需要+1
			// 如果当前锁处于锁定状态,也需要进到等待队列等待
			new += 1 << mutexWaiterShift
		}
		if starving && old&mutexLocked != 0 {
			// 如果当前处于饥饿模式并且锁定状态
			// 则尝试设置为饥饿状态
			new |= mutexStarving
		}
		if awoke {
			if new&mutexWoken == 0 {
				// 如果当前goroutine抢到了唤醒,但是唤醒标志还为0说明出现了异常情况
				throw("sync: inconsistent mutex state")
			}
			// 如果在自旋时当前goroutine抢到唤醒了,则尝试将锁标记为未唤醒
			new &^= mutexWoken
		}
		// 尝试将锁状态由旧状态修改为期望状态
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// 修改成功
			// 如果旧状态既不是锁定状态也不是饥饿状态
			// 说明了抢到了锁,则退出循环
			if old&(mutexLocked|mutexStarving) == 0 {
				break
			}
			
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				// 记录等待开始时间
				waitStartTime = runtime_nanotime()
			}
			// 通过信号量阻塞当前goroutine
			// 如果waitStartTime为0,则说明当前goroutine是一个新来的goroutine,那么queueLifo=false,意味加到队尾。
			// 如果waitStartTime不为0,意味当前goroutine是一个被唤醒的goroutine,那么queueLifo=true,意味着加到队头
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// 如果等待时间超过了1ms则切换到饥饿模式
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			// 更新当前锁状态
			old = m.state
			// 如果当前锁处于饥饿状态
			if old&mutexStarving != 0 {
				// 如果当前锁处于锁定状态或者唤醒状态或者没有waiter,异常
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// 因为当前goroutine已经获取了锁,delta用于将等待队列-1
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				// 如果当前不是锁定模式或者只有一个waiter
				// 就通过delta -= mutexStarving和atomic.AddInt32操作将锁的饥饿状态位设置为0,表示为正常模式
				if !starving || old>>mutexWaiterShift == 1 {
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
}

同样的,我已将无关代码和注释删除。

解锁Unlock()的实现

func (m *Mutex) Unlock() {
        // 将锁定状态置为0
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
	    // 如果锁上存在等待者或者处于饥饿模式则进入unlockSlow()
		m.unlockSlow(new)
	}
}

Unlock()本身非常简单,下面重点关注下unlockSlow()的实现

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		// 如果解锁一个未锁定的锁则抛出异常
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		// 处于正常模式
		old := new
		for {
			// 如果没有等待者则无需唤醒任何goroutine,另外以下三种情况也无需唤醒
			// 1. 锁处于锁定状态,说明Unlock()解锁后紧接着就被其他goroutine获取,就不用再唤醒了
			// 2. 锁处于唤醒状态,说明有等待的goroutine已经被唤醒了,不用再尝试唤醒了
			// 3. 锁处于饥饿模式,锁会交给等待队列队头的等待者,不能往下进行
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				
				return
			}
			// 流程走到这里说明当前有等待者并且锁处于空闲状态(三个标志位都为0)
			// 说明等待者还没有被唤醒,需要唤醒等待者
			// 通过CAS将等待者数量-1,并且设置为唤醒
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				// 通过信号量唤醒等待者goroutine,然后退出
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			// CAS修改失败,说明锁的状态已经被修改,有以下几种可能性:
			// 1. 有新的等待者进来
			// 2. 锁被其他goroutine获取(Unlokc()中已经解锁了,走到这里可能已经被其他goroutine)
			// 3. 锁进入了饥饿模式
	
			// 更新锁状态,进入到下一个循环
			old = m.state
		}
	} else {
		// 处于饥饿模式则直接通过信号量唤醒等待队列头的goroutine
		// 此时state的mutexLocked还没有加锁,唤醒的goroutine会持有锁
		// 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态,不会抢占锁
		runtime_Semrelease(&m.sema, true, 1)
	}
}

后言

Mutex虽然代码简单,但由于并行的原因导致case太多,所以还是不太好理解了,建议大家代入到具体的场景中去分析。

...

阅读全文 »

深入理解原子操作的本质

Category Golang
Tag Go
Posted on
View

引言

本文以go1.14 darwin/amd64中的原子操作为例,探究原子操作的汇编实现,引出LOCK指令前缀可见性MESI协议Store BufferInvalid Queue内存屏障,通过对CPU体系结构的探究,从而理解以上概念,并在最终给出一些事实。

Go中的原子操作

我们以atomic.CompareAndSwapInt32为例,它的函数原型是:

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

对应的汇编代码为:

// sync/atomic/asm.s 24行
TEXT ·CompareAndSwapInt32(SB),NOSPLIT,$0
	JMP	runtime∕internal∕atomic·Cas(SB)

通过跳转指令JMP跳转到了runtime∕internal∕atomic·Cas(SB),由于架构的不同对应的汇编代码也不同,我们看下amd64平台对应的代码:

// runtime/internal/atomic/asm_amd64.s 17行
TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0-17
	MOVQ	ptr+0(FP), BX // 将函数第一个实参即addr加载到BX寄存器
	MOVL	old+8(FP), AX // 将函数第二个实参即old加载到AX寄存器
	MOVL	new+12(FP), CX // // 将函数第一个实参即new加载到CX寄存器
	LOCK // 本文关键指令,下面会详述
	CMPXCHGL	CX, 0(BX) // 把AX寄存器中的内容(即old)与BX寄存器中地址数据(即addr)指向的数据做比较如果相等则把第一个操作数即CX中的数据(即new)赋值给第二个操作数
	SETEQ	ret+16(FP) // SETEQ与CMPXCHGL配合使用,在这里如果CMPXCHGL比较结果相等则设置本函数返回值为1,否则为0(16(FP)是返回值即swapped的地址)
	RET // 函数返回

从上面代码中可以看到本文的关键:LOCK。它实际是一个指令前缀,它后面必须跟read-modify-write指令,比如:ADD, ADC, AND, BTC, BTR, BTS, CMPXCHG, CMPXCH8B, CMPXCHG16B, DEC, INC, NEG, NOT, OR, SBB, SUB, XOR, XADD, XCHG

LOCK实现原理

在早期CPU上LOCK指令会锁总线,即其他核心不能再通过总线与内存通讯,从而实现该核心对内存的独占。

这种做法虽然解决了问题但是性能太差,所以在Intel P6 CPU(P6是一个架构,并非具体CPU)引入一个优化:如果数据已经缓存在CPU cache中,则锁缓存,否则还是锁总线。

Cache Coherency

CPU Cache与False Sharing 一文中详细介绍了CPU缓存的结构,CPU缓存带来了一致性问题,举个简单的例子:

// 假设CPU0执行了该函数
var a int = 0
go func fnInCpu0() {
    time.Sleep(1 * time.Second)
    a = 1 // 2. 在CPU1加载完a之后CPU0仅修改了自己核心上的cache但是没有同步给CPU1
}()
// CPU1执行了该函数
go func fnInCpu1() {
    fmt.Println(a) // 1. CPU1将a加载到自己的cache,此时a=0
    time.Sleep(3 * time.Second)
    fmt.Println(a) // 3. CPU1从cache中读到a=0,但此时a已经被CPU0修改为0了
}()

上例中由于CPU没有保证缓存的一致性,导致了两个核心之间的同一数据不可见从而程序出现了问题,所以CPU必须保证缓存的一致性,下面将介绍CPU是如何通过MESI协议做到缓存一致的。

MESI是以下四种cacheline状态的简称:

  • M(Modified):此状态为该cacheline被该核心修改,并且保证不会在其他核心的cacheline上
  • E(Exclusive):标识该cacheline被该核心独占,其他核心上没有该行的副本。该核心可直接修改该行而不用通知其他核心。
  • S(Share):该cacheline存在于多个核心上,但是没有修改,当前核心不能直接修改,修改该行必须与其他核心协商。
  • I(Invaild):该cacheline无效,cacheline的初始状态,说明要么不在缓存中,要么内容已过时。

核心之间协商通信需要以下消息机制:

  • Read: CPU发起数据读取请求,请求中包含数据的地址
  • Read Response: Read消息的响应,该消息有可能是内存响应的,有可能是其他核心响应的(即该地址存在于其他核心上cacheline中,且状态为Modified,这时必须返回最新数据)
  • Invalidate: 核心通知其他核心将它们自己核心上对应的cacheline置为Invalid
  • Invalidate ACK: 其他核心对Invalidate通知的响应,将对应cacheline置为Invalid之后发出该确认消息
  • Read Invalidate: 相当于Read消息+Invalidate消息,即当前核心要读取数据并修改该数据。
  • Write Back: 写回,即将Modified的数据写回到低一级存储器中,写回会尽可能地推迟内存更新,只有当替换算法要驱逐更新过的块时才写回到低一级存储器中。

手画状态转移图

image.png

这里有个存疑的地方:CPU从内存中读到数据I状态是转移到S还是E,查资料时两种说法都有。个人认为应该是E,因为这样另外一个核心要加载副本时只需要去当前核心上取就行了不需要读内存,性能会更高些,如果你有不同看法欢迎在评论区交流。

一些规律

  1. CPU在修改cacheline时要求其他持有该cacheline副本的核心失效,并通过Invalidate ACK来接收反馈
  2. cacheline为M意味着内存上的数据不是最新的,最新的数据在该cacheline上
  3. 数据在cacheline时,如果状态为E,则直接修改;如果状态为S则需要广播Invalidate消息,收到Invalidate ACK后修改状态为M;如果状态为I(包括cache miss)则需要发出Read Invalidate

Store Buffer

当CPU要修改一个S状态的数据时需要发出Invalidate消息并等待ACK才写数据,这个过程显然是一个同步过程,但这对于对计算速度要求极高的CPU来说显然是不可接受的,必须对此优化。 因此我们考虑在CPU与cache之间加一个buffer,CPU可以先将数据写入到这个buffer中并发出消息,然后它就可以去做其他事了,待消息响应后再从buffer写入到cache中。但这有个明显的逻辑漏洞,考虑下这段代码:

a = 1
b = a + 1

假设a初始值为0,然后CPU执行a=1,数据被写入Store Buffer还没有落地就紧接着执行了b=a+1,这时由于a还没有修改落地,因此CPU读到的还是0,最终计算出来b=1。

为了解决这个明显的逻辑漏洞,又提出了Store Forwarding:CPU可以把Buffer读出来传递(forwarding)给下面的读取操作,而不用去cache中读。 image.png

这倒是解决了上面的漏洞,但是还存在另外一个问题,我们看下面这段代码:

a = 0
flag = false
func runInCpu0() {
    a = 1
    flag = true
}

func runInCpu1() {
    while (!flag) {
   	continue
    }
    print(a)
}

对于上面的代码我们假设有如下执行步骤:

  1. 假定当前a存在于cpu1的cache中,flag存在于cpu0的cache中,状态均为E。
  2. cpu1先执行while(!flag),由于flag不存在于它的cache中,所以它发出Read flag消息
  3. cpu0执行a=1,它的cache中没有a,因此它将a=1写入Store Buffer,并发出Invalidate a消息
  4. cpu0执行flag=true,由于flag存在于它的cache中并且状态为E,所以将flag=true直接写入到cache,状态修改为M
  5. cpu0接收到Read flag消息,将cache中的flag=true发回给cpu1,状态修改为S
  6. cpu1收到cpu0的Read Response:flat=true,结束while(!flag)循环
  7. cpu1打印a,由于此时a存在于它的cache中a=0,所以打印出来了0
  8. cpu1此时收到Invalidate a消息,将cacheline状态修改为I,但为时已晚
  9. cpu0收到Invalidate ACK,将Store Buffer中的数据a=1刷到cache中

从代码角度看,我们的代码好像变成了

func runInCpu0() {
    flag = true
    a = 1
}

好像是被重新排序了,这其实是一种 伪重排序,必须提出新的办法来解决上面的问题

写屏障

CPU从软件层面提供了 写屏障(write memory barrier) 指令来解决上面的问题,linux将CPU写屏障封装为smp_wmb()函数。写屏障解决上面问题的方法是先将当前Store Buffer中的数据刷到cache后再执行屏障后面的写入操作。

SMP: Symmetrical Multi-Processing,即多处理器。

这里你可能好奇上面的问题是硬件问题,CPU为什么不从硬件上自己解决问题而要求软件开发者通过指令来避免呢?其实很好回答:CPU不能为了这一个方面的问题而抛弃Store Buffer带来的巨大性能提升,就像CPU不能因为分支预测错误会损耗性能增加功耗而放弃分支预测一样。

还是以上面的代码为例,前提保持不变,这时我们加入写屏障:

a = 0
flag = false
func runInCpu0() {
    a = 1
    smp_wmb()
    flag = true
}

func runInCpu1() {
    while (!flag) {
   	continue
    }
    print(a)
}

当cpu0执行flag=true时,由于Store Buffer中有a=1还没有刷到cache上,所以会先将a=1刷到cache之后再执行flag=true,当cpu1读到flag=true时,a也就=1了。

有文章指出CPU还有一种实现写屏障的方法:CPU将当前store buffer中的条目打标,然后将屏障后的“写入操作”也写到Store Buffer中,cpu继续干其他的事,当被打标的条目全部刷到cache中,之后再刷后面的条目。

Invalid Queue

上文通过写屏障解决了伪重排序的问题后,还要思考另一个问题,那就是Store Buffer size是有限的,当Store Buffer满了之后CPU还是要卡住等待Invalidate ACK。Invalidate ACK耗时的主要原因是CPU需要先将自己cacheline状态修改I后才响应ACK,如果一个CPU很繁忙或者处于S状态的副本特别多,可能所有CPU都在等它的ACK。

CPU优化这个问题的方式是搞一个Invalid Queue,CPU先将Invalidate消息放到这个队列中,接着就响应Invalidate ACK。然而这又带来了新的问题,还是以上面的代码为例

a = 0
flag = false
func runInCpu0() {
    a = 1
    smp_wmb()
    flag = true
}

func runInCpu1() {
    while (!flag) {
   	continue
    }
    print(a)
}

我们假设a在CPU0和CPU1中,且状态均为S,flag由CPU0独占

  1. CPU0执行a=1,因为a状态为S,所以它将a=1写入Store Buffer,并发出Invalidate a消息
  2. CPU1执行while(!flag),由于其cache中没有flag,所以它发出Read flag消息
  3. CPU1收到CPU0的Invalidate a消息,并将此消息写入了Invalid Queue,接着就响应了Invlidate ACK
  4. CPU0收到CPU1的Invalidate ACK后将a=1刷到cache中,并将其状态修改为了M
  5. CPU0执行到smp_wmb(),由于Store Buffer此时为空所以就往下执行了
  6. CPU0执行flag=true,因为flag状态为E,所以它直接将flag=true写入到cache,状态被修改为了M
  7. CPU0收到了Read flag消息,因为它cache中有flag,因此它响应了Read Response,并将状态修改为S
  8. CPU1收到Read flag Response,此时flag=true,所以结束了while循环
  9. CPU1打印a,由于a存在于它的cache中且状态为S,所以直接将cache中的a打印出来了,此时a=0,这显然发生了错误。
  10. CPU1这时才处理Invalid Queue中的消息将a状态修改为I,但为时已晚

为了解决上面的问题,CPU提出了读屏障指令,linux将其封装为了smp_rwm()函数。放到我们的代码中就是这样:

...
func runInCpu1() {
    while (!flag) {
   	continue
    }
    smp_rwm()
    print(a)
}

当CPU执行到smp_rwm()时,会将Invalid Queue中的数据处理完成后再执行屏障后面的读取操作,这就解决了上面的问题了。

除了上面提到的读屏障和写屏障外,还有一种全屏障,它其实是读屏障和写屏障的综合体,兼具两种屏障的作用,在linux中它是smp_mb()函数。 文章开始提到的LOCK指令其实兼具了内存屏障的作用。

几个问题

问题1: CPU采用MESI协议实现缓存同步,为什么还要LOCK

答: 1. MESI协议只维护缓存一致性,与可见性有关,与原子性无关。一个非原子性的指令需要加上lock前缀才能保证原子性。

问题2: 一条汇编指令是原子性的吗

  1. read-modify-write 内存的指令不是原子性的,以INC mem_addr为例,我们假设数据已经缓存在了cache上,指令的执行需要先将数据从cache读到执行单元中,再执行+1,然后写回到cache。
  2. 对于没有对齐的内存,读取内存可能需要多次读取,这不是原子性的。(在某些CPU上读取未对齐的内存是不被允许的)
  3. 其他未知原因…

问题3: Go中的原子读

我们看一个读取8字节数据的例子,直接看golang atomic.LoadUint64()汇编:

// uint64 atomicload64(uint64 volatile* addr);
1. TEXT runtime∕internal∕atomic·Load64(SB), NOSPLIT, $0-12
2.	MOVL	ptr+0(FP), AX // 将第一个参数加载到AX寄存器
3.	TESTL	$7, AX // 判断内存是否对齐
4.	JZ	2(PC) // 跳到这条指令的下两条处,即跳转到第6行
5.	MOVL	0, AX // crash with nil ptr deref 引用0x0地址会触发错误
6.	MOVQ	(AX), M0 // 将内存地址指向的数据加载到M0寄存器
7.	MOVQ	M0, ret+4(FP) // 将M0寄存器中数据(即内存指向的位置)给返回值
8.	EMMS // 清除M0寄存器
9.	RET

第3行TESTL指令对两个操作数按位与,如果结果为0,则将ZF设置为1,否则为0。所以这一行其实是判断传进来的内存地址是不是8的整数倍。

第4行JZ指令判断如果ZF即零标志位为1则执行跳转到第二个操作数指定的位置,结合第三行就是如果传入的内存地址是8的整数倍,即内存已对齐,则跳转到第6行,否则继续往下执行。

关于内存对齐可以看下我这篇文章:理解内存对齐

虽然MOV指令是原子性的,但是汇编中貌似没有加入内存屏障,那Golang是怎么实现可见性的呢?我这里也并没有完全的理解,不过大概意思是Golang的atomic会保证顺序一致性,详情可看下这篇文章:Memory Order Guarantees in Go

问题4:Go中的原子写

仍然以写一个8字节数据的操作为例,直接看golang atomic.LoadUint64()汇编:

TEXT runtime∕internal∕atomic·Store64(SB), NOSPLIT, $0-16
	MOVQ	ptr+0(FP), BX
	MOVQ	val+8(FP), AX
	XCHGQ	AX, 0(BX)
	RET

虽然没有LOCK指令,但XCHGQ指令具有LOCK的效果,所以还是原子性而且可见的。

总结

这篇文章花费了我大量的时间与精力,主要原因是刚开始觉得原子性只是个小问题,但是随着不断的深入挖掘,翻阅无数资料,才发现底下潜藏了无数的坑。 s70KdH.png

由于精力原因本文还有一些很重要的点没有讲到,比如acquire/release 语义等等。

另外客观讲本文问题很多,较真的话可能会对您造成一定的困扰,建议您可以将本文作为您研究计算机底层架构的一个契机,自行研究这方面的技术。

参考资料

...

阅读全文 »