zoukankan      html  css  js  c++  java
  • 12.python进程协程异步IO

    进程

    Python中的多线程无法利用多核优势 , 所以如果我们想要充分地使用多核CPU的资源 , 那么就只能靠多进程了

    multiprocessing模块中提供了Process , Queue , Pipe , Lock , RLock , Event , Condition等组件 , 与threading模块有很多相似之处

    1.创建进程

    from multiprocessing import Process
    import time
    
    def func(name):
        time.sleep(2)
        print('hello',name)
    
    if __name__ == '__main__':
        p= Process(target=func,args=('derek',))
        p.start()
        # p.join()
        print('end...')
    View Code

    2.进程间通讯

    (1)Queue

    不同进程间内存是不共享的,要想实现两个进程间的数据交换。进程间通信有两种主要形式 , 队列和管道

    from multiprocessing import Process, Queue   #Queue是进程排列
    
    def f(test):
        test.put('22')   #通过创建的子进程往队列添加数据,实线父子进程交互
    
    if __name__ == '__main__':
        q = Queue()      #父进程
        q.put("11")
    
        p = Process(target=f, args=(q,))   #子进程
        p.start()
        p.join()
    
        print("取到:",q.get_nowait())
        print("取到:",q.get_nowait())
    
    #父进程在创建子进程的时候就把q克隆一份给子进程
    #通过pickle序列化、反序列化,来达到两个进程之间的交互
    
    
    
    结果:
    取到: 11
    取到: 22
    Queue

    (2)Pipe(管道)

    The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).

    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send('11')
        conn.send('22')
        print("from parent:",conn.recv())
        print("from parent:", conn.recv())
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()   #生成管道实例,可以互相send()和recv()
    
        p = Process(target=f, args=(child_conn,))
        p.start()
    
        print(parent_conn.recv())      # prints "11"
        print(parent_conn.recv())      # prints "22"
        parent_conn.send("33")         # parent 发消息给 child
        parent_conn.send("44")
        p.join()
    Pipe

    3.Manager

    进程之间是相互独立的 ,Queue和pipe只是实现了数据交互,并没实现数据共享,Manager可以实现进程间数据共享 。

    Manager还支持进程中的很多操作 , 比如Condition , Lock , Namespace , Queue , RLock , Semaphore等

    from multiprocessing import Process, Manager
    import os
    
    def f(d, l):
        d[os.getpid()] =os.getpid()
        l.append(os.getpid())
        print(l)
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()  #{} #生成一个字典,可在多个进程间共享和传递
    
            l = manager.list(range(5))     #生成一个列表,可在多个进程间共享和传递
            p_list = []
            for i in range(2):
                p = Process(target=f, args=(d, l))
                p.start()
                p_list.append(p)
            for res in p_list: #等待结果
                res.join()
            print(d)
            print(l)
    View Code

    4.lock

    from multiprocessing import Process, Lock
    
    def f(l, i):
        #l.acquire()
        print('hello world', i)
        #l.release()
    
    
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(100):
            Process(target=f, args=(lock, num)).start()     #要把lock传到函数的参数l
            
    #lock防止在屏幕上打印的时候会乱
    lock

    5.进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有以下几个主要方法:

    1. apply:从进程池里取一个进程并执行
    2. apply_async:apply的异步版本
    3. terminate:立刻关闭线程池
    4. join:主进程等待所有子进程执行完毕,必须在close或terminate之后
    5. close:等待所有进程结束后,才关闭线程池
    from  multiprocessing import Process, Pool
    import time
    import os
    
    def Foo(i):
        time.sleep(2)
        print("in process",os.getpid())
        return i + 100
    
    def Bar(arg):
        print('-->exec done:', arg,os.getpid())
    
    if __name__ == '__main__':    #多进程,必须加这一句(windows系统)
        pool = Pool(processes=3) #允许进程池同时放入3个进程
        print("主进程",os.getpid())
        
        for i in range(10):       
            pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback=回调,执行完Foo(),接着执行Bar()
            # pool.apply(func=Foo, args=(i,)) #串行
            
        print('end')
        pool.close()
        pool.join()   #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。必须先close(),再join()
    Pool

    协程

    1.简介

    协程(Coroutine) : 是单线程下的并发 , 又称微线程 , 纤程 . 协程是一种用户态的轻量级线程 , 即协程有用户自己控制调度

    协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。

    协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态

    使用协程的优缺点

    优点 :

    1. 协程的切换开销更小 , 属于程序级别的切换 , 更加轻量级
    2. 单线程内就可以实现并发的效果 , 最大限度利用CPU

    缺点 :

    1. 协程的本质是单线程下 , 无法利用多核 , 可以是一个程序开启多个进程 , 每个进程内开启多个线程 , 每个线程内开启协程
    2. 协程指的是单个线程 , 因而一旦协程出现阻塞 将会阻塞整个线程

    2.Greenlet

    greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

    手动切换

    from greenlet import greenlet
    
    def test1():
        print(12)
        gr2.switch()      #到这里切换到gr2,执行test2()
        print(34)
        gr2.switch()      #切换到上次gr2运行的位置
    
    def test2():
        print(56)
        gr1.switch()      #切换到上次gr1运行的位置
        print(78)
    
    gr1 = greenlet(test1)      #启动一个协程gr1
    gr2 = greenlet(test2)      #启动一个协程gr2
    
    gr1.switch()        #开始运行gr1
    greenlet

    3.Gevent

    Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。

    (1)IO阻塞自动切换

    import gevent
    
    def foo():
        print('Running in foo')
        gevent.sleep(2)
        print('阻塞时间最长,最后运行')
    
    def bar():
        print('running in bar')
        gevent.sleep(1)
        print('foo()还在阻塞,这里第二个运行')
    
    def func3():
        print("running in func3 ")
        gevent.sleep(0)
        print("其它两个还在IO阻塞先运行")
    
    #创建协程实例
    gevent.joinall([
        gevent.spawn(foo), #生成,
        gevent.spawn(bar),
        gevent.spawn(func3),
    ])
    
    #遇到IO自动切换
    
    
    
    
    结果:
    Running in foo
    running in bar
    running in func3 
    其它两个还在IO阻塞先运行
    foo()还在阻塞,这里第二个运行
    阻塞时间最长,最后运行
    
    Process finished with exit code 0
    View Code

     由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:

    (2)爬虫例子:

    from urllib import request
    import gevent,time
    from gevent import monkey
    monkey.patch_all() #作用:把当前程序的所有的io操作给我单独的做上标记
    
    def f(url):
        print('GET: %s' % url)
        resp = request.urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))
    
    #同步需要的时间
    urls = ['https://www.python.org/',
            'https://www.yahoo.com/',
            'https://github.com/' ]
    time_start = time.time()
    for url in urls:
        f(url)
    print("同步cost",time.time() - time_start)
    
    #下面是异步花费的时间
    async_time_start = time.time()
    gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
    ])
    print("异步cost",time.time() - async_time_start)
    
    
    结果:
    GET: https://www.python.org/
    48954 bytes received from https://www.python.org/.
    GET: https://www.yahoo.com/
    491871 bytes received from https://www.yahoo.com/.
    GET: https://github.com/
    51595 bytes received from https://github.com/.
    同步cost 4.928282260894775
    GET: https://www.python.org/
    GET: https://www.yahoo.com/
    GET: https://github.com/
    48954 bytes received from https://www.python.org/.
    494958 bytes received from https://www.yahoo.com/.
    51599 bytes received from https://github.com/.
    异步cost 1.4920852184295654

    IO多路复用

    详解:http://www.cnblogs.com/alex3714/articles/5876749.html

    selectors模块

    selectors基于select模块实现IO多路复用,调用语句selectors.DefaultSelector(),特点是根据平台自动选择最佳IO多路复用机制,调用顺序:epoll > poll > select

    做一个socket servers

    import selectors
    import socket
    sel = selectors.DefaultSelector()        # 根据平台自动选择最佳IO多路复用机制
    
    def accept(sock, mask):
        conn, addr = sock.accept()           # Should be ready
        # print('accepted', conn, 'from', addr,mask)
        conn.setblocking(False)              #设置为非阻塞IO
        sel.register(conn, selectors.EVENT_READ, read)
                                             #新连接注册read回调函数
                                             #将conn和read函数注册到一起,当conn有变化时执行read函数
    
    def read(conn, mask):
        data = conn.recv(1024)  # Should be ready
        if data:
            print('echoing', repr(data), 'to', conn)
            conn.send(data)                  # Hope it won't block
        else:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
    
    sock = socket.socket()
    sock.bind(('localhost', 9999))
    sock.listen(100)
    sock.setblocking(False)             #设置为非阻塞IO
    sel.register(sock, selectors.EVENT_READ, accept)
                                        # 将sock和accept函数注册到一起,当sock有变化时执行accept函数
    
    while True:
        events = sel.select()  #默认阻塞,有活动连接就返回活动的连接列表,监听[(key1,mask1),(key2),(mask2)]
    
        for key, mask in events:
            callback = key.data                 #accept      #1 key.data就是accept   # 2 key.data就是read
            callback(key.fileobj, mask)         #key.fileobj=  文件句柄
                                                # 1 key.fileobj就是sock   # 2 key.fileobj就是conn
    server
    client
    import socket
    import sys
    
    messages = [ b'This is the message. ',
                 b'It will be sent ',
                 b'in parts.',
                 ]
    server_address = ('localhost', 9999)
    
    # Create a TCP/IP socket
    socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(5)]
    print(socks)
    # Connect the socket to the port where the server is listening
    print('connecting to %s port %s' % server_address)
    for s in socks:
        s.connect(server_address)
    
    for message in messages:
    
        # Send messages on both sockets
        for s in socks:
            print('%s: sending "%s"' % (s.getsockname(), message) )
            s.send(message)
    
        # Read responses on both sockets
        for s in socks:
            data = s.recv(1024)
            print( '%s: received "%s"' % (s.getsockname(), data) )
            if not data:
                print( 'closing socket', s.getsockname() )
    mutlti conn socket client
     
     
     
  • 相关阅读:
    JS中的函数声明存在的“先使用,后定义”
    JS原型与原型链终极详解
    Angular--页面间切换及传值的四种方法
    Angularjs Controller 间通信机制
    angularjs 中使用 service 在controller 之间 share 对象和数据
    CSS中!important的使用
    angularjs checkbox 框的操作
    纯CSS气泡框实现方法探究
    AngularJS 实现页面滚动到底自动加载数据的功能
    AngularJS的Filter用法详解
  • 原文地址:https://www.cnblogs.com/gaidy/p/12095540.html
Copyright © 2011-2022 走看看