15 并发控制

[TOC]

并发控制

通过并发控制解决并发下数据的一致性、步调的协同。

  • Go提供的并发控制组件有sync包下的WaitGroup、互斥锁、读写锁、条件变量cond、once、池Pool,还有通道chan。

为什么要控制并发

数据的安全

当多个协程操作同一个数据未能正确按照顺序执行,就会发生资源竞争。

  • add()函数中操作 num 数据自增,开启1000个协程执行add()函数,最后主协程读取num最终的结果,会发现num最后并不是预期的1000,而是一个不太确定的数值,它可能是933,也可能是966,也有可能是1000,总之它有极大可能会是预料之外的,这是因为num++不是一个原子性的操作。

  • 资源竞争案例:

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup
var num int = 0
func add(){
	defer wg.Done()
	num ++
}
func main() {
	count := 1000
	wg.Add(count)
	for i:=0;i<count;i++ {
		go add()
	}
	wg.Wait()
	fmt.Printf("main主协程:num结果=%d\n",num)
}

资源竞争检查工具 race

Go 1.1后提供了资源竞争检查工具race。

  • race 指令检测程序中有资源竞争的数据,打印检测到的数据的冲突的协程栈。

  • 使用指令 go run -race xxx.go 检测xxx.go中存在资源竞争的数据。

  • 资源竞争检测消耗的内存可能是5倍以上,执行时间会增加2倍以上,资源竞争检测会为每个defer和recover语句额外分配8字节,在协程退出前这些额外分配的内存不会被回收,如果一个长期运行的协程并定期有调用defer和recove,则这个内存占用会不断增长,这些内存分配还不会显示到runtime.ReadMemStats或runtime/pprof的输出中。

  • 对【资源竞争案例】进行资源竞争检查,最终检查到num数据存在资源竞争。

原子性

在运行环境中,不可分割、不可中断的操作就是原子性的操作。

  • 一个操作看起来是原子性的,但在操作系统上下文环境中就可能不是原子性的,在操作系统上下文环境中是原子性的但是在机器环境中就可能不是原子性的。

num++ 不是原子性的,它可以分为三步:

  • 检索num当前的值

  • 增加num的值

  • 存储num的值

  • 这三个步骤的操作都是原子的,但三个步骤结合就不是了。

  • 因此,第100协程执行num++,先读取了num的值为100并将num+1,还未存储num的值,第101协程也读取了num的值为100也进行了num+1,这时,100协程和101协程存储的值都是101,这时num数据就不再准确了。

内存访问同步

  • 如果一个操作不是原子性的,而又会有并发操作,这就需要对数据进行同步,需要并发控制。

协程同步 sync

sync包提供了对低级别内存访问同步的原语。

  • 协程同步是为了保证数据一致性。协同步调,规划顺序。

WaitGroup

不关心并发操作的结果,或者有别的渠道收集协程的的结果时,WaitGroup是等待一组并发操作完成的好方法。

使用并发控制的一种手段实现协程完成任务后唤醒主协程main

  • sync.WaitGroup.Done() 方法表示完成1个协程

  • sync.WaitGroup.Add(int) 增加协程数量int,当达到这个数量的协程Done()时才把wait()的协程唤醒

  • sync.WaitGroup.Wait() 进入阻塞

互斥锁和读写锁

当一个数据会被争夺时,为保证数据的一致性可以对这片数据加锁,但是加锁前必须找到这片数据的临界区。

  • Mutex 互斥锁,拿到锁后才能访问数据,没有拿到锁的协程阻塞等待。

  • RWMutex 读写锁,写独占,读共享。写锁优先级更高。

Mutex 互斥锁

Mutex 互斥锁

  • 互斥锁是保护程序中临界区的一种方式,临界区是程序中需要独占访问共享资源的区域。Mutex提供了一种安全的方式来表示对这些共享资源的独占访问。

  • 定义:var lock sync.Mutex;加锁:lock.Lock();解锁:lock.Unlock()

  • 协程执行加锁 lock.Lock() 之后会独占这个资源,后来的协程执行到这个位置都会阻塞,直到独占者执行lock.Unlock() 解锁,阻塞的协程会竞争这个资源,只有一个协程会再次获得独占资源的权利。

互斥锁案例:

  • 当我们知道 num ++ 操作不是原子的,并且要确保安全的数据就是num时,就可以确定操作num数据的地方就是临界区,在操作这个数据之前加锁,对数据操作完成之后解锁。

  • 如果 num++ 之后还有别的操作,但是并不存在数据安全的问题就应该在num++之后执行 lock.Unlock() ,以减少临界区的范围。

  • 将 lock.Unlock() 注册进 defer 的好处是。如果操作数据发生panic,解锁动作总会发生,不至于让这里成为死锁。

RWMutex 读写锁

RWMutex 读写锁

  • 读写锁是 写独占,读共享,写优先级更高。在读多写少的情况下读写锁更适用。

  • 写拿到资源其余写和读都阻塞。读拿到资源其余读依然可读,但是写阻塞。

  • var rwLock sync.RWMutex 创建读写锁 RLock()函数以读模式加锁,Runlock()解锁。写模式是Lock()和Unlock()

读写锁案例:

  • 案例的输出结果不会是固定的,因为协程先后顺序执行权总是随机的。

  • 两个操作切片list的函数,读切片长度的函数read() 开启读锁,切片增加元素的函数writ()开启写锁 。

  • 最终输出结果,耗时往往都是2000毫秒左右,并且读操作都是成片出现,这是因为读操作下其它协程依然可以进入读但是写是被阻塞的,而写操作下读是阻塞的,因此往往写操作获得一次执行权其它写都会阻塞,而读操作只要获得一次执行权则所有的读都会蜂拥而入把所有读都一次执行完成。2000毫秒的耗时都是读操作中time.Sleep(1 * time.Second)导致的,因为读锁时其它读也能进入所以往往很难得到5000毫秒以上的结果。

互斥锁的实现

互斥锁是通过原子操作(atomic.CompareAndSwapInt32())构建的

  • 互斥锁的频繁自旋问题:当一个协程长时间霸占锁,其它协程抢占锁就毫无意义地消耗CPU资源,或者可能有一个协程始终抢不到锁。

  • 操作系统的锁接口提供了终止与唤醒机制,避免了频繁自旋造成的浪费,操作系统内部构建锁的等待队列,之后会依次唤醒等待的线程,但是操作系统的锁接口是锁住整个线程,而Go是比线程更轻量的协程,Go实现的互斥锁是更轻量级的,在同一时刻只有一个协程获取到锁继续执行,其它的协程进入等待状态。

  • Go互斥锁是一种混合锁,实现方式包含了自旋锁,也参考了操作系统锁的实现,结合了原子操作、自旋、信号量、全局哈希表、等待队列、操作系统级别锁等技术。

  • 自旋锁用于多线程同步的一种锁,线程反复检查锁变量是否可用。线程在这一过程中保持执行,是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。自旋锁避免了进程上下文的调度开销,因此对于线程只会阻塞很短时间的场合是有效的。单核单线程的CPU不适于使用自旋锁,因为,在同一时间只有一个线程是处在运行状态,假设运行线程A发现无法获取锁,只能等待解锁,但因为A自身不挂起,所以那个持有锁的线程B没有办法进入运行状态,只能等到操作系统分给A的时间片用完,才能有机会被调度。这种情况下使用自旋锁的代价很高。

  • state 使用位图形式存储了当前锁的状态,是否为锁定状态、正在等待被唤醒的协程数量、两个和饥饿模式有关的标志。

  • Go 1.9以后使用了饥饿模式,释放锁时会唤醒最先申请的协程从而保证公平。

互斥锁state位图

互斥锁state位图

  • 调用Lock()方法互斥锁会先使用原子锁快速抢占锁,抢占成功则立刻返回,抢占失败不会立刻进入休眠状态,会继续调用lockSlow()方法继续尝试抢占一段时间,这样即使互斥锁频繁加锁与释放也能良好工作。

  • 锁只有在正常状态下才能进入自旋状态,程序运行在单核CPU上、逻辑处理器小于等于1、协程所在逻辑处理器的本地队列有其它协程待运行或自旋次数超过阈值都会立即停止自旋状态。

  • 属于正常模式的情况:已经被加锁处于正常模式下;协程获取该锁进入自旋的次数小于四次;当前机器CPU核数大于1;当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空。

如果长时间不能获取到锁,互斥锁就会使用信号量 sema 进行同步。

  • 锁进入信号量同步则信号量计数值减1,解锁操作进入信号量同步则信号量计数值加1.。

  • 当信号量计数值大于0时,就是有其它协程执行了解锁操作,这时加锁协程可以直接退出。

  • 当信号量计数值等于0时,当前加锁协程需要陷入休眠状态。

所有锁的信息会根据锁的哈希地址存储在全局semtable哈希表中,哈希结果相同的锁会在同一个桶中用一个双向链表存储。

  • 哈希桶中的锁是存储在treap树结构中,treap树是引入了随机数的二叉树,通过引入随机数和必要时的旋转保证了较好的平衡性。

  • 将哈希桶中的锁使用二叉树结构存放的目的是为了能够加快查找速度,如果查找到存在的锁则将协程添加到等待队列尾部,不存在锁则向二叉树增加一个新的锁元素,这个过程也要保持同步,在自旋一定次数以后没有获取到锁就会调用操作系统级别的锁。

  • 协程放置到全局等待队列中等待被唤醒,唤醒的顺序是从前到后,遵循先入先出的准则,保证了公平性。协程长时间不能获取锁,锁会进入饥饿模式,饥饿模式下新申请锁的协程不进入自旋状态直接到等待队列末尾。

锁存储与协程队列

锁存储与协程队列

读写锁的实现

  • 读写锁的实现是基于互斥锁的,读写锁复用了互斥锁和信号量两种机制。

条件变量 cond

条件变量本身不是锁,但经常结合锁使用。

  • 条件变量是结构体类型的变量。

  • Wait()函数,执行该函数立即阻塞等待并释放已掌握的锁(这是一个不可分割的原子操作,原子即计算机中最小执行单位),被唤醒时会在原地解除阻塞并重新抢夺互斥锁。

  • var cond sync.Cond , cond.L = new (sync.Mutex) 加解锁,cond.L.Lock(),cond.L.Unlock()。

  • Wait()方法,原地释放执行权进入等待,当被唤醒时会原地获得执行权,因此如果在此之前有L.Lock()加锁,在Wait()之后一定要L.Unlock()解锁。

  • Signal()方法,单发通知唤醒一个正在该条件变量上等待的go程。

  • Broadcast()方法,群发通知唤醒所有在该条件变量上等待的go程。

条件变量案例:

  • read() 函数读取切片中数据,writ() 函数向切片写入数据。

  • writ() 写一个数据就进入 Wait(),read() 读取出数据就唤醒writ()的协程。

once

once内部封装了一些sync原语,以确保在不同的协程上也只会调用一次Do(func)中的func方法处理传递进来的函数。

  • once.Do(func) 无论开启多少个协程,.Do() 都只会调用一次func,所以最终num的结果是1。

池 pool

池Pool是Pool模式的并发安全实现。

  • Pool模式是一种创建和提供可供使用的固定数量实例或Pool实例的方法。它通常用于约束创建资源珍贵的场景(如数据库连接)。Pool在并发情况下是安全的。

  • pool.Get() 方法,有可用的实例则返回实例,没有可用实例则调用pool.New() 方法创建一个实例。

  • pool.Put() 方法,把实例存入Pool中。

  • Pool可以用于对象缓存,通过提前加载获取引用到另一个对象所需的时间,以节省调用的时间,这在高吞吐量网络服务器中比较常见,服务器要快速响应请求。

  • 有些情况下要谨慎使用Pool:如果使用Pool中转化检索到所需内容的时间币重新实例化内容要花费的时间更多,则Pool不会有多大帮助。

Pool的注意事项

  • 实例化sync.Pool,使用New方法创建一个成员变量,在调用时是线程安全的。

  • 收到一个Get的实例时,不要对所接收的对象的状态做出任何假设。

  • 用完了从Pool取出的对象时,使用Put装回,否则Pool就会失去这个实例,通常情况下是注册到defer中完成的。

Pool案例:

  • 最后输出结果中pool.New()方法调用次数是不固定的,有可能2次、3次...,因为并发调用pool.Get()是安全的,pool.Get()无法获取到实例则调用New()创建,最后还会把获取的实例Put()存入到pool。

原子锁 sync/atomic

原子锁 sync/atomic 标准库,使用它提供的方法进行原子操作。

  • atomic 提供了对数据的保存、增加、交换等操作方法。

原子锁案例:

  • atomic.AddInt64(&num,1) 是原子的,对num进行+1,因为是原子操作所以可以保证数据的同步,每个协程对num+1,num最终一定 = 100。

锁的问题

使用锁控制并发,需要做到加锁和解锁,如果只加锁不正确解锁就会出现死锁。

  • 加锁而不解锁很多情况下并不是编写程序时能够检查出来,它也许只是一个意外,但它确实就是一个问题,会导致一片数据永久无法被访问造成程序的异常。

死锁

所有并发程序彼此等待,这样的情况下没有外界的干预就将永远无法恢复正常,永久的等待就是死锁。

  • 简单的死锁go编译器在编译时就可以检查出,但是程序中的死锁往往是意料之外是在运行时发生的。

死锁案例:

  • fatal error: all goroutines are asleep - deadlock!

  • 协程1 调用 test(lock1,lock2) 先 lock1 锁定,然后 time.Sleep() 休眠1秒。

  • 协程2 调用 test(lock2,lock1) 先 lock2 锁定,这时协程1大可能还在休眠,因此lock2还没有被协程1锁定,等协程2锁定了lock2也去休眠,协程1醒来时去获取lock2,此时lock2已经被协程2锁定,这时就已经是死锁了,协程1再也无法获取到lock2。

活锁

正在主动执行并发操作的程序,但这些操作无法正常推进程序的状态。

  • 就像,你在走廊上行走对面也走来一个人,你向左边走去试图主动让开道路,而她也有默契似的向她的右侧走去也试图避让,所以你又只好再向另一侧避让,她又做了同样的事,这个情形永远持续下去就是活锁。

活锁案例:

  • 案例中对避让次数做了限制,否则会一直无休止得尝试避让,看起来程序是正常的,实际却不是,它在无休止循环。

饥饿:同一资源锁滥用

在任何情况下,并发进程都无法获得执行工作所需的资源,饥饿的协程会抢夺资源不断消耗。

  • 饥饿是有一个或多个贪婪的并发协程不公平地阻止别的协程访问资源。

饥饿案例:

  • 以 time.Sleep() 模拟在工作,两个协程模拟相同的工作时间。

  • 贪婪的协程会抢占共享锁,以完成整个工作循环,而普通的协程只在需要时加锁,两种协程都做同样多的模拟工作,案例中,在工作同样多的时间内,贪婪的协程工作量是普通协程的两倍。

  • 如果两种工作都有同样大小的临界区的情况下,应该控制好临界区,而不是使用贪婪的方式不必要的扩大其持有共享锁的临界区。

总结

并发控制要点在于公开的资源的安全。

  • 当涉及并发的函数、方法、变量以及其它资源时,为竞争的资源增加详细的注释,说明是谁负责并发、如何解决这个问题的、谁负责同步。

Last updated