占位...
... 又一个模式
package main
import (
"fmt"
"runtime"
"time"
)
// --------------------------- Job ---------------------

// Job is the unit of work handled by the pool: any value with a Do method can
// be sent on a job channel and executed by a worker.
type Job interface {
	// Do performs the work.
	Do()
}
// --------------------------- Worker ---------------------

// Worker owns a private channel on which it receives the jobs assigned to it.
type Worker struct {
	// JobQueue is the channel this worker reads its jobs from.
	JobQueue chan Job
}
// NewWorker returns a Worker whose JobQueue is a fresh unbuffered channel.
func NewWorker() Worker {
	queue := make(chan Job)
	return Worker{JobQueue: queue}
}
// Run starts the worker's goroutine and returns immediately.
//
// On every iteration the goroutine first sends the worker's own JobQueue on
// wq to announce that it is idle, then blocks until a job arrives on JobQueue
// and executes it. Whoever receives the channel from wq may hand this worker
// exactly one job before it registers again.
//
// The goroutine stops when JobQueue is closed. Previously a closed queue
// yielded a nil Job and the call to Do panicked.
func (w Worker) Run(wq chan chan Job) {
	go func() {
		for {
			// Register as idle before waiting for work.
			wq <- w.JobQueue

			// A plain receive replaces the original single-case select.
			job, ok := <-w.JobQueue
			if !ok {
				return
			}
			job.Do()
		}
	}()
}
// --------------------------- WorkerPool ---------------------

// WorkerPool hands submitted jobs to a fixed number of workers.
type WorkerPool struct {
	// workerlen is the number of workers started by Run.
	workerlen int
	// JobQueue is the channel callers submit jobs on.
	JobQueue chan Job
	// WorkerQueue carries the job channels of workers that are waiting for work.
	WorkerQueue chan chan Job
}
// NewWorkerPool builds a pool sized for workerlen workers. JobQueue is
// unbuffered; WorkerQueue is buffered with room for workerlen worker channels.
// The constructor itself starts no goroutines.
func NewWorkerPool(workerlen int) *WorkerPool {
	pool := new(WorkerPool)
	pool.workerlen = workerlen
	pool.JobQueue = make(chan Job)
	pool.WorkerQueue = make(chan chan Job, workerlen)
	return pool
}
// Run starts wp.workerlen workers plus one dispatcher goroutine and returns
// without waiting for any of them.
//
// The dispatcher takes each job submitted on wp.JobQueue, waits for a worker
// to offer its channel on wp.WorkerQueue, and forwards the job to that worker.
// Closing wp.JobQueue ends the dispatcher. The original single-case select
// kept looping on a closed queue and forwarded nil jobs to the workers.
func (wp *WorkerPool) Run() {
	fmt.Println("初始化worker")

	// Start the workers; each one registers its channel on WorkerQueue
	// whenever it is ready for a job.
	for i := 0; i < wp.workerlen; i++ {
		worker := NewWorker()
		worker.Run(wp.WorkerQueue)
	}

	// Dispatcher: pair every incoming job with the next available worker.
	go func() {
		for job := range wp.JobQueue {
			idle := <-wp.WorkerQueue
			idle <- job
		}
	}()
}
// Score is the sample Job used by this demo.
type Score struct {
	// Num is the value printed by Do.
	Num int
}
// Do prints the score's number and then pauses for one second.
func (s *Score) Do() {
	const pause = time.Second

	fmt.Println("num:", s.Num)
	time.Sleep(pause)
}
// main runs the demo: it starts a pool of 10 workers, submits 15 Score jobs
// from a background goroutine, and then prints the goroutine count every two
// seconds in a loop that never ends.
func main() {
	const (
		workerCount = 10 // number of concurrent workers
		jobCount    = 15 // number of jobs submitted
	)
	// Larger settings left disabled in the original: 100*100*20 workers,
	// 100*100*100*100 jobs, and debug.SetMaxThreads(num + 1000) to raise the
	// maximum thread count.

	// Create the pool and start its workers and dispatcher.
	pool := NewWorkerPool(workerCount)
	pool.Run()

	// Feed the jobs in from a separate goroutine so main can go on to the
	// reporting loop below.
	go func() {
		for n := 1; n <= jobCount; n++ {
			pool.JobQueue <- &Score{Num: n}
		}
	}()

	for {
		fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
		time.Sleep(2 * time.Second)
	}
}
...自己写的,凑合用
package main
import (
"log"
)
import "sync"
// main fans the integers 1..5 out to 13 consumer goroutines over one buffered
// channel. A producer goroutine pushes the values and closes the channel; each
// consumer logs every value it receives and exits once the channel is drained.
// main returns after all consumers have finished.
func main() {
	log.SetFlags(log.Llongfile | log.LstdFlags)

	// consumers is both the number of reader goroutines and the channel's
	// buffer size.
	const consumers = 13

	var listn []int
	for n := 1; n <= 5; n++ {
		listn = append(listn, n)
	}
	log.Println(listn)

	ch := make(chan int, consumers)
	go func() {
		// Closing after the last send lets the consumers' range loops end.
		defer close(ch)
		for _, v := range listn {
			log.Println("push to ch")
			ch <- v
		}
	}()

	huzh := 5
	var wg sync.WaitGroup
	for id := 0; id < consumers; id++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for v := range ch {
				log.Println(v, huzh, i)
			}
		}(id)
	}
	wg.Wait()
}
from https://gobyexample.com/worker-pools
package main
import (
"fmt"
"time"
)
// worker receives job numbers from jobs until that channel is closed. For each
// job it prints a start line, sleeps one second, prints a finish line, and
// sends twice the job number on results. id only labels the printed output.
func worker(id int, jobs <-chan int, results chan<- int) {
	for {
		j, open := <-jobs
		if !open {
			return
		}
		fmt.Println("worker", id, "started job", j)
		time.Sleep(time.Second)
		fmt.Println("worker", id, "finished job", j)
		results <- 2 * j
	}
}
// main starts three workers, queues five jobs, closes the job channel, and
// then receives exactly five results before returning.
func main() {
	const (
		numJobs    = 5
		numWorkers = 3
	)

	// Both channels can hold every job/result, so neither the sends below nor
	// the workers' result sends have to wait for a receiver.
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	for id := 1; id <= numWorkers; id++ {
		go worker(id, jobs, results)
	}

	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	// Receiving one result per job is what makes main wait for the workers.
	for received := 0; received < numJobs; received++ {
		<-results
	}
}
另外一个例子:生成固定数量的goroutine,这些协程用for range方式从一个chan中取数,处理后再排入另一个chan;结果模块同样用for range方式从第二个chan中取数
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Task is one unit of input for the workers.
type Task struct {
	id      int // sequence number of the task
	randnum int // number whose digits get summed
}
// Result pairs a processed Task with the value computed for it.
type Result struct {
	task   Task // the task that was processed
	result int  // value computed from task.randnum
}
// tasks carries pending work to the workers and results carries their output
// to the printer. Each channel buffers up to 10 items.
var (
	tasks   = make(chan Task, 10)
	results = make(chan Result, 10)
)
// process returns the sum of the decimal digits of num. Because Go's %
// operator keeps the sign of the dividend, a negative num yields the negated
// digit sum.
func process(num int) int {
	total := 0
	for rest := num; rest != 0; rest /= 10 {
		total += rest % 10
	}
	//time.Sleep(2 * time.Second) // artificial delay, left disabled
	return total
}
// worker drains the tasks channel: for every task it computes
// process(task.randnum) and sends the outcome on results. It returns once
// tasks is closed and empty, and marks itself done on wg as it returns.
func worker(wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		task, ok := <-tasks
		if !ok {
			return
		}
		results <- Result{task: task, result: process(task.randnum)}
	}
}
// createWorkerPool starts numOfWorkers worker goroutines, blocks until every
// one of them has returned, and then closes the results channel.
func createWorkerPool(numOfWorkers int) {
	wg := new(sync.WaitGroup)
	// Launch the fixed set of workers.
	for started := 0; started < numOfWorkers; started++ {
		wg.Add(1)
		go worker(wg)
	}
	wg.Wait()
	// No worker can send any more, so readers of results may now finish.
	close(results)
}
func allocate(numOfTasks int) {
//任务源,持续输出
for i := 0; i < numOfTasks; i++ {
randnum := rand.Intn(999)
task := Task{i, randnum}
//传到数量固定的channel->tasks中
tasks <- task
}
close(tasks)
}
func getResult(done chan bool) {
for result := range results {
fmt.Printf("Task id %d, randnum %d , sum %d
", result.task.id, result.task.randnum, result.result)
}
done <- true
}
// main runs the pipeline with 20 workers and 100 tasks: the result printer,
// the task source and the worker pool each run in their own goroutine. It
// waits for the printer's done signal and then reports the elapsed time.
func main() {
	const (
		numOfWorkers = 20
		numOfTasks   = 100
	)

	startTime := time.Now()
	done := make(chan bool)

	go getResult(done)
	go allocate(numOfTasks)
	// Original note: the worker pool must be created after allocate() and
	// getResult().
	go createWorkerPool(numOfWorkers)

	<-done
	diff := time.Since(startTime)
	fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
...一个错误的demo
package main
import (
"bytes"
"fmt"
"log"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/grd/stat"
)
// RunStress sends "hello" on ch and then runs the external command
// `stress -c 4`, blocking until it exits. A failing command ends the program
// through log.Fatal.
//
// NOTE(review): this belongs to the demo the file labels as wrong. wg.Add(1)
// is called here, inside the function that main launches with `go`, so main's
// wg.Wait() can run before the counter has been incremented and return
// without waiting.
func RunStress(wg *sync.WaitGroup, ch chan string) {
	log.Println("entry RunStress")
	wg.Add(1)
	defer wg.Done()

	var out bytes.Buffer
	stress := exec.Command("stress", "-c", "4")
	stress.Stdin = strings.NewReader("some input")
	stress.Stdout = &out
	// Logs the buffer before the command has run, i.e. while it is still empty.
	log.Println(out)
	log.Println("befor cmd.Run")

	// The unbuffered send blocks until CalcStressCpu receives it; the command
	// itself starts only after that.
	ch <- "hello"
	if err := stress.Run(); err != nil {
		log.Fatal(err)
	}
}
// Process holds the two values extracted from one line of `ps aux` output.
type Process struct {
	pid int     // parsed from the second field of the line
	cpu float64 // parsed from the third field of the line
}
func CalcStressCpu(wg *sync.WaitGroup, ch chan string) {
log.Println("entry CalcStress")
wg.Add(1)
defer wg.Done()
log.Println(<-ch)
time.Sleep(5 * time.Second)
cmd := exec.Command("ps", "aux")
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
log.Fatal(err)
}
processes := make([]*Process, 0)
for {
line, err := out.ReadString('
')
if err != nil {
break
}
if !strings.Contains(line, "stress -c") || !strings.Contains(line, "R+") {
continue
}
log.Println("line====>", line)
tokens := strings.Split(line, " ")
log.Println("tokens begin")
log.Println("tokens====>", tokens)
log.Println("tokens end")
ft := make([]string, 0)
for _, t := range tokens {
if t != "" && t != " " {
ft = append(ft, t)
}
}
pid, err := strconv.Atoi(ft[1])
if err != nil {
continue
}
cpu, err := strconv.ParseFloat(ft[2], 64)
if err != nil {
log.Fatal(err)
}
processes = append(processes, &Process{pid, cpu})
}
data := stat.Float64Slice{}
for _, p := range processes {
log.Println("Process ", p.pid, " takes ", p.cpu, " % of the CPU")
data = append(data, p.cpu)
}
variance := stat.Variance(&data)
fmt.Printf("The estimated variance is %.4f", variance)
}
// main launches CalcStressCpu and RunStress as goroutines sharing one
// WaitGroup and one unbuffered channel, waits on the WaitGroup, and then
// sleeps ten more seconds before returning.
//
// NOTE(review): this is the demo the file labels as wrong. Neither goroutine
// is registered with wg before wg.Wait() is called (both call wg.Add(1)
// themselves), so Wait can return before they have started.
func main() {
	var wg sync.WaitGroup
	ch1 := make(chan string)

	go CalcStressCpu(&wg, ch1)
	go RunStress(&wg, ch1)

	wg.Wait()
	time.Sleep(10 * time.Second)
}
...一个正确点的demo
package main
import (
"bytes"
"fmt"
"log"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/grd/stat"
)
// RunStress runs `pkill stress`, sends "hello" on ch to let the calculation
// start, and then runs `stress -c 4`, blocking until that command exits. It
// marks itself done on wg when it returns.
//
// The errors returned by both commands are not checked, as in the original
// (whose error handling for stress was left commented out).
func RunStress(wg *sync.WaitGroup, ch chan string) {
	defer wg.Done()

	// Kill any stress process that is already running before starting.
	exec.Command("pkill", "stress").Run()

	stress := exec.Command("stress", "-c", "4")
	// Notify the calculation side; the unbuffered send blocks until it is
	// received, and stress is started right after.
	ch <- "hello"
	stress.Run()
}
// Process holds the two values extracted from one line of `ps aux` output.
type Process struct {
	pid int     // parsed from the second field of the line
	cpu float64 // parsed from the third field of the line
}
func CalcStressCpu(wg *sync.WaitGroup, ch chan string) {
// log.Println("entry calc stress")
defer wg.Done()
//make sure the calc after "start stress"
<-ch
time.Sleep(5 * time.Second)
cmd := exec.Command("ps", "aux")
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
log.Fatal(err)
}
processes := make([]*Process, 0)
for {
line, err := out.ReadString('
')
if err != nil {
break
}
if !strings.Contains(line, "stress -c") || !strings.Contains(line, "R+") {
continue
}
tokens := strings.Split(line, " ")
ft := make([]string, 0)
for _, t := range tokens {
if t != "" && t != " " {
ft = append(ft, t)
}
}
pid, err := strconv.Atoi(ft[1])
if err != nil {
continue
}
cpu, err := strconv.ParseFloat(ft[2], 64)
if err != nil {
log.Fatal(err)
}
processes = append(processes, &Process{pid, cpu})
}
data := stat.Float64Slice{}
for _, p := range processes {
// log.Println("Process ", p.pid, " takes ", p.cpu, " % of the CPU")
data = append(data, p.cpu)
}
variance := stat.Variance(&data)
fmt.Printf("The estimated variance is %.4f", variance)
//after calc , stop the stress
cmd_stop := exec.Command("pkill", "stress")
cmd_stop.Run()
}
// main registers two goroutines on a WaitGroup before starting them,
// CalcStressCpu and RunStress, which share one unbuffered channel, and returns
// once both have finished.
func main() {
	var wg sync.WaitGroup
	ch1 := make(chan string)

	// Both registrations happen before either goroutine is started, so Wait
	// cannot run ahead of them.
	wg.Add(2)
	go CalcStressCpu(&wg, ch1)
	go RunStress(&wg, ch1)

	wg.Wait()
}