zoukankan      html  css  js  c++  java
  • Python3 从零单排24_多进程

      进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。 程序仅仅只是一堆代码而已,而进程指的是程序的运行过程

      1.开启进程的两种方式:

     1 from multiprocessing import Process
     2 import time
     3 import os
     4 
     5 # 方式一:
     6 def task(name):
     7     print("%s is running...,pid: %s; ppid: %s " % (name, os.getpid(), os.getppid()))  # 拿到进程pid以及父进程pid
     8     time.sleep(1)
     9     print("%s is done..." % name)
    10 
    11 # 方式二
    12 class CreateProcess(Process):
    13     def __init__(self, name):
    14         super().__init__()
    15         self.name = name  # 如果没指定名字,会有自己的名称,类名+编号
    16 
    17     def run(self):
    18         print("%s is running...,pid: %s; ppid: %s " % (self.name, os.getpid(), os.getppid()))
    19         time.sleep(3)
    20         print("%s is done..." % self.name)
    21 
    22 
    23 if __name__ == "__main__":
    24     # 方式一:
    25     p1 = Process(target=task, args=("方式一子进程",))
    26     p1.start()  # 告诉操作系统帮忙开启新的子进程,操作系统开辟一个新的内存空间给子进程使用,并把主进程的内存数据复制到子进程的内存空间里
    27     print(p1.name)  # 进程名称,Process-编号 也可以自己指定
    28     p4 = Process(target=task, name="我是方式一的子进程4444", args=("方式一子进程4",))
    29     print(p4.name)
    30 
    31     # 方式二:
    32     p2 = CreateProcess("方式二子进程1")
    33     p3 = CreateProcess("方式二子进程2")
    34     p2.start()
    35     p3.start()
    36 
    37     print(p1.is_alive())  # true
    38     print(p3.is_alive())  # true
    39     p3.terminate()  # 杀调子进程,将子进程变成僵尸进程。实质是发送杀掉进程命令给操作系统,操作系统回收内存资源,保留子进程状态信息
    40     p1.join()  # 等待子进程p1运行结算才往下运行
    41     print("after terminate p3:", p3.is_alive())  # false
    42     print(p1.pid, p2.pid, p3.pid)  # 查看子进程的pid
    43     print(p1.is_alive())  # 判断进程存活状态,因为上面join了,所以p1已经是结束了的,只是保留了状态,所以false
    44     print("主进程结束! pid: %s" % os.getpid())  # 拿到进程pid
    View Code

      僵尸进程:子进程运行完后,内存空间被操作系统收回,但是进程状态需要保留给父进程查看; 也就是说子进程实际上是死了,但是还留有状态信息给父进程可查,等父进程也运行完后,才会清理掉这个已经死掉的子进程状态信息。

      孤儿进程:父进程挂掉后,子进程还在运行,这个时候就由操作系统的 init(linux操作系统中国呢所有进程的父进程) 进程监管/回收这些子进程

      2.守护进程 :子进程设置为主进程的守护进程,主进程运行完(重点,主进程运行完!!!),子进程也跟着死掉。

     1 from multiprocessing import Process
     2 import time
     3 class my_process_thread(Process):
     4     def run(self):
     5         print("%s is running" % self.name)
     6         time.sleep(1)
     7         print("%s is done" % self.name)
     8 
     9 
    10 if __name__ == "__main__":
    11     p1 = my_process_thread()
    12     p1.daemon = True  # 设置为守护线程后,这里主线程运行完,p1就死掉了,不会打印进程1
    13     p1.start()
    14     p2 = my_process_thread()
    15     p2.start()
    16     print("主线程 is over !")
    View Code

      3.互斥锁

     1 # 问题:竞争资源会出现数据错乱,比如多个进程竞争终端,都要打印数据,大家都在打印,数据就乱了。
     2 from multiprocessing import Process
     3 import time
     4 class my_process_thread(Process):
     5     def run(self):
     6         print("%s is running 第一行" % self.name)
     7         time.sleep(1)
     8         print("%s is running 第二行" % self.name)
     9         time.sleep(1)
    10         print("%s is running 第三行" % self.name)
    11         time.sleep(1)
    12         print("%s is done" % self.name)
    13 
    14 
    15 if __name__ == "__main__":
    16     for i in range(3):
    17         p = my_process_thread()
    18         p.start()
    19     print("主线程 is over !")
    20 
    21 
    22 
    23 # 主线程 is over !
    24 # my_process&thread-1 is running 第一行
    25 # my_process&thread-2 is running 第一行
    26 # my_process&thread-3 is running 第一行
    27 # my_process&thread-1 is running 第二行
    28 # my_process&thread-2 is running 第二行
    29 # my_process&thread-3 is running 第二行
    30 # my_process&thread-1 is running 第三行
    31 # my_process&thread-3 is running 第三行Myprocess-2 is running 第三行
    32 # 
    33 # my_process&thread-1 is done
    34 # my_process&thread-3 is done
    35 # my_process&thread-2 is done
    36 '''
    37 
    38 # 互斥锁的意义在于将并发变回串行,一个进程用完下个进程才可以用,数据就有序了,但是效率降低,取决你需要数据安全性还是效率。
    39 '''
    40 from multiprocessing import Process, Lock
    41 import time
    42 class my_process&thread(Process):
    43     def __init__(self, lock):
    44         super().__init__()
    45         self.lock = lock
    46 
    47     def run(self):
    48         self.lock.acquire()  # 申请锁
    49         print("%s is running 第一行" % self.name)
    50         time.sleep(1)
    51         print("%s is running 第二行" % self.name)
    52         time.sleep(1)
    53         print("%s is running 第三行" % self.name)
    54         time.sleep(1)
    55         print("%s is done" % self.name)
    56         self.lock.release()  # 释放锁
    57 
    58 
    59 if __name__ == "__main__":
    60     lock = Lock()   # 实例化锁,子进程拿到锁才可以运行
    61     for i in range(3):
    62         p = my_process&thread(lock)
    63         p.start()
    64     print("主线程 is over !")
    65 
    66 
    67 # 主线程 is over !
    68 # my_process&thread-1 is running 第一行
    69 # my_process&thread-1 is running 第二行
    70 # my_process&thread-1 is running 第三行
    71 # my_process&thread-1 is done
    72 # my_process&thread-2 is running 第一行
    73 # my_process&thread-2 is running 第二行
    74 # my_process&thread-2 is running 第三行
    75 # my_process&thread-2 is done
    76 # my_process&thread-3 is running 第一行
    77 # my_process&thread-3 is running 第二行
    78 # my_process&thread-3 is running 第三行
    79 # my_process&thread-3 is done
    View Code

      4.join和互斥锁的区别
      join只能让整个子进程串行,互斥锁可以让局部代码串行(比如修改数据部分串行,其他查询可以继续并行)

     1 from multiprocessing import Process,Lock
     2 import json
     3 import time
     4 
     5 def search(name):
     6     time.sleep(1)
     7     dic=json.load(open('db.txt','r',encoding='utf-8'))
     8     print('<%s> 查看到剩余票数【%s】' %(name,dic['count']))
     9 
    10 
    11 def get(name):
    12     time.sleep(1)
    13     dic=json.load(open('db.txt','r',encoding='utf-8'))
    14     if dic['count'] > 0:
    15         dic['count']-=1
    16         time.sleep(3)
    17         json.dump(dic,open('db.txt','w',encoding='utf-8'))
    18         print('<%s> 购票成功' %name)
    19     else:
    20         print('<%s> 购票失败' %name)
    21 
    22 def task(name,):
    23     search(name)
    24     # mutex.acquire()
    25     get(name)
    26     # mutex.release()
    27 
    28 if __name__ == '__main__':
    29     # mutex=Lock()
    30     for i in range(10):
    31         p=Process(target=task,args=('路人%s' %i,))
    32         p.start()
    33         p.join()
    View Code

      5.队列

      进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
      创建队列的类(底层就是以管道和锁定的方式实现):
      Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

      maxsize是队列中允许最大项数,省略则无大小限制。但需要明确:
        1、队列内存放的是消息而非大数据
        2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小

     1 from multiprocessing import Queue
     2 q = Queue(3)
     3 
     4 q.put(123)  # 往队列里塞一个数据,可以是任何数据类型
     5 q.put([4, 5, 6])
     6 print(q.full())  # 判断队列大小是否已经达到最大项数,这里才塞了俩数据,所以false
     7 q.put({"a": "apple"})
     8 print(q.full())  # 已经达到最大项数了,True
     9 print(q.empty())  # 判断队列里的是否不存在数据了,这里队列了有三数据,所以为false
    10 
    11 print(q.get())  # 根据先进先出原则,从队列里取一个数据
    12 print(q.get())
    13 print(q.get())
    14 print(q.empty())  # True
    15 
    16 
    17 # 生产者/消费者模型
    18 from multiprocessing import Queue
    19 from multiprocessing import Process
    20 import time
    21 
    22 
    23 def producer(name, q):
    24     for i in range(5):
    25         time.sleep(0.5)
    26         res = "%s生产的馒头%s" % (name, i)
    27         q.put(res)
    28         print(res)
    29 
    30 
    31 def consumer(name, q):
    32     while True:
    33         time.sleep(0.2)
    34         res = q.get()
    35         if not res:
    36             break
    37         print("%s 吃了 %s" % (name, res))
    38 
    39 
    40 if __name__ == "__main__":
    41     q = Queue()
    42     p_lis = []
    43     for i in range(3):
    44         p = Process(target=producer, args=("xg%s" % i, q))
    45         p_lis.append(p)
    46         p.start()
    47     s = Process(target=consumer, args=("yy", q))
    48     s.start()
    49     for p in p_lis:
    50         p.join()
    51     q.put(None)  # 注意,有几个消费者就要发送几个None
    52     print("over")
    View Code

      6.JoinableQueue 的使用 可以用join方法的Queue

      JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
      q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
      q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

     1 from multiprocessing import JoinableQueue
     2 from multiprocessing import Process
     3 import time
     4 
     5 
     6 def producer(name, q):
     7     for i in range(5):
     8         time.sleep(0.5)
     9         res = "%s生产的馒头%s" % (name, i)
    10         q.put(res)
    11         print(res)
    12     q.join()  # 等待q结束(即q队列数据全部调用了q.task_done()方法,才往下继续运行代码)
    13 
    14 
    15 def consumer(name, q):
    16     while True:
    17         time.sleep(0.2)
    18         res = q.get()
    19         print("%s 吃了 %s" % (name, res))
    20         q.task_done()  # 每消费一个数据,调用一次task_done方法,为了给join方法用
    21 
    22 
    23 if __name__ == "__main__":
    24     q = JoinableQueue()
    25     p_lis = []
    26     for i in range(3):
    27         p = Process(target=producer, args=("xg%s" % i, q))
    28         p_lis.append(p)
    29         p.start()
    30     s = Process(target=consumer, args=("yy", q))
    31     s.daemon = True  # 主线程运行完,就销毁掉生产者的进程
    32     s.start()
    33     for p in p_lis:
    34         p.join()
    35     print("over")  # 走到这一步说明,生产者已经运行完了,生产者运行完,代表队列里的数据都调用了task_done方法,也就是消费者运行也结束了
    View Code
  • 相关阅读:
    git 常用命令
    PHP打印日志类
    如何从总账获取分类账信息
    AP -> GL 数据流动
    JDeveloper 速度慢问题
    JDeveloper 滚轮不受控制
    MyBatis 环境搭建
    初识 MyBatis
    Linux 中设置 MySQL 字符集为 UTF-8
    Linux 安装 MySQL 详解(rpm 包)
  • 原文地址:https://www.cnblogs.com/znyyy/p/10174878.html
Copyright © 2011-2022 走看看