zoukankan      html  css  js  c++  java
  • python-47-多线程锁/信号量/事件/池

     前言

    多进程中讲到了锁/信号量/事件/进程池,同样多线程中也一样存在这些东西。

    锁:Lock,多个进程中同一时间,只能排序执行程序,这里会多讲一个RLock递归锁。

    信号量:多个进程中同一时间,同时被N个进程执行。

    事件:Event,就好像红绿灯事件一样,通过一个信号控制多个进程,同时执行或阻塞。

    线程池:一个池子里面同一时间只运行N个线程。

     一、线程锁

    1、Lock给线程加锁

    # 1、Lock给线程加锁
    from threading import Thread,Lock
    import time
    def func(l):
        l.acquire()     # 拿钥匙
        global num
        t=num
        time.sleep(0.1)
        num=t-1
        l.release()     # 还钥匙
    num=5
    t_lst=[]
    l=Lock()
    for i in range(5):
        t=Thread(target=func,args=(l,))
        t.start()
        t_lst.append(t)
    for i in t_lst:i.join()
    print(num)

    ①未解锁前:按正常逻辑,num=5,每个线程执行都-1,执行了5次,最后打印结果为0才准确。

    但是由于线程是对数据不安全的,在num未被修改前已经被另外的线程拿到了原来的值,所以永远num=4。

    ②加锁后:约束了每个线程必须还钥匙后才能另一线程进入。

    2、递归锁,用来解决Lock互斥锁造成死锁问题。

    ①互斥锁形成的死锁:被堵塞了,原因多次acquire()造成。

    # 2、递归锁,用来解决Lock互斥锁造成死锁问题
    # ①死锁:被堵塞了
    from multiprocessing import Lock
    rl=Lock()
    rl.acquire()
    rl.acquire()
    rl.acquire()
    rl.acquire()
    rl.acquire()
    print(6666)

     ②RLock:递归锁,可以在一个线程中acquire()多次。所以能直接打印出结果,而不像互斥锁被堵塞了。

    # ②RLock:递归锁,可以在一个线程中acquire()多次。
    from multiprocessing import RLock
    rl=RLock()          # 递归锁:RLock
    rl.acquire()
    rl.acquire()
    rl.acquire()
    rl.acquire()
    rl.acquire()
    print(6666)

     二、信号量

    1、Semaphore的两个方法:acquire()、release()。

    from threading import Semaphore,Thread
    import time
    def func(s):
        s.acquire()
        print(111)
        time.sleep(2)
        s.release()
    s=Semaphore(2)      # 2个线程,既每次运行2个线程。
    for i in range(10):
        t=Thread(target=func,args=(s,))
        t.start()

     三、事件

    1、Event:事件,实例连接三次

    from threading import Event,Thread
    import time,random
    def connect(e):
        count=0
        while count<3:
            e.wait(0.5)             # 如果is_set()为False,等待0.5s就结束。
            if e.is_set()==True:
                print('连接成功!')
                break
            else:
                count+=1
                print('第%s次连接失败~')
        else:print('TimeoutError 连接超时')
    
    def check(e):
        time.sleep(random.randint(0,3))
        e.set()
    e=Event()
    t=Thread(target=connect,args=(e,))
    t1=Thread(target=check,args=(e,))
    t.start()
    t1.start()

     

      2、Condition:条件

    wait()等待notify,又数值就执行,但是一次性的。

    # 2、Condition:条件
    # wait()等待,钥匙是一次性的。
    from threading import Thread,Condition
    def func(con,i):
        con.acquire()
        con.wait()          # 等钥匙,不会归还,用了就没有了。
        print('我是:',i)
        con.release()
    
    if __name__ == '__main__':
        con=Condition()
        for i in range(10):
            t=Thread(target=func,args=(con,i))
            t.start()
        while 1:
            num=int(input('>>>'))
            con.acquire()
            con.notify(num)     # 造钥匙数量
            con.release()

     3、Timer定时器

    from threading import Timer
    def func():
        print('等待了3s')
        
    Timer(3,func).start()       # 异步执行,非阻塞。
    print(111)
    print(222)

     成为同步:Timer几秒执行,就等待几秒。

    4、队列

    # ①普通队列,先进先出
    import queue
    q=queue.Queue()
    q.put(1)
    q.put(2)
    print(q.get())
    print(q.get())
    
    # ②栈:先进后出
    q=queue.LifoQueue()
    q.put(1)
    q.put(2)
    print(q.get())  # get取到的是2,先进后出。
    print(q.get())
    
    # ③优先级队列
    q=queue.PriorityQueue()
    q.put(11)
    q.put(10)
    q.put(20)
    print(q.get())

     

     

     5、线程池与回调函数

    ①线程池:ThreadPoolExecutor

    from concurrent.futures import ThreadPoolExecutor
    import time
    def func(i):
        time.sleep(1)
        return i
    def func1(i):
        msg=i.result()                        # 拿返回值
        print(msg+1)
    tp=ThreadPoolExecutor(max_workers=5)     # 默认为计算机本身的cpu个数
    t_lst=[]
    for i in range(10):
        t=tp.submit(func,i)                  # 提交异步任务执行
        t_lst.append(t)
    tp.shutdown()                            # 相当于close+join:关闭接受任务,一次性返回结果
    for t in t_lst:print(t.result())        # result 获取返回值

     ②回调函数:.add_done_callback(func1)

    from concurrent.futures import ThreadPoolExecutor
    import time
    def func(i):
        time.sleep(1)
        return i
    def func1(i):
        msg=i.result()                        # 拿返回值
        print(msg+1)
    tp=ThreadPoolExecutor(max_workers=5)     # 默认为计算机本身的cpu个数
    # 回调函数
    for i in range(10):
        t=tp.submit(func,i).add_done_callback(func1)

     欢迎来大家QQ交流群一起学习:482713805

  • 相关阅读:
    Scala--基础
    maven
    Storm 运行例子
    Storm 安装部署
    Storm
    Kafka 集群部署
    Redis Twemproxy
    Redis Sentinel
    获取URL中参数的值
    浏览器滚动条样式
  • 原文地址:https://www.cnblogs.com/gsxl/p/12591522.html
Copyright © 2011-2022 走看看