zoukankan      html  css  js  c++  java
  • day33:进程锁&事件&进程队列&进程间共享数据

    目录

    1.锁:Lock

    2.信号量:Semaphone

    3.事件:Event

    4.进程队列:Queue

    5.生产者和消费者模型

    6.JoinableQueue

    7.Manager:进程之间共享数据

    锁:Lock

    1.锁的基本概念

    上锁和解锁是一对,只上锁不解锁会发生死锁现象(代码阻塞,不往下执行了)

    互斥锁 : 互斥锁是进程之间的互相排斥,谁先抢到这个锁资源就先使用,后抢到后使用

    2.锁的基本用法

    # 创建一把锁
    lock = Lock()
    # 上锁
    lock.acquire()
    # 连续上锁不解锁是死锁
    lock.acquire()  # error
    
    print("厕所中...")
    
    # 解锁
    lock.release()
    print("执行程序 ... ")

    对上面代码的一波分析:因为上锁后又解锁,所以最后一行的执行程序...可以打印出

    但是如果连续上两把锁,解一把锁,则会产生死锁状态。无法打印后面的执行程序...

    3.模拟12306抢票软件

    # 读写数据库中的票数
    def wr_info(sign, dic=None):
        if sign == "r":
            with open("ticket.txt", mode="r", encoding="utf-8") as fp:
                dic = json.load(fp)
            return dic
    
        elif sign == "w":
            with open("ticket.txt", mode="w", encoding="utf-8") as fp:
                json.dump(dic, fp)
    
    
    # 抢票方法
    def get_ticket(person):
        # 获取数据库中实际的票数
        dic = wr_info("r")
        print(dic)
    
        # 模拟一下网络延迟
        time.sleep(0.5)
    
        # 判断票数
        if dic["count"] > 0:
            print("%s抢到票了" % (person))
            dic["count"] -= 1
            wr_info("w", dic)
        else:
            print("%s没有抢到这张票" % (person))
    
    
    def run(person, lock):
        # 查看剩余票数
        dic = wr_info("r")
        print("%s 查询票数: %s" % (person, dic["count"]))
    
        # 上锁
        lock.acquire()
        # 开始抢票
        get_ticket(person)
        lock.release()
    
    
    if __name__ == "__main__":
        lock = Lock()
        lst = ["Fly", "Hurt", "Mojo", "Snow", "1dao", "770", "JieJ", "139", "Gemini", "SK"]
        for i in lst:
            p = Process(target=run, args=(i, lock))
            p.start()

    运行结果如下图所示

    代码分析:

    创建进程的时候,仍然是异步并发,

    在执行到上锁时,多个进程之间变成了同步程序.

    先来的先上锁,先执行,后来的进程后上锁,后执行

    信号量:Semaphone

    信号量 Semaphore 本质上就是锁,只不过可以控制上锁的数量

    1.Semaphore的基本用法

    sem = Semaphore(4) # 锁的数量为4
    sem.acquire()
    sem.acquire()
    sem.acquire()
    sem.acquire()
    # sem.acquire() # 上第五把锁出现死锁状态.
    print("执行相应的操作")
    sem.release()

    2.模拟KTV房间唱歌

    def ktv(person, sem):
        sem.acquire()
        print("%s进入了ktv,正在唱歌" % (person))
        # 开始唱歌,唱一段时间
        time.sleep(random.randrange(3, 7))  # 3 4 5 6
        print("%s离开了ktv,唱完了" % (person))
        sem.release()
    
    
    if __name__ == "__main__":
        sem = Semaphore(4)
        lst = ["Fly", "Hurt", "1dao", "Mojo", "Snow", "770", "Giao", "SK", "139", "Gemini"]
        for i in lst:
            p = Process(target=ktv, args=(i, sem))
            p.start()

    运行结果如下图所示

    代码分析:

    Semaphore 可以设置上锁的数量

    同一时间最多允许几个进程上锁

    创建进程的时候,是异步并发

    执行任务的时候,遇到锁会变成同步程序.

    事件:Event

    1.Event中的基本方法

    阻塞事件:

    e = Event()生成事件对象e

    e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False]

      如果是True 不加阻塞

      如果是False 加阻塞

    控制这个属性的值:

    set()方法 将这个属性的值改成True

    clear()方法 将这个属性的值改成False

    is_set()方法 判断当前的属性是否为True (默认上来是False)

    2.Event的基本语法

    1.小试牛刀

    e = Event()
    print(e.is_set()) # 默认是False状态
    e.wait()
    print("程序运行中 ... ")

    运行结果如下图所示

    2.is_set=True

    e = Event()
    e.set() # 将内部成员属性值由False -> True
    print(e.is_set())
    e.wait()
    print("程序运行中 ... ")
    
    e.clear() # 将内部成员属性值由True => False
    e.wait()
    print("程序运行中2 ... ")

    运行结果如下图所示

    3.wait里加参数

    e = Event()
    # wait参数 可以写时间 wait(3) 代表最多等待3秒钟
    e.wait(3) 
    print("程序运行中3 ... ")

    3.模拟经典红绿灯效果

    def traffic_light(e):
        print("红灯亮")
        while True:
            if e.is_set():
                # 绿灯状态,亮1秒钟
                time.sleep(1)
                print("红灯亮")
                e.clear()
            else:
                # 红灯状态,亮1秒钟
                time.sleep(1)
                print("绿灯亮")
                e.set()
    
    def car(e,i):
        # not False => True => 目前是红灯,小车在等待
        if not e.is_set():
            print("car%s 在等待" % (i))
            # 加阻塞
            e.wait()
        print("car%s 通行了" % (i))
    
    # 不关红绿灯,一直跑
    """
    if __name__ == "__main__":
        e = Event()
        # 创建交通灯对象
        p1 = Process(target=traffic_light,args=(e,))
        p1.start()
        
        # 创建车对象
        for i in range(1,21):
            time.sleep(random.randrange(0,2)) # 0 1
            p2 = Process(target=car,args=(e,i))
            p2.start()
    """
    
    # 当所有小车都跑完之后,把红绿灯收拾起来,省电
    if __name__ == "__main__":
        lst = []
        e = Event()
        # 创建交通灯对象
        p1 = Process(target=traffic_light,args=(e,))
        
        # 设置红绿灯为守护进程
        p1.daemon = True
        p1.start()
        
        # 创建车对象
        for i in range(1,21):
            time.sleep(random.randrange(0,2)) # 0 1
            p2 = Process(target=car,args=(e,i))
            p2.start()
            lst.append(p2)
            
        # 让所有的小车都通行之后,在结束交通灯
        for i in lst:
            i.join()
    
        print("程序结束 ... ")

    运行结果如下图所示

    那么怎样才能做到,当20辆车已经通行之后,停止红绿灯的交替闪烁呢?

    # 当所有小车都跑完之后,把红绿灯收拾起来,省电
    if __name__ == "__main__":
        lst = []
        e = Event()
        # 创建交通灯对象
        p1 = Process(target=traffic_light, args=(e,))
    
        # 设置红绿灯为守护进程
        p1.daemon = True
        p1.start()
    
        # 创建车对象
        for i in range(1, 21):
            time.sleep(random.randrange(0, 2))  # 0 1
            p2 = Process(target=car, args=(e, i))
            p2.start()
            lst.append(p2)
    
        # 让所有的小车都通行之后,在结束交通灯
        for i in lst:
            i.join()
    
        print("程序结束 ... ")

    运行结果如下图所示

    代码分析:

    1.设置p1交通信号灯为deamon守护进程,当主进程结束,守护进程-红绿灯进程也结束

    2.i.join:当所有小车都通行之后,再结束交通信号灯进程

    进程队列:Queue

    什么是队列? 

    队列特点: 先进先出,后进后出

    1.put() 往队列里放值

    q = Queue()
    # 1.put 往队列中放值
    q.put(100)
    q.put(101)
    q.put(102)

    2.get() 从队列里取值

    # 2.get 从队列中取值
    res = q.get()
    print(res)
    res = q.get()
    print(res)
    res = q.get()
    print(res)

    3.队列中如果已经没有数据了,在调用get会发生阻塞.

    res = q.get()
    print(res)

    执行结果如下图所示

    4.get_nowait 拿不到数据就报错

    get_nowait  存在系统兼容性问题[windows]好用 [linux]不好用 不推荐

    res = q.get_nowait()
    print(res)

     

    当然,我们可以使用try捕获到这个错误

    try:
        res = q.get_nowait()
        print(res)
    except queue.Empty:
        pass

    5.设置队列的长度

    q2 = Queue(4)
    q2.put(200)
    q2.put(201)
    q2.put(202)
    q2.put(203)  
    # 如果超过了队列的指定长度,在继续存值会出现阻塞现象
    # q2.put(204) # 超出长度会阻塞

    6.put_nowait() 非阻塞版本的put,超出长度后,直接报错

    q = Queue(3)
    q.put(100)
    q.put(101)
    q.put(102)
    q.put_nowait(204) # 超出队列长度,直接报错

     同样,我们可以使用try捕获异常

    try:
        q2.put_nowait(205)
    except queue.Full:
        pass

    7.进程之间的数据共享

    def func(q3):
        # 2.子进程获取数据
        res = q3.get()
        print(res)
    
        # 3.子进程存数据
        q3.put("马生平")
    
    
    if __name__ == "__main__":
        q3 = Queue()
        p = Process(target=func, args=(q3,))
        p.start()
    
        # 1.主进程添加数据
        q3.put("王凡")
    
        # 为了等待子进程把数据放到队列中,需要加join
        p.join()
    
        # 4.主进程获取数据
        res = q3.get()
        print(res)
    
        print("主程序结束 ... ")

    生产者和消费者模型

    1.基本模型

    def consumer(q,name):
        while True:
            food = q.get()
            time.sleep(random.uniform(0.1,1))
            print("%s 吃了一个%s" % (name,food))
            
        
    # 生产者模型
    def producer(q,name,food):
        for i in range(5):
            time.sleep(random.uniform(0.1,1))
            # 打印生产的数据
            print("%s 生产了 %s%s" % (name,food,i))
            # 存储生产的数据
            q.put(food + str(i))
        
    
    if __name__ == "__main__":
        q = Queue()
        # 消费者
        p1 = Process(target=consumer,args=(q,"宋云杰"))
        # 生产者
        p2 = Process(target=producer,args=(q,"马生平","黄瓜"))
    
        p1.start()
        p2.start()
        

    2.优化版

    # 消费者模型
    def consumer(q,name):
        while True:
            food = q.get()
            if food is None:
                break
            time.sleep(random.uniform(0.1,1))
            print("%s 吃了一个%s" % (name,food))
            
        
    # 生产者模型
    def producer(q,name,food):
        for i in range(5):
            time.sleep(random.uniform(0.1,1))
            # 打印生产的数据
            print("%s 生产了 %s%s" % (name,food,i))
            # 存储生产的数据
            q.put(food + str(i))
        
    
    if __name__ == "__main__":
        q = Queue()
        # 消费者
        p1 = Process(target=consumer,args=(q,"宋云杰"))
        # 生产者
        p2 = Process(target=producer,args=(q,"马生平","黄瓜"))
        
    
        p1.start()
        p2.start()
        
        # 在生产者生产完所有数据之后,在队列的末尾添加一个None
        p2.join()
        # 添加None
        q.put(None)

    优化版和普通版有什么不同呢?

    运行结果如下图所示

    JoinableQueue

    1.前戏

    上面的生产者-消费者模型只是针对于一个生产者和一个消费者

    那如果是多个生产者和多个消费者呢?

    2.JoinableQueue基本语法

    put 存储

    get 获取

    task_done

    join

    task_done 和 join 配合使用

    队列中 1 2 3 4 5

    put 一次 内部的队列计数器加1

    get 一次 通过task_done让队列计数器减1

    join函数,会根据队列计数器来判断是阻塞还是放行

    队列计数器 = 0 , 意味着放行

    队列计数器 != 0 , 意味着阻塞

    jq =JoinableQueue()
    jq.put("a") # 向队列中存储一个a 并且队列计数器会加1
    print(jq.get()) # 取出队列的a并打印
    jq.task_done() # 通过task_done让队列计数器减1,此时队列计数器为0
    jq.join() # 队列计数器为0,放行,打印下面的内容
    print("finish")

    3.用JoinableQueue优化生产者-消费者模型

    def consumer(q, name):
        while True:
            food = q.get()
            time.sleep(random.uniform(0.1, 1))
            print("%s 吃了一个%s" % (name, food))
            # 当队列计数器减到0的时,意味着进程队列中的数据消费完毕
            q.task_done()
    
    
    # 生产者模型
    def producer(q, name, food):
        for i in range(5):
            time.sleep(random.uniform(0.1, 1))
            # 打印生产的数据
            print("%s 生产了 %s%s" % (name, food, i))
            # 存储生产的数据
            q.put(food + str(i))
    
    
    if __name__ == "__main__":
        q = JoinableQueue()
        # 消费者
        p1 = Process(target=consumer, args=(q, "宋云杰"))
        p3 = Process(target=consumer, args=(q, "李博伦"))
        # 生产者
        p2 = Process(target=producer, args=(q, "马生平", "黄瓜"))
    
        # 设置p1消费者为守护进程
        p1.daemon = True
        p3.daemon = True
        p1.start()
        p2.start()
        p3.start()
    
    
        # 把所有生产者生产的数据存放到进程队列中
        p2.join()
        # 为了保证消费者能够消费完所有数据,加上队列.join
        # 当队列计数器减到0的时,放行,不在阻塞,程序彻底结束.
        q.join()
        print("程序结束 ... ")

    运行结果如下图所示

    Manager:进程之间共享数据

    def work(data,lock):
    
        # 1.正常写法
        # 上锁
        lock.acquire()
        # 修改数据
        data["count"] -= 1
        # 解锁
        lock.release()
    
        # 2.使用with语法可以简化上锁和解锁两步操作
        with lock:
            data[0] += 1
    
    if __name__ == "__main__":
        lst = []
        lock = Lock()
        m = Manager()
        data = m.dict( {"count":20000} )
        data = m.list( [1,2,3] )
        for i in range(50):
            p = Process(target=work,args=(data,lock))
            p.start()
            lst.append(p)
            
        # 确保所有进程执行完毕之后,在向下执行,打印数据,否则报错.
        for i in lst:
            i.join()
            
        print(data)
            
  • 相关阅读:
    xcode 工具栏中放大镜的替换的说明
    xcode 工具栏中放大镜的替换的简单说明
    xcode 资源管理
    泛型的冒泡,插入,选择,希尔算法
    一套手写ajax加一般处理程序的增删查改
    Android自定义控件_自绘控件
    查看自己Android设备分辨率
    Collection集合 和 Map
    深入理解Java中的面向对象
    webserivce请求头组装
  • 原文地址:https://www.cnblogs.com/libolun/p/13526100.html
Copyright © 2011-2022 走看看