zoukankan      html  css  js  c++  java
  • go语言调度器

    调度器就是将goroutine分配到工作线程中运行
    涉及3种类型的对象:
    G - goroutine
    M - 工作线程即os线程
    P - 处理器,一种用来运行go代码的抽象资源,最大数目不能超过GOMAXPROCS,在运行go代码时需要关联一个M

    全局的运行队列:

    G *runtime·sched.runqhead;
    G *runtime·sched.runqtail;
    int runtime·sched.runqsize;
    

    P结构的主要成员:包含一个本地运行队列

    struct P
    {
        uint32	status;		// one of Pidle/Prunning/...
        uint32	schedtick;	// incremented on every scheduler call
     
        M*	m;		// back-link to associated M (nil if idle)
     
        // Queue of runnable goroutines.
        uint32	runqhead;
        uint32	runqtail;
        G*	runq[256];
    };
    

    schedule函数主要部分代码

    // One round of scheduler: find a runnable goroutine and execute it.
    // Never returns.
    static void
    schedule(void)
    {
        G *gp;
        uint32 tick;
     
        ......
    top:
        ......
        
        gp = nil;
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        tick = g->m->p->schedtick;
        // This is a fancy way to say tick%61==0,
        // it uses 2 MUL instructions instead of a single DIV and so is faster on modern processors.
        if(tick - (((uint64)tick*0x4325c53fu)>>36)*61 == 0 && runtime·sched.runqsize > 0) {
        	runtime·lock(&runtime·sched.lock);
        	gp = globrunqget(g->m->p, 1);
        	runtime·unlock(&runtime·sched.lock);
        	if(gp)
        		resetspinning();
        }
        if(gp == nil) {
        	gp = runqget(g->m->p);
        	if(gp && g->m->spinning)
        		runtime·throw("schedule: spinning with local work");
        }
        if(gp == nil) {
        	gp = findrunnable();  // blocks until work is available
        	resetspinning();
        }
     
        execute(gp);
    }
    

    调度器在超过一定间隔时间的情况下,为了公平原则,首先会从全局的运行队列获取G
    从本地的运行队列中获取G
    等待新的G进入运行队列

    globrunqget从全局运行队列获取G,同时它还会将一定数量的G转移到P的本地运行队列中.

    runqget从本地运行队列获取G,本地运行队列的实现是无锁的:

    // Get g from local runnable queue.
    // Executed only by the owner P.
    static G*
    runqget(P *p)
    {
        G *gp;
        uint32 t, h;
     
        for(;;) {
        	h = runtime·atomicload(&p->runqhead);  // load-acquire, synchronize with other consumers
        	t = p->runqtail;
        	if(t == h)
        		return nil;
        	gp = p->runq[h%nelem(p->runq)];
        	if(runtime·cas(&p->runqhead, h, h+1))  // cas-release, commits consume
        		return gp;
        }
    }
    

    findrunnable阻塞等待可运行的G

    • 检查本地运行队列

    • 检查全局运行队列

    • 以non-blocking的模式poll network

    • 检查其它P的本地运行队列

    • 如果最后依旧无法在系统内获取到G,那么就以blocking的模式poll network

      // Finds a runnable goroutine to execute.
      // Tries to steal from other P's, get g from global queue, poll network.
      static G*
      findrunnable(void)
      {
      G *gp;
      P *p;
      int32 i;

      top:
      if(runtime·sched.gcwaiting) {
      gcstopm();
      goto top;
      }
      if(runtime·fingwait && runtime·fingwake && (gp = runtime·wakefing()) != nil)
      runtime·ready(gp);
      // local runq
      gp = runqget(g->m->p);
      if(gp)
      return gp;
      // global runq
      if(runtime·sched.runqsize) {
      runtime·lock(&runtime·sched.lock);
      gp = globrunqget(g->m->p, 0);
      runtime·unlock(&runtime·sched.lock);
      if(gp)
      return gp;
      }
      // poll network
      gp = runtime·netpoll(false); // non-blocking
      if(gp) {
      injectglist(gp->schedlink);
      runtime·casgstatus(gp, Gwaiting, Grunnable);
      return gp;
      }
      // If number of spinning M's >= number of busy P's, block.
      // This is necessary to prevent excessive CPU consumption
      // when GOMAXPROCS>>1 but the program parallelism is low.
      if(!g->m->spinning && 2 * runtime·atomicload(&runtime·sched.nmspinning) >= runtime·gomaxprocs - runtime·atomicload(&runtime·sched.npidle)) // TODO: fast atomic
      goto stop;
      if(!g->m->spinning) {
      g->m->spinning = true;
      runtime·xadd(&runtime·sched.nmspinning, 1);
      }
      // random steal from other P's
      for(i = 0; i < 2*runtime·gomaxprocs; i++) {
      if(runtime·sched.gcwaiting)
      goto top;
      p = runtime·allp[runtime·fastrand1()%runtime·gomaxprocs];
      if(p == g->m->p)
      gp = runqget(p);
      else
      gp = runqsteal(g->m->p, p);
      if(gp)
      return gp;
      }
      stop:
      // return P and block
      runtime·lock(&runtime·sched.lock);
      if(runtime·sched.gcwaiting) {
      runtime·unlock(&runtime·sched.lock);
      goto top;
      }
      if(runtime·sched.runqsize) {
      gp = globrunqget(g->m->p, 0);
      runtime·unlock(&runtime·sched.lock);
      return gp;
      }
      p = releasep();
      pidleput(p);
      runtime·unlock(&runtime·sched.lock);
      if(g->m->spinning) {
      g->m->spinning = false;
      runtime·xadd(&runtime·sched.nmspinning, -1);
      }
      // check all runqueues once again
      for(i = 0; i < runtime·gomaxprocs; i++) {
      p = runtime·allp[i];
      if(p && p->runqhead != p->runqtail) {
      runtime·lock(&runtime·sched.lock);
      p = pidleget();
      runtime·unlock(&runtime·sched.lock);
      if(p) {
      acquirep(p);
      goto top;
      }
      break;
      }
      }
      // poll network
      if(runtime·xchg64(&runtime·sched.lastpoll, 0) != 0) {
      if(g->m->p)
      runtime·throw("findrunnable: netpoll with p");
      if(g->m->spinning)
      runtime·throw("findrunnable: netpoll with spinning");
      gp = runtime·netpoll(true); // block until new work is available
      runtime·atomicstore64(&runtime·sched.lastpoll, runtime·nanotime());
      if(gp) {
      runtime·lock(&runtime·sched.lock);
      p = pidleget();
      runtime·unlock(&runtime·sched.lock);
      if(p) {
      acquirep(p);
      injectglist(gp->schedlink);
      runtime·casgstatus(gp, Gwaiting, Grunnable);
      return gp;
      }
      injectglist(gp);
      }
      }
      stopm();
      goto top;
      }

  • 相关阅读:
    金额相关的测试用例
    Python练习题--持续更新
    Python基础--函数
    Python基础--文件操作和集合
    Python基础--数据类型
    Python基础--字典
    分布式理论(七)—— 一致性协议之 ZAB
    分布式理论(六)—— Raft 算法
    分布式理论(五)—— 一致性算法 Paxos
    分布式理论(四)—— 一致性协议之 3PC
  • 原文地址:https://www.cnblogs.com/richmonkey/p/4509658.html
Copyright © 2011-2022 走看看