zoukankan      html  css  js  c++  java
  • IPC机制,线程

    一.IPC机制

      1.队列,及其方法

    from multiprocessing import Process,Queue
    # 实例化创建队列
    """
    队列中的方法:
        1.put,向队列中添加数据
        2.get,从队列中取数据
        3,full,判断队列是否满了
        4,empty,判断队列是否为空
        5,get_nowait,从队列取值,没值不等待直接报错
        6,实例化产生队列时,Queue()括号内可以传数字,表示该队列中的数据量多少,不传为默认值(很大默认无穷)
    """
    q=Queue(3)  # 最大存储数为3
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.full())  # 判断队列是否满了
    # q.put(4)  # 不会报错,会卡在这,等从队列中取出一个数据
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty())  # 判断队列是否为空
    # print(q.get())  # 不会报错,会卡在这,等待队列传值
    print(q.get_nowait())  # 判断队列是否为空,空直接报错

    ps:full,empty,get_nowait都不适用与多进程  

      2.IPC机制(管道加队列)

    队列:创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 

    from multiprocessing import  Process,Queue
    
    def put(q):
        q.put('为社么不是你来放')
        print(q)
    
    def get(q):
        print(q.get('为社么不是你来取'))
    
    if __name__ == '__main__':
        q=Queue()
        p=Process(target=put,args=(q,))
        g=Process(target=get,args=(q,))
        p.start()
        g.start()

      3.生产者消费者模型

    1.基于队列产生的消费者生产者模型

    from multiprocessing import Process,Queue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
    
    def producer(q):
        for i in range(10):
            time.sleep(random.randint(1,3))
            res='包子%s' %i
            q.put(res)
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    
    if __name__ == '__main__':
        q=Queue()
        #生产者们:即厨师们
        p1=Process(target=producer,args=(q,))
    
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,))
    
        #开始
        p1.start()
        c1.start()
        print('')
    生产者消费者模型1.0版本

      此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

      解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

    from multiprocessing import Process,Queue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            if res is None:break #收到结束信号则结束
            time.sleep(random.randint(1,3))
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
    
    def producer(q):
        for i in range(10):
            time.sleep(random.randint(1,3))
            res='包子%s' %i
            q.put(res)
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
        q.put(None) #发送结束信号
    if __name__ == '__main__':
        q=Queue()
        #生产者们:即厨师们
        p1=Process(target=producer,args=(q,))
    
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,))
    
        #开始
        p1.start()
        c1.start()
        print('')
    
    改良版——生产者消费者模型
    2.0版本

    但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很low的方式去解决

    from multiprocessing import Process,Queue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            if res is None:break #收到结束信号则结束
            time.sleep(random.randint(1,3))
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
    
    def producer(name,q):
        for i in range(2):
            time.sleep(random.randint(1,3))
            res='%s%s' %(name,i)
            q.put(res)
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    
    if __name__ == '__main__':
        q=Queue()
        #生产者们:即厨师们
        p1=Process(target=producer,args=('包子',q))
        p2=Process(target=producer,args=('骨头',q))
        p3=Process(target=producer,args=('泔水',q))
    
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,))
        c2=Process(target=consumer,args=(q,))
    
        #开始
        p1.start()
        p2.start()
        p3.start()
        c1.start()
    
        p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
        p2.join()
        p3.join()
        q.put(None) #有几个消费者就应该发送几次结束信号None
        q.put(None) #发送结束信号
        print('')
    
    多个消费者的例子:有几个消费者就需要发送几次结束信号
    多个消费者生产者

    2.使用JoinableQueue

    创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 

    JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:
    
    q.task_done() 
    使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。
    
    q.join() 
    生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。 
    下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。
    
    方法介绍
    from multiprocessing import Process,JoinableQueue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
            q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了
    
    def producer(name,q):
        for i in range(10):
            time.sleep(random.randint(1,3))
            res='%s%s' %(name,i)
            q.put(res)
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
        q.join() #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。
    
    
    if __name__ == '__main__':
        q=JoinableQueue()
        #生产者们:即厨师们
        p1=Process(target=producer,args=('包子',q))
        p2=Process(target=producer,args=('骨头',q))
        p3=Process(target=producer,args=('泔水',q))
    
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,))
        c2=Process(target=consumer,args=(q,))
        c1.daemon=True
        c2.daemon=True
    
        #开始
        p_l=[p1,p2,p3,c1,c2]
        for p in p_l:
            p.start()
    
        p1.join()
        p2.join()
        p3.join()
        print('') 
        
        #主进程等--->p1,p2,p3等---->c1,c2
        #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
        #因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。
    
    JoinableQueue队列实现消费之生产者模型
    JoinQueue版本

    二.线程

      1.初识线程

    什么是线程:
        进程线程其实都是虚拟单位,都是用来帮助我们形象的描述某种事物
    
        进程:资源单位
        线程:执行单位
            将内存比如成工厂
            那么进程就相当于是工厂里面的车间
            而你的线程就相当于是车间里面的流水线
        ps:每个进程都自带一个线程,线程才是真正的执行单位,进程只是在线程运行过程中
        提供代码运行所需要的资源
    
    为什么要有线程
        开进程
            1.申请内存空间  耗资源
            2."拷贝代码"    耗资源
    
        开线程
            一个进程内可以起多个线程,并且线程与线程之间数据是共享的
        ps:开启线程的开销要远远小于开启进程的开销

      2.创建线程的两种方法

    import time
    from threading import Thread
    
    def eat(name):
        print('%s is eatting'%name)
        time.sleep(3)
        print('%s is waking'%name)
    
    t=Thread(target=eat,args=('xu',))  # 创建线程可以不再main内,但是进程必须再,
    t.start()
    print('')
    
    
    class MyThread(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
    
    
        def run(self):
            print('%s is eatting' % self.name)
            time.sleep(3)
            print('%s is waking'%self.name)
    
    if __name__ == '__main__':
        t=MyThread('xu')
        t.start()
        print('')

    ps:创建线程的代价远小于进程,所以在向系统发出创建线程的命令时,可能线程就创建好了

      3.线程对象及其方法

    from threading import Thread,current_thread,active_count
    import time
    import os
    
    def task(name,i):
        print('%s is running'%name)
        print('子current_thread:',current_thread().name)  # 获取线程名
        print('',os.getpid())  # 获取进程的pid码
        time.sleep(i)
    
        print('%s is over'%name)
    # 开线程不需要在__main__代码块内 但是习惯性的还是写在__main__代码块内
    t = Thread(target=task,args=('egon',1))
    t1 = Thread(target=task,args=('jason',2))
    t.start()  # 告诉操作系统开辟一个线程  线程的开销远远小于进程
    t1.start()  # 告诉操作系统开辟一个线程  线程的开销远远小于进程
    t1.join()  # 主线程等待子线程运行完毕
    print('当前正在活跃的线程数',active_count())  # 获取活着的线程
    # 小的代码执行完 线程就已经开启了
    print('')
    print('主current_thread:',current_thread().name)
    print('',os.getpid())

    ps:正在活跃的线程数为1是因为在等待t1的过程中t早已结束,所以存活的进程数为1只剩下主进程了

      4.守护线程

    from threading import Thread,current_thread
    import time
    
    
    
    def task(i):
        print(current_thread().name)
        time.sleep(i)
        print('GG')
    # for i in range(3):
    #     t = Thread(target=task,args=(i,))
    #     t.start()
    t = Thread(target=task,args=(1,))
    t.daemon = True
    t.start()
    print('')
    # 主线程运行结束之后需要等待子线程结束才能结束呢?
    """
    主线程的结束也就意味着进程的结束
    主线程必须等待其他非守护线程的结束才能结束
    (意味子线程在运行的时候需要使用进程中的资源,而主线程一旦结束了资源也就销毁了)
    """
    from threading import Thread
    import time
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    if __name__ == '__main__':
        t1=Thread(target=foo)
        t2=Thread(target=bar)
        t1.daemon=True
        t1.start()
        t2.start()
        print("main-------")
    迷惑人的小例子

    出现上述结果的原因是因为在程序运行中,主线程的结束意味着进程的结束,进程一结束,假设内部线程还未结束,线程就没有资源可以使用了,所以主线程会等待所有非守护线程结束才结束.所以在这个立体中主线程会等待bar线程的结束才结束,而当bar线程结束时foo已经执行完了,所以输出结果如上图所示

      5.线程间通信

    from threading import Thread
    
    
    money = 666
    
    def task():
        global money
        money = 999
    
    t = Thread(target=task)
    t.start()
    t.join()
    print(money)  # 999

    ps:进程的创建是在内存中开辟一块空间,而线程是在进程的内部的,所以在同一进程内部的线程都是使用那个进程中的资源,所以线程间的通信的没有隔阂的

      6.线程互斥锁

    from threading import Thread,Lock
    import time
    
    
    n = 100
    
    def task(mutex):
        global  n
        mutex.acquire()
        tmp = n
        time.sleep(0.1)
        n = tmp - 1
        mutex.release()
    
    t_list = []
    mutex = Lock()
    for i in range(100):
        t = Thread(target=task,args=(mutex,))
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print(n)
  • 相关阅读:
    php json_encode JSON_UNESCAPED_UNICODE
    ubuntu 添加多个ssh公钥和私钥
    如何自动化新增配置文件呢?
    git配置ssh公钥
    项目PHP新知识点
    mysql 数据库复制表 create table city1 like city;
    .NET 泛型集合数据写CSV文件
    .NET C# 泛型队列
    逆向地理编码--根据地址搜索定位,点击地图、获取经纬度信息
    正向地理编码-根据输入地址获取经纬度
  • 原文地址:https://www.cnblogs.com/z929chongzi/p/11340842.html
Copyright © 2011-2022 走看看