zoukankan      html  css  js  c++  java
  • [python 并行2]线程

    线程篇

    基本使用

    python线程使用的两个模块为: _thread (不推荐再使用)threading
    (查看threading的源码可以发现,threading实际是对_thread进一步的封装,官方将其称为 Low-level threading API,下面简单尝试使用_thread)

    调用start_new_thread()函数生成新线程
    函数声明:_thread.start_new_thread(function, args[, kwargs])
    function: 子线程所执行的函数
    args: 传递的参数,参数类型必须是元组
    kwargs:可选参数

    示例:

    #!usr/bin/env python
    #coding=utf-8
    
    import _thread
    import time
    
    def func(t_name):
        time.sleep(1)
        print(t_name, 'end')
    
    _thread.start_new_thread(func, ('my_thread_1',))    # 传递的参数必须是元组类型
    print('main thread end')
    time.sleep(2)   # 暂停一下等待子线程,避免主线程结束后直接退出,看不到子线程的输出
    

    输出

    main thread end
    my_thread_1 end
    

    更多_thread — Low-level threading API

    threading模块

    需要 import threading
    threading模块提供了比_thread模块更加高级别的方法,如下:

    • threading.active_count(): 返回当前运行的线程数量
    • threading.current_thread(): 返回当前运行的线程对象
    • threading.get_ident(): 返回当前运行的线程标识码
    • threading.enumerate(): 获取运作着的线程对象的列表(包含设置了daemon属性的后台线程)
    • threading.main_thread(): 获取主线程对象

    threading模块包含Thread类来处理线程
    函数声明:class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    (group: 官方预留的参数)
    target: 子线程要执行的函数
    name: 给子线程命名
    args: 传递参数到要执行的函数中 (类型为元组)
    daemon: 将线程设置为后台线程 1

    Thread类包含的方法:

    • start(): 开始线程,它会安排在单独的控制线程中使该对象的run()方法被调用 (invoked) 2 (如果多次调用,会raise RuntimeError
    • run(): 你可以在子类中重写这个方法,标准的run()方法会在构造器传递了target参数后调用它
    • join(timeout=None): 阻塞当前线程,直到等待调用了join()的线程结束,或到达设置的超时timeout的参数为止 (如果尝试加入当前线程 3,因为会发生死锁,join()会raise RuntimeError。在线程启动前调用join()也会报相同的错误)
    • name: 线程名
    • ident: 线程标识码 (如果线程未start(),则为None。实测线程结束后,ident值还存在)
    • is_alive(): 判断线程是否在运行
    • daemon: 是否为后台线程的属性值
    • isDaemon(): 判断是否为后台线程

    更多threading — Thread-based parallelism

    函数形式

    使用threading.Thread类实例化对象,再调用start()方法运行

    示例:

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    
    def func():
        print(threading.current_thread().name, ' start')
        time.sleep(1)
        print(threading.current_thread().name, ' end')
    
    t1 = threading.Thread(target=func)   # 创建线程
    t2 = threading.Thread(target=func)
    
    t1.start()   # 开始线程
    t2.start()
    
    # t1.join()    # 等待该线程结束后,再往下执行
    # t2.join()
    
    print('main thread end')    # 使用threading模块Thread类的线程,程序需要等待全部线程执行完后才退出
    

    输出

    Thread-1  start
    Thread-2  start
    main thread end
    Thread-1  end
    Thread-2  end
    

    继承类的形式

    通过继承threading.Thread类,可以重写run()方法,再实例化该类,调用start()方法运行
    (继承Thread类,并不是非要重写run()

    示例:

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    
    class MyThread(threading.Thread):
        def __init__(self, name):
            threading.Thread.__init__(self)
            self.name = name
        
        def run(self):
            print(self.name + ' start')
            time.sleep(1)
            print(self.name + ' end')
    
    t1 = MyThread('thread1')
    t2 = MyThread('thread2')
    
    t1.start()
    t2.start()
    
    # t1.join()
    # t2.join()
    
    print('main thread end')
    

    输出

    thread1 start
    thread2 start
    main thread end
    thread1 end
    thread2 end
    

    线程同步

    当属于并发线程的多个操作尝试访问共享内存,并且至少有一个操作能修改数据的状态时,这时如果没有恰当的同步机制,可能会发生意外情况或bug。使用可以解决此问题。
    当一个线程想要访问共享内存的某一部分区域时,它必须再使用前获取到该部分的锁。并在操作完后,要释放掉之前获取到的锁。

    注意! 要避免死锁 4的情况发生

    使用Lock实现线程同步

    使用threading.Lock()实例化Lock锁对象
    在共享资源操作的部分,调用Lock的方法acquire()获取锁
    结束操作后,调用Lock的方法release()释放锁,以便于其它线程使用该资源

    (函数声明:
    acquire(blocking=True, timeout=-1)
    获取锁,并阻塞其它线程访问这部分资源
    blocking[ 5: 设置为False的线程不会被阻塞 (并且timeout设置为默认值-1时,失去同步效果。设置为非-1值时,被设置为True的线程阻塞,则False立即返回。这2种情况都会提示错误信息)
    timeout: 设置等待的超时值,-1为无限等待,超时后无视阻塞
    返回值为True成功获取锁定,False反之(例如超时到期)
    release()
    在未锁的资源上调用释放锁方法,会引发RuntimeError

    示例:

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    
    lock = threading.Lock()     # 创建Lock锁
    num = 0     # 累加这个变量,观察不同步的情况出现
    
    def func():
        lock.acquire()  # 获取锁
        global num
        for i in range(1000000):    # 如果未出现不同步,是由于运算太快,加大循环值
            num += 1
        lock.release()  # 释放锁
    
    t1 = threading.Thread(target=func)
    t2 = threading.Thread(target=func)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    print(num)
    
    有锁情况下,输出:
    2000000
    
    无锁情况下,不同步,输出:
    第一次输出:
    1253312
    第二次输出:
    1227567
    第三次输出:
    1309097
    </pre>
    

    注! 书中并不提倡使用锁来解决,因为可能会导致死锁情况发生,也会对代码的可读性产生影响,调试困难

    使用RLock实现线程同步

    可重入锁(reentrant lock)
    操作方式同Lock锁

    与Lock的区别:RLock在 同一个线程中可以多次acquire()获取锁而不发生阻塞 (这是为了解决一些特殊场景的使用)

    # 部分代码
    lock = threading.RLock()     # 创建RLock锁
    
    def func():
        lock.acquire()  # 获取锁
        lock.acquire()
        global num
        for i in range(1000000):
            num += 1
        lock.release()  # 释放锁
        lock.release()
    

    注意! acquire()需要成对使用

    使用信号量实现线程同步

    信号量的提出,首次用在操作系统中。它是一个操作系统管理的抽象数据类型,用于同步多个线程对共享资源与数据的访问
    (本质上,信号量是由一个内部变量构成的,它标识出了对其所关联的资源的并发访问量)

    使用threading.Semaphore()创建对象
    在线程模块中,信号量的操作基于acquire()release()

    当一个线程想使用一个资源,它需要调用acquire(),会判断信号量内部变量值_value,如果为0则阻塞线程,并且进行timeout超时处理,如果_value不为0,线程运行 (由于信号量的初始值为非负数,故设计中不存在负数情况的代码)
    当一个线程使用完一个资源后,它需要调用release(),该操作会增加信号量的内部变量值,并通知等待的线程
    注! 书中的描述和threading模块的源码不符,重新按源码的理解写)

    注意! acquire()release()并不需要放在某段代码的前后,来锁住某段资源

    示例:

    (由于书中给的示例代码,感觉很符合理解信号量的特点,这里也采用生产者和消费者的关系编写代码)

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    import random
    
    # 可选参数为内部变量_value赋初值,默认为1
    # 如果赋的值小于0,会raise ValueError异常
    sem = threading.Semaphore(0)
    
    def producer():
        global item
        time.sleep(1)
        item = random.randint(0, 1000)
        print('producer: produced', item)
        sem.release()   # 释放信号量,将内部_value加1,并通知其它等待的线程
    
    def consumer():
        print('consumer is waiting')
        sem.acquire()   # 获取信号量,值等于0则阻塞线程,否则内部_value减1,并继续运行
        print('consumer: sonsumed', item)
    
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    
    t1.start()
    t2.start()
    

    输出

    consumer is waiting
    producer: produced 295
    consumer: sonsumed  295
    

    分析:多执行几次,逻辑上可以发现消费者总需要等待生产者生产出产品后,才能消费

    可以看出信号量很适合这样的场景,下面可以测试,生产者可以多生产几个,消费者再消费

    >>> import threading
    >>> sem = threading.Semaphore()
    >>> sem.acquire()    # 获取初始化的信号量
    True
    >>> sem.release()    # 信号量+1
    >>> sem.release()    # 信号量+1
    >>> sem.release()    # 信号量+1
    >>> sem.acquire()    # 信号量-1
    True
    >>> sem.acquire()    # 信号量-1
    True
    >>> sem.acquire()    # 信号量-1
    True
    >>> sem.acquire()    # 由于信号量=0
        # 所以陷入了阻塞
    

    使用条件实现线程同步

    使用threading.Condition()创建对象

    查看Condition的源码,发现可以传入一个锁作为初始化参数。如果不传,默认会赋值RLock锁来进行后续的锁的操作 acquire()release()

    # Condition类的初始化部分源码
        def __init__(self, lock=None):
            if lock is None:
                lock = RLock()
            self._lock = lock
            # Export the lock's acquire() and release() methods
            self.acquire = lock.acquire
            self.release = lock.release
            ...
    

    示例:

    (下例还是使用生产者和消费者的关系编写示例代码,用items作为存储容器,以满了(10个)就不能再生产作为条件,以没了(0个)就不能再消费作为条件)

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    
    condition = threading.Condition()   # 创建条件
    items = []      # 作为产品的存储容器,设达到10个为满了,就不能再生产了
    
    def producer():
        global items
        condition.acquire()     # 获取锁
        if len(items) == 10:
            print('producer: stop produce')
            condition.wait()    # 等待(items达到10,等待消费者消费)
        items.append('')
        print('producer: produced', len(items))
        condition.notify()      # 通知等待的线程
        condition.release()     # 释放锁
        
    def consumer():
        global items
        condition.acquire()     # 获取锁
        if len(items) == 0:
            print('consumer: waiting')
            condition.wait()    # 等待(items为0,等待生产者生产)
        items.pop()       # 注! 注释掉这行,会发现,等待wait()在接收到通知notify()后,并没有再次判断条件,直接就接着运行了
        print('consumer: sonsumed', len(items))
        condition.notify()      # 通知等待的线程
        condition.release()     # 释放锁
    
    
    # producer_loop()和consumer_loop()用来多次循环运行,为了达到items为0或10的情况
    def producer_loop():
        for i in range(20):
            time.sleep(1)
            producer()
    
    def consumer_loop():
        for i in range(20):
            time.sleep(4)
            consumer()
    
    
    t1 = threading.Thread(target=producer_loop)
    t2 = threading.Thread(target=consumer_loop)
    
    t1.start()
    t2.start()
    

    输出

    producer: produced 1
    producer: produced 2
    producer: produced 3
    consumer: sonsumed 2
    producer: produced 3
    producer: produced 4
    producer: produced 5
    producer: produced 6
    consumer: sonsumed 5
    producer: produced 6
    producer: produced 7
    producer: produced 8
    producer: produced 9
    consumer: sonsumed 8
    producer: produced 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    producer: stop produce
    consumer: sonsumed 9
    producer: produced 10
    consumer: sonsumed 9
    consumer: sonsumed 8
    consumer: sonsumed 7
    consumer: sonsumed 6
    consumer: sonsumed 5
    consumer: sonsumed 4
    consumer: sonsumed 3
    consumer: sonsumed 2
    consumer: sonsumed 1
    consumer: sonsumed 0
    

    分析:观察可以发现,生产者生产满了10个就会进入等待wait(),直到消费者通知notify()
    为了观察消费者消费到0个的情况,可以将生产者和消费者循环的等待时间做调整

    使用事件实现线程同步

    查看Event类的源码,发现内部使用的是条件Condition类实现,并传入了Lock锁。事件通过对内部的标志_flag进行管理来实现线程同步
    使用set()方法可以将标志设为True
    使用clear()方法将其重置为False
    使用wait()方法阻塞线程

    # Event类的初始化源码
        def __init__(self):
            self._cond = Condition(Lock())
            self._flag = False
    

    注意! 并不存在set()clear()放在某段代码的前后,来锁住某段资源

    示例:
    (还是采用生产者和消费者的关系编写,生产者做完工作,调用set()设置_flag标志,并通知wait()等待的消费者线程运行;再使用clear()清除_flag标志,以便后面的消费者能正确的进入等待。这个过程类似于事件的触发)

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    import time
    import random
    
    event = threading.Event()   # 创建事件
    items = [1,2,3] # 设置个初值,便于后面注释event.clear()后的测试(可以发现,消费者不等待了)
    
    def producer():
        global items
        print('producer: start')
        items.append(random.randint(0,100))
        event.set()     # 将内部标志_flag设为True,并通知所有等待的线程(类似于触发事件)
        print('producer: notify')
        event.clear()   # 将内部标志_flag设为False(只有清除了_flag,消费者下一次的wait()操作才会正常进入等待)(clear()操作也可以交给消费者调用,不过为了简化消费者的操作,让消费者只需要等待通知即可)
        print('producer: end')
        
    def consumer():
        global items
        print('consumer: waiting')
        event.wait()    # 等待(等待生产者通知,根据_flag标志判断是否进入等待)
        print('consumer:', items.pop())
    
    
    # producer_loop()和consumer_loop()用来多次循环运行
    def producer_loop():
        for i in range(3):
            time.sleep(1)
            producer()
    
    def consumer_loop():
        while True: # 消费者有wait()等待,就不用线程休眠了,以免错过生产者的set()通知
            consumer()
    
    
    t1 = threading.Thread(target=producer_loop)
    t2 = threading.Thread(target=consumer_loop)
    
    t1.start()
    t2.start()
    

    输出

    consumer: waiting
    producer: start
    producer: notify
    consumer: 57
    consumer: waiting
    producer: end
    producer: start
    producer: notify
    consumer: 10
    producer: end
    consumer: waiting
    producer: start
    producer: notify
    consumer: 24
    producer: end
    consumer: waiting
    

    如果注释掉代码中的# event.clear()一行,会出现如下输出

    consumer: waiting
    producer: start
    producer: notify
    consumer: 19
    producer: end
    consumer: waiting
    consumer: 3
    consumer: waiting
    consumer: 2
    consumer: waiting
    consumer: 1
    consumer: waiting
    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "D:appPythonPython37lib	hreading.py", line 917, in _bootstrap_inner
        self.run()
      File "D:appPythonPython37lib	hreading.py", line 865, in run
        self._target(*self._args, **self._kwargs)
      File "g:/tmp/code.py", line 35, in consumer_loop
        consumer()
      File "g:/tmp/code.py", line 24, in consumer
        print('consumer:', items.pop())
    IndexError: pop from empty list
    

    producer: start

    producer: notify
    producer: end
    producer: start
    producer: notify
    producer: end
    

    分析:会发现,消费者没有等待(打印出的consumer: waiting,只是给自己的提示,实际没有等待),直接进行了列表的pop()操作,直到后面列表为空再弹出时报错为止


    附:其它

    使用with语句 6

    (由于没有理解部分,这一部分基本就是书中原文)

    with: 是Python 2.5中引入的。当有两个相关的操作需要对一个代码块承兑执行时,with语句的作用就彰显出来了。它可以再自动精确的分配或释放资源 (因此也被称为上下文管理器)。如线程模块中,使用到acquire()release()方法的地方,都可以采用with语句块,如下:

    • Lock
    • RLock
    • 条件
    • 信号量 (感觉这用了with就不太灵活了)

    示例:
    (会测试上述列表中的with操作)

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    
    def func_with(statement):
        with statement: # 会自动进行acquire()和release()
            print('//todo1')
    
    def func_not_with(statement):
        statement.acquire()
        try:    # 为了避免出现异常,导致没有release()释放
            print('//todo2')
        finally:
            statement.release()
    
    lock = threading.Lock()
    rlock = threading.RLock()
    condition = threading.Condition()
    sem = threading.Semaphore(1)    # 采用with,需要初始至少有一个信号量值(因为需要先acquire())
    
    li = [lock, rlock, condition, sem]
    
    for statement in li:
        t1 = threading.Thread(target=func_with, args=(statement,))
        t2 = threading.Thread(target=func_not_with, args=(statement,))
    
        t1.start()
        t2.start()
    

    使用队列实现线程通信

    虽然python线程模块提供了很多同步原语 信号量条件事件,但有时候,在使用场景中,可能采用队列模块会是个最佳选择。它使得线程编程变得更加容易和安全

    使用队列Queue的方式处理,尽管它不属于threading模块,但查看其源码,发现队列的功能实现有用到threading模块

    # Queue的初始化源码
        def __init__(self, maxsize=0):
            self.maxsize = maxsize
            self._init(maxsize)
    
            # mutex must be held whenever the queue is mutating.  All methods
            # that acquire mutex must release it before returning.  mutex
            # is shared between the three conditions, so acquiring and
            # releasing the conditions also acquires and releases mutex.
            self.mutex = threading.Lock()
    
            # Notify not_empty whenever an item is added to the queue; a
            # thread waiting to get is notified then.
            self.not_empty = threading.Condition(self.mutex)
    
            # Notify not_full whenever an item is removed from the queue;
            # a thread waiting to put is notified then.
            self.not_full = threading.Condition(self.mutex)
    
            # Notify all_tasks_done whenever the number of unfinished tasks
            # drops to zero; thread waiting to join() is notified to resume
            self.all_tasks_done = threading.Condition(self.mutex)
            self.unfinished_tasks = 0
    

    Queue会用到如下方法:

    • put(): 添加一个项目到队列
    • get(): 从队列中取出一个项目
    • task_done(): 每处理完一个项目,需要调用该方法
    • join(): 阻塞线程,等待全部的任务完成

    附:查看源代码,分析可得Queue内部的操作是这样的:(这段可以不用看)

    • 调用put() -> with队列满(条件锁not_full) -> 能阻塞?(参数block) & 能超时?(参数timeout) & 需要等待?(条件锁not_full.wait()) -> 添加数据(内部方法_put()) -> 任务计数加1(内部计数变量unfinished_tasks += 1) -> 发起队列非空的通知(条件锁not_empty.notify())
    • 调用get() -> with队列空(条件锁not_empty) -> 能阻塞?(参数block) & 能超时?(参数timeout) & 需要等待?(条件锁not_empty.wait()) -> 取出数据(内部方法_get()) -> 发起队列非满的通知(条件锁not_full.notify()) ( 注意! get()put()操作相比并没有对任务计数操作,需要通过后面task_done()完成任务方法来减少任务数)
    • *调用task_done() -> with任务完成(条件锁all_tasks_done) -> 判断全部任务完成了?是的话发起通知(条件锁all_tasks_done.notify_all()) -> 任务计数减1(内部计数变量unfinished_tasks -= 1) *
    • 调用join() -> with任务完成(条件锁all_tasks_done) -> 循环未完成的任务计数变量(内部计数变量unfinished_tasks) -> 还有没完成的任务,等待(条件锁all_tasks_done.wait() | 全部完成,退出循环,解除线程阻塞)

    示例:

    #!usr/bin/env python
    #coding=utf-8
    
    import threading
    from queue import Queue
    import time
    import random
    
    queue = Queue()
    
    def producer():
        for i in range(5):
            item = random.randint(0, 100)
            queue.put(item)
            print('producer:', 'put', item)
            time.sleep(1)
    
    def consumer():
        while True:
            item = queue.get()
            print('consumer:', 'get', item)
            queue.task_done()
    
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    
    t1.start()
    t2.start()
    

    输出

    producer: put 47
    consumer: get 47
    producer: put 71
    consumer: get 71
    producer: put 99
    consumer: get 99
    producer: put 30
    consumer: get 30
    producer: put 75
    consumer: get 75
    

    1.后台线程: 后台线程在主线程停止后就直接停止运行。他们资源(如打开的文件,数据库事务等)可能不会被正确的释放。如果你想要你的线程优美的停止,让他们不要变为后台和使用一个合适的信号机制如事件Event

    2.nvoke: 官方文档中使用invoke一词,我并没有更好的翻译,因为其它语言中invoke反射是一种技术手段,但Google的翻译中,将此解释为调用 (是我想多了)

    3.加入线程: 加入线程?不理解如何能加入线程,并且官方文档说会死锁。实测,创建2个线程互相join(),虽然陷入死循环,但并没抛出错误 // TODO: 不知理解有偏差没有

    4.死锁: 多个对象,互持对方所需资源的锁,导致都无法访问

    5.blocking: 锁的acquire()方法的参数,有点难理解,文中所写是结合官方文档和实测的结果描述所得。不过一般,我们都不用改变它的默认值 // TODO: 没有从源码分析(找不到源码)

    6.参考书籍:《Python并行编程手册》

  • 相关阅读:
    前端 fetch 通信
    编写高质量的JavaScript代码(一)
    Redis学习笔记1-Redis的介绍和认识
    gitignore不起作用解决的方法
    【我的面经】说说简历的细节——软件开发岗位
    菜鸟的mongoDB学习---(七)MongoDB 备份(mongodump)与恢复(mongorerstore)
    HDU 4927 Series 1
    树状数组求第K小值 (spoj227 Ordering the Soldiers &amp;&amp; hdu2852 KiKi&#39;s K-Number)
    git和SVN的差别
    KVM-Introduce
  • 原文地址:https://www.cnblogs.com/maplesnow/p/12044360.html
Copyright © 2011-2022 走看看