zoukankan      html  css  js  c++  java
  • 多进程

    #单进程
    
    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    from multiprocessing import Process,Pool
    import time,os
    def fs(name,seconds):
      print '%s will sleep: %d s,Now is %s ,fs pid is: %s' % (name,seconds,time.ctime(),os.getpid())
      time.sleep(seconds)
      print 'Now is: %s' % (time.ctime())
      return 'fs' + name
    
    if __name__ == '__main__':
      print 'main Pname is %s' % (os.getpid())
      for i in range(1,11):
        p=Process(target=fs,args=(str(i),2))
        p.start()
        p.join()
      print 'main end: ' + str(os.getpid())
    #定义最大进程数量,Pool默认大小为CPU核心数量
    
    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    from multiprocessing import Process,Pool
    import time,os
    def fs(name,seconds):
      print '%s will sleep: %d s,Now is %s ,fs pid is: %s' % (name,seconds,time.ctime(),os.getpid())
      time.sleep(seconds)
      print 'Now is: %s' % (time.ctime())
      return 'fs' + name
    
    if __name__ == '__main__':
      print 'main Pname is %s' % (os.getpid())
      p=Pool(processes=3)  #定义最多开启3个进程
      for i in range(1,11):
        p.apply_async(fs,args=(str(i),2))   #如果fs只接受一个参数,则写法为 p.apply_async(fs,args=(a1,))
      p.close()
      p.join()
      print 'main end: ' + str(os.getpid())
    #获取每个进程的执行结果
    
    from multiprocessing import Process,Pool
    import time,os
    def fs(name,seconds):
      print '%s will sleep: %d s,Now is %s ,fs pid is: %s' % (name,seconds,time.ctime(),os.getpid())
      time.sleep(seconds)
      print 'Now is: %s' % (time.ctime())
      return 'fs' + name
    
    if __name__ == '__main__':
      print 'main Pname is %s' % (os.getpid())
      p=Pool(processes=3)
      #result只能获取函数fs的return语句的结果,与fs中的print无关
      result=[]
      for i in range(1,11):
        print 'i is: %d' % (i)
        result.append(p.apply_async(fs,args=(str(i),2)))
      p.close()
      p.join()
      for r in result:
        print r.get()
      print 'main end: ' + str(os.getpid())

     Win32平台添加如下代码,防止多进程崩溃
     

    from multiprocessing import Process, freeze_support
    
    if __name__ == '__main__':
        freeze_support()

    p.start()来启动子进程

    p.join()方法来使得子进程运行结束后再执行父进程

    示例:

    ping多个域名:

    def fping(ip):
        import subprocess,sys
        reload(sys)
        sys.setdefaultencoding('utf-8')
        sc = subprocess.Popen(['ping.exe',ip,'-n','2'],shell=True,stdout=subprocess.PIPE)
        while sc.poll() == None:
            sclines = sc.stdout.readlines()
            for l in sclines:
                #return l.strip().decode('GBK')
                print l.strip().decode('GBK')
    
    
    if __name__ == '__main__':
        from multiprocessing import Process,Pool,freeze_support,Queue
        freeze_support()
        ips=['www.baidu.com','www.163.com','www.sina.com.cn','www.cctv.com','www.xin.com','apollo.youxinpai.com']
        p=Pool(processes=4) 
        for ip in ips:
            p.apply_async(fping,args=(ip,))
        p.close()
        p.join()

    函数fping通过使用return取得返回结果:

    def fping(ip):
        import subprocess,sys
        reload(sys)
        sys.setdefaultencoding('utf-8')
        sc = subprocess.Popen(['ping.exe',ip,'-n','1'],shell=True,stdout=subprocess.PIPE)
        while sc.poll() == None:
            sclines = sc.stdout.readlines()
            prs =''
            for l in sclines:
                #return l.strip().decode('GBK')
                prd = l.strip().decode('GBK') +'
    '
                prs += prd
            return prs.strip()
    
    print fping('www.baidu.com')

    https://docs.python.org/2/library/multiprocessing.html

    共享内存变量 multiprocessing.Queue/Array,效率高于manager()

    from multiprocessing import Process, Queue
    
    def f(q):
        q.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,)) #q可以传递到函数中,但通过applay_async的方式传递不进去。另这个Queue()有大小限制,大了的话程序就假死。
        p.start()
        print q.get()    # prints "[42, None, 'hello']" 
        p.join()
    import multiprocessing
    from multiprocessing import Process, Value, Array
      
    def f(n, a):
        n.value   = 3.14
        a[0]      = 5
    
    if __name__ == '__main__':
      
        num   = multiprocessing.Value('d', 0.0)
        arr   = multiprocessing.Array('i', range(10))
        #arr = Array('c', 'oaaaaaaaaaaaaaa') #字符串类型
          
        p = multiprocessing.Process(target=f, args=(num, arr))
        p.start()
        p.join()
          
        print num.value #返回3.14
        print arr[:] #返回[5, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    使用共享变量list实现真正的多进程并发:

    # -*- coding: UTF-8 -*-
    from multiprocessing import Process,Pool,freeze_support,Manager
    import subprocess,sys,time
    reload(sys)
    sys.setdefaultencoding('utf-8')
    
    def fping(ip,ls):
        sc = subprocess.Popen(['ping.exe',ip,'-n','1'],shell=True,stdout=subprocess.PIPE)
        while sc.poll() == None:
            sclines = sc.stdout.readlines()
            ls.append(sclines)
    
    if __name__ == '__main__':
        freeze_support()
        #使用Manager()在多进程间共享变量ls
        manager = Manager()
        ls = []
        ls = manager.list()
    
        ips=['www.baidu.com','www.163.com','www.sina.com.cn','www.cctv.com','www.xin.com','apollo.youxinpai.com']
        
        p=Pool(processes=4)  #定义进程数量
        for ip in ips:
            p.apply_async(fping,args=(ip,ls)) 
    
        p.close()
        p.join()
        
        #将结果写入到文本文件中
        fpingfile = 'e:\ping.txt'
        fw = open(fpingfile,'a')
        for lrs in ls:
            for lr in lrs:
                fw.write(lr.strip().decode('GBK') + '
    ')
        fw.close()

    共享变量使用dict:

    def testfunc(key,value,ls):
        ls[key]=value
        print 'process id: ',os.getpid()
    
    if __name__ == '__main__':
    
        freeze_support()
        manager = Manager()
    
        ls = Manager().dict()
        p=Pool(processes=8)
        for ll in range(10):
            lld = ll+1
            p.apply_async(testfunc,args=(ll,lld,ls))
    
        p.close()
        p.join()
        print 'main end,main process id: ',os.getpid()
        print ls
    返回:{0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6, 6: 7, 7: 8, 8: 9, 9: 10}

     共享变量使用value:

    def testfunc(cc,ls,lock):
        #lock.acquire() #效果同with lock写法
        with lock:
            ls.value +=cc
        #lock.release()
        print 'process id: ',os.getpid(),'  ',ls.value
    
    if __name__ == '__main__':
        freeze_support()
        manager = Manager()
        lock = manager.Lock() #使用multiprocessing中导入的Lock,经测试无法使用,定义为global,或者通过args传递均无法使用
    
        ls = Manager().Value('tmp',0)
    
        p=Pool() #processes=2
        for ll in range(6):
            p.apply_async(testfunc,args=(ll,ls,lock))
    
        p.close()
        p.join()
        print 'main end,main process id: ',os.getpid()
        print ls.value #返回15

     共享变量Queue的应用:(先进先出,测试的时候,如果不加锁,也能正常put进queue。因为多进程中的queue有安全机制,所以不用加lock)

    # 写数据进程执行的代码:
    def write(q,lock,value):
        #with lock:
        lock.acquire() #加上锁
        print 'Put %s to queue...' % value
        time.sleep(0.2)        
        q.put(value)        
        lock.release() #释放锁  
    
    # 读数据进程执行的代码:
    def read(q):
        while True:
            #not q.empty():
            #value = q.get_nowait()
    try:
          q.get(timeout=3) #3秒后还取不到数据则抛出Queue.empty。用该参数变相结束read进程。
    print 'Get %s from queue.queue size is %s' % (value,q.qsize()) time.sleep(0.5)
    except:
    break
    if __name__=='__main__': freeze_support() manager = Manager() # 父进程创建Queue,并传给各个子进程: q = manager.Queue() lock = manager.Lock() #初始化一把锁 p = Pool() for ll in range(10): pw = p.apply_async(write,args=(q,lock,ll))   pr = p.apply_async(read,args=(q,)) #read并发多个进程。
    p.close() p.join()
    print print 'done'

     1.

    if __name__ == '__main__':
        #global q
        freeze_support()
        q=Manager().Queue()
        p=Pool()
        p2=Pool(2)
        for ll in range(10):
            pw = p.apply_async(write,args=(ll,q))
        #read可以使用另一个进程池,也可以共享queue
        p2.apply_async(read,args=(q,)) #read并发一个进程
        p2.close()
        p.close()
        p.join()
        p2.join()

    2.

    if __name__ == '__main__':
        #global q
        freeze_support()
        q=Manager().Queue()
        p=Pool()
        p2=Pool(2)
        for ll in range(10):
            pw = p.apply_async(write,args=(ll,q))
        #read可以另起一个进程,也可以共享queue
        p1=Process(target=read,args=(q,)) #read一个进程
        p1.start()
        p1.join()

     3.

        for i in range(3):
            p = Process(target=write2,args=(i,q))
            threads.append(p)
            p.start()
    
        for t in threads:
            t.join()

     通过class派生类:

    import multiprocessing
    class Worker(multiprocessing.Process):
    def run(self):
            print 'In %s' % self.name
            return
    
    if __name__ == '__main__':
        jobs = []
        for i in range(5):
            p = Worker()
            jobs.append(p)
            p.start()
        for j in jobs:
            j.join()
    import multiprocessing, Queue
    import os
    import time
    from multiprocessing import Process
    from time import sleep
    from random import randint
    
    class Producer(multiprocessing.Process):
        def __init__(self, queue):
            multiprocessing.Process.__init__(self)
            self.queue = queue
            
        def run(self):
            for i in range(10):
                value = self.queue.put(i)
                print multiprocessing.current_process().name + 'is putting ' + str(i) + ' ' + str(os.getpid())
                sleep(randint(1, 3))
            
            
    class Consumer(multiprocessing.Process):
        def __init__(self, queue):
            multiprocessing.Process.__init__(self)
            self.queue = queue
            
        def run(self):
            while True:
                d = self.queue.get(timeout=1)
                if d != None:
                    print multiprocessing.current_process().name + 'is getting ' + str(d) + ' ' +str(os.getpid())
                    sleep(randint(1, 4))
                    continue
                else:
                    break
                    
    #create queue
    queue = multiprocessing.Queue(10)
           
    if __name__ == "__main__":
        print 'start'
        #create processes    
        threads = []
        for i in range(3):
            pp = Producer(queue)
            pc = Consumer(queue)
            threads.append(pp)
            threads.append(pc)
            pp.start()
            pc.start()
        
        for t in threads:
            t.join()        
    
    返回:
    start
    Producer-1is putting 0 65620
    Producer-3is putting 0 49132
    Consumer-6is getting 0 58876
    Consumer-4is getting 0 55396
    Producer-5is putting 0 63544
    Consumer-2is getting 0 62548
    Producer-1is putting 1 65620
    Producer-3is putting 1 49132
    Consumer-6is getting 1 58876
    Producer-5is putting 1 63544
    Consumer-2is getting 1 62548
    Producer-3is putting 2 49132
    Producer-1is putting 2 65620
    Producer-3is putting 3 49132
    Consumer-6is getting 1 58876
    Consumer-4is getting 2 55396
    Producer-5is putting 2 63544
    Consumer-2is getting 2 62548
    Producer-5is putting 3 63544
    Producer-3is putting 4 49132
    Consumer-4is getting 3 55396
    Producer-5is putting 4 63544
    Consumer-2is getting 2 62548
    Producer-1is putting 3 65620
    Consumer-4is getting 3 55396
    Producer-3is putting 5 49132
    Consumer-6is getting 4 58876

     经测试:Manage().list()或Queue()在使用过程中效率远低于global变量。

    多进程间共享变量:

    http://www.tuicool.com/articles/ZZri22

    http://www.cnblogs.com/itech/archive/2012/01/10/2318120.html

  • 相关阅读:
    java程序后台报错java.net.SocketException: Too many open files
    linux中,查看某个命令是来自哪个RPM包或者是通过哪个RPM包安装的
    Oracle卸载之linux快速卸载rac脚本-一键卸载
    40个DBA日常维护的SQL脚本
    Oracle SQL开发 之 Select语句完整的执行顺序
    Oracle开发 之 主-外键约束FK及约束的修改
    drop user 报错ora-00604
    oracle Bug 4287115(ora-12083)
    Ora-1157 ora-1110错误解决案例一枚
    rac库grid目录权限(6751)导致数据库宕机案例 此方法仅用于紧急救助
  • 原文地址:https://www.cnblogs.com/dreamer-fish/p/5132917.html
Copyright © 2011-2022 走看看