zoukankan      html  css  js  c++  java
  • 进程间的通信IPC机制/生产者消费模型/线程初识(含线程互斥锁)

    进程间的通信IPC机制

    • 队列:先进先出
    • 堆栈:先进后出
    # 队列
    from multiprocessing import Queue
    ​
    q = Queue(3)  # 括号内可以传参数,表示的是这个队列的最大存储数为3
    # 往队列中添加数据
    q.put(1)
    q.put(2)
    print(q.empty())  # False,判断队列中的数据是否为空
    print(q.full())  # 返回False,判断队列是否满了
    q.put(3)
    # q.put(4) 当队列已满时,再插入数据,不会报错,会原地等待,知道队列中出现数据可以取走(阻塞态)
    print(q.full())  # 返回True,判断队列是否满了
    print(q.empty())  # True,判断队列中的数据是否为空
    print(q.get_nowait())  # 取值,没有值就报错
    # 从队列中取数据
    print(q.get())  # 返回1
    print(q.get())  # 返回2
    print(q.get())  # 返回3
    print(q.get())  # 当队列中的数据被取完之后,再次获取时,程序会阻塞,直到队列中被放入值

    注意:full,get_nowait,empty都不适用于多进程的情况

    # 进程间的通信IPC机制
    # 子进程放数据,主进程获取数据,两个子进程相互放,存取数据 from multiprocessing import Process,Queue ​ def producer(q): q.put('hello GF~') def consumer(q): print(q.get()) ​ if __name__ == '__main__': q = Queue() p = Process(target=producer,args=(q,)) c = Process(target=consumer,args=(q,)) p.start() c.start() # hello GF~

    生产者消费模型

    • 生产者:生产/制造数据的
    • 消费者:消费/处理数据的

    例子:做包子的与买包子的

    • 买包子的远比卖包子的多,做包子的远比买包子的少

    PS:供需不平衡

    from multiprocessing import Process,Queue,JoinableQueue
    import time
    ​
    def producer(name,food,q):
        for i in range(10):
            data = '%s生产了%s'%(name,food)
            time.sleep(random.random())
            q.put(data)
            print(data)
        
    def consumer(name,q):
        while True:
            data = q.get()
            if data == None: break
            print('%s吃了%s%s'%(name,data,i))
            time.sleep(random.random())
            q.task_done()  # 告诉队列你已经从队列中取出了一个数据,并且处理完毕了
    if __name__ == '__main__':
        q = JoinableQueue()
        p = Process(target=producer,args=('大厨egon','馒头',q))
        p1 = Process(target=producer,args=('跟班tank','生蚝',q))
        c = Process(target=consumer,args=('某某某',q))
        c1 = Process(target=consumer,args=('某某',q))
        p.start()
        p1.start()
        c.daemon = True
        c1.daemon = True
        c.start()
        c1.start()
        p.join()
        p1.join()
        q.join()  # 等待队列中数据全部取出
        # q.put(None)
        # q.put(None)
        # 有几个消费者就写几个None

    线程理论

    什么是线程?

    • 进程:资源单位
    • 线程:执行单位

    将内存比作工厂,那么进程就相当于是工厂内车间,而线程就相当于车间内的流水线

    PS:每个进程都自带一个线程,线程才是真正的执行单位,进程只是在进程运行过程中提供代码运行所需要的资源

    为什么要有线程?

    • 开进程
      • 1.申请内存空间,耗资源
      • 2."拷贝代码" 耗费资源
    • 开线程
      • 一个进程内可以有多个线程,并且线程与线程之间数据是共享的

    PS:开启线程的开销要远远小于开启进程的开销


    创建线程的两种方式

    第一种:

    from threading import Thread
    import time
    ​
    def task(name):
        print('%s is running'%name)
        time.sleep(3)
        print('%s is over'%name)
        
    # 开线程不需要再__main__代码块内,但是习惯性的还是写在__main__代码块内
    t = Thread(target=task,args=('egon',))
    t.start()  # 告诉操作系统开辟一个线程,线程开销远小于进程
    # 小的代码执行完,线程就已经开启了
    print('')
    '''
    egon is running
    主
    (三秒后出现)
    egon is over
    '''

    第二种:

    from threading import Thread
    import time
    ​
    class MyThread(Thread):
        def __init__(self,name):
            super().__init__()
            self.name = name
        def run(self):
            print('%s is running'%self.name)
            time.sleep(3)
            print('%s is over'%self.name)
    ​
    t = MyThread('egon')
    t.start()
    print('')
    '''
    egon is running
    主
    (三秒后出现)
    egon is over
    '''

    线程对象及其他方法

    同一个进程内可以有多个线程:

    from threading import Thread,current_thread,active_count
    import time
    import os
    ​
    def task(name):
        print('%s is running'%name)
        print('子current_thread:',current_thread().name)
        print('',os.getpid())
        time.sleep(3)
        print('%s is over'%name)
    ​
    t = Thread(target=task,args=('egon',))
    t1 = Thread(target=task,args=('jason',))
    t.start()
    t1.start()
    print('')
    print('主current_thread:',current_thread().name)
    print('',os.getpid())
    '''
    egon is running
    子current_thread: Thread-1
    子 3404
    jason is running
    主
    主current_thread: Thread-2
    主current_thread: MainThread
    子 3404
    主 3404
    egon is over
    jason is over
    '''

    记录一个进程内的当前正在活跃的线程数(当t.join()时):

    from threading import Thread,current_thread,active_count
    import time
    import os
    ​
    def task(name,i):
        print('%s is running'%name)
        time.sleep(i)
        print('%s is over'%name)
    ​
    t = Thread(target=task,args=('egon',1))
    t1 = Thread(target=task,args=('egon',2))
    t.start()
    t1.start()
    t.join()  # 主线程等待子线程运行完毕
    print('当前正在活跃的线程数:',active_count())
    print('')
    '''
    egon is running
    jason is running
    egon is over
    当前正在活跃的线程数:2-----其中主线程也算一个
    主
    jason is over
    '''

    记录一个进程内的当前正在活跃的线程数(当t1.join()时):

    from threading import Thread,current_thread,active_count
    import time
    import os
    ​
    def task(name,i):
        print('%s is running'%name)
        time.sleep(i)
        print('%s is over'%name)
    ​
    t = Thread(target=task,args=('egon',1))
    t1 = Thread(target=task,args=('egon',2))
    t.start()
    t1.start()
    t1.join()  # 主线程等待子线程运行完毕
    print('当前正在活跃的线程数:',active_count())
    print('')
    '''
    egon is running
    jason is running
    egon is over
    jason is over
    当前正在活跃的线程数:1------只剩下主线程了
    主
    '''

    守护线程

    未设置守护线程前的情况:

    from threading import Thread,current_thread
    import time
    ​
    def task(i):
        print(curren_thread().name)
        time.sleep(i)
        print('GG')
    ​
    for i in range(3):
        t = Thread(target=task,args=(i,))
        t,start()
    ​
    print('')
    '''
    Thread-1
    GG
    Thread-2
    Thread-3
    主
    GG
    GG
    '''
    # 主线程运行结束之后需要等待子线程结束后才能结束

    设置守护线程后的情况:

    from threading import Thread,current_thread
    import time
    ​
    def task(i):
        print(current_thread().name)
        time.sleep(i)
        print('GG')
    ​
    t = Thread(target=task,args=(1,))
    t.daemon = True
    t.start()
    print('')
    '''
    Thread-1
    主
    '''

    总结:

    • 主线程的结束就意味着进程的结束
    • 主线程必须等待其他非守护线程子线程的结束才能结束(销毁内存空间)
    • 意味着子线程在运行的时候需要使用进程中的资源,而主线程一旦结束了资源也就销毁了

    关于深入理解守护线程的案例:

    from threading import Thread
    from multiprocessing import Process
    import time
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    ​
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    ​
    if __name__ == '__main__':
        t1=Thread(target=foo)
        t2=Thread(target=bar)
        t1.daemon=True
        t1.start()
        t2.start()
        print("main-------")
    '''
    (瞬间出现)
    123
    456
    main-------
    (一秒后出现)
    end123
    (两秒后出现,合计三秒出现)
    end456
    '''

    分析:

    • 单纯的主线程结束后并未全部结束,而主线程必须等待所有非守护线程的结束才能结束
    • 案例中主线程在等待bar函数的结束,而bar要结束需要3s,且bar函数对应的t2线程并非守护线程,这时候哪怕t1线程为守护线程,且对应的foo函数早已将1s走完,整个主线程也未结束

    线程间通信

    from threading import Thread
    ​
    money = 666def task():
        global money
        money = 999
    ​
    t = Thread(target=task)
    t.start()
    t.join()
    print(money)  # 999

    线程互斥锁

    from threading import Thread,Lock
    import time
    ​
    n = 100def task(mutex):
        global n
        mutex.acquire()
        tmp = n
        time.sleep(0.1)
        n = tmp - 1
        mutex.release()
    ​
    t_list = []
    mutex = Lock()
    for i in range(100):
        t = Thread(target=task,args=(mutex,))
        t,start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print(n)  # 0
  • 相关阅读:
    Jmeter分离登录事务的另一种方式
    数据驱动 vs 关键字驱动:对搭建UI自动化测试框架的探索
    使用jmeter往指定文件中插入一定数量的数据
    JMeter 各组件介绍以及用法
    JVM(2) Java内存溢出异常
    JVM(1) Java内存区域
    OptimalSolution(1)--递归和动态规划(4)其他问题
    MySQL开发篇(5)索引、视图、触发器、SQL中的安全问题、SQL Mode、
    OptimalSolution(1)--递归和动态规划(3)数组和字符串问题
    OptimalSolution(1)--递归和动态规划(2)矩阵的最小路径和与换钱的最少货币数问题
  • 原文地址:https://www.cnblogs.com/zhukaijian/p/11340904.html
Copyright © 2011-2022 走看看