关于锁的总结(三)信号量

难点

个人感觉信号量主要难在应用场景上;
同等对比,互斥量主要用来上锁,条件变量来优化多任务等待
其中互斥量正常场景下,都应该(最好都是遵从这样方式)保持上锁、解锁是同一个task;

在《评论迁移》里面,分布式锁redis的set NX 过程中,参数是key、requireId、expiration,其中requireId(多task环境下唯一ID)就是为了保证上锁、解锁是同一个task

综合来看,好像不需要信号量这类东西的存在
《UNIX 网络编程 卷2:进程间通信》 一书上讲了互斥量、信号量、条件变量的区别

(1)互斥锁必须总是由给它上锁的的线程(任务)解锁,信号量的挂出却不必由执行过它的等待操作的同一线程执行。(这是个特性,实际测试中go语言中的互斥量其实是可以的)。
(2)互斥锁要么被锁住,要么被解开(二值状态,类似于二值信号量)。
(3)既然信号量有一个与之关联的状态(它的计数值),那么信号量挂出操作总是被记住。然后

PV操作

p操作是递减或者加锁等待,对应sem_wait
V操作是递增或者解锁,并唤醒等待池中的task,sem_post

从两个原子操作上来看,可以了解到有个计数器,而计数器是临界资源的,所以在处理信号量的过程中,访问计数器需要额外添加一个二值信号量(或者采用mutex)
整体上面还是很绕的,举个栗子

桌子上三个盘子,父亲在有空盘子情况下,剥好一个橘子放上去;母亲会削好一个苹果放上去; 哥哥在盘子中有橘子情况下消耗一个橘子,姐姐则会消耗一个苹果;此过程会一直循环;

整体分析一下几类信号量、计数器

  • 空盘子信号量 empty_sem
  • 占用盘子信号量 used_sem
  • 盘子中橘子数量 orange_cnt
  • 盘子中苹果数量 apple_cnt
  • 访问计数器时的二元信号量 cntermutexsem

对于父亲的操作 (母亲类似)

empty_sem.P()  // 空盘子信号量递减或者加锁等待  
cnter_mutex_sem.P()  
orange_cnt++  
cnter_mutex_sem.V()  
used_sem.V()   // 占用盘子信号量递增且唤醒消费者  

对于哥哥的操作(姐姐类似)

cnter_mutex_sem.P()   //锁的粒度需要加在外层  
if orange_cnt > 0  
    orange_cnt--
    empty_sem.V()
    used_sem.P()
cnter_mutex_sem.V()  

读写

不推荐使用信号量,尽量使用互斥量+条件变量处理同步互斥问题;
即便涉及到本机跨进程,也尽量采用分布式的互斥锁+条件变量来处理;

来看一下读写锁的例子;

读并发、写互斥、读写之间互斥

参考一,给出了一种实现方式,但是该实现方式有个严重问题,就是写操作可能会产生饥饿态
因为一旦出现了阻塞写,后续的所有都是可以排在该之前的
实际上,后续的应该排队,而且在之后

void* writer(void* arg)  
{
    rw.P();              // 互斥访问共享文件

    printf("  Writer %d start writing...\n", arg);
    sleep(1);
    printf("  Writer %d finish writing...\n", arg);

    rw.V();              // 释放共享文件
}

void* reader(void* arg)  
{
    mutex.P();           // 互斥访问count变量
    if(count == 0)       // 当第一个读线程读文件时
        rw.P();          // 阻止写线程写
    ++count;             // 读者计数器加1
    mutex.V();           // 释放count变量

    printf("Reader %d start reading...\n", arg);
    sleep(1);
    printf("Reader %d finish reading...\n", arg);

    mutex.P();           // 互斥访问count变量
    --count;             // 读者计数器减1
    if(count == 0)       // 当最后一个读线程读完文件
        rw.V();          // 允许写线程写
    mutex.V();           // 释放count变量
}


参考三,读写锁的不会产生写饥饿,但是注意,以下代码,读锁不会死锁,写锁会造成死锁

package main

import (  
    "fmt"
    "sync"
)

func main() {  
    rw := &sync.RWMutex{}
    rw.RLock()
    rw.RLock()
    fmt.Printf("Rlock finished\n")
    rw.RUnlock()
    rw.RUnlock()
    fmt.Printf("RUnlock finished\n")

    rw.Lock()
    rw.Lock()
    fmt.Printf("Lock finished\n")
    rw.Unlock()
    rw.Unlock()
    fmt.Printf("Unlock finished\n")
}


参考三所提到的锁嵌套,主要针对的是写锁(或者互斥量读锁严格意义上不是),但是在不同协程内,是可以作到两次Lock产生阻塞的,下面的代码就没问题:

package main

import (  
    "fmt"
    "sync"
    "time"
)

func main() {  
    rw := &sync.RWMutex{}
    rw.RLock()
    rw.RLock()
    fmt.Printf("Rlock finished\n")
    rw.RUnlock()
    rw.RUnlock()
    fmt.Printf("RUnlock finished\n")

    go func() {
        rw.Lock()
        fmt.Printf("Lock1 finished\n")
        time.Sleep(time.Second * 2)
        rw.Unlock()
        fmt.Printf("Unlock1 finished\n")
    }()
    go func() {
        time.Sleep(time.Second * 1)
        rw.Lock()
        fmt.Printf("Lock2 finished\n")
        time.Sleep(time.Second * 3)
        rw.Unlock()
        fmt.Printf("Unlock1 finished\n")
    }()
    time.Sleep(time.Second * 1000)
}

参考go的源代码读写锁,可以写出以下代码

package main

import (  
        "fmt"
        "golang.org/x/net/context"
        "golang.org/x/sync/semaphore"
        "sync"
        "sync/atomic"
        "time"
)

const rwmutexMaxReaders = 10000

type RW struct {  
        readerSem   *semaphore.Weighted
        writerSem   *semaphore.Weighted
        readerCount int32
        readerWait  int32
        w           *sync.Mutex
}

func (rw *RW) Lock() {  
        rw.w.Lock()
        r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
        if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
                ctx := context.Background()
                rw.writerSem.Acquire(ctx, 1)
        }
}

func (rw *RW) Unlock() {  
        r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
        // check
        for i := 0; i < int(r); i++ {
                rw.readerSem.Release(1)
        }
        rw.w.Unlock()
}

func (rw *RW) RLock() {  
        if r := atomic.AddInt32(&rw.readerCount, 1); r < 0 {
                ctx := context.Background()
                rw.readerSem.Acquire(ctx, 1)
        }
}

func (rw *RW) RUnlock() {  
        if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
                // check r
                if 0 == atomic.AddInt32(&rw.readerWait, -1) {
                        rw.writerSem.Release(1)
                }
        }
}

func getRw() *RW {  
        rw := &RW{
                readerSem:   semaphore.NewWeighted(1),
                writerSem:   semaphore.NewWeighted(1),
                readerCount: 0,
                readerWait:  0,
                w:           &sync.Mutex{},
        }
        ctx := context.Background()
        rw.writerSem.Acquire(ctx, 1)
        rw.readerSem.Acquire(ctx, 1)
        return rw
}

func main() {

        rw := getRw()

        go func() {
                fmt.Printf("Rlock before\n")
                rw.RLock()
                fmt.Printf("Rlock after\n")
                time.Sleep(time.Second * 5)
                fmt.Printf("RUnlock before\n")
                rw.RUnlock()
                fmt.Printf("RUnlock after\n")
        }()

        go func() {
                time.Sleep(time.Second * 1)
                fmt.Printf("Lock1 before\n")
                rw.Lock()
                fmt.Printf("Lock1 after\n")
                time.Sleep(time.Second * 3)
                fmt.Printf("Unlock1 before\n")
                rw.Unlock()
                fmt.Printf("Unlock1 after\n")
        }()
        go func() {
                time.Sleep(time.Second * 2)
                fmt.Printf("Lock2 before\n")
                rw.Lock()
                fmt.Printf("Lock2 after\n")
                time.Sleep(time.Second * 4)
                fmt.Printf("Unlock2 before\n")
                rw.Unlock()
                fmt.Printf("Unlock2 after\n")
        }()
        time.Sleep(time.Second * 1000)
}

编译运行结果
Rlock before  
Rlock after  
Lock1 before    => 被Rlock读者阻塞  
Lock2 before    => 被Rlock读者、Lock1写者阻塞  
RUnlock before  
RUnlock after  
Lock1 after     => RLock读者释放锁,Lock1写者就能取到锁  
Unlock1 before  
Unlock1 after  
Lock2 after  
Unlock2 before  
Unlock2 after  

上面代码需要注意的是在getRw()
初始化使用了semaphore.NewWeighted(1),然后紧接着rw.writerSem.Acquire(ctx, 1)
主要是因为这个信号量库不支持size = 0时候AcquireP操作

既然如此,那就利用条件变量实现一个简单粗暴的Semaphore,支持0初始值的操作

package main

import (  
        "fmt"
        "sync"
        "sync/atomic"
        "time"
)

const rwmutexMaxReaders = 10000

type Sem struct {  
        sum  int64
        cond *sync.Cond
}

func NewSem(sum int64) *Sem {  
        return &Sem{
                sum:  sum,
                cond: sync.NewCond(&sync.Mutex{}),
        }
}

func (c *Sem) Acquire(n int64) {  
        c.cond.L.Lock()
        for c.sum < n {
                c.cond.Wait()
        }
        c.cond.L.Unlock()
}

func (c *Sem) Release(n int64) {  
        c.cond.L.Lock()
        c.sum += n
        c.cond.Broadcast()
        c.cond.L.Unlock()
}

type RW struct {  
        readerSem   *Sem
        writerSem   *Sem
        readerCount int32
        readerWait  int32
        w           *sync.Mutex
}

func (rw *RW) Lock() {  
        rw.w.Lock()
        r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
        if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
                rw.writerSem.Acquire(1)
        }
}

func (rw *RW) Unlock() {  
        r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
        // check
        for i := 0; i < int(r); i++ {
                rw.readerSem.Release(1)
        }
        rw.w.Unlock()
}

func (rw *RW) RLock() {  
        if r := atomic.AddInt32(&rw.readerCount, 1); r < 0 {
                rw.readerSem.Acquire(1)
        }
}

func (rw *RW) RUnlock() {  
        if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
                // check r
                if 0 == atomic.AddInt32(&rw.readerWait, -1) {
                        rw.writerSem.Release(1)
                }
        }
}

func main() {  
        rw := RW{
                readerSem:   NewSem(0),
                writerSem:   NewSem(0),
                readerCount: 0,
                readerWait:  0,
                w:           &sync.Mutex{},
        }

        go func() {
                fmt.Printf("Rlock before\n")
                rw.RLock()
                fmt.Printf("Rlock after\n")
                time.Sleep(time.Second * 5)
                fmt.Printf("RUnlock before\n")
                rw.RUnlock()
                fmt.Printf("RUnlock after\n")
        }()

        go func() {
                time.Sleep(time.Second * 1)
                fmt.Printf("Lock1 before\n")
                rw.Lock()
                fmt.Printf("Lock1 after\n")
                time.Sleep(time.Second * 3)
                fmt.Printf("Unlock1 before\n")
                rw.Unlock()
                fmt.Printf("Unlock1 after\n")
        }()
        go func() {
                time.Sleep(time.Second * 2)
                fmt.Printf("Lock2 before\n")
                rw.Lock()
                fmt.Printf("Lock2 after\n")
                time.Sleep(time.Second * 4)
                fmt.Printf("Unlock2 before\n")
                rw.Unlock()
                fmt.Printf("Unlock2 after\n")
        }()
        time.Sleep(time.Second * 1000)
}
//reader到来,前面有writer
RLock()  
    if writer exist:
        wait(readerSem)

Unlock()  
    signal(readerSem)

//writer到来,前面有reader
Lock()  
    if reader exist:
        wait(writerSem)

RUnlock()  
    signal(writerSem)

参考
【Linux多线程】三个经典同步问题
官方的semphore扩展包
剖析Go的读写锁