zoukankan      html  css  js  c++  java
  • day39---僵尸进程、孤儿进程、守护进程

    进程对象及其其他方法

    • windows上常看进程
    # tasklist
    

    查看具体的进程PID

    # tasklist | findstr 364
    

    • linux下查看进程
    # ps aux
    # ps ajx
    # ps -AF
    

    查看具体的进程IPD

    # ps aux|grep 1067 
    

    查看当前的进程号以及父进程号

    from multiprocessing import Process, current_process
    import os
    import time
    
    
    def beast(name):
        print('天王盖地虎'.center(30, '='))
        print(f'<子>当前的进程号:{current_process().pid}')  # 查看当前进程的进程号
        time.sleep(3)
        print(f'{name}一米五'.center(30, '^'))
        print(f'<子>当前进程的父进程:{os.getppid()}')  # 查看当前进程的父进程号
    
    
    if __name__ == '__main__':
        p = Process(target=beast, args=('egon',))
        p.start()
        p.join()
        print(f'<主>当前进程的进程号:{os.getpid()}')  # 查看当前进程的进程号
        print(f'<主>当前进程的父进程:{os.getppid()}')
    

    其他方法

    • 杀死当前进程
    """
    p.terminate()
    """
    
    if __name__ == '__main__':
        p = Process(target=beast, args=('egon',))
        p.start()
        p.terminate()
        p.join()
        print(f'<主>当前进程的进程号:{os.getpid()}')  # 查看当前进程的进程号
        print(f'<主>当前进程的父进程:{os.getppid()}')
    
    

    显示结果:

    """
    <主>当前进程的进程号:130676
    <主>当前进程的父进程:130662
    """
    
    • 判断进程是否存活
    """
    p.is_alive()
    """
    if __name__ == '__main__':
        p = Process(target=beast, args=('egon',))
        p.start()
        p.terminate()
        if p.is_alive():
            print(f'<主>当前进程的进程号:{os.getpid()}')  # 查看当前进程的进程号
            print(f'<主>当前进程的父进程:{os.getppid()}')
        else:
            print(False)
    

    按照程序逻辑来判断:杀死子进程后,p.is_alive()的值为False,输出到屏幕的应该是"False",但是事实是这样吗?

    """
    答案是否定的。因为p.terminate()会向系统调用杀死进程请求,其执行速度比代码的执行速度慢。因此,我们看到输出屏幕的还是: 
    <主>当前进程的进程号:568
    <主>当前进程的父进程:553
    """
    

    僵尸进程与孤儿进程

    • 什么是僵尸进程
    """
    定义:僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵尸进程。
    """
    

    僵尸进程是有害的

    • 孤儿进程
    """
    定义:孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。
    """
    

    孤儿进程是无害的

    僵尸进程的解决方案:

    """
    1.等待父进程正常结束后会调用wait/waitpid去回收僵尸进程
    2.但如果父进程是一个死循环,永远不会结束,那么该僵尸进程就会一直存在,僵尸进程过多,就是有害的:
        (1)杀死父进程
        (2)对开启的子进程应该记得使用join,join会回收僵尸进程
    3.使用signal处理僵尸进程https://blog.csdn.net/u010571844/article/details/50419798
    """
    

    守护进程

    主进程创建守护进程

    """
    1.守护进程会在主进程代码结束后就终止
    2.守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
    """
    

    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

    假设有宫女[jason]和总管[ egon],他们爱的死去活来,[egon]要守护[jason],

    [egon]给一滴心头血[jason](如果,jason挂了,egon都没有出场机会)

    from multiprocessing import Process
    import time
    import random
    
    
    class Beast(Process):
        def __init__(self, name):
            super().__init__()
            self.name = name
    
        def run(self):
            print(f'总管[{self.name}]还活着')
            time.sleep(random.randrange(1, 3))
            print(f'总管[{self.name}]殉情自杀')
    
    
    if __name__ == '__main__':
        name = 'jason'
        p = Beast('egon_dsb')
        p.daemon = True
        p.start()
        print(f'宫女[{name}]驾鹤西去')
    

    于是乎,出现了以下一幕:

    """
    宫女[jason]驾鹤西去
    """
    

    进程同步锁(互斥锁)

    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的;而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理

    """
    并发运行,效率高,但竞争同一打印终端,带来了打印错乱
    加锁:由并发变成了串行,牺牲了运行效率,保证了数据安全
    """
    

    模拟抢票的示例

    from multiprocessing import Process, Lock
    import os
    import json
    import time
    import random
    
    
    class BuyTicket(Process):
        def __init__(self, name,mutex):
            super().__init__()
            self.name = name
            self.mutex = mutex
    
        def search(self):
            with open('ticket.json', mode='r', encoding='utf-8') as fr:
                ticket_dic = json.load(fr)
            print(f"用户[{self.name}]查询到余票:{ticket_dic.get('ticket_num')}")
    
        def buy(self):
            with open('ticket.json', mode='r', encoding='utf-8') as fr:
                ticket_dic = json.load(fr)     # 先查票
            time.sleep(random.randrange(1,3)) # 模拟网络延迟
            if ticket_dic.get('ticket_num'):
                ticket_dic['ticket_num'] -= 1
                with open('ticket.json',mode='w',encoding='utf-8') as fw:
                    json.dump(ticket_dic,fw)
                print(f'用户[{self.name}]购票成功!')
            else:
                print(f'用户[{self.name}]购票失败!')
    
        def run(self):
            print(f'当前进程号:{os.getpid()}')
            self.search()
            self.mutex.acquire()
            self.buy()
            self.mutex.release()
    
    if __name__ == '__main__':
        mutex = Lock()
        p1 = BuyTicket('egon_dsb',mutex)
        p2 = BuyTicket('alex_dsb',mutex)
        p3 = BuyTicket('jason_dsb',mutex)
        p4 = BuyTicket('tank_dsb',mutex)
        p1.start()
        p2.start()
        p3.start()
        p4.start()
    

    运行结果:

    """
    当前进程号:8120
    用户[egon_dsb]查询到余票:1
    当前进程号:8123
    用户[tank_dsb]查询到余票:1
    当前进程号:8121
    用户[alex_dsb]查询到余票:1
    当前进程号:8122
    用户[jason_dsb]查询到余票:1
    用户[egon_dsb]购票成功!
    用户[tank_dsb]购票失败!
    用户[alex_dsb]购票失败!
    用户[jason_dsb]购票失败!
    """
    

    扩展:

    """
    行锁 表锁
    注意:
    1.锁不要轻易的使用,容易造成死锁现象(我们写代码一般不会用到,都是内部封装好的)
    2.锁只在处理数据的部分加来保证数据安全(只在争抢数据的环节加锁处理即可) 
    """
    

    因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

    """
    1 队列和管道都是将数据存放于内存中
    2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
    """
    

    进程之间的通信

    """
    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
    """
    

    管道:

    """
     stdin,stdout,stderr 
    """
    import subprocess
    
    obj1 = subprocess.Popen('ps -AF',
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    obj2 = subprocess.Popen('grep python',
                            shell=True,
                            stdin=obj1.stdout,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    

    队列Queue模块

    """
    队列:先进先出
    队列就是管道+锁
    """
    

    创建队列的类

    """
    Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
    """
    

    maxsize是队列中允许最大项数,省略则无大小限制。

    主要方法:

    """
    1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
    2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
     
    3 q.get_nowait():同q.get(False)
    4 q.put_nowait():同q.put(False)
     
    5 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
    6 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
    """
    

    其他方法(了解):

    """
    1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
    2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    3 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
    """
    

    示例:

    from multiprocessing import Queue
    
    # 创建一个队列
    q = Queue(5)  # 括号内可以传数字 标示生成的队列最大可以同时存放的数据量
    
    # 往队列中存数据
    q.put(111)
    q.put(222)
    q.put(333)
    # print(q.full())  # 判断当前队列是否满了
    # print(q.empty())  # 判断当前队列是否空了
    q.put(444)
    q.put(555)
    # print(q.full())  # 判断当前队列是否满了
    
    # q.put(666)  # 当队列数据放满了之后 如果还有数据要放程序会阻塞 直到有位置让出来 不会报错
    
    """
    存取数据 存是为了更好的取
    千方百计的存、简单快捷的取
    
    同在一个屋檐下
    差距为何那么大
    """
    
    # 去队列中取数据
    v1 = q.get()
    v2 = q.get()
    v3 = q.get()
    v4 = q.get()
    v5 = q.get()
    # print(q.empty())
    # V6 = q.get_nowait()  # 没有数据直接报错queue.Empty
    # v6 = q.get(timeout=3)  # 没有数据之后原地等待三秒之后再报错  queue.Empty
    try:
        v6 = q.get(timeout=3)
        print(v6)
    except Exception as e:
        print('一滴都没有了!')
    
    # # v6 = q.get()  # 队列中如果已经没有数据的话 get方法会原地阻塞
    # print(v1, v2, v3, v4, v5, v6)
    
    """
    q.full()
    q.empty()
    q.get_nowait()
    在多进程的情况下是不精确
    """
    

    IPC机制

    from multiprocessing import Queue, Process
    
    """
    研究思路
        1.主进程跟子进程借助于队列通信
        2.子进程跟子进程借助于队列通信
    """
    def producer(q):
        q.put('我是23号技师 很高兴为您服务')
    
    
    def consumer(q):
        print(q.get())
    
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=producer,args=(q,))
        p1 = Process(target=consumer,args=(q,))
        p.start()
        p1.start()
    

    生产消费者模型

    生产消费者模型

    #生产者消费者模型总结
    
        #程序中有两类角色
            一类负责生产数据(生产者)
            一类负责处理数据(消费者)
    
        #引入生产者消费者模型为了解决的问题是:
            平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
    
        #如何实现:
            生产者<-->队列<——>消费者
        #生产者消费者模型实现类程序的解耦和
    

    示例:

    from multiprocessing import Process, Queue
    import random
    import time
    import os
    
    
    def producer(name, q):
        time.sleep(random.randrange(1, 3))
        print(f'{os.getpid()},技师[{name}]上线!')
        q.put(name)
    
    
    def customer(name, q):
        print('欢迎来到天上人间桑拿中心'.center(30, '='))
        while True:
            res = q.get()
            if res is None: break
            time.sleep(random.randrange(1, 3))
            print(f'{name}叫了[{res}]进行服务!')
    
    
    if __name__ == '__main__':
        q = Queue()
        # 两个生产者
        p1 = Process(target=producer, args=('egon_dsb', q))
        p2 = Process(target=producer, args=('tank', q))
        # 一个消费者
        c1 = Process(target=customer, args=('jason', q))
        p1.start()
        p2.start()
        c1.start()
        p1.join()
        p2.join()
        q.put(None)
    

    JoinableQueue队列模块

    #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    
       #参数介绍:
        maxsize是队列中允许最大项数,省略则无大小限制。    
      #方法介绍:
        JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
        q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
        q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
    

    示例:

    from multiprocessing import Process, JoinableQueue
    import random
    import time
    import os
    
    
    def producer(name, q):
        time.sleep(random.randrange(1, 3))
        print(f'{os.getpid()},技师[{name}]上线!')
        q.put(name)
        q.join()
    
    
    def customer(name, q):
        print('欢迎来到天上人间桑拿中心'.center(30, '='))
        while True:
            res = q.get()
            time.sleep(random.randrange(1, 3))
            print(f'{name}叫了[{res}]进行服务!')
            q.task_done()  #向q.join()发送一次信号,证明一个数据已经被取走了
    
    
    if __name__ == '__main__':
        q = JoinableQueue()
        # 两个生产者
        p1 = Process(target=producer, args=('egon_dsb', q))
        p2 = Process(target=producer, args=('tank', q))
        # 一个消费者
        c1 = Process(target=customer, args=('jason', q))
        p1.start()
        p2.start()
        c1.daemon = True
        c1.start()
        p1.join()
        p2.join()
        q.join()
        
        """
        #主进程等--->p1,p2等---->c1
        #p1,p2结束了,证明c1肯定全都收完了p1,p2发到队列的数据
        #因而c1也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程
        """
    
  • 相关阅读:
    Educational Codeforces Round 10 C. Foe Pairs 水题
    Educational Codeforces Round 10 B. z-sort 构造
    CDOJ 1048 Bob's vector 三分
    Educational Codeforces Round 10 A. Gabriel and Caterpillar 模拟
    第14届电子科大初赛民间盗版部分题目题解
    HDU 5654 xiaoxin and his watermelon candy 离线树状数组 区间不同数的个数
    HDU 5653 Bomber Man wants to bomb an Array. dp
    HDU 5652 India and China Origins 二分+并查集
    HDU 5651 xiaoxin juju needs help 数学
    HDU 5650 so easy 数学
  • 原文地址:https://www.cnblogs.com/surpass123/p/12762690.html
Copyright © 2011-2022 走看看