zoukankan      html  css  js  c++  java
  • python多进程,进程池,数据共享,进程通信,分布式进程

    一、操作系统中相关进程的知识

      Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
      子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。
      Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程。

      示例如下

    import os
    pid=os.fork()
    if pid==0:
        print('I am child process %s my parents is %s'%(os.getpid(),os.getppid()))
    else:
        print('I (%s) just created a child process (%s).'%(os.getpid(),pid))
    

      输出如下

    I (64225) just created a child process (64226).
    I am child process 64226 my parents is 64225
    

    二、跨平台模块multiprocessing

    multiprocessing模块提供了一个Process类来代表一个进程对象。

      示例1

    from multiprocessing import Process
    import os
    
    # 子进程要执行的代码
    def run_proc(name):
        print('Run child process %s (%s)...' % (name, os.getpid()))
    
    if __name__=='__main__':
        print('Parent process %s.' % os.getppid())
        p = Process(target=run_proc, args=('test',))
        print('Child process will start.')
        p.start()
        p.join()
        print('Child process end.')
    #join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。  
    

      示例2

    from multiprocessing import Process
    import time
    import os
    class P(Process):
        def run(self):
            print('Run child process %s (%s)...'%(self.name,os.getpid()))  # 默认函数对象有name方法 ,结果为:P-1
            time.sleep(3)
            print('%s is done' % self.name)
    if __name__ == '__main__':
        print('Parent process %s.' % os.getppid())
        p=P()
        p.start()
        p.join()
    

    三、进程数据隔离

    多个进程间的数据是隔离的,也就是说多个进程修改全局变量互不影响

      验证示例

    from multiprocessing import Process
    import time
    x=100
    def task():
        global x
        print('子进程开启,当前x的值为%d'%x)
        time.sleep(3)
        x=10
        print('子进程结束,当前x的值为%d'%x)
    
    if __name__ == '__main__':
        print('当前为父进程,准备开启子进程,x的值为%d' % x)
        p1=Process(target=task)
        p1.start()
        p1.join()
        print('当前为父进程,准备结束父进程,x的值为%d' % x)
    

      输出

    当前为父进程,准备开启子进程,x的值为100
    子进程开启,当前x的值为100
    子进程结束,当前x的值为10
    当前为父进程,准备结束父进程,x的值为100
    

    注意:有些情况是需要加锁的情况,如文件读写问题

    四、多进程并行执行

      示例如下

    import time
    from multiprocessing import Process
    
    def task(name,n):
        print('%s is running'%name)
        time.sleep(n)
        print('%s is done'%name)
    
    if __name__ == '__main__':
        p1=Process(target=task,args=("进程1",1)) #用时1s
        p2=Process(target=task,args=("进程2",2)) #用时1s
        p3=Process(target=task,args=("进程3",3)) #用时1s
        
        start_time=time.time()
        p1.start()
        p2.start()
        p3.start()
        # 当第一秒在运行p1时,其实p2、p3也已经在运行,当1s后到p2时只需要再运行1s就到p3了,到p3也是一样。
        p1.join()
        p2.join()
        p3.join()
        stop_time=time.time()     
        print(stop_time-start_time) #3.2848567962646484
    

    五、进程池

    1、线性执行( pool.apply() )

    from multiprocessing import Pool  # 导入进程池模块pool
    import time,os
    def foo(i):
        time.sleep(2)
        print("in process", os.getpid())  # 打印进程号
    if __name__ == "__main__":
        pool = Pool(processes=5)   # 设置允许进程池同时放入5个进程
        for i in range(10):
            pool.apply(func=foo, args=(i,))   # 同步执行挂起进程
        print('end')
        pool.close() # 关闭进程池,不再接受新进程
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释掉,那么程序直接关闭。
    

    2、并发执行( pool.apply_async() )

    from multiprocessing import Pool  # 导入进程池模块pool
    import time,os
    def foo(i):
        time.sleep(2)
        print("in process", os.getpid())  # 打印进程号
    if __name__ == "__main__":
        pool = Pool(processes=5)   # 设置允许进程池同时放入5个进程,并且将这5个进程交给cpu去运行
        for i in range(10):
            pool.apply_async(func=foo, args=(i,))   # 采用异步方式执行foo函数
        print('end')
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释掉,那么程序直接关闭。
    

    3、设置回调

    from multiprocessing import Process,Pool
    import time,os
    def foo(i):
        time.sleep(2)
        print("in process", os.getpid())  # 打印子进程的进程号
    def bar(arg):#注意arg参数是必须要有的
        print('-->exec done:', arg, os.getpid())   # 打印进程号
     
    if __name__ == "__main__":
        pool = Pool(processes=2)
        print("主进程", os.getpid())   # 主进程的进程号
        for i in range(3):
            pool.apply_async(func=foo, args=(i,), callback=bar)   # 执行回调函数callback=Bar
        print('end')
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释掉,那么程序直接关闭。
    

      执行结果

    主进程 752
    end
    in process 2348
    -->exec done: None 752
    in process 8364
    -->exec done: None 752
    in process 2348
    -->exec done: None 752
    #回调函数说明fun=Foo干不完就不执行bar函数,等Foo执行完就去执行Bar
    #这个回调函数是主进程去调用的,而不是每个子进程去调用的。
    

    六、子进程

      1、 很多时候子进程是一个外部进程,如执行一条命令,这和命令行执行效果是一样的
      示例如下

    import subprocess
    print('$nslookup https://www.baidu.com')
    r = subprocess.call(['nslookup','https://www.baidu.com'])
    print('Exit code',r)
    

      2、 有时候子进程还需要进行输入,可以通过communicate方法来输入
      示例如下

    import subprocess
    print('$ nslookup https://www.baidu.com')
    p = subprocess.Popen(['nslookup'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    output,err = p.communicate(b'set q=mx
    baidu.com
    exit
    ')
    print(output.decode('gbk'))
    print('Exit code:',p.returncode)
    

      输出如下

    $ nslookup https://www.baidu.com
    默认服务器:  bogon
    Address:  192.168.111.1
    
    > > 服务器:  bogon
    Address:  192.168.111.1
    
    baidu.com	MX preference = 10, mail exchanger = mx.maillb.baidu.com
    baidu.com	MX preference = 20, mail exchanger = jpmx.baidu.com
    baidu.com	MX preference = 15, mail exchanger = mx.n.shifen.com
    baidu.com	MX preference = 20, mail exchanger = mx50.baidu.com
    baidu.com	MX preference = 20, mail exchanger = mx1.baidu.com
    > 
    Exit code: 0
    

    七、守护进程

    守护进程在主进程代码执行完毕时立刻挂掉,然后主进程等待非守护进程执行完毕后回收子进程的资源(避免产生僵尸进程),整体才算结束。
    示例

    from multiprocessing import Process
    import os
    import time
    
    def task(x):
        print('%s is running ' %x)
        time.sleep(3)
        print('%s is done' %x)
    
    if __name__ == '__main__':
        p1=Process(target=task,args=('守护进程',))
        p2=Process(target=task,args=('子进程',))
        p2.start()
        p1.daemon=True   # 设置p1为守护进程
        p1.start()
        print('主进程代码执行完毕')
    
    >>:主进程代码执行完毕
    >>:子进程 is running
    >>:子进程 is done
    

    可以从结果看出,主进程代码执行完,守护进程立即挂掉,主进程在等待子进程执行完毕后退出

    八、进程间通信

      如果想要进程间通信可以使用QueuePipe来实现
      使用Queue示例

    from multiprocessing import Queue,Process
    def put_id(q):
         q.put([1,2,3,4])
    if __name__ == '__main__':
         q=Queue()
         p=Process(target=put_id,args=(q,))
         p.start()
         print(q.get())
         p.join()
    # 输出
    [1,2,3,4]
    

    注意:在这需要从multiprocessing导入Queue模块

      使用Pipe示例

    from multiprocessing import Process,Pipe
    def put_id(conn):
        conn.send([1,2,3])
        conn.send([4,5,6])
        conn.close()
        
    if __name__ == '__main__':
        ## 生成管道。 生成时会产生两个返回对象,这两个对象相当于两端的电话,通过管道线路连接。
        ## 两个对象分别交给两个变量。
        parent_conn,child_conn=Pipe()
        p=Process(target=put_id,args=(child_conn,))#child_conn需要传给对端,用于send数据给parent_conn
        p.start()
        print(parent_conn.recv())  # parent_conn在这断用于接收数据>>>>[1,2,3]
        print(parent_conn.recv())  # parent_conn在这断用于接收数据>>>>[4,5,6]
        p.join()
    

    注意两端要发送次数和接受次数要对等,不然会卡住直到对等

    九、进程间数据共享(字典和列表型)

      前面说过,进程间数据是隔离的,如果想要进程间数据共享可以通过Manager来实现
      示例如下

    from multiprocessing import Manager,Process
    from random import randint
    import os
    def run(d,l):
        d[randint(1,50)]=randint(51,100)#生成一个可在多个进程之间传递和共享的字典
        l.append(os.getpid())
        print(l)
    if __name__ == '__main__':
        with Manager() as manage: #做一个别名,此时manager就相当于Manager()
            d=manage.dict()#生成一个可在多个进程之间传递和共享的字典
            l=manage.list(range(5))#生成一个可在多个进程之间传递和共享的列表
            p_list=[]
            for i in range(10):#生成10个进程
                p=Process(target=run,args=(d,l))
                p_list.append(p)# 将每个进程放入空列表中
                p.start()
            for i in p_list:
                i.join()
            print(d)#所有进程都执行完毕后打印字典
            print(l)#所有进程都执行完毕后打印列表
    

    十、分布式进程

      在做分布式计算时显然进程比线程各合适,一来进程更稳定,二来线程最多只能在同一台机器的多个cpu上运行;
      multiprocessingmanagers子模块支持把多进程分布到多个机器上,一个服务进程用作调度者,依靠网络将任务分布到其它多个进程中。
      假设有一个需求,拥有两台机器,一台机器用来做发送任务的服务进程,一台用来做处理任务的服务进程;
      示例如下

    # task_master.py
    from multiprocessing.managers import BaseManager
    from queue import Queue
    import random
    import time
    
    task_queue = Queue()
    result_queue = Queue()
    
    class QueueManager(BaseManager):
            pass
    
    def get_task_queue():
        global task_queue
        return task_queue
    
    
    def get_result_queue():
        global result_queue
        return result_queue
    
    
    if __name__ == '__main__':
        # 将两个队列注册到网络上,calltable参数关联Queue对象
        QueueManager.register('get_task_queue', callable=get_task_queue)
        QueueManager.register('get_result_queue', callable=get_result_queue)
    
        # 创建一个队列管理器,绑定端口5000,设定密码为abc
        manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')
        manager.start()
    
        # 通过网络获取Queue对象
        task = manager.get_task_queue()
        result = manager.get_result_queue()
    
        # 放任务进去
        for i in range(10):
            n = random.randint(0,1000)
            print('Put Task %d'%n)
            task.put(n)
    
        # 从结果队列获取结果
        print('Try get results')
        for i in range(10):
            r = result.get()
            print('Result: %s' % r)
    
        manager.shutdown()
        print('master exit')
    

    注意:一定要用注册过的Queue对象,另外在linux/unix/mac等系统上注册可直接使用QueueManager.register('get_result_queue', callable=lambda : result_queue)

    # task_worker.py
    from multiprocessing.managers import BaseManager
    from queue import Queue
    from queue import Empty
    import time
    
    class QueueManager(BaseManager):
        pass
    
    if __name__ == '__main__':
        # 从服务器上获取,所以注册时只需要提供名字,也就是接口名字
        QueueManager.register('get_task_queue')
        QueueManager.register('get_result_queue')
    
        # 连接到服务器,也就是task_master.py的机器
        server_addr = '127.0.0.1'
        manager = QueueManager(address=(server_addr,5000),authkey=b'abc')
        manager.connect()
    
        # 获取Queue对象
        task = manager.get_task_queue()
        result = manager.get_result_queue()
    
        # 从队列提取任务,将处理结果插入result队列
        for i in range(10):
            try:
                n = task.get(timeout=1)
                print('run task %d*%d'%(n,n))
                r = '%d * %d = %d'%(n,n,n*n)
                time.sleep(1)
                result.put(r)
            except Empty:
                print('task queue is empty')
        print('worker exit')
    
  • 相关阅读:
    Windows永久修改pip安装源
    MySQL 超键 候选键 主键 外键是什么
    scrapy中间件
    crawlspider
    scrapy
    MongoDB 命令
    如何反扒
    表前缀sw_时
    自增序号,而且默认变量就是$i,也就是说在你的volist标签之内,可以直接使用$i
    html,if标签使用
  • 原文地址:https://www.cnblogs.com/FuckSpider/p/11552031.html
Copyright © 2011-2022 走看看