zoukankan      html  css  js  c++  java
  • [python 并行2]线程

    线程篇

    基本使用

    python线程使用的两个模块为: _thread (不推荐再使用)threading
    (查看threading的源码可以发现,threading实际是对_thread进一步的封装,官方将其称为 Low-level threading API,下面简单尝试使用_thread)

    调用start_new_thread()函数生成新线程
    函数声明:_thread.start_new_thread(function, args[, kwargs])
    function: 子线程所执行的函数
    args: 传递的参数,参数类型必须是元组
    kwargs:可选参数

    示例:

    #!usr/bin/env python
    #coding=utf-8
    
    import _thread
    import time
    
    def func(t_name):
        time.sleep(1)
        print(t_name, 'end')
    
    _thread.start_new_thread(func, ('my_thread_1',))    # 传递的参数必须是元组类型
    print('main thread end')
    time.sleep(2)   # 暂停一下等待子线程,避免主线程结束后直接退出,看不到子线程的输出
    

    输出

    main thread end
    my_thread_1 end
    

    更多_thread — Low-level threading API

    threading模块

    需要 import threading
    threading模块提供了比_thread模块更加高级别的方法,如下:

    • threading.active_count(): 返回当前运行的线程数量
    • threading.current_thread(): 返回当前运行的线程对象
    • threading.get_ident(): 返回当前运行的线程标识码
    • threading.enumerate(): 获取运作着的线程对象的列表(包含设置了daemon属性的后台线程)
    • threading.main_thread(): 获取主线程对象

    threading模块包含Thread类来处理线程
    函数声明:class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    (group: 官方预留的参数)
    target: 子线程要执行的函数
    name: 给子线程命名
    args: 传递参数到要执行的函数中 (类型为元组)
    daemon: 将线程设置为后台线程 1

    Thread类包含的方法:

    • start(): 开始线程,它会安排在单独的控制线程中使该对象的run()方法被调用 (invoked) 2 (如果多次调用,会raise RuntimeError
    • run(): 你可以在子类中重写这个方法,标准的run()方法会在构造器传递了target参数后调用它
    • join(timeout=None): 阻塞当前线程,直到等待调用了join()的线程结束,或到达设置的超时timeout的参数为止 (如果尝试加入当前线程 3,因为会发生死锁,join()会raise RuntimeError。在线程启动前调用join()也会报相同的错误)
    • name: 线程名
    • ident: 线程标识码 (如果线程未start(),则为None。实测线程结束后,ident值还存在)
    • is_alive(): 判断线程是否在运行
    • daemon: 是否为后台线程的属性值
    • isDaemon(): 判断是否为后台线程

    更多threading — Thread-based parallelism

    函数形式

    使用threading.Thread类实例化对象,再调用start()方法运行

    示例:

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    
    def func():
        print(threading.current_thread().name, ' start')
        time.sleep(1)
        print(threading.current_thread().name, ' end')
    
    t1 = threading.Thread(target=func)   # 创建线程
    t2 = threading.Thread(target=func)
    
    t1.start()   # 开始线程
    t2.start()
    
    # t1.join()    # 等待该线程结束后,再往下执行
    # t2.join()
    
    print('main thread end')    # 使用threading模块Thread类的线程,程序需要等待全部线程执行完后才退出
    

    输出

    Thread-1  start
    Thread-2  start
    main thread end
    Thread-1  end
    Thread-2  end
    

    继承类的形式

    通过继承threading.Thread类,可以重写run()方法,再实例化该类,调用start()方法运行
    (继承Thread类,并不是非要重写run()

    示例:

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    
    class MyThread(threading.Thread):
        def __init__(self, name):
            threading.Thread.__init__(self)
            self.name = name
        
        def run(self):
            print(self.name + ' start')
            time.sleep(1)
            print(self.name + ' end')
    
    t1 = MyThread('thread1')
    t2 = MyThread('thread2')
    
    t1.start()
    t2.start()
    
    # t1.join()
    # t2.join()
    
    print('main thread end')
    

    输出

    thread1 start
    thread2 start
    main thread end
    thread1 end
    thread2 end
    

    线程同步

    当属于并发线程的多个操作尝试访问共享内存,并且至少有一个操作能修改数据的状态时,这时如果没有恰当的同步机制,可能会发生意外情况或bug。使用可以解决此问题。
    当一个线程想要访问共享内存的某一部分区域时,它必须再使用前获取到该部分的锁。并在操作完后,要释放掉之前获取到的锁。

    注意! 要避免死锁 4的情况发生

    使用Lock实现线程同步

    使用threading.Lock()实例化Lock锁对象
    在共享资源操作的部分,调用Lock的方法acquire()获取锁
    结束操作后,调用Lock的方法release()释放锁,以便于其它线程使用该资源

    (函数声明:
    acquire(blocking=True, timeout=-1)
    获取锁,并阻塞其它线程访问这部分资源
    blocking[ 5: 设置为False的线程不会被阻塞 (并且timeout设置为默认值-1时,失去同步效果。设置为非-1值时,被设置为True的线程阻塞,则False立即返回。这2种情况都会提示错误信息)
    timeout: 设置等待的超时值,-1为无限等待,超时后无视阻塞
    返回值为True成功获取锁定,False反之(例如超时到期)
    release()
    在未锁的资源上调用释放锁方法,会引发RuntimeError

    示例:

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    
    lock = threading.Lock()     # 创建Lock锁
    num = 0     # 累加这个变量,观察不同步的情况出现
    
    def func():
        lock.acquire()  # 获取锁
        global num
        for i in range(1000000):    # 如果未出现不同步,是由于运算太快,加大循环值
            num += 1
        lock.release()  # 释放锁
    
    t1 = threading.Thread(target=func)
    t2 = threading.Thread(target=func)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    print(num)
    
    有锁情况下,输出:
    2000000
    
    无锁情况下,不同步,输出:
    第一次输出:
    1253312
    第二次输出:
    1227567
    第三次输出:
    1309097
    </pre>
    

    注! 书中并不提倡使用锁来解决,因为可能会导致死锁情况发生,也会对代码的可读性产生影响,调试困难

    使用RLock实现线程同步

    可重入锁(reentrant lock)
    操作方式同Lock锁

    与Lock的区别:RLock在 同一个线程中可以多次acquire()获取锁而不发生阻塞 (这是为了解决一些特殊场景的使用)

    # 部分代码
    lock = threading.RLock()     # 创建RLock锁
    
    def func():
        lock.acquire()  # 获取锁
        lock.acquire()
        global num
        for i in range(1000000):
            num += 1
        lock.release()  # 释放锁
        lock.release()
    

    注意! acquire()需要成对使用

    使用信号量实现线程同步

    信号量的提出,首次用在操作系统中。它是一个操作系统管理的抽象数据类型,用于同步多个线程对共享资源与数据的访问
    (本质上,信号量是由一个内部变量构成的,它标识出了对其所关联的资源的并发访问量)

    使用threading.Semaphore()创建对象
    在线程模块中,信号量的操作基于acquire()release()

    当一个线程想使用一个资源,它需要调用acquire(),会判断信号量内部变量值_value,如果为0则阻塞线程,并且进行timeout超时处理,如果_value不为0,线程运行 (由于信号量的初始值为非负数,故设计中不存在负数情况的代码)
    当一个线程使用完一个资源后,它需要调用release(),该操作会增加信号量的内部变量值,并通知等待的线程
    注! 书中的描述和threading模块的源码不符,重新按源码的理解写)

    注意! acquire()release()并不需要放在某段代码的前后,来锁住某段资源

    示例:

    (由于书中给的示例代码,感觉很符合理解信号量的特点,这里也采用生产者和消费者的关系编写代码)

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    import random
    
    # 可选参数为内部变量_value赋初值,默认为1
    # 如果赋的值小于0,会raise ValueError异常
    sem = threading.Semaphore(0)
    
    def producer():
        global item
        time.sleep(1)
        item = random.randint(0, 1000)
        print('producer: produced', item)
        sem.release()   # 释放信号量,将内部_value加1,并通知其它等待的线程
    
    def consumer():
        print('consumer is waiting')
        sem.acquire()   # 获取信号量,值等于0则阻塞线程,否则内部_value减1,并继续运行
        print('consumer: sonsumed', item)
    
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    
    t1.start()
    t2.start()
    

    输出

    consumer is waiting
    producer: produced 295
    consumer: sonsumed  295
    

    分析:多执行几次,逻辑上可以发现消费者总需要等待生产者生产出产品后,才能消费

    可以看出信号量很适合这样的场景,下面可以测试,生产者可以多生产几个,消费者再消费

    >>> import threading
    >>> sem = threading.Semaphore()
    >>> sem.acquire()    # 获取初始化的信号量
    True
    >>> sem.release()    # 信号量+1
    >>> sem.release()    # 信号量+1
    >>> sem.release()    # 信号量+1
    >>> sem.acquire()    # 信号量-1
    True
    >>> sem.acquire()    # 信号量-1
    True
    >>> sem.acquire()    # 信号量-1
    True
    >>> sem.acquire()    # 由于信号量=0
        # 所以陷入了阻塞
    

    使用条件实现线程同步

    使用threading.Condition()创建对象

    查看Condition的源码,发现可以传入一个锁作为初始化参数。如果不传,默认会赋值RLock锁来进行后续的锁的操作 acquire()release()

    # Condition类的初始化部分源码
        def __init__(self, lock=None):
            if lock is None:
                lock = RLock()
            self._lock = lock
            # Export the lock's acquire() and release() methods
            self.acquire = lock.acquire
            self.release = lock.release
            ...
    

    示例:

    (下例还是使用生产者和消费者的关系编写示例代码,用items作为存储容器,以满了(10个)就不能再生产作为条件,以没了(0个)就不能再消费作为条件)

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    
    condition = threading.Condition()   # 创建条件
    items = []      # 作为产品的存储容器,设达到10个为满了,就不能再生产了
    
    def producer():
        global items
        condition.acquire()     # 获取锁
        if len(items) == 10:
            print('producer: stop produce')
            condition.wait()    # 等待(items达到10,等待消费者消费)
        items.append('')
        print('producer: produced', len(items))
        condition.notify()      # 通知等待的线程
        condition.release()     # 释放锁
        
    def consumer():
        global items
        condition.acquire()     # 获取锁
        if len(items) == 0:
            print('consumer: waiting')
            condition.wait()    # 等待(items为0,等待生产者生产)
        items.pop()       # 注! 注释掉这行,会发现,等待wait()在接收到通知notify()后,并没有再次判断条件,直接就接着运行了
        print('consumer: sonsumed', len(items))
        condition.notify()      # 通知等待的线程
        condition.release()     # 释放锁
    
    
    # producer_loop()和consumer_loop()用来多次循环运行,为了达到items为0或10的情况
    def producer_loop():
        for i in range(20):
            time.sleep(1)
            producer()
    
    def consumer_loop():
        for i in range(20):
            time.sleep(4)
            consumer()
    
    
    t1 = threading.Thread(target=producer_loop)
    t2 = threading.Thread(target=consumer_loop)
    
    t1.start()
    t2.start()
    

    输出

    producer: produced 1
    producer: produced 2
    producer: produced 3
    consumer: sonsumed 2
    producer: produced 3
    producer: produced 4
    producer: produced 5
    producer: produced 6
    consumer: sonsumed 5
    producer: produced 6
    producer: produced 7
    producer: produced 8
    producer: produced 9
    consumer: sonsumed 8
    producer: produced 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    consumer: sonsumed 9
    consumer: sonsumed 8
    consumer: sonsumed 7
    consumer: sonsumed 6
    consumer: sonsumed 5
    consumer: sonsumed 4
    consumer: sonsumed 3
    consumer: sonsumed 2
    consumer: sonsumed 1
    consumer: sonsumed 0
    

    分析:观察可以发现,生产者生产满了10个就会进入等待wait(),直到消费者通知notify()
    为了观察消费者消费到0个的情况,可以将生产者和消费者循环的等待时间做调整

    使用事件实现线程同步

    查看Event类的源码,发现内部使用的是条件Condition类实现,并传入了Lock锁。事件通过对内部的标志_flag进行管理来实现线程同步
    使用set()方法可以将标志设为True
    使用clear()方法将其重置为False
    使用wait()方法阻塞线程

    # Event类的初始化源码
        def __init__(self):
            self._cond = Condition(Lock())
            self._flag = False
    

    注意! 并不存在set()clear()放在某段代码的前后,来锁住某段资源

    示例:
    (还是采用生产者和消费者的关系编写,生产者做完工作,调用set()设置_flag标志,并通知wait()等待的消费者线程运行;再使用clear()清除_flag标志,以便后面的消费者能正确的进入等待。这个过程类似于事件的触发)

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    import random
    
    event = threading.Event()   # 创建事件
    items = [1,2,3] # 设置个初值,便于后面注释event.clear()后的测试(可以发现,消费者不等待了)
    
    def producer():
        global items
        print('producer: start')
        items.append(random.randint(0,100))
        event.set()     # 将内部标志_flag设为True,并通知所有等待的线程(类似于触发事件)
        print('producer: notify')
        event.clear()   # 将内部标志_flag设为False(只有清除了_flag,消费者下一次的wait()操作才会正常进入等待)(clear()操作也可以交给消费者调用,不过为了简化消费者的操作,让消费者只需要等待通知即可)
        print('producer: end')
        
    def consumer():
        global items
        print('consumer: waiting')
        event.wait()    # 等待(等待生产者通知,根据_flag标志判断是否进入等待)
        print('consumer:', items.pop())
    
    
    # producer_loop()和consumer_loop()用来多次循环运行
    def producer_loop():
        for i in range(3):
            time.sleep(1)
            producer()
    
    def consumer_loop():
        while True: # 消费者有wait()等待,就不用线程休眠了,以免错过生产者的set()通知
            consumer()
    
    
    t1 = threading.Thread(target=producer_loop)
    t2 = threading.Thread(target=consumer_loop)
    
    t1.start()
    t2.start()
    

    输出

    consumer: waiting
    producer: start
    producer: notify
    consumer: 57
    consumer: waiting
    producer: end
    producer: start
    producer: notify
    consumer: 10
    producer: end
    consumer: waiting
    producer: start
    producer: notify
    consumer: 24
    producer: end
    consumer: waiting
    

    如果注释掉代码中的# event.clear()一行,会出现如下输出

    consumer: waiting
    producer: start
    producer: notify
    consumer: 19
    producer: end
    consumer: waiting
    consumer: 3
    consumer: waiting
    consumer: 2
    consumer: waiting
    consumer: 1
    consumer: waiting
    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "D:appPythonPython37lib	hreading.py", line 917, in _bootstrap_inner
        self.run()
      File "D:appPythonPython37lib	hreading.py", line 865, in run
        self._target(*self._args, **self._kwargs)
      File "g:/tmp/code.py", line 35, in consumer_loop
        consumer()
      File "g:/tmp/code.py", line 24, in consumer
        print('consumer:', items.pop())
    IndexError: pop from empty list
    

    producer: start

    producer: notify
    producer: end
    producer: start
    producer: notify
    producer: end
    

    分析:会发现,消费者没有等待(打印出的consumer: waiting,只是给自己的提示,实际没有等待),直接进行了列表的pop()操作,直到后面列表为空再弹出时报错为止


    附:其它

    使用with语句 6

    (由于没有理解部分,这一部分基本就是书中原文)

    with: 是Python 2.5中引入的。当有两个相关的操作需要对一个代码块承兑执行时,with语句的作用就彰显出来了。它可以再自动精确的分配或释放资源 (因此也被称为上下文管理器)。如线程模块中,使用到acquire()release()方法的地方,都可以采用with语句块,如下:

    • Lock
    • RLock
    • 条件
    • 信号量 (感觉这用了with就不太灵活了)

    示例:
    (会测试上述列表中的with操作)

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    
    def func_with(statement):
        with statement: # 会自动进行acquire()和release()
            print('//todo1')
    
    def func_not_with(statement):
        statement.acquire()
        try:    # 为了避免出现异常,导致没有release()释放
            print('//todo2')
        finally:
            statement.release()
    
    lock = threading.Lock()
    rlock = threading.RLock()
    condition = threading.Condition()
    sem = threading.Semaphore(1)    # 采用with,需要初始至少有一个信号量值(因为需要先acquire())
    
    li = [lock, rlock, condition, sem]
    
    for statement in li:
        t1 = threading.Thread(target=func_with, args=(statement,))
        t2 = threading.Thread(target=func_not_with, args=(statement,))
    
        t1.start()
        t2.start()
    

    使用队列实现线程通信

    虽然python线程模块提供了很多同步原语 信号量条件事件,但有时候,在使用场景中,可能采用队列模块会是个最佳选择。它使得线程编程变得更加容易和安全

    使用队列Queue的方式处理,尽管它不属于threading模块,但查看其源码,发现队列的功能实现有用到threading模块

    # Queue的初始化源码
        def __init__(self, maxsize=0):
            self.maxsize = maxsize
            self._init(maxsize)
    
            # mutex must be held whenever the queue is mutating.  All methods
            # that acquire mutex must release it before returning.  mutex
            # is shared between the three conditions, so acquiring and
            # releasing the conditions also acquires and releases mutex.
            self.mutex = threading.Lock()
    
            # Notify not_empty whenever an item is added to the queue; a
            # thread waiting to get is notified then.
            self.not_empty = threading.Condition(self.mutex)
    
            # Notify not_full whenever an item is removed from the queue;
            # a thread waiting to put is notified then.
            self.not_full = threading.Condition(self.mutex)
    
            # Notify all_tasks_done whenever the number of unfinished tasks
            # drops to zero; thread waiting to join() is notified to resume
            self.all_tasks_done = threading.Condition(self.mutex)
            self.unfinished_tasks = 0
    

    Queue会用到如下方法:

    • put(): 添加一个项目到队列
    • get(): 从队列中取出一个项目
    • task_done(): 每处理完一个项目,需要调用该方法
    • join(): 阻塞线程,等待全部的任务完成

    附:查看源代码,分析可得Queue内部的操作是这样的:(这段可以不用看)

    • 调用put() -> with队列满(条件锁not_full) -> 能阻塞?(参数block) & 能超时?(参数timeout) & 需要等待?(条件锁not_full.wait()) -> 添加数据(内部方法_put()) -> 任务计数加1(内部计数变量unfinished_tasks += 1) -> 发起队列非空的通知(条件锁not_empty.notify())
    • 调用get() -> with队列空(条件锁not_empty) -> 能阻塞?(参数block) & 能超时?(参数timeout) & 需要等待?(条件锁not_empty.wait()) -> 取出数据(内部方法_get()) -> 发起队列非满的通知(条件锁not_full.notify()) ( 注意! get()put()操作相比并没有对任务计数操作,需要通过后面task_done()完成任务方法来减少任务数)
    • *调用task_done() -> with任务完成(条件锁all_tasks_done) -> 判断全部任务完成了?是的话发起通知(条件锁all_tasks_done.notify_all()) -> 任务计数减1(内部计数变量unfinished_tasks -= 1) *
    • 调用join() -> with任务完成(条件锁all_tasks_done) -> 循环未完成的任务计数变量(内部计数变量unfinished_tasks) -> 还有没完成的任务,等待(条件锁all_tasks_done.wait() | 全部完成,退出循环,解除线程阻塞)

    示例:

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    from queue import Queue
    import time
    import random
    
    queue = Queue()
    
    def producer():
        for i in range(5):
            item = random.randint(0, 100)
            queue.put(item)
            print('producer:', 'put', item)
            time.sleep(1)
    
    def consumer():
        while True:
            item = queue.get()
            print('consumer:', 'get', item)
            queue.task_done()
    
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    
    t1.start()
    t2.start()
    

    输出

    producer: put 47
    consumer: get 47
    producer: put 71
    consumer: get 71
    producer: put 99
    consumer: get 99
    producer: put 30
    consumer: get 30
    producer: put 75
    consumer: get 75
    

    1.后台线程: 后台线程在主线程停止后就直接停止运行。他们资源(如打开的文件,数据库事务等)可能不会被正确的释放。如果你想要你的线程优美的停止,让他们不要变为后台和使用一个合适的信号机制如事件Event

    2.nvoke: 官方文档中使用invoke一词,我并没有更好的翻译,因为其它语言中invoke反射是一种技术手段,但Google的翻译中,将此解释为调用 (是我想多了)

    3.加入线程: 加入线程?不理解如何能加入线程,并且官方文档说会死锁。实测,创建2个线程互相join(),虽然陷入死循环,但并没抛出错误 // TODO: 不知理解有偏差没有

    4.死锁: 多个对象,互持对方所需资源的锁,导致都无法访问

    5.blocking: 锁的acquire()方法的参数,有点难理解,文中所写是结合官方文档和实测的结果描述所得。不过一般,我们都不用改变它的默认值 // TODO: 没有从源码分析(找不到源码)

    6.参考书籍:《Python并行编程手册》

  • 相关阅读:
    700. Search in a Binary Search Tree
    100. Same Tree
    543. Diameter of Binary Tree
    257. Binary Tree Paths
    572. Subtree of Another Tree
    226. Invert Binary Tree
    104. Maximum Depth of Binary Tree
    1、解决sublime打开文档,出现中文乱码问题
    移植seetafaceengine-master、opencv到ARM板
    ubuntu16.04-交叉编译-SeetaFaceEngine-master
  • 原文地址:https://www.cnblogs.com/maplesnow/p/12044360.html
Copyright © 2011-2022 走看看