zoukankan      html  css  js  c++  java
  • Python之进程与线程

    一.进程

    1.什么是进程

    程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。

    在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。

    2.有了进程为什么还要线程?

    进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上:

    • 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。

    • 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。

    例如,我们在使用qq聊天, qq做为一个独立进程如果同一时间只能干一件事,那他如何实现在同一时刻 即能监听键盘输入、又能监听其它人给你发的消息、同时还能把别人发的消息显示在屏幕上呢?你会说,操作系统不是有分时么?但我的亲,分时是指在不同进程间的分时呀, 即操作系统处理一会你的qq任务,又切换到word文档任务上了,每个cpu时间片分给你的qq程序时,你的qq还是只能同时干一件事呀。

    再直白一点, 一个操作系统就像是一个工厂,工厂里面有很多个生产车间,不同的车间生产不同的产品,每个车间就相当于一个进程,且你的工厂又穷,供电不足,同一时间只能给一个车间供电,为了能让所有车间都能同时生产,你的工厂的电工只能给不同的车间分时供电,但是轮到你的qq车间时,发现只有一个干活的工人,结果生产效率极低,为了解决这个问题,应该怎么办呢?。。。。没错,你肯定想到了,就是多加几个工人,让几个人工人并行工作,这每个工人,就是线程!

    二.线程

    1.什么是线程

      在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程

      多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间

      进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位

          例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。

    2.为什么要用线程

      多线程指的是,在一个进程中开启多个线程,简单的讲:如果多个任务共用一块地址空间,那么必须在一个进程内开启多个线程。详细的讲分为4点: 

      1. 多线程共享一个进程的地址空间

      2. 线程比进程更轻量级,线程比进程更容易创建可撤销,在许多操作系统中,创建一个线程比创建一个进程要快10-100倍,在有大量线程需要动态和快速修改时,这一特性很有用

      3. 若多个线程都是cpu密集型的,那么并不能获得性能上的增强,但是如果存在大量的计算和大量的I/O处理,拥有多个线程允许这些活动彼此重叠运行,从而会加快程序执行的速度。

      4. 在多cpu系统中,为了最大限度的利用多核,可以开启多个线程(比开进程开销要小的多)

    三.Python并发编程之多进程

    1.multiprocessing模块介绍

      python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing。  

      multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

      multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

      需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

    2.Process类的介绍

    • 创建进程的类

    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
    强调:
    1. 需要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
    • 参数介绍

    group参数未使用,值始终为None
    target表示调用对象,即子进程要执行的任务
    args表示调用对象的位置参数元组,args=(1,2,'egon',)
    kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
    name为子进程的名称
    • 方法介绍

    p.start():启动进程,并调用该子进程中的p.run() 
    p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
    p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    p.is_alive():如果p仍然运行,返回True
    p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
    • 属性介绍

    p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
    p.name:进程的名称
    p.pid:进程的pid
    p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
    p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可) 

    3.Process类的使用

    part1:创建并开启子进程的方式

    注意:在windows中Process()必须放到# if __name__ == '__main__':下

    由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。
    如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。
    这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。

    • 开进程第一种方法

    # -*- coding:utf-8 -*-
    # /user/bin/python
    import time
    import random
    from multiprocessing import Process
    def eat(name):
        print("%s eating" % name)
        time.sleep(random.randint(1,3))
        print("%s eat end" % name)
    if __name__ == '__main__':
    
        p1 = Process(target=eat, args=("alex",))
        p2 = Process(target=eat, args=("nickle",))
        p3 = Process(target=eat, args=("json",))
        p4 = Process(target=eat, args=("tom",))
    
        p1.start()
        p2.start()
        p3.start()
        p4.start()
        print("主线程")
    
    """
    输出结果:
    主线程
    alex eating
    nickle eating
    json eating
    tom eating
    nickle eat end
    json eat end
    alex eat end
    tom eat end
    """
    • 开启进程第二种方法

    # -*- coding:utf-8 -*-
    # /user/bin/python
    import time
    import random
    from multiprocessing import Process
    class eat(Process):
    
        def __init__(self,name):
            super().__init__()
            self.name = name
    
        def run(self):
            print("%s eating" % self.name)
            time.sleep(random.randint(1,3))
            print("%s eat end" % self.name)
    if __name__ == '__main__':
        p1 = eat("alex")
        p2 = eat("nickle")
        p3 = eat("json")
        p4 = eat("tom")
    
        p1.start()
        p2.start()
        p3.start()
        p4.start()
        print("主进程")
    
    """
    输出结果:
    主进程
    alex eating
    json eating
    nickle eating
    tom eating
    nickle eat end
    tom eat end
    json eat end
    alex eat end
    """

    part2:Process对象的其他方法或属性

    • 进程对象的方法一:terminate,is_alive

    # -*- coding:utf-8 -*-
    # /user/bin/python
    from multiprocessing import Process
    import time
    import random
    class eat(Process):
    
        def __init__(self,name):
            super().__init__()
            self.name = name
    
        def run(self):
            print("%s eating" % self.name)
            time.sleep(random.randint(1,3))
            print("%s eat end" % self.name)
    if __name__ == '__main__':
        p = eat("nickle")
        p.start()
    
        p.terminate() # 关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
        print(p.is_alive()) # 结果为True
        time.sleep(random.randint(1,3))
        print("go")
        print(p.is_alive())  # 结果为False
    
    """
    输出结果:
    True
    go
    False
    """

     注意了:p.join(),是父进程在等p的结束,是父进程阻塞在原地,而p仍然在后台运行

    • 进程对象的方法二:p.daemon=True,p.join

    # -*- coding:utf-8 -*-
    # /user/bin/python
    from multiprocessing import Process
    import time
    import random
    class eat(Process):
    
        def __init__(self,name):
            super().__init__()
            self.name = name
    
        def run(self):
            print("%s eating" % self.name)
            time.sleep(random.randint(1,3))
            print("%s eat end" % self.name)
    if __name__ == '__main__':
        p = eat("nickle")
        p.daemon = True # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且主进程死,p跟着一起死
        p.start()
    
        p.join(0.0001) # 等待p停止,等0.0001秒就不再等了
        print("go")
    
    """
    输出结果:
    go
    """
    # -*- coding:utf-8 -*-
    # /user/bin/python
    from multiprocessing import Process
    import time
    import random
    
    def eat(name):
        print("%s eating" % name)
        time.sleep(random.randint(1,3))
        print("%s eat end" % name)
    if __name__ == '__main__':
    
        p1 = Process(target=eat, args=("alex",))
        p2 = Process(target=eat, args=("nickle",))
        p3 = Process(target=eat, args=("json",))
        p4 = Process(target=eat, args=("tom",))
    
        p1.start()
        p2.start()
        p3.start()
        p4.start()
    
        # 既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?
        # 当然不是了,必须明确:p.join()是让谁等?
        # 很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,
    
        # 详细解析如下:
        # 进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
        # 而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
        # join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
        # 所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
    
        p1.join()
        p2.join()
        p3.join()
        p4.join()
        print("主线程")
    # 简化方式:
    #     p_lis = [p1, p2, p3, p4]
    #     for p in p_lis:
    #         p.start()
    #
    #     for p in p_lis:
    #         p.join()
    """
    输出结果:
    alex eating
    nickle eating
    json eating
    tom eating
    nickle eat end
    tom eat end
    alex eat end
    json eat end
    主线程
    """
    View Code
    • 进程对象的其他方法:name,id

    # -*- coding:utf-8 -*-
    # /user/bin/python
    from multiprocessing import Process
    import time
    import random
    
    class eat(Process):
    
        def __init__(self,name):
            # self.name=name
            # super().__init__() #Process的__init__方法会执行self.name=Piao-1,
            #                    #所以加到这里,会覆盖我们的self.name=name
    
            #为我们开启的进程设置名字的做法
            super().__init__()
            self.name = name
    
        def run(self):
            print("%s eating" % self.name)
            time.sleep(random.randint(1,3))
            print("%s eat end" % self.name)
    if __name__ == '__main__':
    
        p = eat("nickle")
        p.start()
        print("主线程")
        print(p.pid)
        print(p.name)
    
    """
    输出结果:
    主线程
    9100
    nickle
    nickle eating
    nickle eat end
    """

    4.进程间通信(IPC)方式一:队列

    进程彼此之间互相隔离,要实现进程间通信,即IPC,multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

      创建队列的类(底层就是以管道和锁定的方式实现):

    1 Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

      参数介绍:

    主要方法:

    1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。
    如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
    2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,
    会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
    3 4 q.get_nowait():同q.get(False) 5 q.put_nowait():同q.put(False) 6 7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。 8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。 9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

    其他方法:

    1 1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
    2 2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列
    使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    3 3 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为

    应用:

    # -*- coding:utf-8 -*-
    # /user/bin/python
    from multiprocessing import Process,Queue
    q = Queue(3)
    q.put({"a":1})
    q.put("b")
    q.put("c")
    print(q.full()) # 判断队列满没满
    q.put("d",False) # 当block=False时抛出异常 等同于q.put_nowait("d")
    q.put("d",timeout=2) # 设置等待时间
    print(q.qsize()) # 查看队列大小
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty()) # 判断队列是否为空
    print(q.get(block=False)) # 当block=False时抛出异常 等同于print(q.get_nowait())
    print(q.get(timeout=2)) # # 设置等待时间

     生产者消费模型

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

      为什么要使用生产者和消费者模式

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

      什么是生产者消费者模式

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    基于队列实现生产者消费者模型

    # -*- coding:utf-8 -*-
    # /user/bin/python
    from multiprocessing import Process,Queue
    import time
    import random
    
    def consumer(q,name):
        while True:
            time.sleep(random.randint(1,3))
            res = q.get()
            print("33[41m消费者%s拿到了%s33[0m"% (name,res))
    
    def producer(seq,q,name):
        for item in seq:
            time.sleep(random.randint(1, 3))
            q.put(item)
            print("生产者%s生产了%s" % (name,item))
    
    if __name__ == '__main__':
        q = Queue()
        c = Process(target=consumer,args=(q,"nickle"))
        c.start()
    
        seq = ["包子%s" % i for i in range(10)]
        producer(seq,q,"厨师1")
    
        print("主进程")
        
    """
    输出结果:
    生产者厨师1生产了包子0
    生产者厨师1生产了包子1
    消费者nickle拿到了包子0
    生产者厨师1生产了包子2
    消费者nickle拿到了包子1
    生产者厨师1生产了包子3
    消费者nickle拿到了包子2
    生产者厨师1生产了包子4
    消费者nickle拿到了包子3
    生产者厨师1生产了包子5
    消费者nickle拿到了包子4
    生产者厨师1生产了包子6
    消费者nickle拿到了包子5
    生产者厨师1生产了包子7
    消费者nickle拿到了包子6
    生产者厨师1生产了包子8
    消费者nickle拿到了包子7
    生产者厨师1生产了包子9
    主进程
    消费者nickle拿到了包子8
    消费者nickle拿到了包子9
    """

    创建队列的另外一个类:

        JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

        参数介绍:

        maxsize是队列中允许最大项数,省略则无大小限制。    

      方法介绍:

        JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
        q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
        q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
    # -*- coding:utf-8 -*-
    # /user/bin/python
    from multiprocessing import Process,JoinableQueue
    import time
    import random
    
    def consumer(q,name):
        while True:
            time.sleep(random.randint(1,3))
            res = q.get()
            q.task_done()
            print("消费者%s拿到了%s" % (name,res))
    
    def producer(seq,q,name):
        for item in seq:
            time.sleep(random.randint(1, 3))
            q.put(item)
            print("生产者%s生产了%s" % (name,item))
        q.join()
        print("=================================")
    
    if __name__ == '__main__':
        q = JoinableQueue()
        c = Process(target=consumer,args=(q,"nickle"))
        c.daemon=True # 设置守护主进程
        c.start()
    
        seq = ["包子%s" % i for i in range(5)]
        p = Process(target=producer,args=(seq,q,"厨师1"))
        p.start()
    
        # master--->producer----->q--->consumer(10次task_done)
        p.join()  # 主进程等待p结束,p等待c把数据都取完,c一旦取完数据,p.join就是不再阻塞,进
        # 而主进程结束,主进程结束会回收守护进程c,而且c此时也没有存在的必要了
    
        print("主进程")
    
    """
    输出结果:
    生产者厨师1生产了包子0
    消费者nickle拿到了包子0
    生产者厨师1生产了包子1
    消费者nickle拿到了包子1
    生产者厨师1生产了包子2
    消费者nickle拿到了包子2
    生产者厨师1生产了包子3
    消费者nickle拿到了包子3
    生产者厨师1生产了包子4
    消费者nickle拿到了包子4
    =================================
    主进程
    """

     5.进程池

        开多进程的目的是为了并发,如果有多核,通常有几个核就开几个进程,进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行),但很明显需要并发执行的任务要远大于核数,这时我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数...    

        当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

       而且对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

        在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。

      创建进程池的类:

    1  Pool([numprocess  [,initializer [, initargs]]]):创建进程池

      参数介绍:

    1 1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
    2 2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
    3 3 initargs:是要传给initializer的参数组

      主要方法:

    1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,
    必须从不同线程调用p.apply()函数或者使用p.apply_async()
    2 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为
    可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
    3 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成5 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

       其他方法:

    1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
    2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
    3 obj.ready():如果调用完成,返回True
    4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
    5 obj.wait([timeout]):等待结果变为可用。
    6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

      应用:

         提交任务,并在主进程中拿到结果(之前的Process是执行任务,结果放到队列里,现在可以在主进程中直接拿到结果)

    # -*- coding:utf-8 -*-
    # /user/bin/python
    from multiprocessing import Pool
    import time
    def work(n):
        print("开工了")
        time.sleep(3)
        return n**2
    
    if __name__ == '__main__':
        q = Pool()
    
        res = q.apply_async(work,args=(2,))
        q.close()
        q.join()
        print(res.get())
    """
    输出结果:
    开工了
    4
    """
  • 相关阅读:
    asp.net控件开发基础(17)
    AjaxControlToolkit更新
    asp.net控件开发基础(18)
    asp.net控件开发基础(15)
    新装了vista,不容易啊
    asp.net控件开发基础(19)
    Oracle创建用户及数据表
    RMAN快速入门指南
    Oracle数据库中sql基础
    PL/SQL循序渐进全面学习教程Oracle
  • 原文地址:https://www.cnblogs.com/Crazy-lyl/p/7087780.html
Copyright © 2011-2022 走看看