zoukankan      html  css  js  c++  java
  • python BaseManager分布式学习

    如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。
    Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
    首先编写个manager服务器
    # encoding:utf-8

    import random, time, Queue
    from multiprocessing.managers import BaseManager

    # 发送任务的队列
    task_queue = Queue.Queue()
    # 接收结果的队列
    result_queue = Queue.Queue()


    # 使用标准函数来代替lambda函数,避免python2.7中,pickle无法序列化lambda的问题
    def get_task_queue():
    global task_queue
    return task_queue


    # 使用标准函数来代替lambda函数,避免python2.7中,pickle无法序列化lambda的问题
    def get_result_queue():
    global task_queue
    return task_queue


    def startManager(host, port, authkey):
    # 把两个Queue都注册到网络上,callable参数关联了Queue对象,注意回调函数不能使用括号
    BaseManager.register('get_task_queue', callable=get_task_queue)
    BaseManager.register('get_result_queue', callable=get_result_queue)
    # 设置host,绑定端口port,设置验证码为authkey
    manager = BaseManager(address=(host, port), authkey=authkey)
    # 启动manager服务器
    manager.start()
    return manager


    def put_queue(manager):
    # 通过网络访问queueu
    task = manager.get_task_queue()
    while 1:
    n = random.randint(0, 1000)
    print ('Put task %d' % n)
    task.put(n)
    time.sleep(0.5)


    if __name__ == "__main__":
    host = '127.0.0.1'
    port = 5000
    authkey = 'abc'
    # 启动manager服务器
    manager = startManager(host, port, authkey)
    # 给task队列添加数据
    put_queue(manager)
    # 关闭服务器
    manager.shutdown
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    然后编写worker
    # encoding:utf-8

    import random, time, Queue
    from multiprocessing.managers import BaseManager


    def start_worker(host, port, authkey):
    # 由于这个BaseManager只从网络上获取queue,所以注册时只提供名字
    BaseManager.register('get_task_queue')
    BaseManager.register('get_result_queue')
    print ('Connect to server %s' % host)
    # 注意,端口port和验证码authkey必须和manager服务器设置的完全一致
    worker = BaseManager(address=(host, port), authkey=authkey)
    # 链接到manager服务器
    worker.connect()
    return worker


    def get_queue(worker):
    task = worker.get_task_queue()
    result = worker.get_result_queue()
    # 从task队列取数据,并添加到result队列中
    while 1:
    if task.empty():
    time.sleep(1)
    continue
    n = task.get(timeout=1)
    print ('worker get %d' % n)
    result.put(n)
    time.sleep(1)

    if __name__ == "__main__":
    host = '127.0.0.1'
    port = 5000
    authkey = 'abc'
    # 启动worker
    worker = start_worker(host, port, authkey)
    # 获取队列
    get_queue(worker)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    最后,先启动manager服务器,然后启动两个worker
    manager服务器截图


    worker1截图


    worker2截图


    可以看到worker1+worker2的数据了等于manager服务器的数据,并且没有重复的值
    ---------------------
    作者:Imagine_Dragon
    来源:CSDN
    原文:https://blog.csdn.net/Imagine_Dragon/article/details/77689194
    版权声明:本文为博主原创文章,转载请附上博文链接!

  • 相关阅读:
    Lucene.NET中Field.Index 和 Field.Store的几种属性的用法
    WP7学习笔记(三)
    sql注入
    JSP数据库开发实例
    oracle命令大全(转)
    .net个人涉及
    JavaScript的错误处理之onerror事件的使用方法
    脚本问题。调试
    做到了,你就成熟
    ea8.0
  • 原文地址:https://www.cnblogs.com/ExMan/p/10187595.html
Copyright © 2011-2022 走看看