zoukankan      html  css  js  c++  java
  • 0704 Process继承实现多进程、Pool进程池,进程间通过队列通信,Pool实现多进程实现复制文件

    通过继承的方式,实现Process多进程

     1 from multiprocessing import Process
     2 import time
     3 
     4 class MyNewProcess(Process):
     5     def run(self):
     6         for i in range(10):
     7             print("----run----")
     8             time.sleep(1)
     9 
    10 
    11 if __name__ == "__main__":
    12     p = MyNewProcess()
    13 
    14     p.start()        # Process 中的 start 方法会主动调用 run 方法
    15 for i in range(10): print("---main---") time.sleep(1)

    Pool 进程池实现多进程

     1 import time
     2 from multiprocessing import Pool
     3 
     4 def worker():
     5     for i in range(10):
     6         print("From worker %s"%i)
     7         time.sleep(0.5)
     8 
     9 def foo():
    10     for i in range(10):
    11         print("From foo %s"%i)
    12         time.sleep(0.5)
    13 
    14 def bar():
    15     for i in range(10):
    16         print("From bar %s"%i)
    17         time.sleep(0.5)
    18 
    19 if __name__ == "__main__":
    20     pool = Pool(3)            # 创建三个 进程
    21     pool.apply_async(worker)
    22     pool.apply_async(foo)
    23     pool.apply_async(bar)
    24 
    25     pool.close()                  # 关闭进程池,禁止添加任务
    26     pool.join()          # 等待子进程结束后,主进程才往下走
    27     print("Is done...")
    28 
    29 
    30 
    31 # process and Pool 最后都是调用 fork
    32 # 通常情况下,主进程一半用来等低啊,,,,真正的任务子进程中执行

    Queue队列的简单使用

     1 from multiprocessing import Queue
     2 
     3 q = Queue(3)    # 初始化一个Queue对象,最多可以put三条信息,如果不写3,那么久无限制
     4 
     5 q.put("Message01")         # 添加信息的方法
     6 q.put("Message02")
     7 print(q.full())          # 查看 队列 是否满了的方法
     8 
     9 q.put("Message03")
    10 print(q.full())
    11 
    12 # 因为队列已经满了,所以下面的消息会出现异常,第一个 try 会等待2秒后再抛出异常,
    13 # 第二个 try 会立刻抛出异常
    14 try:
    15     q.put("Message04", True, 2)
    16 except:
    17     print("消息队列已满,现有消息数量:%s"%q.qsize())
    18 
    19 try:
    20     q.put_nowait("Message04")
    21 except:
    22     print("消息队列已满,现有消息数量:%s"%q.qsize())
    23 
    24 # 推荐使用的方式,先判断队列是否已满,再写入
    25 if not q.full():
    26     q.put_nowait("Message04")
    27 
    28 # 读取消息的时候,先判断消息队列是否为空,再读取
    29 if not q.empty():
    30     for i in range(q.qsize()):
    31         print(q.get_nowait())

    Process配合Queue实现进程间通信

     1 from multiprocessing import Process, Queue
     2 import time, random
     3 
     4 # 写数据进程执行的代码
     5 def write(q):
     6     for value in ['a', 'b', 'c']:
     7         print("Put %s to queue..."%value)
     8         q.put(value)
     9         time.sleep(random.random())
    10 
    11 # 读数据进程的代码
    12 def read(q):
    13     while True:
    14         if not q.empty():
    15             value = q.get(True)
    16             print("Get %s from queue..."%value)
    17             time.sleep(random.random())
    18         else:
    19             break
    20 
    21 if __name__ == "__main__":
    22     # 父进程创建 Queue, 并传给各个子进程
    23     q = Queue()
    24     pw = Process(target=write, args=(q, ))
    25     pr = Process(target=read, args=(q, ))
    26 
    27     # 启动写入子进程,并等待结束
    28     pw.start()
    29     pw.join()
    30 
    31     # 启动读取子进程,并等待结束
    32     pr.start()
    33     pr.join()

    进程池与队列合作实现进程间通信

     1 # 修改 import 中的 Queue 为 Manager
     2 from multiprocessing import Manager, Pool
     3 import os
     4 
     5 
     6 def reader(q):
     7     print("reader启动(%s),父进程为(%s)"%(os.getpid(), os.getppid()))
     8     for i in range(q.qsize()):
     9         print("reader从Queue获取到消息:%s"%q.get(True))
    10 
    11 def writer(q):
    12     print("writer启动(%s),父进程为(%s)"%(os.getpid(), os.getppid()))
    13     for i in "Always":
    14         q.put(i)
    15 
    16 
    17 if __name__ == "__main__":
    18     print("(%s) start"%os.getpid())
    19     q = Manager().Queue()  # 使用Manager中的Queue来初始化
    20     po = Pool()
    21     # 使用阻塞模式创建进程,这样就不需要咋reader中使用死循环了,可以让writer完全执行后,再reader
    22     po.apply(writer, (q,))
    23     po.apply(reader, (q,))
    24 
    25     po.close()
    26     po.join()
    27     print("(%s) End" % os.getpid())

    利用Pool进程池实现简单的文件复制

     1 import os
     2 import time
     3 from multiprocessing import Pool
     4 
     5 def copyFile(oldPath, newPath, fileName):
     6     print("%s 准备复制中。。。"%fileName)
     7     with open("%s\%s"%(oldPath, fileName), 'r') as fr, open("%s\%s"%(newPath, fileName), 'w') as fw:
     8         for line in fr:
     9             fw.write(line)
    10     time.sleep(1)
    11 
    12 
    13 if __name__ == "__main__":
    14     oldPath = r"file"
    15     newPath = r"file-副本"
    16 
    17     os.mkdir(newPath)
    18 
    19     pool = Pool(5)
    20 
    21     fileList = os.listdir(oldPath)
    22 
    23     for fileName in fileList:
    24         pool.apply_async(copyFile, (oldPath, newPath, fileName))
    25 
    26     pool.close()
    27 
    28     pool.join()
    29 
    30     print("文件复制完成....")
  • 相关阅读:
    递归
    递归
    递归
    San Francisco Crime Classification非数值性多分类问题
    kaggle入门题Titanic
    二叉树的前序,中序,后序,层序遍历的递归和非递归实现
    排序算法总结
    [LeetCode]148. Sort List链表归并排序
    [LeetCode]141. Linked List Cycle判断循环链表
    [leetcode]61. Rotate List反转链表k个节点
  • 原文地址:https://www.cnblogs.com/alwaysInMe/p/7119021.html
Copyright © 2011-2022 走看看