  • The multiprocessing module

    Process

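    A process is one execution of a program over some set of data. It is the basic unit the system uses for resource allocation and scheduling, and the foundation of operating-system structure. In early, process-oriented computer architectures the process was the basic unit of program execution; in modern, thread-oriented architectures the process is a container for threads.

    A process contains at least one thread.

    The Process class describes a process object. To create a child process, pass in the function to run and its arguments; that is all that is needed to create a Process instance.

    By creating child processes, the Process class lets a program claim more CPU time (for example, several cores at once), which can improve throughput.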
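    As a minimal sketch of that pattern, the example below starts one child with positional and keyword arguments. The greet function and its parameters are hypothetical names chosen for illustration. Note that the target's return value is not passed back to the parent.

```python
import os
from multiprocessing import Process


def greet(name, punctuation="!"):
    """Hypothetical worker: build a message and print it with the caller's PID."""
    message = f"hello {name}{punctuation}"
    print(os.getpid(), message)
    return message  # a target's return value is NOT sent back to the parent


if __name__ == "__main__":
    # args is a tuple of positional arguments, kwargs a dict of keyword arguments
    p = Process(target=greet, args=("world",), kwargs={"punctuation": "?"})
    p.start()   # ask the OS to create the child
    p.join()    # wait for it to finish
    print("exit code:", p.exitcode)
```

    A one-element args tuple needs the trailing comma, as in ("world",).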

    Here is the Process class as PyCharm shows it (a stub of the interface, not the full implementation):

        class Process(object):
            def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
                self.name = ''
                self.daemon = False
                self.authkey = None
                self.exitcode = None
                self.ident = 0
                self.pid = 0
                self.sentinel = None

            def run(self):
                pass

            def start(self):
                pass

            def terminate(self):
                pass

            def join(self, timeout=None):
                pass

            def is_alive(self):
                return False
    The Process class

    Besides __init__(), the class has five methods: run(), start(), terminate(), join(), and is_alive().

    The run() method:

    Before studying run(), look at the following code:

        from multiprocessing import Process
        import os


        def func():
            print(os.getpid(), os.getppid())


        if __name__ == '__main__':
            print(os.getpid(), os.getppid())
            p = Process(target=func)
            p.start()
    Starting a child process

    Running this code prints:

    6864 6100
    5612 6864

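    Once the process running the .py file (PID 6864, itself a child of PID 6100) starts the Process object, it becomes a parent: a new child process (PID 5612) is created to run func(), and that child's parent PID is 6864.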
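    To make the parent/child relationship easier to read than raw PIDs, the sketch below also prints each process's name. The describe function and the name "worker-1" are assumptions made up for this illustration; current_process() is part of multiprocessing.

```python
import os
from multiprocessing import Process, current_process


def describe():
    """Return (process name, PID, parent PID) for whichever process calls it."""
    info = (current_process().name, os.getpid(), os.getppid())
    print(info)
    return info


if __name__ == "__main__":
    describe()                                   # called in the parent
    child = Process(target=describe, name="worker-1")
    child.start()                                # called again, this time in the child
    child.join()
```

    The child's parent PID should match the PID printed by the first call.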

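    To take an object-oriented approach, define a subclass that inherits from Process and override run() in it. Overriding run() is what gives the subclass its behavior; the inherited run() only calls the target passed to the constructor, if there is one.

    The overridden run() plays the same role as func() above, and it has to be triggered through start(), which arranges for run() to execute in the child process. See the following code: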

        from multiprocessing import Process
        import os


        class MyProcess(Process):

            def run(self):
                print(os.getpid(), os.getppid())


        if __name__ == '__main__':
            print(os.getpid(), os.getppid())
            p = MyProcess()
            p.start()
    Object-oriented creation

    Running this code prints:

    6184 6100
    5320 6184

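    This shows that the overridden run() works and behaves the same as the procedural version.

    Note that on Windows the if __name__ == '__main__': guard is required. Windows cannot fork, so each new process starts a fresh interpreter and re-imports the main module, in effect reading the program again from the top. Without the guard, the process-creating code would run again in every child and spawn children endlessly; Python detects this and raises a RuntimeError: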

    RuntimeError: 
            An attempt has been made to start a new process before the
            current process has finished its bootstrapping phase.

            This probably means that you are not using fork to start your
            child processes and you have forgotten to use the proper idiom
            in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce an executable.

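    On Linux, the fork start method (the traditional default there) copies the parent's memory, variables included, straight into the child, so the module is not executed a second time.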
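    The difference can be observed on any platform by requesting the spawn start method explicitly. The following is a sketch only; work is a hypothetical function, and the module-level print is there to show that a spawned child loads the module again.

```python
import os
import multiprocessing as mp

# Module-level code: under "spawn" this line runs again in every child,
# because the child re-imports the main module.
print("module loaded in PID", os.getpid())


def work():
    print("child running in PID", os.getpid())


if __name__ == "__main__":
    print("available start methods:", mp.get_all_start_methods())
    ctx = mp.get_context("spawn")    # same start method Windows uses
    p = ctx.Process(target=work)     # only the guarded block creates processes
    p.start()
    p.join()
```

    Because process creation sits under the guard, the re-import in the child does not start further children.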

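    The start() method:

    start() does not run the target itself. It asks the operating system to create a child process.

    Inside the Process class, start() launches the child, and the child then calls run(). A child process is only created through start(); calling run() directly just executes it in the current process.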
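    A small sketch of that distinction: the same target is invoked once through run() and once through start(), and the printed PIDs show where it actually executed. show_pid is a hypothetical helper, not part of multiprocessing.

```python
import os
from multiprocessing import Process


def show_pid(label):
    """Print and return the PID of the process executing this function."""
    print(f"{label}: running in PID {os.getpid()}")
    return os.getpid()


if __name__ == "__main__":
    print("main PID:", os.getpid())

    # run() is an ordinary method call: no new process, same PID as main
    Process(target=show_pid, args=("run()",)).run()

    # start() creates a child, and the child calls run(): different PID
    p = Process(target=show_pid, args=("start()",))
    p.start()
    p.join()
```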

    The join() method:

    join() blocks the caller until the target process has finished; only then does the caller continue:

        from os import getpid
        from multiprocessing import Process


        def func(i):
            print('Process %s will be executed ' % i)
            print('Process ID is %s' % getpid())
            print('Process %s has been executed ' % i)


        if __name__ == '__main__':
            process_list = []
            for i in range(10):
                p = Process(target=func, args=(i,))
                p.start()
                # The OS does not necessarily run the children in the order of i;
                # its own scheduler decides
                process_list.append(p)

            for p in process_list:
                p.join()    # wait for each process in process_list to finish

            print('The main process has been executed')
    The join() method

    The output is:

        Process 0 will be executed 
        Process ID is 11728
        Process 0 has been executed 
        Process 4 will be executed 
        Process ID is 11632
        Process 4 has been executed 
        Process 1 will be executed 
        Process ID is 7568
        Process 1 has been executed 
        Process 3 will be executed 
        Process ID is 10548
        Process 3 has been executed 
        Process 2 will be executed 
        Process ID is 10600
        Process 2 has been executed 
        Process 9 will be executed 
        Process ID is 11940
        Process 9 has been executed 
        Process 7 will be executed 
        Process ID is 11980
        Process 7 has been executed 
        Process 6 will be executed 
        Process ID is 11772
        Process 6 has been executed 
        Process 5 will be executed 
        Process ID is 11644
        Process 5 has been executed 
        Process 8 will be executed 
        Process ID is 11600
        Process 8 has been executed 
        The main process has been executed
    Output
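    join() also accepts a timeout, as the stub's signature shows. It returns None either way, so the usual way to tell whether the wait timed out is to check is_alive() afterwards. A minimal sketch, with slow as a hypothetical stand-in for real work:

```python
from multiprocessing import Process
from time import sleep


def slow(seconds):
    """Hypothetical worker that just sleeps."""
    sleep(seconds)


if __name__ == "__main__":
    p = Process(target=slow, args=(2,))
    p.start()

    p.join(timeout=0.5)                     # stop waiting after 0.5 s
    print("still running:", p.is_alive())   # tells us whether join() timed out

    p.join()                                # now wait without a limit
    print("exit code:", p.exitcode)
```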

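    The terminate() method:

    terminate() stops a child process. It only sends the termination request (SIGTERM on POSIX, TerminateProcess() on Windows); the child may take a moment to exit, and calling join() afterwards waits for it so that its resources can be reclaimed.

    The is_alive() method:

    is_alive() reports whether a process is still running:

    If it is running: return True

    If it is not running: return False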
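    The life cycle described above can be sketched as follows. loop_forever is a hypothetical worker that never returns on its own, so the only way it ends is through terminate().

```python
from multiprocessing import Process
from time import sleep


def loop_forever():
    """Hypothetical worker that never finishes by itself."""
    while True:
        sleep(0.1)


if __name__ == "__main__":
    p = Process(target=loop_forever)
    print("before start:", p.is_alive())

    p.start()
    print("after start:", p.is_alive())

    p.terminate()   # request termination; returns immediately
    p.join()        # wait until the child has really exited
    print("after terminate + join:", p.is_alive(), p.exitcode)
```

    On POSIX a negative exit code -N means the child was ended by signal N.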

    The following code demonstrates all of these together:

        from os import getpid
        from time import sleep
        from multiprocessing import Process


        class MyProcess(Process):

            def __init__(self, args):
                super(MyProcess, self).__init__()
                self.args = args

            def run(self):
                print('Process ID is %s' % getpid())
                print('Process %s has been executed ' % self.args)

            def start(self):
                print('Process %s will be executed ' % self.args)
                super().start()

            def terminate(self):
                super().terminate()

            def join(self, timeout=None):
                super().join(timeout)   # pass the timeout through to the parent class

            def is_alive(self):
                try:
                    return super().is_alive()
                except Exception:
                    print("Something was wrong!")
                    return False


        if __name__ == '__main__':
            process_list = []
            for i in range(10):
                p = MyProcess(i)
                p.start()
                print(p.is_alive())
                process_list.append(p)
            for p in process_list:
                print(p.is_alive())
                print('About to terminate process %s' % p)
                p.terminate()
                sleep(0.2)      # without this 0.2 s delay, the is_alive() below may still print True
                print(p.is_alive())
                p.join()
                print(p.is_alive())
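    Overriding the parent class methods

    A commonly used Process attribute:

    The daemon attribute:

    daemon is an attribute, not a method. A daemon process is ended as soon as the main process's code has finished running, whether or not the daemon has finished its own work. When using daemon, note:

    • daemon defaults to False; setting it to True makes the child a daemon process
    • It must be set before the child process is started

    Main use of daemon processes:

    • Heartbeat: reporting that the main process is still alive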
        from time import sleep
        from multiprocessing import Process


        def func1():
            print('begin')
            sleep(3)
            print('wahaha')


        def func2():
            print('in func2')
            sleep(4)
            print('The func2 has been executed')


        if __name__ == '__main__':
            Process(target=func1).start()
            p = Process(target=func2)
            p.daemon = True
            p.start()
            sleep(1)
            print('In main process')
    daemon

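    The output shows that the last line of func2 is never printed: the main process's code finishes after about 1 second, so the daemon running func2 is ended before its 4-second sleep completes, while the non-daemon func1 is allowed to finish.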
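    The heartbeat use case might look like the sketch below. heartbeat and make_heartbeat are hypothetical helpers, and sleep(2) stands in for the main process's real work. The daemon flag is passed to the constructor here, which has the same effect as assigning p.daemon = True before start().

```python
from multiprocessing import Process
from time import sleep


def heartbeat(interval):
    """Hypothetical reporter: loops until the main process exits."""
    while True:
        print("main process is alive")
        sleep(interval)


def make_heartbeat(interval=0.5):
    # daemon=True must be in place before start(); the constructor keyword guarantees that
    return Process(target=heartbeat, args=(interval,), daemon=True)


if __name__ == "__main__":
    hb = make_heartbeat()
    hb.start()
    sleep(2)                      # stand-in for the main process's real work
    print("main process done")    # the daemon is ended once the main code finishes
```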

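    The Lock class:

    As far as covered here, a Lock is mainly used to keep shared data safe. The lock has exactly one key: if an earlier process holds it, later processes must wait until they get the key before they can enter.

    The Lock class has the following two methods: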

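        class Lock(object):
            # Signature as shown by the IDE stub; the multiprocessing documentation
            # names the first parameter block: acquire(block=True, timeout=None)
            def acquire(self, blocking=True, timeout=-1):
                pass

            def release(self):
                pass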
    The Lock class

    The acquire() method:

    Equivalent to taking the key and opening the lock.

    The release() method:

    Equivalent to handing the key back.

        import json
        from multiprocessing import Process, Lock


        class MyProcess(Process):

            def __init__(self, lock):
                super(MyProcess, self).__init__()
                self.lock = lock

            def run(self):
                ticket_dict = self.read()
                print("Tickets remaining>>>%s" % ticket_dict['count'])
                if ticket_dict['count'] >= 1:
                    print("Tickets available")
                    self.lock.acquire()
                    try:
                        self.buy()
                    finally:
                        self.lock.release()   # always hand the key back
                else:
                    print('Failure to buy tickets')

            def read(self):
                # 'db' is a JSON file with a 'count' key, e.g. {"count": 5}; it must already exist
                with open('db', 'r') as filehandler:
                    content = json.load(filehandler)
                return content

            def buy(self):
                # Re-read under the lock: the count may have changed since run() checked it
                content = self.read()
                if content['count'] >= 1:
                    print('Buy ticket successful')
                    content['count'] -= 1
                    with open('db', 'w') as fh:
                        json.dump(content, fh)
                else:
                    print('Sold out')


        if __name__ == '__main__':
            lock = Lock()
            for i in range(11):
                p = MyProcess(lock)
                p.start()
    Lock
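    A Lock can also be used as a context manager, which releases it even when the protected code raises. The sketch below shows that form along with a non-blocking acquire; critical and try_lock are hypothetical helpers, and the first argument to acquire() is passed positionally.

```python
from multiprocessing import Lock, Process


def critical(lock, i):
    with lock:   # acquire() on entry, release() on exit, even if an exception is raised
        print(f"worker {i} holds the lock")


def try_lock(lock):
    """Try to take the lock without waiting; return True on success."""
    return lock.acquire(False)


if __name__ == "__main__":
    lock = Lock()
    workers = [Process(target=critical, args=(lock, i)) for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
```

    A non-blocking attempt is handy when a process would rather skip the work than queue for the key.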

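    The Semaphore class:

    A Semaphore is essentially a lock plus a counter: one lock with one or more keys. It is used almost exactly like Lock; the only difference is that for more than one key you pass the number of keys, which defaults to 1. Here is the stub: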

        class Semaphore(object):
            def __init__(self, value=1):
                pass

            def acquire(self, blocking=True, timeout=None):
                pass

            def release(self):
                pass
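    Semaphore stub

    The Semaphore class has three methods:

    __init__(self, value=1):

    value=1: the default is 1; the value passed here sets the number of keys

    The acquire() method:

    Same as acquire() on Lock

    The release() method:

    Same as release() on Lock

    A real-world picture of a Semaphore is a mini-KTV booth in a shopping mall: suppose a booth holds at most four people at a time and 20 people outside want a turn, so they have to queue: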

        from time import sleep
        from random import randint
        from multiprocessing import Semaphore, Process


        def ktv(s, i):
            s.acquire()
            print('%s entered the KTV' % i)
            sleep(randint(1, 5))
            print('%s left the KTV' % i)
            s.release()


        if __name__ == '__main__':
            s = Semaphore(4)
            for i in range(20):
                p = Process(target=ktv, args=(s, i))
                p.start()
    KTV booth model

    The output shows that after the first four enter, a new person gets in only when someone leaves.
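    A variation on the booth model, sketched under the assumption that visitors have limited patience: acquire() is given a timeout, and a visitor who cannot get a slot in time walks away. The visit function and its patience parameter are hypothetical.

```python
from multiprocessing import Process, Semaphore
from time import sleep


def visit(sem, i, patience=1.0):
    """Hypothetical visitor: wait up to `patience` seconds for a free slot."""
    if not sem.acquire(True, patience):   # block, but give up after the timeout
        print(f"{i} gave up waiting")
        return False
    try:
        print(f"{i} is inside")
        sleep(0.1)
    finally:
        sem.release()                     # free the slot for the next visitor
    return True


if __name__ == "__main__":
    sem = Semaphore(2)                    # two slots instead of four, to keep the run short
    visitors = [Process(target=visit, args=(sem, i)) for i in range(6)]
    for v in visitors:
        v.start()
    for v in visitors:
        v.join()
```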

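    The Event class:

    An Event is mainly used for communication between processes: one process sets a signal that controls what other processes do. Here is the Event stub: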

        class Event(object):
            def is_set(self):
                return False

            def set(self):
                pass

            def clear(self):
                pass

            def wait(self, timeout=None):
                pass
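    Event stub

    Event provides four methods:

    is_set(self):

    is_set() returns the current flag

    When an event is first created the flag is False

    set(self):

    Sets the flag to True

    clear(self):

    Sets the flag to False

    wait(self, timeout=None):

    Waits on the flag: if the flag is False, the call blocks; if the flag is True, it returns immediately.

    timeout: if None, the call blocks until the flag becomes True. If a time is given, the call returns as soon as the flag becomes True within that time; otherwise it stops blocking when the time runs out. The return value is the flag: True if it was set, False on timeout.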
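    The timeout behavior of wait() can be sketched as follows. wait_for_signal is a hypothetical helper that reports which of the two outcomes occurred.

```python
from multiprocessing import Event, Process
from time import sleep


def wait_for_signal(e, timeout):
    """Wait on the event and report whether it was set before the timeout."""
    got_it = e.wait(timeout)     # True if the flag is set, False if the time ran out
    print("signalled" if got_it else "timed out")
    return got_it


if __name__ == "__main__":
    e = Event()                  # the flag starts out False
    waiter = Process(target=wait_for_signal, args=(e, 5))
    waiter.start()
    sleep(0.5)
    e.set()                      # releases the waiter well before its 5 s limit
    waiter.join()
```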

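    The classic Event example is a traffic light controlling whether vehicles may pass. The simulations below implement it first with a class and then with plain functions: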

        from time import sleep
        from random import randrange, choice
        from multiprocessing import Process, Event


        class MyObj(object):

            def traffic_light(self, e):
                """
                Starts on red: is_set() is False.
                sleep(2) is how long each light stays on; then the light switches and the flag is updated.
                Red: ordinary cars wait; an ambulance waits 0.5 s and runs the light if it has not changed.
                Green: all vehicles pass.
                :param e: an Event instance
                :return:
                """

                print('\033[1;31m The red light is on \033[0m')
                while 1:   # runs until the program is interrupted
                    sleep(2)
                    if e.is_set():
                        print('\033[1;31m The red light is on \033[0m')
                        e.clear()
                    else:
                        print('\033[1;32m The green light is on \033[0m')
                        e.set()

            def car(self, e, id):
                if not e.is_set():
                    print('%s car wait' % id)
                    e.wait()
                print('%s car pass' % id)

            def ambulance(self, e, id):
                if not e.is_set():          # red light: wait at most 0.5 s, then go anyway
                    e.wait(timeout=0.5)
                print('%s ambulance pass' % id)


        if __name__ == '__main__':
            myobj = MyObj()
            e = Event()
            p = Process(target=myobj.traffic_light, args=(e,))
            p.start()
            car_list = [myobj.ambulance, myobj.car]
            for i in range(20):
                p = Process(target=choice(car_list), args=(e, i))
                p.start()
                sleep(randrange(0, 3, 2))   # pause 0 or 2 seconds between vehicles
    Object-oriented version

    Function-based simulation:

        from time import sleep
        from random import randrange, choice
        from multiprocessing import Event, Process


        def traffic_light(e):
            print('\033[1;31m The red light is on\033[0m')
            while 1:   # runs until the program is interrupted
                sleep(2)
                if e.is_set():
                    print('\033[1;31m The red light is on\033[0m')
                    e.clear()
                else:
                    print('\033[1;32m The green light is on\033[0m')
                    e.set()


        def car(id, e):
            if not e.is_set():
                print('%s car wait' % id)
                e.wait()
            print('%s car pass' % id)


        def ambulance(id, e):
            if not e.is_set():          # red light: wait at most 0.5 s, then go anyway
                e.wait(timeout=0.5)
            print('%s ambulance pass' % id)


        if __name__ == '__main__':
            e = Event()
            p = Process(target=traffic_light, args=(e,))
            p.start()
            car_list = [car, ambulance]
            for i in range(20):
                p = Process(target=choice(car_list), args=(i, e))
                p.start()
                sleep(randrange(0, 3, 2))   # pause 0 or 2 seconds between vehicles
    Function version

     

  • Original article: https://www.cnblogs.com/linga/p/9378060.html