zoukankan      html  css  js  c++  java
  • Python Day 34 JoinableQueue、生产者消费者模型、多线程、开启线程的两种方式、线程与进程区别案例、Tread类常用属性、守护进程、线程互斥锁、死锁问题、可重入锁、信号量

      ##内容回顾

    #守护进程
        一个进程可以设为另一个进程的守护进程
        特点是:被守进程结束时,守护进程也会随之结束
        父进程交给子进程一个任务,然而父进程先与子进程结束了,子进程的任务也就没必要继续执行了
        p.daemon = True
    
    
    #互斥锁
        互斥  互相排斥
        锁的本质就是一个标志  这个标志有两个状态 一个为锁定  一个为未锁定
        什么时候用 ,当多个进程要操作同一个资源时,就会造成数据错乱 , 通常将写入操作加锁  ,读取操作不需加
    
        加锁会把原本并发的任务,修改为串行 ,降低了效率  ,保证了数据的安全性
        锁可以指定一部分代码串行,其他任然可以并发
    
        加锁的位置,是需要重点考虑的问题
    
        join 也可以将任务变为串行
            join固定任务的执行顺序
            join会会使得子进程的代码全部串行  ,并且主进程也会阻塞住
    
        注意:
            1.要保证安全必须保证大家用的都是同一把锁
            2.不能对一把锁连续执行acquire  将会导致锁死
    
    
    
    #IPC
        进程间通讯
            1.管道       单向通讯   ,传输的是二进制
            2.共享文件   数据量几乎不受限制,但是速度慢
            3.共享内存   数据量较小  但是速度快
            4.socket     编程复杂  传输的是二进制
    
            最主要的方式就是共享内存
            1.Manager()  作为了解
            2.Queue    必须掌握的方式     是一个种数据容器  其特点是先进先出 并且进程中的#队列   可以共享数据,自带锁机制
    
    #生产者消费者模型
        要解决的问题: 生成者与消费者能力不平衡,导致效率低
        如何解决的:
            1.把生成方和消费分解开耦合,即把任务分到不同进程中  各司其职
            2.分开后,由于进程之间相互隔离,所以需要一个共享的容器,Queue 闪亮登场
                        解决了数据交换的问题 和锁的问题
    #生产者消费者模型总结
    
        #程序中有两类角色
            一类负责生产数据(生产者)
            一类负责处理数据(消费者)
            
        #引入生产者消费者模型为了解决的问题是:
            平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
            
        #如何实现:
            生产者<-->队列<——>消费者
        #生产者消费者模型实现类程序的解耦和

      ##JoinableQueue

    #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    
       #参数介绍:
        maxsize是队列中允许最大项数,省略则无大小限制。    
      #方法介绍:
        JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
        q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
        q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
    
    
    
    #总之
    task_done的次数 == put的调用次数
    
    
    #案例:
    from multiprocessing import JoinableQueue
    # join是等待某个任务完成    able 可以怎么着   Queue 是队列
    # 可以被join的队列
    
    q = JoinableQueue()
    
    print("_______________")
    q.put("123")
    q.put("123")
    
    
    # print("取走了一个%s" % q.get())
    # task_done告诉队列这个数据已经被处理完了
    q.task_done()  #注意:该函数 不是表示任务全部处理完成  而是 取出来某个数据处理完成
    
    # print("再取走了一个%s" % q.get())
    # q.task_done()
    
    print("..............")
    q.join()   # 等待队列中的数据被处理完毕     join task_done的次数 == put的调用次数
    print("over")

      ##生产者消费者模型

    """
    要生成热狗
    思聪负责吃热狗
    
    思路:
        生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。
    解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号 q.put(None),这样消费者在接收到结束信号后就可以break出死循环
     if not hotdog:break   但是有问题:多个生产者的时候一个消费者的情况,其中有一个生产者生产完了就发送None给消费者了,然而另一个生产者没有生产完成
     导致强制结束,你也可以再加一个消费者,这样生产者和消费者一一对应虽然不会提前结束,但是结果是不对的,一个消费者可能吃了两个
     生产者生产的了,如果又加一个生产者,又会出现问题,所以要最终思路:明确商家生成完毕  在明确消费者吃完了  就算结束
     第一步:三个生产者,两个消费者情况下  p.join() print("第一家生成完毕") 。。。。依次到第三家生产完毕,
     第二步:要明确消费者吃完了需要JoinableQueue
     
     最后:还要让消费者设置为守护进程,因为主进程结束了,子进程(消费者)没有必要还在运行,这样才算真正结束
    
    """
    import random
    import time
    from multiprocessing import Process, Queue,JoinableQueue
    
    
    def make_hotdog(name,q):
        for i in range(5):
            time.sleep(random.randint(1,3))
            print("%s生产了热狗%s" % (name,i))
            q.put("%s的%s号热狗" % (name,i))
        # q.put(None)
    
    def eat_hotdog(name,q):
        while True:
            hotdog = q.get()
            # if not hotdog:
            #     break
            time.sleep(random.randint(1, 3))
            print("%s吃掉了%s" % (name,hotdog))
            # 必须记录 该数据 处理完成了
            q.task_done()#向q.join()发送一次信号,证明一个数据已经被取走了
    
    if __name__ == '__main__':
        q = JoinableQueue()
    
        p = Process(target=make_hotdog,args=("owen的热狗店",q))
        p2 = Process(target=make_hotdog,args=("bgon的热狗店",q))
        p3 = Process(target=make_hotdog,args=("jerry的热狗店",q))
    
        c = Process(target=eat_hotdog,args=("思聪",q))
        c.daemon = True
        c2 = Process(target=eat_hotdog, args=("大葱", q))
        c2.daemon = True
        # c3 = Process(target=eat_hotdog, args=("二葱", q))
    
        p.start()
        p2.start()
        p3.start()
    
        c.start()
        c2.start()
        # c3.start()
    
        # 目前的思路  是当商家做完以后 放一个None  作为结束标志   不够好 必须明确商家和消费者的个数
    
    
        # 明确商家生成完毕  在明确消费者吃完了  就算结束
        p.join()
        print("第一家生成完毕")
    
        p2.join()
        print("第二家生成完毕")
    
        p3.join()
        print("第三家生成完毕")
    
    
        # 消费者吃完了
        q.join()
        print("消费吃完了!")
        print("美好的一天结束了!")

      ##多线程

    #1、什么是线程 
    
    ​线程是操作系统最小的运算调度单位,被包含在进程中,一个线程就是一个固定的 执行流程  
    
    #2、线程的进程的关系  重点
    
    线程不能单独存在 必须存在于进程中, 
    
    ​进程是一个资源单位,其包含了运行程序所需的所有资源 
    
    ​线程才是真正的执行单位 
    
    ​没有线程,进程中的资源无法被利用起来,所以一个进程至少包含一个线程,称之为主线程 
    
    ​当我们启动一个程序时,操作系统就会自己为这个程序创建一个主线程
    
    ​线程可以由程序后期开启  ,自己开启线程称之为子线程  
    
    
    #3、为什么需要线程   重点
    目的只有一个就是提高效率 
    
    ​就像一个车间 如果产量更不上 就在造一条流水线 
    
    ​当然可以再造一个新车间,那需要把原材料运过去  ,这个过程是非常耗时的  
    
    ​所以通常情况是创建新的流水线 而不是车间  即 线程
    
    #4、线程的特点  重点
    1.每个进程都会有一个默认的线程
    
    ​2.每个进程可以存在多个线程,多个线程之间,是平等的没有父子关系    所有线程的PID都是相同的  
    
    ​3.同一进程中的所有线程之间数据是共享的
    
    ​4.创建线程的开销远比创建进程小的多

       ##开启线程的两种方式

    from threading import Thread,current_thread
    import time
    
    def task():
        print("2",current_thread())
        print("子线程running")
        time.sleep(10)
        print("子线程over")
    
    
    # 使用方法一  直接实例化Thread类,target参数用于指定子线程要执行的任务
    if __name__ == '__main__':
        t = Thread(target=task)
        t.start()
    
        task()
        #执行顺序不固定 如果开启线程速度足够快  可能子线程先执行
        print("主线程over")
        print("1",current_thread())
    
    
    # 使用方法二:继承Tread类,覆盖run方法
    class MyThread(Thread):
        def run(self):
            print("子线程run!")
    m = MyThread()
    m.start()
    print("主线over")
    
    
    
    # 使用方法和多进程一模一样   开启线程的代码可以放在任何位置  开启进程必须放在判断下面

      ##线程与进程区别案例

    #1.同一进程中 线程之间数据共享
    a = 100
    def task():
        global a
        print("子线程 run........")
        a = 1
    
    t = Thread(target=task)
    t.start()
    
    print(a) # 1
    print("over")
    
    #2.创建线程的开销远比创建进程小的多
    import os
    from threading import  Thread
    from multiprocessing import Process
    
    
    import time
    
    

    def task():
        # print("hello")
        print(os.getpid())
        pass
    
    
    if __name__ == '__main__':
    
    
        st_time = time.time()
    
    
        ts = []
        for i in range(100):
            t = Thread(target=task)
            # t = Process(target=task)
            t.start()
            ts.append(t)
    
    
        for t in ts:
            t.join()
    
    
        print(time.time()-st_time)
        print("主over")
    
    

      ##Tread类常用属性

    # threading模块包含的常用方法
    import threading
    print(threading.current_thread().name) #获取当前线程对象
    print(threading.active_count()) # 获取目前活跃的线程数量
    print(threading.enumerate()) # 获取所有线程对象,第一个为主线程,其他为子线程,要遍历子线程的话记得切片[1:]
    
    
    t = Thread(name="aaa")
    # t.join() # 主线程等待子线程执行完毕
    print(t.name) # 线程名称
    print(t.is_alive()) # 是否存活
    print(t.isDaemon()) # 是否为守护线程

      ##守护进程

    # 主线程代码执行 完毕后 不会立即结束 会等待其他子线程结束
    #  主  会等待非守护线程 即t2
    # 主线程会等待所有非守护线程结束后结束
    
    
    
    # 守护线程会等到所有非守护线程结束后结束  !    前提是除了主线程之外 还有后别的非守护
    # 当然如果守护线程已经完成任务 立马就结束了
    
    # 皇帝如果活着    守护者 妃子死了 皇帝正常运行    皇帝死了 无论守护者是否完成任务 都立即结束
    
    #案例
    from threading import Thread
    import time
    
    def task():
        print("子1running......")
        time.sleep(100)
        print("子1over......")
    
    def task2():
        print("子2running......")
        time.sleep(4)
        print("子2over......")
    
    t = Thread(target=task)
    t.daemon = True
    t.start()
    
    t2 =Thread(target=task2)
    t2.start()
    
    print("主over")
    
    # 子 1 run   子2 run    主   子over   子2over     结束了

       ##线程互斥锁

    #共享意味着竞争 
    
    ​    线程中也存在安全问题,
    
    ​    多线程可以并发执行,一旦并发了并且访问了同一个资源就会有问题  
    
    ​    解决方案:还是互斥锁 

     当多个线程要并发修改同一资源时,也需要加互斥锁来保证数据安全。

    
    

      同样的一旦加锁,就意味着串行,效率必然降低。

    #案例
    from threading import Thread,enumerate,Lock
    import time
    
    number = 10
    
    lock = Lock()
    
    def task():
        global number
        lock.acquire()
        a = number
        time.sleep(0.1)
        number = a - 1
        lock.release()
    
    for i in range(10):
        t = Thread(target=task)
        t.start()
    
    for t in enumerate()[1:]:
        # print(t)
        t.join()
    
    print(number)
    
    # 用于访问当前正在运行的所有线程
    # print(enumerate())

      ##死锁问题

    """
        死锁问题
        当程序出现了不止一把锁,分别被不同的线程持有, 有一个资源 要想使用必须同时具备两把锁
        这时候程序就会进程无限卡死状态 ,这就称之为死锁
        例如:
            要吃饭 必须具备盘子和筷子   但是一个人拿着盘子 等筷子  另一个人拿着筷子等盘子
        
        如何避免死锁问题  
            锁不要有多个,一个足够
            如果真的发生了死锁问题,必须迫使一方先交出锁
            
    """
    
    #案例
    from threading import Lock, current_thread, Thread
    import time
    # 盘子
    lock1 = Lock()
    
    # 筷子
    lock2 = Lock()
    
    def eat1():
        lock1.acquire()
        print("%s抢到了盘子" % current_thread().name)
        time.sleep(0.5)
        lock2.acquire()
        print("%s抢到了筷子" % current_thread().name)
    
        print("%s开吃了!" % current_thread().name)
        lock2.release()
        print("%s放下筷子" % current_thread().name)
    
        lock1.release()
        print("%s放下盘子" % current_thread().name)
    
    
    def eat2():
        lock2.acquire()
        print("%s抢到了筷子" % current_thread().name)
    
        lock1.acquire()
        print("%s抢到了盘子" % current_thread().name)
    
    
        print("%s开吃了!" % current_thread().name)
    
    
        lock1.release()
        print("%s放下盘子" % current_thread().name)
        lock2.release()
        print("%s放下筷子" % current_thread().name)
    
    
    t1 = Thread(target=eat1)
    
    
    t2 = Thread(target=eat2)
    
    t1.start()
    t2.start()

      ##可重入锁

    Rlock  称之为递归锁或者可重入锁
    
    Rlock不是用来解决死锁问题的
    
    与Lock唯一的区别:
    Rlock同一线程可以多次执行acquire 但是执行几次acquire就应该对应release几次
        
    如果一个线程已经执行过acquire 其他线程将无法执行acquire
    
    #案例
    from threading import RLock, Lock, Thread
    
    # l = Lock()
    #
    # l.acquire()
    # print("1")
    # l.acquire()
    # print("2")
    
    
    l = RLock()
    
    # l.acquire()
    # print("1")
    # l.acquire()
    # print("2")
    
    def task():
        l.acquire()
        print("子run......")
        l.release()
    
    
    # 主线程锁了一次
    l.acquire()
    l.acquire()
    
    l.release()
    l.release()
    t1 = Thread(target=task)
    t1.start()

      ##信号量

    可以现在被锁定的代码 同时可以被多少线程并发访问
    Lock 锁住一个马桶  同时只能有一个
    Semaphore 锁住一个公共厕所    同时可以来一堆人
    
    
    用途: 仅用于控制并发访问   并不能防止并发修改造成的问题
    
    #案例
    from threading import Semaphore, Thread
    import time
    
    s = Semaphore(5)
    def task():
        s.acquire()
        print("子run")
        time.sleep(3)
        print("子over")
        s.release()
    
    for i in range(10):
        t = Thread(target=task)
        t.start()
  • 相关阅读:
    Google Maps 尝鲜
    ASDoc 的一些参数
    一本比较简单易懂的中文python入门教程
    word2010 2007中如何去掉首页页码
    转贴:关于出现java.lang.UnsupportedClassVersionError 错误的原因
    Windows下搭建SVN傻瓜式教程
    Red Hat中jdk1.6.0_03 tomcat6.0.35将hudson.war放入webapp后启动tomcat报错X connection to localhost:11.0 broken
    使用alternatives切换red hat linux的jdk版本
    linux安装ant 1.8.2
    反编译插件jadclips
  • 原文地址:https://www.cnblogs.com/liangzhenghong/p/10974069.html
Copyright © 2011-2022 走看看