zoukankan      html  css  js  c++  java
  • 多任务—进程

    一、进程以及状态

    1. 进程

    程序:例如xxx.py这是程序,是一个静态的

    进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。

     

    2. 进程的状态

    工作中,任务数往往大于cpu的核数,即一定有一些任务正在执行,而另外一些任务在等待cpu进行执行,因此导致了有了不同的状态

    • 就绪态:当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。

    • 执行态:当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。

    • 等待态:正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)、一个程序sleep等

    3.同步和异步

    所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。

    所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。

    4.阻塞与非阻塞

    阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的

    1. 同步阻塞形式

      效率最低。银行排队。

    1. 异步阻塞形式

      如果在银行等待办理业务的人采用的是异步的方式去等待消息被触发(通知),也就是领了一张小纸条,假如在这段时间里他不能离开银行做其它的事情,那么很显然,这个人被阻塞在了这个等待的操作上面;

      异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。

    1. 同步非阻塞形式

      实际上是效率低下的。

      想象一下你一边打着电话一边还需要抬头看到底队伍排到你了没有,如果把打电话和观察排队的位置看成是程序的两个操作的话,这个程序需要在这两种不同的行为之间来回的切换,效率可想而知是低下的。

    1. 异步非阻塞形式

      效率更高,

      因为打电话是你(等待者)的事情,而通知你则是柜台(消息触发机制)的事情,程序没有在两种不同的操作中来回切换。

      比如说,这个人突然发觉自己烟瘾犯了,需要出去抽根烟,于是他告诉大堂经理说,排到我这个号码的时候麻烦到外面通知我一下,那么他就没有被阻塞在这个等待的操作上面,自然这个就是异步+非阻塞的方式了。

     

    很多人会把同步和阻塞混淆,是因为很多时候同步操作会以阻塞的形式表现出来,同样的,很多人也会把异步和非阻塞混淆,因为异步操作一般都不会在真正的IO操作处被阻塞。

    二、进程的创建

    1.进程的创建-multiprocessing

    multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情

     from multiprocessing import Process
     import time
     
     
     def func1():
         while True:
             print("test1")
             time.sleep(1)
     
     def func2():
         while True:
             print("test2")
             time.sleep(1)
     
     
     if __name__ == '__main__':
         p1 = Process(target=func1)
         p2 = Process(target=func2)
         p1.start()
         p2.start()
    创建进程

    说明

    • 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动

    2. 进程pid

     from multiprocessing import Process
     import time
     import os
     
     
     def func():
         print("子进程的pid:%d" % os.getpid())   # os.getpid 获取当前进程的进程号
         print("子进程的父进程pid:%d" % os.getppid())    # os.getppid 获取当前进程的父进程的进程号
     
     
     if __name__ == "__main__":
         print("父进程pid:%d" % os.getpid())
         p = Process(target=func)
         p.start()
                                                                                                     
    进程pid

     

    3. Process语法结构如下:

    Process([group [, target [, name [, args [, kwargs]]]]])

    • target:如果传递了函数的引用,可以任务这个子进程就执行这里的代码
    • args:给target指定的函数传递的参数,以元组的方式传递
    • kwargs:给target指定的函数传递命名参数
    • name:给进程设定一个名字,可以不设定
    • group:指定进程组,大多数情况下用不到

    Process创建的实例对象的常用方法:

    • start():启动子进程实例(创建子进程)
    • is_alive():判断进程子进程是否还在活着
    • join([timeout]):是否等待子进程执行结束,或等待多少秒
    • terminate():不管任务是否完成,立即终止子进程

    Process创建的实例对象的常用属性:

    • name:当前进程的别名,默认为Process-N,N为从1开始递增的整数
    • pid:当前进程的pid(进程号)

    4. 给子进程指定的函数传递参数

     from multiprocessing import Process
     import time
     import os
     
     
     def func(name, age, **kwargs):
         for i in range(10):
             print("子进程运行中:name=%s,age=%d,pid=%d" % (name, age, os.getpid()))
             print(kwargs)
             time.sleep(0.2)
     
     
     if __name__ == "__main__":
         p = Process(target=func, args=('test', 11), kwargs={"ab":'ab'})
         p.start()
         time.sleep(1)   # 1秒后结束子进程
         p.terminate()
         p.join()
    
    # 运行结果
    子进程运行中:name=test,age=11,pid=4260
    {'ab': 'ab'}
    子进程运行中:name=test,age=11,pid=4260
    {'ab': 'ab'}
    子进程运行中:name=test,age=11,pid=4260
    {'ab': 'ab'}
    子进程运行中:name=test,age=11,pid=4260
    {'ab': 'ab'}
    子进程运行中:name=test,age=11,pid=4260
    {'ab': 'ab'}
    View Code

    5. 进程间不共享全局变量

     from multiprocessing import Process
     import time
     import os
     
     
     num_list = [11,22,33]
     
     def func1():
         print("in func1:pid=%d, num_list=%s" % (os.getpid(), num_list))
         for i in range(3):
             num_list.append(i)
             time.sleep(1)
             print("in func1:pid=%d, num_list=%s" % (os.getpid(), num_list))
     
     
     def func2():
         print("in func2:pid=%d, num_list=%s" % (os.getpid(), num_list))
     
     
     if __name__ == "__main__":
         p1 = Process(target=func1)
         p1.start()
         p1.join()
         p2 = Process(target=func2)
         p2.start()
    
    # 执行结果:
    in func1:pid=4344, num_list=[11, 22, 33]
    in func1:pid=4344, num_list=[11, 22, 33, 0]
    in func1:pid=4344, num_list=[11, 22, 33, 0, 1]
    in func1:pid=4344, num_list=[11, 22, 33, 0, 1, 2]
    in func2:pid=4345, num_list=[11, 22, 33]
    View Code

    6.守护进程

    守护进程会随着主进程的结束而结束。

    主进程创建守护进程

      其一:守护进程会在主进程代码执行结束后就终止

      其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

     from multiprocessing import Process
     import time
     import os
     
     
     class MyProcess(Process):
     
         def run(self):
             print(os.getpid(), self.name)
     
     
     p = MyProcess()
     p.daemon = True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p
     即终止运行
     p.start()
     time.sleep(10) # 在sleep时查看进程id对应的进程ps -ef|grep id
     print('')
    守护进程开启
     from multiprocessing import Process
     import time
     
     
     def func1():
         print("123")
         time.sleep(1)
         print("123end")
     
     
     def func2():
         print("456")
         time.sleep(3)
         print("456end")
     
     
     p1 = Process(target=func1)
     p2 = Process(target=func2)
     p1.daemon = True
     p1.start()
     p2.start()
     time.sleep(0.1)
     print("main-------")#打印该行则主进程代码结束,则守护进程p1应该被终止.#可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止.
    
    
    # 执行结果:
    123
    456
    main-------
    456end
    主进程代码执行结束守护进程立即结束

     7.使用多进程实现socket服务端的并发效果

     from multiprocessing import Process
     import socket
     
     
     def server(conn):
         recv_data = conn.recv(1024).decode('utf-8')
         print(recv_data)
         conn.send("抱着妹妹上花轿".encode('utf-8'))
         conn.close()
     
     if __name__ == "__main__":
         tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         tcp_socket.bind(('192.168.1.1',8010))
         tcp_socket.listen(128)
         while True:
             conn, addr = tcp_socket.accept()
             p = Process(target=server, args=(conn,))
             p.start()
    server
     from socket import *
     
     
     # 创建套接字 
     tcp_client_socket = socket(AF_INET, SOCK_STREAM)
     
     # 目的地址
     server_ip = input("服务端ip:")
     server_port = input("服务端端口:")
     
     # 链接服务器
     tcp_client_socket.connect((server_ip, int(server_port)))
     
     # 客户端发送信息
     send_data = input("输入发送的信息:")
     tcp_client_socket.send(send_data.encode('utf-8'))
     
     # 接受服务端发来的信息
     recv_data = tcp_client_socket.recv(1024)
     print("收到的信息:%s" % recv_data.decode('utf-8'))
     
     # 关闭套接字
     tcp_client_socket.close()
    client

     三、进程间通信-Queue

    1. Queue的使用

    可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示一下Queue的工作原理:

     from multiprocessing import Queue
     
     
     q = Queue(3)    # 初始哈Queue对象,最多可接手3条put
     q.put(1)
     q.put(2)
     print(q.full())   # False
     q.put(3)
     print(q.full())   # True
     
     
     #因为消息列队已满下面的try都会抛出异常,第一个try会等待2秒后再抛出异常,第二个Try会立刻抛出异常
     try:
         q.put(4, True, 2)
     except:
         print("消息队列已满:现有消息数量%s" % q.qsize())
     
     try:
         q.put_nowait(4)
     except:
         print("消息队列已满:现有消息数量%s" % q.qsize())
     
     #推荐的方式,先判断消息列队是否已满,再写入
     if not q.full():
         q.put_nowait(4)
     
     #读取消息时,先判断消息列队是否为空,再读取
     if not q.empty():
         for i in range(q.qsize()):
             print(q.get_nowait())
                                                  
    View Code

    说明

    初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);

    • Queue.qsize():返回当前队列包含的消息数量;

    • Queue.empty():如果队列为空,返回True,反之False ;

    • Queue.full():如果队列满了,返回True,反之False;

    • Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;

    1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;

    2)如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;

    • Queue.get_nowait():相当Queue.get(False);

    • Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;

    1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;

    2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;

    • Queue.put_nowait(item):相当Queue.put(item, False);

    2. Queue实例

    我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

     from multiprocessing import Process, Queue
     import time,random
     
     
     # 写数据进程执行的代码:
     def write(q):
         for i in range(3):
             print("puting %d in queue" % i)
             q.put(i)
             time.sleep(random.random())
     
     
     def read(q):
         while True:
             if not q.empty():
                 print("%i reading in queue" % q.get(True))
                 time.sleep(random.random())
             else:
                 break
     
     if __name__ == "__main__":
         q = Queue()
         wp = Process(target=write, args=(q,))
         rp = Process(target=read, args=(q,))
     
         wp.start()   # 启动子进程wq,写入:
         wp.join()    # 等待pw结束:
     
         rp.start()
         rp.join()
         print('所有数据都写入并且读完')
    View Code

    四、进程池Pool

    当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

    初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务,请看下面的实例:

     from multiprocessing import Process, Pool
     import time, os, random
     
     
     def func(num):
         start_time = time.time()
         print("%d开始执行,进程号%s" %(num,os.getpid()))
         time.sleep(random.random())
         end_time = time.time()
         print("%d执行完毕,耗时%0.2f" % (num,end_time-start_time))
     
     
     po = Pool(3)
     for i in range(10):
         # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
         # 每次循环将会用空闲出来的子进程去调用目标
         po.apply_async(func, (i,))
     
     print("--start--")
     po.close()   # 关闭进程池,关闭后po不再接收新的请求
     po.join()   # 等待po中所有子进程执行完成,必须放在close语句之后
     print("--end--")
    
    
    # 执行结果:
    --start--
    0开始执行,进程号4333
    1开始执行,进程号4334
    2开始执行,进程号4332
    0执行完毕,耗时0.65
    3开始执行,进程号4333
    1执行完毕,耗时0.85
    4开始执行,进程号4334
    2执行完毕,耗时0.95
    5开始执行,进程号4332
    4执行完毕,耗时0.43
    6开始执行,进程号4334
    5执行完毕,耗时0.41
    7开始执行,进程号4332
    3执行完毕,耗时0.85
    8开始执行,进程号4333
    6执行完毕,耗时0.51
    9开始执行,进程号4334
    9执行完毕,耗时0.24
    8执行完毕,耗时0.54
    7执行完毕,耗时0.75
    --end--
    View Code

    multiprocessing.Pool常用函数解析:

    • apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
    • close():关闭Pool,使其不再接受新的任务;
    • terminate():不管任务是否完成,立即终止;
    • join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;

    进程池中的Queue

    如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:

    RuntimeError: Queue objects should only be shared between processes through inheritance.

    下面的实例演示了进程池中的进程如何通信:

     from multiprocessing import Manager, Pool
     import os, time, random
     
     
     def write(q):
         print("write启动(%s),父进程(%s)" % (os.getpid(), os.getppid()))
         for i in range(3):
             q.put(i)
     
     def read(q):
         print("read启动(%s), 父进程(%s)" % (os.getpid(), os.getppid()))
         for i in range(q.qsize()):
             print("正在从队列读取消息%d" % q.get())
     
     if __name__ == "__main__":
         print("start:%s" % os.getpid())
         q = Manager().Queue()
         po = Pool()
         po.apply_async(write, (q,))
         time.sleep(1)   # 先让上面的任务向Queue存入数据,然后再让下面的任务开始从中取数据
     
         po.apply_async(read, (q,))
     
         po.close()
         po.join()
         print("end:%s" % os.getpid())
    
    # 执行结果:
    start:4586
    write启动(4592),父进程(4586)
    read启动(4592), 父进程(4586)
    正在从队列读取消息0
    正在从队列读取消息1
    正在从队列读取消息2
    end:4586
    View Code

    应用:文件夹copy器(多进程版)

     import multiprocessing, time, os, random
     
     
     def copy_file(queue, file_name, need_copy_file, new_file):
         f_read = open(need_copy_file + '/' + file_name, 'rb')
         f_write = open(new_file + '/' + file_name, 'wb')
         while True:
             time.sleep(random.random())
             content = f_read.read()
             if content:
                 f_write.write(content)
             else:
                 break
         f_read.close()
         f_write.close()
         # 发送已经拷贝完毕的文件名字
         queue.put(file_name)
     
     
     if __name__ == "__main__":
     
         # 获取想要copy的文件夹
         need_copy_file = input("输入文件名:")
         # 新的文件夹名称
         new_file = need_copy_file + "[副本]"
         # 创建目标文件夹
         try:
             os.mkdir(new_file)
         except:
             print("创建文件失败")
         # 获取这个文件夹中所有的普通文件名
         file_names = os.listdir(need_copy_file)
     
         # 创建队列
         queue = multiprocessing.Manager().Queue()
         # 创建进程池
         pool = multiprocessing.Pool(3)
         # 向进程池中添加任务
         for file_name in file_names:
             pool.apply_async(copy_file, (queue, file_name, need_copy_file, new_file))
         pool.close()
         # 主进程显示进度
         all_file_nums = len(file_names)
         while True:
             file_name = queue.get()
             if file_name in file_names:
                 file_names.remove(file_name)
             copy_rate = (all_file_nums - len(file_names))*100/all_file_nums
             print("
    %.2f...(%s)" % (copy_rate, file_name) + " "*50, end="")
             if copy_rate >= 100:
                 print("
    copy完成")
                 break
    View Code
  • 相关阅读:
    MongoDB
    Redis:C#使用Redis
    最大化 AIX 上的 Java 性能,第 4 部分: 监视流量
    最大化 AIX 上的 Java 性能,第 3 部分: 更多就是更好
    最大化 AIX 上的 Java 性能,第 2 部分: 速度需求
    最大化 AIX 上的 Java 性能,第 1 部分: 基础
    阿里面试重点总结
    函数库学习入门指引
    OracleAWR删除历史快照说明
    ActiveMQ实现负载均衡+高可用部署方案(转)
  • 原文地址:https://www.cnblogs.com/ForT/p/10673666.html
Copyright © 2011-2022 走看看