zoukankan      html  css  js  c++  java
  • Python的多进程编程

    Python在2.6引入了多进程的机制,并提供了丰富的组件及api以方便编写并发应用。multiprocessing包的组件Process, Queue, Pipe, Lock等组件提供了与多线程类似的功能。

    使用这些组件。能够方便地编写多进程并发程序。


    Process

    Process等同于java.lang.Thread。start方法用以启动某个进程。一个简单的演示样例:

    Python代码  收藏代码
    1. from multiprocessing import Process  
    2. import os  
    3. import time  
    4.   
    5. def sleeper(name, seconds):  
    6.     print "Process ID# %s" % (os.getpid())  
    7.     print "Parent Process ID# %s" % (os.getppid())  
    8.     print "%s will sleep for %s seconds" % (name, seconds)  
    9.     time.sleep(seconds)  
    10.   
    11. if __name__ == "__main__":  
    12.     child_proc = Process(target=sleeper, args=('bob'5))  
    13.     child_proc.start()  
    14.     print "in parent process after child process start"  
    15.     print "parent process abount to join child process"  
    16.     child_proc.join()  
    17.     print "in parent process after child process join"  
    18.     print "the parent's parent process: %s" % (os.getppid())  
     
    实例化一个Process必需要指定target和args。

    target是新的进程的入口方法,能够觉得是main方法。args是该方法的參数列表。启动进程类似于启动Thread,必需要调用start方法。

    也能够继承Process,覆盖run方法,在run方法中实现该进程的逻辑。调用join方法会堵塞当前调用进程。直到被调用进程执行结束。


    手工终止一个进程能够调用terminate方法,在UNIX系统中。该方法会发送SIGTERM信号量,而在windows系统中,会借助TerminateProcess方法。

    须要注意的是。exit处理逻辑并不会被运行,该进程的子进程不会被终止,他们仅仅会变成孤儿进程。


    Queue

    Queue是多进程安全的队列,能够使用Queue实现多进程之间的数据传递。

    put方法用以插入数据到队列中,put方法还有两个可选參数:blocked和timeout。假设blocked为True(默认值),而且timeout为正值,该方法会堵塞timeout指定的时间,直到该队列有剩余的空间。假设超时,会抛出Queue.Full异常。

    假设blocked为False,但该Queue已满。会马上抛出Queue.Full异常。


    get方法能够从队列读取而且删除一个元素。

    相同,get方法有两个可选參数:blocked和timeout。假设blocked为True(默认值),而且timeout为正值。那么在等待时间内没有取到不论什么元素,会抛出Queue.Empty异常。假设blocked为False,有两种情况存在,假设Queue有一个值可用,则马上返回该值。否则,假设队列为空,则马上抛出Queue.Empty异常。Queue的一段演示样例代码:


    Python代码  收藏代码
    1. from multiprocessing import Process, Queue  
    2.   
    3. def offer(queue):  
    4.     queue.put("Hello World")  
    5.   
    6. if __name__ == '__main__':  
    7.     q = Queue()  
    8.     p = Process(target=offer, args=(q,))  
    9.     p.start()  
    10.     print q.get()  
     
    Pipes

    Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex參数。假设duplex參数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1仅仅负责接受消息,conn2仅仅负责发送消息。


    send和recv方法各自是发送和接受消息的方法。比如。在全双工模式下,能够调用conn1.send发送消息,conn1.recv接收消息。

    假设没有消息可接收,recv方法会一直堵塞。假设管道已经被关闭,那么recv方法会抛出EOFError。


    Python代码  收藏代码
    1. from multiprocessing import Process, Pipe  
    2.   
    3. def send(conn):  
    4.     conn.send("Hello World")  
    5.     conn.close()  
    6.   
    7. if __name__ == '__main__':  
    8.     parent_conn, child_conn = Pipe()  
    9.     p = Process(target=send, args=(child_conn,))  
    10.     p.start()  
    11.     print parent_conn.recv()  
     
    同步

    multiprocessing包提供了Condition, Event, Lock, RLock, Semaphore等组件可用于同步。以下是使用Lock的一个演示样例:

    Python代码  收藏代码
    1. from multiprocessing import Process, Lock  
    2.   
    3. def l(lock, num):  
    4.     lock.acquire()  
    5.     print "Hello Num: %s" % (num)  
    6.     lock.release()  
    7.   
    8. if __name__ == '__main__':  
    9.     lock = Lock()  
    10.   
    11.     for num in range(20):  
    12.         Process(target=l, args=(lock, num)).start()  
  • 相关阅读:
    Kafka文件存储机制那些事(转发)
    Kafka文件存储机制那些事(转发)
    消息队列设计精要(转发)
    RocketMQ原理解析-Broker(转发)
    Apache Kafka:下一代分布式消息系统(转发)
    新浪技术分享:我们如何扛下32亿条实时日志的分析处理(转发)
    消息队列技术介绍(转发)
    confluent kafka for .net
    kafka参考资料
    kafka架构(转发)
  • 原文地址:https://www.cnblogs.com/brucemengbm/p/7297522.html
Copyright © 2011-2022 走看看