关于锁的总结(三)信号量
难点
个人感觉信号量主要难在应用场景上;
同等对比,互斥量主要用来上锁,条件变量来优化多任务等待
其中互斥量正常场景下,都应该(最好都是遵从这样方式)保持上锁、解锁是同一个task;
在《评论迁移》里面,分布式锁redis的set NX 过程中,参数是key、requireId、expiration,其中requireId(多task环境下唯一ID)就是为了保证上锁、解锁是同一个task
综合来看,好像不需要信号量这类东西的存在
《UNIX 网络编程 卷2:进程间通信》 一书上讲了互斥量、信号量、条件变量的区别
(1)互斥锁必须总是由给它上锁的的线程(任务)解锁,信号量的挂出却不必由执行过它的等待操作的同一线程执行。(这是个特性,实际测试中go语言中的互斥量其实是可以的)。
(2)互斥锁要么被锁住,要么被解开(二值状态,类似于二值信号量)。
(3)既然信号量有一个与之关联的状态(它的计数值),那么信号量挂出操作总是被记住。然后
PV操作
p操作
是递减或者加锁等待,对应sem_wait
V操作
是递增或者解锁,并唤醒等待池
中的task,sem_post
从两个原子操作上来看,可以了解到有个计数器,而计数器是临界资源的,所以在处理信号量的过程中,访问计数器需要额外添加一个二值信号量(或者采用mutex)
整体上面还是很绕的,举个栗子
桌子上三个盘子,父亲在有空盘子情况下,剥好一个橘子放上去;母亲会削好一个苹果放上去; 哥哥在盘子中有橘子情况下消耗一个橘子,姐姐则会消耗一个苹果;此过程会一直循环;
整体分析一下几类信号量、计数器
- 空盘子信号量 empty_sem
- 占用盘子信号量 used_sem
- 盘子中橘子数量 orange_cnt
- 盘子中苹果数量 apple_cnt
- 访问计数器时的二元信号量 cntermutexsem
对于父亲的操作 (母亲类似)
empty_sem.P() // 空盘子信号量递减或者加锁等待
cnter_mutex_sem.P()
orange_cnt++
cnter_mutex_sem.V()
used_sem.V() // 占用盘子信号量递增且唤醒消费者
对于哥哥的操作(姐姐类似)
cnter_mutex_sem.P() //锁的粒度需要加在外层
if orange_cnt > 0
orange_cnt--
empty_sem.V()
used_sem.P()
cnter_mutex_sem.V()
读写
不推荐使用信号量,尽量使用互斥量+条件变量处理同步互斥问题;
即便涉及到本机跨进程,也尽量采用分布式的互斥锁+条件变量来处理;
来看一下读写锁的例子;
读并发、写互斥、读写之间互斥
参考一,给出了一种实现方式,但是该实现方式有个严重问题,就是写操作
可能会产生饥饿态
因为一旦出现了阻塞写
,后续的所有读
都是可以排在该写
之前的
实际上,后续的读
应该排队,而且在写
之后
void* writer(void* arg)
{
rw.P(); // 互斥访问共享文件
printf(" Writer %d start writing...\n", arg);
sleep(1);
printf(" Writer %d finish writing...\n", arg);
rw.V(); // 释放共享文件
}
void* reader(void* arg)
{
mutex.P(); // 互斥访问count变量
if(count == 0) // 当第一个读线程读文件时
rw.P(); // 阻止写线程写
++count; // 读者计数器加1
mutex.V(); // 释放count变量
printf("Reader %d start reading...\n", arg);
sleep(1);
printf("Reader %d finish reading...\n", arg);
mutex.P(); // 互斥访问count变量
--count; // 读者计数器减1
if(count == 0) // 当最后一个读线程读完文件
rw.V(); // 允许写线程写
mutex.V(); // 释放count变量
}
参考三,读写锁的不会产生写饥饿
,但是注意,以下代码,读锁不会死锁,写锁会造成死锁
package main
import (
"fmt"
"sync"
)
func main() {
rw := &sync.RWMutex{}
rw.RLock()
rw.RLock()
fmt.Printf("Rlock finished\n")
rw.RUnlock()
rw.RUnlock()
fmt.Printf("RUnlock finished\n")
rw.Lock()
rw.Lock()
fmt.Printf("Lock finished\n")
rw.Unlock()
rw.Unlock()
fmt.Printf("Unlock finished\n")
}
参考三所提到的锁嵌套
,主要针对的是写锁
(或者互斥量
,读锁
严格意义上不是锁
),但是在不同协程内,是可以作到两次Lock
产生阻塞的,下面的代码就没问题:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
rw := &sync.RWMutex{}
rw.RLock()
rw.RLock()
fmt.Printf("Rlock finished\n")
rw.RUnlock()
rw.RUnlock()
fmt.Printf("RUnlock finished\n")
go func() {
rw.Lock()
fmt.Printf("Lock1 finished\n")
time.Sleep(time.Second * 2)
rw.Unlock()
fmt.Printf("Unlock1 finished\n")
}()
go func() {
time.Sleep(time.Second * 1)
rw.Lock()
fmt.Printf("Lock2 finished\n")
time.Sleep(time.Second * 3)
rw.Unlock()
fmt.Printf("Unlock1 finished\n")
}()
time.Sleep(time.Second * 1000)
}
参考go的源代码读写锁,可以写出以下代码
package main
import (
"fmt"
"golang.org/x/net/context"
"golang.org/x/sync/semaphore"
"sync"
"sync/atomic"
"time"
)
const rwmutexMaxReaders = 10000
type RW struct {
readerSem *semaphore.Weighted
writerSem *semaphore.Weighted
readerCount int32
readerWait int32
w *sync.Mutex
}
func (rw *RW) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
ctx := context.Background()
rw.writerSem.Acquire(ctx, 1)
}
}
func (rw *RW) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// check
for i := 0; i < int(r); i++ {
rw.readerSem.Release(1)
}
rw.w.Unlock()
}
func (rw *RW) RLock() {
if r := atomic.AddInt32(&rw.readerCount, 1); r < 0 {
ctx := context.Background()
rw.readerSem.Acquire(ctx, 1)
}
}
func (rw *RW) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// check r
if 0 == atomic.AddInt32(&rw.readerWait, -1) {
rw.writerSem.Release(1)
}
}
}
func getRw() *RW {
rw := &RW{
readerSem: semaphore.NewWeighted(1),
writerSem: semaphore.NewWeighted(1),
readerCount: 0,
readerWait: 0,
w: &sync.Mutex{},
}
ctx := context.Background()
rw.writerSem.Acquire(ctx, 1)
rw.readerSem.Acquire(ctx, 1)
return rw
}
func main() {
rw := getRw()
go func() {
fmt.Printf("Rlock before\n")
rw.RLock()
fmt.Printf("Rlock after\n")
time.Sleep(time.Second * 5)
fmt.Printf("RUnlock before\n")
rw.RUnlock()
fmt.Printf("RUnlock after\n")
}()
go func() {
time.Sleep(time.Second * 1)
fmt.Printf("Lock1 before\n")
rw.Lock()
fmt.Printf("Lock1 after\n")
time.Sleep(time.Second * 3)
fmt.Printf("Unlock1 before\n")
rw.Unlock()
fmt.Printf("Unlock1 after\n")
}()
go func() {
time.Sleep(time.Second * 2)
fmt.Printf("Lock2 before\n")
rw.Lock()
fmt.Printf("Lock2 after\n")
time.Sleep(time.Second * 4)
fmt.Printf("Unlock2 before\n")
rw.Unlock()
fmt.Printf("Unlock2 after\n")
}()
time.Sleep(time.Second * 1000)
}
编译运行结果
Rlock before
Rlock after
Lock1 before => 被Rlock读者阻塞
Lock2 before => 被Rlock读者、Lock1写者阻塞
RUnlock before
RUnlock after
Lock1 after => RLock读者释放锁,Lock1写者就能取到锁
Unlock1 before
Unlock1 after
Lock2 after
Unlock2 before
Unlock2 after
上面代码需要注意的是在getRw()
中
初始化使用了semaphore.NewWeighted(1)
,然后紧接着rw.writerSem.Acquire(ctx, 1)
主要是因为这个信号量库不支持size = 0
时候Acquire
即P操作
既然如此,那就利用条件变量实现一个简单粗暴的Semaphore,支持0
初始值的操作
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
const rwmutexMaxReaders = 10000
type Sem struct {
sum int64
cond *sync.Cond
}
func NewSem(sum int64) *Sem {
return &Sem{
sum: sum,
cond: sync.NewCond(&sync.Mutex{}),
}
}
func (c *Sem) Acquire(n int64) {
c.cond.L.Lock()
for c.sum < n {
c.cond.Wait()
}
c.cond.L.Unlock()
}
func (c *Sem) Release(n int64) {
c.cond.L.Lock()
c.sum += n
c.cond.Broadcast()
c.cond.L.Unlock()
}
type RW struct {
readerSem *Sem
writerSem *Sem
readerCount int32
readerWait int32
w *sync.Mutex
}
func (rw *RW) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
rw.writerSem.Acquire(1)
}
}
func (rw *RW) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// check
for i := 0; i < int(r); i++ {
rw.readerSem.Release(1)
}
rw.w.Unlock()
}
func (rw *RW) RLock() {
if r := atomic.AddInt32(&rw.readerCount, 1); r < 0 {
rw.readerSem.Acquire(1)
}
}
func (rw *RW) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// check r
if 0 == atomic.AddInt32(&rw.readerWait, -1) {
rw.writerSem.Release(1)
}
}
}
func main() {
rw := RW{
readerSem: NewSem(0),
writerSem: NewSem(0),
readerCount: 0,
readerWait: 0,
w: &sync.Mutex{},
}
go func() {
fmt.Printf("Rlock before\n")
rw.RLock()
fmt.Printf("Rlock after\n")
time.Sleep(time.Second * 5)
fmt.Printf("RUnlock before\n")
rw.RUnlock()
fmt.Printf("RUnlock after\n")
}()
go func() {
time.Sleep(time.Second * 1)
fmt.Printf("Lock1 before\n")
rw.Lock()
fmt.Printf("Lock1 after\n")
time.Sleep(time.Second * 3)
fmt.Printf("Unlock1 before\n")
rw.Unlock()
fmt.Printf("Unlock1 after\n")
}()
go func() {
time.Sleep(time.Second * 2)
fmt.Printf("Lock2 before\n")
rw.Lock()
fmt.Printf("Lock2 after\n")
time.Sleep(time.Second * 4)
fmt.Printf("Unlock2 before\n")
rw.Unlock()
fmt.Printf("Unlock2 after\n")
}()
time.Sleep(time.Second * 1000)
}
//reader到来,前面有writer
RLock()
if writer exist:
wait(readerSem)
Unlock()
signal(readerSem)
//writer到来,前面有reader
Lock()
if reader exist:
wait(writerSem)
RUnlock()
signal(writerSem)