zoukankan      html  css  js  c++  java
  • Python语法速查: 20. 线程与并发

    返回目录

    本篇索引

    (1)线程基本概念

    (2)threading模块

    (3)线程间同步原语资源

    (4)queue

      (1)线程基本概念

    当应用程序需要并发执行多个任务时,可以使用线程。多个线程(thread)同时运行在一个进程(process)的内部, 它们可以共享访问本进程内的全局变量数据和资源。各个线程之间的调度由操作系统负责, 具体做法是:给每个线程分配一个小的时间片,并在所有的线程之间循环切换。在具有多核的CPU上, 操作系统有时会安排尽可能使用每个CPU,从而并行执行线程。

    并发编程的复杂性在于,多个线程可能同时更新一个数据,导致数据的损坏或不一致(术语叫做:竞争), 要解决这个问题,必须使用互斥锁或其他类似的同步手段保护这些数据。

    Python解释器使用了内部的GIL(Global Interpreter Lock,全局解释器锁), 这限制了Python程序只能在一个处理器上运行。如果一个应用程序的大部分是I/O密集型的, 那么使用线程是没有问题的。而如果是CPU密集型的,那使用多个线程没任何好处,还会降低运行速度。 因此对于计算密集型的任务,最好使用C扩展模块或multiprocessing模块来代替。 C扩展具有释放解释器锁和并行运行的选项,前提是释放锁时不与解释器进行交互。 multiprocessing模块将工作分派给不受锁限制的单独子进程。

    使用多线程编程还要注意一个问题:开的线程的数量级不能太大。例如,一台使用线程的网络服务器对于100个线程工作情况良好, 但如果增加到10000个线程,性能就会变得非常糟糕。因为每个线程都需要有自己的系统资源, 而且还会产生线程上下文切换、锁和其他相关开销,算下来是个不小的开销。 对这种I/O密集型应用,比较常见的做法是将程序编写为:异步事件处理机制的结构,比如“协程”。 使用异步和协程的方式编程,可以比较轻松地处理诸如10000个连接的情况。

    没有任何方法可以强制终止或挂起其他线程!这是设计上的原因。因此,线程只能自己挂起或自己终止。

      (2)threading模块

    threading模块提供Thread类和各种同步原语,用于编写多线程的程序。 threading模块可创建:Thread对象、Timer对象、 Lock对象、RLock对象等。

    ● Thread对象

    Thread类用于表示单独的控制线程,创建新线程的语法如下:

    Thread(group=None, target=None, name=None, args=(), kwargs={})

    此函数创建一个新的线程实例,target是一个可调用对象(线程启动时,run()方法将调用此对象), name是线程名称,默认将创建一个名为 “Target-N” 格式的唯一名称。 argskwargs是传递给target函数的参数元组、参数字典。

    Thread实例支持以下属性和方法

    属性或方法类型说明
    name 属性 线程名称,这个字符串用于唯一标识线程。
    ident 属性 整数线程标识符,如果线程尚未启动,它的值为None。
    daemon 属性 布尔值,True表示为后台线程。它的初始值从创建线程的线程继承而来。 主线程(控制线程)不是后台线程(主线程的daemon为 False)。 通常 Python 解释器退出之前,会等待所有线程终止。但不会等待daemon为 True的线程。 当主线程结束后,如果其他线程的的daemon都为True,Python解释器将不等待那些线程, 整个Python程序将即时退出。
    t.start() 方法 在主线程中,调用此方法启动线程。
    t.run() 方法 线程启动时,将自动调用此方法。它将调用先前创建线程对象时,由target指定的目标函数。 可以在Thread的子类中,重新定义此方法。
    t.join([timeout]) 方法 在主线程中调用此方法,功能是等待直到线程实例 t 终止或超时为止。timeout是一个浮点数, 单位为“秒”。在线程启动之前不能调用此函数,否则会报错。
    t.is_alive() 方法 如果线程是活动的,返回True,否则返回False。从start()方法返回的那一刻开始, 线程就是活动的,直到它的run()方法终止为止。

    ● 线程使用实例

    通常 Python 解释器退出之前,会等待所有线程终止。但是,若将创建的线程的daemon设置为 True, 会使解释器在主线程结束后立即退出程序,这些daemon为 True的线程将即时被销毁。

    下例中,线程 t 每5秒从后台激活激活运行一次;30秒后,主线程结束,整个Python程序退出:

    import threading
    import time
    
    def clock(interval):
        while True:
            print("The time is %s" % time.ctime())
            time.sleep(interval)
            
    t = threading.Thread(target=clock, args=(5,))
    t.daemon = True
    t.start()
    
    time.sleep(30)

    下例为将同一个线程定义为一个 Thread 的子类:

    import threading
    import time
    
    class ClockThread(threading.Thread):
        def __init__(self, interval):
            threading.Thread.__init__(self)
            self.daemon = True
            self.interval = interval
        def run(self):
            while True:
                print("The time is %s" % time.ctime())
                time.sleep(self.interval)          
    
    t = ClockThread(5)
    t.start()
    
    time.sleep(30)  

    ● Timer对象

    Timer对象用于在稍后某个时间执行一个函数,调用语法如下:

    Timer(interval, func [,args [,kwargs]])

    此函数创建定时器对象,在interval秒之后运行函数funcargskwargs是传递给func函数的参数元组、参数字典。

    Timer实例支持以下方法:

    属性或方法类型说明
    t.start() 方法 启动定时器,func函数将在指定间隔时间后执行。
    t.cancel() 方法 如果函数尚未执行,取消定时器。

    ● 实用工具函数

    函数说明
    active_count() 返回当前活动的Thread对象数量。
    current_thread() 返回调用者的线程对象。
    enumerate() 列出当前所有活动的Thread对象。
    local() 返回local对象,用于保存线程本地的数据。应该保证此对象在每个线程中是唯一的。
    setprofile(func) 设置一个配置文件函数,用于已创建的所有线程。 func在每个线程开始运行之前被传递给 sys.setprofile()函数。
    settrace(func) 设置一个跟踪函数,用于已创建的所有线程。 func在每个线程开始运行前被传递给 sys.settrace()函数。
    stack_size([size]) 返回创建新线程时使用的栈大小。可选整数参数size表示创建新线程时使用的栈大小。 size的值可以是 32768(32 KB) 或更大,而且是 4096 的倍数。如果系统上不支持此操作, 将引发 ThreadError 异常。

      (3)线程间同步原语资源

    ● 锁(Lock)

    锁(或称为“互斥锁”)有2个状态:已锁定、未锁定。如果锁处于已锁定状态,尝试获取锁的线程将被阻塞, 直到锁被释放为止。如果有多个线程等待获取锁,当锁被释放时,只有一个线程能获得它,具体哪个线程随机。 创建锁的语法如下,初始状态为“未锁定”:

    threading.Lock()

    Lock实例支持以下方法

    实例方法说明
    lock.acquire([blocking]) 获取锁,成功则返回 True。blocking参数默认为 True,当锁为已锁定时,则阻塞本线程。 若blocking设为 False,当无法获取锁时,将立即返回 False,不阻塞。
    lock.realease() 释放一个锁,当锁处于“未锁定”状态时、或从与原本调用acquire()方法的线程不同的线程调用此方法, 将出现错误。

    ● 可重入锁(RLock)

    可重入锁(RLock)类似于Lock对象,但同一个线程可以多次获取它。 这允许拥有锁的线程执行嵌套的acquire()release()操作。 在这种情况下,只有最外层的release()操作才能将锁重置为“未锁定”状态。

    创建可重入锁的语法如下:

    threading.RLock()

    RLock实例支持以下方法

    实例方法说明
    rlock.acquire([blocking]) 获取锁,成功获取则返回 True,而且递归级别被设置为1。如果此线程已拥有锁, 则锁的递归级别加1,而且立即返回。blocking的含义同上。
    rlock.realease() 通过减少锁的递归级别来释放它。如果在减值后递归基本为0,锁将被置位“未锁定”状态。 否则,所继续保持“已锁定”状态。只能由目前拥有锁的线程来调用此函数

    ● 信号量(Semaphore)

    信号量是一个基于计数器的同步原语,每次调用acquire()方法时此计数器减1, 每次调用release()方法时此计数器加1。如果计数器为0,acquire()方法将会阻塞。 直到其他线程调用release()方法为止。

    创建信号量对象的语法如下(value是计数器初始值,默认为1):

    threading.Semaphore([value])

    以下创建有边界的信号量对象(BoundedSemaphore),区别是release()操作的次数不能超过acquire()次数:

    threading.BoundedSemaphore([value])

    Semaphore实例支持以下方法

    实例方法说明
    s.acquire([blocking]) 获取信号量。若计数值大于0,则减1,然后立即返回。 若计数值为0,此方法将阻塞,直到另一个线程调用release()方法为止。 blocking的含义同上。
    s.realease() 通过将内部计数器的值加1来释放一个信号量。如果计数值为0,而且另一线程在等待, 则该线程将被唤醒。

    “信号量”和“互斥锁”之间的差别在于,信号量可用于发信号。例如:可以从不同线程调用 acquire()release()方法,以便在生产者和消费者线程之间进行通信。 也可以用后面的“条件变量”来达成。

    ● 事件(Event)

    “事件”用于线程间通信。一个线程发出事件,一个或多个其他线程等待它。 Event实例内部管理着一个标志,可以用set()方法将它设为True, clear()方法设为False,wait()方法将阻塞线程,知道标志为True。

    创建事件的语法如下:

    threading.Event()

    Event实例支持以下方法

    实例方法说明
    e.is_set() 只有当内部标志为True时才返回True。
    e.set() 将内部标志置为True。等待它变为True的所有线程都将被唤醒。
    e.clear() 将内部标志置为False。
    e.wait([timeout]) 阻塞直到内部标志为True。如果进入时内部标志为True,此方法立即返回。 timeout是一个浮点数,单位为秒,指定超时期限。

    “事件”不适合用于生产者-消费者问题,因为事件只有True和False两种状态, 处理过程中信号可能丢失。最好使用“条件变量”。

    ● 条件变量(Condition)

    “条件变量”是另一种同步原语,典型用于生产者-消费者问题。

    创建条件变量的语法如下,lock是可选的Lock或RLock实例,若缺省则创建新的RLock实例

    threading.Condition([lock])

    Condition实例支持以下方法

    实例方法说明
    cv.acquire(*args) 获取底层锁。此方法将调用底层锁上的acquire(*args)方法。
    cv.realease() 释放底层锁。此方法将调用底层锁上对应的release()方法。
    cv.wait([timeout]) 等待直到获得通知或出现超时为止。此方法在调用线程已经获取锁之后调用。 调用时,将释放底层锁,而且线程将进入后随眠状态,直到另一个线程在条件变量上执行 notify()notifyAll()将其唤醒为止。在线程被唤醒之后, 线程将重新获取锁,方法也会返回。 timeout是一个浮点数,单位为秒,指定超时期限。
    cv.notify([n]) 唤醒一个或多个等待此条件变量的线程。此方法只在调用线程已获取条件变量内部锁之后调用。 如果没有正在等待的线程,它就什么也不做。n指定要唤醒的线程数量,默认为1。
    cv.notify_all() 唤醒所有等待此条件的线程。

    下面为使用条件变量的模板:

    import threading
    
    cv = threading.Condition()
    
    def producer():
        while True:
            cv.acquire()
            produce_item()
            cv.notify()
            cv.release()
            
    def consumer():
        while True:
            cv.acquire()
            while not item_is_available():
                cv.wait()   # 等待直到有项出现
            cv.release()
            consume_item()

    如果存在多个线程等待同一个条件,notify()操作可能唤醒他们中的一个或多个。 因此某个线程被唤醒后,可能发现它等待的条件不存在了,所以在consumer()函数中使用while循环, 如果线程醒来,但是生成的项已经消失,它就会回去等待下一个信号。

    ● 使用线程间资源的注意点

    使用以上Lock等之类的线程间资源时,必须非常小心,依赖锁的代码应保证出现异常时正确地释放锁 否则可能导致死锁,典型的代码如下所示:

    try:
        lock.acquire()
        # 关键部分
        ......
    finally:
        lock.release()

    使用上下文管理协议(with),更加简洁:

    with lock:
        # 关键部分
        ......

    另外,编写代码时,一般应该避免同时获取多个锁。

      (4)queue

    queue模块实现了各种“多生产者-多消费者队列”,可用于在执行的多个线程间安全地交换信息。 一般来说,线程间通信最佳的方式就是使用queue,者比前面的诸如“锁”之类的资源都要好用。

    queue模块定义了以下3种不同的队列类型,创建语法与说明如下:

    创建函数说明
    Queue([maxsize]) 创建一个FIFO(先进先出)队列。maxsize是队列中可放入项的最大数量, 缺省或置0则队列大小为无穷大。
    LifoQueue([maxsize]) 创建一个LIFO(后进先出)队列。(即:堆栈)
    PriorityQueue([maxsize]) 创建一个优先级队列,使用这种队列时,项应该是(priority, data)形式的元组, 其中priority是一个数字,数字越大优先级越高。

    队列支持以下实例方法:

    实例方法说明
    q.qsize() 返回队列的大小,因为其他线程可能正在更新队列,此方法返回的数字可能不可靠。
    q.empty() 如果队列为空,则返回True,否则返回False。
    q.full() 如果队列为满,则返回True,否则返回False。
    q.put(item [,block [,timeout]]) item放入队列,如果block为True(默认), 则调用者将阻塞直到队列中出现可用的空闲位置为止;如block为False, 队列满时将引发Full异常。timeout提供可选的超时值,单位为秒, 如果超时则引发Full异常。
    q.put_nowait(item) 等价于q.put(item, False)方法。
    q.get([block [,timeout]]) 从队列中取出一项返回。如果block为True(默认), 则调用者将阻塞直到队列中出现可取出的项为止;如block为False, 队列空时将引发Full异常。timeout提供可选的超时值,单位为秒, 如果超时则引发Full异常。
    q.get_nowait() 等价于q.get(False)方法。
    q.task_done() 队列的消费者用来指示对于项的处理已结束。如果使用此方法, 那么从队列中每取出一项都应该手动调用一次。 此方法主要是辅助q.join()方法用的。
    q.join() 阻塞直到队列中所有的项均被取出和处理完为止。当为队列中的每一项都调用过了一次 q.task_done()方法,此方法将会直接返回。

    ● 使用队列的示例

    注意下例中q.task_done()q.join()的用法, 它们将使得线程在处理完所有项后关闭。

    import threading
    from queue import Queue
    
    class WorkerThread(threading.Thread):
        def __init__(self, *args, **kwargs):
            threading.Thread.__init__(self, *args, **kwargs)
            self.input_queue = Queue()
            
        def send(self, item):
            self.input_queue.put(item)
            
        def close(self):
            self.input_queue.put(None)
            self.input_queue.join()
            
        def run(self):
            while True:
                item = self.input_queue.get()
                if item is None:
                    break
                # 处理项
                print(item)
                self.input_queue.task_done()        
            # 完成
            self.input_queue.task_done()
            return
            
            
    # 使用示例
    w = WorkerThread()
    w.start()
    w.send('hello')
    w.send('world')
    w.close()

     

    返回目录

  • 相关阅读:
    java-03 方法
    cm 安装
    java-02 for循环,while循环
    java-01
    Storm入门,看这篇就够了
    Storm入门,看这篇就够了
    基于Spark的电影推荐系统(电影网站)
    基于Spark的电影推荐系统(实战简介)
    基于Spark的电影推荐系统(Scrapy爬虫)
    基于Spark的电影推荐系统(后台管理系统)
  • 原文地址:https://www.cnblogs.com/initcircuit/p/12273589.html
Copyright © 2011-2022 走看看