zoukankan      html  css  js  c++  java
  • Python:多线程编程

    1.IO编程

    IO(input/output)。凡是用到数据交换的地方,都会涉及io编程,例如磁盘,网络的数据传输。在IO编程中,stream(流)是一种重要的概念,分为输入流(input stream)和输出流(output stream)。可以把流季节为一个水管,数据相当于水管中的水,但是只能单向流动,所以数据传输过程中需要假设两个水管,一个负责输入,一个负责输出,这样读写就可以实现同步。

    2.GIL:global interpreter lock全局解释器锁

    在Python中,GIL导致同一时刻只有一个线程进入解释器。

    计算机有些操作不依赖于CPU,I/O操作几乎不利用,IO密集型不占用CPU,多线程可完成

    计算操作都要用到CPU,适合多进程

    • IO密集型任务或函数,可以用多线程
    • 计算密集型任务函数,sorry,改C
    • 计算密集型程序适合C语言多线程,I/O密集型适合脚本语言开发的多线程

    线程和进程的区别:

    • 线程共享创建它的进程的地址空间; 进程有自己的地址空间。
    • 线程可以直接访问其进程的数据段; 进程有自己的父进程的数据段副本。
    • 线程可以直接与其进程的其他线程进行通信; 进程必须使用进程间通信来与兄弟进程进行通信。
    • 新线程很容易创建; 新进程需要父进程的重复。
    • 线程可以对相同进程的线程进行相当的控制;
    • 流程只能控制子进程。
    • 对主线程的更改(取消,优先级更改等)可能会影响进程的其他线程的行为; 对父进程的更改不会影响子进程。

    进程优点:同时利用多个CPU,同时进行多个操作;缺点:耗费资源(需要开辟多个内存空间)

    线程优点:共享多个资源,IO操作,创造并发操作;缺点:抢占资源

    3.threading模块

    threading模块对象
    Thread    表示一个线程的执行的对象
    Lock    锁原语对象(跟thread模块里的锁对象相同)
    RLock    可重入锁对象。使单线程可以再次获得已经获得了的锁(递归锁定)
    Condition条件变量对象能让一个线程停下来,等待其他线程满足了某个“条件”。如状态的改变或值的改变
    Event    通用的条件变量。多个线程可以等待某个时间的发生,在事件发生后,所有的线程都被激活
    Semaphore    为等待锁的线程提供一个类似“等候室”的结构
    BoundedSemaphore    与Semaphore类似,只是它不允许超过初始值
    Timer    与thread类似,只是它要等待一段时间后才开始运行
    Barrier    创建一个障碍,必须达到指定数量的线程后才可以继续

    3.1线程的两种调用方式

    #直接调用
    import time
    import threading
    
    begin=time.time()
    def foo(n):
        print('start-foo%s'%n)
        time.sleep(1)
        print('end foo')
    
    def bar(n):
        print('start-bar%s'%n)
        time.sleep(2)
        print('end bar')
    
    # foo(2)
    # bar(2)
    t1=threading.Thread(target=foo,args=(1,))
    t2=threading.Thread(target=bar,args=(1,))
    t1.start()
    t2.start()
    end=time.time()
    print('_________main_________')
    print(t1.getName())
    print(t2.getName())
    print(end-begin)
    #继承式调用
    import threading
    import time
    class MyThread(threading.Thread):
        def __init__(self, num):
            threading.Thread.__init__(self)
            self.num = num
            
        def run(self):  #定义每个线程要运行的函数
            print("running on number:%s" % self.num)
            time.sleep(4)
    
    if __name__ == '__main__':
        t1 = MyThread(111)
        t2 = MyThread(222)
        t1.start()
        t2.start()

    threading的Thread类主要的运行对象

    函数
    start()                开始线程的执行
    run()                定义线程的功能的函数(一般会被子类重写)
    join(timeout=None)    程序挂起,直到线程结束;如果给了timeout,则最多阻塞timeout秒
    getName()            返回线程的名字
    setName(name)        设置线程的名字
    isAlive()            布尔标志,表示这个线程是否还在运行中
    isDaemon()            返回线程的daemon标志
    setDaemon(daemonic)    把线程的daemon标志设为daemonic(一定要在调用start()函数前调用)
    threading.currentThread(): 返回当前的线程变量。
    threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。

    3.2 join and setDaemon

    import threading
    from time import ctime,sleep
    
    def music(func):
        for i in range(2):
            print ("Begin listening to %s. %s" %(func,ctime()))
            sleep(2)
            print("end listening %s"%ctime())
    
    def move(func):
        for i in range(2):
            print ("Begin watching at the %s! %s" %(func,ctime()))
            sleep(5)
            print('end watching %s'%ctime())
    
    threads = []
    t1 = threading.Thread(target=music,args=('七里香',))
    threads.append(t1)
    t2 = threading.Thread(target=move,args=('阿甘正传',))
    threads.append(t2)
    
    # join在子线程完成运行之前,这个子线程的父线程将一直被阻塞。
    if __name__ == '__main__':
        t2.setDaemon(True)
        for t in threads:
            t.setDaemon(True)#守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。
            t.start()
            # t.join()#跟线程没有关系了,像正常一样运行,没意义
        t1.join()
        # t2.join()########考虑这三种join位置下的结果?
        print(threading.current_thread())
        print(threading.active_count())
        print ("all over %s" %ctime())

    join()会等到线程结束,或者在给了timeout参数的时候,等到超时为止。使用join()比使用一个等待锁释放的无限循环清楚一些(也称“自旋锁”)。

    join()的另一个比较重要的方法是它可以完全不用调用。一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。
    如果你的主线程除了等线程结束外,还有其他的事情要做(如处理或等待其他的客户请求),那就不用调用join(),只有在你要等待线程结束的时候才要调用join()。

    setDaemon(True):将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法

    3.3 同步锁Lock

    由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

    import time
    import threading
    def addNum():
        global num #在每个线程中都获取这个全局变量
        num-=1
        temp=num
        print('--get num:',num )
        print('看下结果是0')
        print('再看下结果')
        time.sleep(0.001)
        num =temp-1 #对此公共变量进行-1操作
    
    num = 100  #设定一个共享变量
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)
    
    for t in thread_list: #等待所有线程执行完毕
        t.join()
    
    print('final num:', num )

    注意:

    1:  why num-=1没问题呢?这是因为动作太快(完成这个动作在切换的时间内)

    2: if sleep(1),现象会更明显,100个线程每一个一定都没有执行完就进行了切换,我们说过sleep就等效于IO阻塞,1s之内不会再切换回来,所以最后的结果一定是99.

    多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?

    有同学会想用join呗,但join会把整个线程给停住,造成了串行,失去了多线程的意义,而我们只需要把计算(涉及到操作公共数据)的时候串行执行。

    我们可以通过同步锁来解决这种问题

    import time
    import threading
    begin=time.time()
    def addNum():
        global num #在每个线程中都获取这个全局变量
        # num-=1
        lock.acquire()
        temp=num
        # print('--get num:',num )
        time.sleep(0.0001)
        num =temp-1 #对此公共变量进行-1操作
        lock.release()
    num = 100  #设定一个共享变量
    thread_list = []
    lock=threading.Lock()
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)
    for t in thread_list: #等待所有线程执行完毕
        t.join()
    end=time.time()
    print('final num:', num )
    print(end-begin)

    同步锁与GIL的关系?

    Python的线程在GIL的控制之下,线程之间,对整个python解释器,对python提供的C API的访问都是互斥的,这可以看作是Python内核级的互斥机制。但是这种互斥是我们不能控制的,我们还需要另外一种可控的互斥机制———用户级互斥。内核级通过互斥保护了内核的共享资源,同样,用户级互斥保护了用户程序中的共享资源。

    GIL 的作用是:对于一个解释器,只能有一个thread在执行bytecode。所以每时每刻只有一条bytecode在被执行一个thread。GIL保证了bytecode 这层面上是thread safe的。
    但是如果你有个操作比如 x += 1,这个操作需要多个bytecodes操作,在执行这个操作的多条bytecodes期间的时候可能中途就换thread了,这样就出现了data races的情况了。

    Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,
    以及两个基本的方法。可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。
    池中的线程处于状态图中的同步阻塞状态。
    构造方法:Lock()
    实例方法:
    acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
    release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

    3.4 线程死锁和递归锁

    在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。

    RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,
    处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
    可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
    构造方法:RLock()
    实例方法:acquire([timeout])/release(): 跟Lock差不多。

    #__author: greg
    #date: 2017/9/17 21:38
    # 线程死锁和递归锁
    # Thread deadlocks and recursive locks
    import threading,time
    class myThread(threading.Thread):
        def doA(self):
            lockA.acquire()
            print(self.name,"gotlockA",time.ctime())
            time.sleep(3)
            lockB.acquire()
            print(self.name,"gotlockB",time.ctime())
            lockB.release()
            lockA.release()
    
        def doB(self):
            lockB.acquire()
            print(self.name,"gotlockB",time.ctime())
            time.sleep(2)
            lockA.acquire()
            print(self.name,"gotlockA",time.ctime())
            lockA.release()
            lockB.release()
    
        def run(self):
            self.doA()
            self.doB()
    
    if __name__=="__main__":
        lockA=threading.Lock()
        lockB=threading.Lock()
        threads=[]
        for i in range(5):
            threads.append(myThread())
        for t in threads:
            t.start()
        for t in threads:
            t.join()#等待线程结束
    View Code

    解决办法:使用递归锁,将 lockA=threading.Lock() lockB=threading.Lock()

    改为Rlock
    为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

    #__author: greg
    #date: 2017/9/17 21:49
    import threading,time
    class myThread(threading.Thread):
        def doA(self):
            lock.acquire()
            print(self.name,"gotlockA",time.ctime())
            # time.sleep(2)
            lock.acquire()
            print(self.name,"gotlockB",time.ctime())
            lock.release()
            lock.release()
    
        def doB(self):
            lock.acquire()
            print(self.name,"gotlockC",time.ctime())
            # time.sleep(3)
            lock.acquire()
            print(self.name,"gotlockD",time.ctime())
            lock.release()
            lock.release()
    
        def run(self):
            self.doA()
            self.doB()
    
    if __name__=="__main__":
        # lockA=threading.Lock()
        # lockB=threading.Lock()
        lock = threading.RLock()
        threads=[]
        for i in range(5):
            threads.append(myThread())
        for t in threads:
            t.start()
        for t in threads:
            t.join()#等待线程结束
    View Code

    3.5 单线程和多线程执行对比

    import threading
    from time import ctime, sleep
    
    class MyThread(threading.Thread):
        def __init__(self, func, args, name='', verb=False):
            threading.Thread.__init__(self)
            self.name = name
            self.func = func
            self.args = args
            self.verb = verb
    
        def getResult(self):
            return self.res
    
        def run(self):
            if self.verb:
                print('starting', self.name, 'at:', ctime())
            self.res = self.func(*self.args)
            if self.verb:
                print(self.name, 'finished at:', ctime())
    
    def fib(x):
        sleep(0.005)
        if x < 2: return 1
        return (fib(x-2) + fib(x-1))
    
    def fac(x):
        sleep(0.1)
        if x < 2: return 1
        return (x * fac(x-1))
    
    def sum(x):
        sleep(0.1)
        if x < 2: return 1
        return (x + sum(x-1))
    
    funcs = (fib, fac, sum)
    n = 12
    
    def main():
        nfuncs = range(len(funcs))
        print('*** SINGLE THREAD')
        for i in nfuncs:
            print('starting', funcs[i].__name__,'at:', ctime())
            print(funcs[i](n))
            print(funcs[i].__name__, 'finished at:',ctime())
        print('
    *** MULTIPLE THREADS')
        threads = []
        for i in nfuncs:
            t = MyThread(funcs[i], (n,),
            funcs[i].__name__)
            threads.append(t)
        for i in nfuncs:
            threads[i].start()
        for i in nfuncs:
            threads[i].join()
            print(threads[i].getResult())
        print('all DONE')
    
    if __name__ == '__main__':
        main()
    View Code

    3.6 条件变量同步(Condition)

          有一类线程需要满足条件之后才能够继续执行,Python提供了threading.Condition 对象用于条件变量线程的支持,它除了能提供RLock()或Lock()的方法外,还提供了 wait()、notify()、notifyAll()方法。

     lock_con=threading.Condition([Lock/Rlock]): 锁是可选选项,不传人锁,对象自动创建一个RLock()。Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。

    构造方法: Condition([lock/rlock])
    实例方法:
    acquire([timeout])/release(): 调用关联的锁的相应方法。
    wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
    notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);
    其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
    notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。
    调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
    wait():条件不满足时调用,线程会释放锁并进入等待阻塞;
    notify():条件创造后调用,通知等待池激活一个线程;
    notifyAll():条件创造后调用,通知等待池激活所有线程。

    #__author: greg
    #date: 2017/9/18 11:21
    import threading
    import time
    product = None# 商品
    con = threading.Condition()# 条件变量
    def produce():# 生产者方法
        global product
        if con.acquire():
            while True:
                if product is None:
                    print('produce...')
                    product = 'anything'
                    con.notify() # 通知消费者,商品已经生产
                con.wait()# 等待通知
                time.sleep(2)
    
    def consume():# 消费者方法
        global product
        if con.acquire():
            while True:
                if product is not None:
                    print('consume...')
                    product = None
                    con.notify()  # 通知生产者,商品已经没了
                con.wait()  # 等待通知
                time.sleep(2)
    
    t1 = threading.Thread(target=produce)
    t2 = threading.Thread(target=consume)
    t2.start()
    t1.start()
    View Code

    3.7 事件event

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

    • clear:将“Flag”设置为False
    • set:将“Flag”设置为True

    event.isSet():返回event的状态值;

    event.wait():如果 event.isSet()==False将阻塞线程;

    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

    event.clear():恢复event的状态值为False。

    #__author: greg
    #date: 2017/9/18 19:37
    import threading
    import time
    
    event = threading.Event()
    
    def func():
        # 等待事件,进入等待阻塞状态
        print('%s wait for event...' % threading.currentThread().getName())
        event.wait()
        # 收到事件后进入运行状态
        print('%s recv event.' % threading.currentThread().getName())
    
    t1 = threading.Thread(target=func)
    t2 = threading.Thread(target=func)
    t1.start()
    t2.start()
    time.sleep(2)
    print('MainThread set event.')# 发送事件通知
    event.set()

    threading基于Java的线程模型设计。锁(Lock)和条件变量(Condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),
    而在Python中则是独立的对象。Python Thread提供了Java Thread的行为的子集;没有优先级、线程组,线程也不能被停止、暂停、恢复、中断。
    Java Thread中的部分被Python实现了的静态方法在threading中以模块方法的形式提供。
    threading 模块提供的常用方法:
    threading.currentThread(): 返回当前的线程变量。
    threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。
    Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False
    wait()将阻塞线程至等待阻塞状态。Event其实就是一个简化版的 Condition
    Event没有锁,无法使线程进入同步阻塞状态。
    构造方法:
    Event()实例方法
    isSet(): 当内置标志为True时返回True。
    set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
    clear(): 将标志设为False。
    wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。
    3.8信号量(Semaphore)

    Semaphore/BoundedSemaphore

    Semaphore(信号量)是计算机科学史上最古老的同步指令之一。Semaphore管理一个内置的计数器,

    每当调用acquire()时-1,调用release() 时+1。计数器不能小于0;当计数器为0时,
    acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。
    基于这个特点,Semaphore经常用来同步一些有“访客上限”的对象,比如连接池。
    BoundedSemaphore 与Semaphore的唯一区别在于前者将在调用release()时检查计数器的值是否超过了计数器的初始值,
    如果超过了将抛出一个异常。
    构造方法: Semaphore(value=1): value是计数器的初始值。
    实例方法:
    acquire([timeout]): 请求Semaphore。如果计数器为0,将阻塞线程至同步阻塞状态;否则将计数器-1并立即返回。
    release(): 释放Semaphore,将计数器+1,如果使用BoundedSemaphore,还将进行释放次数检查。
    release()方法不检查线程是否已获得 Semaphore。

    
    
    #__author: greg
    #date: 2017/9/18 10:02
    import threading,time
    class myThread(threading.Thread):
        def run(self):
            if semaphore.acquire():
                print(self.name)
                time.sleep(3)
                semaphore.release()
    
    if __name__=="__main__":
        semaphore=threading.Semaphore(10)
        thrs=[]
        for i in range(100):
            thrs.append(myThread())
        for t in thrs:
            t.start()
    #__author: greg
    #date: 2017/9/18 11:28
    import threading
    import time
    semaphore = threading.Semaphore(2)# 计数器初值为2,比较4
    # semaphore = threading.BoundedSemaphore(2)
    def func():
        # 请求Semaphore,成功后计数器-1;计数器为0时阻塞
        print('%s acquire semaphore...' % threading.currentThread().getName())
        if semaphore.acquire():
            print('%s get semaphore' % threading.currentThread().getName())
            time.sleep(4)
            print('%s release semaphore' % threading.currentThread().getName())# 释放Semaphore,计数器+1
            semaphore.release()
    
    t1 = threading.Thread(target=func)
    t2 = threading.Thread(target=func)
    t3 = threading.Thread(target=func)
    t4 = threading.Thread(target=func)
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    time.sleep(2)
    # 没有获得semaphore的主线程也可以调用release
    # 若使用BoundedSemaphore,t4释放semaphore时将抛出异常
    print('MainThread release semaphore without acquire')
    semaphore.release()
    实例2
  • 相关阅读:
    QuickTest Professional对web网站进行测试后没有生成脚本信息解决办法
    如何使用loadrunner进行web网站性能测试
    spring boot架构浅谈
    spring cloud架构
    crontab误删操作的恢复与防范
    linux命令重定向>、>>、 1>、 2>、 1>>、 2>>、 <
    redis原理及使用
    PHP三种字符串界定符的区别(单引号,双引号,<<<)
    php代码加密|PHP源码加密——实现方法
    java一键搭建新项目(地址)
  • 原文地址:https://www.cnblogs.com/ningxin18/p/7890735.html
Copyright © 2011-2022 走看看