zoukankan      html  css  js  c++  java
  • 39

    1   同一个进程内的队列(多线程)

    先进先出
    import queue                        # 是用于同一进程内的队列,不能做多进程之间的通信
    
    q = queue.Queue()
    # 先进先出
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    进程内通信 Queue()

    先进后出

    import queue 
    
    q = queue.LifoQueue()
    # 后进先出的队列
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    LifoQueue()

    优先级队列

    import queue  
    
    q = queue.PriorityQueue()
    # 优先级队列,put()方法接收的是一个元组(),第一个位置是优先级,第二个位置是数据
    # 优先级如果是数字,直接比较数值
    # 如果是字符串,是按照 ASCII 码比较的。当ASCII码相同时,会按照先进先出的原则
    q.put((1, 'abc'))
    q.put((5, 'qwe'))
    q.put((-5, 'zxc'))
    print(q.get())
    print(q.get())
    print(chr(48))
    PriorityQueue()


    import queue

    queue.Queue() 先进先出
    queue.LifoQueue() 后进先出
    queue.PriorityQueue() 优先级队列



    优先级队列 q = queue.PriorityQueue()
    q.put() 接收的是一个元组
    元组中第一个参数是:表示当前数据的优先级
    元组中第二个参数是:需要存放到队列中的数据
    优先级的比较(首先保证整个队列中,所有表示优先级的东西类型必须一致)
    如果都是int,比数值的大小
    如果都是str,比较字符串的大小(从第一个字符的ASCII码开始比较)

    2 线程池

    进程池、线程池效率对比
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from multiprocessing import Pool
    
    #  concurrent.futures 这个模块是异步调用的机制
    #  concurrent.futures 提交任务都是用submit
    #  for + submit 多个任务的提交
    #  shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务。
    
    # from multiprocessing import Pool.apply / apply_async
    import time
    
    def func(num):
        sum = 0
        for i in range(num):
            for j in range(i):
                for x in range(j):
                    sum += x ** 2
        print(sum)
    
    if __name__ == '__main__':
        pass
        # pool的进程池的效率演示
        p = Pool(5)
        start = time.time()
        for i in range(100):
            p.apply_async(func,args=(i,))
        p.close()
        p.join()
        print('Pool进程池的效率时间是%s'%(time.time() - start))
    pool的进程池的效率演示

    Pool进程池的效率时间是0.7165861129760742

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from multiprocessing import Pool
    
    #  concurrent.futures 这个模块是异步调用的机制
    #  concurrent.futures 提交任务都是用submit
    #  for + submit 多个任务的提交
    #  shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务。
    
    # from multiprocessing import Pool.apply / apply_async
    import time
    
    def func(num):
        sum = 0
        for i in range(num):
            for j in range(i):
                for x in range(j):
                    sum += x ** 2
        print(sum)
    
    if __name__ == '__main__':
    
       tp = ProcessPoolExecutor(5)
        start = time.time()
        for i in range(100):
            tp.submit(func, i)
        tp.shutdown()  # 等效于 进程池中的 close + join
        print('ProcessPoolExecutor进程池的消耗时间为%s' % (time.time() - start))
    ProcessPoolExecutor多进程的效率演示

    ProcessPoolExecutor进程池的消耗时间为0.8894784450531006




    在一个池子里,放固定数量的线程,这些线程等待任务,一旦有任务来,就有线程自发的去执行任务。

    # concurrent.futures 这个模块是异步调用的机制
    # concurrent.futures 提交任务都是用submit
    # for + submit 多个任务的提交
    # shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务。

    如何把多个任务扔进池中?


    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    
    def func(num):
        sum = 0
        for i in range(num):
            sum += i ** 2
        print(sum)
    
    
    t = ThreadPoolExecutor(20)
    start = time.time()
    t.map(func,range(1000))# 提交多个任务给池中。  等效于 for + submit
    t.shutdown()
    print(time.time() - start)
    多任务的提交


    不同的方式提交多个任务(for+submit 或者 map),拥有不同的拿结果的方式
      如果是map的方式提交任务,结果是一个生成器,采用__next__()的方式去拿结果
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def func(num):
        sum = 0
        # time.sleep(5)
        # print(num) # 异步的效果
        for i in range(num):
            sum += i ** 2
        return sum
    
    t = ThreadPoolExecutor(20)
    
    # 下列代码是用map的方式提交多个任务,对应 拿结果的方法是__next__()  返回的是一个生成器对象
    res = t.map(func, range(1000))
    t.shutdown()
    print(res.__next__())
    print(res.__next__())
    print(res.__next__())
    print(res.__next__())
    map -__next()__ 取返回值

    如果是for+submit的方式提交任务,拿结果用result方法
    # 下列代码是用for + submit提交多个任务的方式,对应拿结果的方法是result
    res_l = []
    for i in range(1000):
        re = t.submit(func,i)
        res_l.append(re)
    # t.shutdown()
    [print(i.result()) for i in res_l]
    # 在Pool进程池中拿结果,是用get方法。   在ThreadPoolExecutor里边拿结果是用result方法
    submit ——xx.result()



    关于回调函数,不管是Pool进程池的方式,还是ProcessPoolExecutor的方式开启进程池,
    回调函数都是由父进程调用


    from concurrent.futures import ProcessPoolExecutor
    # 不管是ProcessPoolExecutor的进程池  还是Pool的进程池,回调函数都是父进程调用的。
    import os
    
    
    
    
    def func(num):
        sum = 0
        for i in range(num):
            sum += i ** 2
        return sum
    
    def call_back_fun(res):
        print(res.result(),os.getpid())
        # print(os.getpid())
    
    if __name__ == '__main__':
        print(os.getpid())
        t = ProcessPoolExecutor(20)
        for i in range(1000):
            t.submit(func,i).add_done_callback(call_back_fun)
        t.shutdown()
    回调函数

    关于回调函数,ThreadPoolExecutor
    回调函数是谁调用????


  • 相关阅读:
    人生十鉴
    ASP.NET MVC 3 Preview 1 发布
    基于IPagedList 的 Asp.Net MVC 分页
    jenkins 更改用户
    MATLAB常用日期和时间函数
    Could not load file or assembly 'AjaxControlToolkit' or one of its dependencies. 错误解决
    ORA00932: inconsistent datatypes: expected A got B
    几种数据库的大数据批量插入
    水晶报表 无效索引。 (异常来自 HRESULT:0x8002000B (DISP_E_BADINDEX))错误的解决
    oracle 更新多条数据
  • 原文地址:https://www.cnblogs.com/zhuangdd/p/12829858.html
Copyright © 2011-2022 走看看