zoukankan      html  css  js  c++  java
  • 20、Python之进程,协程,I/O

    一、进程

        进程的使用与线程基本差不多,但由于进程之间的资源是无法共享的,从而引发出进程同步,进程通信等一系列的概率,首先我们来看一下,python中创建进程的2中方法。

    1、直接调用

    1 import multiprocessing
    2 
    3 def run(n):
    4     print("process %s is runing" % n)
    5 if __name__ == '__main__':
    6     p = multiprocessing.Process(target=run,args=(1,))
    7     p.start()
    View Code

    2、集成式调用

     1 import multiprocessing
     2 
     3 class MyProcess(multiprocessing.Process):
     4     def __init__(self,process_name):
     5         self.__process_name = process_name
     6         super(MyProcess,self).__init__()
     7     def run(self):
     8         print("%s is running" % self.__process_name)
     9 if __name__ == '__main__':
    10     p = MyProcess("myprocess")
    11     p.start()
    View Code

    关于进程的其他用法与线程基本一样,这里不再啰嗦。

        进程间的通信

    不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

    1、进程Queue用法与线程的使用一样,但该为进程的Queue,与线程的Queue不同

     1 #Author gwx
     2 import multiprocessing
     3 from multiprocessing import Queue
     4 # from queue import Queue #这个导入的线程的队列
     5 class MyProcess(multiprocessing.Process):
     6     def __init__(self,process_name,q):
     7         self.__process_name = process_name
     8         self.__q = q
     9         super(MyProcess,self).__init__()
    10     def run(self):
    11         self.__q.put(self.__process_name)
    12 
    13 if __name__ == '__main__':
    14     q = Queue()
    15     q.put("mainprocess")
    16     p1 = MyProcess("myprocess",q)
    17     p2 = MyProcess("yourprocess",q)
    18     p1.start()
    19     p2.start()
    20     p1.join()
    21     p2.join()
    22     while q.qsize() > 0:
    23         print(q.get())
    View Code

    线程的queue是可以随时访问的,但进程的不可以的,所以进程的Queue是专门解决这个问题的,其本质上如图所示。

     进程1和进程2进行操作queue时,都会取第三方内存中queue的数据,操作完之后再将数据的备份放回第三方内存中,本质上是以第三方的队列做一个桥梁,咱们后面学习到的rabbitMQ也是这个原理。

    2、管道(Pipe)。Pipe也可以用于进程之间的通信,需要通信的2个进程分别位于管道的2端,通过send和recv方法进行发收数据。

     1 from multiprocessing import Process,Pipe
     2 
     3 def run(conn):
     4     conn.send("你好,我是子进程")
     5     print(conn.recv())
     6 if __name__ == '__main__':
     7     parent_conn,child_conn = Pipe()
     8     parent_conn.send("你好我是父进程")
     9     p = Process(target=run,args=(child_conn,))
    10     p.start()
    11     p.join()
    12     print(parent_conn.recv())
    View Code

    3、Managers。该类也能用于进程之间的通信,用法如下:

     1 from multiprocessing import Process,Manager
     2 import os
     3 def run(d,l):
     4     d[os.getpid()] = os.getpid()
     5     l.append(os.getpid())
     6 if __name__ == '__main__':
     7     with Manager() as manager:
     8         manager = Manager()
     9         d = manager.dict()
    10         l = manager.list()
    11 
    12         p_list = []
    13         for i in range(10):
    14             p = Process(target=run,args=(d,l))
    15             p.start()
    16             p_list.append(p)
    17         for pp in p_list:
    18             pp.join()
    19         print(d)
    20         print(l)
    View Code

        进程同步

    我们说进程之间的内存空间是不可以相互访问的,所以对于内存空间而言,进程之前是不存在加锁这一说法的,但是对于I/O而言,它对于所有进程来说是共享资源,需要加锁,当然这里的锁引用的也是进程的锁而不是线程的锁,案例如下:

     1 from multiprocessing import Process,Lock
     2 
     3 def run(l):
     4     l.acquire()
     5     print("hello multiprocessing")
     6     l.release()
     7 if __name__ == '__main__':
     8     l = Lock()
     9     for i in range(10):
    10         p = Process(target=run,args=(l,))
    11         p.start()
    View Code

        进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有两个方法:

    • apply  
    • apply_async
     1 from multiprocessing import Process,Pool
     2 import time
     3 def run(i):
     4     print("hello multiprocessing",i)
     5     time.sleep(1)
     6 if __name__ == '__main__':
     7     pool = Pool(5)
     8     for i in range(10):
     9         # pool.apply(run,args=(i,))
    10         pool.apply_async(run,args=(i,))
    11     pool.close()
    12     pool.join()  #这块不知道为啥
    View Code

    二、协程

         协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程

    协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

    协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

    协程的好处:

    • 无需线程上下文切换的开销
    • 无需原子操作锁定及同步的开销
      •   "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
    • 方便切换控制流,简化编程模型
    • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

    缺点:

    • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
    • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

    一个标准的协程应该符合以下几点:

    1. 必须在只有一个单线程里实现并发
    2. 修改共享数据不需加锁
    3. 用户程序里自己保存多个控制流的上下文栈
    4. 一个协程遇到IO操作自动切换到其它协程

        Greenlet:greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,使用方法如下:

     1 from greenlet import greenlet
     2 
     3 def test1():
     4     print("11111111")
     5     gr2.switch()
     6     print("33333333")
     7     gr2.switch()
     8 def test2():
     9     print("22222222")
    10     gr1.switch()
    11     print("44444444")
    12 
    13 gr1 = greenlet(test1)
    14 gr2 = greenlet(test2)
    15 gr1.switch()
    View Code

    看上面的例子发现,greenlet不能实现自动切换,每次都需要switch来进行切换。

        Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。使用方法如下:

     1 import gevent
     2 
     3 def test1():
     4     print("11111111")
     5     gevent.sleep(2)
     6     print("33333333")
     7 def test2():
     8     print("22222222")
     9     gevent.sleep(1)
    10     print("44444444")
    11 
    12 gevent.joinall([gevent.spawn(test1),gevent.spawn(test2)])
    View Code

          joinall就是等待多个协程执行完毕。

         使用协程对性能的提升情况,看下面这段代码。

     1 import gevent
     2 
     3 def task(pid):
     4     gevent.sleep(0.5)
     5     print('Task %s done' % pid)
     6 
     7 def synchronous():
     8     for i in range(1, 10):
     9         task(i)
    10 
    11 
    12 def asynchronous():
    13     threads = [gevent.spawn(task, i) for i in range(10)]
    14     gevent.joinall(threads)
    15 
    16 
    17 print('Synchronous:')
    18 synchronous()
    19 
    20 print('Asynchronous:')
    21 asynchronous()
    View Code

        一个简单的爬虫小程序

     1 from urllib import request
     2 import gevent
     3 from gevent import monkey
     4 
     5 monkey.patch_all() #把当前所有的程序的io操作给我单独的做上记号
     6 def f(url):
     7     print("Get:%s" % url)
     8     resp = request.urlopen(url)
     9     with open("url.html","wb") as file:
    10         for line in resp.readlines():
    11             file.write(line)
    12 
    13 gevent.joinall([
    14     gevent.spawn(f,"http://www.cnblogs.com/win0211/category/1154652.html"),
    15     gevent.spawn(f,"http://www.cnblogs.com/win0211/p/8549921.html")
    16               ])
    View Code

        使用多协程实现socket并发。

    服务端代码:

     1 import socket
     2 from gevent import monkey
     3 import gevent
     4 
     5 monkey.patch_all()#告诉编译器采用多协程的方式处理
     6 def sever_handle(conn):
     7     while True:
     8         data = conn.recv(1024)
     9         print(data)
    10         conn.send(data)
    11     conn.close()
    12 
    13 server = socket.socket()
    14 server.bind(("localhost",100))
    15 while True:
    16     server.listen()
    17     conn,addre = server.accept()
    18     gevent_handle = gevent.spawn(sever_handle,conn)#来一个连接,开启一个协程
    19     # gevent.joinall() 因为这里是while True所以不需要joinall
    View Code

    客户端代码:

     1 import socket,threading,time
     2 def client_01(i):
     3     time.sleep(2)
     4     client = socket.socket()
     5     client.connect(("localhost",100))
     6     while True:
     7         data = input(">>")
     8         client.send(data.encode("utf-8"))
     9         data = client.recv(1023)
    10         print(data)
    11     client.close()
    12 
    13 client_01(1)
    View Code

    三、I/O

        对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
    1. 等待数据准备 (Waiting for the data to be ready)
    2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

    正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。
    - 阻塞 I/O(blocking IO)
    - 非阻塞 I/O(nonblocking IO)
    - I/O 多路复用( IO multiplexing)
    - 信号驱动 I/O( signal driven IO)
    - 异步 I/O(asynchronous IO)

    注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

    阻塞 I/O(blocking IO)

    在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

     

    非阻塞 I/O(nonblocking IO)

    linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

    I/O 多路复用( IO multiplexing)

    IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

    异步 I/O(asynchronous IO)

    inux下的asynchronous IO其实用得很少。先看一下它的流程:

    使用select实现I/O多路复用(socket服务端)。

     1 import select,socket,queue
     2 
     3 server = socket.socket()
     4 
     5 server.bind(("localhost",999))
     6 
     7 server.listen(10000)
     8 
     9 server.setblocking(False) #设置为不阻塞
    10 inputs = []
    11 outputs = []
    12 msg_dict = {}
    13 inputs.append(server)#将service加入
    14 
    15 while True:
    16     print("...........................")
    17     readable,writeable,exceptional = select.select(inputs,outputs,inputs)#如果没有数据 则阻塞
    18     for r in readable:
    19         if r is server:
    20             print("来了一个新连接")
    21             conn,address = r.accept()
    22             conn.setblocking(False)
    23             inputs.append(conn)
    24             msg_dict[conn] = queue.Queue()
    25         else:
    26             print("准备收数据")
    27             data = r.recv(1024)
    28             print(data)
    29             msg_dict[r].put(data)
    30             outputs.append(r)
    31     for w in writeable:   #
    32         print("准备发数据")
    33         w.send(msg_dict[w].get())
    34         outputs.remove(w)
    35     for e in exceptional:
    36         print("出错")
    37         if e in outputs:
    38             outputs.remove(e)
    39         inputs.remove(e)
    40         del msg_dict[e]
    41 
    42 server.close()
    View Code

    使用selectors实现I/O多路复用(socket服务端),selectors是对select的封装,功能更强大。实例代码如下:

     1 import  selectors #封装了select的操作
     2 import socket
     3 
     4 sel = selectors.DefaultSelector()#进行默认配置
     5 
     6 server = socket.socket()
     7 server.bind(("localhost",999))
     8 server.setblocking(False)
     9 server.listen(10000)
    10 def read(conn,mask):
    11     print("开始接收数据")
    12     data = conn.recv(1024)
    13     print(data)
    14     conn.send(data)
    15     conn.setblocking(False)
    16     if not data:#客户端断开 将连接从sel删除
    17         print("断开连接")
    18         conn.close()
    19         sel.unregister(conn)#
    20 
    21 def accept(server,mask):
    22     print("新开一个连接")
    23     conn,addr = server.accept()
    24     sel.register(conn,selectors.EVENT_READ,read)
    25 
    26 sel.register(server,selectors.EVENT_READ,accept)#注册服务,关联文件句柄和回调函数
    27 print("server",server)
    28 print(accept,read)
    29 while True:
    30     try:
    31         events = sel.select()#阻塞 是否有活动的事件
    32         print(events)
    33         for key, mask in events:  #
    34             # callback = key.data  # key.data为accept或者read的地址 key.fileobj为连接地址
    35             # callback(key.fileobj, mask)
    36             key.data(key.fileobj,mask)#上面2行代码等同于这一行
    37     except OSError as e:
    38         print(e)
    View Code
  • 相关阅读:
    linux系统命令记录
    window下,nodejs 安装 http-server,开启命令行HTTP服务器
    前端学习记录2:设计稿分析
    前端学习记录1:开始
    如何做到让自己长时间精神专注?
    sublime使用
    nodejs 第一次使用
    收集好看的效果及互动页面
    转 如何赢得朋友和获得影响力
    转 scrollLeft,scrollWidth,clientWidth,offsetWidth之完全详解
  • 原文地址:https://www.cnblogs.com/win0211/p/8592316.html
Copyright © 2011-2022 走看看