zoukankan      html  css  js  c++  java
  • scheduling algorithm

      1 from __future__ import division
      2 
      3 from bisect import insort
      4 from collections import deque, OrderedDict
      5 from functools import reduce
      6 from heapq import heapify, heappop, heappush
      7 from math import ceil
      8 
      9 from blist import blist, sorteddict, sortedlist
     10 
     11 def intceil(x):  # superfluous in Python 3, ceil is sufficient
     12     return int(ceil(x))
     13 
     14 class Scheduler:
     15     def next_internal_event(self):
     16         return None
     17 
     18 
     19 class PS(Scheduler):
     20     def __init__(self):
     21         self.running = set()
     22 
     23     def enqueue(self, t, jobid, size):
     24         self.running.add(jobid)
     25 
     26     def dequeue(self, t, jobid):
     27         try:
     28             self.running.remove(jobid)
     29         except KeyError:
     30             raise ValueError("dequeuing missing job")
     31 
     32     def schedule(self, t):
     33         running = self.running
     34         if running:
     35             share = 1 / len(running)
     36             return {jobid: share for jobid in running}
     37         else:
     38             return {}
     39 
     40 class GPS(Scheduler):
     41     def __init__(self):
     42         self.running = {}
     43 
     44     def enqueue(self, t, jobid, size, priority=1):
     45         self.running[jobid] = priority
     46 
     47     def dequeue(self, t, jobid):
     48         try:
     49             del self.running[jobid]
     50         except KeyError:
     51             raise ValueError("dequeuing missing job")
     52 
     53     def schedule(self, t):
     54         running = self.running
     55         if running:
     56             share = 1 / sum(running.values())
     57             return {jobid: weight * share for jobid, weight in running.items()}
     58         else:
     59             return {}
     60 
     61 class FIFO(Scheduler):
     62     def __init__(self): 
     63        self.jobs = deque()
     64 
     65     def enqueue(self, t, jobid, size):
     66         self.jobs.append(jobid)
     67 
     68     def dequeue(self, t, jobid):
     69         try:
     70             self.jobs.remove(jobid)
     71         except ValueError:
     72             raise ValueError("dequeuing missing job")
     73 
     74     def schedule(self, t):
     75         jobs = self.jobs
     76         if jobs:
     77             return {jobs[0]: 1}
     78         else:
     79             return {}
     80 
     81 
     82 class SRPT(Scheduler):
     83     def __init__(self):
     84         self.jobs = []
     85         self.last_t = 0
     86 
     87     def update(self, t):
     88         delta = t - self.last_t
     89         if delta == 0:
     90             return
     91         jobs = self.jobs
     92         if jobs:
     93             jobs[0][0] -= delta
     94         self.last_t = t
     95 
     96     def enqueue(self, t, jobid, job_size):
     97         self.update(t)
     98         heappush(self.jobs, [job_size, jobid])
     99 
    100     def dequeue(self, t, jobid):
    101         jobs = self.jobs
    102         self.update(t)
    103         # common case: we dequeue the running job
    104         if jobid == jobs[0][1]:
    105             heappop(jobs)
    106             return
    107         # we still care if we dequeue a job not running (O(n)) --
    108         # could be made more efficient, but still O(n) because of the
    109         # search, by exploiting heap properties (i.e., local heappop)
    110         try:
    111             idx = next(i for i, v in jobs if v[1] == jobid)
    112         except StopIteration:
    113             raise ValueError("dequeuing missing job")
    114         jobs[idx], jobs[-1] = jobs[-1], jobs[idx]
    115         jobs.pop()
    116         heapify(jobs)
    117 
    118     def schedule(self, t):
    119         self.update(t)
    120         jobs = self.jobs
    121         if jobs:
    122             return {jobs[0][1]: 1}
    123         else:
    124             return {}
    125 
    126 
    127 class SRPT_plus_PS(Scheduler):
    128 
    129     def __init__(self, eps=1e-6):
    130         self.jobs = []
    131         self.last_t = 0
    132         self.late = set()
    133         self.eps = eps
    134 
    135     def update(self, t):
    136         delta = t - self.last_t
    137         jobs = self.jobs
    138         delta /= 1 + len(self.late)  # key difference with SRPT #1
    139         if jobs:
    140             jobs[0][0] -= delta
    141         while jobs and jobs[0][0] < self.eps:
    142             _, jobid = heappop(jobs)
    143             self.late.add(jobid)
    144         self.last_t = t
    145 
    146     def next_internal_event(self):
    147         jobs = self.jobs
    148         if not jobs:
    149             return None
    150         return jobs[0][0] * (1 + len(self.late))
    151 
    152     def schedule(self, t):
    153         self.update(t)
    154         jobs = self.jobs
    155         late = self.late
    156         scheduled = late.copy()  # key difference with SRPT #2
    157         if jobs:
    158             scheduled.add(jobs[0][1])
    159         if not scheduled:
    160             return {}
    161         share = 1 / len(scheduled)
    162         return {jobid: share for jobid in scheduled}
    163 
    164     def enqueue(self, t, jobid, job_size):
    165         self.update(t)
    166         heappush(self.jobs, [job_size, jobid])
    167 
    168     def dequeue(self, t, jobid):
    169         self.update(t)
    170         late = self.late
    171         if jobid in late:
    172             late.remove(jobid)
    173             return
    174         # common case: we dequeue the running job
    175         jobs = self.jobs
    176         if jobid == jobs[0][1]:
    177             heappop(jobs)
    178             return
    179         # we still care if we dequeue a job not running (O(n)) --
    180         # could be made more efficient, but still O(n) because of the
    181         # search, by exploiting heap properties (i.e., local heappop)
    182         try:
    183             idx = next(i for i, v in jobs if v[1] == jobid)
    184         except StopIteration:
    185             raise ValueError("dequeuing missing job")
    186         jobs[idx], jobs[-1] = jobs[-1], jobs[idx]
    187         jobs.pop()
    188         heapify(jobs)
    189 
    190 
    191 class FSP(Scheduler):
    192 
    193     def __init__(self, eps=1e-6):
    194 
    195         # [remaining, jobid] queue for the *virtual* scheduler
    196         self.queue = blist()
    197 
    198         # Jobs that should have finished in the virtual time,
    199         # but didn't in the real (happens only in case of estimation
    200         # errors)
    201         # Keys are jobids (ordered by the time they became late), values are
    202         # not significant.
    203         self.late = OrderedDict()
    204 
    205         # last time we run the schedule function
    206         self.last_t = 0
    207 
    208         # Jobs that are running in the real time
    209         self.running = set()
    210 
    211         # Jobs that have less than eps work to do are considered done
    212         # (deals with floating point imprecision)
    213         self.eps = eps
    214 
    215     def enqueue(self, t, jobid, size):
    216         self.update(t)  # needed to age only existing jobs in the virtual queue
    217         insort(self.queue, [size, jobid])
    218         self.running.add(jobid)
    219 
    220     def dequeue(self, t, jobid):
    221         # the job remains in the virtual scheduler!
    222         self.running.remove(jobid)
    223 
    224         late = self.late
    225         if jobid in late:
    226             late.pop(jobid)
    227 
    228     def update(self, t):
    229 
    230         delta = t - self.last_t
    231 
    232         queue = self.queue
    233 
    234         if queue:
    235             running = self.running
    236             late = self.late
    237             eps = self.eps
    238             fair_share = delta / len(queue)
    239             fair_plus_eps = fair_share + eps
    240 
    241             # jobs in queue[:idx] are done in the virtual scheduler
    242             idx = 0
    243             for vrem, jobid in queue:
    244                 if vrem > fair_plus_eps:
    245                     break
    246                 idx += 1
    247                 if jobid in running:
    248                     late[jobid] = True
    249             if idx:
    250                 del queue[:idx]
    251 
    252             if fair_share > 0:
    253                 for vrem_jobid in queue:
    254                     vrem_jobid[0] -= fair_share
    255 
    256         self.last_t = t
    257 
    258     def schedule(self, t):
    259 
    260         self.update(t)
    261 
    262         late = self.late
    263         if late:
    264             return {next(iter(late)): 1}
    265 
    266         running = self.running
    267         if not running:
    268             return {}
    269 
    270         jobid = next(jobid for _, jobid in self.queue if jobid in running)
    271         return {jobid: 1}
    272 
    273     def next_internal_event(self):
    274 
    275         queue = self.queue
    276 
    277         if not queue:
    278             return None
    279 
    280         return queue[0][0] * len(queue)
    281 
    282 
    283 class FSP_plus_PS(FSP):
    284 
    285     def __init__(self, *args, **kwargs):
    286 
    287         FSP.__init__(self, *args, **kwargs)
    288         self.late = dict(self.late)  # we don't need the order anymore!
    289 
    290     def schedule(self, t):
    291 
    292         self.update(t)
    293 
    294         late = self.late
    295         if late:
    296             share = 1 / len(late)
    297             return {jobid: share for jobid in late}
    298 
    299         running = self.running
    300         if not running:
    301             return {}
    302 
    303         jobid = next(jobid for _, jobid in self.queue if jobid in running)
    304         return {jobid: 1}
    305 
    306 
    307 class FSPE_PS_DC(FSP_plus_PS):
    308 
    309     def schedule(self, t):
    310 
    311         self.update(t)
    312         queue = self.queue
    313         running = self.running
    314         scheduled = set(self.late)
    315         try:
    316             scheduled.add(next(jobid for _, jobid in self.queue
    317                                if jobid in running))
    318         except StopIteration:
    319             pass
    320         if scheduled:
    321             share = 1 / len(scheduled)
    322             return {jobid: share for jobid in scheduled}
    323         else:
    324             return {}
    325 
    326     
    327 class LAS(Scheduler):
    328 
    329     def __init__(self, eps=1e-6):
    330 
    331         # job attained service is represented as (real attained service // eps)
    332         # (not perfectly precise but avoids problems with floats)
    333         self.eps = eps
    334 
    335         # sorted dictionary for {attained: {jobid}}
    336         self.queue = sorteddict()
    337 
    338         # {jobid: attained} dictionary
    339         self.attained = {}
    340 
    341         # result of the last time the schedule() method was called
    342         # grouped by {attained: [service, {jobid}]}
    343         self.scheduled = {}
    344         # This is the entry point for doing XXX + LAS schedulers:
    345         # it's sufficient to touch here
    346 
    347         # last time when the schedule was changed
    348         self.last_t = 0
    349 
    350     def enqueue(self, t, jobid, size):
    351 
    352         self.queue.setdefault(0, set()).add(jobid)
    353         self.attained[jobid] = 0
    354 
    355     def dequeue(self, t, jobid):
    356 
    357         att = self.attained.pop(jobid)
    358         q = self.queue[att]
    359         if len(q) == 1:
    360             del self.queue[att]
    361         else:
    362             q.remove(jobid)
    363 
    364     def update(self, t):
    365 
    366         delta = intceil((t - self.last_t) / self.eps)
    367         queue = self.queue
    368         attained = self.attained
    369         set_att = set(attained)
    370 
    371         for att, sub_schedule in self.scheduled.items():
    372 
    373             jobids = reduce(set.union, (jobids for _, jobids in sub_schedule))
    374 
    375             # remove jobs from queue
    376 
    377             try:
    378                 q_att = queue[att]
    379             except KeyError:
    380                 pass  # all jobids have terminated
    381             else:
    382                 q_att -= jobids
    383                 if not q_att:
    384                     del queue[att]
    385 
    386             # recompute attained values, re-put in queue,
    387             # and update values in attained
    388 
    389             for service, jobids in sub_schedule:
    390 
    391                 jobids &= set_att  # exclude completed jobs
    392                 if not jobids:
    393                     continue
    394                 new_att = att + intceil(service * delta)
    395 
    396                 # let's coalesce pieces of work differing only by eps, to avoid
    397                 # rounding errors
    398                 attvals = [new_att, new_att - 1, new_att + 1]
    399                 try:
    400                     new_att = next(v for v in attvals if v in queue)
    401                 except StopIteration:
    402                     pass
    403 
    404                 queue.setdefault(new_att, set()).update(jobids)
    405                 for jobid in jobids:
    406                     attained[jobid] = new_att
    407         self.last_t = t
    408 
    409     def schedule(self, t):
    410 
    411         self.update(t)
    412 
    413         try:
    414             attained, jobids = self.queue.items()[0]
    415         except IndexError:
    416             service = 0
    417             jobids = set()
    418             self.scheduled = {}
    419         else:
    420             service = 1 / len(jobids)
    421             self.scheduled = {attained: [(service, jobids.copy())]}
    422             
    423         return {jobid: service for jobid in jobids}
    424 
    425     def next_internal_event(self):
    426 
    427         queue = self.queue
    428 
    429         if len(queue) >= 2:
    430             qitems = queue.items()
    431             running_attained, running_jobs = qitems[0]
    432             waiting_attained, _ = qitems[1]
    433             diff = waiting_attained - running_attained
    434             return diff * len(running_jobs) * self.eps
    435         else:
    436             return None
    437 
    438 
    439 class SRPT_plus_LAS(Scheduler):
    440 
    441     def __init__(self, eps=1e-6):
    442 
    443         # job that should have finished, but didn't
    444         # (because of estimation errors)
    445         self.late = set()
    446 
    447         # [remaining, jobid] heap for the SRPT scheduler
    448         self.queue = sortedlist()
    449 
    450         # last time we run the update function
    451         self.last_t = 0
    452 
    453         # Jobs that have less than eps work to do are considered done
    454         # (deals with floating point imprecision)
    455         self.eps = eps
    456 
    457         # {jobid: att} where att is jobid's attained service
    458         self.attained = {}
    459 
    460         # queue for late jobs, sorted by attained service
    461         self.late_queue = sorteddict()
    462 
    463         # last result of calling the schedule function
    464         self.scheduled = {}
    465 
    466     def enqueue(self, t, jobid, size):
    467 
    468         size_int = intceil(size / self.eps)
    469         self.queue.add([size_int, jobid])
    470         self.attained[jobid] = 0
    471 
    472     def dequeue(self, t, jobid):
    473 
    474         att = self.attained.pop(jobid)
    475         late = self.late
    476         if jobid in late:
    477             late_queue = self.late_queue
    478             late.remove(jobid)
    479             latt = late_queue[att]
    480             if len(latt) == 1:
    481                 del late_queue[att]
    482             else:
    483                 latt.remove(jobid)
    484         else:
    485             queue = self.queue
    486             for i, (_, jid) in enumerate(queue):
    487                 if jid == jobid:
    488                     del queue[i]
    489                     break
    490 
    491     def update(self, t):
    492 
    493         attained = self.attained
    494         eps = self.eps
    495         late = self.late
    496         late_queue = self.late_queue
    497         queue = self.queue
    498         scheduled = self.scheduled
    499 
    500         delta = intceil((t - self.last_t) / eps)
    501 
    502         # Real attained service
    503 
    504         def qinsert(jobid, att):
    505             # coalesce pieces of work differing only by eps to avoid rounding
    506             # errors -- return the chosen attained work value
    507             attvals = [att, att - 1, att + 1]
    508             try:
    509                 att = next(v for v in attvals if v in late_queue)
    510             except StopIteration:
    511                 late_queue[att] = {jobid}
    512             else:
    513                 late_queue[att].add(jobid)
    514             return att
    515 
    516         for jobid, service in scheduled.items():
    517             try:
    518                 old_att = self.attained[jobid]
    519             except KeyError: # dequeued job
    520                 continue
    521             work = intceil(delta * service)
    522             new_att = old_att + work
    523             if jobid in self.late:
    524                 l_old = late_queue[old_att]
    525                 l_old.remove(jobid)
    526                 if not l_old:
    527                     del late_queue[old_att]
    528                 new_att = qinsert(jobid, new_att)
    529             else:
    530                 idx, rem = next((i, r) for i, (r, jid)
    531                                 in enumerate(queue)
    532                                 if jid == jobid)
    533                 new_rem = rem - work
    534                 if new_rem <= 0:
    535                     del queue[idx]
    536                     late.add(jobid)
    537                     new_att = qinsert(jobid, new_att)
    538                 else:
    539                     if idx == 0:
    540                         queue[0][0] = new_rem
    541                     else:
    542                         del queue[idx]
    543                         queue.add([new_rem, jobid])
    544             attained[jobid] = new_att
    545 
    546         self.last_t = t
    547 
    548     def schedule(self, t):
    549 
    550         self.update(t)
    551 
    552         queue = self.queue
    553         late_queue = self.late_queue
    554 
    555         jobs = {queue[0][1]} if queue else set()
    556         if late_queue:
    557             jobs.update(next(iter(late_queue.values())))
    558 
    559         if jobs:
    560             service = 1 / len(jobs)
    561             res = {jobid: service for jobid in jobs}
    562         else:
    563             res = {}
    564 
    565         self.scheduled = res
    566         return res
    567 
    568     def next_internal_event(self):
    569 
    570         queue = self.queue
    571 
    572         if queue:
    573             return queue[0][0] * (len(self.late) + 1) * self.eps
    574 
    575         
    576 class FSP_plus_LAS(Scheduler):
    577 
    578     def __init__(self, eps=1e-6):
    579 
    580         # [remaining, jobid] queue for the *virtual* scheduler
    581         self.queue = blist()
    582 
    583         # Jobs that should have finished in the virtual time,
    584         # but didn't in the real (happens only in case of estimation
    585         # errors)
    586         self.late = set()
    587 
    588         # last time we run the schedule function
    589         self.last_t = 0
    590 
    591         # Jobs that are running in the real time
    592         self.running = set()
    593 
    594         # Jobs that have less than eps work to do are considered done
    595         # (deals with floating point imprecision)
    596         self.eps = eps
    597 
    598         # queue for late jobs, sorted by attained service
    599         self.late_queue = sorteddict()
    600 
    601         # {jobid: att} where att is jobid's attained service
    602         self.attained = {}
    603 
    604         # last result of calling the schedule function
    605         self.scheduled = {}
    606 
    607     def enqueue(self, t, jobid, size):
    608 
    609         self.update(t)  # needed to age only existing jobs in the virtual queue
    610         insort(self.queue, [intceil(size / self.eps), jobid])
    611         self.running.add(jobid)
    612         self.attained[jobid] = 0
    613 
    614     def dequeue(self, t, jobid):
    615 
    616         late = self.late
    617 
    618         self.running.remove(jobid)
    619         if jobid in late:
    620             late_queue = self.late_queue
    621             late.remove(jobid)
    622             att = self.attained[jobid]
    623             latt = late_queue[att]
    624             if len(latt) == 1:
    625                 del late_queue[att]
    626             else:
    627                 latt.remove(jobid)
    628 
    629     def update(self, t):
    630 
    631         attained = self.attained
    632         eps = self.eps
    633         late = self.late
    634         late_queue = self.late_queue
    635         queue = self.queue
    636 
    637         delta = intceil((t - self.last_t) / eps)
    638 
    639         # Real attained service
    640 
    641         def qinsert(jobid, att):
    642             # coalesce pieces of work differing only by eps to avoid rounding
    643             # errors -- return the chosen attained work value
    644             attvals = [att, att - 1, att + 1]
    645             try:
    646                 att = next(v for v in attvals if v in late_queue)
    647             except StopIteration:
    648                 late_queue[att] = {jobid}
    649             else:
    650                 late_queue[att].add(jobid)
    651             return att
    652 
    653         if delta:
    654             for jobid, service in self.scheduled.items():
    655                 if jobid in self.late:
    656                     old_att = self.attained[jobid]
    657                     l_old = late_queue[old_att]
    658                     l_old.remove(jobid)
    659                     if not l_old:
    660                         del late_queue[old_att]
    661                     new_att = old_att + intceil(delta * service)
    662                     new_att = qinsert(jobid, new_att)
    663                     attained[jobid] = new_att
    664                 else:
    665                     attained[jobid] += intceil(delta * service)
    666 
    667         # Virtual scheduler
    668 
    669         if queue:
    670             running = self.running
    671             fair_share = intceil(delta / len(queue))
    672             fair_plus_eps = fair_share + 1
    673 
    674             # jobs in queue[:idx] are done in the virtual scheduler
    675             idx = 0
    676             for vrem, jobid in queue:
    677                 if vrem > fair_plus_eps:
    678                     break
    679                 idx += 1
    680                 if jobid in running:
    681                     late.add(jobid)
    682                     attained[jobid] = qinsert(jobid, attained[jobid])
    683 
    684             if idx:
    685                 del queue[:idx]
    686 
    687             if fair_share > 0:
    688                 for vrem_jobid in queue:
    689                     vrem_jobid[0] -= fair_share
    690 
    691         self.last_t = t
    692 
    693     def schedule(self, t):
    694 
    695         self.update(t)
    696 
    697         late_queue = self.late_queue
    698         running = self.running
    699 
    700         if late_queue:
    701             jobs = next(iter(late_queue.values()))
    702             service = 1 / len(jobs)
    703             res = {jobid: service for jobid in jobs}
    704         elif not running:
    705             res = {}
    706         else:
    707             jobid = next(jobid for _, jobid in self.queue if jobid in running)
    708             res = {jobid: 1}
    709         self.scheduled = res
    710 
    711         return res
    712 
    713     def next_internal_event(self):
    714 
    715         eps = self.eps
    716         late_queue = self.late_queue
    717         queue = self.queue
    718 
    719         res = None
    720 
    721         if queue:
    722             # time at which a job becomes late
    723             res = queue[0][0] * len(queue) * eps
    724         if len(late_queue) >= 2:
    725             # time at which scheduled late jobs reach the service of
    726             # others
    727             qitems = late_queue.items()
    728             running_attained, running_jobs = qitems[0]
    729             waiting_attained, _ = qitems[1]
    730             diff = waiting_attained - running_attained
    731             delta = diff * len(running_jobs) * eps
    732             if not res or res > delta:
    733                 res = delta
    734         return res
    735 
    736     
    737 class PSBS(Scheduler):
    738     
    739     def __init__(self, eps=1e-6):
    740         # heap of (gtime, jobid, weight) for the virtual time
    741         self.queue = []
    742 
    743         # heap of (gtime, jobid, weight) for jobs that are in the virtual
    744         # time, done in the real time and were at the head of
    745         # self.queue
    746         self.early = []
    747 
    748         # time allotted to the "ghost job"
    749         self.gtime = 0
    750 
    751         # {jobid: weight} for jobs that are finished in the virtual
    752         # time, but not in the real (happens only in case of
    753         # estimation errors)
    754         self.late = {}
    755 
    756         # last time we run the update method
    757         self.last_t = 0
    758 
    759         # Jobs that are running in the real time
    760         self.running = set()
    761 
    762         # Jobs that have less than eps work to do in virtual time are
    763         # considered done (deals with floating point imprecision)
    764         self.eps = eps
    765 
    766         # equivalent to sum(late.values())
    767         self.late_w = 0
    768 
    769         # equivalent to sum(w for _, _, w in queue + early)
    770         self.virtual_w = 0
    771 
    772     def enqueue(self, t, jobid, size, w=1):
    773 
    774         if w <= 0:
    775             raise ValueError("w needs to be positive")
    776         
    777         self.update(t) # we need to age only existing jobs in the virtual queue
    778         heappush(self.queue, (self.gtime + size / w, jobid, w))
    779         self.virtual_w += w
    780         self.running.add(jobid)
    781 
    782     def dequeue(self, t, jobid):
    783         # job remains in the virtual time!
    784         self.running.remove(jobid)
    785         late = self.late
    786         if jobid in self.late:
    787             self.late_w -= late[jobid]
    788             del late[jobid]
    789 
    790     def update(self, t):
    791         
    792         delta = t - self.last_t
    793 
    794         virtual_w = self.virtual_w
    795 
    796         if self.virtual_w > 0:
    797             queue = self.queue
    798             early = self.early
    799             running = self.running
    800             late = self.late
    801             
    802             fair_share = delta / virtual_w
    803 
    804             self.gtime = gtime = self.gtime + fair_share
    805             gtime_plus_eps = gtime + self.eps
    806 
    807             # deal with jobs that are done in the virtual scheduler
    808             while queue and queue[0][0] < gtime_plus_eps:
    809                 _, jobid, w = heappop(queue)
    810                 self.virtual_w -= w
    811                 if jobid in running:
    812                     late[jobid] = w
    813                     self.late_w += w
    814             while early and early[0][0] < gtime_plus_eps:
    815                 _, _, w = heappop(early)
    816                 self.virtual_w -= w
    817 
    818             # reset to avoid precision erros due to floating point
    819             if not queue and not early:
    820                 self.virtual_w = 0
    821             else:
    822                 assert self.virtual_w > 0
    823             
    824         self.last_t = t
    825 
    826     def schedule(self, t):
    827 
    828         self.update(t)
    829 
    830         late = self.late
    831         if late:
    832             late_w = self.late_w
    833             return {jobid: w / late_w for jobid, w in late.items()}
    834 
    835         running = self.running
    836         if not running:
    837             return {}
    838 
    839         queue = self.queue
    840         early = self.early
    841         while queue[0][1] not in running:
    842             heappush(early, heappop(queue))
    843         return {queue[0][1]: 1}
    844 
    845     def next_internal_event(self):
    846 
    847         virtual_w = self.virtual_w
    848         if virtual_w == 0:
    849             return None
    850 
    851         queue = self.queue
    852         early = self.early
    853         if queue:
    854             if early:
    855                 v = min(queue[0][0], early[0][0])
    856             else:
    857                 v = queue[0][0]
    858         else:
    859             v = early[0][0]
    860 
    861         return (v - self.gtime) * self.virtual_w
    862 
    863 WFQE_GPS = PSBS
    864     
    865 class FSPE_PS(WFQE_GPS):
    866     def enqueue(self, t, jobid, size, w=None):
    867         super(FSPE_PS, self).enqueue(t, jobid, size, 1)
    868 
    869 class WSRPTE_GPS(Scheduler):
    870 
    871     def __init__(self, eps=1e-6):
    872         self.jobs = []
    873         self.last_t = 0
    874         self.late = {}
    875         self.late_w = 0
    876         self.eps = eps
    877 
    878     def update(self, t):
    879         delta = t - self.last_t
    880         jobs = self.jobs
    881         if jobs:
    882             rpt_over_w, w, _ = jobs[0]
    883             work = delta / (w + self.late_w)
    884             jobs[0][0] -= work / w
    885             while jobs and jobs[0][0] < self.eps:
    886                 _, w, jobid = heappop(jobs)
    887                 self.late[jobid] = w
    888                 self.late_w += w
    889         self.last_t = t
    890 
    891     def next_internal_event(self):
    892         jobs = self.jobs
    893         if not jobs:
    894             return None
    895         rpt_over_w, w, _ = jobs[0]
    896         return rpt_over_w * w * w / (w + self.late_w) # = rpt * w / (w + late_w)
    897 
    898     def schedule(self, t):
    899         self.update(t)
    900         jobs = self.jobs
    901         tot_w = self.late_w
    902         if jobs:
    903             _, w, jobid = jobs[0]
    904             tot_w += w
    905             schedule = {jobid: w / tot_w}
    906         else:
    907             schedule = {}
    908         for jobid, w in self.late.items():
    909             schedule[jobid] = w / tot_w
    910         return schedule
    911 
    912     def enqueue(self, t, jobid, job_size, w=1):
    913         self.update(t)
    914         heappush(self.jobs, [job_size / w, w, jobid])
    915 
    916     def dequeue(self, t, jobid):
    917         self.update(t)
    918         late = self.late
    919         if jobid in late:
    920             del late[jobid]
    921             return
    922         # common case: we dequeue the running job
    923         jobs = self.jobs
    924         if jobid == jobs[0][2]:
    925             heappop(jobs)
    926             return
    927         # we still care if we dequeue a job not running (O(n)) --
    928         # could be made more efficient, but still O(n) because of the
    929         # search, by exploiting heap properties (i.e., local heappop)
    930         try:
    931             idx = next(i for i, v in jobs if v[2] == jobid)
    932         except StopIteration:
    933             raise ValueError("dequeuing missing job")
    934         jobs[idx], jobs[-1] = jobs[-1], jobs[idx]
    935         jobs.pop()
    936         heapify(jobs)
  • 相关阅读:
    服务器中一个进程kill不掉,如何处理?
    JVM基本概念
    Redis安装以及常见问题
    JVM---类加载器
    lambda表达式
    maven学习(3)pom.xml文件说明以及常用指令
    maven学习(2)仓库和配置
    maven学习(1)下载和安装和初步使用(手动构建项目和自动构建项目)
    JMicro微服务之超时&重试
    JMicro微服务Hello World
  • 原文地址:https://www.cnblogs.com/d3fei/p/4905756.html
Copyright © 2011-2022 走看看