zoukankan      html  css  js  c++  java
  • 进程,线程,锁,生产者与消费者模型,异步

     

    1. 编程语言的发展:

    机器语言(二进制) 混编语言(指令,命令形式) 高级语言(面向过程/面向对象)

    2. 操作系统的目标: 方便用户使用,低耦合,高可用

       计算机组成: cpu,主板,存储设备,输入设备,输出设备

    3. 并行: 指同一时间点,有多个程序在同时进行

      并发: 指同一时间断内,多个程序都不分先后的依次被执行过

    4. 进程的组成:  代码段, 数据段,  PCB(进程控制块)

      进程是资源分配的基本单位,线程是cpu调度的基本单位

      进程的三个基本状态:

      1)就绪状态: 获得了除CPU之外运行所需要的所有资源

      2)执行状态: 获得了所有资源,处于正在执行的状态

      3)阻塞状态: 因各种原因,导致进程放弃了CPU,进程处于内存中,没有继续执行

      特殊状态:  挂起状态,了解即可, 放弃CPU,也不在内存中.

    5. 操作系统的作用:1.封装了所欲硬件接口,方便用户使用; 2.对计算机内所有资源,

    行统一的调度和分配

    6. 多进程相关概念: 并行,并发,同步,异步,阻塞,非阻塞

       : 早期CPU是单核时,没有并行的概念,只有并发(微观上串行,宏观上并行)

      获取当前进程的pid: os.getpid();  获取当前进程父进程的pid: os.getppid()

      开启子进程的两种方式:

         1)  p=Process( target=函数名, args=(参数1,参数2...) )

    当定义的函数不需要参数时,args可以没有

         2) 自定义个类,去继承process

    进程的常用方法:

          p.start() 告知操作系统,给开启个子进程.本质调用的p.run()

          p.join() 异步变同步(等待子进程结束,再执行join下的主进程代码)

          p.is_alive() 打印即可,用来判断p进程是否还活着,结果为TrueFalse

          p.terminate() 告知操作系统,结束掉p进程

       进程的常用属性:

          p.name=’xxx’ 用在主程序里if判断下,用来给子进程p来命名的

          p.pid   注意,pid后是不带括号的,用来查询子进程ppid.注意区分os.getpid()

                p.pid是用在主进程中查ppid,os.getpid()是用来查当前进程的pid

          p.daemon=True p进程设置为守护进程(daemon=后默认是false,即普通进程)

              普通进程是当所有子进程都结束后主进程才结束,(此时主进程中的所有代码都已经执行完了,程序会停在最后的空白处,注意和p.join()区分开).守护进程是指:当主进程运行完子进程便会立即结束.且守护进程不能再创建子进程.注意:p.daemon=True 必须放在p.start()之前才能生效...注意daemon写法

    演示:

    from  multiprocessing  import  Process  导入模块

    n=1000

    def  fun(n):

        print(‘这是第%s’  %  (n) )

        print(‘子进程和主进程的pid分别是:’ , os.getpid() , os.getppid() )

    if __name__ == ‘__main__’:

        n=1

        p=process( target=fun, args=(n,)) 实列个对象,对象目标是fun,args后的东西是给目标fun函数

          传的参数,必须是个元组,参数有多个时以逗号隔开,不需要参数时可不写,当传的参数和子进程

    中的冲突时,以主进程为准. 列中打印的n值为1, 当主不传n,打印的结果为1000

       p.daemon=True 表示把子程p设为守护进程,必须在start,且子进程内不能有开启子进程的代码

    p.start()       告知操作系统开启个进程,内容p中的所有

        p.join()     是让主进程呆在这里,p子进程结束后再往下执行主进程代码  

    p.terminate()   告知操作系统杀死这个p进程

        print(p.is_alive)  查看p进程是否还活着

       p.name=’xxx’  把子进程p命名为xxx

       print( p.name , p.pid , p.daemon ) 分别是用来查看子进程p的名字,pid和是否为守护进程

    :互斥锁:(锁机制)

      1.  from multiprocessing import Lock从模块中导入Lock

        先实列化出来个锁对象:l=Lock()

        注意:互斥锁必须是l.acquire()抢一次,然后l.release()释放一次,才能继续l.acquire(),不能连续的l.acquire().

      2. 互斥锁和joIn的区别:

         二者的原理都是一样,都是将并发变成串行,从而保证有序

         区别:join是人为指定的顺序,而互斥锁是所有进程平等的竞争,谁先抢到谁执行.

              join是将子进程整体代码串行,而互斥锁能让其中一部分代码串行.

    :信号机制:

        from multiprocessing import semaphore 导入模块

        y=semaphore(n)  初始化一把锁配n把钥匙,n是个可指定的int类型

        y.acquire()  拿钥匙,锁门

        y.release()   开门,还钥匙

       信号量机制比锁机制多了个计数器,用来记录当前剩余几把钥匙,当计数器为0,表示

    没有钥匙了,此时其余的y.acquire()处于阻塞;y.release()一次,计数器就加1,y.acquire()一次,计数器就减1.

    .事件机制:  from multiprocessing import Event 导入的模块

        e=Event() 实列化个e事件对象

        e.set() e 的布尔值设置为True   e.clear() e的布尔值设置为False

        e.wait() 根据e的布尔值进入阻塞(False)或非阻塞(True),括号内可加入时间,比如数字2, 意为2秒后自动放行(即使不运行e.set())也会自动转为非阻塞通过.

        e.is_set() 标识 (用来查看当前e的布尔值)

        这个e.is_set()的布尔值可被任意子进程随时感应到,任意子进程也可去改变它

    .IPC机制:  队列 from multiprocessing import Queue

      IPC -- inter process Communication  进程间通信

        1)队列: 建立在内存空间中,且是共享的空间,能自动帮我们处理好锁定的问题

    队列用来存程序之间沟通的消息,数据量不应该过大

         maxsize的值超过内存的限制就变得毫无意义了

         用法:y=Queue(数字) 表示示列化个y对象,并规定了y队列能放的东西个数

             y.put(元素) 表示放入一个元素,运行一次放一个,队列满则阻塞等待

             y.get() 表示从y队列中取出一个元素,队列为空则阻塞等待

             y.get_nowait() 不阻塞,如果没有数据就报错

             y.put_nowait()不阻塞,如果队列满了则报错

        注意,如果上面数字是3,则放入元素操作或取出元素操作大于3则会提示报错

        2)管道(了解)  from multiprocessing import Pipe

           con1,con2=Pipe() 实列化个管道对象,得到管道两头con1con2

           管道也是多进程间通信的一种方式,但其是不安全的,对于数据没有锁机制,

           管道在使用过程中必须遵循一端发送对应另一端接收.  管道中有个著名的错

    EOFError是指主进程中关闭了发送端,子进程还继续收,就会报这个错

    .生产者与消费者模型:

       模型指的是一种解决问题的思路,而不是具体的东西

       使用该模型的应用场景: 如果程序中有两类任务,一类是负责产生数据,另一类是负

    责处理产生的数据时,就可考虑使用该模型

       该模型的运作方式: 生产者生产数据,存放于共享空间,然后消费者取走处理.

       该模型实现方式之一:  生产者进程+队列+消费者进程

       模型优点:实现生产者和消费者解耦合,平衡生产者的生产和消费者处理数据的能力

       这里使用个queue模块的升级版:JoinableQueue,其继承了Queue,且提供了随时感

    put进去的数据何时被消费者处理完的能力

       from multiprocessing import JoinableQueue,Process
    def xf(q):
        while 1:
            res=q.get() #循环取出
            print(res)
            q.task_done() #每取出一个会返回给q.join一个消费的信息
    def sc(q):
        for i in range(1,10):
            q.put('这是生产的第%s个东西' % i) #每生产一个就放如队列中
        q.join() #能自动记录生产者产出的所有数量,并实时检测队列内是否有东西.阻塞等待状态
                 #队列为空时自动结束运行,意为者子进程就结束了运行
    if __name__ == '__main__':
        q=JoinableQueue(2) #实列化个队列对象,括号内数字可设定队列的容纳数
        p_xf=Process(target=xf,args=(q,))
        p_sc=Process(target=sc,args=(q,))
        p_xf.daemon=True #把消费者设为守护,感知主运行完代码就结束
        p_xf.start()
        p_sc.start()
        p_sc.join()#者里的join就是个正常的,表示等待p_sc这个子进程结束的意思

    .进程之间的共享内存:

      from multiprocessing import Manager, Value 这两个模块都可实现所有主进程和子进程对同一数据的修改,并能在每个进程中生效和体现

    def fun(num):
        num[0]-=1
        print('子进程中的num值为',num)
    if __name__ == '__main__':
        m=Manager() #实列化了个共享的内存空间
        num=m.list([1,2,3]) #把数据放入这个内存空间中
        p=Process(target=fun,args=(num,))
        p.start()
        p.join()
        print('父进程中的num值是',num)

    ============================================

    def run(num):
        num.value-=1 #每次取值都得使用对象.value
        print('子进程中num值为:',num.value)
    if __name__ == '__main__':
        num=Value('i',100) #放入共享空间中的数据需注明类型,有点类似于struct
        p=Process(target=run,args=(num,))
        p.start()
        print('主进程中num的值为', num.value)
        p.join()
        print('主进程中num的值为',num.value)

    .进程池

       存放进程的池子,进程多少可自定义,这些进程一直处于待命状态,一旦有任务来,马上就有进程去处理,进程池还会帮程序员去管理池中的进程.

       from multiprocessing import Pool  下代码中os.cpu_count()是调用os取得cpu核数

       p=Pool(os.cpu_count()+1) 实列化个对象池,池中进程数为cpu核数+1,也可手动指定

      开启进程池的三种方法:

       1)  map(func.iterable)

        func: 进程池中的进程执行的任务函数

        iterable: 可迭代对象,是把可迭代对象中的每个元素依次传给任务函数当参数

       2)  apply(func,args=()):  同步的,就是说池中的进程一个一个的去执行任务

        func: 进程池中的进程执行的任务函数

        args: 可迭代对象型的参数,是传给任务函数的参数

         同步处理任务时,不需要closejoin, 且池中的所有进程都时普通进程(主进程需

    要等待子进程的结束而结束)

       3) apply_async(func.args=(),callback=None)异步的,表示池中进程一次性全去执行任务

        func: 进程池中的进程执行的任务函数

        args: 可迭代对象型的参数,时传给任务函数的参数

        callback: 回调函数.进程池中有进程处理完任务了,返回的结果可以交给回调函数,

    由回调函数进型进一步处理,回调函数只有异步才有,同步没有.

        注意: 使用异步处理任务时,进程池中的所有进程都是守护进程,随着主进程代码执

    行完毕就结束,所以必须要求加上close join

        回调函数的使用: 进程的任务函数的返回值,会被回调函数的形参接收到,以此进行

    进一步的处理操作;回调函数是由主进程调用的,子进程只负责把结果传给回

    调函数,回调函数默认是None,需要使用时把默认值替换为函数名即可.

    方法一:  异步处理

    def run(i):
        i+=1
        print(i,'这里的')
        return i
    if __name__ == '__main__':
        p=Pool(4) #造个进程池,指定了4个进程
        res=p.map(run,[i for i in range(20)]) #开始让池中所有进程同时去执行任务
        print(res) #使用map,可接收到所有子进程中的返回值,结果是个列表

    方法二:  同步处理

    def run(i):
        i=str(i)+'0'
        print(i)
        return i
    if __name__ == '__main__':
        p=Pool(4) #造个进程池,指定4个进程
        for i in range(20):
            res=p.apply(run,args=(i,))#括号内依次放要执行的任务,传给任务的参数
        print(res) #也可接收子进程返回值,不过因apply是同步执行,所以放在循环外
                   #打印处的永远是最后一个的返回值,前面的会被全部覆盖掉.

                   # 可在for循环内依次把res加入到列表中,即可全部保存下来.

    方法三:  异步处理

    def run(i):
        i=str(i)+'0'
        print(i)
        return i
    if __name__ == '__main__':
        lis=[]
        p=Pool(2)
        for i in range(20):
            res=p.apply_async(run,args=(i,))
            lis.append(res)
        p.close() #关闭系统资源
        p.join()  #等待所有子进程结束
        [print(i.get()) for i in lis] #得到的res是个无限容量的队列,get到数值

    为什么说队列是带锁的,在生产者消费者模型处找:设定队列容量3,通过放入3个数据后就再也放不进去,判定的队列是带锁的.

    线程部分:  线程模块导入: from threading import Thread

    线程,又被称为轻量级进程,他是计算机中最小的可执行单位;线程无法独立拥有资源,必须依赖于所属的进程去获取资源,一个进程中,所有的线程共享进程内所有资源,既然共享,必然会造成混乱,使用时需注意这点.   补充:cpython一类解释器中,GIL这么个概念(全局解释锁),所以对于线程来说,没有真正意义的并行.

    GIL锁的是线程,意思是同一时刻,只允许一个线程访问CPU.

    线程又分为用户级线程和内核级线程:

    用户级线程:对于程序员来说,这样的线程完全被其控制,调度,执行

    内核级线程:对于计算机内核来说,这样的线程完全被内核控制,调度.

    .线程与进程的对比:

    1)进程组成: 代码段, 数据段, PCB(进程控制块process control block)

    线程组成: 代码段, 数据段, TCB(线程控制块Thread control block)

    2) CPU切换进程的速度要比切换线程的速度慢很多很多

      如果IO操作多时,最好使用线程去处理问题 .

      如果计算密集时,最好使用进程去处理问题 .

      同一个进程中,所有线程共享进程的pid,也就是说所有线程共享所属进程的所有资

    源和内存地址.

      3)守护线程与守护进程:

        守护进程: 要么自己正常结束,要么随着主进程代码读取完后被迫结束

                 (主进程代码读完会等着所有非守护进程结束后才结束自己,守护进程在

    主进程读完代码的那一刻就会立即结束,而不会去等主进程的结束)

        守护线程: 要么自己正常结束,要么随着主线程代码运行完而被迫结束

                 (线程没有主次之分,一个进程的开启,必然会自带有一个线程,进程根本

    就不是一个执行单位,它是一个资源单位.这里所说的主线程只是为了

    区分各个线程而用的一个叫法;同进程一样,主线程执行完代码后也得

    等其余所有线程结束而结束,而守护线程是随着主线程的结束而结束

    ,这里需要注意守护线程和守护进程的区别)

    .线程的使用方法:

       线程也有锁机制,信号机制与事件机制,用法同进程的用法一模一样,除此之外还有些

    线程的特有用法: (前三个见进程处)

       1)条件机制: from threading import Condition 导入模块

         条件是让程序员自行去调度线程的一个机制.condition(条件)涉及4个方法.

    con.acquire()  con.release()  con.wait()  con.notify() 这四个方法,用法如下:

    from threading import Thread,Condition     #notify(通知)
    def run(con,i):
        con.acquire() #拿钥匙
        con.wait()   #阻塞住
        print('%s个通过了...' % i)
        con.release() #还钥匙
    if __name__ == '__main__':
        con=Condition() #实列化个条件机制对象出来
        for i in range(10):
            t=Thread(target=run,args=(con,i))
            t.start()
        while 1:
            number=int(input('请输入放行数量>>:'))
            con.acquire()
            con.notify(number) #发信号给wait,让它放行number个线程
            con.release()

       2)定时器: from threading import Timer(计时器)

    from threading import Timer
    def run():
        print('睡眠时间过后开始运行....')
    Timer(4,run).start() #4秒后运行run函数,就这么简单

    补充: 1)所有的锁都是把并发转为串行,牺牲了效率,提升了数据安全

          from  threading  import  RLock

          用法: 1=2=3.....=RLock()

    2)死锁现象和解决办法之递归锁:

         递归锁的真正含义是:当某个进程先获取到递归锁时,每锁一次递归锁上引用计数

    就多加1,只要递归锁上的引用计数不为0,其余进程就无法再获取该锁,直到获取到的这个进程把递归锁上的计数给release0时才可以.

        3)GIL:全局解释器锁

             本质就是一把互斥锁,是加在解释器身上的.

             同一个进程内的所有线程都需要先抢到GIL,才能执行解释器代码.

          GIL优缺点: 保证了Cpython解释器内存管理的线程安全,但限制了同一进程内

    所有的线程同一时刻只能有一个执行,也就是说Cpython解释器的多线程

    无法实现并行.

    补充: 进程池,线程池的另一类开启方式:(含同步异步)

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    p=ProcessPoolExecutor(4)  #造个4进程的池子
    def server():
        f=socket.socket()
        f.bind(('192.168.10.154',8749))
        f.listen()
        while 1:
            con,add=f.accept()
            p.submit(work,con,add) #线程池的开启方法
        f.close()
    def work(con,add):
        while 1:
            try:
                res=con.recv(1024).decode('utf-8')
                if not res: break #防止客户端单方面退出造成服务端无限循环收空
                con.send(res.encode('utf-8'))
            except ConnectionResetError:
                break
        con.close()
    if __name__ == '__main__':
        server()

    要使用线程就直接用Thread...来造线程的池子,然后把进程换成线程即可

    补充: 同步,异步,阻塞与非阻塞概念:

         同步,异步,指的是程序运行中的两种状态.

         阻塞与非阻塞指的是提交任务的两种方式.

    2)线程池/进程池异步处理: 换线程为进程即可

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time,random
    def run(i):
        print('%s个进来了....' % i)
        time.sleep(random.randint(1,3))
        return i**2
    if __name__ == '__main__':
        lis=[]
        t=ThreadPoolExecutor(4)#造个4线程的池子,不指定时默认数是电脑核数的5
        for i in range(1,20):
            obj=t.submit(run,i) #把参数传给线程池内去运行
            lis.append(obj)   #obj理解为线程空间,把该空间加入到列表中
        t.shutdown(wait=True) #相当于pooi中的closejoin的联合,表示等待所有线程处理完
        print(lis[索引].result()) #lis是个列表,通过索引得到具体的对象,result得到结果
        print('') #程序会阻塞在上面shutdown,运行完上所有代码才会运行这个

    3)线程池/进程池的同步处理:  换线程为进程即可

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time,random
    def run(i):
        print('%s个进来了....' % i)
        time.sleep(random.randint(1,3))
        return i**2
    if __name__ == '__main__':
        t=ThreadPoolExecutor(4)#造个4线程的池子,不指定时默认数是电脑核数的5
        for i in range(1,20):
            obj=t.submit(run,i).result() #程序同步等待在result,等着拿结果
            print(obj,'这里不会先执行?')   #打印结果
        print('') #程序会阻塞在上面result,运行完上所有代码才会运行这个

  • 相关阅读:
    读书笔记:你就是极客软件开发人员生存指南
    读书笔记:重来 Rework
    敏捷个人2012.1月份线下活动报道:谈谈职业
    敏捷个人2011.12月份线下活动报道:认识自我
    敏友的【敏捷个人】有感(12): 敏友们自发组织的线上思想的碰撞
    敏捷团队:我尽力先做好本职工作是否正确?
    OpenExpressApp:精通 WPF UI Virtualization
    MDSF:发布图形编辑器源码OpenGraphicEditor
    产品管理:用户访谈之道
    敏捷个人架构图
  • 原文地址:https://www.cnblogs.com/quzq/p/9715809.html
Copyright © 2011-2022 走看看