zoukankan      html  css  js  c++  java
  • Python开发之路-多进程、进程队列、事件驱动模型、IO模型前戏

    1.多进程(multiprocessing)- 很大程度上是为了解决GIL问题

    import os,time
    from multiprocessing import Process
    
    def info(title):
        print('title:',title)
        print('parent process:',os.getppid())
        print('process id:',os.getpid())  #获取进程ID
    
    def f(name):
        info('funtion f')
        print('hello',name)
    
    if __name__ == '__main__':
    
        info('main process line')
        time.sleep(1)
        p = Process(target=info,args=('chris',))  #此时他为子进程  ppid为 上方父进程的pid
        p.start()
        p.join()

    设置成守护进程的方法:p.daemon=True

    其他方法:

    (1)is_alive():返回进程是否在进行

    (2)join([timeout]):阻塞当前上下文环境的进程,直到调用此方法的进程终止或到达指定的timeout(可选参数)

    (3)terminate():不管任务是否完成,立即停止工作进程

    属性:

    (1)daemon:和线程的setDaemon功能一样

    (2)name:进程名字

    (3)pid:进程号

    from multiprocessing import Process
    import time
    
    class Myprocess(Process):
        def __init__(self,num):
            super(Myprocess,self).__init__()
            self.num = num
        def run(self):
            time.sleep(1)
            print(self.is_alive())
            print(self.num,self.pid)
            time.sleep(5)
    
    if __name__ == '__main__':
        l = []
        for i in range(10):
            p = Myprocess(i)
            l.append(p)
        for p in l:
            p.start()
        print('the ending ')
    属性方法调用

    2.进程间通信

    在线程里,同一个进程里的所有线程共享这个进程里内存空间所定义的全局变量

    (1)队列通信

    #
    #
    # from multiprocessing import Process
    # import time
    #
    # class Myprocess(Process):
    #     def __init__(self,num):
    #         super(Myprocess,self).__init__()
    #         self.num = num
    #     def run(self):
    #         time.sleep(1)
    #         print(self.is_alive())
    #         print(self.num,self.pid)
    #         time.sleep(5)
    #
    # if __name__ == '__main__':
    #     l = []
    #     for i in range(10):
    #         p = Myprocess(i)
    #         l.append(p)
    #     for p in l:
    #         p.start()
    #     print('the ending ')
    
    import queue,time
    import multiprocessing
    
    def foo(q):
        time.sleep(1)
        print('son process',id(q))
        q.put(123)
        q.put('chris')
    
    
    if __name__ == '__main__':
        # q = queue.Queue() #这是线程队列
        q = multiprocessing.Queue() #创建一个进程队列
        print('main process',id(q))
        p = multiprocessing.Process(target=foo,args=(q,)) #将q队列传过去,其实这个q是复制而来的
        p.start()
        p.join()
        print(q.get(block=False)) #如果队列为空,这直接报错,而不进行同步
        print(q.get(block=False))
    队列通信

    (2)管道通信  

    from multiprocessing import Process,Pipe
    import time
    def f(conn):
        conn.send([12,{'name':'chris'},66])
        response = conn.recv()
        print('收到回复:',response)
        conn.close()
        print('子进程ID:',id(conn))
    
    if __name__ == '__main__':
        parent_conn,chil_conn = Pipe() #生成了管道的两个头 可以分配给主进程和子进程 双向管道 可以在管道内收发消息
        print('ID-1',id(chil_conn))
    
        p = Process(target=f,args=(chil_conn,))
        p.daemon = True
        p.start()
        time.sleep(5)
        print(parent_conn.recv())  #收到子进程发来的消息
        parent_conn.send('儿子你好!')
        p.join()
    管道通信

    (3)Mangers

    Queue&Pipe只是实现了数据交互(本质是复制,不能修改),并没实现数据共享,既一个进程去更改另一个进程的数据

    能数据共享的类型:列表、字典、变量、互斥锁、递归锁、信号量、队列

    from multiprocessing import Process,Manager
    
    def f(d,l,i):
        d[i] = 'chris'  # 第一次 d{'name-0':'chris'}
        l.append(i) # [0,1,2,3,4,0]
        print('son id',id(d),id(l))
    
    if __name__ == '__main__':
        with Manager() as manger:
            d = manger.dict() #进程里创建字典是调用manager内部封装下的dict
            l = manger.list(range(5))
            print('main process:',id(d),id(l))
    
            p_list = []
    
            for i in range(10):
                p = Process(target=f,args=(d,l,i))
                p.start()
                p_list.append(p)
    
            for a in p_list:
                a.join()
    
            print('the end ')
            print(d) #查看子进程是否修改了数据
            print(l)
    Manager实现数据共享

     3.进程同步

    虽说进程间的资源不共享,但有时候有些资源需要进程间进行共享(比如共用一块屏幕时,如果不加锁,多个进程之间会互相抢资源,发生串行)

    注意!!进程之间一定要传送值,因为进程之间不共享数据!!!所以在主进程创建的变量,子进程要使用的话需要将变量传过去!!!

    #进程同步
    
    from multiprocessing import Process,Lock
    
    def f(l,i):
        with l: #相当于 l.acquire()
            print('hello world %s' %i)
    
    if __name__ == '__main__':
        l = Lock()
        for i in range(10):
            Process(target=f,args=(l,i)).start()
    进程同步

    4.进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池中没有可供使用的进程,那么程序就会等待,直到进程池中有可用的进程为止

    进程池中有两个方法:

    apply-同步方法 和 apply_async-异步方法

    apply_async三个参数,func,args,和回调函数callback - 就是某个动作或者某个函数执行成功后再去执行的函数  回调函数是在主进程下调用!! 好处:日志记录时,让子进程能够顺利结束,日志记录的任务属于逻辑之外的任务,交给主进程去做就好了  接受到的参数来自子进程的return返回值

    #进程池
    
    from multiprocessing import Process,Pool
    import time,os
    
    def foo(i):
        time.sleep(1)
        print(i)
        print('son id', os.getpid())
        return i+100
    
    def bar(arg): #接受到的参数来自子进程的return返回值 i+100
        print(os.getpid())
        print('logger:',arg)
    
    if __name__ == '__main__':
    
        pool = Pool(5) #创建进程池对象,默认数量为电脑的CPU数量,此时最大进程数为5,4个并行,一个与其中一个CPU并发切换
    
        bar(1)
        print('----'*5)
    
        for i in range(6):
            # pool.apply(func=foo,args=(i,)) #同步的
            pool.apply_async(func=foo,args=(i,),callback=bar) #异步的 回调函数是在主进程下进行的!!而不是在子进程
    
        pool.close() #close 要放在join前面 和join两个缺一不可
        pool.join()
    进程池

    5.协程(非抢占式) -  协作式  用户态的切换

    主要解决的也是IO操作

    又叫微线程 本质:就是一个线程

    优势:

    1.没有切换的消耗

    2.没有锁的概念

    有一个问题:能用多核吗?可以采用多进程+协程,一个很好的解决方案,既在多个进程里跑协程

    与生成器的交互相似,yield交互的两个方法 (1)next()或__next__() (2)send 将值送到指针停留出的yield位置 再往下找下一个yield

    def f():
        print('ok')
        res = yield 1
        print(res)
        print('hello')
        yield
    f = f() #得到生成器对象
    print(next(f))
    # print(f.__next__())
    # print(f.__next__())
    f.send(5)
    生成器

    本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态

    cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它

    ps:在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态 

        一:其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:

    #1 yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
    #2 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换  

    协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。、

    需要强调的是:

    #1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
    #2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

    对比操作系统控制线程的切换,用户在单线程内控制协程的切换

    优点如下:

    #1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
    #2. 单线程内就可以实现并发的效果,最大限度地利用cpu

    缺点如下:

    #1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
    #2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

    总结协程特点:

    1. 必须在只有一个单线程里实现并发
    2. 修改共享数据不需加锁
    3. 用户程序里自己保存多个控制流的上下文栈
    4. 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

    生成器协程:

    import time,queue
    
    def consumer(name):
        print('----> [%s]ready to eat baozi...' %name)
        while True:
            new_baozi = yield
            print('[%s] is eating baozi %s' %(name,new_baozi))
    
    def producer():
        r = con.__next__()
        r = con2.__next__()
        n = 0
        while 1:
            time.sleep(1)
            print('33[32:1m[producer]33[0m is making baozi %s and %s' %(n,n+1))
            con.send(n) #send 后 生成器对象就能走到下一个yield
            con2.send(n+1)
    
            n += 2
    
    if __name__ == '__main__':
        con = consumer('chris')
        con2 = consumer('joe')
        producer()
    生成器协程

    Greenlet模块 通过switch接口来实现切换以实现协程!!比yield更方便

    如果我们在单个线程内有20个任务,要想实现在多个任务之间切换,使用yield生成器的方式过于麻烦(需要先得到初始化一次的生成器,然后再调用send。。。非常麻烦),而使用greenlet模块可以非常简单地实现这20个任务直接的切换

    pip3 install greenlet
    from greenlet import greenlet
    
    def eat(name):
        print('%s eat 1' %name)
        g2.switch('egon')
        print('%s eat 2' %name)
        g2.switch()
    def play(name):
        print('%s play 1' %name)
        g1.switch()
        print('%s play 2' %name)
    
    g1=greenlet(eat)
    g2=greenlet(play)
    
    g1.switch('egon')#可以在第一次switch时传入参数,以后都不需要
    Greenlet

    Gevent介绍

    Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

    import requests,time
    import gevent
    start = time.time()
    
    def f(url):
    
        resp = requests.get(url)
    
        data = resp.text
        print('%d bytes receieved from %s' %(len(data),url))
    
    gevent.joinall([
        gevent.spawn(f,'https://www.baidu.com/'),
        gevent.spawn(f,'https://www.sina.com.cn/'),
    ])
    gevent 要比协程快

     6.事件驱动模型

    事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定,它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程方式是(单线程)同步,以及多线程编程

     目前国内大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件,事件驱动模型大体思路如下:

    1.有一个事件(消息)队列

    2.鼠标按下时,往这个队列中增加一个点击事件(消息);

    3.有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick(),onKeyDown()等

    4.事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数

     7.IO模型前戏 

    内核态:独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限

    用户态:用户使用自己应用程序的内存空间

    
    在进行解释之前,首先要说明几个概念:
    
    用户空间和内核空间
    进程切换
    进程的阻塞
    文件描述符
    缓存 I/O
    用户空间与内核空间
    现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。 
    操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。 
    为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。 
    针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。 
    
    进程切换
    为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换,这种切换是由操作系统来完成的。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。 
    从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:
    
    保存处理机上下文,包括程序计数器和其他寄存器。
    
    更新PCB信息。
    
    把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
    
    选择另一个进程执行,并更新其PCB。
    
    更新内存管理的数据结构。
    
    恢复处理机上下文。 
    注:总而言之就是很耗资源的
    
    进程的阻塞
    正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。
    
    文件描述符fd
    文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。 
    文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
    import socket
    print(socket.socket())
    <socket.socket fd=172, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
    缓存 I/O
    缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。用户空间没法直接访问内核空间的,内核态到用户态的数据拷贝 
    缓存 I/O 的缺点: 
    
    数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。
    
    
  • 相关阅读:
    Sql Server2000里面获得数据库里面所有的用户表名称 和对应表的列名称
    c#开发windows应用程序几个小技巧
    "Unexpected Error 0x8ffe2740 Occurred" Error Message When You Try to Start a Web Site
    注册asp.net到iis
    O/R Mapping 影射文件生成的一点研究(暂时放在这里,实现以后再进行总结)
    WMI使用集锦
    NHibernate快速指南
    项目打包以前需要删除的文件
    Asp.Net 学习资源列表
    精彩Blog
  • 原文地址:https://www.cnblogs.com/caixiaowu/p/12589341.html
Copyright © 2011-2022 走看看