zoukankan      html  css  js  c++  java
  • 09_解决进程间通信线程间通信的资源竞争-同步互斥机制

    1.同步和互斥

        1.目的: 对共有资源的操作会产生争夺,同步互斥是一种解决争夺的方案

        2.临界资源: 多个进程或线程都可以操作的资源

        3.临界区: 操作临界资源的代码段

        4.同步:
            同步是一种合作关系,为完成某个任务多进程或多线程之间形成一种协调,按照条件依次执行传递告知资源情况,这种协调可能是因为阻塞关系达成的
            同步就是协同步调,按预定的先后次序进行运行,如:消息队列通信
            进程(线程)同步可理解为进程(线程)A和B一块配合,A执行到一定程度时要依靠B的某个结果于是停下来示意B运行;B执行再将结果给A;A再继续操作

        5.互斥:
            互斥是一种制约关系,一个进程(线程)进入到临界区会进行加锁操作,其它进程(线程)在企图操作临界资源就会阻塞,只有当资源被释放才能进行操作
            当多个线程同时修改共享数据的时候,需要进行同步控制,使用互斥锁则保证了每次只有一个线程进行写入操作,保证了多线程情况下数据的正确性
            某个线程要更改共享数据时,先将其锁定此时资源即上锁,其他线程不能更改;直到该线程释放资源即解锁,其他的线程才能再次锁定该资源

    2.进程事件-Event

        1.进程事件概述: 一个进程通过对Event的事件状态的设置,另外一个进程判断事件状态来确认是阻塞等待还是继续执行

        2.语法概述

    from multiprocessing import Event
    
    e = Event()  # 创建事件对象
    e.wait()  # 提供事件阻塞
    e.set()  # 对事件对象进程设置,此时wait判断如果事件被set则结束阻塞
    e.clear()  # 清除该事件对象的set
    e.is_set()  # 监测对象是否被设置,设置返回True

        3.临界资源操作示例

    from multiprocessing import Process
    from multiprocessing import current_process
    from multiprocessing import Event
    import time
    
    
    def wait_event(e):
        print("子进程%s阻塞等待主进程开放临界区" % current_process())
        e.wait()
        print("子进程%s开始操作临界区" % current_process(), e.is_set())
    
    
    def wait_event_timeout(e):
        print("子进程%s阻塞等待主进程开放临界区" % current_process())
        e.wait(3)
        print("子进程%s等待3秒后不再等待主进程开放临界区" % current_process(), e.is_set())
        print("子进程%s开始执行其他操作" % current_process(), e.is_set())
    
    
    def main():
        e = Event()
        p1 = Process(target=wait_event, name="block", args=(e,))
        p2 = Process(target=wait_event_timeout, name="non-block", args=(e,))
        p1.start()
        p2.start()
    
        print("主进程正在操作临界资源")
        print("-" * 50)
        time.sleep(6)
        e.set()
        print("-" * 50)
        print("主进程结束对临界资源的操作,开放临界区")
    
        p1.join()
        p2.join()
    
    
    if __name__ == "__main__":
        main()
    """执行结果
        主进程正在操作临界资源
        --------------------------------------------------
        子进程<Process(block, started)>阻塞等待主进程开放临界区
        子进程<Process(non-block, started)>阻塞等待主进程开放临界区
        子进程<Process(non-block, started)>等待3秒后不再等待主进程开放临界区 False
        子进程<Process(non-block, started)>开始执行其他操作 False
        --------------------------------------------------
        主进程结束对临界资源的操作,开放临界区
        子进程<Process(block, started)>开始操作临界区 True
    """

        4.红绿灯示例

    from multiprocessing import Process
    from multiprocessing import Event
    import time
    import random
    
    def tra(e):
        '''信号灯函数'''
        # e.set()
        # print('33[32m 绿灯亮! 33[0m')
        while 1:  # 红绿灯得一直亮着,要么是红灯要么是绿灯
            if e.is_set():  # True,代表绿灯亮,那么此时代表可以过车
                time.sleep(5)  # 所以在这让灯等5秒钟,这段时间让车过
                print('33[31m 红灯亮! 33[0m')  # 绿灯亮了5秒后应该提示到红灯亮
                e.clear()  # 把is_set设置为False
            else:
                time.sleep(5)  # 此时代表红灯亮了,此时应该红灯亮5秒,在此等5秒
                print('33[32m 绿灯亮! 33[0m')  # 红的亮够5秒后,该绿灯亮了
                e.set()  # 将is_set设置为True
    
    
    def Car(i,e):
        e.wait()  # 车等在红绿灯,此时要看是红灯还是绿灯,如果is_set为True就是绿灯,此时可以过车
        print('第%s辆车过去了'%i)
    
    
    if __name__ == '__main__':
        e = Event()
        triff_light = Process(target=tra, args=(e,))  # 信号灯的进程
        triff_light.start()
        for i in range(50):  # 描述50辆车的进程
            if i % 3 == 0:
                time.sleep(2)
            car = Process(target=Car, args=(i + 1, e,))
            car.start()

    3.进程锁-Lock

        1.进程锁概述: 在lock对象处于上锁状态时,再企图上锁则会阻塞,直到锁被释放才能继续执行上锁操作

        2.语法概述

    from multiprocess import Lock
    
    lock = Lock()  # 创建锁对象
    lock.acquire()  # 上锁
    lock.release()  # 解锁

        3.上下文管理器实现锁管理

    from multiprocess import Lock
    
    lock = Lock()  # 创建锁对象
    # 给with代码段上锁,with代码的结束自动解锁
    with lock:
        pass

        4.进程锁的实现

    from multiprocessing import Process
    from multiprocessing import Lock
    from time import sleep
    import sys
    
    
    def worker1(lock):
        lock.acquire()  # 上锁
        for _ in range(5):
            sleep(1)
            # sys.stdout为所有进程共有资源
            sys.stdout.write("worker1输出
    ")
        lock.release()  # 释放锁
    
    
    def worker2(lock):
        lock.acquire()  # 上锁
        for _ in range(5):
            sleep(1)
            sys.stdout.write("worker2输出
    ")
        lock.release()  # 释放锁
    
    
    def main():
        # 创建Lock对象
        lock = Lock()
        p1 = Process(target=worker1, args=(lock,))
        p2 = Process(target=worker2, args=(lock,))
    
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    
    if __name__ == "__main__":
        main()
    """执行结果
        worker1输出
        worker1输出
        worker1输出
        worker1输出
        worker1输出
        worker2输出
        worker2输出
        worker2输出
        worker2输出
        worker2输出
    """

    4.进程条件变量-Condition

        1.进程条件变量概述: Condition条件变量通常与一个锁关联
            需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例
            除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态
            直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定

        2.语法

    from multiprocessing import Condition
    
    con = Condition
    
    con.acquire(): 进程锁
    con.release(): 释放锁
    con.wait(timeout)
        进程挂起,直到收到一个notify通知或者超时才会被唤醒继续运行,timeout超时时间,秒
        wait()必须在已获得Lock前提下才能调用否则会触发RuntimeError
    con.notify(n=1)
        通知其他进程,那些挂起的进程接到这个通知之后会开始运行,默认是通知一个正等待该condition的进程,最多则唤醒n个等待的进程
        notify()必须在已获得Lock前提下才能调用否则会触发RuntimeError,notify()不会主动释放Lock
    con.notifyAll(): 如果wait状态进程比较多,notifyAll的作用就是通知所有进程

        3.进程条件变量的实现

    import multiprocessing, time
    
    
    def A(cond):
        name = multiprocessing.current_process().name
        print("%s进程开始执行A函数" % name)
        with cond:
            print("进程%s执行完成,通知其他进程可以执行" % name)
            cond.notify_all()  # 通知其他进程,那些挂起的进程接到这个通知之后会开始运行
    
    
    def B(cond):
        name = multiprocessing.current_process().name
        print("%s进程开始执行B函数" % name)
        with cond:
            cond.wait()  # 进程挂起,直到收到一个notify通知或者超时才会被唤醒继续运行
            print("%s进程继续执行B函数" % name)
    
    
    def main():
        # 创建进程条件变量
        cond = multiprocessing.Condition()
        p = multiprocessing.Process(target=A, args=(cond,))
        p_list = [multiprocessing.Process(target=B, name="Process2[%d]" % i, args=(cond,)) for i in range(1, 3)]
        # 开始进程
        for i in p_list:
            i.start()
            time.sleep(1)
        p.start()
        # 阻塞等待回收进程
        p.join()
        for i in p_list:
            i.join()
    
    
    if __name__ == "__main__":
        main()
    """执行结果
        Process2[1]进程开始执行B函数
        Process2[2]进程开始执行B函数
        Process-1进程开始执行A函数
        进程Process-1执行完成,通知其他进程可以执行
        Process2[1]进程继续执行B函数
        Process2[2]进程继续执行B函数
    """

    5.线程事件-Event

        1.语法概述

    from threading import Event
    
    e = Event()  # 创建事件对象
    e.wait()  # 提供事件阻塞
    e.set()  # 对事件对象进程设置,此时wait判断如果事件被set则结束阻塞
    e.clear()  # 清除该事件对象的set
    e.is_set()  # 监测对象是否被设置,设置返回True

        2.事件解决线程间的资源竞争

    import threading
    from time import sleep
    
    
    def fun(event):
        print("呼叫foo")
        global s
        s = "奔波儿灞"
    
    
    def foo(event):
        print("等待口令")
        sleep(2)
        if s == "奔波儿灞":
            print("收到的口令是: %s" % s)
        else:
            print("口令错误...")
        event.set()  # 对事件对象进程设置,此时wait判断如果事件被set则结束阻塞
    
    
    def bar(event):
        print("bar开始执行")
        sleep(1)
        event.wait()  # 提供事件阻塞
        global s
        s = "霸波尔奔"
    
    
    def main():
        s = None
        event = threading.Event()
        t1 = threading.Thread(name="fun", target=fun, args=(event,))
        t2 = threading.Thread(name="foo", target=foo, args=(event,))
        t3 = threading.Thread(name="bar", target=bar, args=(event,))
        t1.start()
        t2.start()
        t3.start()
        t1.join()
        t2.join()
        t3.join()
    
    
    if __name__ == "__main__":
        main()
    """执行结果
        呼叫foo
        等待口令
        bar开始执行
        收到的口令是: 奔波儿灞
    """

        3.事件实现线程间的同步机制

    from threading import Thread, Event
    import time, random
    
    
    def conn_mysql(e, i):
        count = 1
        while count <= 3:
            if e.is_set():
                print('第%s个人连接成功!' % i)
                break
            print('正在尝试第%s次重新连接...' % (count))
            e.wait(0.5)
            count += 1
    
    
    def check_mysql(e):
        print('33[42m 数据库正在维护 33[0m')
        time.sleep(random.randint(1, 2))
        e.set()
    
    
    if __name__ == '__main__':
        e = Event()
        t_check = Thread(target=check_mysql, args=(e,))
        t_check.start()
    
        for i in range(10):
            t_conn = Thread(target=conn_mysql, args=(e, i))
            t_conn.start()

    6.线程锁-Lock

        1.语法概述

    lock = Lock()  # 创建锁
    lock.acquire()  # 加锁
    lock.release()  # 解锁

        2.线程锁解决线程间的资源竞争

    import threading
    import time
    
    g_num = 0
    
    def test1(num):
        global g_num
        for i in range(num):
            mutex.acquire()  # 上锁
            g_num += 1
            mutex.release()  # 解锁
    
        print("---test1---g_num=%d" % g_num)
    
    
    def test2(num):
        global g_num
        for i in range(num):
            mutex.acquire()  # 上锁
            g_num += 1
            mutex.release()  # 解锁
        print("---test2---g_num=%d" % g_num)
    
    
    # 创建一个互斥锁,默认是未上锁的状态
    mutex = threading.Lock()
    
    # 创建2个线程,让他们各自对g_num加1000000次
    p1 = threading.Thread(target=test1, args=(1000000,))
    p1.start()
    p2 = threading.Thread(target=test2, args=(1000000,))
    p2.start()
    
    # 等待计算完成
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    
    print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)

        3.线程锁的死锁问题-添加超时时间避免死锁

    # 在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁
    # 避免死锁:程序设计时要尽量避免(银行家算法), 添加超时时间等
    import threading
    import time
    
    class MyThread1(threading.Thread):
        def run(self):
            # 对mutexA上锁
            mutexA.acquire()
    
            # mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
            print(self.name + "----do1---up----")
            time.sleep(1)
    
            # 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
            mutexB.acquire()
            print(self.name + "----do1---down----")
            mutexB.release()
    
            # 对mutexA解锁
            mutexA.release()
    
    
    class MyThread2(threading.Thread):
        def run(self):
            # 对mutexB上锁
            mutexB.acquire()
    
            # mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
            print(self.name + "----do2---up----")
            time.sleep(1)
    
            # 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
            mutexA.acquire()
            print(self.name + "----do2---down----")
            mutexA.release()
    
            # 对mutexB解锁
            mutexB.release()
    
    
    mutexA = threading.Lock()
    mutexB = threading.Lock()
    
    if __name__ == "__main__":
        t1 = MyThread1()
        t2 = MyThread2()
        t1.start()
        t2.start()

        4.线程锁的死锁问题-通过递归锁RLock避免死锁

    from threading import Thread
    from threading import RLock
    import time
    # RLock是递归锁 --- 是无止尽的锁,但是所有锁都有一个共同的钥匙
    # 想解决死锁,配一把公共的钥匙就可以了
    
    
    def man(l_tot, l_pap):
        l_tot.acquire()  # 是男的获得厕所资源,把厕所锁上了
        print('Kali在厕所上厕所')
        time.sleep(1)
        l_pap.acquire()  # 男的拿纸资源
        print('Kali拿到卫生纸了!')
        time.sleep(0.5)
        print('Kali完事了!')
        l_pap.release()  # 男的先还纸
        l_tot.release()  # 男的还厕所
    
    def woman(l_tot, l_pap):
        l_pap.acquire()  # 女的拿纸资源
        print('Coco拿到卫生纸了!')
        time.sleep(1)
        l_tot.acquire()  # 是女的获得厕所资源,把厕所锁上了
        print('Coco在厕所上厕所')
        time.sleep(0.5)
        print('Coco完事了!')
        l_tot.release()  # 女的还厕所
        l_pap.release()  # 女的先还纸
    
    
    if __name__ == '__main__':
        l_tot = RLock()
        l_pap = RLock()
        t_man = Thread(target=man, args=(l_tot, l_pap))
        t_woman = Thread(target=woman, args=(l_tot, l_pap))
        t_man.start()
        t_woman.start()

    7.线程条件变量-Condition

        语法概述

    # 条件是让程序员自己去调度线程的一个机制
    con = threading.Condition()
    con.acquire()  # 对资源加锁,加锁后其他位置再加锁则阻塞
    con.release()  # 解锁
    con.wait()  # wait函数只能在加锁状态下使用,wait函数会先解锁然后让线程处于等待通知的阻塞状态
    con.notify()  # 发送通知,线程接收到通知后结束wait阻塞并且执行acquire加锁操作

        线程条件变量示例-打压股市

    import threading
    import time
    
    
    class Gov(threading.Thread):
        def run(self):
            global num
            con.acquire()  # 对资源加锁,加锁后其他位置再加锁则阻塞
            while True:
                print("开始拉升股市")
                num += 1
                print("拉升了%s个点" % num)
                time.sleep(1)
                if num == 5:
                    print("暂时安全")
                    con.notify()  # 发送通知,线程接收到通知后结束wait阻塞并且执行acquire加锁操作
                    print("不操作状态")
                    con.wait()  # wait先解锁然后让线程处于等待通知的阻塞状态,直到接收到t2线程发出的通知后结束阻塞并加锁
            con.release()
    
    
    class Consumers(threading.Thread):
        def run(self):
            global num
            con.acquire()  # 再对资源加锁,此时会阻塞在这里
            while True:
                if num > 0:
                    print("开始打压股市")
                    num -= 1
                    print("下降到%s个点" % num)
                    time.sleep(1)
                if num == 0:
                    print("天台见")
                    con.notify()  # 发送通知,线程接收到通知后结束wait阻塞并且执行acquire加锁操作
                    print("不能再下降了")
                    con.wait()  # wait先解锁然后让线程处于等待通知的阻塞状态,直到接收到t1线程发出的通知后结束阻塞并加锁
            con.release()
    
    
    def main():
        global num
        num = 0
        # 创建条件变量
        global con
        con = threading.Condition()
    
        t1 = Gov()
        t2 = Consumers()
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    
    
    if __name__ == "__main__":
        main()
    """执行结果
        开始拉升股市
        拉升了1个点
        开始拉升股市
        拉升了2个点
        开始拉升股市
        拉升了3个点
        开始拉升股市
        拉升了4个点
        开始拉升股市
        拉升了5个点
        暂时安全
        不操作状态
        开始打压股市
        下降到4个点
        开始打压股市
        下降到3个点
        开始打压股市
        下降到2个点
        开始打压股市
        下降到1个点
        开始打压股市
        下降到0个点
        天台见
        不能再下降了
        开始拉升股市
        拉升了1个点
    """

        线程条件变量示例-吃火锅

    import threading
    import time
    
    con = threading.Condition()
    num = 0
    
    
    # 生产者
    class Producer(threading.Thread):
    
        def __init__(self):
            threading.Thread.__init__(self)
    
        def run(self):
            # 锁定线程
            global num
            con.acquire()
            while True:
                print("开始添加!!!")
                num += 1
                print("火锅里面鱼丸个数:%s" % str(num))
                time.sleep(1)
                if num >= 5:
                    print("火锅里面里面鱼丸数量已经到达5个,无法添加了!")
                    # 唤醒等待的线程
                    con.notify()  # 唤醒小伙伴开吃啦
                    # 等待通知
                    con.wait()
            # 释放锁
            con.release()
    
    
    # 消费者
    class Consumers(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
    
        def run(self):
            con.acquire()
            global num
            while True:
                print("开始吃啦!!!")
                num -= 1
                print("火锅里面剩余鱼丸数量:%s" % str(num))
                time.sleep(2)
                if num <= 0:
                    print("锅底没货了,赶紧加鱼丸吧!")
                    con.notify()  # 唤醒其它线程
                    # 等待通知
                    con.wait()
            con.release()
    
    
    def main():
        p = Producer()  # 实例化一个生产者对象
        c = Consumers()  # 实例化一个消费者对象
        p.start()
        c.start()
    
    
    if __name__ == "__main__":
        main()
  • 相关阅读:
    织梦dedecms上传漏洞uploadsafe.inc.php修复
    dedecms漏洞修复大全含任意文件上传漏洞与注入漏洞
    DEDECMS批量导入excel数据到后台文章系统的开发教程
    使用DEDE织梦计划任务功能定时更新首页
    如何解决织梦DedeCms文章标题字数长度限制的方法教程
    织梦后台点击网站主页跳转到../index.php?upcache=1删除方法
    dedecms漏洞修复大全含任意文件上传漏洞与注入漏洞
    dedecms模板中联动菜单高级使用技巧
    解决dede的loop中无法使用limit的方案+文章前数字序号
    DEDECMS 又一种隔行换色和分组加线的方法
  • 原文地址:https://www.cnblogs.com/tangxuecheng/p/13621149.html
Copyright © 2011-2022 走看看