zoukankan      html  css  js  c++  java
  • multiprocessing模块

    multiprocessing模块为在子进程中运行任务、通信和共享数据,以及执行各种形式的同步提供支持。进程没有任何共享状态,如果某个进程修改数据,改动只限于该进程内。

    Process()类:表示运行在一个子进程中的任务。

    class Process(object):
    def init(self, group=None, target=None, name=None, args=(), kwargs={}):

    参数:

    group:未使用

    target:当前进程启动时执行的可调用对象

    name:为进程指定描述性名称的字符串

    args:传递给target的位置参数的元组

    kwargs:传递给target的关键字参数的字典

    实例p有以下方法和属性:

    
    'authkey','daemon', 'exitcode', 'ident', 'is_alive', 'join', 'name', 'pid', 'run', 'sentinel', 'start', 'terminate'
    
    

    方法

    1.p.is_alive():如果p仍然运行,返回True

    2.p.join():等待进程p终止。timeout是可选的超市期限。进程可以被连接无数次,但如果连接自身则会出错。

    3.p.run():进程启动时运行的方法。默认情况下,会调用传递给Process构造函数的target。定义进程的另一个方法是继承Process类并重新实现run()函数。

    4.p.start():启动进程。这将运行代表进程的子进程,并调用该子进程中的p.run()函数

    5.p.terminate():强制终止进程。如果调用此函数,进程P将被立即终止,同时不会进行任何清理动作。如果进程p创建了它自己的子进程,这些进程将变为僵尸进程。使用此方法时需特别小心。如果p保存了一个锁或参与了进程间通信,那么终止它可能会导致死锁或I/O损坏。

    属性

    6.p.authkey:进程的身份验证键。除非显式设定,这是由os. urandom()函数生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性。这类连接只有在两端具有相同的身份验证键时才能成功。

    7.p.daemon:

    一个布尔标志,指示进程是否是后台进程。当创建它的 Python进程终止时,后台( Daemonic)进程将自动终止。另外,禁止后台进程创建自己的新进程。p.daemon的值必须在使用 p.start()函数启动进程之前进行设置。

    8.p.exitcode:进程的整数退出代码。如果进程仍然在运行,它的值为None。如果值为负数,-N表示进程由信号N所终止。

    9.p.name:进程的名称。

    10.p.pid:进程的整数进程ID。

    11.p.ident:

    12.p.sentinel:

    • 例子:demo3.py

    进程间通信

    • multiprocessing模块支持进程间通信的两种主要形式:管道和队列。这两种方法都是使用消息传递实现的,但队列接口有意模仿线程程序中常见的队列用法。

    队列

    Queue( [maxsize])

    
    class Queue(object):
    
    def __init__(self, maxsize=-1):
    
    self._maxsize = maxsize
    
    
    • 创建共享的进程队列。 maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底

    层队列使用管道和锁实现。另外,还需要运行攴持线程以便将队列中的数据传输到底层管道中

    • Queue的实例q具有以下方法:
    
    'cancel_join_thread', 'close', 'empty', 'full', 'get', 'get_nowait', 'join_thread', 'put', 'put_nowait', 'qsize'
    
    
    1. q.cancel_join_thread():

    2. q.close():

    3. q.empty():

    4. q.full():

    5. q.get():

    6. q.get_nowait():

    7. q.join_thread():

    8. q.put():

    9. q.put_nowait():

    10. q.qsize():

    JoinableQueue([maxsize])

    • 创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项的消费者通知生产者项已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

    • JoinableQueue的实例p除了与 Queue对象相同的方法之外,还具有以下方法:

    
    'cancel_join_thread', 'close', 'empty', 'full', 'get', 'get_nowait', 'join', 'join_thread', 'put', 'put_nowait', 'qsize', 'task_done'
    
    
    1. p.task_done():消费者使用此方法发出信号,表示qget()返回的项已经被处理。如果调用此方法的次数大于从队列中删除的项的数量,将引发 Valueerror异常。

    2. p.join():生产者使用此方法进行阻塞,直到队列中的所有项均被处理。阻塞将持续到为队列中的每个项均调用q.task done()方法为止。

    • 例子:demo4.py

    管道

    Pip([duplex])

    在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn和conn2是表示管道两端的 Connection对象。默认情况下,管道是双向的。如果将 duplex置为 False,conn只能用于接收,而con2只能用于发送。必须在创建和启动使用管道的 Process对象之前调用pipe()方法。

    
    def Pipe(duplex=True):
    
    return Connection(), Connection()
    
    

    Pipe()方法返回的 Connection对象的实例c具有以下方法和属性。

    1. c.close():关闭连接。如果c被垃圾回收,将自动调用此方法。

    2. c.fileno():返回连接使用的整数文件描述符。

    3. c.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将 timeout置为None,操作将无限期地等待数据到达。

    4. c.recv():接收c.send()方法返回的对象。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFERror异常。

    5. c.recv_bytes([maxlength]):

    接受c.send_ bytes()方法发送的一条完整的字节消息。 maxlength指定要接收的最大字节数。如果进入的消息超过了这个最大值,将引发 IOError异常,并且在连接上无法进行进一步读取。如果连接的另一端已经关闭,再也不存在任何数据,将引EOFERror异常。

    1. c.recv_bytes_into(buffer [ offset]):接收一条完整的字节消息,并把它保存在 buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发 BufferTooShort异常。

    2. c.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象

    3. C.send_bytes(buffer [,offset [,size]]):通过连接发送字节数据缓冲区。 buffer是支持缓冲区接口的任意对象, offset是缓冲区中的字

    节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收。

    如何使用管道实现前面的生产者-消费者问题

    
    # import multiprocessing
    
    # def consumer(output_p,input_p):
    
    # input_p.close()
    
    # while True:
    
    # try:
    
    # item = output_p.recv()
    
    # except EOFError:
    
    # break
    
    # print(item)
    
    # print("Consumer done")
    
    #
    
    #
    
    # def producer(sequence,input_P):
    
    # for item in sequence:
    
    # input_P.send(item)
    
    #
    
    #
    
    # if __name__ == '__main__':
    
    # (output_p,input_p) = multiprocessing.Pipe()
    
    # cons_p = multiprocessing.Process(target=consumer,args=(output_p,input_p),)
    
    # cons_p.start()
    
    #
    
    #
    
    # output_p.close()
    
    #
    
    # sequence = [1,2,3,4]
    
    # producer(sequence,input_p)
    
    # input_p.close()
    
    #
    
    # cons_p.join()
    
    

    管道可用于双向通信。利用通常在客户端/服务器计算中使用的请求响应模型或远程过程调用,就可以使用管道编写与进程交互的程序,例如:

    
    import multiprocessing
    
    def add(pipe):
    
    server_P, client_P = pipe
    
    client_P.close()
    
    while True:
    
    try:
    
    x,y = server_P.recv()
    
    except EOFError:
    
    break
    
    result = x + y
    
    server_P.send(result)
    
    print("Server done")
    
    if __name__ == '__main__':
    
    (server_P, client_P) = multiprocessing.Pipe()
    
    adder_p = multiprocessing.Process(target=add,args=((server_P, client_P),))
    
    adder_p.start()
    
    server_P.close()
    
    client_P.send((3,4))
    
    print(client_P.recv())
    
    client_P.send(('hello','world'))
    
    print(client_P.recv())
    
    client_P.close()
    
    adder_p.join()
    
    
    • 如何使用管道实现前面的生产者-消费者问题这个例子中有一个错误:

    output_p, input_p= pipe

    传进来的只有一个参数却赋值给两个参数。

    秋来凉风起,无限思远人
  • 相关阅读:
    f(n)=1-1/2+1/3-1/4...+1/n
    练习2-15 求简单交错序列前N项和 (15 分)
    练习2-14 求奇数分之一序列前N项和 (15 分)
    练习2-13 求N分之一序列前N项和 (15 分)
    练习2-12 输出华氏-摄氏温度转换表 (15 分)
    【转载】如何从零开始开发一款嵌入式产品(20年的嵌入式经验分享学习,来自STM32神舟系列开发板设计师的总结
    【转载-Andrew_qian】stm32中断学习
    【转-Andrew_qian】stm32中断嵌套全攻略
    【转】写给自学者的入门指南
    【转】作为一个程序员,数学对你到底有多重要
  • 原文地址:https://www.cnblogs.com/lalavender/p/10744888.html
Copyright © 2011-2022 走看看