zoukankan      html  css  js  c++  java
  • day031同步锁、信号量、事件、队列、生成者消费者模型、Jionablequeue

    multiprocessing模块主要内容:

    1、守护进程
    2、进程同步
        1.同步锁(*****)
        2.信号量
        3.事件
    3、进程通信
        1.队列(*****)
        2.生产者消费者模型,JoinableQueue, 主要在队列的基础上多了两个功能:q.task_done,q.join
    

    一、multiprocessing模块的介绍

    仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。
    之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。
    由于提供的子模块非常多,为了方便大家归类记忆,
    
    我将这部分大致分为四个部分:
    创建进程部分,
    进程同步部分,
    进程池部分,
    进程之间数据共享。
    
    重点强调:进程没有任何共享状态,进程修改的数据,改动仅限于该进程内,但是通过一些特殊的方法,可以实现进程之间数据的共享
    

    1、process模块介绍

    process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。
    
    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
    
    强调:
    1. 需要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
    

    fe:创建一个进程

    # 当前文件名称为test.py
    from multiprocessing import Process
    
    def func():
        print(12345)
    
    if __name__ == '__main__': #windows 下才需要写这个,这和系统创建进程的机制有关系,不用深究,记着windows下要写就好啦
        #首先我运行当前这个test.py文件,运行这个文件的程序,那么就产生了进程,这个进程我们称为主进程
    
        p = Process(target=func,) #将函数注册到一个进程中,p是一个进程对象,此时还没有启动进程,只是创建了一个进程对象。并且func是不加括号的,因为加上括号这个函数就直接运行了对吧。
        p.start() #告诉操作系统,给我开启一个进程,func这个函数就被我们新开的这个进程执行了,而这个进程是我主进程运行过程中创建出来的,所以称这个新创建的进程为主进程的子进程,而主进程又可以称为这个新进程的父进程。
              #而这个子进程中执行的程序,相当于将现在这个test.py文件中的程序copy到一个你看不到的python文件中去执行了,就相当于当前这个文件,被另外一个py文件import过去并执行了。
              #start并不是直接就去执行了,我们知道进程有三个状态,进程会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞,并且在这个三个状态之间不断的转换,等待cpu执行时间片到了。
        print('*' * 10) #这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,我们称为异步
    
    Python

    fe:证明子进程和主进程的关系

    我们通过主进程创建的子进程是异步执行的,那么我们就验证一下,并且看一下子进程和主进程(也就是父进程)的ID号(讲一下pid和ppid,使用pycharm举例),来看看是否是父子关系。
    

    子进程与主进程的关系

    1.看一个问题,说明linux和windows两个不同的操作系统创建进程的不同机制导致的不同结果:

    import time
    import os
    from multiprocessing import Process
    
    def func():
        print('aaaa')
        time.sleep(1)
        print('子进程>>',os.getpid())
        print('该子进程的父进程>>',os.getppid())
        print(12345)
    
    print('太白老司机~~~~') # 如果我在这里加了一个打印,你会发现运行结果中会出现两次打印出来的太白老司机,因为我们在主进程中开了一个子进程,
    # 子进程中的程序相当于import的主进程中的程序,那么import的时候会不会执行你import的那个文件的程序啊,前面学的,是会执行的,所以出现了两次打印
    
    # 其实是因为windows开起进程的机制决定的,在linux下是不存在这个效果的,因为windows使用的是process方法来开启进程,他就会拿到主进程中的所有程序,
    # 而linux下只是去执行我子进程中注册的那个函数,不会执行别的程序,这也是为什么在windows下要加上执行程序的时候,
    
    要加上if __name__ == '__main__':,否则会出现子进程中运行的时候还开启子进程,那就出现无限循环的创建进程了,就报错了
    
    Python

    2.Process类中参数的介绍:

    参数介绍:
    1 group参数未使用,值始终为None
    2 target表示调用对象,即子进程要执行的任务
    3 args表示调用对象的位置参数元组,args=(1,2,'egon',)
    4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
    5 name为子进程的名称
    
    fe:给要执行的函数传参数

    给要执行的函数传参数

    3.Process类中各方法的介绍:

    1、p.start():启动进程,并调用该子进程中的p.run()
    2、p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
    3、p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    4、p.is_alive():如果p仍然运行,返回True, 判断进程是否还存活
    5、p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
    
    Python

    join方法的例子:

    让主进程加上join的地方等待(也就是阻塞住),等待子进程执行完之后,再继续往下执行我的主进程,
    好多时候,我们主进程需要子进程的执行结果,所以必须要等待。join感觉就像是将子进程和主进程拼接起来一样,将异步改为同步执行。
    

    join方法的例子

    4.怎么样开启多个进程呢?for循环。

    并且我有个需求就是说,所有的子进程异步执行,然后所有的子进程全部执行完之后,我再执行主进程,怎么搞?看代码
    

    开启多个进程

    fe:模拟两个应用场景:1、同时对一个文件进行写操作 2、同时创建多个文件

    示例

    5.Process类中自带封装的各属性的介绍

    1、p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
    2、p.name:进程的名称
    3、p.pid:进程的pid
    4、p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
    5、p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
    
    Python

    2、Process类的使用

    注意:在windows中Process()必须放到# if name == ‘main‘:下

    原因解释

    练习:那我们自己通过多进程来实现一下同时和多个客户端进行连接通信。

    我们之前学socket的时候,知道tcp协议的socket是不能同时和多个客户端进行连接的,(这里先不考虑socketserver那个模块),
    对不对,那我们自己通过多进程来实现一下同时和多个客户端进行连接通信。
    
    服务端代码示例:(注意一点:通过这个是不能做qq聊天的,
    因为qq聊天是qq的客户端把信息发给另外一个qq的客户端,中间有一个服务端帮你转发消息,
    而不是我们这样的单纯的客户端和服务端对话,并且子进程开启之后咱们是没法操作的,
    并且没有为子进程input输入提供控制台,所有你再在子进程中写上了input会报错,EOFError错误,
    这个错误的意思就是你的input需要输入,但是你输入不了,就会报这个错误。
    而子进程的输出打印之类的,是pycharm做了优化,将所有子进程中的输出结果帮你打印出来了,但实质还是不同进程的。)
    

    服务端代码客户端代码

    1.上面我们通过多进程实现了并发,但是有个问题

    每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。
    解决方法:进程池,本篇博客后面会讲到,大家继续学习呀
    

    2.僵尸进程与孤儿进程(简单了解 一下就可以啦)

    僵尸进程与孤儿进程

    3、守护进程

    之前我们讲的子进程是不会随着主进程的结束而结束,子进程全部执行完之后,程序才结束,
    那么如果有一天我们的需求是我的主进程结束了,由我主进程创建的那些子进程必须跟着结束,怎么办?守护进程就来了!
    
    主进程创建守护进程
    
    其一:守护进程会在主进程代码执行结束后就终止
    
    其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
    
    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
    
    Python
    import time
    from multiprocessing import Process
    
    def func1():
        time.sleep(2)
        print('xxxxx')
    
    if __name__ == '__main__':
        p = Process(target=func1,)
        p.daemon = True  #守护进程,要写在start之前
        p.start()
    
        print('主进程代码运行完了')
        # time.sleep(3)
    
    Python

    4.进程同步(锁)重点

    保证数据安全用的,但是将锁起来的那段代码的执行变成了同步串行,牺牲了效率,保证了安全
    
    from multiprocessing import Lock
    L = Lock() # 创建锁的对象
    L.acquire() # 限定一个进入
    数据操作
    L.release() # 释放
    
    Python

    fe:接下来,我们以模拟抢票为例,来看看数据安全的重要性。

    并发运行,效率高,但是竞争同一个文件,导致数据混乱加锁:购票行为由并发变成了串行,牺牲了效率,但是保证了数据安全

    进程锁总结

    进程锁总结

    5、信号量 (了解)

    互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
    假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
    实现:
    信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。
    这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
    
    信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
    
    Python

    信号量的使用

    6、事件(了解)

    事件介绍

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
    
        事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
    
    clear:将“Flag”设置为False
    set:将“Flag”设置为True
    
    Python

    事件方法的使用通过事件来模拟红绿灯示例基于事件的进程通信

    7、队列(推荐使用)重点

    队列:进程安全的,能够保证数据安全,
    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。
    队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出。
    
    Queue([maxsize]) 创建共享的进程队列。
    参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
    底层队列使用管道和锁实现
    

    队列的简单用法示例queue的方法介绍queue的其他方法(了解)子进程与父进程通过队列进行通信

    8、生产者消费者的模型 JoinableQueue

    缓冲区解耦的事情
    Joinablequeue,能记录着你往队列里面put的数据量
    其他方法和queue是一样的,比queue多了两个方法:q.task_done,q.join
    
    队列是进程安全的:同一时间只能一个进程拿到队列中的一个数据,你拿到了一个数据,这个数据别人就拿不到了。
    
    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
    
    为什么要使用生产者和消费者模式
    
    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。
    在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。
    同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
    
    什么是生产者消费者模式
    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。
    生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,
    直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,
    平衡了生产者和消费者的处理能力,并且我可以根据生产速度和消费速度来均衡一下多少个生产者可以为多少个消费者提供足够的服务,
    就可以开多进程等等,而这些进程都是到阻塞队列或者说是缓冲区中去获取或者添加数据。
    
    Python

    那么我们基于队列来实现一个生产者消费者模型,代码示例:

    基于队列的生产者消费者模型

    #生产者消费者模型总结
    
        #程序中有两类角色
            一类负责生产数据(生产者)
            一类负责处理数据(消费者)
    
        #引入生产者消费者模型为了解决的问题是:
            平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
    
        #如何实现:
            生产者<-->队列<——>消费者
        #生产者消费者模型实现类程序的解耦和
    ````
    #### 9、多生产者多消费者模型 运用JoinableQueue
        在有多个生产者和多个消费者时,由于队列我们说了是进程安全的,我一个进程拿走了结束信号,
        另外一个进程就拿不到了,还需要多发送一个结束信号,有几个取数据的进程就要发送几个结束信号,我们则需要用一个很low的方式去解决
        这样的方法并不适合,所有这里引入了JoinableQueue
    #### JoinableQueue([maxsize])
    ```python
    #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    
    #参数介绍:
      maxsize是队列中允许最大项数,省略则无大小限制。
    #方法介绍:
      JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
      q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
      q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。
     ```
    ![](https://lj.fwit.win/wp-content/uploads/2018/11/a8579f17425d37b3ba954fcff35d610e.png)
    #### JoinableQueue队列实现生产者消费者模型
    
    <details>
    <summary>JoinableQueue队列实现生产者消费者模型</summary>
    
    ```python
    # 2.基于队列写一个多个生产者和多个消费者的模型
    
    from multiprocessing import Process, JoinableQueue
    import time
    
    def producer1(name,q):   # 重复代码的重用就行了,不用每次都写多一个函数,只用参数来控制
        for i in range(5):
            time.sleep(0.5)
            q.put("%s号生产的%s" % (i,name))
            print("%s号的%s好了" %(i,name))
        q.join()  # 只设置一个join就行了,这是个共享的队列空间,q.task_done()都是发到这里来的,#生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。
        # print("结束了")
    
    def consumer1(q):
        while 1:
            time.sleep(1)
            res = q.get()
            print("%s被消费者吃掉了" % res)
            q.task_done() # 每拿一次,向q.join()发送一次信号,证明一个数据已经被取走并执行完了
    
    
    if __name__ == '__main__':
        q = JoinableQueue(10)
        # 生产者:厨师们
        p1 = Process(target=producer1, args=("包子", q,))
        p2 = Process(target=producer1, args=("馒头", q,))
        # 消费者:吃货们
        c1 = Process(target=consumer1, args=(q,))
        c2 = Process(target=consumer1, args= (q,))
        c1.daemon = True
        c2.daemon = True # 消费者设置为守护进程,这样消费者这个子进程才会结束,,但是加了守护之后,必须确保生产者的内容生产完并且被处理完了,
        # 所有必须还要在主进程给生产者设置join,才能确保生产者生产的任务被执行完了,并且能够确保守护进程在所有任务执行完成之后才随着主进程的结束而结束。
    
        p1.start()
        p2.start()
        c1.start()
        c2.start()
        p1.join()  # 等待一个子进程就行了,因为q.join是共享的
    
    # 主进程等--->p1,p2,p3等---->c1,c2
        # p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
        # 因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。
  • 相关阅读:
    spring-pool.xml
    spring-jmx.xml
    spring-common.xml
    applicationContext.xml
    spring-webservice.xml
    webservice统一认证
    jdbc.properties
    oracle,mysql分页
    springmvc-servlet.xml
    四、用“”或构造函数创建Java的String区别
  • 原文地址:https://www.cnblogs.com/yipianshuying/p/10034152.html
Copyright © 2011-2022 走看看