  • Python Crawlers [Part 2] [Multiprocessing]

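    I. Multiprocessing

    1. The fork method (os module; Unix/Linux only)

    fork: called once, returns twice. The reason is that the operating system copies the current process (the parent) to create a new process (the child), and the two processes are almost identical. fork then returns in both the parent and the child: in the child the return value is 0, and in the parent it is the child's process ID.

    An ordinary function: called once, returns once.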

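    import os

    if __name__ == '__main__':
        # os.getpid() returns the ID of the current process
        print('current Process (%s) start ....' % os.getpid())
        pid = os.fork()
        if pid < 0:
            # Kept from the original; in Python a failed fork raises OSError rather than returning a negative value
            print('error in fork')
        elif pid == 0:
            # Child process: os.getppid() returns the parent's ID
            print('I am child process (%s) and my parent process is (%s)' % (os.getpid(), os.getppid()))
        else:
            # Parent process: pid is the child's ID
            print('I (%s) created a child process (%s).' % (os.getpid(), pid))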
    
    
    Output:
    current Process (3052) start ....
    I (3052) created a child process (3053).
    I am child process (3053) and my parent process is (3052)

    2. The multiprocessing module (cross-platform)

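    import os

    # Import the Process class from the multiprocessing module
    from multiprocessing import Process

    def run_proc(name):
        # Code executed by each child process
        print('Child process %s (%s) Running...' % (name, os.getpid()))

    if __name__ == '__main__':
        print('Parent process %s.' % os.getpid())
        processes = []
        for i in range(5):
            p = Process(target=run_proc, args=(str(i),))
            print('Process will start.')
            # start() launches the process
            p.start()
            processes.append(p)
        # join() synchronizes with the children: wait for every child, not only the last one
        for p in processes:
            p.join()
        print('Process end')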
    
    Output:
    Parent process 2392.
    Process will start.
    Process will start.
    Process will start.
    Process will start.
    Process will start.
    Child process 2 (10748) Running...
    Child process 0 (5324) Running...
    Child process 1 (3196) Running...
    Child process 3 (4680) Running...
    Child process 4 (10696) Running...
    Process end        

    3. The multiprocessing module (process pool)

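    Purpose: a Pool provides a specified number of worker processes for the caller to use; the default size is the number of CPU cores. When a new request is submitted to the Pool and the pool is not yet full, a new process is created to run it; if the pool is already full, the request waits until a worker becomes free.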

    from multiprocessing import Pool
    import os, time, random

    def run_task(name):
        # Each task reports its worker's PID, sleeps for up to 3 seconds, then finishes
        print('Task %s (pid = %s) is running...' % (name, os.getpid()))
        time.sleep(random.random() * 3)
        print('Task %s end' % name)

    if __name__ == '__main__':
        print('Current process %s' % os.getpid())
        p = Pool(processes=3)
        for i in range(5):
            # Submit the task without waiting for its result
            p.apply_async(run_task, args=(i,))
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All subprocesses done')
    
    Output:
    Current process 9176
    Waiting for all subprocesses done...
    Task 0 (pid = 11012) is running...
    Task 1 (pid = 12464) is running...    
    Task 2 (pid = 11260) is running...
    Task 2 end
    Task 3 (pid = 11260) is running...
    Task 0 end
    Task 4 (pid = 11012) is running...
    Task 1 end
    Task 3 end
    Task 4 end
    All subprocesses done
    
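    The code creates a process pool of size 3 and submits 5 tasks to it one after another.
    Calling join() on a Pool object waits until all worker processes have finished.
    close() must be called before join().
    After close() has been called, no new tasks can be submitted to the pool.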
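
    The close()/join() rules above can be checked with a small sketch. The square task and the submit_after_close helper are hypothetical names introduced only for this illustration. The sketch assumes that a closed pool rejects new work with ValueError (current Python 3) or with an assertion failure (older releases), so both are caught.

```python
from multiprocessing import Pool


def square(n):
    # Hypothetical task used only for this illustration
    return n * n


def submit_after_close(pool):
    """Return True if the pool refuses a task submitted after close()."""
    try:
        pool.apply_async(square, args=(1,))
    except (ValueError, AssertionError):
        # ValueError on current Python 3; older versions used an assertion
        return True
    return False


if __name__ == '__main__':
    pool = Pool(processes=3)
    # Submit 5 tasks to a pool of 3 workers and keep the AsyncResult handles
    results = [pool.apply_async(square, args=(i,)) for i in range(5)]
    pool.close()                       # no more tasks may be submitted
    print(submit_after_close(pool))    # the late submission is rejected
    pool.join()                        # wait for all workers to exit
    print([r.get() for r in results])  # collect the return values
```

    Keeping the AsyncResult objects returned by apply_async is one way to read task results back in the parent; the article's example only prints from inside the workers.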

    4. Inter-process communication

    ① Queue: passes data between multiple processes, using two methods, put and get.

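    put: inserts an item into the queue. It takes two optional parameters, block and timeout. If block is True (the default) and timeout is a positive value, the call blocks for at most timeout seconds until the queue has free space; if it times out, a Queue.Full exception is raised (queue.Full in Python 3). If block is False and the queue is full, Queue.Full is raised immediately.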

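    get: reads one item from the queue and removes it. It takes two optional parameters, block and timeout. If block is True (the default) and timeout is a positive value, then a Queue.Empty exception is raised (queue.Empty in Python 3) if no item arrives within the waiting time. If block is False, there are two cases:
    if the queue has an item available, that item is returned immediately;
    otherwise, if the queue is empty, Queue.Empty is raised immediately.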
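
    The blocking rules for put and get can be seen in a single process with a queue that holds only one item. This is a minimal sketch: demo_put_get is a hypothetical helper, and the maxsize of 1 and the timeouts are arbitrary values chosen for the illustration. Note that the library names the first optional parameter block.

```python
from multiprocessing import Queue

try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2


def demo_put_get():
    """Return the outcomes of put/get calls on a queue that holds one item."""
    outcomes = []
    q = Queue(maxsize=1)
    q.put('url1')                                # succeeds: the queue has room
    try:
        q.put('url2', block=False)               # full: fails immediately
    except queue.Full:
        outcomes.append('Full (non-blocking)')
    try:
        q.put('url2', block=True, timeout=0.2)   # waits 0.2 s, then gives up
    except queue.Full:
        outcomes.append('Full (timeout)')
    # Blocking get with a timeout: returns the item put above
    outcomes.append(q.get(block=True, timeout=1))
    try:
        q.get(block=False)                       # empty: fails immediately
    except queue.Empty:
        outcomes.append('Empty (non-blocking)')
    return outcomes


if __name__ == '__main__':
    print(demo_put_get())
```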

    from multiprocessing import Process, Queue
    import os, time, random

    # Code executed by the writer processes
    def proc_write(q, urls):
        print('Process (%s) is writing...' % os.getpid())
        for url in urls:
            q.put(url)
            print('Put %s to queue...' % url)
            time.sleep(random.random() * 3)

    # Code executed by the reader process
    def proc_read(q):
        print('Process (%s) is reading...' % os.getpid())
        while True:
            url = q.get(True)
            print('Get %s from queue' % url)

    if __name__ == '__main__':
        # The parent process creates the Queue and passes it to each child
        q = Queue()
        proc_writer1 = Process(target=proc_write, args=(q, ['url1', 'url2', 'url3']))
        proc_writer2 = Process(target=proc_write, args=(q, ['url4', 'url5', 'url6']))
        proc_reader = Process(target=proc_read, args=(q,))

        # Start the writer processes
        proc_writer1.start()
        proc_writer2.start()

        # Start the reader process
        proc_reader.start()

        # Wait for the writers to finish
        proc_writer1.join()
        proc_writer2.join()

        # proc_reader runs an infinite loop, so it cannot be joined; terminate it instead
        proc_reader.terminate()

        
    Output:
    Process(9968) is writing...
    Process(9512) is writing...
    Put url1 to queue...
    Put url4 to queue...
    Process(1124) is reading...
    Get url1 from queue
    Get url4 from queue
    
    Put url5 to queue...
    Get url5 from queue
    
    Put url2 to queue...
    Get url2 from queue
    
    Put url6 to queue...
    Get url6 from queue
    
    Put url3 to queue...
    Get url3 from queue

    ② Pipe: used for communication between two processes, one at each end of the pipe.

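    Pipe() returns a pair (conn1, conn2) representing the two ends of a pipe.

    Pipe() takes a duplex parameter:

      If duplex is True (the default), the pipe is full-duplex: both conn1 and conn2 can send and receive.

      If duplex is False, conn1 can only receive messages and conn2 can only send them.

      send: sends a message.

      recv: receives a message.

      In full-duplex mode, for example, conn1.send() sends a message and conn1.recv() receives one. If there is nothing to receive, recv blocks until a message arrives; if nothing is left to receive and the other end has been closed, recv raises EOFError.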
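
    The one-way case (duplex=False) and the closed-pipe error can be sketched inside a single process, since both ends of a pipe are ordinary connection objects. The helper name one_way_demo is hypothetical and exists only for this illustration.

```python
from multiprocessing import Pipe


def one_way_demo():
    """Exercise a one-way pipe: conn1 only receives, conn2 only sends."""
    events = []
    recv_end, send_end = Pipe(duplex=False)
    try:
        recv_end.send('x')            # the receiving end cannot send
    except (OSError, IOError):
        events.append('read-only')
    send_end.send('url_0')
    send_end.send('url_1')
    send_end.close()                  # close the sending end
    events.append(recv_end.recv())    # buffered messages are still delivered
    events.append(recv_end.recv())
    try:
        recv_end.recv()               # nothing left and the sender is closed
    except EOFError:
        events.append('EOFError')
    return events


if __name__ == '__main__':
    print(one_way_demo())
```

    Catching EOFError in a receiver loop is one possible way to let the receiving process exit once the sender has closed its end, instead of looping forever.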

    import multiprocessing
    import os, random, time

    def proc_send(pipe, urls):
        # Send each URL through the pipe, pausing briefly between messages
        for url in urls:
            print('Process (%s) send: %s' % (os.getpid(), url))
            pipe.send(url)
            time.sleep(random.random())

    def proc_recv(pipe):
        # Receive messages forever; recv() blocks until one arrives
        while True:
            print('Process (%s) rev:%s' % (os.getpid(), pipe.recv()))
            time.sleep(random.random())

    if __name__ == '__main__':
        pipe = multiprocessing.Pipe()
        p1 = multiprocessing.Process(target=proc_send, args=(pipe[0], ['url_' + str(i) for i in range(10)]))
        p2 = multiprocessing.Process(target=proc_recv, args=(pipe[1],))
        p1.start()
        p2.start()
        p1.join()
        # proc_recv loops forever, so this join() never returns; stop the script manually (e.g. Ctrl+C)
        p2.join()

        
    Output:
    Process(10448) send:url_0
    Process(5832) rev:url_0
    
    Process(10448) send:url_1
    Process(5832) rev:url_1
    
    Process(10448) send:url_2
    Process(5832) rev:url_2
    
    Process(10448) send:url_3
    Process(5832) rev:url_3
    
    Process(10448) send:url_4
    Process(5832) rev:url_4
    
    Process(10448) send:url_5
    Process(5832) rev:url_5
    
    Process(10448) send:url_6
    Process(5832) rev:url_6
    
    Process(10448) send:url_7
    Process(5832) rev:url_7
    
    Process(10448) send:url_8
    Process(5832) rev:url_8
    
    Process(10448) send:url_9
    Process(5832) rev:url_9
  • Original article: https://www.cnblogs.com/loser1949/p/9215498.html