zoukankan      html  css  js  c++  java
  • [python 并行3]进程

    进程篇

    基本使用 1

    #coding=utf-8
    
    import multiprocessing
    import os		# 获取pid用
    import time		# 延时用
    
    # 子进程要执行的函数
    def child_proc(name):
    	print(f'child process {name} pid: {os.getpid()}')
    	time.sleep(3)
    	print(f'{name} finish')
    
    # 主进程,必须在主模块中执行
    if __name__ == '__main__':
    	print(f'parent process {os.getpid()} is running')
    
    	# 生成子进程
    	p1 = multiprocessing.Process(target = child_proc, args = ('child-1',))
    	p2 = multiprocessing.Process(target = child_proc, args = ('child-2',))
    	p1.start()
    	p2.start()
    
    	print(f'parent process {os.getpid()} is end')
    

    输出

    parent process 20114 is running
    parent process 20114 is end
    child process child-1 pid: 20115
    child process child-2 pid: 20116
    child-1 finish
    child-2 finish
    

    注意! Python官方文档提到为何必须要使用if __name__ = '__main__',由于该包的所有功能都需要将主模块导入到子模块中,但是IDLE无法将__main__模块导入子模块,所以只能在文件中编辑好程序执行
    更多multiprocessing — Process-based parallelism


    multiprocessing模块

    :根据查看multiprocessing模块声明,第一行注释显示# Package analogous to 'threading.py' but using processes,可以发现进程的操作与线程相似 (甚至多进程的模块直接就是线程模块改过来的)

    函数声明: class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    (group: 官方预留的参数)
    target: 子线程要执行的函数
    name: 给子线程命名
    args: 传递参数到要执行的函数中 (类型为元组)
    daemon: 将线程设置为后台线程 2

    Thread类包含的方法:

    • start(): 开始进程,它会安排在单独的控制进程中使该对象的run()方法被调用 (invoked) (如果多次调用,会发生错误AssertionError
    • run(): 你可以在子类中重写这个方法,标准的run()方法会在构造器传递了target参数后调用它
    • join(timeout=None): 阻塞当前进程,直到等待调用了join()的进程结束,或到达设置的超时timeout的参数为止
    • name: 进程名
    • is_alive(): 判断进程是否在运行
    • daemon: 是否为后台进程的属性值
    • pid: 返回进程的id
    • terminate(): 结束进程
      (在Unix上,使用SIGTERM信号量完成;在windows上使用TerminateProcess())
      (请注意,不会执行退出处理程序和最后的子句等。请注意,进程的后代进程不会被终止 - 它们将简单地变成孤立的。)
      (注:如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能被其他进程无法使用。 类似地,如果进程已获得锁或信号量等,则终止它可能导致其他进程死锁。)
    • kill(): 杀掉进程,与terminate()相同,但在Unix中使用SIGKILL信号量
    • close(): 关闭进程对象,释放所有与之相关的资源。如果是还在运行,会raised错误ValueError,第一次调用会返回成功,其它调用会raise错误ValueError
    • exitcode: 子进程的退出码。如果不是被terminate终止的,将会是None;如果被信号量N终止的,将会返回-N
    • authkey: 进程的身份钥匙(1字节字符串)。当multipriocessing在主进程中被初始化时,会使用os.urandom()标记一个随机字符串。当一个进程对象被创建时,它将会从父进程继承这个身份钥匙,虽然它可能会被改为其它字节字符串
    • sentinel: (哨兵)当进程结束时,一个数值的系统对象处理将变为ready。如果你想立即要等待几个事件,你能用这个值使用multiprocessing.connection.wait(),否则调用join()更简单。

    Pool进程池

    如果进程太多,超过了CPU核数,会导致进程之间的来回切换,影响性能。可以通过创建进程池,把进程加入到里面,如果池中进程没满,就会创建一个进程来执行请求;如果池中进程达到规定的最大值,那么请求会等待
    函数声明: class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
    processes: 指定池中进程数量,如果不指定,则为None,默认就会使用os.cpu_count()
    initializer: 如果不是None,每个进程在开始时都会调用initializer(*initargs)
    maxtasksperchild: 工作进程在退出和替换新的工作进程之前,可以完成的任务数量,以释放资源。默认为None,表示工作进程和这个池的生存时间一样长
    context: 用来指定工作进程的上下文

    下面有2种方法添加进程到进程池中,分别是apply_asyncapply

    apply_async

    apply_async()用来同步执行进程,允许多个进程同时进入池;是异步非阻塞的
    函数声明: apply_async(func[, args[, kwds[, callback[, error_callback]]]])
    callback: 如果指定了回调,那么它应该是一个可调用的,它接受一个参数

    #coding=utf-8
    
    import multiprocessing
    import time		# 延时用
    
    # 子进程要执行的函数
    def child_proc(index):
    	print(f'{index} process is running')
    	time.sleep(3)
    	print(f'{index} process is end')
    
    # 主进程,必须在主模块中执行
    if __name__ == '__main__':
    	print(f'all process start')
    
    	# 生成进程池
    	p = multiprocessing.Pool()
    	for i in range(5):
    		p.apply_async(func = child_proc, args = (i,))
    	p.close()
    	p.join()	# 注! 如果不执行此句,将会直接退出主进程
    
    	print(f'all process done!')
    

    输出

    all process start
    0 process is running
    1 process is running
    2 process is running
    3 process is running
    4 process is running
    0 process is end
    3 process is end
    2 process is end
    4 process is end
    1 process is end
    all process done!
    

    apply

    apply(): 只允许一个进程进入池,在一个进程结束后,另一个才能进入;是阻塞的
    函数声明: apply(func[, args[, kwds]])

    p.apply(func = child_proc, args = (i,))	# 将上面代码中的apply_async换成apply即可
    

    输出

    all process start
    0 process is running
    0 process is end
    1 process is running
    1 process is end
    2 process is running
    2 process is end
    3 process is running
    3 process is end
    4 process is running
    4 process is end
    all process done!
    

    分析:对比apply_async和apply的输出,可以发现,apply_async是同步执行的,而apply是一个一个进入池中执行的


    数据共享

    参考:Python多进程编程-进程间共享数据
    PipeQueue都有一定的数据共享功能,但他们会阻塞进程)
    Queue:采用共享队列的内存的方式共享数据
    注意:存在queue.Queuemultiprocessing.Queue两种队列
    queue.Queue:是进程内非阻塞队列,各进程私有
    multiprocessing.Queue:是跨进程通信队列,各个子进程共有

    共享内存:使用multiprocessing的ValueArray类,实现共享内存的方式共享数据
    共享进程:使用multiprocessing的Manager类,实现共享进程的方式共享数据

    获取返回值(仅主进程获有数据)

    针对进程池实现的方式,可以直接通过获取进程对象的返回值

    #coding=utf-8
    
    import multiprocessing
    
    # 子进程要执行的函数
    def child_proc(x, y):
    	return x + y
    
    # 主进程,必须在主模块中执行
    if __name__ == '__main__':
    	# 生成进程池
    	p = multiprocessing.Pool()
    	z = p.apply(func = child_proc, args = (1, 2))
    	print(z)
    

    Queue

    使用multiprocessing的Queue类,实现进程之间的数据共享

    #coding=utf-8
    
    from multiprocessing import Process, Queue
    
    # 子进程要执行的函数
    def child_proc(queue):
    	num = queue.get()
    	num += 10
    	queue.put(num)
    
    # 主进程,必须在主模块中执行
    if __name__ == '__main__':
    	# 创建共享数据
    	queue = Queue()
    	queue.put(1000)
    
    	# 创建进程
    	p1 = Process(target = child_proc, args = (queue,))
    	p2 = Process(target = child_proc, args = (queue,))
    	p1.start()
    	p2.start()
    	p1.join()
    	p2.join()
    
    	# 打印结果
    	print(queue.get())
    

    Value、Array

    共享内存有2个结构 - ValueArray,它们内部都实现了锁机制,因此是多进程安全的

    #coding=utf-8
    
    from multiprocessing import Process, Value, Array
    
    # 子进程要执行的函数
    def child_proc(num, li):
    	num.value += 100
    	for i in range(len(li)):
    		li[i] += 10
    
    # 主进程,必须在主模块中执行
    if __name__ == '__main__':
    	# 创建共享数据
    	num = Value('d', 0.0)
    	li = Array('i', range(10))
    
    	# 创建进程
    	p1 = Process(target = child_proc, args = (num, li))
    	p2 = Process(target = child_proc, args = (num, li))
    	p1.start()
    	p2.start()
    	p1.join()
    	p2.join()
    
    	# 打印结果
    	print(num.value)
    	for x in li:
    		print(x)
    

    ValueArray都需要设置其中存放值的类型。d:double;i:int;c:char等
    详细:转到multiprocessing.sharedctypes可以查看到各种类型的字符串定义

    Manager

    (上面的共享内存通过Value和Array结构实现,这些值在主进程中管理,很分散)
    Manager通过共享内存来实现共享数据,支持的数据类型很多
    详细multiprocessing.managers

    #coding=utf-8
    
    from multiprocessing import Process, Manager
    
    # 子进程要执行的函数
    def child_proc(dict1, list1):
    	dict1['yourname'] += ' snow'
    	list1[3] += 10
    
    # 主进程,必须在主模块中执行
    if __name__ == '__main__':
    	# 创建共享数据
    	manager = Manager()
    	dict1 = manager.dict()
    	list1 = manager.list(range(4))
    
    	dict1['yourname'] = 'youmux'
    	list1[3] = 0
    
    	# 创建进程
    	p1 = Process(target = child_proc, args = (dict1, list1))
    	p2 = Process(target = child_proc, args = (dict1, list1))
    	p1.start()
    	p2.start()
    	p1.join()
    	p2.join()
    
    	# 打印结果
    	print(dict1['yourname'])
    	for x in list1:
    		print(x)
    

    1.参考书籍: 参考书籍:《Python并行编程手册》

    2.后台线程: 后台线程在主线程停止后就直接停止运行。他们资源(如打开的文件,数据库事务等)可能不会被正确的释放。如果你想要你的线程优美的停止,让他们不要变为后台和使用一个合适的信号机制如事件Event

  • 相关阅读:
    MyBatis映射文件中用#和$传递参数的特点
    使用谷歌浏览器进行Web开发技巧
    YYYY-mm-dd HH:MM:SS 备忘录
    java通过UUID生成16位唯一订单号
    idea如何设置类头注释和方法注释
    如何用符号构建人的思维系统?
    临界点思维模型
    复利思维模型-拥抱人生的指数增长
    提升自我认知的有效方式
    如何去培养顶尖的思维模型?
  • 原文地址:https://www.cnblogs.com/maplesnow/p/12044387.html
Copyright © 2011-2022 走看看