zoukankan      html  css  js  c++  java
  • Python 多线程、多进程 (二)之 多线程、同步、通信

    Python 多线程、多进程 (一)之 源码执行流程、GIL
    Python 多线程、多进程 (二)之 多线程、同步、通信
    Python 多线程、多进程 (三)之 线程进程对比、多线程

    一、python多线程

    对于I/O操作的时候,进程与线程的性能差别不大,甚至由于线程更轻量级,性能更高。这里的I/O包括网络I/O和文件I/O

    1、实例

    假如利用socket发送http请求,也就是网络I/O。爬取列表网页中的写href链接,然后获取href链接之后,在爬去链接的网页详情。
    如果不适用多线程的话,程序串行的执行,结果就是要先等待列表网页获取所有的href的链接之后,才可以逐个的爬去href链接所指的网页详情,这就使得等待时间很长。
    如果使用多线程编程,线程A执行第一个列表网页程序,遇到I/O操作,GIL释放,当获取到第一个href链接之后,线程B就自动的去获取href链接所指的网页详情。

    2、多线程实现

    使用sleep模拟网络I/IO

    # test3.py
    
    import time
    import threading
    
    def get_detail_html(url):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
    
    def get_detail_url(url):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")
    
    if  __name__ == "__main__":
    
        # 函数方法 arg 为函数参数
        thread1 = threading.Thread(target=get_detail_html, args=("",))
        thread2 = threading.Thread(target=get_detail_url, args=("",))
    
        start_time = time.time()
    
        # 子线程1,2开始
        thread1.start()  
        thread2.start()
    
        print ("last time: {}".format(time.time()-start_time))
    
    # 执行结果
    get detail html started
    get detail url started
    last time: 0.0019958019256591797  # 忽略为0
    get detail html end
    get detail url end
    

    按照上面线程并行执行的逻辑应该是打印时间为2秒,但是结果却为0。
    任何进程默认就会启动一个线程,该线程称为主线程,主线程又可以启动新的线程。上面的thread1与thread2就是主线程启动的两个新的线程,那么在两个子线程启动之后,主线程中其余的程序段print函数也在并行执行,所以时间为0。当两个子线程运行完毕之后,主线程退出,进程关闭,程序运行结束。才会打印出get detail html end,get detail url end。

    3、守护线程

    那么如何使得主线程退出的时候子线程也退出。或者说,主线程推出的时候kill掉子线程?

    <1>、将子线程设置成守护线程

    # test4.py
    
    import time
    import threading
    
    def get_detail_html(url):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
    
    
    def get_detail_url(url):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")
    
    if  __name__ == "__main__":
        # 函数方法 arg 为函数参数
        thread1 = threading.Thread(target=get_detail_html, args=("",))
        thread2 = threading.Thread(target=get_detail_url, args=("",))
    
        thread1.setDaemon(True)
        thread2.setDaemon(True)
        # 将两个线程设置为守护线程,即主线程退出,这两个子线程也退出,kill
    
        start_time = time.time()
         # 子程开始
        thread1.start() 
        thread2.start()
    
        #当主线程退出的时候, 子线程kill掉
        print ("last time: {}".format(time.time()-start_time))
    
    # 输出
    get detail html started
    get detail url started
    last time: 0.0
    

    将两个线程设置为守护线程,即主线程退出,这两个守护线程也退出。打印结果中执行到print之后直接程序结束。

    由于两个线程的时间不相同,那么两者有什么区别呢

    <2>、先将thread1设置为守护线程

    # test5.py
    
    import time
    import threading
    
    def get_detail_html(url):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
    
    
    def get_detail_url(url):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")
    
    
    if  __name__ == "__main__":
        # 函数方法 arg 为函数参数
        thread1 = threading.Thread(target=get_detail_html, args=("",))
        thread2 = threading.Thread(target=get_detail_url, args=("",))
    
        thread1.setDaemon(True)  # 只将thread设置为守护线程
        # thread2.setDaemon(True)
    
        start_time = time.time()
    
        thread1.start()  
        thread2.start()
    
        #当主线程退出的时候, 子线程kill掉
        print ("last time: {}".format(time.time()-start_time))
    
    # 结果 
    get detail html started
    get detail url started
    last time: 0.000997781753540039
    get detail html end
    get detail url end
    

    只将thread1设置为守护线程之后,由于thread2的sleep时间为4秒,所以主线程仍会等待thread2执行结束之后才退出,而thread1由于时间为2秒,所以也会打印。

    <3>、先将thread2设置为守护线程

    # test6.py
    
    import time
    import threading
    
    def get_detail_html(url):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
    
    
    def get_detail_url(url):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")
    
    
    if  __name__ == "__main__":
    
        # 函数方法 arg 为函数参数
        thread1 = threading.Thread(target=get_detail_html, args=("",))
        thread2 = threading.Thread(target=get_detail_url, args=("",))
    
        # thread1.setDaemon(True)
        thread2.setDaemon(True)
    
        start_time = time.time()
    
        thread1.start()  
        thread2.start()
    
        print ("last time: {}".format(time.time()-start_time))
    
    # 输出
    get detail html started
    get detail url started
    last time: 0.0029969215393066406
    get detail html end
    

    由于只将thread2设置为守护线程,print函数执行结束的时候会首先kill掉thread2线程。但是由于thread1线程还未结束,程序仍会等待两秒输出get detail html end才结束。

    4、线程阻塞

    上面说了如何在主线程结束的时候,直接kill掉子线程。那么如何使子线程执行结束才执行主线程,就是阻塞主进程。

    <1>、结束两个子线程

    # test7.py
    
    import time
    import threading
    
    def get_detail_html(url):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
    
    def get_detail_url(url):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")
    
    if  __name__ == "__main__":
    
        # 函数方法 arg 为函数参数
        thread1 = threading.Thread(target=get_detail_html, args=("",))
        thread2 = threading.Thread(target=get_detail_url, args=("",))
    
        start_time = time.time()
    
        # 子线程开始
        thread1.start()
        thread2.start()
    
        # 子线程程结束
        thread1.join()
        thread2.join()
    
        #当主线程退出的时候, 子线程kill掉
        print ("last time: {}".format(time.time()-start_time))
    
    #输出
    get detail html started
    get detail url started
    get detail html end
    get detail url end
    last time: 4.001712799072266
    

    由于调用了两个thread的join方法,主线程阻塞,当子线程结束之后,print函数执行后主线程退出,程序结束。

    <2>、结束thread1线程

    # test8.py
    
    import time
    import threading
    
    def get_detail_html(url):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
    
    
    def get_detail_url(url):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")
    
    if  __name__ == "__main__":
    
        # 函数方法 arg 为函数参数
        thread1 = threading.Thread(target=get_detail_html, args=("",))
        thread2 = threading.Thread(target=get_detail_url, args=("",))
    
        start_time = time.time()
    
        # 子线程开始
        thread1.start()
        thread2.start()
    
        # 1线程程结束
        thread1.join()
        # thread2.join()
    
        #当主线程退出的时候, 子线程kill掉
        print ("last time: {}".format(time.time()-start_time))
    
    # 输出
    get detail html started
    get detail url started
    get detail html end
    last time: 2.001251220703125
    get detail url end
    

    由于调用了thread1的join方法,阻塞主线程,thread1直接结束之后print打印时间,但是对另一个线程没有影响。所以在打印last time: 2.001251220703125时间,等待两秒打印get detail url end,主线程才会退出。

    <3>、结束thread2线程

    # test9.py
    
    import time
    import threading
    
    def get_detail_html(url):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
    
    
    def get_detail_url(url):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")
    
    if  __name__ == "__main__":
    
        # 函数方法 arg 为函数参数
        thread1 = threading.Thread(target=get_detail_html, args=("",))
        thread2 = threading.Thread(target=get_detail_url, args=("",))
    
        start_time = time.time()
    
        # 子线程开始
        thread1.start()
        thread2.start()
    
        # 2线程程结束
        # thread1.join()
        thread2.join()
    
        #当主线程退出的时候, 子线程kill掉
        print ("last time: {}".format(time.time()-start_time))
    
    # 输出
    get detail html started
    get detail url started
    get detail html end
    get detail url end
    last time: 4.002287864685059
    

    由于thread2线程的sleep的时间为4秒,期间thread1已经执行完毕,所以打印时间为4秒。

    5、Thread类继承式创建

    同样的也可以使用类继承的方法创建线程实例,效果一样的

    # test10.py
    
    import time
    import threading
    
    class GetDetailHtml(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            print("get detail html started")
            time.sleep(2)
            print("get detail html end")
    
    
    class GetDetailUrl(threading.Thread):
        def __init__(self, name):
            super().__init__(name=name)
    
        def run(self):
            print("get detail url started")
            time.sleep(4)
            print("get detail url end")
    
    if  __name__ == "__main__":
    
        # 类继承方法
        thread1 = GetDetailHtml("get_detail_html")
        thread2 = GetDetailUrl("get_detail_url")
        start_time = time.time()
    
        # 子线程开始
        thread1.start()
        thread2.start()
    
        # 子线程程结束
        thread1.join()
        thread2.join()
    
        #当主线程退出的时候, 子线程kill掉
        print ("last time: {}".format(time.time()-start_time))
    

    二、线程通信

    1、共享变量通信

    共享变量通信,是线程间通信最简单的方式,但也是最容易出问题的方式。以上面爬去页面和网页链接的实例进行扩展。在上面的实例中,因为要解决请求列表页面的时候网络时延问题,引入了多线程并行,边爬去列表页获取href,再爬取href指向的想起那个页面,下面将爬去的页面存入列表实现。

    # test11.py
    
    import threading
    import time
    
    detail_url_list = []  # 存储着爬取下来的href链接
    
    def get_detail_html(detail_url_list):  # 参数这里作为对全局变量的引用
        while True:
            # 使用while语句使得线程持续爬去
            if len(detail_url_list):
                url = detail_url_list.pop()
                print('get detail html start')
                time.sleep(2)
                print('get detail html end')
    
    
    def get_detail_url(detail_url_list):
        while True:
            # 使用while语句使得线程持续爬取
            print('get detail url start')
            time.sleep(4)
    
            for i in range(20):
                detail_url_list.append('http://www.xxxx.com/{}.html'.format(i))
            print('get detail end')
    
    
    if __name__ == "__main__":
        thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_list,))
    
        for i in range(10):
            # 为了模拟多个线程并发,这里创建了十个子线程
            html_thread = threading.Thread(target=get_detail_html, args=(detail_url_list,))
            html_thread.start()
    
        start_time = time.time()
    
        print("last time: {}".format(time.time() - start_time))
    

    但是上面问题也会很明显,在GIL的示例中,total变量由于变量共享的缘故,没有按照预期的执行。而在上面的爬虫实例中,detail_url_list作为全局共享变量,pop操作,append操作,多个线程共用资源,都不是线程安全的操作,会出现问题。所以就必须给变量加上锁,保持安全性。为了摆脱这种问题,使用消息队列通信

    2、消息队列通信

    消息队列通信也就是使用Queue这个类来表示变量,从而达到线程安全,由于Queue这个类内部封装了deque,也就是python中的双端队列。双端对列本身就是安全界别很高的一种类型,实现线程间的安全操作。

    # test12.py
    
    #通过queue的方式进行线程间同步
    from queue import Queue
    import time
    import threading
    
    def get_detail_html(queue):
        #爬取文章详情页
        while True:
            url = queue.get()
            print("get detail html started")
            time.sleep(2)
            print("get detail html end")
    
    
    def get_detail_url(queue):
        # 爬取文章列表页
        while True:
            print("get detail url started")
            time.sleep(4)
            for i in range(20):
                queue.put("http://projectsedu.com/{id}".format(id=i))
            print("get detail url end")
    
    if  __name__ == "__main__":
        detail_url_queue = Queue(maxsize=1000)
    
        thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
        for i in range(10):
            html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
            html_thread.start()
    
        start_time = time.time()
    
        # detail_url_queue.task_done()
        detail_url_queue.join()
    
        #当主线程退出的时候, 子线程kill掉
        print ("last time: {}".format(time.time()-start_time))
    

    使用了消息队列替代共享变量

    • Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
    • q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。没有参数时,q.put的个数大于队列数时,会一直阻塞住。
    • q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。没有参数时,q.get的个数大于队列数时,会一直阻塞住。
    • q.put_nowait()等价于q.put(block=False)队列满时再存也会抛异常
    • q.get_nowait()等价于q.get(block=False)队列为空取不出时会抛异常
    • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    • q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

    三、线程同步

    1、加锁

    在上面的第一个GIL示例中,由于GIL释放的缘故,多个线程共享变量,导致total的值不像预期那样为0的问题发生,也就是如何线程同步。最简单的方式就是加锁。加锁使得一个线程在占用资源的时候,别的线程都必须等待,只有当这个线程主动释放资源的时候,其他线程才能使用资源,也就是资源占用互斥。这样就可要保证共享变量的安全性。

    # test13.py
    
    from threading import Lock
    
    #在同一个线程里面,可以连续调用多次acquire, 一定要注意acquire的次数要和release的次数相等
    total = 0
    lock = Lock()
    
    def add():
        global lock
        global total
        for i in range(1000000):
            lock.acquire()   # 加锁
            total += 1
            lock.release()  # 释放锁
    
    
    def desc():
        global total
        global lock
        for i in range(1000000):
            lock.acquire()  # 加锁
            total -= 1
            lock.release()  # 释放锁
    
    import threading
    
    thread1 = threading.Thread(target=add)
    thread2 = threading.Thread(target=desc)
    
    thread1.start()
    thread2.start()
    
    thread1.join()
    thread2.join()
    
    print(total)
    
    # 输出
    在等待了一段时间后输出0
    0 # total的打印结果为0
    
    加锁的时候要保证加上锁执行完成之后,就要释放掉,不然会一直占用资源。
    
    

    加锁的结果使得在执行total-=1或者total+=1的赋值语句的时候,该赋值语句对应的多条字节码指令执行完之后,才会其他进程执行修改total值。该线程占用了锁,所以其他线程不能修改total值,只有当该释放了锁,其他线程才能修改total值,不会造成修改共享变量的冲突。这是加锁的好处,那么代价也十分明显
    加锁缺点:

    • 加锁性能
    • 死锁风险

    补充:另外自己加的锁使用户级别的与GIL不同。

    <1>、性能问题

    本来的多线程,由于加锁的缘故,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行。并且由于来回切换线程的缘故,程序性能变得低下
    将test2.py改成如下

    # test14.py
    
    total = 0
    
    def add():
        global total
        for i in range(1000000):
            total += 1
    def desc():
        global total
        for i in range(1000000):
            total -= 1
    
    import threading
    import time
    start_time = time.time()
    
    add()
    desc()
    
    print(total)
    
    print("last time: {}".format(time.time() - start_time))
    
    # 输出
    0
    last time: 0.314816951751709
    

    这是简单的单线程程序,持续时间为0.3秒。没有使用thread多线程

    下面使用threading多线程,并且加锁

    # test15.py
    
    from threading import Lock  
    
    total = 0
    lock = Lock()
    
    def add():
        global lock
        global total
        for i in range(1000000):
            lock.acquire()
            total += 1
            lock.release()
    
    
    def desc():
        global total
        global lock
        for i in range(1000000):
            lock.acquire()
            total -= 1
            lock.release()
    
    import threading
    import time
    
    start_time = time.time()
    
    thread1 = threading.Thread(target=add)
    thread2 = threading.Thread(target=desc)
    
    thread1.start()
    thread2.start()
    
    thread1.join()
    thread2.join()
    
    print(total)
    print("last time: {}".format(time.time() - start_time))
    
    # 输出
    0
    last time: 5.062084674835205
    

    使用了多线程,为了保证共享变量的安全性操作,线程同步,加锁导致类似单线程,程序的运行时间达到了5秒钟。可见线程之间的切换十分浪费时间。所以说,CPython的GIL本意是用来保护所有全局的解释器和环境状态变量的,如果去掉GIL,就需要更多的更细粒度的锁对解释器的众多全局状态进行保护。做过测试将GIL去掉,加入更细粒度的锁。但是实践检测对单线程来说,性能更低。

    <2>、死锁风险

    来看下面例子
    这里为了在一个线程中多次调用lock,使用可重入的锁Rlock对象
    Lock与Rlock区别
    RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

    # test15.py
    
    from threading import RLock  # 可重入的锁
    
    total = 0
    lock = RLock()
    
    def add():
        global lock
        global total
        for i in range(1000000):
            lock.acquire()
            lock.acquire()  # 这里加了两次锁
            total += 1
            lock.release()
    
    
    def desc():
        global total
        global lock
        for i in range(1000000):
            lock.acquire()
            total -= 1
            lock.release()
    
    import threading
    import time
    
    start_time = time.time()
    
    thread1 = threading.Thread(target=add)
    thread2 = threading.Thread(target=desc)
    
    thread1.start()
    thread2.start()
    
    thread1.join()
    thread2.join()
    
    print(total)
    print("last time: {}".format(time.time() - start_time))
    

    由于在add函数中加了两次锁lock.acquire(),结果就是线程永远都不获释放掉共享变量。一直占用资源,其他的线程请求资源没有结果,多个线程挂起,既不能执行,也无法结束,一直处于等待状态,造成死锁,只能靠操作系统强制终止。最终程序也没有任何结果输出。
    所以在同一个线程里面,可以连续调用多次acquire, 一定要注意acquire的次数要和release的次数相等

    还有就是,线程的相互等待,假如内存中又两中资源a和b,而线程A(a,b)和线程B(a,b)都申请资源。

    第一步
    线程A先申请a资源,线程B先申请b资源,因此没有问题

    第二步
    由于a,b均已被A,B占用,并且A申请b,B申请b,在位获得新的资源的时候两者都不会退出对现有资源的占用,这就造成了两个线程相互等待,并且这种等待会一直持续下去,造成死锁。

    2、线程复杂通信

    在上面看到线程进行通信的时候需要加锁,如果如何使用锁进行线程的对话功能,例如

    • 线程A:hello,你好啊
    • 线程B:你好
    • 线程A:吃饭了吗
    • 线程B:吃过了,你呢
    • 线程A:我也吃过了,咱们去搞PVM吧
    • 线程B:ok,走吧

    <1>、简单锁

    像上面的线程通信,如果使用简单的Rlock锁

    import threading
    
    
    class ThreadA(threading.Thread):
        def __init__(self, lock):
            super().__init__(name="线程A")
            self.lock = lock
    
        def run(self):
            self.lock.acquire()
            print("{} : hello, 你好 ".format(self.name))
            self.lock.release()
            self.lock.acquire()
            print("{} : 吃过饭了吗 ".format(self.name))
            self.lock.release()
            self.lock.acquire()
            print("{} : 我也吃过了,咱们去找PVM吧".format(self.name))
            self.lock.release()
    
    class ThreadB(threading.Thread):
        def __init__(self, lock):
            super().__init__(name="线程B")
            self.lock = lock
    
        def run(self):
            self.lock.acquire()
            print("{} : 你好 ".format(self.name))
            self.lock.release()
            self.lock.acquire()
            print("{} : 吃过了,你呢".format(self.name))
            self.lock.release()
            self.lock.acquire()
            print("{} : ok,走吧 ".format(self.name))
            self.lock.release()
    
    if __name__ == "__main__":
    
        lock = threading.RLock()
    
        a_thread = ThreadA(lock)
        b_thread = ThreadB(lock)
    
        a_thread.start()
        b_thread.start()
    
    # 输出
    线程A : hello, 你好 
    线程A : 吃过饭了吗 
    线程A : 我也吃过了,咱们去找PVM吧
    线程B : 你好 
    线程B : 吃过了,你呢
    线程B : ok,走吧 
    

    显然没有完成线程通信的基本功能。

    <2>、threading.Condition()

    解决方案:在线程复杂通信时使用threading.Condition(),可以把Condiftion理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。Condition还提供wait方法、notify方法、notifyAll方法。这些方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。

    方法介绍

    • acquire()/release():获得/释放 Lock
    • wait([timeout]):线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。调用wait()会释放Lock,直至该线程被Notify()、NotifyAll()或者超时线程又重新获得Lock.
    • notify(n=1):通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
    • notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程

    源码分析

    # 部分源码
    
    _PyRLock = _RLock
    
    
    class Condition:
    
        def __init__(self, lock=None):
            if lock is None:
                lock = RLock()
            self._lock = lock
            # Export the lock's acquire() and release() methods
            self.acquire = lock.acquire
            self.release = lock.release
    
        def __enter__(self):
            return self._lock.__enter__()
    
        def __exit__(self, *args):
            return self._lock.__exit__(*args)
    

    进入Condition这个类中查看源码发现,在默认的情况下,Condition是封装的锁对象是Rlock,另外Condition类实现了__enter__,__exit__两个特殊方法,由鸭子类型可知,说明可以像上下文管理器一样使用它。
    而在__enter__与__exit__两个特殊方法中分别调用了self.acquire()与self.release()两个方法,所以说不使用with上下文管理器的话也可以直接使用acquire()与release()两个方法进行加锁释放锁。

    解决实例

    class ThreadA(threading.Thread):
        def __init__(self, cond):
            super().__init__(name="线程A")
            self.cond = cond
    
        def run(self):
            with self.cond:
                print("{} : hello, 你好 ".format(self.name))  # 4
                self.cond.notify()  # 5
                self.cond.wait()  # 6
    
                print("{} : 吃过饭了吗 ".format(self.name))
                self.cond.notify()
                self.cond.wait()
    
                print("{} : 我也吃过了,咱们去找PVM吧".format(self.name))
                self.cond.notify()
                self.cond.wait()
    
    
    class ThreadB(threading.Thread):
        def __init__(self, cond):
            super().__init__(name="线程B")
            self.cond = cond
    
        def run(self):
            with self.cond:
                self.cond.wait()  # 2
                print("{} : 你好 ".format(self.name))  # 7
                self.cond.notify()
    
                self.cond.wait()
                print("{} : 吃过了,你呢".format(self.name))
                self.cond.notify()
    
                self.cond.wait()
                print("{} : ok,走吧 ".format(self.name))
                self.cond.notify()
    
    
    if __name__ == "__main__":
    
        cond = threading.Condition()
    
        b_thread = ThreadB(cond)
        a_thread = ThreadA(cond)
    
        b_thread.start()  # 1
        a_thread.start()  # 3
    
    # 输出结果
    
    线程A : hello, 你好 
    线程B : 你好 
    线程A : 吃过饭了吗 
    线程B : 吃过了,你呢
    线程A : 我也吃过了,咱们去找PVM吧
    线程B : ok,走吧 
    

    完成线程之间的复杂通信。
    这里需要注意的是:两个线程之间的开启先后顺序。b线程需要先于a线程开启。原因:
    1 先开启b线程
    2 wait方法会首先上一把锁,线程处于阻塞态
    3 开启a线程
    4 打印 线程A:hello,你好啊
    5 这个时候cond对象调用notify方法,会释放掉之前上的锁
    6 调用wait方法,为自己又上了一把锁
    7 由于notify方法已经打开了锁,或继续执行,打印 线程B:你好
    其实wait方法会维持一个锁,而这个锁只有notify方法才能打开。如果a线程先开启,则是调用了wait方法维持了一把锁,并没有其他的线程会调用notify方法释放这把锁。则最终只会输出 线程A : hello, 你好 ,而线程一直处于死锁状态。

    补充:Condition对象会维持两层锁,而不是两个锁,更不是简单的一个锁。在开启或者关闭上下文管理器对象的时候__enter__,__exit__方法会开启释放掉底层锁(直接使用acquire()与release()两个方法也行),这一层锁是一个。而在持续连续调用的wait和notify方法则是对第二层锁进行操作,而这一层所在Condition对象内部是封装到一个双端队列中,在每次调用wait的时候分配一把锁并放入到cond的等待队列中,等到notify方法的唤醒。可以进入Condition源码查看

    3、Semaphore(信号量)

    同时只有n个线程可以获得semaphore,即可以限制最大连接数为n),也就是线程最大并发量的控制。
    Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
    信号量使得一个程序中有很多个线程,但是只有n多个线程获得信号量,处于运行态

    class HtmlSpider(threading.Thread):
        def __init__(self, url, sem):
            super().__init__()
            self.url = url
            self.sem = sem
    
        def run(self):
            time.sleep(2)
            print("got html text success, time is {}".format(time.ctime()))
            self.sem.release()
    
    class UrlProducer(threading.Thread):
        def __init__(self, sem):
            super().__init__()
            self.sem = sem
    
        def run(self):
            for i in range(20):
                self.sem.acquire()
                html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
                html_thread.start()
    
    if __name__ == "__main__":
        sem = threading.Semaphore(4)  # 每次只有4个线程获取信号量
        url_producer = UrlProducer(sem)
        url_producer.start()
    

    在上面示例中,模拟爬虫,创建20个子线程爬取html页面,如果不是用信号量,二十条数据一次返回。使用信号量,使得每次只有4个线程运行。

    # 输出结果
    
    got html text success, time is Tue Nov 20 17:17:55 2018
    got html text success, time is Tue Nov 20 17:17:55 2018
    got html text success, time is Tue Nov 20 17:17:55 2018
    got html text success, time is Tue Nov 20 17:17:55 2018
    got html text success, time is Tue Nov 20 17:17:57 2018
    got html text success, time is Tue Nov 20 17:17:57 2018
    got html text success, time is Tue Nov 20 17:17:57 2018
    got html text success, time is Tue Nov 20 17:17:57 2018
    got html text success, time is Tue Nov 20 17:17:59 2018
    got html text success, time is Tue Nov 20 17:17:59 2018
    got html text success, time is Tue Nov 20 17:17:59 2018
    got html text success, time is Tue Nov 20 17:17:59 2018
    got html text success, time is Tue Nov 20 17:18:01 2018
    got html text success, time is Tue Nov 20 17:18:01 2018
    got html text success, time is Tue Nov 20 17:18:01 2018
    got html text success, time is Tue Nov 20 17:18:01 2018
    got html text success, time is Tue Nov 20 17:18:03 2018
    got html text success, time is Tue Nov 20 17:18:03 2018
    got html text success, time is Tue Nov 20 17:18:03 2018
    got html text success, time is Tue Nov 20 17:18:03 2018
    

    每个两秒打印一次结果,一次四条数据。总共二十个。

    4、线程池

    Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间。但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持。

    concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。

    Future你可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

    <1>、使用submit来操作线程池/进程池:

    from concurrent.futures import ThreadPoolExecutor
    import urllib.request
    URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
    def load_url(url):
        with urllib.request.urlopen(url, timeout=60) as conn:
            print('%r page is %d bytes' % (url, len(conn.read())))
    
    executor = ThreadPoolExecutor(max_workers=3)
    
    for url in URLS:
        future = executor.submit(load_url,url)
        print(future.done())
    
    print('主线程')
    
    # 运行结果:
    False
    False
    False
    主线程
    'https://www.baidu.com/' page is 227 bytes
    'https://github.com/' page is 75633 bytes
    'http://www.163.com' page is 703974 bytes
    

    根据运行结果,使用submit方法来往线程池中加入一个task,submit返回一个Future对象,对于Future对象可以简单地理解为一个在未来完成的操作。由于线程池异步提交了任务,主线程并不会等待线程池里创建的线程执行完毕,所以执行了print('主线程'),相应的线程池中创建的线程并没有执行完毕,故future.done()返回结果为False。

    <2>、 用map来操作线程池/进程池:

    from concurrent.futures import ThreadPoolExecutor
    import urllib.request
    URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
    def load_url(url):
        with urllib.request.urlopen(url, timeout=60) as conn:
            print('%r page is %d bytes' % (url, len(conn.read())))
    
    executor = ThreadPoolExecutor(max_workers=3)
    
    executor.map(load_url,URLS)
    
    print('主线程')
    # 结果
    主线程
    'https://www.baidu.com/' page is 227 bytes
    'https://github.com/' page is 75633 bytes
    'http://www.163.com' page is 703974 bytes
    

    从运行结果可以看出,map是按照URLS列表元素的顺序返回的,并且写出的代码更加简洁直观,可以根据具体的需求任选一种。

    <3>、wait

    wait方法接会返回一个tuple(元组),tuple中包含两个set(集合),一个是completed(已完成的)另外一个是uncompleted(未完成的)。使用wait方法的一个优势就是获得更大的自由度,它接收三个参数FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,默认设置为ALL_COMPLETED

    如果采用默认的ALL_COMPLETED,程序会阻塞直到线程池里面的所有任务都完成,再执行主线程:

    from concurrent.futures import ThreadPoolExecutor,wait,as_completed
    import urllib.request
    URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
    def load_url(url):
        with urllib.request.urlopen(url, timeout=60) as conn:
            print('%r page is %d bytes' % (url, len(conn.read())))
    
    executor = ThreadPoolExecutor(max_workers=3)
    
    f_list = []
    for url in URLS:
        future = executor.submit(load_url,url)
        f_list.append(future)
    print(wait(f_list))
    
    print('主线程')
    # 输出
    'https://www.baidu.com/' page is 227 bytes
    'https://github.com/' page is 75627 bytes
    'http://www.163.com' page is 703988 bytes
    DoneAndNotDoneFutures(done={<Future at 0x2ab6ea89d30 state=finished returned NoneType>, <Future at 0x2ab6ea89240 state=finished returned NoneType>, <Future at 0x2ab6e93f7b8 state=finished returned NoneType>}, not_done=set())
    主线程
    

    如果采用FIRST_COMPLETED参数,程序并不会等到线程池里面所有的任务都完成:

    from concurrent.futures import ThreadPoolExecutor,wait,as_completed
    import urllib.request
    URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
    def load_url(url):
        with urllib.request.urlopen(url, timeout=60) as conn:
            print('%r page is %d bytes' % (url, len(conn.read())))
    
    executor = ThreadPoolExecutor(max_workers=3)
    
    f_list = []
    for url in URLS:
        future = executor.submit(load_url,url)
        f_list.append(future)
    print(wait(f_list,return_when='FIRST_COMPLETED'))
    
    print('主线程')
    # 输出
    'https://www.baidu.com/' page is 227 bytes
    DoneAndNotDoneFutures(done={<Future at 0x2cd5581a240 state=finished returned NoneType>}, not_done={<Future at 0x2cd5581ad30 state=running>, <Future at 0x2cd556cf7f0 state=running>})
    主线程
    'http://www.163.com' page is 703991 bytes
    'https://github.com/' page is 75625 bytes
    

    <4>、回调函数

    import requests
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    
    def get(url):
        print('GET {}'.format(url))
        response = requests.get(url)
        time.sleep(2)
        if response.status_code == 200:  # 200代表状态:下载成功了
            return {'url': url, 'content': response.text}
    
    
    def parse(res):
        print('%s parse res is %s' % (res['url'], len(res['content'])))
        return '%s parse res is %s' % (res['url'], len(res['content']))
    
    
    def save(res):
        print('save', res)
    
    
    def task(res):
        res = res.result()
        par_res = parse(res)
        save(par_res)
    
    
    if __name__ == '__main__':
        urls = [
            'http://www.cnblogs.com',
            'https://www.python.org',
            'https://www.openstack.org',
        ]
    
        pool = ThreadPoolExecutor(2)
        for i in urls:
            pool.submit(get, i).add_done_callback(task)
            '''
            这里的回调函数拿到的是一个对象。得
            先把返回的res得到一个结果。即在前面加上一个res.result() 
            谁好了谁去掉回调函数
            回调函数也是一种编程思想。不仅在线程池用,在进程池也用
            '''
        pool.shutdown()  # 相当于进程池里的close和join
    
    # 输出
    GET http://www.cnblogs.com
    GET https://www.python.org
    https://www.python.org parse res is 50114
    save https://www.python.org parse res is 50114
    GET https://www.openstack.org
    https://www.openstack.org parse res is 63253
    save https://www.openstack.org parse res is 63253
    http://www.cnblogs.com parse res is 40382
    save http://www.cnblogs.com parse res is 40382
    

    上一篇:Python 多线程、多进程 (一)之 源码执行流程、GIL

    下一篇:Python 多线程、多进程 (三)之 线程进程对比、多线程

  • 相关阅读:
    PredictionIO+Universal Recommender快速开发部署推荐引擎的问题总结(1)
    SpringJDBC的JdbcTemplate在MySQL5.7下不支持子查询的问题
    POP3接收邮件
    发送邮件
    电子邮件介绍
    线程优先级队列
    线程同步
    threading模块
    _thread模块
    使用线程
  • 原文地址:https://www.cnblogs.com/welan/p/10009623.html
Copyright © 2011-2022 走看看