zoukankan      html  css  js  c++  java
  • 九. 并发编程 (进程管道)

    一 .进程(Pipe 双管道通信)

    1.进程管道概念(Pipe)

    管道: 是python多进程中一种交换数据的方式 
    multiprocessing.Pipe()则可以双向通信

                                                                                                 

    2.管道使用(Pipe)

    from multiprocessing import Pipe
    from multiprocessing import Process
    conn1,conn2=Pipe()
    conn1.send("你好哈哈哈")
    print(conn2.recv()) # 你好哈哈哈
    进程之间通信 方法1

    def show (conn1): print("子进程")# 子进程 conn1.send("你好呀哈哈哈哈哈哈哈") if __name__ == '__main__': print("主进程!!!!!!!!!!!!") conn1,coon2=Pipe() p1= Process(target=show,args=(conn1,)) p1.start() print(coon2.recv()) # 你好呀哈哈哈哈哈哈哈 # 执行顺序 # 主进程!!!!!!!!!!!! # 子进程 # 你好呀哈哈哈哈哈哈哈
    进程之间通信 方法2

    def show (conn2): print("子进程") # 子进程 aa=conn2.recv() print(aa) # 你好世界!!!!!! if __name__ == '__main__': print("主进程!!!!!!!!!!!!") conn1,coon2=Pipe() conn1.send("你好世界!!!!!!") p1= Process(target=show,args=(coon2,)) p1.start() # 执行顺序 # 主进程!!!!!!!!!!!! # 子进程 # 你好世界!!!!!!
    进程之间通信 方法3

    def show (conn1): while True: msg=conn1.recv() if msg is None:break # 如果没有明确条件 就会阻塞到这里(死循环) 程序就结束不了 print(msg) if __name__ == '__main__': print("主进程!!!!!!!!!!!!") conn1,coon2=Pipe() p1= Process(target=show,args=(conn1,)) p1.start() for i in range(10): coon2.send("你好鸭阿嘎嘎嘎") coon2.send(None) # 如果没有明确条件 就会阻塞到这里(死循环) 执行结果: 主进程!!!!!!!!!!!! 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 进程已结束,退出代码 0
    # 进程之间通信方法四
    
    def show (conn1,coon2):#一个发送 conn2 一个接收 conn1 coon2.close() while True: try: msg=conn1.recv() print(msg) except EOFError: conn1.close() break if __name__ == '__main__': print("主进程!!!!!!!!!!!!") conn1,conn2=Pipe() # 可以把这两个对象传给子进程 一个发送 conn2 一个接收 conn1 p1= Process(target=show,args=(conn1,conn2)) p1.start() conn1.close() for i in range(10): conn2.send("你好鸭阿嘎嘎嘎") conn2.close() 执行结果: 主进程!!!!!!!!!!!! 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 你好鸭阿嘎嘎嘎 进程已结束,退出代码 0

    图文说明

     3.使用管道来实现生产者消费者(Pipe)

    pipe数据不安全 (抢占资源)
    IPC 进程通信
    from multiprocessing import Pipe from multiprocessing import Process import time,random # 消费者 def conm(con,pro,name): pro.close() while True: try: food=con.recv() print('%s吃了%s'%(name,food)) time.sleep(random.randint(1,5)) except EOFError: con.close() break # 生产者 def prod(con,pro,name,food): con.close() for i in range(20): time.sleep(random.randint(1,5)) f="%s生产%s个数%d"%(name,food,i) pro.send(f) pro.close() if __name__ == '__main__': print("主进程!!!!!!!!!!!!") con,pro=Pipe() p1= Process(target=prod,args=(con,pro,"张三","馒头")) p2 = Process(target=conm, args=(con,pro,"老鼠")) p1.start() p2.start() con.close() pro.close()

    图文说明

    # 枷锁来控制管道行为 来避免进程之间抢占数据造成数据不安全现象
    注意异步抢占资源 所以加锁来控制行为 同步
    from multiprocessing import Pipe from multiprocessing import Process,Lock import time,random # 消费者 def conm(con,pro,name,lock): pro.close() while True: lock.acquire() food=con.recv() lock.release() if food is None: con.close() break print('%s吃了%s' % (name, food)) # 生产者 def prod(con,pro,name,food): con.close() for i in range(20): time.sleep(random.randint(1,3)) f="%s生产了%s,%s"%(name,food,i) pro.send(f) pro.send(None) pro.send(None) pro.send(None) pro.close() if __name__ == '__main__': print("主进程!!!!!!!!!!!!") lock=Lock() con,pro=Pipe() # 生产者 p1= Process(target=prod,args=(con,pro,"张三","馒头")) # 消费者 p2 = Process(target=conm, args=(con,pro,"老鼠", lock)) p3 = Process(target=conm, args=(con, pro, "母鸡", lock)) p1.start() p2.start() p3.start() con.close() pro.close()
  • 相关阅读:
    spark 读取mongodb失败,报executor time out 和GC overhead limit exceeded 异常
    在zepplin 使用spark sql 查询mongodb的数据
    Unable to query from Mongodb from Zeppelin using spark
    spark 与zepplin 版本兼容
    kafka 新旧消费者的区别
    kafka 新生产者发送消息流程
    spark ui acl 不生效的问题分析
    python中if __name__ == '__main__': 的解析
    深入C++的new
    NSSplitView
  • 原文地址:https://www.cnblogs.com/Sup-to/p/11194096.html
Copyright © 2011-2022 走看看