zoukankan      html  css  js  c++  java
  • 浅析Python多线程

    浅析Python多线程

     

    学习Python多线程的资料很多,吐槽Python多线程的博客也不少。本文主要介绍Python多线程实际应用,且假设读者已经了解多线程的基本概念。如果读者对进程线程概念不甚了解,可参见知名博主 阮一峰 转译的一篇博客:《进程与线程的一个简单解释》

    1 线程的基本操作

    Python中多线程主要有两个模块,_thread和threading模块。前者更底层,后者更常用,能满足绝大部分编程需求,今天主要围绕threading模块展开介绍。启动一个线程需要用threading模块中的Thread。

    线程的启动需要先创建Thread对象,然后调用该对象的start()方法,参见下例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import time
    import threading
     
    def func(n):
        while n > 0:
            print("线程name:", threading.current_thread().name, "参数n:", n)
            -= 1
            time.sleep(1)
     
    = threading.Thread(target=func, args=(5,))
    t.start()
    print("主线程:", threading.current_thread().name)
    # 运行结果:
    # 线程name: Thread-1 参数n: 5
    # 主线程: MainThread
    # 线程name: Thread-1 参数n: 4
    # 线程name: Thread-1 参数n: 3
    # 线程name: Thread-1 参数n: 2
    # 线程name: Thread-1 参数n: 1

    上例中,threading.current_thread().name 是获取当前线程的name属性。

    Thread中,形参target传入函数名,args传入函数对应的参数,参数必须是可迭代对象,如果是元组且只有一个参数必须写成(参数,)的形式,逗号不能省略

    一旦启动一个线程,该线程将由操作系统来全权管理,独立执行直到目标函数返回。一般情况下,线程的操作有以下几种:

    1
    2
    3
    4
    5
    t.is_alive()    # 查询线程对象的状态,返回布尔值
    t.join()        # 将线程加入到当前线程,并等待其终止
     
    = Thread(target=countdown, args=(10,), daemon=True)  # 后台线程
    t.start()  

    查看线程状态示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import time
    import threading
     
    def func(n):
        while n > 0:
            print("线程name:", threading.current_thread().name, "参数n:", n)
            -= 1
            time.sleep(1)
     
    = threading.Thread(target=func, args=(2,))
    t.start()
    print("主线程:", threading.current_thread().name)
     
    if t.is_alive():
        print("活着的")
    else:
        print("未存活")
    print("主线程结束")

    让主线程等待其他线程,就是主线程会在join()处一直等待所有线程都结束之后,再继续运行。参见下例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import time
    import threading
     
    def func(n):
        while n > 0:
            print("线程name:", threading.current_thread().name, "参数n:", n)
            -= 1
            time.sleep(1)
     
    = threading.Thread(target=func, args=(2,))
    t.start()
    t.join()
    print("主线程:", threading.current_thread().name)
    print("主线程结束")
    # 运行结果:
    # 线程name: Thread-1 参数n: 2
    # 线程name: Thread-1 参数n: 1
    # 主线程: MainThread
    # 主线程结束

    后台线程参见下例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import time
    import threading
     
    def func(n):
        while n > 0:
            print("参数n:", n)
            -= 1
            time.sleep(1)
     
    = threading.Thread(target=func, args=(10, ), daemon=True)
    t.start()
    time.sleep(3)
    print("主线程结束")
     
    # 参数n: 10
    # 参数n: 9
    # 参数n: 8
    # 参数n: 7
    # 主线程结束

    后台线程无法等待,但主线程终止时后台线程自动销毁。 如果要对线程进行高级操作,如发送信号终止线程,都需要自己实现。下例通过轮询控制线程退出

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    import time
    from threading import Thread
     
    class StopThread:
        def __init__(self):
            self._flag = True
     
        def terminate(self):
            self._flag = False
     
        def run(self, n):
            while self._flag and n > 0:
                print('num>>:', n)
                -= 1
                time.sleep(1)
     
    obj = StopThread()
    = Thread(target=obj.run, args=(11,))
    t.start()
     
    time.sleep(5)    # 表示do something
     
    obj.terminate()  # 终止线程
    t.join()
    print("主线程结束")

    上例通过类中的_flag控制线程的终止,当主线程执行5秒之后,主动将_flag赋值为False终止线程。通过轮询终止线程存在一个问题,如果while self._flag and n > 0:这句后,某次循环一直阻塞在I/O操作上,根本不会进行下一次循环,自然就无法终止。这该怎么办呢?留一个思考题。

    多线程还可以通过继承Thread实现,如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import time
    from threading import Thread
     
    class A(Thread):
        def __init__(self,):
            super().__init__()
     
        def run(self):
            print("run1..", )
            time.sleep(5)
            print("run2..")
     
    obj = A()
    obj.start()
    print("主线程结束")

    2 线程锁和一个怪象

    当我们用多个线程同时修改同一份数据时,怎么保证最终结果是我们期许的呢?举个例子,当前有一个全局变量a=0,如果有10个线程同时对其加1,这就出现了线程间的竞争,到底应该听谁的呢?这时候,应该用线程锁来解决。也就是当某一个线程A对该数据操作时,对该数据加锁,其他线程只能等着,等待A操作完之后释放了该锁,其他线程才能操作该数据,一旦某个线程获得操作数据的权限,立即又加上锁。如此便能保证数据的安全准确。奇怪的是,在Python3中,即使不加锁,好像也不会发生数据出错的情况。或许这个例子不是很好,也或许是Python3中自动加了锁。希望有知道的读者赐教一下。这个奇怪的现象就是下例了:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    from threading import Thread
    import time
     
    def add_one(a):
        time.sleep(1)
        print("in thread a:", a)
        a[1+= 1
     
    if __name__ == '__main__':
        array = [014]
        thread_obj_list = []
     
        for in range(50):
            = Thread(target=add_one, args=(array,))
            t.start()
            thread_obj_list.append(t)
     
        for in thread_obj_list:
            j.join()
     
        print("array result::", array)
        # array result:: [0, 51, 4]  

    我们看到,最后array的第二个元素是51,并没有出错,这真是令人费解。好了,言归正传,来看看线程锁的几个方法吧:

    1
    2
    3
    lock = threading.Lock()     # Lock对象
    lock.acquire()              # 锁定
    lock.release()              # 解锁

    Lock有“锁定”或“解锁”两种状态之一。它是在解锁状态下创建的。它有两个基本方法,acquire() 和 release()
    当状态为解锁时,acquire()将状态更改为锁定并立即返回。当状态被锁定时,acquire()块直到对另一个协程中的release()的调用将其改变为解锁,然后acquire()调用将其重置为锁定并返回。 
    release()方法只应在锁定状态下调用;它将状态更改为已解锁并立即返回。如果尝试释放已解锁的锁,则会引发 RuntimeError。

    下面是一个具体的使用例子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    from threading import Thread
    import time
    import threading
     
    lock = threading.Lock()
     
    def add_one(a):
        time.sleep(1)
        lock.acquire()
        a[1+= 1
        lock.release()
     
    if __name__ == '__main__':
        array = [014]
        thread_obj_list = []
     
        for in range(50):
            = Thread(target=add_one, args=(array,))
            t.start()
            thread_obj_list.append(t)
     
        for in thread_obj_list:
            j.join()
     
        print("array result::", array)
        # array result:: [0, 51, 4]  

    acquire()和release()方法成对出现。但是这样手动释放有时候可能会遗忘,这时候可以考虑用上下文管理协议。关于上下文管理协议,可参见作者的这篇文章【Python上下文管理器】。

    Lock对象支持with语句:

    1
    2
    3
    4
    def add_one(a):
        time.sleep(1)
        with lock:
            a[1+= 1 

    3 递归锁 

    可重入锁(又称递归锁,RLock),就是大锁中包含子锁的情况下使用。在这种情况下,再用Lock时,就会出现死锁现象,此时应该用threading.RLock()对象了,用法同Lock,参见下例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    from threading import Thread
    import time
    import threading
     
    lock = threading.RLock()
     
    def add_one(a):
        lock.acquire()
        a[1+= 1
        lock.release()
     
    def add_two(b):
        time.sleep(1)
        lock.acquire()
        b[1+= 2
        add_one(b)
        lock.release()
     
    if __name__ == '__main__':
        array = [014]
        thread_obj_list = []
     
        for in range(50):
            = Thread(target=add_two, args=(array,))
            t.start()
            thread_obj_list.append(t)
     
        for in thread_obj_list:
            j.join()
     
        print("array result::", array)
        # array result:: [0, 151, 4]  

    上例读者可以试试Lock(),看看什么效果。RLock()还支持上下文管理协议,上例中的两个函数可以改成这样:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    def add_one(a):
        with rlock:
            a[1+= 1
     
    def add_two(b):
        time.sleep(1)
        with rlock:
            b[1+= 2
            add_one(b)

    4 GIL

    全局解释器锁(英语:Global Interpreter Lock,缩写GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。所以很多人说Python的线程是假线程,并能利用多核,并不能真正并行。之所以感觉到线程并行,是因为线程上下文不断切换的缘故。Python 3.2开始使用新的GIL。新的GIL实现中用一个固定的超时时间来指示当前的线程放弃全局锁。在当前线程保持这个锁,且其他线程请求这个锁时,当前线程就会在5毫秒后被强制释放该锁。关于全局锁,强调三点:

    (1)GIL的存在,同一时刻只能有一个线程在运行。

    (2)GIL是CPython的特性,Jython,pypy等并无GIL。

    (3)Cpython的多线程适用于I/O密集型问题,计算密集型问题可使用多进程编程。  

    5 判断线程状态

    在多线程编程中,有时候某个线程依赖另一个线程的状态,需要使用threading库中的Event对象。 Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。可将线程设置等待Event对象, 直到有其他线程将Event对象设置为真,这些等待Event对象的线程将开始执行。Event()对象的常用方法:

    1
    2
    3
    4
    5
    6
    event = threading.Event()   # 创建threading.Event()对象
     
    event.is_set()   # 获取event的设置值,默认为False
    event.set()      # 设置event的值为True
    event.clear()    # 设置event的值为False
    event.wait()     # 等到event的值被设为True就执行

    下面通过“交通信号灯”问题示范event的使用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    import threading
    import time
     
    def traffic_light(event):
        count = 0
        event.set()
        while True:
            # 如果计数器[0, 5)之间, 红灯,event=False
            if 0 <= count < 5:
                event.clear()
                print("light is Red")
            # 如果计数器[5, 10)之间, 绿灯,event=True
            elif 5 <= count < 10:
                event.set()
                print("light is Green")
            # 如果计数器大于10,红灯,将event设置为False,计数器置为0
            else:
                event.clear()
                count = 0
            time.sleep(1)
            count += 1
     
    def car(name, event):
        while True:
            if not event.is_set():
                # event为False, 表示红灯, 车只能等待
                print("RED, the %s is waiting..." % name)
                # 此处会阻塞住,直到event被设置为True在执行
                event.wait()
                print("Green, The %s going...." % name)
     
    = threading.Event()
    light = threading.Thread(target=traffic_light, args=(e,))
    light.start()
    car1 = threading.Thread(target=car, args=("Tesla", e, ))
    car1.start()

    交通信号灯有红灯和绿灯两种状态,每5秒切换一次状态,而car()函数中,只要灯变绿就放car通行。运行试试看。

    event对象的一个重要特点是当它被设置为真时会唤醒所有等待它的线程。如果你只想唤醒单个或者一定数目的线程,最好是使用信号量或者 Condition 对象来替代。

    Condition对象

     condition对象总是与锁关联,可以手动传入锁对象,也可以不传入使用默认值。当有多个线程需要等待某个变量改变时,才开始执行。这种情况可以用condition对象实现。condition对象的主要方法有:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    condition = threading.Condition(lock=None)   # 创建Condition对象  参数可以不传
     
    condition.acquire()    # 加锁
    condition.release()    # 解锁
     
    condition.wait(timeout=None)                 # 阻塞,直到有调用notify(),或者notify_all()时再触发
    condition.wait_for(predicate, timeout=None)  # 阻塞,等待predicate条件为真时执行
     
    condition.notify(n=1)        # 通知n个wait()的线程执行, n默认为1
    condition.notify_all()       # 通知所有wait着的线程执行
     
    with condition:              # 支持with语法,不必每次手动调用acquire()/release()

    看一个例子不是很优雅的例子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    import threading
    import time
     
    condition = threading.Condition()    # 创建condition对象
     
    def func():
        condition.acquire()    # 如果没有with语句,必写这句,否者报错
        condition.wait()       # 阻塞,等待其他线程调用notify()
        print("in func..")
        condition.release()    # 与acquire()成对出现
     
    # 启10个线程
    for in range(10):
        = threading.Thread(target=func, args=())
        t.start()
     
    time.sleep(5)
     
    condition.acquire()
    condition.notify(2)        # 通知两个线程执行
    condition.release()
     
    # in func..
    # in func..
    # 其他8个线程会继续等待... 

    上例中,我们看到启动的10个线程会等待5秒钟并且调用了notify(2)之后,才会通知两个线程继续运行。且这两个线程执行完毕之后,其他8个线程仍然会阻塞在condition.wait() 处。

    频繁写acquire() / release()很繁琐,下面是优雅的写法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    import threading
    import time
     
    condition = threading.Condition()    # 创建condition对象
     
    def func(n):
        with condition:            # with更优雅
            condition.wait()       # 阻塞,等待其他线程调用notify()
            print("in func..", n)
     
     
    # 启10个线程
    for in range(10):
        = threading.Thread(target=func, args=(i,))
        t.start()
     
    time.sleep(5)
     
    with condition:
        condition.notify_all()        # 通知所有线程执行 

    运行下,是不是等待5秒之后,所有线程都继续执行了?

    7 信号量

    信号量通常用于防范容量有限的资源,例如数据库服务器。一般而言信号量可以控制释放固定量的线程。比如启动100个线程,信号量的控制值设为5,那么前5个线程拿到信号量之后,其余线程只能阻塞,等到这5个线程释放信号量锁之后才能去拿锁。参见下例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    import threading
    import time
     
    def func(n):
        # semaphore.acquire()
        with semaphore:
            time.sleep(2)
            print("Thread::", n)
        # semaphore.release()
     
    semaphore = threading.BoundedSemaphore(5)   # 信号量, 每次释放5个线程
     
    thread_list = []
    for in range(23):
        = threading.Thread(target=func, args=(i,))
        thread_list.append(t)
        t.start()
     
    for in thread_list:
        j.join()
     
    print("all threads done") 

    上例中,可以看到线程是每5个一组进行释放的。 

    8 Barrier对象

    Barriers字面意思是“屏障”,是Python线程(或进程)同步原语。每个线程中都调用wait()方法,当其中一个线程执行到wait方法处会立阻塞;一直等到所有线程都执行到wait方法处,所有线程再继续执行。参见下例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    import time
    import threading
     
    bar = threading.Barrier(3)  # 创建barrier对象,指定满足3个线程
     
    def worker1():
        print("worker1")
        time.sleep(1)
        bar.wait()
        print("worker1 end")
     
    def worker2():
        print("worker2")
        time.sleep(2)
        bar.wait()
        print("worker2 end")
     
    def worker3():
        print("worker3")
        time.sleep(5)
        bar.wait()
        print("worker3 end")
     
     
    thread_list = []
    t1 = threading.Thread(target=worker1)
    t2 = threading.Thread(target=worker2)
    t3 = threading.Thread(target=worker3)
    thread_list.append(t1)
    thread_list.append(t2)
    thread_list.append(t3)
     
    for in thread_list:
        t.start()
     
    # 每个线程中都调用了wait()方法,在所有(此处设置为3)线程调用wait方法之前是阻塞的。
    # 也就是说,只有等到3个线程都执行到了wait方法这句时,所有线程才继续执行。  

    上例中,可以看到,所有线程会先各自运行wait()方法之前的代码,到wait()处阻塞。等待最后一个线程执行到wait()处,也就是5秒之后,所有线程恢复执行。

    9 线程间通信

    两个或多个线程之间相互发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个线程共享的 Queue 对象,线程通过使用 put()和 get()操作来向队列中添加或者删除元素。Queue对象已经内置了锁机制,编程时不必手动操作锁。下例producer()函数代表包子铺,生产包子放入队列中;consumer()函数代表吃包子的人,不断从队列中取出包子吃掉;以此演示线程间通过队列通信。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    from queue import Queue
    import threading
    import time
     
    = Queue(10)
     
    def producer():
        = 0
        while True:
            q.put("包子%s" % n)
            print("包子铺生产 包子%s" % n)
            += 1
            time.sleep(2)
     
    def consumer():
        while True:
            = q.get()
            print("bucker 吃掉 %s" % r)
            time.sleep(1)
     
    t1 = threading.Thread(target=producer)
    t1.start()
    t2 = threading.Thread(target=consumer)
    t2.start()  

    形如上例的编程模型,又叫生产者-消费者模型。它降低了程序之前的耦合,使得队列的上游只关注生产数据,队列的下游只关注消费数据。在票务系统,或者资源有限的情况中可用此模型。补充两点:

    (1)get() 和 put() 方法都支持非阻塞方式和设定超时。
    (2)q.qsize() , q.full() , q.empty() 等可以获取一个队列的当前大小和状态。但它们不是线程安全的,尽量别用

    10 线程池

    Python3.2开始,增加了标准库concurrent.futures,该库中的ThreadPoolExecutor是自带的线程池。简单使用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    from concurrent.futures import ThreadPoolExecutor
    import time
     
    def tell(i):
        print("this is tread {}.".format(i))
        time.sleep(1)
     
    if __name__ == '__main__':
        future = ThreadPoolExecutor(10)
        = "ddd"
        for in range(100):
            future.submit(tell, (i,))   # 添加一个线程到线程池
        future.shutdown(wait=True)      # 此函数用于释放异步执行操作后的系统资源。

    其中,submit()方法第一个参数为函数名,第二个为函数的参数。shutdown(wait=True)用于释放异步执行操作后的系统资源。ThreadPoolExecutor还有一个优点就是:任务提交者更方便的从被调用函数中获取返回值。参见下例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import concurrent.futures
    import requests
     
    URLS = ['http://www.cnblogs.com/zingp/p/5878330.html',
            'http://www.cnblogs.com/zingp/',
            'https://docs.python.org/']
     
    # 爬取网页内容
    def load_url(url, timeout):
        with requests.get(url, timeout=timeout) as conn:
            return conn.text
     
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # 创建future对象和对应的url的字典
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as err:
                print('url:%s -- err: %s' % (url, err))
            else:
                print(url, len(data))
                 
    # http://www.cnblogs.com/zingp/ 12391
    # http://www.cnblogs.com/zingp/p/5878330.html 90029
    # https://docs.python.org/ 9980   

    上例创建一个大小为3的线程池,用了不少with语句,并用future.result() 获取函数返回值。最终,我们看到爬取了三个网页,并获得网页内容。future.result()操作会阻塞,直到对应的函数执行完成并返回一个结果。

    Python3.2以前并没有自带线程池,那时往往采用自定义线程池。下面一个就是自定义线程池的例子,看看是否能够看得懂:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    import queue
    import threading
    import contextlib
     
    StopEvent = object()
     
    class ThreadPool(object):
        """定义一个线程池类。"""
     
        def __init__(self, max_num, max_task_num=None):
            if max_task_num:
                self.q = queue.Queue(max_task_num)
            else:
                self.q = queue.Queue()
            self.max_num = max_num
            self.cancel = False
            self.terminal = False
            self.generate_list = []
            self.free_list = []
     
        def run(self, func, args, callback=None):
            """
            线程池执行一个任务。
            :param func: 任务函数;
            :param args: 任务函数所需参数;
            :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;
                                2、任务函数返回值(默认为None,即:不执行回调函数);
            :return: 如果线程池已经终止,则返回True否则None。
            """
            if self.cancel:
                return
            if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
                self.generate_thread()
            = (func, args, callback,)
            self.q.put(w)
     
        def generate_thread(self):
            """
            创建一个线程。
            """
            = threading.Thread(target=self.call)
            t.start()
     
        def call(self):
            """
            循环去获取任务函数并执行任务函数。
            """
            current_thread = threading.currentThread()
            self.generate_list.append(current_thread)
     
            event = self.q.get()
            while event != StopEvent:
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    success = True
                except Exception as e:
                    success = False
                    result = None
     
                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception as e:
                        pass
     
                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()
            else:
                self.generate_list.remove(current_thread)
     
        def close(self):
            """
            执行完所有的任务后,所有线程停止。
            """
            self.cancel = True
            full_size = len(self.generate_list)
            while full_size:
                self.q.put(StopEvent)
                full_size -= 1
     
        def terminate(self):
            """
            无论是否还有任务,终止线程。
            """
            self.terminal = True
     
            while self.generate_list:
                self.q.put(StopEvent)
     
            self.q.queue.clear()
     
        @contextlib.contextmanager
        def worker_state(self, state_list, worker_thread):
            """
            用于记录线程中正在等待的线程数。
            """
            state_list.append(worker_thread)
            try:
                # 遇到yield就返回回去执行with中的语句,执行完了回来。
                yield
            finally:
                state_list.remove(worker_thread)  

    创建大的线程池的一个可能需要关注的问题是内存的使用。 例如,如果你在OS X系统上面创建2000个线程,系统显示Python进程使用了超过9GB的虚拟内存。 不过,这个计算通常是有误差的。当创建一个线程时,操作系统会预留一个虚拟内存区域来 放置线程的执行栈(通常是8MB大小)。但是这个内存只有一小片段被实际映射到真实内存中。 因此,Python进程使用到的真实内存其实很小 (比如,对于2000个线程来讲,只使用到了70MB的真实内存,而不是9GB)。如果担心虚拟内存大小,可以使用 threading.stack_size() 函数来降低它。

    1
    2
    import threading
    threading.stack_size(65536)

    如果加上这条语句并再次运行前面的创建2000个线程试验, 会发现Python进程只使用到了大概210MB的虚拟内存,而真实内存使用量没有变。 注意线程栈大小必须至少为32768字节,通常是系统内存页大小(4096、8192等)的整数倍。

    11 小结与讨论

    (1)Python多线程编程常用threading模块。启动一个多线程需要创建一个Thread对象,调用star()方法启动线程。注意is_alive() /join()方法和daemon参数的使用。
    (2)python多线程锁有Lock / Rlock, 全局锁GIL。GIL是CPython特性,同一时刻只能运行一个线程,不能利用多核资源。
    (3)线程同步原语有Event / Condition / Semaphore / Barrier。Event用于常用语通知全部线程,condition和Semapher常用于通知一定数量的线程, Barrier用于多个线程必须完成某些步骤再一起执行。
    (4)Lock / Rlock / Event / Condition / Semaphore 支持上下文管理协议(with语句,好用)。
    (5)线程间通信可以用queue模块中的Queue队列,get()和put()已加锁,是线程安全的。qsize()/full()/empty() 等可以获取一个队列的当前大小和状态, 不是线程安全的,尽量别用。
    (6)concurrent.futures中的ThreadPoolExecutor是Python3.2之后自带的线程池模块,十分好用,支持with语句,通过future.result()获取线程返回值。
    (7)Python多线程适用于I/O密集型问题,CPU密集型问题可以用C代码优化底层算法提升性能,需注意一个写的不好的C语言扩展会导致这个问题更加严重;也可以用pypy或者多进程。

    以上是本篇全部内容,欢迎读者批评指正。

  • 相关阅读:
    redis 数据迁移
    redis
    Redis集群的三种模式
    Golang 协程 (goroutine) 与通道 (channel)
    Python生成器next方法和send方法区别
    python 文件
    Tornado 异步以及非阻塞的I/O
    python 多进程和多线程3 —— asyncio
    利用CSS改变图片颜色的100种方法!
    jquery获取div的位置
  • 原文地址:https://www.cnblogs.com/syq666/p/8665109.html
Copyright © 2011-2022 走看看