zoukankan      html  css  js  c++  java
  • python 分布式进程

    分布式进程

     

    如果已经有一个通过Queue通信的多进程程序在同一台机器上运行,
    希望把发送任务的进程和处理任务的进程分布到两台机器上。

    通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了

    服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务

    # task_master.py

    import random, time, queue
    from multiprocessing.managers import BaseManager

    # 发送任务的队列:
    task_queue = queue.Queue()
    # 接收结果的队列:
    result_queue = queue.Queue()

    # 从BaseManager继承的QueueManager:
    class QueueManager(BaseManager):
    pass

    # 把两个Queue都注册到网络上, callable参数关联了Queue对象:

    QueueManager.register('get_task_queue', callable=lambda: task_queue)
    QueueManager.register('get_result_queue', callable=lambda: result_queue)

    # 绑定端口5000, 设置验证码'abc':

    manager = QueueManager(address=('', 5000), authkey=b'abc')

    # 启动Queue:

    manager.start()

    # 获得通过网络访问的Queue对象:

    task = manager.get_task_queue()
    result = manager.get_result_queue()

    # 放几个任务进去:

    for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)

    # 从result队列读取结果:

    print('Try get results...')
    for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
    # 关闭:
    manager.shutdown()
    print('master exit.')

    必须通过manager.get_task_queue()获得的Queue接口添加

     

    然后,在另一台机器上启动任务进程(本机上启动也可以)


    # task_worker.py

    import time, sys, queue
    from multiprocessing.managers import BaseManager

    # 创建类似的QueueManager:
    class QueueManager(BaseManager):
    pass

    # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')

    # 连接到服务器,也就是运行task_master.py的机器:
    server_addr = '127.0.0.1'
    print('Connect to server %s...' % server_addr)
    # 端口和验证码注意保持与task_master.py设置的完全一致:
    m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
    # 从网络连接:
    m.connect()
    # 获取Queue的对象:
    task = m.get_task_queue()
    result = m.get_result_queue()
    # 从task队列取任务,并把结果写入result队列:
    for i in range(10):
    try:
    n = task.get(timeout=1)
    print('run task %d * %d...' % (n, n))
    r = '%d * %d = %d' % (n, n, n*n)
    time.sleep(1)
    result.put(r)
    except Queue.Empty:
    print('task queue is empty.')
    # 处理结束:
    print('worker exit.')

    任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。

    先启动task_master.py服务进程:

    $ python3 task_master.py
    Put task 3411...
    Put task 1605...
    Put task 1398...
    Put task 4729...
    Put task 5300...
    Put task 7471...
    Put task 68...
    Put task 4219...
    Put task 339...
    Put task 7866...
    Try get results...

    task_master.py进程发送完任务后,开始等待result队列的结果。现在启动task_worker.py进程:

    $ python3 task_worker.py
    Connect to server 127.0.0.1...
    run task 3411 * 3411...
    run task 1605 * 1605...
    run task 1398 * 1398...
    run task 4729 * 4729...
    run task 5300 * 5300...
    run task 7471 * 7471...
    run task 68 * 68...
    run task 4219 * 4219...
    run task 339 * 339...
    run task 7866 * 7866...
    worker exit.


    task_worker.py进程结束,在task_master.py进程中会继续打印出结果:

    Result: 3411 * 3411 = 11634921
    Result: 1605 * 1605 = 2576025
    Result: 1398 * 1398 = 1954404
    Result: 4729 * 4729 = 22363441
    Result: 5300 * 5300 = 28090000
    Result: 7471 * 7471 = 55815841
    Result: 68 * 68 = 4624
    Result: 4219 * 4219 = 17799961
    Result: 339 * 339 = 114921
    Result: 7866 * 7866 = 61873956


    Queue对象存储在task_master.py进程中

    Queue之所以能通过网络访问,就是通过QueueManager实现的

     

    朝闻道
  • 相关阅读:
    linux中断申请之request_threaded_irq
    中断处理
    barrier()函数
    Intellij-设置生成serialVersionUID的方法
    mybatis一级缓存二级缓存
    mysql-EXPLAIN
    mybatis配置多个数据源事务(Transaction)处理
    mybatis实战教程三:mybatis和springmvc整合
    责任链模式
    MySQL-InnoDB-锁
  • 原文地址:https://www.cnblogs.com/wander-clouds/p/8481890.html
Copyright © 2011-2022 走看看