占位...
... 又一个模式
package main import ( "fmt" "runtime" "time" ) // --------------------------- Job --------------------- type Job interface { Do() } // --------------------------- Worker --------------------- type Worker struct { JobQueue chan Job } func NewWorker() Worker { return Worker{JobQueue: make(chan Job)} } func (w Worker) Run(wq chan chan Job) { go func() { for { // 不会退出, job chan持续传给 workpool的 workqueue // 这里加入队列, chan job 加入 workpool的chan chan job,后边workpool有定义取出动作 wq <- w.JobQueue // 执select行前,释放占用,这样 workpool中的chan chan job channel 增加一,触发workpool 中的select // 将自己的chan job 注册到workpool 中的chan chan job select { case job := <-w.JobQueue: job.Do() } } }() } // --------------------------- WorkerPool --------------------- type WorkerPool struct { workerlen int JobQueue chan Job WorkerQueue chan chan Job } func NewWorkerPool(workerlen int) *WorkerPool { return &WorkerPool{ workerlen: workerlen, JobQueue: make(chan Job), WorkerQueue: make(chan chan Job, workerlen), } } func (wp *WorkerPool) Run() { fmt.Println("初始化worker") //初始化worker for i := 0; i < wp.workerlen; i++ { worker := NewWorker() worker.Run(wp.WorkerQueue) } // 循环获取可用的worker,往worker中写job go func() { for { select { case job := <-wp.JobQueue: // 对内输出channel,接受调用者输入 worker := <-wp.WorkerQueue // 从 chan chan job 中取出, work run中会发送补充此channel worker <- job // job 就是数据,外界输入通过job Queue 传给 worker } } }() } type Score struct { Num int } func (s *Score) Do() { fmt.Println("num:", s.Num) time.Sleep(1 * 1 * time.Second) } func main() { // num := 100 * 100 * 20 num := 10 // debug.SetMaxThreads(num + 1000) //设置最大线程数 // 注册工作池,传入任务 // 参数1 worker并发个数 p := NewWorkerPool(num) p.Run() // datanum := 100 * 100 * 100 * 100 datanum := 15 go func() { for i := 1; i <= datanum; i++ { sc := &Score{Num: i} p.JobQueue <- sc } }() for { fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine()) time.Sleep(2 * time.Second) } }
...自己写的,凑合用
package main import ( "log" ) import "sync" func main() { log.SetFlags(log.Llongfile | log.LstdFlags) var wg sync.WaitGroup listn := []int{} for i := 1; i < 6; i++ { listn = append(listn, i) } log.Println(listn) ch := make(chan int, 13) go func() { for _, v := range listn { log.Println("push to ch") ch <- v } close(ch) }() wg.Add(13) huzh:=5 for i := 0; i < 13; i++ { go func(i int) { defer wg.Done() for v :=range ch{ log.Println(v,huzh,i) } }(i) } wg.Wait() }
from https://gobyexample.com/worker-pools
package main import ( "fmt" "time" ) func worker(id int, jobs <-chan int, results chan<- int) { for j := range jobs { fmt.Println("worker", id, "started job", j) time.Sleep(time.Second) fmt.Println("worker", id, "finished job", j) results <- j * 2 } } func main() { const numJobs = 5 jobs := make(chan int, numJobs) results := make(chan int, numJobs) for w := 1; w <= 3; w++ { go worker(w, jobs, results) } for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) for a := 1; a <= numJobs; a++ { <-results } }
另外一个例子,生成固定数量的goroutine,这些协程从一个chan中使用for方式取数, 在排入另一个chan, 结果模块在第二个chan使用for方式取数
package main import ( "fmt" "math/rand" "sync" "time" ) type Task struct { id int randnum int } type Result struct { task Task result int } var tasks = make(chan Task, 10) var results = make(chan Result, 10) func process(num int) int { sum := 0 for num != 0 { digit := num % 10 sum += digit num /= 10 } //time.Sleep(2 * time.Second) return sum } func worker(wg *sync.WaitGroup) { defer wg.Done() for task := range tasks { result := Result{task, process(task.randnum)} results <- result } } func createWorkerPool(numOfWorkers int) { var wg sync.WaitGroup //createWorkerPool生成固定的work数量,保持中... for i := 0; i < numOfWorkers; i++ { wg.Add(1) go worker(&wg) } wg.Wait() close(results) } func allocate(numOfTasks int) { //任务源,持续输出 for i := 0; i < numOfTasks; i++ { randnum := rand.Intn(999) task := Task{i, randnum} //传到数量固定的channel->tasks中 tasks <- task } close(tasks) } func getResult(done chan bool) { for result := range results { fmt.Printf("Task id %d, randnum %d , sum %d ", result.task.id, result.task.randnum, result.result) } done <- true } func main() { startTime := time.Now() numOfWorkers := 20 numOfTasks := 100 var done = make(chan bool) go getResult(done) go allocate(numOfTasks) go createWorkerPool(numOfWorkers) // 必须在allocate()和getResult()之后创建工作池 <-done endTime := time.Now() diff := endTime.Sub(startTime) fmt.Println("total time taken ", diff.Seconds(), "seconds") }
...一个错误的demo
package main import ( "bytes" "fmt" "log" "os/exec" "strconv" "strings" "sync" "time" "github.com/grd/stat" ) /* RunStress no comment */ func RunStress(wg *sync.WaitGroup, ch chan string) { log.Println("entry RunStress") wg.Add(1) defer wg.Done() cmd := exec.Command("stress", "-c", "4") cmd.Stdin = strings.NewReader("some input") var out bytes.Buffer cmd.Stdout = &out log.Println(out) log.Println("befor cmd.Run") ch <- "hello" err := cmd.Run() if err != nil { log.Fatal(err) } } type Process struct { pid int cpu float64 } func CalcStressCpu(wg *sync.WaitGroup, ch chan string) { log.Println("entry CalcStress") wg.Add(1) defer wg.Done() log.Println(<-ch) time.Sleep(5 * time.Second) cmd := exec.Command("ps", "aux") var out bytes.Buffer cmd.Stdout = &out err := cmd.Run() if err != nil { log.Fatal(err) } processes := make([]*Process, 0) for { line, err := out.ReadString(' ') if err != nil { break } if !strings.Contains(line, "stress -c") || !strings.Contains(line, "R+") { continue } log.Println("line====>", line) tokens := strings.Split(line, " ") log.Println("tokens begin") log.Println("tokens====>", tokens) log.Println("tokens end") ft := make([]string, 0) for _, t := range tokens { if t != "" && t != " " { ft = append(ft, t) } } pid, err := strconv.Atoi(ft[1]) if err != nil { continue } cpu, err := strconv.ParseFloat(ft[2], 64) if err != nil { log.Fatal(err) } processes = append(processes, &Process{pid, cpu}) } data := stat.Float64Slice{} for _, p := range processes { log.Println("Process ", p.pid, " takes ", p.cpu, " % of the CPU") data = append(data, p.cpu) } variance := stat.Variance(&data) fmt.Printf("The estimated variance is %.4f", variance) } func main() { wg := &sync.WaitGroup{} ch1 := make(chan string) go CalcStressCpu(wg, ch1) go RunStress(wg, ch1) wg.Wait() time.Sleep(10 * time.Second) }
...一个正确点的demo
package main import ( "bytes" "fmt" "log" "os/exec" "strconv" "strings" "sync" "time" "github.com/grd/stat" ) /* RunStress no comment */ func RunStress(wg *sync.WaitGroup, ch chan string) { defer wg.Done() //kill stress for init cmd_stop := exec.Command("pkill", "stress") cmd_stop.Run() //start stress cmd := exec.Command("stress", "-c", "4") //start stress and notify the calc start ch <- "hello" cmd.Run() // if err != nil { // log.Println("recieved signal") // // log.Fatal(err) // } } type Process struct { pid int cpu float64 } func CalcStressCpu(wg *sync.WaitGroup, ch chan string) { // log.Println("entry calc stress") defer wg.Done() //make sure the calc after "start stress" <-ch time.Sleep(5 * time.Second) cmd := exec.Command("ps", "aux") var out bytes.Buffer cmd.Stdout = &out err := cmd.Run() if err != nil { log.Fatal(err) } processes := make([]*Process, 0) for { line, err := out.ReadString(' ') if err != nil { break } if !strings.Contains(line, "stress -c") || !strings.Contains(line, "R+") { continue } tokens := strings.Split(line, " ") ft := make([]string, 0) for _, t := range tokens { if t != "" && t != " " { ft = append(ft, t) } } pid, err := strconv.Atoi(ft[1]) if err != nil { continue } cpu, err := strconv.ParseFloat(ft[2], 64) if err != nil { log.Fatal(err) } processes = append(processes, &Process{pid, cpu}) } data := stat.Float64Slice{} for _, p := range processes { // log.Println("Process ", p.pid, " takes ", p.cpu, " % of the CPU") data = append(data, p.cpu) } variance := stat.Variance(&data) fmt.Printf("The estimated variance is %.4f", variance) //after calc , stop the stress cmd_stop := exec.Command("pkill", "stress") cmd_stop.Run() } func main() { wg := &sync.WaitGroup{} ch1 := make(chan string) wg.Add(1) go CalcStressCpu(wg, ch1) wg.Add(1) go RunStress(wg, ch1) wg.Wait() }