zoukankan      html  css  js  c++  java
  • 并发编程

    1.开启子进程的两种方式
     1 from multiprocessing import Process
     2 import time
     3 
     4 def task(name):
     5     print('%s is running'%name)
     6     time.sleep(2)
     7     print('%s is down'%name)
     8 
     9 if __name__ == "__main__":
    10     # p = Process(target=task,kwargs={'name':'alice'})
    11     p = Process(target=task,args=('alice',))
    12     p.start()
    13 
    14     print('')
    方式一
     1 from multiprocessing import Process
     2 import time
     3 
     4 class MyProcess(Process):
     5     def __init__(self,name):
     6         super().__init__()
     7         self.name = name
     8 
     9     def run(self):
    10         print('%s is running'%self.name)
    11         time.sleep(2)
    12         print("%s is down"%self.name)
    13 
    14 if __name__ == "__main__":
    15     p = MyProcess('子进程1')
    16     p.start()
    17 
    18     print('')
    方式二
    2.查看pid
    1.os.getpid()
    2.os.getppid()
     1 from multiprocessing import Process
     2 import time
     3 import os
     4 
     5 def task(name):
     6     print('%s is running'%name,os.getpid(),os.getppid())
     7     time.sleep(2)
     8     print('%s is down'%name)
     9 
    10 if __name__ == "__main__":
    11     p = Process(target=task,args=('子进程1',))
    12     p.start()
    13 
    14     print('',os.getpid(),os.getppid())
    查看pid
    3.僵尸进程/孤儿进程
    父进程一直不死,子进程变成了僵尸进程一直占着pid;有害
    孤儿进程,父进程先死了,linux中最大的父进程init进程会回收孤儿进程的pid
    4.Process对象的其他属性或方法
    p = Process(target=task,kwargs={'name':'子进程1'})
    p = Process(target=task,name='sub-process',args=('子进程1',))
    0. p.start() # 只是给os发了一个信号,调用 p.run()
    1. p.join() # 等待子进程运行完成
    2. p.pid # 查看子进程的pid
    3. p.is_alive() # 查看子进程是否还活着
    4. p.name # 查看子进程的name
    5. p.terminate() # 给os发个信号,杀死子进程,信号,需要时间处理

    多个子进程p.join()是并发执行,遇到io切 5s 多
    多个子进程p1.start() p1.join() 是串行执行 10s

     1 from multiprocessing import Process
     2 import os
     3 import time
     4 
     5 def task(name,n):
     6     print('%s is running'%name,os.getpid(),os.getppid())
     7     time.sleep(n)
     8     print('%s is down'%name)
     9 
    10 if __name__ == "__main__":
    11     start = time.time()
    12     p1 = Process(target=task,name='sub-process',args=('子进程1',5))
    13     p2 = Process(target=task,args=('子进程2',3))
    14     p3 = Process(target=task,args=('子进程3',2))
    15     p_l = [p1,p2,p3]  # 并行 5s 多
    16     for p in p_l:
    17         p.start()
    18     print(p1.is_alive())  # True
    19     p1.terminate()  # 给os 发个信号,杀死子进程,需要时间
    20     time.sleep(2)
    21     print(p1.is_alive())  # False
    22     print(p1.name)
    23     print(p2.name)
    24     for p in p_l:
    25         p.join()
    26 
    27     print(p1.is_alive()) # False
    28     print(p1.name)
    29     print(p2.name)
    30     print(p1.pid)
    31 
    32     # p1.start()   # 串行 10s 多
    33     # p1.join()
    34     # p2.start()
    35     # p2.join()
    36     # p3.start()
    37     # p3.join()
    38 
    39     end = time.time()
    40     print('',end-start)   # 用时5s多,说明是 并发执行
    41 
    42     # print(p.pid)
    43     # 这里任然得到了子进程的pid 说明子进程变成了 僵尸进程
    44     # 主进程结束了,基于它的子进程才会结束,即僵尸进程的pid才会被回收
    Process对象的其他属性或方法
    5.守护进程
    开进程的目的:并发任务,假设任务在主进程死后没意义存在了,就设为守护进程
    守护进程:
    1. p.daemon = True
    2. 主进程结束时,守护进程也结束了
    3. 守护进程一定要在p.start()前设置
    4. 不允许在守护进程内在开子进程,没意义
     1 from multiprocessing import Process
     2 import time
     3 
     4 def task(name):
     5     print('%s is running'%name)
     6     time.sleep(2)
     7     print('%s is down'%name)
     8     p = Process(target=time.sleep,args=(3,))
     9     p.start()
    10 
    11 if __name__ == "__main__":
    12     p = Process(target=task,args=('子进程1',))
    13     p.daemon = True
    14     p.start()
    15 
    16     print('')
    17 
    18 # 思考下列代码的执行结果有可能有哪些情况?为什么?
    19 from multiprocessing import Process
    20 import time
    21 
    22 def foo():
    23     print(123)
    24     time.sleep(1)
    25     print("end123")
    26 
    27 def bar():
    28     print(456)
    29     time.sleep(3)
    30     print("end456")
    31 
    32 if __name__ == '__main__':
    33     p1=Process(target=foo)
    34     p2=Process(target=bar)
    35 
    36     p1.daemon=True  # 主进程死后,代码执行完毕,守护进程就死了
    37     p1.start()
    38     p2.start()
    39     print("main-------")  # 只要这里一运行,守护进程就完了
    40 """
    41 main-------
    42 456
    43 end456
    44 """
    守护进程
    6.互斥锁:
    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱
    如何控制,就是加锁处理:
    互斥锁的原理,就是把并发改成串行,降低了效率,但保证了数据安全不错乱。保证了多个进程修改一块数据时,一个一个修改,不错乱。
     1 # 并发运行,效率高,但竞争同一打印终端,带来了打印错乱
     2 # 由并发变成了串行,牺牲了运行效率,但避免了竞争
     3 from multiprocessing import Process,Lock
     4 import time
     5 import os
     6 
     7 def task(lock):
     8     lock.acquire()
     9     print('%s is running'%os.getpid())
    10     time.sleep(2)
    11     print('%s is done'%os.getpid())
    12     lock.release()
    13 
    14 if __name__ == "__main__":
    15     lock = Lock()
    16     for i in range(3):
    17         p = Process(target=task,args=(lock,))
    18         p.start()
    互斥锁
    7.互斥锁的应用:
    模拟抢票
    加锁处理:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全
    with mutex:
    get(name)
    等价于:
    mutex.acquire()
    mutex.release()
     1 # 文件db.txt的内容为:{"count":1}
     2 # 注意一定要用双引号,不然json无法识别
     3 import json
     4 import time
     5 from multiprocessing import Process,Lock
     6 
     7 def search(name):
     8     with open('db.txt','r',encoding='utf-8') as f:
     9         dic =json.load(f)
    10     time.sleep(1)
    11     print('33[1;31m%s 查到剩余票数 %s33[0m'%(name,dic['count']))
    12 
    13 def get(name):
    14     dic = json.load(open('db.txt'))
    15     time.sleep(1)
    16     if dic['count'] > 0:
    17         dic['count'] -= 1
    18         time.sleep(1)
    19         json.dump(dic,open('db.txt','w'))
    20         print('%s 购票成功'%name)
    21 
    22 def task(name,mutex):
    23     search(name)
    24     # mutex.acquire()
    25     # get(name)
    26     # mutex.release()
    27     with mutex:
    28         get(name)
    29 
    30 if __name__ == "__main__":
    31     mutex = Lock()
    32     for i in range(4):
    33         p = Process(target=task,args=('用户%s'%i,mutex))
    34         p.start()
    35 """
    36 用户0 查到剩余票数 2
    37 用户1 查到剩余票数 2
    38 用户2 查到剩余票数 2
    39 用户3 查到剩余票数 2
    40 用户0 购票成功
    41 用户1 购票成功
    42 """
    互斥锁的应用
    8.互斥锁与join的区别:
    使用join可以将并发变成串行,互斥锁的原理也是将并发变成串行
    区别:
    1.join是将一个任务整体串行
    2.互斥锁的好处则是可以将一个任务中的某一段代码串行
     1 import json
     2 import time
     3 from multiprocessing import Process,Lock
     4 
     5 def search(name):
     6     with open('db.txt','r',encoding='utf-8') as f:
     7         dic =json.load(f)
     8     time.sleep(1)
     9     print('33[1;31m%s 查到剩余票数 %s33[0m'%(name,dic['count']))
    10 
    11 def get(name):
    12     dic = json.load(open('db.txt'))
    13     time.sleep(1)
    14     if dic['count'] > 0:
    15         dic['count'] -= 1
    16         time.sleep(1)
    17         json.dump(dic,open('db.txt','w'))
    18         print('%s 购票成功'%name)
    19 
    20 def task(name):
    21     search(name)
    22     get(name)
    23 
    24 if __name__ == "__main__":
    25     for i in range(4):
    26         p = Process(target=task,args=('用户%s'%i,))
    27         p.start()
    28         p.join()
    29 """
    30 用户0 查到剩余票数 1
    31 用户0 购票成功
    32 用户1 查到剩余票数 0
    33 用户2 查到剩余票数 0
    34 用户3 查到剩余票数 0
    35 """
    互斥锁与join的区别
    9.互斥锁总结:
    加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    2.需要自己加锁处理
    因此我们最好找寻一种解决方案能够兼顾:
    1.效率高
    2.帮我们处理好锁的问题
    这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
    队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信(IPC)的最佳选择。
    队列作用:
    多个进程之间通信使用的,一个进程将数据放到队列里面,另外一个进程从队列里面取走数据,干的是进程之间通信的活
    10.队列:
    进程彼此之间是相互隔离的,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列 管道
    队列 = 管道 + 锁
    q = Queue(3) # 省略无大小限制 maxsize
    注:
    1.队列内存放的是消息而非大数据
    2.队列占用的是内存空间,而maxsize即便无大小限制也受限于内存大小
    11.队列的使用: 先进先出  
    q = Queue(3)
    q.put(2)
    q.get()
    q.full()
    q.empty()
    q.put_nowait(2) # 无阻塞,当队列满时,直接抛出异常 queue.Full
    q.get_nowait() # 无阻塞,当队列空时,直接抛出异常 queue.Empty
     1 from multiprocessing import Process,Lock,Queue
     2 
     3 q = Queue(3)
     4 q.put(2)
     5 q.put([1,2,3])
     6 # q.put_nowait()
     7 q.put({'name':'alice'})
     8 # q.put_nowait(1)
     9 # q.put(1)
    10 print(q.full())
    11 # q.put('aa')  # 满了 再放就阻塞了
    12 
    13 print(q.get())
    14 print(q.get())
    15 print(q.get())
    16 # print(q.get())
    17 # q.get_nowait()
    18 print(q.empty())
    19 
    20 # print(q.get()) # 空了 在取就阻塞了
    队列的使用
    12.生产者消费者模型:
    生产者:生产数据的任务
    消费者:处理数据的任务

    生产者与消费者模型:
    生产者与消费者之间引入一个容器(队列):
    生产者《---》队列《---》消费者
    生产者:一个进程
    消费者:一个进程
    进程间通信(IPC):队列 Queue

    好处:程序解开耦合,生产者与消费者不直接通信,平衡了生产者与消费者的速度差
    注:
    q.put(None) # 有几个消费者,就发送几个结束信号 None 相当low
     1 import time,os
     2 from multiprocessing import Process, Queue
     3 
     4 def producter(q):
     5     for i in range(4):
     6         res = '包子%s'%i
     7         time.sleep(0.5)
     8         print('生产者生产了 %s'%res)
     9 
    10         q.put(res)
    11 
    12 def consumer(q):
    13     while True:
    14         res = q.get()
    15         if not res:break
    16         time.sleep(1)
    17         print('%s消费者吃了%s'%(os.getpid(),res))
    18 
    19 if __name__ == '__main__':
    20     q = Queue()
    21     p1 = Process(target=producter,args=(q,))
    22     c1 = Process(target=consumer,args=(q,))
    23     c2 = Process(target=consumer, args=(q,))
    24     p1.start()
    25     c1.start()
    26     c2.start()
    27 
    28     p1.join()
    29     q.put(None)  # 有几个消费者,就要放几个None
    30     q.put(None)  # 有几个消费者,就发送几个结束信号 None 相当low
    31 
    32     print('')
    33 """
    34 生产者生产了 包子0
    35 生产者生产了 包子1
    36 生产者生产了 包子2
    37 20280消费者吃了包子0
    38 生产者生产了 包子3
    39 11920消费者吃了包子1
    40 41 20280消费者吃了包子2
    42 11920消费者吃了包子3
    43 """
    生产者消费者模型
    13.JoinableQueue:
    基于JoinableQueue实现生产者消费者模型:
    q = JoinableQueue(size) # size 省略则大小无限制
    生产者:q.join()
    消费者:q.task_done()
    并且:消费者设为守护进程
     1 import time,os
     2 from multiprocessing import Process, Queue,JoinableQueue
     3 
     4 def producter(q):
     5     for i in range(4):
     6         res = '包子%s'%i
     7         time.sleep(0.5)
     8         print('%s生产者生产了 %s'%(os.getpid(),res))
     9 
    10         q.put(res)
    11     q.join()  # 等到消费者把自己放入队列中的所有的数据都取走之后,生产者才结束
    12 
    13 def consumer(q):
    14     while True:
    15         res = q.get()
    16         time.sleep(1)
    17         print('%s消费者吃了%s'%(os.getpid(),res))
    18         q.task_done()   # 发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕了
    19 
    20 if __name__ == '__main__':
    21     # 容器
    22     q = JoinableQueue()
    23     # 生产者
    24     p1 = Process(target=producter,args=(q,))
    25     p2 = Process(target=producter, args=(q,))
    26     # 消费者
    27     c1 = Process(target=consumer,args=(q,))
    28     c2 = Process(target=consumer, args=(q,))
    29 
    30     p1.start()
    31     p2.start()
    32 
    33     c1.daemon = True  # 必须设为守护进程
    34     c2.daemon = True
    35 
    36     c1.start()
    37     c2.start()
    38 
    39     p1.join()
    40     p2.join()
    41 
    42     print('')
    43 """
    44     1、主进程等生产者p1、p2结束
    45     2、而p1、p2是在消费者把所有数据都取干净之后才会结束
    46     3、所以一旦p1、p2结束了,证明消费者也没必要存在了,应该随着主进程一块死掉,因而需要将消费者设置成守护进程
    47 """
    48 """
    49 18712生产者生产了 包子0
    50 19100生产者生产了 包子0
    51 18712生产者生产了 包子1
    52 19100生产者生产了 包子1
    53 19636消费者吃了包子0
    54 18712生产者生产了 包子2
    55 18084消费者吃了包子0
    56 19100生产者生产了 包子2
    57 18712生产者生产了 包子3
    58 19100生产者生产了 包子3
    59 19636消费者吃了包子1
    60 18084消费者吃了包子1
    61 19636消费者吃了包子2
    62 18084消费者吃了包子2
    63 19636消费者吃了包子3
    64 18084消费者吃了包子3
    65 66 """
    JoinableQueue
    14.生产者消费者模型总结: 
    1.程序中有两类角色:
    一类负责生产数据(生产者)
    二类负责处理数据(消费者)
    2.引入生产者消费者模型为了解决的问题是:
    1.平衡生产者与消费者之间的速度差
    2.程序解开耦合
    3.如何实现生产者消费者模型:
    生产者《---》队列《---》消费者

     

     Python多进程通信Queue、Pipe、Value、Array实例

      queue和pipe用来在进程间传递消息;
      Value + Array 是python中共享内存映射文件的方法;速度比较快.
       http://www.jb51.net/article/57666.htm

     进程间共享数据  multiprocessing.Manager

       http://www.jb51.net/article/57663.htm




  • 相关阅读:
    精彩的漫画小说
    《Java语言精粹》译者序
    群啊群
    围观透明咆哮体
    读《Cassandra权威指南》
    好书什么样?
    一个关于360和腾讯的调查
    Xcode 3.x class ations 以及outlets 去哪里了 ?
    「译」JavaScript 的 MVC 模式
    MAC OS 虚拟机里的control键设置
  • 原文地址:https://www.cnblogs.com/alice-bj/p/8694864.html
Copyright © 2011-2022 走看看