zoukankan      html  css  js  c++  java
  • python10

     一、多进程multiprocessing

           multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。

    复制代码
     1 import multiprocessing,threading
     2 import time
     3 
     4 def thread_run():
     5     print(threading.get_ident())
     6 def run(name):
     7     time.sleep(2)
     8     print('Hello ',name)
     9     t = threading.Thread(target=thread_run)
    10     t.start()
    11 
    12 
    13 if __name__== '__main__':
    14     for i in range(10):
    15         p = multiprocessing.Process(target=run, args=('bob_%s'%i,))
    16         p.start()
    复制代码

    运行结果:

    复制代码
     1 Hello  bob_0
     2 1144
     3 Hello  bob_8
     4 1268
     5 Hello  bob_4
     6 4360
     7 Hello  bob_2
     8 768
     9 Hello  bob_6
    10 5308
    11 Hello  bob_5
    12 Hello  bob_1
    13 6076
    14 5088
    15 Hello  bob_9
    16 6104
    17 Hello  bob_3
    18 5196
    19 Hello  bob_7
    20 748
    复制代码

    二、进程池

          如果要创建多个进程,可以使用进程池,启用进程池需要使用Pool库,使用指令pool=Pool()可自动调用所有CPU,

    map()函数相当于一个循环,将参数2中的列表元素逐次灌入参数1的函数中。

    复制代码
     1 from multiprocessing import Pool
     2 
     3 def squre(num):
     4     return num ** 2
     5 
     6 
     7 if __name__ == '__main__':
     8     numbers = [0,1,2,3,4,5]
     9     pool = Pool(processes=5)#进程池中最多能放入5个进程
    10     print(pool.map(squre,numbers))
    复制代码

     运行结果:

    [0, 1, 4, 9, 16, 25]

     Pool还有以下常用的方法:

    • apply_async(func, args)从进程池中取出一个进程执行func,args为func的参数。它将返回一个AsyncResult的对象,我们可以调用get()方法以获得结果。
    • close() 关闭进程池,不再创建新的进程
    • join() wait()进程池中的全部进程。但是必须对Pool先调用close()方法才能join.
    复制代码
     1 from  multiprocessing import Process, Pool,freeze_support
     2 import time
     3 import os
     4 
     5 def Foo(i):
     6     time.sleep(2)
     7     print("in process",os.getpid())
     8     return i + 100
     9 
    10 def Bar(arg):
    11     print('-->exec done:', arg,os.getpid())
    12 
    13 if __name__ == '__main__':
    14     #freeze_support()
    15     pool = Pool(processes=3)
    16     print("主进程",os.getpid())
    17     for i in range(10):
    18         pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback=回调
    19         
    20     print('end')
    21     pool.close()
    22     pool.join() 
    复制代码

     运行结果:

    复制代码
     1 主进程 5660
     2 end
     3 in process 7048
     4 -->exec done: 100 5660
     5 in process 3396
     6 -->exec done: 101 5660
     7 in process 6728
     8 -->exec done: 102 5660
     9 in process 7048
    10 -->exec done: 103 5660
    11 in process 3396
    12 -->exec done: 104 5660
    13 in process 6728
    14 -->exec done: 105 5660
    15 in process 7048
    16 -->exec done: 106 5660
    17 in process 3396
    18 -->exec done: 107 5660
    19 in process 6728
    20 -->exec done: 108 5660
    21 in process 7048
    22 -->exec done: 109 5660
    复制代码

    除了主进程,其它结果是三个一组执行的,因为进程池中每次最多有三个进程。

    三、进程通信

          进程间通信常用两种方法:Queue和pipe,Queue可以用在多个进程间实现通信,pipe用在两个进程间通信。

    复制代码
     1 import os
     2 import multiprocessing
     3 import time
     4 #==================
     5 # input worker
     6 def inputQ(queue):
     7     info = str(os.getpid()) + '(put):' + str(time.time())
     8     queue.put(info)
     9 
    10 # output worker
    11 def outputQ(queue,lock):
    12     info = queue.get()
    13     lock.acquire()
    14     print (str(os.getpid()) + '(get):' + info)
    15     lock.release()
    16 
    17 if __name__ == '__main__':
    18     record1 = []  # store input processes
    19     record2 = []  # store output processes
    20     lock = multiprocessing.Lock()  # To prevent messy print
    21     queue = multiprocessing.Queue(3)
    22 
    23     
    24     for i in range(10):
    25         process = multiprocessing.Process(target=inputQ, args=(queue,))
    26         process.start()
    27         record1.append(process)
    28 
    29    
    30     for i in range(10):
    31         process = multiprocessing.Process(target=outputQ, args=(queue, lock))
    32         process.start()
    33         record2.append(process)
    34 
    35     for p in record1:
    36         p.join()
    37 
    38     queue.close()  
    39 
    40     for p in record2:
    41         p.join()
    复制代码

    运行结果:

    复制代码
     1 4004(get):4556(put):1476337412.4875286
     2 512(get):5088(put):1476337412.6345284
     3 8828(get):7828(put):1476337412.7965286
     4 8372(get):1032(put):1476337412.8185284
     5 7740(get):1496(put):1476337412.9205284
     6 4176(get):632(put):1476337412.9855285
     7 5828(get):8508(put):1476337412.9595284
     8 4236(get):9204(put):1476337412.9925284
     9 7632(get):8956(put):1476337413.2055285
    10 6376(get):4160(put):1476337413.0705285
    复制代码

          Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。

    复制代码
     1 from multiprocessing import Process, Pipe
     2 
     3 
     4 def f(conn):
     5     conn.send([42, None, 'hello from child'])
     6     conn.send([42, None, 'hello from child2'])
     7     print("from parent:",conn.recv())
     8     conn.close()
     9 
    10 if __name__ == '__main__':
    11     parent_conn, child_conn = Pipe()
    12     p = Process(target=f, args=(child_conn,))
    13     p.start()
    14     print(parent_conn.recv())  # prints "[42, None, 'hello']"
    15     print(parent_conn.recv())  # prints "[42, None, 'hello']"
    16     parent_conn.send("chupi可好") # prints "[42, None, 'hello']"
    17     p.join()
    复制代码

    运行结果:

    1 [42, None, 'hello from child']
    2 [42, None, 'hello from child2']
    3 from parent: chupi可好
  • 相关阅读:
    自己的思考
    spring MVC整合freemarker
    手把手教你搭建SpringMVC——最小化配置
    深入hibernate的三种状态
    maven 构建slf4j1.7.7之简单测试与源码解析
    maven 构建slf4j1.7.7之简单测试与源码解析
    (转)URI和URL的区别
    Spring缓存机制的理解
    (转)oracle 高水位线详解
    (转)PL/SQL Developer使用技巧、快捷键
  • 原文地址:https://www.cnblogs.com/mrdz/p/5972202.html
Copyright © 2011-2022 走看看