zoukankan      html  css  js  c++  java
  • python多进程总结

      一、概念

      进程:进程,是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

      重点:1. 是一次运行活动,比如qq是程序,pycharm是程序,只有运行起来才是进程。

         2.是系统进行资源分配和调度的基本单位,每个进程运行时,系统都会为他分配各自的内存,数据。每个进程间的空间是各自独立的。

      二、实例结构

    import multiprocessing
    import time
    
    def worker_1(interval):
        print "worker_1"
        time.sleep(interval)
        print "end worker_1"
    
    def worker_2(interval):
        print "worker_2"
        time.sleep(interval)
        print "end worker_2"
    
    def worker_3(interval):
        print "worker_3"
        time.sleep(interval)
        print "end worker_3"
    
    if __name__ == "__main__":
        p1 = multiprocessing.Process(target = worker_1, args = (2,))
        p2 = multiprocessing.Process(target = worker_2, args = (3,))
        p3 = multiprocessing.Process(target = worker_3, args = (4,))
    
        p1.start()
        p2.start()
        p3.start()
    
        print("The number of CPU is:" + str(multiprocessing.cpu_count()))
        for p in multiprocessing.active_children():
            print("child   p.name:" + p.name + "	p.id" + str(p.pid))
        print ("END!!!!!!!!!!!!!!!!!")

    输出:

    The number of CPU is:4
    child   p.name:Process-1    p.id5791
    child   p.name:Process-3    p.id5793
    child   p.name:Process-2    p.id5792
    END!!!!!!!!!!!!!!!!!
    worker_1
    worker_2
    worker_3
    end worker_1
    end worker_2
    end worker_3

      三、进程池

      python中,进程池内部会维护一个进程序列。当需要时,程序会去进程池中获取一个进程。如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

      因为每个进程需要有自己的内存,数据,和cpu消耗,我们不可能开大量的进程,进程池就为我们提供了这样的便捷,在进程池中固定好可用进程的数量,实现多进程操作。

      实例:

    from multiprocessing import Pool
    import time
    
    def func(arg):
        time.sleep(1)
        print('arg is ',arg)
    
    if __name__ == '__main__':
        pl = Pool(2)  # 同时可以开启2个进程
        for i in range(10):
            pl.apply_async(func=func,args=(i,))  # apply_async是异步执行,apply是同步执行
    
        pl.close()
        time.sleep(1)
        pl.terminate()  # 关闭进程池
        pl.join()    # 阻塞进程池
        print('done')

      执行结果:

    arg is  1
    arg is  0
    done

      它只输出了两个,在程序执行过程中,关闭进程池,程序会立即停止,不会再向后执行。

      去掉terminate

    from multiprocessing import Pool
    import time
    
    def func(arg):
        time.sleep(1)
        print('arg is ',arg)
    
    if __name__ == '__main__':
        pl = Pool(2)
        for i in range(10):
            pl.apply_async(func=func,args=(i,))
    
        pl.close()  # close执行之后不会有新的进程加入到pool
        time.sleep(1)
        # pl.terminate()
        pl.join()   # 在join之前调用close,否则会报错
        print('done')
    arg is  0
    arg is  1
    arg is  2
    arg is  3
    arg is  4
    arg is  5
    arg is  6
    arg is  7
    arg is  8
    arg is  9
    done

      四、守护进程:不阻挡主程序退出,主进程结束守护进程立即结束

    def worker(interval):
        print("work start:{0}".format(time.ctime()))
        time.sleep(interval)
        print("work end:{0}".format(time.ctime()))
    
    if __name__ == '__main__':
        p = multiprocessing.Process(target=worker,args=(3,))
        p.daemon = True   # 将p设置为守护进程
        p.start()
        print('end')

    输出结果:end

      这里将子进程设置为了守护进程,他就不会阻挡主程序的退出,即主程序运行完结束运行。

    def worker(interval):
        print("work start:{0}".format(time.ctime()))
        time.sleep(interval)
        print("work end:{0}".format(time.ctime()))
    
    if __name__ == '__main__':
        p = multiprocessing.Process(target=worker,args=(3,))
        p.daemon = True
        p.start()
        p.join()     # 设置阻塞,等待子进程结束
        print('end')
    work start:Mon May 13 14:45:50 2019
    work end:Mon May 13 14:45:53 2019
    end

      五、继承Process类的形式开启进程

    import os
    from multiprocessing import Process
    
    class MyProcess(Process):
        def __init__(self,name):
            super().__init__()
            self.name = name
    
        def run(self) -> None:
            print(os.getpid())
            print(f'我是{self.name}')
    
    
    p1 = MyProcess('mike')
    p2 = MyProcess('jack')
    p3 = MyProcess('allen')
    
    lis = [p1,p2,p3]
    p1.start()
    p2.start()
    p3.start()
    
    for i in lis:
        i.join()
    
    print('主线程')
    6031
    我是mike
    6032
    我是jack
    6033
    我是allen
    主线程

      六、互斥锁

      多个任务可以在几个进程之间并发处理,但他们之间没有顺序,开启后无法控制,会引发数据安全和顺序混乱的问题。

      一个小例子:

    def task1():
        print('这是 task1 任务'.center(30, '-'))
        print('task1 进了洗手间')
        time.sleep(random.randint(1, 3))
        print('task1 办事呢...')
        time.sleep(random.randint(1, 3))
        print('task1 走出了洗手间')
    
    
    def task2():
        print('这是 task2 任务'.center(30, '-'))
        print('task2 进了洗手间')
        time.sleep(random.randint(1, 3))
        print('task2 办事呢...')
        time.sleep(random.randint(1, 3))
        print('task2 走出了洗手间')
    
    
    def task3():
        print('这是 task3 任务'.center(30, '-'))
        print('task3 进了洗手间')
        time.sleep(random.randint(1, 3))
        print('task3 办事呢...')
        time.sleep(random.randint(1, 3))
        print('task3 走出了洗手间')
    
    
    if __name__ == '__main__':
        p1 = Process(target=task1)
        p2 = Process(target=task2)
        p3 = Process(target=task3)
    
        p1.start()
        p2.start()
        p3.start()
    加锁前

      输出结果:

    ---------这是 task1 任务----------
    task1 进了洗手间
    ---------这是 task2 任务----------
    task2 进了洗手间
    ---------这是 task3 任务----------
    task3 进了洗手间
    task3 办事呢...
    task1 办事呢...
    task2 办事呢...
    task1 走出了洗手间
    task3 走出了洗手间
    task2 走出了洗手间
    加锁前输出结果

      引入锁后:

    from multiprocessing import Process, Lock
    import time
    import random
    
    # 生成一个互斥锁
    mutex_lock = Lock()
    
    
    def task1(lock):
        # 锁门
        lock.acquire()
        print('这是 task1 任务'.center(30, '-'))
        print('task1 进了洗手间')
        time.sleep(random.randint(1, 3))
        print('task1 办事呢...')
        time.sleep(random.randint(1, 3))
        print('task1 走出了洗手间')
        # 释放锁
        lock.release()
    
    
    def task2(lock):
        # 锁门
        lock.acquire()
        print('这是 task2 任务'.center(30, '-'))
        print('task2 进了洗手间')
        time.sleep(random.randint(1, 3))
        print('task2 办事呢...')
        time.sleep(random.randint(1, 3))
        print('task2 走出了洗手间')
        # 释放锁
        lock.release()
    
    
    def task3(lock):
        # 锁门
        lock.acquire()
        print('这是 task3 任务'.center(30, '-'))
        print('task3 进了洗手间')
        time.sleep(random.randint(1, 3))
        print('task3 办事呢...')
        time.sleep(random.randint(1, 3))
        print('task3 走出了洗手间')
        # 释放锁
        lock.release()
    
    
    if __name__ == '__main__':
        p1 = Process(target=task1, args=(mutex_lock, ))
        p2 = Process(target=task2, args=(mutex_lock, ))
        p3 = Process(target=task3, args=(mutex_lock, ))
    
        # 释放新建进程的信号,具体谁先启动无法确定
        p1.start()
        p2.start()
        p3.start()
    加锁后
    ---------这是 task1 任务----------
    task1 进了洗手间
    task1 办事呢...
    task1 走出了洗手间
    ---------这是 task2 任务----------
    task2 进了洗手间
    task2 办事呢...
    task2 走出了洗手间
    ---------这是 task3 任务----------
    task3 进了洗手间
    task3 办事呢...
    task3 走出了洗手间
    加锁后输出结果

    加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,在牺牲速度的前提下保证数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1. 效率低(共享数据基于文件,而文件是硬盘上的数据)
    2. 需要自己加锁处理

    因此我们最好找寻一种解决方案能够兼顾:
    1. 效率高(多个进程共享一块内存的数据)
    2. 帮我们处理好锁问题。
    mutiprocessing模块中为我们提供了一个IPC通信机制:队列和管道。
    队列和管道都是将数据存放于内存中,队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。

      七、队列

      Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

      下面是一个队列使用的简单用法。

    from multiprocessing import Queue
    q=Queue(3)
    
    #put ,get ,put_nowait,get_nowait,full,empty
    q.put(3)
    q.put(2)
    q.put(1)
    # q.put(3)   # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
               # 如果队列中的数据一直不被取走,程序就会永远停在这里。
    try:
        q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
    except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
        print('队列已经满了')
    
    # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
    print(q.full()) #满了
    
    print(q.get())
    print(q.get())
    print(q.get())
    # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
    try:
        q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
    except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
        print('队列已经空了')
    
    print(q.empty()) #空了

      进程间的数据共享:

    import os
    import time
    import multiprocessing
    
    # 向queue中输入数据的函数
    def inputQ(queue):
        info = str(os.getpid()) + '(put):' + str(time.asctime())
        queue.put(info)
    
    # 向queue中输出数据的函数
    def outputQ(queue):
        info = queue.get()
        print (str(os.getpid()), '(get):',info)
    
    # Main
    if __name__ == '__main__':
        multiprocessing.freeze_support()
        record1 = []   # store input processes
        record2 = []   # store output processes
        queue = multiprocessing.Queue(3)
    
        # 输入进程
        for i in range(10):
            process = multiprocessing.Process(target=inputQ,args=(queue,))
            process.start()
            record1.append(process)
    
        # 输出进程
        for i in range(10):
            process = multiprocessing.Process(target=outputQ,args=(queue,))
            process.start()
            record2.append(process)
    
        for p in record1:
            p.join()
    
        for p in record2:
            p.join()

      进程间的数据共享主要运用在生产者消费者模型之间,新建一个队列,多个生产者可以往队列中放数据,消费者直接从队列中取出数据进行处理,队列存在于内存之中,直接读取。

      

  • 相关阅读:
    #1015 : KMP算法
    #1014 Trie树
    Type.IsContextful 说明
    判断.net中在windows系统下的字节序
    Python3 循环语句
    adb 脚本
    如何使用 adb 命令实现自动化测试
    python 字符串的方法和注释
    Android使用Fiddler模拟弱网络环境测试
    Android定位元素与操作
  • 原文地址:https://www.cnblogs.com/jimmyhe/p/10857021.html
Copyright © 2011-2022 走看看