zoukankan      html  css  js  c++  java
  • channel 并发编程和超时控制

    golang中,通过关键字go可以创建goroutine,一个函数可以被创建多个goroutine,一个goroutine必定对应一个函数

    go创建goroutine与channel返回数据

    为一个普通函数创建goroutine的写法:

    go 函数名(参数列表)
    

    使用go函数是,被调用函数的返回值会被忽略,因此如果需要在goroutine中返回数据,往往需要结合channel使用,通过通道将数据从goroutine中作为返回值传出

    例子:

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func task(task_id int, sleeptime int, resCh chan string) {
    	time.Sleep(time.Second * time.Duration(sleeptime))
    	resCh <- fmt.Sprintf("任务序号:%d ,执行完成", task_id)
    	return
    }
    
    func main() {
    	startTime := time.Now()
    	fmt.Println("子goroutine创建:")
    	// 假设有10个任务需要执行,每个任务需要执行,并发执行
    	inputs := []int{2, 3, 4, 2, 1, 2, 3, 2, 5, 2}
    	resCh := make(chan string, len(inputs)) // 任务数量
    
    	for i, sleeptime := range inputs {
    		go task(i, sleeptime, resCh)
    	}
    
    	for range inputs {
    		fmt.Println(<-resCh)
    	}
    
    	endTime := time.Now()
    	fmt.Printf("子goroutine运行结束,耗时 %s. 任务数量: %d", endTime.Sub(startTime), len(inputs))
    }
    

    输出结果:

    子goroutine创建:
    任务序号:4 ,执行完成
    任务序号:3 ,执行完成
    任务序号:0 ,执行完成
    任务序号:9 ,执行完成
    任务序号:5 ,执行完成
    任务序号:7 ,执行完成
    任务序号:1 ,执行完成
    任务序号:6 ,执行完成
    任务序号:2 ,执行完成
    任务序号:8 ,执行完成
    子goroutine运行结束,耗时 5.014132s. 任务数量: 10
    

    这里有10个任务,同时开启10个goroutine执行,耗时为5秒(受限为最长耗时的一个任务的执行时间)

    channel控制并发度(blocking特性)

    但是这里可以看到实际开发中存在一个问题,假如任务数量为10000或者更多,同时开启1w个goroutine是不太合理的,可能会占用过多资源,因此我们需要控制并发度。
    这里可以利用channel的缓冲机制,当缓冲满了,goroutine就会自动阻塞,直到channel数据可以被读取为止。

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func task(task_id int, sleeptime int, resCh chan string) {
    	time.Sleep(time.Second * time.Duration(sleeptime))
    	resCh <- fmt.Sprintf("任务序号:%d ,执行完成", task_id)
    	return
    }
    
    func main() {
    	startTime := time.Now()
    	fmt.Println("子goroutine创建:")
    	// 假设有10个任务需要执行,每个任务需要执行,并发执行
    	inputs := []int{2, 3, 4, 2, 1, 2, 3, 2, 5, 2}
    	resCh := make(chan string, len(inputs))
    
    	limitCh := make(chan bool, 2) // 并发度为2
    	limitFunc := func(limitCh chan bool, task_id int, sleeptime int, resCh chan string) {
    		task(task_id, sleeptime, resCh)
    		<-limitCh
    	}
    
    	// 限制并发度的关键在于,开启多个任务时,往channel(limitCh)缓冲写入数据,任务执行(子goroutine)完成时读出数据,当channel缓冲满时不能读出就会阻塞任务的执行
    	for i, sleeptime := range inputs {
    		limitCh <- true
    		go limitFunc(limitCh, i, sleeptime, resCh)
    	}
    
    	for range inputs {
    		fmt.Println(<-resCh)
    	}
    
    	endTime := time.Now()
    	fmt.Printf("子goroutine运行结束,耗时 %s. 任务数量: %d", endTime.Sub(startTime), len(inputs))
    }
    

    当limitCh的缓冲大小设置为2时,结果为:子goroutine运行结束,耗时 14.0578327s. 任务数量: 10
    当limitCh的缓冲大小设置为1时,结果为:子goroutine运行结束,耗时 26.0864911s. 任务数量: 10

    使用select控制超时

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func Run(task_id int, sleeptime int, resCh chan string) {
    	runCh := make(chan string)
    	go task(task_id, sleeptime, runCh)
    	select {
    	case re := <-runCh:
    		// 任务执行正常
    		resCh <- re
    	case <-time.After(3 * time.Second): // 这里定义3秒超时,也就是说不管任务的sleeptime多长,最久3秒返回,避免长时间阻塞
    		re := fmt.Sprintf("任务序号:%d ,执行超时", task_id)
    		resCh <- re
    	}
    }
    
    func task(task_id int, sleeptime int, resCh chan string) {
    	time.Sleep(time.Second * time.Duration(sleeptime))
    	//fmt.Println("正在执行任务:", task_id)
    	resCh <- fmt.Sprintf("任务序号:%d ,执行完成", task_id)
    	return
    }
    
    func main() {
    	startTime := time.Now()
    	fmt.Println("子goroutine创建:")
    	// 假设有10个任务需要执行,每个任务需要执行,并发执行
    	inputs := []int{2, 3, 4, 2, 1, 2, 3, 2, 5, 2}
    	resCh := make(chan string, len(inputs))
    
    	limitCh := make(chan bool, 2) // 并发度为2
    	limitFunc := func(limitCh chan bool, task_id int, sleeptime int, resCh chan string) {
    		Run(task_id, sleeptime, resCh)
    		<-limitCh
    	}
    
    	// 限制并发度的关键在于,开启多个任务时,往channel(limitCh)缓冲写入数据,任务执行(子goroutine)完成时读出数据,当channel缓冲满时不能读出就会阻塞任务的执行
    	for i, sleeptime := range inputs {
    		limitCh <- true
    		go limitFunc(limitCh, i, sleeptime, resCh)
    	}
    
    	for range inputs {
    		fmt.Println(<-resCh)
    	}
    
    	endTime := time.Now()
    	fmt.Printf("子goroutine运行结束,耗时 %s. 任务数量: %d", endTime.Sub(startTime), len(inputs))
    }
    
    执行结果:
    子goroutine创建:
    任务序号:0 ,执行完成
    任务序号:1 ,执行超时
    任务序号:3 ,执行完成
    任务序号:2 ,执行超时
    任务序号:4 ,执行完成
    任务序号:5 ,执行完成
    任务序号:6 ,执行完成
    任务序号:7 ,执行完成
    任务序号:9 ,执行完成
    任务序号:8 ,执行超时
    子goroutine运行结束,耗时 12.0359419s. 任务数量: 10
    
  • 相关阅读:
    好吧,左小波出山了——ie8兼容indexOf问题
    jmeter负载机运行/添加压力机/分布式
    jmeter操作数据库
    Charles手机抓包设置&无法打开火狐网页设置
    python学习-Day1-接口测试
    动态SQL
    MyBatis缓存
    正则表达式
    MyBatis配置文件的配置说明
    几种数据源的配置
  • 原文地址:https://www.cnblogs.com/chq3272991/p/15390772.html
Copyright © 2011-2022 走看看