zoukankan      html  css  js  c++  java
  • 线程queue、线程进程池、异步回调机制

    1. 线程 queue

    queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

    queue 三种方法
    class queue.Queue(maxsize=0) #队列:先进先出

    import queue
    
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    
    '''
    结果(先进先出):
    first
    second
    third
    '''
    

    class queue.LifoQueue(maxsize=0) #堆栈:last in fisrt out

    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    '''
    结果(后进先出):
    third
    second
    first
    '''
    

    class queue.PriorityQueue(maxsize=0) #优先级队列:存储数据时可设置优先级的队列

    import queue
    
    q=queue.PriorityQueue()
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    
    '''
    结果(数字越小优先级越高,优先级高的优先出队):
    (10, 'b')
    (20, 'a')
    (30, 'c')
    '''
    

    其他属性介绍

    import queue  # 线程queue
    
    # q = queue.Queue(3.py)  # 线程q对象,先进先出队列queue,容量为 3个
    # 1.常规用法
    # q.put(1)
    # q.put(2)
    # q.put(3.py)
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # 2.不等待用法 以及 设置超时时间
    # q.put(1)
    # q.put(1)
    # q.put(3.py)
    # # q.put(4, block=False)  # 等于q.put_nowait()  # 非阻塞状态 # raise queue.Full
    # q.put(4, timeout=3.py)  # 设置超时时间
    #
    # print(q.get())
    # print(q.get())
    # print(q.get())  # q.get  与q.put 类似用法
    

    2.进程池与线程池

    在刚开始学多进程或多线程时,我们迫不及待地基于多进程或多线程实现并发的套接字通信,然而这种实现方式的致命缺陷是:服务的开启的进程数或线程数都会随着并发的客户端数目地增多而增多,这会对服务端主机带来巨大的压力,甚至于不堪重负而瘫痪,于是我们必须对服务端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行,这就是进程池或线程池的用途,例如进程池,就是用来存放进程的池子,本质还是基于多进程,只不过是对开启进程的数目加上了限制。

    前戏

    官网:(https://docs.python.org/dev/library/concurrent.futures.html)
    
    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    
    Both implement the same interface, which is defined by the abstract Executor class.
    

    基础属性

    	1、submit(fn, *args, **kwargs)
    	异步提交任务
    
    	2、map(func, *iterables, timeout=None, chunksize=1) 
    	取代for循环submit的操作
    
    	3、shutdown(wait=True) 
    	相当于进程池的pool.close()+pool.join()操作
    	wait=True,等待池内所有任务执行完毕回收完资源后才继续
    	wait=False,立即返回,并不会等待池内的任务执行完毕
    	但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    	submit和map必须在shutdown之前
    
    	4、result(timeout=None)
    	取得结果
    
    	5、add_done_callback(fn)
    	回调函数
    

    3.进程池 | 线程池

    The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
    
    class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
    An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
    

    基础用法

    # from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    # import os
    # import time
    # import random
    #
    #
    # def task(name):
    #     """
    #
    #     :param name:
    #     :return:
    #     """
    #     print("%s is running pid:%s" % (name, os.getpid()))
    #     time.sleep(random.randint(1, 8))
    #
    #
    # if __name__ == '__main__':
    #     # pool = ProcessPoolExecutor(3)  # 线程池与进程池属性方法基本相同 
    #     pool = ThreadPoolExecutor(3)
    #     for i in range(10):
    #         pool.submit(task, "alex-%s" % i)  # 异步调用
    #
    #     print("main Process")
    

    map的用法

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    
    import os,time,random
    def task(n):
        print('%s is runing' %os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
    
        executor=ThreadPoolExecutor(max_workers=3)
    
        # for i in range(11):
        #     future=executor.submit(task,i)
    
        executor.map(task,range(1,12)) #map取代了for+submit
    	
    

    回调函数

    可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数

    # 1.同步调用 : 同步调用:提交完任务后,就在原地等待任务执行完毕,
    # # 拿到结果,再执行下一行代码,导致程序是串行执行
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import time
    import random
    
    
    def prepare(name):
        """
        :param name:
        :return:
        """
        print("%s is prepare the dinner !" % name)
        time.sleep(random.randint(3, 6))
        dinner = chr(random.randint(65, 90))
        return {"name": name, "dinner": dinner}
    
    
    
    def eat(dinner_dict_obj):
        """
        :param dinner_dict_obj:
        :return:
        """
        dinner_dict = dinner_dict_obj.result()  # 返回一个对象 然后 result() 来返回结果
        print("we having dinner %s is cooked by %s" % (dinner_dict["dinner"], dinner_dict["name"]))
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(2)
        # name_list = ["alex", "wxx", "tony", "jack"]
        # for name in name_list:
        #     # 2、异步调用:提交完任务后,不地等待任务执行完毕,
        #     pool.submit(prepare, name).add_done_callback(eat)  # 异步调用 拿到返回的执行结果future对象给一步函数
    
        # 1.同步调用 : 同步调用:提交完任务后,就在原地等待任务执行完毕,
        # 拿到结果,再执行下一行代码,导致程序是串行执行
        # dinner_dict = pool.submit(prepare, "alex").result()
    	# eat(dinner_dict)
    

    基于异步调用的简单网络爬虫

    
    from concurrent.futures import ThreadPoolExecutor
    import requests
    import time
    import re
    
    
    def get(url):
        print("download the %s" % url)
        time.sleep(3)
        resp = requests.get(url)
        # print(resp.text)
        return {"url": url, "content": resp.text}
    
    
    def parse(resp_text_obj):
        resp_text_dict = resp_text_obj.result()
        par = re.compile(r"src=.*.jpg|http://.*.jpg") # 正则提取数据
        print(re.findall(par, resp_text_dict["content"]))
        print("%s is len %s" % (resp_text_dict["url"], len(resp_text_dict["content"])))
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(3)
        urls = [
    
            "http://www.cnblogs.com/zjcode/p/8650090.html",
            "http://www.521609.com/",
            "http://dig.chouti.com/",
            "http://www.bootcss.com/"
        ]
        for url in urls:
    			pool.submit(get, url).add_done_callback(parse)
    
  • 相关阅读:
    Mongodb_文件存储
    Mongodb_技巧
    Blend_Effect
    WPF_界面_图片/界面/文字模糊解决之道整理
    ASP.NET Boilerplate 深入系列之:概述
    P1280 尼克的任务
    P1802 5倍经验日
    271. 杨老师的照相排列
    P1726 上白泽慧音
    P1983 [NOIP2013 普及组] 车站分级
  • 原文地址:https://www.cnblogs.com/zjcode/p/9025358.html
Copyright © 2011-2022 走看看