zoukankan      html  css  js  c++  java
  • 多进程(了解):守护进程,互斥锁,信号量,进程Queue与线程queue(生产者与消费者模型)

    一、守护进程

      主进程创建守护进程,守护进程的主要的特征为:①守护进程会在主进程代码执行结束时立即终止;②守护进程内无法继续再开子进程,否则会抛出异常。

    实例:

    from multiprocessing import Process
    from threading import Thread
    import time
    def foo():  # 守护进程
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    if __name__ == '__main__':
    
        p1=Process(target=foo)
        p2=Process(target=bar)
    
        p1.daemon=True
        p1.start()
        p2.start()
        print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止

      注:打印最后一行主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息‘start123’,因为主进程打印main-时,p1也执行了,但是随即被终止。

    from threading import Thread
    import time
    def foo():  # 守护线程
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    if __name__ == '__main__':
        p1=Thread(target=foo)
        p2=Thread(target=bar)
    
        p1.daemon=True
        p1.start()
        p2.start()
        print("main-------")
    # 123
    # 456
    # main-------
    # end123
    # end456
    守护线程
    from threading import Thread
    import time
    def foo():
        print(123)
        time.sleep(5)
        print('end123')
    def bar():
        print('start456')
        time.sleep(3)
        print('end456')
    if __name__ == '__main__':
        t1=Thread(target=foo)
        t2=Thread(target=bar)
        t1.daemon=True  #必须放在start()前
        t1.start()
        t2.start()
        print('main')
    
    # 123
    # start456
    # main
    # end456
    与上比较

    二、互斥锁 

      进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,竞争带来的结果就是错乱,如何控制,就是加锁处理(即局部实行串行)。

    模拟抢票实例:

    from multiprocessing import Process,Lock
    import json,os,time,random
    
    def search():
        with open('db.txt',encoding='utf-8')as f:
            dict = json.load(f)
            print('%s 剩余票数 %s'%(os.getpid(),dict['count']))
    
    def get():
        with open('db.txt',encoding='utf-8') as reaf_f:
            dic = json.load(reaf_f)
    
        if dic['count']>0:
            dic['count'] -= 1
            time.sleep(random.randint(1,3))  # 模拟手速,网速
            with open('db.txt','w',encoding='utf-8') as write_f:
                json.dump(dic,write_f)
                print('%s 抢票成功' %os.getpid())
        else:
            print('剩余票数为%s,购票失败'%dic['count'])
    
    def task(mutex):
        search()    # 20个人都可以并发的查询票数
        mutex.acquire()    # 加锁
        get()              #通过锁,查询到结果的20人通过竞争逐一买票。前一个释放锁后后一个才可以进入,即串行
        mutex.release()    # 解锁
    
    if __name__ == '__main__':
        mutex = Lock()
        for i in range(20):  # 20个人抢票
            p = Process(target=task,args=(mutex,))
            p.start()

    三、信号量

    同进程的一样

    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;
    调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

    from multiprocessing import Process,Semaphore
    # from threading import Thread,Semaphore
    import time,random,os
    
    
    def task(sm):
        with sm:
            print('%s 上厕所' %os.getpid())
            time.sleep(random.randint(1,3))
    
    if __name__ == '__main__':
        sm = Semaphore(3)
        for i in range(10):
            p= Process(target=task,args=(sm,))
            p.start()

    与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

    四、进程间通信机制(IPC)

      基于互斥锁以上两种缺点,multiprocessing模块为我们提供了基于消息通信IPC机制:队列和管道。队列和管道都是将数据存放于内存中;队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

    1、队列(推荐)

    (1)队列相关知识

      队列创建介绍:

    from multiprocessing import Queue      #引入Queue类
    q=Queue(n)              #实例化,参数n代表队列中最大允许存放数,省略则无限制

      常见使用方法:

    q.put()                 #用于插入数据到队列
    q.get()                 #用于从队列读取并删除一个数据
    q.put_nowait()          #当队列存在数据已超过最大限制数,则抛出Queue.full异常
    q.get_nowait()          #当队列中已经不存在可取数据时,则抛出Queue.empty异常

      例子:

    from multiprocessing import Queue
    q=Queue(3)
    q.put({'a':1})
    q.put('bbbb')
    q.put((3,2,1))
    # q.put_nowait(1111111)  #queue.Full
    
    print(q.get())
    print(q.get())
    print(q.get())
    # print(q.get_nowait())  #queue.Empty

    (2)生产消费者模型

      生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

      实例1:

    from multiprocessing import Process,Queue
    import time,random,os
    
    def procducer(q):
        for i in range(10):
            res='包子%s' %i
            time.sleep(0.5)
            q.put(res)
            print('%s 生产了 %s' %(os.getpid(),res))
    
    def consumer(q):
        while True:
            res=q.get()
            if res is None:
                break
            print('%s 吃 %s' %(os.getpid(),res))
            time.sleep(random.randint(2,3))
    
    
    if __name__ == '__main__':
        q=Queue()
        p=Process(target=procducer,args=(q,))
        c=Process(target=consumer,args=(q,))
    
        p.start()
        c.start()
        print('')

    此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

      例子2:

    from multiprocessing import Process,Queue
    import time,random,os
    
    def procducer(q):
        for i in range(10):
            res='包子%s' %i
            time.sleep(0.5)
            q.put(res)
            print('%s 生产了 %s' %(os.getpid(),res))
    
    def consumer(q):
        while True:
            res=q.get()
            if res is None:
                break
            print('%s 吃 %s' %(os.getpid(),res))
            time.sleep(random.randint(2,3))
    
    
    if __name__ == '__main__':
        q=Queue()
        p=Process(target=procducer,args=(q,))
        c=Process(target=consumer,args=(q,))
    
        p.start()
        c.start()
    
        p.join()
        q.put(None)
        print('')

    注意:以上发送可以放在生产函数中循环完进行发送,当然也可以如上放在主进程中进行发送,但是前提是必须等生产子进程结束才可以。

  • 相关阅读:
    MVC是什么?
    Slice Header中的field_pic_flag的含义?
    Slice header 中的frame_num的含义?
    上下文管理器 contextlib
    mixin模式特点
    Flask中路由原理
    使用Docker搭建Django,Nginx,R,Python部署环境
    使用Docker+Jenkins自动构建部署
    Jenkins 关闭和重启
    使用pipenv管理python项目
  • 原文地址:https://www.cnblogs.com/jassin-du/p/7978645.html
Copyright © 2011-2022 走看看