zoukankan      html  css  js  c++  java
  • Python 线程池模块threadpool 、 concurrent.futures 的 ThreadPoolExecutor

    一、threadpool   基本用法

    pip install threadpool   

    pool = ThreadPool(poolsize)  
    requests = makeRequests(some_callable, list_of_args, callback)  
    [pool.putRequest(req) for req in requests]  
    pool.wait()

    第一行定义了一个线程池,表示最多可以创建poolsize这么多线程;

    第二行是调用makeRequests创建了要开启多线程的函数,以及函数相关参数和回调函数,其中回调函数可以不写,default是无,也就是说makeRequests只需要2个参数就可以运行;

    第三行使用列表生成式代替for循环,是将所有要运行多线程的请求扔进线程池,

    [pool.putRequest(req) for req in requests]等同于

      for req in requests:  

         pool.putRequest(req) 

    第四行是等待所有的线程完成工作后退出。

    二、代码实例

    要处理的函数,只需要一个传参:

    import time
    import threadpool  
    def sayhello(str):
        print "Hello ",str
        time.sleep(2)
    
    name_list =['xiaozi','aa','bb','cc']
    start_time = time.time()
    pool = threadpool.ThreadPool(10) 
    requests = threadpool.makeRequests(sayhello, name_list) 
    [pool.putRequest(req) for req in requests] 
    pool.wait() 
    print '%d second'% (time.time()-start_time)

    要处理的函数,只需要N个传参:

    方式一:---参数列表元素需要用元组,([args,...], None)

    import time
    import threadpool
    def sayhello(a, b, c):
        print("Hello ",a, b, c)
        time.sleep(2)
    
    def call_back():
        print('call_back...........')
    
    
    name_list = [([1,2,3], None), ([4,5,6], None) ]
    
    start_time = time.time()
    pool = threadpool.ThreadPool(10)
    requests = threadpool.makeRequests(sayhello, name_list)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    print('%d second'% (time.time()-start_time))

    Hello 1 2 3
    Hello 4 5 6
    2 second

    Process finished with exit code 0

    方式二:---参数列表元素需要用元组,(None, {'key':'value', .......})

    import time
    import threadpool
    def sayhello(a, b, c):
        print("Hello ",a, b, c)
        time.sleep(2)
    
    def call_back():
        print('call_back...........')
    
    
    # name_list = [([1,2,3], None), ([4,5,6], None) ]
    name_list = [(None, {'a':1,'b':2,'c':3}), (None, {'a':4,'b':5, 'c':6}) ]
    
    start_time = time.time()
    pool = threadpool.ThreadPool(10)
    requests = threadpool.makeRequests(sayhello, name_list)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    print('%d second'% (time.time()-start_time))

    concurrent.futures 的ThreadPoolExecutor (线程池) 

    https://www.jianshu.com/p/6d6e4f745c27

    从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类。

    相比 threading 等模块,该模块通过 submit 返回的是一个 future 对象,它是一个未来可期的对象,通过它可以获悉线程的状态主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值:

    1. 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
    2. 当一个线程完成的时候,主线程能够立即知道。
    3. 让多线程和多进程的编码接口一致。

    线程池的基本使用

    #!/usr/bin/env python3
    # -*- coding:utf-8 -*-
    #  @Time: 2020/11/21 17:55
    #  @Author:zhangmingda
    #  @File: ThreadPoolExecutor_study.py
    #  @Software: PyCharm
    #  Description:
    
    from  concurrent.futures import ThreadPoolExecutor
    import time
    
    task_args_list = [('zhangsan', 2),('lishi',3), ('wangwu', 4)]
    def task(name, seconds):
        print('% sleep %s seconds start...' % (name, seconds))
        time.sleep(seconds)
        print('% sleep %s seconds done' % (name, seconds))
        return '%s task done' % name
    
    with ThreadPoolExecutor(max_workers=5) as t:
        # [ t.submit(task, *arg) for  arg in task_args_list]
        task1 = t.submit(task, '张三', 1)
        task2 = t.submit(task, '李四', 2)
        task3 = t.submit(task, '王五', 2)
        task4 = t.submit(task, '赵柳', 3)
        print(task1.done())
        print(task2.done())
        print(task3.done())
        print(task4.done())
        time.sleep(2)
        print(task1.done())
        print(task2.done())
        print(task3.done())
        print(task4.done())
    
        print(task1.result())
        print(task2.result())
        print(task3.result())
        print(task4.result())

    • 使用 with 语句 ,通过 ThreadPoolExecutor 构造实例,同时传入 max_workers 参数来设置线程池中最多能同时运行的线程数目。

    • 使用 submit 函数来提交线程需要执行的任务到线程池中,并返回该任务的句柄(类似于文件、画图),注意 submit() 不是阻塞的,而是立即返回。

    • 通过使用 done() 方法判断该任务是否结束。上面的例子可以看出,提交任务后立即判断任务状态,显示四个任务都未完成。在延时2.5后,task1 和 task2 执行完毕,task3 仍在执行中。

    • 使用 result() 方法可以获取任务的返回值 【注意result 是阻塞的会阻塞主线程】

    主要方法:

    wait

    wait(fs, timeout=None, return_when=ALL_COMPLETED)
    wait 接受三个参数:
    fs: 表示需要执行的序列
    timeout: 等待的最大时间,如果超过这个时间即使线程未执行完成也将返回
    return_when:表示wait返回结果的条件,默认为 ALL_COMPLETED 全部执行完成再返回;可指定FIRST_COMPLETED 当第一个执行完就退出阻塞
    from  concurrent.futures import ThreadPoolExecutor,wait,FIRST_COMPLETED, ALL_COMPLETED
    import time
    
    task_args_list = [('zhangsan', 1),('lishi',2), ('wangwu', 3)]
    task_list = []
    
    
    def task(name, seconds):
        print('% sleep %s seconds start...' % (name, seconds))
        time.sleep(seconds)
        print('% sleep %s seconds done' % (name, seconds))
        return '%s task done' % name
    
    with ThreadPoolExecutor(max_workers=5) as t:
        [task_list.append(t.submit(task, *arg)) for  arg in task_args_list]
        wait(task_list, return_when=FIRST_COMPLETED) # 等了一秒
        print('all_task_submit_complete! and First task complete!')
        print(wait(task_list,timeout=1.5)) # 又等了1.5秒,合计等了2.5秒

    as_completed 

    上面虽提供了判断任务是否结束的方法,但是不能在主线程中一直判断。最好的方法是当某个任务结束了,就给主线程返回结果,而不是一直判断每个任务是否结束。

    concurrent.futures 中 的 as_completed() 就是这样一个方法,当子线程中的任务执行完后,直接用 result() 获取返回结果
    task_args_list = [('zhangsan', 1),('lishi',3), ('wangwu', 2)]
    task_list = []
    
    
    def task(name, seconds):
        print('%s sleep %s seconds start...' % (name, seconds))
        time.sleep(seconds)
        return '%s sleep %s seconds done' % (name, seconds)
    
    with ThreadPoolExecutor(max_workers=5) as t:
        [task_list.append(t.submit(task, *arg)) for  arg in task_args_list]
        [print(future.result()) for future in as_completed(task_list)]
    
        print('All Task Done!!!!!!!!!')

    map

    map(fn, *iterables, timeout=None)

    fn: 第一个参数 fn 是需要线程执行的函数;
    iterables:第二个参数接受一个可迭代对象;
    timeout: 第三个参数 timeout 跟 wait() 的 timeout 一样,但由于 map 是返回线程执行的结果,如果 timeout小于线程执行时间会抛异常 TimeoutError。

    用法如下:

    def spider(page):
        time.sleep(page)
        return page
    
    start = time.time()
    executor = ThreadPoolExecutor(max_workers=4)
    
    i = 1
    for result in executor.map(spider, [2, 3, 1, 4]):
        print("task{}:{}".format(i, result))
        i += 1
    
    #  运行结果
    task1:2
    task2:3
    task3:1
    task4:4

    使用 map 方法,无需提前使用 submit 方法,map 方法与 python 高阶函数 map 的含义相同,都是将序列中的每个元素都执行同一个函数。

    上面的代码对列表中的每个元素都执行 spider() 函数,并分配各线程池。

    可以看到执行结果与上面的 as_completed() 方法的结果不同,输出顺序和列表的顺序相同,就算 1s 的任务先执行完成,也会先打印前面提交的任务返回的结果。






  • 相关阅读:
    Linux下安装nginx
    使用Nginx搭建集群
    怎样解决虚拟机中多台机器之间的相互通信问题??
    怎样安装vmtools
    常用的机器学习&数据挖掘知识(点)领域链接
    sparkan安装链接
    matlab中怎样将散点用光滑曲线连接起来??
    matlab中怎样画出散点图,将这些散点连接成线??
    matlab中怎样计算两个集合的差集?-setdiff函数
    MATLAB中在一个三维矩阵中如何提取出一个二维矩阵,使用permute
  • 原文地址:https://www.cnblogs.com/zhangmingda/p/14016307.html
Copyright © 2011-2022 走看看