zoukankan      html  css  js  c++  java
  • Python Day 33 守护进程、进程同步(互斥锁)、进程间通信(IPC)、共享内存的方式、生产者消费者模型、

      ##内容回顾

    #上周回顾
        1.TCP粘包问题
        2.UDP
            区别:
            UDP 不粘包,不可靠,效率高,适合数据量小的传输,不要求顺序,不需要建立连接
            使用场景:
            UDP 适用于,对速度要求高,但是对数据完整性要求不高,DNS,对战游戏
    
        3.网络编程:目前的  无法并发处理多个客户端
    
    进程
        一个正在运行的程序称之为进程
        进程来自于操作系统
        #在第三代操作系统中,诞生了多道技术
        多道技术 是为了提高计算机资源的利用率,第三代以前是串行执行任务的批处理方式
        空间复用
            同一时间在内存中存储多个程序的数据
            内存区域要相互隔离,物理隔离,你是不可能随意访问的
        时间复用
            切换+保存
            当一个进程遇到了IO操作时,就切换其他进程来执行,切换前要记录当前运行状态
            切换条件:
                遇到了IO
                运行时间过长,时间片用完了
    
        #什么时候要开启进程
            当有一个任务,执行时间过长时,为了提高效率,我们就可以将这个任务交给子进程来完成
        #linux 与windows开启子进程的区别
            linux 会把数据完整copy给子进程 ,作为子进程的初始状态
            windows 会重新导入父进程的代码来获取需要数据,这样一来创建进程的代码又被执行了一次,造成递归创建进程
                所以要将开启子进程的代码放到 if __name__ == "__main__":中 保证创建进程的代码只被父进程执行
       # 如何开启子进程
            1.导入 multiprocessing 中的 Process
                实例化Process类 用target来指定要执行任务函数
                调用start来开启进程
            2.创建一个类,继承Process,覆盖run方法,将任务代码放到run中
    
        #进程之间内存相互隔离
    
        #僵尸进程 有害
            子进程结束了,父进程还在运行,子进程会占用pid 并且将保留最后的运行状态在内存中
            在linux中 父进程需要调用wait/waitpid来获取子进程的残留信息,并清理它
            python中已经封装好了 wait 操作不需要我们自己来清理
    
            如果出现很多僵尸进程
            清除僵尸进程的方法就是 杀死父进程
    
       # 孤儿
            父进程已经挂了 ,子进程还在运行,会被移交给操作系统来管理
    
        #常用属性:
            join  父进程等待子进程运行结束   其实是提高子进程的优先级
            is_alive 是否存活
            getpid 获取自己的进程id
            name 进程的名字
            daemon
            terminate  终止这个子进程    有延迟
            start  启动进程  有延迟
            因为开启和关闭进程 都是操作系统来完成
    
        #进程的状态
            运行 -io>  阻塞  -> 就绪态
            运行 -时间片用完了(运行时间过长)>  就绪态
    
        #阻塞
            当程序遇到了IO操作,就进入了阻塞态
        #非阻塞
            程序正在运行中,没有任何IO操作
        指的是程序的运行状态
    
        #并行
           多个程序同时运行,是真正的同时执行,仅在多核中会出现
    
        #并发
            多个事件同时发生了,看起来像是都在运行,本质上是切换执行
    
        #程序员永恒的话题
            提高程序的效率  减少IO 力求尽可能多占用CPU

      ##守护进程

    #主进程创建守护进程
      其一:守护进程会在主进程代码执行结束后就终止
      其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
    
    #使用场景:
    
    ​    父进程交给了子进程一个任务,任务还没有完成父进程就结束了,子进程就没有继续执行的意义了 
    
    ​    例如:qq 接收到一个视频文件,于是开启了一个子进程来下载,如果中途退出了qq,下载任务就没必须要继续运行了 
    
    
    #案例
    from multiprocessing import Process
    import time
    
    # 妃子的一生
    def task():
        print("入宫了.....")
        time.sleep(50)
        print("妃子病逝了......")
    
    
    if __name__ == '__main__':
        # 康熙登基了
        print("登基了.....")
    
        # 找了一个妃子
        p = Process(target=task)
    
        # 设置为守护进程 必须在开启前就设置好
        p.daemon = True
        p.start()
    
        # 康熙驾崩了
        time.sleep(3)
        print("故事结束了!")

      ##进程同步(互斥锁)

    #引入
    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,
    而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理
    
    # 什么是互斥锁
    
     互斥锁   互相排斥的锁,我在这站着你就别过来,(如果这个资源已经被锁了,其他进程就无法使用了)
    
    需要强调的是: 锁 并不是真的把资源锁起来了,只是在代码层面限制你的代码不能执行
    
    # 为什么需要互斥锁:
    
    并发将带来资源的竞争问题
    当多个进程同时要操作同一个资源时,将会导致数据错乱的问题
    如下列所示:
    
    # 解决方案1:
    
    ​    加join,
    ​    弊端 1.把原本并发的任务变成了穿行,避免了数据错乱问题,但是效率降低了,这样就没必要开子进程了
    ​         2.原本多个进程之间是公平竞争,join执行的顺序就定死了,这是不合理的
    
    # 解决方案2:
    
    ​    就是给公共资源加锁,互斥锁
    ​    互斥锁   互相排斥的锁,我在这站着你就别过来,(如果这个资源已经被锁了,其他进程就无法使用了)
    
    锁 并不是真的把资源锁起来了,只是在代码层面限制你的代码不能执行
    
    # 锁和join的区别:
    1.
    
    ​    join是固定了执行顺序,会造成父进程等待子进程
    ​    锁依然是公平竞争谁先抢到谁先执行,父进程也可以抢占资源,即可以做其他事情
    
    ​    2.最主要的区别:
    ​    join是把进程的任务全部串行
    ​    锁可以锁任意代码 一行也可以  可以自己调整粒度
            称之为:粒度    粒度越大意味着锁住的代码越多   效率越低
                   粒度越小意味着锁住的代码越少   效率越高
    
            MYSQL 中不同隔离级别 其实就是不同的粒度    
    #案例
    from multiprocessing import Process,Lock
    import time,random
    
    def task1(lock):
        # 要开始使用了 上锁
        lock.acquire()       #就等同于一个if判断
        print("hello iam jerry")
        time.sleep(random.randint(0, 2))
        print("gender is boy")
        time.sleep(random.randint(0, 2))
        print("age is  15")
        # 用完了就解锁
        lock.release()
    
    def task2(lock):
        lock.acquire()
        print("hello iam owen")
        time.sleep(random.randint(0,2))
        print("gender is girl")
        time.sleep(random.randint(0,2))
        print("age is  48")
        lock.release()
    
    def task3(lock):
        lock.acquire()
        print("hello iam jason")
        time.sleep(random.randint(0,2))
        print("gender is women")
        time.sleep(random.randint(0,2))
        print("age is  26")
        lock.release()
    
    if __name__ == '__main__':
        lock = Lock()
    
        p1 = Process(target=task1,args=(lock,))
        p2 = Process(target=task2,args=(lock,))
        p3 = Process(target=task3,args=(lock,))
    
        p1.start()
        # p1.join()
    
        p2.start()
        # p2.join()
    
        p3.start()
        # p3.join()
    
        # print("我是父进程,加上锁 ,数据是不会乱,但我也可以抢占资源哦,这一点比join好,不需要等子进程好了才能执行,大家一起抢占嘛")
        
    # 锁的伪代码实现 
    
    # if my_lock == False:
    #     my_lock = True
    #      #被锁住的代码
          my_lock = False 解锁
    
    
    #案例2
    “”“
    过程:
    1.查询余票
    2.购买
    ”“”
    import json,os,time,random
    from multiprocessing import Process,Lock
    #查票
    def show_ticket():
    
        with open("ticket.json","rt",encoding="utf-8") as f:
            data = json.load(f)
            # 模拟延迟
    
        time.sleep(random.randint(0, 2))
        print(" %s正在查询 剩余票数:%s" % (os.getpid(),data["count"]))
    
    def buy_ticket():
        with open("ticket.json", "rt", encoding="utf-8") as f:
            data = json.load(f)
            if data["count"] > 0:
                data["count"] -= 1
                # 模拟延迟
                time.sleep(random.randint(0, 2))
                with open("ticket.json","wt",encoding="utf-8") as f:
                    json.dump(data,f)
                    print("%s 恭喜你抢票成功!" % os.getpid())
            else:
                print("%s抢票失败! 被人抢走了! 开个加速包试试!" % os.getpid())
    
    
    def task(lock):
        #查看余票功能不需要加锁,每个人都可查询
        show_ticket()
        #买票的话需要加锁,因为要进行文件读写操作,不能出现边写边读的情况
        lock.acquire()
        buy_ticket()
        lock.release()
    
    
    if __name__ == '__main__':
    
        lock = Lock()
    
        # 三个人抢票
        for i in range(3):
            p = Process(target=task,args=(lock,))
            p.start()
    
    #注意
    1、不要对同一把执行多出acquire 会锁死导致程序无法执行  一次acquire必须对应一次release
        如:
         l = Lock()
         l.acquire()
         print("抢到了!")
         l.release()
         l.acquire()
         print("强哥毛线!")
    2、想要保住数据安全,必须保住所有进程使用同一把锁
    
    
    
    #总结
    
    #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    2.需要自己加锁处理
    
    #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
    1 队列和管道都是将数据存放于内存中
    2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

      ##进程间通信(IPC)

    “”“
    进程间通讯    
    
    ​    通讯指的就是交换数据 
    
    ​    进程之间内存是相互隔离的,当一个进程想要把数据给另外一个进程,就需要考虑IPC
    
    方式:
    
    ​    管道: 只能单向通讯,数据都是二进制  
    
    ​    文件: 在硬盘上创建共享文件  
    
    ​        缺点:速度慢
    
    ​        优点:数据量几乎没有限制  
    
    ​    socket:
    
    ​        编程复杂度较高  
    
    ​    共享内存:必须由操作系统来分配    要掌握的方式*****
    
    ​        优点: 速度快 
    
    ​        缺点: 数据量不能太大 
    ”“”

      ##共享内存的方式

    #1.Manager类 了解
    
    ​        Manager提供很多数据结构  list dict等等
    
    ​        Manager所创建出来的数据结构,具备进程间共享的特点
    需要强调的是 Manager创建的一些数据结构是不带锁的 可能会出现问题
    
    
    #案例
    from multiprocessing import Process,Manager,Lock
    import time
    
    
    def task(data,l):
        l.acquire()
        num = data["num"] #
        time.sleep(0.1)
        data["num"] = num - 1
        l.release()
    
    if __name__ == '__main__':
        # 让Manager开启一个共享的字典(该方式解决之前的进程隔离)
        m = Manager()
        data = m.dict({"num":10})
        #需要强调的是 Manager创建的一些数据结构是不带锁的 可能会出现问题 所以要加锁
        l = Lock()
    
        for i in range(10):
            p = Process(target=task,args=(data,l))
            p.start()
    
        time.sleep(2)
        print(data)
    
    
    #2、 Queue队列   帮我们处理了锁的问题   重点
    
    ​        队列是一种特殊的数据结构,先存储的先取出    就像排队    先进先出
    
    ​        相反的是堆栈,先存储的后取出, 就像衣柜 桶装薯片    先进后出
    
    ​        扩展:
    
    ​        函数嵌套调用时  执行顺序是先进后出     也称之为函数栈  
    
    ​        调用 函数时  函数入栈   函数结束就出栈
    
    #案例
    from multiprocessing import Queue
    # 创建队列  不指定maxsize 则没有数量限制
    q = Queue(3)
    # 存储元素
    # q.put("abc")
    # q.put("hhh")
    # q.put("kkk")
    
    # print(q.get())
    # q.put("ooo")    # 如果容量已经满了,在调用put时将进入阻塞状态 直到有人从队列中拿走数据有空位置 才会继续执行
    
    #取出元素
    # print(q.get())# 如果队列已经空了,在调用get时将进入阻塞状态 直到有人从存储了新的数据到队列中 才会继续
    
    # print(q.get())
    # print(q.get())
    
    
    #block 表示是否阻塞 默认是阻塞的   # 当设置为False 并且队列为空时 抛出异常
    q.get(block=True,timeout=2)
    # block 表示是否阻塞 默认是阻塞的   # 当设置为False 并且队列满了时 抛出异常
    # q.put("123",block=False,)
    # timeout 表示阻塞的超时时间 ,超过时间还是没有值或还是没位置则抛出异常  仅在block为True有效

      ##生产者消费者模型

    #生产者消费者模型
    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
    
    #为什么要使用生产者和消费者模式
    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
    
    #什么是生产者消费者模式
    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
    
    #案例
    基于队列实现生产者消费者模型
    def eat(q):
        for i in range(10):
            # 要消费
            rose = q.get()
            time.sleep(random.randint(0, 2))
            print(rose,"吃完了!")
    
    # 生产任务
    def make_rose(q):
        for i in range(10):
            # 再生产
            time.sleep(random.randint(0, 2))
            print("第%s盘青椒肉丝制作完成!" % i)
            rose = "第%s盘青椒肉丝" % i
            # 将生成完成的数据放入队列中
            q.put(rose)
    
    if __name__ == '__main__':
        # 创建一个共享队列
        q = Queue()
        make_p = Process(target=make_rose,args=(q,))
        eat_p =  Process(target=eat,args=(q,))
    
    
        make_p.start()
        eat_p.start()
  • 相关阅读:
    OLAP ODS项目的总结 平台选型,架构确定
    ORACLE ORA12520
    ORACLE管道函数
    ORACLE RAC JDBC 配置
    ORACLE RAC OCFS连接产生的错误
    ORACLE 启动和关闭详解
    OLAP ODS项目的总结 起步阶段
    ORACLE RAC 配置更改IP
    ORACLE RAC OCR cann't Access
    ORACLE RAC Debug 之路 CRS0184错误与CRS初始化
  • 原文地址:https://www.cnblogs.com/liangzhenghong/p/10969469.html
Copyright © 2011-2022 走看看