zoukankan      html  css  js  c++  java
  • 线程之互斥锁与递归锁、队列、线程池

    互斥锁(Mutex)

      线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

      threading模块中定义了Lock类,可以方便的处理锁定:

    #创建锁
    lock = threading.Lock()
    #锁定
    lock.acquire(blocking=True, timeout=-1)
    #释放
    lock.release()
    

      

    from threading import Thread, Lock
    n = 0
    def func(lock):
        global n
        for i in range(1500000):
            lock.acquire()
            n -= 1
            lock.release()
    def func2(lock):
        global n
        for i in range(1500000):
            lock.acquire()
            n += 1
            lock.release()
    
    if __name__ == '__main__':
        t_lsit = []
        lock = Lock()
        for i in range(10):
            t = Thread(target=func, args=(lock,))
            t2 = Thread(target=func2, args=(lock,))
            t.start()
            t2.start()
            t_lsit.append(t)
            t_lsit.append(t2)
        for t in t_lsit:
            t.join()
        print(n)
    
    
    >>>
    0
    实例

    加锁之后,我们可以确保数据的正确性

    死锁现象

    所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。

    此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

    import time
    from threading import Thread, Lock
    
    
    noodle_lock = Lock()
    fork_lock = Lock()
    
    def eat1(name):
        noodle_lock.acquire()
        print('%s拿到面条了'%name)
        fork_lock.acquire()
        print('%s拿到叉子了'%name)
        print('%s吃面'%name)
        time.sleep(0.3)
        fork_lock.release()
        print('%s放下叉子'%name)
        noodle_lock.release()
        print('%s放下面'%name)
    
    def eat2(name):
    
        fork_lock.acquire()
        print('%s拿到叉子了'%name)
        noodle_lock.acquire()
        print('%s拿到面条了'%name)
        print('%s吃面'%name)
        time.sleep(0.3)
    
        noodle_lock.release()
        print('%s放下面'%name)
        fork_lock.release()
        print('%s放下叉子'%name)
    
    if __name__ == '__main__':
        name_list = ['luffy', 'zoro']
        name_list2 = ['sanji', 'chopper']
        for name in name_list:
            Thread(target=eat1, args=(name, )).start()
        for name in name_list2:
            Thread(target=eat2, args=(name, )).start()
    

      

    死锁发生的现象
        两把锁
        程序是异步的
        操作的时候,强到一把锁之后还要再去抢第二把锁
        一个线程抢到了一把锁
        另一个线程抢到了另一把锁
    

      

    递归锁

    解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

    这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。

    直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次

    from threading import Thread, RLock
    rlock = RLock()
    def func(name):
        rlock.acquire()
        print(name, 1)
        rlock.acquire()
        print(name, 2)
        rlock.acquire()
        print(name, 3)
        rlock.release()
        rlock.release()
        rlock.release()
    
    for i in range(10):
        Thread(target=func, args=('name%s'%i,)).start()
    

      

    递归锁可以解决互斥锁的死锁问题
        互斥锁
            两把锁
            多个线程抢
        递归锁
            一把锁
            多个线程抢
    递归锁能够快速的解决死锁问题
    递归锁好不好?
        递归锁世界上并不是一个好的解决方案
        死锁现象的发生不是互斥锁的问题, 而是程序员的逻辑有问题导致的
        递归锁能够快速的解决死锁的问题
        
    递归锁
        迅速恢复服务,递归锁替换互斥锁
        在接下来的时间中,慢慢的把递归锁替换成互斥锁。
            这样能够完善代码的逻辑
            并且能够提高代码的效率
    
    多个线程之间,用完一个资源再用另外一个资源
    先释放一个资源,再去获取一个资源的锁
    

      

    # import time
    # from threading import Thread,RLock
    #
    # fork_lock = noodle_lock = RLock()
    # def eat1(name):
    #     noodle_lock.acquire()
    #     print('%s拿到面条了'%name)
    #     fork_lock.acquire()
    #     print('%s拿到叉子了'%name)
    #     print('%s吃面'%name)
    #     time.sleep(0.3)
    #     fork_lock.release()
    #     print('%s放下叉子'%name)
    #     noodle_lock.release()
    #     print('%s放下面'%name)
    #
    # def eat2(name):
    #     fork_lock.acquire()
    #     print('%s拿到叉子了' % name)
    #     noodle_lock.acquire()
    #     print('%s拿到面条了'%name)
    #     print('%s吃面'%name)
    #     time.sleep(0.3)
    #     noodle_lock.release()
    #     print('%s放下面'%name)
    #     fork_lock.release()
    #     print('%s放下叉子' % name)
    
    # if __name__ == '__main__':
    #     name_list = ['luffy','zoro']
    #     name_list2 = ['sanji','chopper']
    #     for name in name_list:
    #         Thread(target=eat1,args=(name,)).start()
    #     for name in name_list2:
    #         Thread(target=eat2,args=(name,)).start()
    递归锁解决吃面问题
    from threading import Thread,Lock
    
    fork_noodle_lock = Lock()
    def eat1(name):
        fork_noodle_lock.acquire()
        print('%s拿到面条了'%name)
        print('%s拿到叉子了'%name)
        print('%s吃面'%name)
        time.sleep(0.3)
        fork_noodle_lock.release()
        print('%s放下叉子'%name)
        print('%s放下面'%name)
    
    
    def eat2(name):
        fork_noodle_lock.acquire()
        print('%s拿到叉子了' % name)
        print('%s拿到面条了'%name)
        print('%s吃面'%name)
        time.sleep(0.3)
        fork_noodle_lock.release()
        print('%s放下面'%name)
        print('%s放下叉子' % name)
    
    if __name__ == '__main__':
        name_list = ['luffy','zoro']
        name_list2 = ['sanji','chopper']
        for name in name_list:
            Thread(target=eat1,args=(name,)).start()
        for name in name_list2:
            Thread(target=eat2,args=(name,)).start()
    解决吃面问题

    线程队列

    queue队列 :使用import queue,用法与进程Queue一样

    queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

    class queue.Queue(maxsize=0) #先进先出
    import queue
    
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(先进先出):
    first
    second
    third
    先进先出

    class queue.LifoQueue(maxsize=0) #last in fisrt out

    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(后进先出):
    third
    second
    first
    '''
    
    后进先出
    后进先出

    class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

    import queue
    
    q=queue.PriorityQueue()
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    结果(数字越小优先级越高,优先级高的优先出队):
    (10, 'b')
    (20, 'a')
    (30, 'c')
    '''
    
    优先级队列
    优先级队列

    线程池

    Python--concurrent.futures

    1.concurent.future模块是用来创建并行的任务,提供了更高级别的接口,
    为了异步执行调用
    2.concurent.future这个模块用起来非常方便,它的接口也封装的非常简单
    3.concurent.future模块既可以实现进程池,也可以实现线程池
    4.模块导入进程池和线程池
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    p = ProcessPoolExecutor(max_works)对于进程池如果不写max_works:默认的是cpu的数目
    p = ThreadPoolExecutor(max_works)对于线程池如果不写max_works:默认的是cpu的数目*5

    基本方法

    1、submit(fn, *args, **kwargs)
    异步提交任务
     
    2、map(func, *iterables, timeout=None, chunksize=1)
    取代for循环submit的操作
     
    3、shutdown(wait=True)
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
     
    4、result(timeout=None)
    取得结果
     
    5、add_done_callback(fn)
    回调函数
    

      

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import currentThread
    import os,time,random
     
     
    def task(n):
        print("%s is running " % currentThread().getName())
        time.sleep(random.randint(1,3))
        return n*2
     
    if __name__ == '__main__':
        start = time.time()
        executor = ThreadPoolExecutor(4)  # 线程池
     
        res = []
        for i in range(10):  # 开启10个任务
            future = executor.submit(task,i)  # 异步提交任务
            res.append(future)
     
        executor.shutdown()  # 等待所有线程执行完毕
        print("++++>")
        for r in res:
            print(r.result())  # 打印结果
     
        end = time.time()
        print(end - start)
     
    ------------输出
     
    <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_0 is running
    <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_1 is running
    <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_2 is running
    <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_3 is running
    <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_3 is running
    <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_1 is running
    <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_0 is running
    <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_2 is running
    <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_3 is running
    <concurrent.futures.thread.ThreadPoolExecutor object at 0x00000000025B0DA0>_1 is running
    ++++>
    0
    2
    4
    6
    8
    10
    12
    14
    16
    18
    5.002286195755005
    

      

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    
    import os,time,random
    def task(n):
        print('%s is runing' %os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
    
        executor=ThreadPoolExecutor(max_workers=3)
    
        # for i in range(11):
        #     future=executor.submit(task,i)
    
        executor.map(task,range(1,12)) #map取代了for+submit
    map
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from multiprocessing import Pool
    import requests
    import json
    import os
    
    def get_page(url):
        print('<进程%s> get %s' %(os.getpid(),url))
        respone=requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    
    def parse_page(res):
        res=res.result()
        print('<进程%s> parse %s' %(os.getpid(),res['url']))
        parse_res='url:<%s> size:[%s]
    ' %(res['url'],len(res['text']))
        with open('db.txt','a') as f:
            f.write(parse_res)
    
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
    
        # p=Pool(3)
        # for url in urls:
        #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
        # p.close()
        # p.join()
    
        p=ProcessPoolExecutor(3)
        for url in urls:
            p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
    
    回调函数
    回调函数
     
  • 相关阅读:
    字符串数组
    常用函数
    判断是否是素数回文数
    杨辉三角
    惨痛的教训 没有 脑子的我
    剪缎带
    ?????函数不起作用
    C#3
    celery 原理和组件
    vue检查用户名是否重复
  • 原文地址:https://www.cnblogs.com/eaoo/p/9707826.html
Copyright © 2011-2022 走看看