限制并发量

源码

今天调试了一下golang编译工具,发现了一个很有"奇怪"的地方

简化问题

假设有1000个任务,1000个并发协程,设置n(cpu核数)个物理处理器(task)并发完成,每个协程只要完成了当前任务采取完成下一个

代码

原始代码,go作者无所畏惧的使用协程

func parseFiles(filenames []string) uint {  
    var noders []*noder
    // Limit the number of simultaneously open files.
    sem := make(chan struct{}, runtime.GOMAXPROCS(0)+10)

    for _, filename := range filenames {
        go func(filename string) {
            sem <- struct{}{}
            defer func() { <-sem }()
                        ... 
        }(filename)
    }

针对问题代码

package main

import (  
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "runtime"
    "time"
)

func main() {

    c := make(chan os.Signal)
    signal.Notify(c, os.Interrupt)

    rand.Seed(time.Now().UnixNano())
    task := make([]func(), 1000)
    for i := 0; i < 1000; i++ {
        task[i] = func() {
            r := rand.Intn(100)
            time.Sleep(time.Duration(r) * time.Millisecond)
            fmt.Printf("Task cost %d\n", r)
        }
    }

    sem := make(chan struct{}, runtime.GOMAXPROCS(0)+10)

    for i := 0; i < len(task); i++ {
        f := task[i]
        go func(f func()) {
            sem <- struct{}{}
            defer func() { <-sem }()
            f()
        }(f)
    }

    <-c
}

如果控制10个协程,应该怎么写呢

package main

import (  
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "runtime"
    "sync"
    "time"
)

func main() {  
    c := make(chan os.Signal)
    signal.Notify(c, os.Interrupt)
    exit := make(chan bool)

    sem := make(chan func(), runtime.GOMAXPROCS(0))
    task := make([]func(), 1000, 1000)

    rand.Seed(time.Now().UnixNano())
    for i := 0; i < len(task); i++ {
        task[i] = func() {
            r := rand.Intn(100)
            time.Sleep(time.Duration(r) * time.Millisecond)
            fmt.Printf("task cost %d\n", r)
        }
    }

    wg := &sync.WaitGroup{}
    for i := 0; i < runtime.GOMAXPROCS(0); i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case f := <-sem:
                    f()
                case <-exit:
                    fmt.Printf("Gracefully exit....\n")
                    return
                }
            }
        }()
    }

    go func() {
        for i := 0; i < len(task); i++ {
            sem <- task[i]
        }
    }()

    <-c
    close(exit)
    wg.Wait()
}