zoukankan      html  css  js  c++  java
  • 线程池

    之前用的multiprocessing.Process和threading.Thread都是一个线程只能执行一个任务,如果想用一个线程执行多个任务,该怎么办呢?

    from threading import Thread
    
    def func():
        print('执行一个线程')
    
    if __name__ == '__main__':
        t = Thread(target=func)
        t.start()
        t.start()
    
    >>
    执行一个线程
    Traceback (most recent call last):
      File "H:/exercise/并发/进程池与线程池/demo.py", line 9, in <module>
        t.start()
      File "C:python36lib	hreading.py", line 842, in start
        raise RuntimeError("threads can only be started once")
    RuntimeError: threads can only be started once

    可重复利用的线程

    from threading import Thread
    import queue
    import time
    
    class MyThread(Thread):
        def __init__(self):
            super().__init__()
            self.queue = queue.Queue() #实例一个队列
            self.daemon = True # 主线程结束,子线程也当结束
            self.start() # 实例的时候就启动线程
    
        def run(self):  # 不断获取并执行任务
            while True:
                func,args,kwargs = self.queue.get()  # 等待获取任务,没有任务就阻塞
                try:
                    func(*args,**kwargs)  # 执行任务
                finally:
                    self.queue.task_done()  # 告诉queue 这次任务执行完毕 队列计数器会减1
    
        def apply_async(self,func,args=(),kwargs={}):
            self.queue.put((func,args,kwargs)) # 把任务添加到队列中 队列计数器会加1
    
        def join(self):
            self.queue.join() # 若队列还有任务,则会阻塞,若队列没有任务了,不会阻塞 队列根据计数器判断是否还有任务
    
    def func1():
        time.sleep(2)
        print('任务1')
    def func2():
        time.sleep(2)
        print('任务2')
    def func3():
        time.sleep(2)
        print('任务3')
    
    if __name__ == '__main__':
        thread = MyThread()
        thread.apply_async(func1) # 添加任务到线程队列
        thread.apply_async(func2)
        thread.apply_async(func3)
        thread.join()  # 如果没有这个 会因为主线程结束,子线程不执行,有了这个,当队列任务没执行完之前,将阻塞再这里
        print('执行完毕!')
    
    >>
    任务1
    任务2
    任务3
    执行完毕!

    线程池

    线程池的简单实现

    方法一:

    import threading
    import queue
    import time
    
    class MyThreadPool:
        def __init__(self,n):
            self.queue = queue.Queue()
            for i in range(n):
                threading.Thread(target=self.worker,daemon=True).start()
    
        def worker(self): #不断获取并执行任务
            while True:
                func,args,kwargs = self.queue.get() # 获取任务 没有任务就阻塞
                func(*args,**kwargs)  #执行任务
                self.queue.task_done() #每次队列加元素,计数器+1,每次task_done() 计数器-1,如果计数器为0,认为队列任务结束
    
        def apply_async(self,func,args=(),kwargs={}): # 把任务放到队列
            self.queue.put((func,args,kwargs))
    
        def join(self):
            self.queue.join() #queue.join() 队列里有元素,会阻塞,直到队列没有元素,才会不阻塞
    
    def func1():
        time.sleep(2)
        print('任务1')
    def func2():
        time.sleep(2)
        print('任务2')
    def func3():
        time.sleep(2)
        print('任务3')
    
    def func4(name):
        time.sleep(2)
        print(name)
    
    if __name__ == '__main__':
        pool = MyThreadPool(3)
        pool.apply_async(func1)
        pool.apply_async(func2)
        pool.apply_async(func3)
        pool.apply_async(func4,args=('Jack',))
        pool.join() # 阻塞直至所有任务执行完,如果没有,会因为主线程结束而不执行子线程
    
    >>
    两秒后打印:
    任务3
    任务1
    任务2
    再过两秒打印:
    Jack

    方法二:面向对象的方式

    import threading
    import queue
    import time
    
    
    class MyThread(threading.Thread):
        def __init__(self,queue):
            super().__init__()
            self.queue = queue
            self.daemon = True  # 主线程结束,子线程也应该结束
            self.start()
    
        def run(self):  # 不断获取并执行任务
            while True:
                func,args,kwargs = self.queue.get() # 获取任务 没有任务就阻塞
                func(*args,**kwargs)   # 执行任务
                self.queue.task_done()  # 每次队列加元素,计数器+1,每次task_done() 计数器-1,如果计数器为0,认为队列任务结束
    
    
    class Threadpool():
        def __init__(self,n):
            self.queue = queue.Queue()
            for i in range(n):
                MyThread(self.queue)  # 开启n个线程,队列等待接受任务
    
        def apply_async(self,func,args=(),kwargs={}): # 把任务放到队列
            self.queue.put((func,args,kwargs))
    
        def join(self):
            self.queue.join() #queue.join() 队列里有元素,会阻塞,直到队列没有元素,才会不阻塞
    
    def func1():
        time.sleep(2)
        print('任务1')
    def func2():
        time.sleep(2)
        print('任务2')
    def func3():
        time.sleep(2)
        print('任务3')
    
    if __name__ == '__main__':
        pool = Threadpool(2)
        pool.apply_async(func1)
        pool.apply_async(func2)
        pool.apply_async(func3)
        pool.join() # 阻塞直至所有任务执行完,如果没有join,会因为主线程结束而不执行子线程
        print('任务执行完毕')
    
    >>
    2s后打印
    任务2
    任务1
    再过2s打印
    任务3
    任务执行完毕

    注意:线程是由解释器调度的,我们无法控制线程的执行顺序。

    python自带的线程池

    使用进程池

    from multiprocessing import Pool #进程池
    import time
    def func(n):
        time.sleep(2)
        print(n)
    
    if __name__ == '__main__':
        pool = Pool(4) # 实例进程池,不传参数默认是CPU核数
        for i in range(10):
            pool.apply_async(func,args=(i,)) #把任务提交到队列
        pool.close() #关闭进程池,不让再提交任务
        pool.join() #等待队列任务都完成,规定在join()之前先要close

    使用线程池

    from multiprocessing.pool import ThreadPool 
    import time
    def func(n):
        time.sleep(2)
        print(n)
    
    if __name__ == '__main__':
        pool = ThreadPool(4) # 实例线程池,不传参数默认是CPU核数
        for i in range(10):
            pool.apply_async(func,args=(i,)) #把任务提交到队列
        pool.close() #关闭线程池,不让再提交任务
        pool.join() #等待队列任务都完成,规定在join()之前先要close

    两个库的api基本一致,不过执行起来不一样,进程对计算密集型的操作比较拿手,可以调度多个CPU执行。线程是轻量级的进程,但是只是一个CPU执行,更适用于IO密集型操作。

    使用线程池来实现并发服务器

    import socket
    from multiprocessing.pool import ThreadPool
    
    def worker(conn,addr):
        while True:
            data = conn.recv(1024)
            if data:
                print(data.decode())
                conn.send(data)
            else:
                conn.close()
                print('{}已关闭'.format(addr))
                break
    
    if __name__ == '__main__':
        pool = ThreadPool()
        sock = socket.socket()
        sock.bind(('',9999))
        sock.listen(5)
        print('开始监听!')
        while True:
            conn,addr = sock.accept()
            print('{}已连接'.format(addr))
            pool.apply_async(worker,args=(conn,addr))
  • 相关阅读:
    快速排序算法
    冒泡排序
    正则表达式
    博客园 自定义 个性主题优化 这是我迄今为止用过的最好的模板 silence
    Python 中 如何复制图片?如何更改图片存储路径? 在遇无数坑后的最全解答 百试百灵
    作业test
    day69
    day68
    day67
    day67test
  • 原文地址:https://www.cnblogs.com/woaixuexi9999/p/9332880.html
Copyright © 2011-2022 走看看