zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor多线程异步执行

    https://www.cnblogs.com/pdev/p/10685093.html

    1. 以下为第一种,函数级的异步执行:

    import time
    from concurrent.futures import ThreadPoolExecutor
    
    
    def task1_fn(url):
        time.sleep(10)
        return (url + " FINISHED")
    
    
    def task2_fn(url):
        time.sleep(3)
        return (url + " FINISHED")
    
    
    def RunPool():
        pool = ThreadPoolExecutor(1)  # 启动一个线程池
        task1 = pool.submit(task1_fn, "CPU1")  # 在另外的线程中运行RunBenchmark() 
        task2 = pool.submit(task2_fn, "CPU2")  # 在另外的线程中运行RunBenchmark() 
        cnt = 0
        while True:
            if cnt == 10:
                task1.cancel()
    
            print("task1:%s" % task1.done())
            print("task2:%s" % task2.done())
            if task1.done():
                print('task1 is over')
    
            if task1.done() and task2.done():
                print('all is over.')
                break
            time.sleep(1)
            cnt += 2
    
        print('task1 result:%s' % task1.result())
        print('task2 result:%s' % task2.result())
    
        print("bye")
    
    
    if __name__ == '__main__':
        RunPool()

    2. 类级函数的的异步执行,添加了线程强制中断 pool.shutdown

    import time
    from concurrent.futures import ThreadPoolExecutor
    
    class A():
        def __init__(self):
            pass
    
        def test_ret(self):
            return 'hello world'
    
        def task1_fn(self, url):
            time.sleep(10)
            return (url + " FINISHED" + self.test_ret())
    
        def task2_fn(self, url):
            time.sleep(3)
            return (url + " FINISHED" + self.test_ret())
    
    
    def RunPool():
        a = A()
        pool = ThreadPoolExecutor(2)    # 启动一个线程池
        task1 = pool.submit(a.task1_fn, "CPU1")  # 在另外的线程中运行RunBenchmark() 
        task2 = pool.submit(a.task2_fn, "CPU2")  # 在另外的线程中运行RunBenchmark() 
    
        cnt = 0
        while True:
            if cnt == 10:
                task1.cancel()
    
            print("task1:%s" % task1.done())
            print("task2:%s" % task2.done())
            if task1.done():
                print('task1 is over')
            else:
                task1.cancel()
                pool.shutdown(wait=False)
                print('task1 result AAAAAAAA %s' % task1.result())
                break
    
            # if task1.done() and task2.done():
            #     print('all is over.')
            #     break
            time.sleep(1)
            cnt += 2
    
        print('task1 result:%s' % task1.result())
        print('task2 result:%s' % task2.result())
    
        print("bye")
    
    if __name__ == '__main__':
          RunPool()

    3. 第一个任务一旦完成,则强制终止线程

    应用场景:如果某个任务一直处于执行中,无法退出,此时就需要强制退出,而强制退出一般需要重新线程run方法,但 模块 from concurrent.futures import ThreadPoolExecutor 重写run方法较麻烦,故采用另一种方式

    wait(f_lst, return_when='FIRST_COMPLETED')
    f_lst中存放的是提交到线程池中的线程对象列表
    import time
    import inspect
    import ctypes
    from concurrent.futures import ThreadPoolExecutor, wait
    
    class A():
        def __init__(self):
            pass
    
        def test_ret(self):
            return 'hello world'
    
        def task1_fn(self, url):
            time.sleep(200)
            return (url + " FINISHED " + self.test_ret() + ' ' + str(var))
    
        def task2_fn(self, url):
            time.sleep(5)
            return (url + " FINISHED " + self.test_ret())
    
    
    def RunPool():
        a = A()
        pool = ThreadPoolExecutor(2)  # 启动一个线程池
        task1 = pool.submit(a.task1_fn, "CPU1")  # 在另外的线程中运行RunBenchmark() 
        task2 = pool.submit(a.task2_fn, "CPU2")  # 在另外的线程中运行RunBenchmark() 
    
        f_lst = [task1, task2]
        wait(f_lst, return_when='FIRST_COMPLETED')
    
        try:
            print('task status, task1: %s, task2: %s' % (task1.done(), task2.done()))
            # print('task1 result:%s' % task1.result())
            print('task2 result:%s' % task2.result())
        except Exception as e:
            print('error: %s' % e)
        print("bye")
    
    
    if __name__ == '__main__':
        var = 123
        RunPool()

    注意:只有先完成的任务才可以使用 task.result() 方法,否则会一直卡着,等待 结果返回,而此时线程已被杀死,无法返回结果

    4. 关键点说明:

    1) task.cancel()当线程未运行之前可以取消,但是线程在线程池中启动后,无法再通过此方式取消
    2) pool.shutdown()函数入参 wait=True表示中断线程前,等待直到线程执行完成才会中断, wait=False 表示 线程未执行完成,强制中断

    3)传参方式和位置参数一样,不用使用元组

    5. 适用业务:

    多进程下的多线程,某个任务可能需要等待很长时间,此时就需要中断此任务,或者去执行其他的任务,如果要求线性处理,中断超时任务,就需要用此方法
    针对上述情况下的单个进程,可以使用异步进程池 multiprocessing的apply_async方式,主进程和子进程同时执行,互不影响。

    https://www.cnblogs.com/pdev/p/10685093.html
    **1. 以下为第一种,函数级的异步执行:**`import timefrom concurrent.futures import ThreadPoolExecutor

    import timefrom concurrent.futures import ThreadPoolExecutor
    def task1_fn(url):
        time.sleep(10)
        return (url + " FINISHED")
    
    def task2_fn(url):
        time.sleep(3)
        return (url + " FINISHED")
    
    def RunPool():
        pool = ThreadPoolExecutor(1)  # 启动一个线程池
        task1 = pool.submit(task1_fn, "CPU1")  # 在另外的线程中运行RunBenchmark() 
        task2 = pool.submit(task2_fn, "CPU2")  # 在另外的线程中运行RunBenchmark() 
        cnt = 0
        while True:
            if cnt == 10:
                task1.cancel()
        print("task1:%s" % task1.done())
        print("task2:%s" % task2.done())
        if task1.done():
            print('task1 is over')
        if task1.done() and task2.done():
            print('all is over.')
            break
            time.sleep(1)
            cnt += 2
        print('task1 result:%s' % task1.result())
        print('task2 result:%s' % task2.result())
        print("bye")
    
    if __name__ == '__main__':      
        RunPool()


    **2. 类级函数的的异步执行,添加了线程强制中断 pool.shutdown**

    import timefrom concurrent.futures import ThreadPoolExecutor
    class A():
        def __init__(self):
            pass
        def test_ret(self):
            return 'hello world'
        def task1_fn(self, url):
            time.sleep(10)
            return (url + " FINISHED" + self.test_ret())
        def task2_fn(self, url):
            time.sleep(3)
            return (url + " FINISHED" + self.test_ret())
    
    def RunPool():
        a = A()
        pool = ThreadPoolExecutor(2)    # 启动一个线程池
        task1 = pool.submit(a.task1_fn, "CPU1")  # 在另外的线程中运行RunBenchmark() 
        task2 = pool.submit(a.task2_fn, "CPU2")  # 在另外的线程中运行RunBenchmark() 
        cnt = 0
        while True:
            if cnt == 10:
                task1.cancel()
        print("task1:%s" % task1.done())
        print("task2:%s" % task2.done())
        if task1.done():
            print('task1 is over')
        else:
            task1.cancel()
            pool.shutdown(wait=False)
            print('task1 result AAAAAAAA %s' % task1.result())
            break
        # if task1.done() and task2.done():
        #     print('all is over.')
        #     break
        time.sleep(1)
        cnt += 2
        print('task1 result:%s' % task1.result())
        print('task2 result:%s' % task2.result())
        print("bye")
    
    if __name__ == '__main__':      
        RunPool()

    **> 关键点说明:**1. task.cancel()当线程未运行之前可以取消,但是线程在线程池中启动后,无法再通过此方式取消2. pool.shutdown()函数入参 wait=True表示中断线程前,等待直到线程执行完成才会中断, wait=False 表示 线程未执行完成,强制中断
    **> 适用业务:**      多进程下的多线程,某个任务可能需要等待很长时间,此时就需要中断此任务,或者去执行其他的任务,如果要求线性处理,中断超时任务,就需要用此方法      针对上述情况下的单个进程,可以使用异步进程池 multiprocessing的apply_async方式,主进程和子进程同时执行,互不影响。

    总结:

    #shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
  • 相关阅读:
    PPT图片
    饥荒Steam相关mod代码
    Ubuntu20.04更换阿里源 source.list文件
    中断处理与进程调度的区别与联系
    原语和系统调用的区别
    立下个flag,这个月底之前要发一个深度学习入门系列的文章
    conda安装skimage
    机器学习入门(三)
    zip安装的MySQL绑定my.ini配置文件
    Anaconda配置安装
  • 原文地址:https://www.cnblogs.com/zhanghaibin16/p/13322076.html
Copyright © 2011-2022 走看看