var wg2 sync.WaitGroup wg2.Add(nums) xc :=0 parallelNum := plt.MaxParallel var waitCount int32 = 0 for i:=0;i<nums*lll;i=i+lll { begin: if i % 30 == 1 { tools.L2 <- msg } if i % 10 == 1 { mm := fmt.Sprintf("子任务%v开始执行",i+1) tools.L2 <- mm } currentParallelNum := atomic.LoadInt32(&plt.currentParallel) if i % 10 == 1 { mm := fmt.Sprintf("当前任务数%v,最大并发数%v",currentParallelNum,parallelNum) tools.L2 <- mm } if currentParallelNum > parallelNum { waitCount++ if waitCount > plt.MaxWaitCount { //等待超过一定次数后,就放开一次并行度 parallelNum = plt.MaxParallel2 mm := fmt.Sprintf("当前等待次数%v超过最大等待次数%v,开始将并行数从%v增加到%v",waitCount,plt.MaxWaitCount,plt.MaxParallel,plt.MaxParallel2) waitCount = 0 //tools.LogLevelByConfigFile(mm,2) //tools.LogTask(mm,2) tools.L2 <- mm }else { if parallelNum != plt.MaxParallel { parallelNum = plt.MaxParallel } } mm := fmt.Sprintf("当前并行度%v超过最大并行度%v,开始等待",currentParallelNum,parallelNum) //tools.LogLevelByConfigFile(mm,2) //tools.LogTask(mm,2) tools.L2 <- mm tools.SleepByMil(100) tools.SleepByRandMil(3000*common.SleepInterval) goto begin } xc++ mm := fmt.Sprintf("第 %v 个协程开始运行",xc) tools.L2 <- mm endIndex := i + lll if endIndex > l { endIndex = l } lstNew := (*sourceDataList)[i:endIndex] go func(lst *[]map[string]string,wg *sync.WaitGroup) { for _,row := range *lst { atomic.AddInt32(&plt.currentParallel,1) rowMap(&row,&plt.CommonStruct) //每行数据该如何处理的函数 atomic.AddInt32(&plt.currentParallel,-1) } wg.Done() }(&lstNew,&wg2) tools.SleepByMil(100) } wg2.Wait()
在for循环中加入一个时间等待,否则的话,GO会在瞬间启动上千个并发,可能会直接把程序打死;
tools.SleepByMil(100)
如果不加这个时间等待,代码中的超过指定并发数开始等待的控制,根据控制不住;