zoukankan      html  css  js  c++  java
  • GO瞬间并发数控制

        var wg2 sync.WaitGroup
        wg2.Add(nums)
    
        xc :=0
        parallelNum := plt.MaxParallel
        var waitCount int32 = 0
    
        for i:=0;i<nums*lll;i=i+lll  {
    
        begin:
    
            if i % 30 == 1 {
                tools.L2 <- msg
            }
    
            if i % 10 == 1 {
                mm := fmt.Sprintf("子任务%v开始执行",i+1)
                tools.L2 <- mm
            }
    
            currentParallelNum := atomic.LoadInt32(&plt.currentParallel)
            if i % 10 == 1 {
                mm := fmt.Sprintf("当前任务数%v,最大并发数%v",currentParallelNum,parallelNum)
                tools.L2 <- mm
            }
    
    
            if currentParallelNum > parallelNum {
                waitCount++
                if waitCount > plt.MaxWaitCount {    //等待超过一定次数后,就放开一次并行度
                    parallelNum = plt.MaxParallel2
    
                    mm := fmt.Sprintf("当前等待次数%v超过最大等待次数%v,开始将并行数从%v增加到%v",waitCount,plt.MaxWaitCount,plt.MaxParallel,plt.MaxParallel2)
                    waitCount = 0
    
                    //tools.LogLevelByConfigFile(mm,2)
                    //tools.LogTask(mm,2)
                    tools.L2 <- mm
                }else {
                    if parallelNum != plt.MaxParallel {
                        parallelNum = plt.MaxParallel
                    }
                }
    
                mm := fmt.Sprintf("当前并行度%v超过最大并行度%v,开始等待",currentParallelNum,parallelNum)
                //tools.LogLevelByConfigFile(mm,2)
                //tools.LogTask(mm,2)
                tools.L2 <- mm
                tools.SleepByMil(100)
                tools.SleepByRandMil(3000*common.SleepInterval)
                goto begin
            }
    
    
            xc++
            mm := fmt.Sprintf("第 %v 个协程开始运行",xc)
            tools.L2 <- mm
    
            endIndex := i + lll
            if endIndex > l {
                endIndex = l
            }
    
            lstNew := (*sourceDataList)[i:endIndex]
            go func(lst *[]map[string]string,wg *sync.WaitGroup) {
                for _,row :=  range *lst {
                    atomic.AddInt32(&plt.currentParallel,1)
                    rowMap(&row,&plt.CommonStruct)          //每行数据该如何处理的函数
                    atomic.AddInt32(&plt.currentParallel,-1)
                }
                wg.Done()
            }(&lstNew,&wg2)
    
            tools.SleepByMil(100)
        }
        wg2.Wait()

    在for循环中加入一个时间等待,否则的话,GO会在瞬间启动上千个并发,可能会直接把程序打死;

    tools.SleepByMil(100)

     如果不加这个时间等待,代码中的超过指定并发数开始等待的控制,根据控制不住;

  • 相关阅读:
    四、面向对象分析和设计全流程概述
    三、三大核心特征-继承
    二、三大核心特征-多态
    [第三章]一、三大核心特征-封装
    四、抽象类
    三、接口
    二、对象
    [第二章]一、类
    六、面向对象的迷思
    五、面向对象的应用范围
  • 原文地址:https://www.cnblogs.com/perfei/p/14296628.html
Copyright © 2011-2022 走看看