zoukankan      html  css  js  c++  java
  • python 多进程——使用进程池,多进程消费的数据)是一个队列的时候,他会自动去队列里依次取数据

    我的mac 4核,因此每次执行的时候同时开启4个线程处理:

    # coding: utf-8
    
    import time
    from multiprocessing import Pool
    
    
    def long_time_task(name):
        print 'task %s starts running' % name
        time.sleep(3)
        print 'task %s ends running --3 seconds' % name
    
    
    if __name__ == '__main__':
        start = time.time()
        p = Pool()
        for i in range(10):  # CPU有几核,每次就取出几个进程
            p.apply_async(func=long_time_task, args=(i,))
        p.close()  # 调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了
        p.join()  # 对Pool对象调用join()方法会等待所有子进程执行完毕
        end = time.time()
        print('多进程(非阻塞)执行共需时间为:%.2f' % (end - start))

    运行效果:

    task 0 starts running
    task 1 starts running
    task 2 starts running
    task 3 starts running
    task 0 ends running --3 seconds
    task 1 ends running --3 seconds
    task 3 ends running --3 seconds
    task 2 ends running --3 seconds
    task 4 starts running
    task 5 starts running
    task 6 starts running
    task 7 starts running
    task 5 ends running --3 seconds
    task 4 ends running --3 seconds
    task 7 ends running --3 seconds
    task 6 ends running --3 seconds
    task 8 starts running
    task 9 starts running
    task 8 ends running --3 seconds
    task 9 ends running --3 seconds
    多进程(非阻塞)执行共需时间为:9.13

    解释:

    CPU先取出0-3号进程,执行完毕后,4~8号进程才开始执行。0-3号进程花了3秒钟,4~8号 进程也花了3秒。最后两个进程9,10又花了三秒,一共9秒。

    也就意味着,我的代码可以这样写,当history_ddos(多进程消费的数据)是一个队列的时候,他会自动去队列里依次取数据

        f = open("history_ddos.json", "r")
        history_ddos = json.load(f)
        f.close()
    
        # 10表示进程池中最多有10个进程一起执行
        p = Pool(10)
        for item in history_ddos:
            # find_ddos_botnet(item)
            p.apply_async(func=find_ddos_botnet(), args=(item,))
        p.close()
        p.join()

     多个子进程并返回值 apply_async()本身就可以返回被进程调用的函数的返回值。上一个创建多个子进程的代码中,如果在函数func中返回一个值,那么pool.apply_async(func, (msg, ))的结果就是返回pool中所有进程的值的对象(注意是对象,不是值本身)。

    import multiprocessing
        import time
         
        def func(msg):
            return multiprocessing.current_process().name + '-' + msg
         
        if __name__ == "__main__":
            pool = multiprocessing.Pool(processes=4) # 创建4个进程
            results = []
            for i in xrange(10):
                msg = "hello %d" %(i)
                results.append(pool.apply_async(func, (msg, )))
            pool.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用
            pool.join() # 等待进程池中的所有进程执行完毕
            print ("Sub-process(es) done.")
         
            for res in results:
                print (res.get())

    结果:

    Sub-process(es) done.
    ForkPoolWorker-1-hello 0
    ForkPoolWorker-2-hello 1
    ForkPoolWorker-3-hello 2
    ForkPoolWorker-1-hello 3
    ForkPoolWorker-4-hello 4
    ForkPoolWorker-1-hello 5
    ForkPoolWorker-2-hello 6
    ForkPoolWorker-1-hello 7
    ForkPoolWorker-2-hello 8
    ForkPoolWorker-3-hello 9

  • 相关阅读:
    【算法】数据结构
    【POJ】1222 EXTENDED LIGHTS OUT
    【BZOJ】1013 [JSOI2008]球形空间产生器sphere
    【有上下界网络流】【ZOJ】2314 Reactor Cooling
    【CODEVS】1281 Xn数列
    【POJ】3070 Fibonacci
    【CODEVS】3546 矩阵链乘法
    【BZOJ】1070: [SCOI2007]修车
    Quoit Design(hdu 1007)
    tree(poj 1741)
  • 原文地址:https://www.cnblogs.com/bonelee/p/9645429.html
Copyright © 2011-2022 走看看