zoukankan      html  css  js  c++  java
  • go 并发

    占位...

    ... 又一个模式

    package main
    
    import (
    	"fmt"
    	"runtime"
    	"time"
    )
    
    // --------------------------- Job ---------------------
    // Job is the unit of work handled by the pool: any value with a Do method.
    type Job interface {
    	// Do performs the work; Worker.Run calls it once for each job it receives.
    	Do()
    }
    
    // --------------------------- Worker ---------------------
    // Worker owns a private job channel on which it receives work.
    type Worker struct {
    	// JobQueue is this worker's own channel. Run publishes it on the pool's
    	// queue and then receives exactly one job from it per loop iteration.
    	JobQueue chan Job
    }
    
    // NewWorker returns a Worker whose JobQueue is a fresh unbuffered channel.
    func NewWorker() Worker {
    	var w Worker
    	w.JobQueue = make(chan Job)
    	return w
    }
    // Run starts a goroutine that serves jobs for this worker. The goroutine
    // never exits: on every iteration it announces itself on wq and then waits
    // for a single job on its own channel.
    func (w Worker) Run(wq chan chan Job) {
    	serve := func() {
    		for {
    			// Register this worker's channel with the pool. The send makes one
    			// more entry available on wq, which is what the pool's dispatcher
    			// receives from when it needs an idle worker.
    			wq <- w.JobQueue
    			// Wait for the dispatcher to hand a job over, then execute it.
    			job := <-w.JobQueue
    			job.Do()
    		}
    	}
    	go serve()
    }
    
    // --------------------------- WorkerPool ---------------------
    // WorkerPool dispatches jobs submitted on JobQueue to workers that have
    // announced themselves on WorkerQueue.
    type WorkerPool struct {
    	// workerlen is the number of workers Run starts; it is also the buffer
    	// size NewWorkerPool gives WorkerQueue.
    	workerlen   int
    	// JobQueue is where callers submit jobs.
    	JobQueue    chan Job
    	// WorkerQueue holds the job channels of workers waiting for work.
    	WorkerQueue chan chan Job
    }
    
    // NewWorkerPool builds a pool for workerlen workers. JobQueue is unbuffered;
    // WorkerQueue is buffered with room for workerlen worker channels.
    func NewWorkerPool(workerlen int) *WorkerPool {
    	pool := new(WorkerPool)
    	pool.workerlen = workerlen
    	pool.JobQueue = make(chan Job)
    	pool.WorkerQueue = make(chan chan Job, workerlen)
    	return pool
    }
    // Run starts the workers and then a dispatcher goroutine. Neither the
    // workers nor the dispatcher ever stop.
    func (wp *WorkerPool) Run() {
    	fmt.Println("初始化worker")
    	// Start the workers; each one registers its own channel on WorkerQueue.
    	for started := 0; started < wp.workerlen; started++ {
    		NewWorker().Run(wp.WorkerQueue)
    	}
    	// Dispatcher: take a job submitted by the caller, wait for an idle
    	// worker's channel, and forward the job to that worker.
    	dispatch := func() {
    		for {
    			job := <-wp.JobQueue
    			idle := <-wp.WorkerQueue
    			idle <- job
    		}
    	}
    	go dispatch()
    }
    
    // Score is a sample Job; its Do method prints Num.
    type Score struct {
    	Num int
    }
    
    // Do prints the score's number and then sleeps for one second.
    func (s *Score) Do() {
    	const pause = time.Second
    	fmt.Println("num:", s.Num)
    	time.Sleep(pause)
    }
    
    // main creates a pool of ten workers, submits fifteen Score jobs from a
    // background goroutine, and then prints the goroutine count every two
    // seconds without ever returning.
    func main() {
    	const (
    		workerCount = 10 // worker concurrency; the original notes also tried 100 * 100 * 20
    		jobCount    = 15 // jobs submitted; the original notes also tried 100 * 100 * 100 * 100
    	)
    	// debug.SetMaxThreads(workerCount + 1000) // raise the maximum thread count
    	// Create the pool (argument: number of concurrent workers) and start it.
    	p := NewWorkerPool(workerCount)
    	p.Run()
    	feed := func() {
    		for n := 1; n <= jobCount; n++ {
    			p.JobQueue <- &Score{Num: n}
    		}
    	}
    	go feed()
    	for {
    		fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
    		time.Sleep(2 * time.Second)
    	}
    }
    

      

    ...自己写的,凑合用

    package main
    
    import (
    	"log"
    )
    import "sync"
    
    
    // main pushes the numbers 1..5 into a buffered channel from one goroutine
    // and drains it with thirteen consumer goroutines, waiting for all
    // consumers to finish before returning.
    func main() {
    	const consumers = 13 // also used as the channel's buffer size
    	log.SetFlags(log.Llongfile | log.LstdFlags)
    	values := make([]int, 0, 5)
    	for n := 1; n <= 5; n++ {
    		values = append(values, n)
    	}
    	log.Println(values)
    	ch := make(chan int, consumers)
    	produce := func() {
    		// Closing the channel ends every consumer's range loop.
    		defer close(ch)
    		for idx := range values {
    			log.Println("push to ch")
    			ch <- values[idx]
    		}
    	}
    	go produce()
    	var wg sync.WaitGroup
    	wg.Add(consumers)
    	marker := 5
    	for id := 0; id < consumers; id++ {
    		go func(id int) {
    			defer wg.Done()
    			for v := range ch {
    				log.Println(v, marker, id)
    			}
    		}(id)
    	}
    	wg.Wait()
    }
    

      

    from https://gobyexample.com/worker-pools

    package main
    
    import (
        "fmt"
        "time"
    )
    
    // worker receives jobs until the jobs channel is closed. For each job it
    // prints a start line, sleeps one second, prints a finish line and sends
    // twice the job value on results.
    func worker(id int, jobs <-chan int, results chan<- int) {
        for {
            j, open := <-jobs
            if !open {
                return
            }
            fmt.Println("worker", id, "started  job", j)
            time.Sleep(time.Second)
            fmt.Println("worker", id, "finished job", j)
            results <- j * 2
        }
    }
    
    // main starts three workers, queues five jobs, closes the jobs channel and
    // then receives one result per job so that it only returns once all jobs
    // are done.
    func main() {
        const (
            numJobs    = 5
            numWorkers = 3
        )
        jobs := make(chan int, numJobs)
        results := make(chan int, numJobs)

        for w := 0; w < numWorkers; w++ {
            go worker(w+1, jobs, results)
        }

        for j := 0; j < numJobs; j++ {
            jobs <- j + 1
        }
        close(jobs)

        for remaining := numJobs; remaining > 0; remaining-- {
            <-results
        }
    }
    

      

    另外一个例子:生成固定数量的 goroutine,这些协程用 for range 从一个 chan 中取数,处理后再放入另一个 chan;结果模块同样用 for range 从第二个 chan 中取数。

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"sync"
    	"time"
    )
    
    // Task is one unit of work: an id and the random number whose digits the
    // worker sums.
    type Task struct {
    	id      int
    	randnum int
    }
    // Result pairs a Task with the value process computed for its randnum.
    type Result struct {
    	task   Task
    	result int
    }
    
    // tasks carries work from allocate to the workers (buffer of 10).
    var tasks = make(chan Task, 10)
    // results carries finished work from the workers to getResult (buffer of 10).
    var results = make(chan Result, 10)
    
    // process returns the sum of the decimal digits of num. For a negative num
    // each digit contributes with a negative sign, because Go's % keeps the
    // sign of the dividend.
    func process(num int) int {
    	total := 0
    	for rest := num; rest != 0; rest /= 10 {
    		total += rest % 10
    	}
    	// A time.Sleep(2 * time.Second) can be inserted here to simulate slow work.
    	return total
    }
    // worker consumes tasks until the tasks channel is closed, sending one
    // Result per task on results. It calls wg.Done when it returns.
    func worker(wg *sync.WaitGroup) {
    	defer wg.Done()
    	for {
    		task, open := <-tasks
    		if !open {
    			return
    		}
    		results <- Result{task: task, result: process(task.randnum)}
    	}
    }
    func createWorkerPool(numOfWorkers int) {
    	var wg sync.WaitGroup
    	//createWorkerPool生成固定的work数量,保持中...
    	for i := 0; i < numOfWorkers; i++ {
    		wg.Add(1)
    		go worker(&wg)
    	}
    	wg.Wait()
    	close(results)
    }
    
    
    func allocate(numOfTasks int) {
    	//任务源,持续输出
    	for i := 0; i < numOfTasks; i++ {
    		randnum := rand.Intn(999)
    		task := Task{i, randnum}
    		//传到数量固定的channel->tasks中
    		tasks <- task
    	}
    	close(tasks)
    }
    func getResult(done chan bool) {
    	for result := range results {
    		fmt.Printf("Task id %d, randnum %d , sum %d
    ", result.task.id, result.task.randnum, result.result)
    	}
    	done <- true
    }
    // main runs the result printer, the task source and the worker pool as
    // three goroutines, waits for the printer to finish and reports the
    // elapsed time.
    func main() {
    	const (
    		numOfWorkers = 20
    		numOfTasks   = 100
    	)
    	startTime := time.Now()
    	done := make(chan bool)
    	go getResult(done)
    	go allocate(numOfTasks)
    	// Original author's note: the worker pool is created after allocate()
    	// and getResult() have been started.
    	go createWorkerPool(numOfWorkers)
    	<-done
    	elapsed := time.Since(startTime)
    	fmt.Println("total time taken ", elapsed.Seconds(), "seconds")
    }
    

      

     

    ...一个错误的demo

    package main
    
    import (
    	"bytes"
    	"fmt"
    	"log"
    	"os/exec"
    	"strconv"
    	"strings"
    	"sync"
    	"time"
    
    	"github.com/grd/stat"
    )
    
    /*
    RunStress runs the external command `stress -c 4` and blocks until it exits.
    Before starting the command it sends "hello" on ch to release the receiver.

    NOTE(review): this is the deliberately wrong demo. wg.Add(1) is called here,
    inside the goroutine, while main calls wg.Wait right after the `go`
    statements, so Wait may run before Add has registered this goroutine.
    */
    func RunStress(wg *sync.WaitGroup, ch chan string) {
    	log.Println("entry RunStress")
    	// Registering with the WaitGroup from inside the goroutine is the flaw
    	// this demo illustrates.
    	wg.Add(1)
    	defer wg.Done()
    	cmd := exec.Command("stress", "-c", "4")
    	cmd.Stdin = strings.NewReader("some input")
    	var out bytes.Buffer
    	cmd.Stdout = &out
    	// Logged before the command has run, so the buffer is still empty here.
    	log.Println(out)
    	log.Println("befor cmd.Run")
    	// The notification is sent before cmd.Run, i.e. before the process exists.
    	ch <- "hello"
    	err := cmd.Run()
    	if err != nil {
    		log.Fatal(err)
    	}
    }
    
    // Process holds the pid and cpu value parsed from one line of `ps aux` output.
    type Process struct {
    	pid int
    	cpu float64
    }
    
    func CalcStressCpu(wg *sync.WaitGroup, ch chan string) {
    	log.Println("entry CalcStress")
    	wg.Add(1)
    	defer wg.Done()
    	log.Println(<-ch)
    	time.Sleep(5 * time.Second)
    	cmd := exec.Command("ps", "aux")
    	var out bytes.Buffer
    	cmd.Stdout = &out
    	err := cmd.Run()
    	if err != nil {
    		log.Fatal(err)
    	}
    	processes := make([]*Process, 0)
    	for {
    		line, err := out.ReadString('
    ')
    		if err != nil {
    			break
    		}
    
    		if !strings.Contains(line, "stress -c") || !strings.Contains(line, "R+") {
    			continue
    		}
    		log.Println("line====>", line)
    
    		tokens := strings.Split(line, " ")
    
    		log.Println("tokens begin")
    		log.Println("tokens====>", tokens)
    		log.Println("tokens end")
    		ft := make([]string, 0)
    		for _, t := range tokens {
    			if t != "" && t != "	" {
    				ft = append(ft, t)
    			}
    		}
    
    		pid, err := strconv.Atoi(ft[1])
    		if err != nil {
    			continue
    		}
    		cpu, err := strconv.ParseFloat(ft[2], 64)
    		if err != nil {
    			log.Fatal(err)
    		}
    		processes = append(processes, &Process{pid, cpu})
    	}
    
    	data := stat.Float64Slice{}
    	for _, p := range processes {
    		log.Println("Process ", p.pid, " takes ", p.cpu, " % of the CPU")
    		data = append(data, p.cpu)
    	}
    
    	variance := stat.Variance(&data)
    	fmt.Printf("The estimated variance is %.4f", variance)
    
    }
    
    // main starts CalcStressCpu and RunStress as goroutines and then waits.
    //
    // NOTE(review): neither goroutine is registered with wg before Wait is
    // called (each calls wg.Add(1) itself), so Wait may return before they have
    // done any work. In that case the ten-second sleep is the only thing
    // keeping the process alive.
    func main() {
    	wg := &sync.WaitGroup{}
    	ch1 := make(chan string)
    	go CalcStressCpu(wg, ch1)
    	go RunStress(wg, ch1)
    	wg.Wait()
    	time.Sleep(10 * time.Second)
    
    }
    

    ...一个正确点的demo

    package main
    
    import (
    	"bytes"
    	"fmt"
    	"log"
    	"os/exec"
    	"strconv"
    	"strings"
    	"sync"
    	"time"
    
    	"github.com/grd/stat"
    )
    
    /*
    RunStress kills any leftover stress processes, launches `stress -c 4`, sends
    "hello" on ch once the launch has been attempted, and then blocks until the
    stress process exits. wg.Done is deferred, so the caller must have called
    wg.Add(1) for this goroutine beforehand.
    */
    func RunStress(wg *sync.WaitGroup, ch chan string) {
    	defer wg.Done()
    	// Kill stress for init. This is best effort, so the result is
    	// deliberately ignored.
    	_ = exec.Command("pkill", "stress").Run()
    	cmd := exec.Command("stress", "-c", "4")
    	// Start the process before notifying, so the receiver never begins its
    	// measurement ahead of the launch.
    	startErr := cmd.Start()
    	// Always notify, even when the launch failed; otherwise the receiver
    	// blocked on ch would wait forever.
    	ch <- "hello"
    	if startErr != nil {
    		log.Println("start stress:", startErr)
    		return
    	}
    	// CalcStressCpu ends the run with `pkill stress`, so a non-nil error here
    	// is the expected outcome and is only logged.
    	if err := cmd.Wait(); err != nil {
    		log.Println("stress exited:", err)
    	}
    }
    
    // Process holds the pid and cpu value parsed from one line of `ps aux` output.
    type Process struct {
    	pid int
    	cpu float64
    }
    
    func CalcStressCpu(wg *sync.WaitGroup, ch chan string) {
    	// log.Println("entry calc stress")
    	defer wg.Done()
    
    	//make sure the calc after "start stress"
    	<-ch
    
    	time.Sleep(5 * time.Second)
    	cmd := exec.Command("ps", "aux")
    	var out bytes.Buffer
    	cmd.Stdout = &out
    	err := cmd.Run()
    	if err != nil {
    		log.Fatal(err)
    	}
    	processes := make([]*Process, 0)
    	for {
    		line, err := out.ReadString('
    ')
    		if err != nil {
    			break
    		}
    
    		if !strings.Contains(line, "stress -c") || !strings.Contains(line, "R+") {
    			continue
    		}
    
    		tokens := strings.Split(line, " ")
    
    		ft := make([]string, 0)
    		for _, t := range tokens {
    			if t != "" && t != "	" {
    				ft = append(ft, t)
    			}
    		}
    
    		pid, err := strconv.Atoi(ft[1])
    		if err != nil {
    			continue
    		}
    		cpu, err := strconv.ParseFloat(ft[2], 64)
    		if err != nil {
    			log.Fatal(err)
    		}
    		processes = append(processes, &Process{pid, cpu})
    	}
    
    	data := stat.Float64Slice{}
    	for _, p := range processes {
    		// log.Println("Process ", p.pid, " takes ", p.cpu, " % of the CPU")
    		data = append(data, p.cpu)
    	}
    
    	variance := stat.Variance(&data)
    	fmt.Printf("The estimated variance is %.4f", variance)
    
    	//after calc , stop the stress
    	cmd_stop := exec.Command("pkill", "stress")
    	cmd_stop.Run()
    
    }
    
    // main registers both goroutines with the WaitGroup before starting them,
    // then waits for CalcStressCpu and RunStress to finish.
    func main() {
    	var wg sync.WaitGroup
    	ch1 := make(chan string)
    	// Add happens before the goroutines start, so Wait cannot run ahead of it.
    	wg.Add(2)
    	go CalcStressCpu(&wg, ch1)
    	go RunStress(&wg, ch1)
    	wg.Wait()
    }
    

      

  • 相关阅读:
    在线pdm查看
    vscode
    idea for Mac 代码提示设置
    定位功能
    canvas刮奖
    jquery生成二维码
    Redux DevTools浏览器插件调试redux
    .gitignore
    HBuilder在MAC下的SVN
    UMD编码规范
  • 原文地址:https://www.cnblogs.com/eiguleo/p/14185966.html
Copyright © 2011-2022 走看看