zoukankan      html  css  js  c++  java
  • python3 分布式进程(跨机器)BaseManager(multiprocessing.managers)

    A机器负责发送任务和接受结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    #task_master.py
    import random,time,queue
    from multiprocessing.managers import BaseManager
     
    task_queue = queue.Queue()
    result_queue = queue.Queue()
     
    class QueueManager(BaseManager):
        pass
     
    if __name__ == '__main__':
        print("master start.")
        QueueManager.register('get_task_queue',callable = lambda:task_queue)
        QueueManager.register('get_result_queue',callable = lambda:result_queue)
        manager = QueueManager(address = ('10.10.100.11',9833),authkey=b'abc')
        manager.start()
        task = manager.get_task_queue()
        result = manager.get_result_queue()
     
        for in range(10):
            = random.randint(0,1000)
            print('put task %d ...' % n)
            task.put(n)
        print('try get results...')
     
        for in range(10):
            = result.get(timeout = 100)
            print('Result:%s' % r)
        manager.shutdown()
        print('master exit.')

    B机器负责处理任务和发送结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    #task_worker.py
    import sys,time,queue
    from multiprocessing.managers import BaseManager
     
    class QueueManager(BaseManager):
        pass
     
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')
     
    server_addr = '10.10.100.11'
    print('connect to server %s...' % server_addr)
     
    = QueueManager(address=(server_addr,9833),authkey=b'abc')
    m.connect()
     
    task = m.get_task_queue()
    result = m.get_result_queue()
     
    for in range(10):
        try:
            = task.get(timeout = 10)
            print('run task %d * %d' %(n,n))
            = '%d * %d = %d' %(n,n,n*n)
            time.sleep(1)
            result.put(r)
        except Queue.Empty:
            print('task queue is empty')
     
    print('worker exit'

  • 相关阅读:
    rtmp 之 amf
    Codeforces Round #601 (Div. 1)
    Codeforces Round #618 (Div. 1)
    Codeforces Round #694 (Div. 1) BCDE
    AtCoder Regular Contest 106 DEF
    AtCoder Grand Contest 006 BCDEFF
    JavaScript中深拷贝的实现方法
    suiidfadf
    macOs打开时提示:xxx.app已损坏修复教程
    vue通过事件对象获取标签上的属性值
  • 原文地址:https://www.cnblogs.com/ExMan/p/10187599.html
Copyright © 2011-2022 走看看