zoukankan      html  css  js  c++  java
  • 网络编程进阶(进程、线程、协程、IO模型)

    一、多进程

    开启子进程的两种方式

      方式一:

    from multiprocessing import Process
    import time
    
    
    def func(name):
        print('%s is going' % name)
        time.sleep(3)
        print('%s is done' % name)
    
    
    if __name__ == '__main__':
        a = Process(target=func, args=('jack',))  # 必须加逗号
        a.start()
        print('asdwad')
    View Code

      方式二:

    from multiprocessing import Process
    import time
    
    
    class MYfunc(Process):
        def __init__(self, name):
            super().__init__()  # 继承Process的方法
            self.name = name
    
        def run(self):  # 默认的函数名为run,不能去修改 
            print('%s is going' % self.name)
            time.sleep(3)
            print('%s is done' % self.name)
    
    
    if __name__ == '__main__':
        a = MYfunc('jack')  
        a.start()
        print('asdwad')
    View Code

      查看进程的pid------------------os.getpid() 方法/ os.getppid

    僵尸进程:一个进程出发子进程后,子进程执行结束后,没有回收的,任然保存在进程中的,这种称谓僵尸进程。

    孤儿进程:一个父进程退出了,而他的子进程还在运行,那么这些子进程就是孤儿进程,会被init进程回收。

    Process对象的其他方法和属性

      jion()方法(回收僵尸进程,等子进程结束了,回收子进程,在去做接下来的工作)

      is_alive()  判断进程是活的还是死的,返回TRUE/FALSE

    p.start():启动进程,并调用该子进程中的p.run() 
    p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
    
    p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    p.is_alive():如果p仍然运行,返回True
    
    p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间。

    守护进程

      1.守护进程会在主进程代码结束后就终止

      2.守护进程内无法在开启子进程,否则抛出异常

    from multiprocessing import Process
    import time
    
    
    def func(name):
        print('%s is going' % name)
        time.sleep(3)
        print('%s is done' % name)
    
    
    if __name__ == '__main__':
        a = Process(target=func, args=('jack',))  # 必须加逗号
        a.daemon = True  # 必须在主进程结束前开启
        a.start()
    
        print('主进程')

    互斥锁(保证共享数据的安全)

      (特点:由并发变成了串行,降低了效率,避免了竞争)

    import os
    import time
    from multiprocessing import Process, Lock
    
    
    def work(lock):
        lock.acquire()  # 上锁
        print('%s is running' % os.getpid())
        time.sleep(2)
        print('%s is done' % os.getpid())
        lock.release()  # 解锁
    
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(3):
            p = Process(target=work, args=(lock,))  # 需要把所传给子进程;为了保证使用的是同一把锁
            p.start()

    队列(推荐使用)

    创建队列的类(底层就是以管道和锁的方式实现)

    from multiprocessing import Queue
    
    q = Queue(3)  # 指定允许的最大的项数
    # 1 q.put方法用以插入数据到队列中,
    #       put方法还有两个可选参数:blocked和timeout。
         如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
    # 2 q.get方法可以从队列读取并且删除一个元素。 # 同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常. # 3 q.get_nowait():同q.get(False) # 4 q.put_nowait():同q.put(False) # 5 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。 # 6 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。 # 7 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

    生产者消费者模型

      生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    import random
    import time
    from multiprocessing import Process, JoinableQueue
    
    
    def consumer(q, name):
        while True:
            res = q.get()
            time.sleep(random.randint(1, 3))
            print('33[43m%s 吃 %s33[0m' % (name, res))
            q.task_done()  # 发送信号给q.join()
    
    
    def producer(q, name, food):
        for i in range(3):
            time.sleep(random.randint(1, 3))
            res = '%s%s' % (food, i)
            q.put(res)
            print('33[45m%s 生产了 %s33[0m' % (name, res))
        q.join()  # 等到消费者把队列中的数据都取走之后, 生产者的进程才结束
    
    
    if __name__ == '__main__':
        q = JoinableQueue()
        # 生产者们:即厨师们
        p1 = Process(target=producer, args=(q, '老王', '包子'))
    
        # 消费者们:即吃货们
        c1 = Process(target=consumer, args=(q, '小李'))
        c1.daemon = True
    
        # 开始
        p1.start()
        c1.start()
    
        p1.join()
        print('')

    二、线程

      多线程:在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间;相当于一个车间有多条流水线,多条流水线都共用这个车间一样。

      开启线程的两种方式:

    from threading import Thread
    
    
    def talk(name):
        print('%s is going' % name)
        time.sleep(2)
        print('%s is  end' % name)
     
    
    if __name__ == '__main__':
        t1 = Thread(target=talk, args=('egon',))
        t1.start()
        print('主线程')
    import time
    import random
    from threading import Thread
    
    
    class MyThread(Thread):
        def __init__(self, name):
            super().__init__()
            self.name = name
    
        def run(self):
            print('%s is going' % self.name)
            time.sleep(random.randrange(1, 5))
            print('%s is  end' % self.name)
    
    
    if __name__ == '__main__':
        t1 = MyThread('egon')
        t1.start()
        print('')

    进程和线程的区别

      1.同一进程内的线程共享该进程内的地址资源

      2.创造线程的开销要远小于创造进程的开销

    线程对象的其他属性和方法

    Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    守护线程

      主线程所在的进程内的所有非守护进程运行完毕才会结束。

      设置方法和上面一样

    GIL和互斥锁的区别:

      GIL所的本质就是互斥锁,它管理的是PYTHON 解释器级别的,比如垃圾回收的数据;而LOCK保护的是开发者的数据。

    DIL与多线程

      IO密集型 用多线程;计算密集型,用多进程。

    死锁

      指两个或者两个以上的进程或者线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干预,他们讲无法推进下去。   

    递归锁

      可以解决死锁的问题,所谓递归锁就是,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock

    mutexA=mutexB=RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止

    信号量

      实质上也是一把锁

    from threading import Thread, Semaphore
    import threading
    import time
    
    
    def func():
        sm.acquire()
        print('%s get sm' % threading.current_thread().getName())
        time.sleep(3)
        sm.release()
    
    
    if __name__ == '__main__':
        sm = Semaphore(5)
        for i in range(23):
            t = Thread(target=func)
            t.start()
            
            
    # Semaphore管理一个内置的计数器,
    # 每当调用acquire()时内置计数器-1;
    # 调用release() 时内置计数器+1;
    # 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    EVENT事件

    import time
    from threading import Thread, Event, currentThread
    
    event = Event()
    
    
    def conn():
        n = 0
        while not event.is_set():
            if n == 3:
                print('%s try too many times' % currentThread().getName())
                return
            print('%s try %s' % (currentThread().getName(), n))
            event.wait(0.5)
            n += 1
    
        print('%s is connected' % currentThread().getName())
    
    
    def check():
        print('%s is checking' % currentThread().getName())
        time.sleep(5)
        event.set()
    
    
    if __name__ == '__main__':
        for i in range(3):
            t = Thread(target=conn)
            t.start()
        t = Thread(target=check)
        t.start()
    
    
    # from threading import Event
    # event.isSet():返回event的状态值;
    # event.wait():如果 event.isSet()==False将阻塞线程;
    # event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    # event.clear():恢复event的状态值为False。

    定时器

      设置多长时间后去执行其他操作;应用方面-------验证码的更新

    from threading import Timer
    
    def hello():
        print("hello, world")
    
    t = Timer(1, hello)
    t.start()  # after 1 seconds, "hello, world" will be printed

    线程queue

    import queue
    
    q=queue.Queue()  # 队列/先进先出
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    
    '''
    结果(先进先出):
    first
    second
    third
    '''
    import queue
    
    q=queue.LifoQueue()  # 堆栈/ 后进先出
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    
    '''
    结果(后进先出):
    third
    second
    first
    '''
    import queue
    
    q=queue.PriorityQueue()  # 优先级队列
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    
    '''
    结果(数字越小优先级越高,优先级高的优先出队):
    (10, 'b')
    (20, 'a')
    (30, 'c')
    '''

    进程池和线程池(对于开启的进程数或线程数加以控制,让它在一个机器可以承受的范围内)

    from concurrent.futures import ThreadPoolExecutor
    import time,os,random
    
    
    def task(name):
        print('name:%s pid:%s run' % (name, os.getpid()))
        time.sleep(random.randint(1, 3))
    
    
    if __name__ == '__main__':
        # pool=ProcessPoolExecutor(4)  # 设置最大的进程数
        pool = ThreadPoolExecutor(5)  # 设置最大的线程数
    
        for i in range(10):
            pool.submit(task, 'egon%s' % i)
    
        pool.shutdown(wait=True)  # 等进程或者线程所有任务执行完毕之后回收资源才继续
    
        print('')

        进程池和线程池的基本方法

    1、submit(fn, *args, **kwargs)
    异步提交任务
    
    2、map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    3、shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    4、result(timeout=None)
    取得结果
    
    5、add_done_callback(fn)
    回调函数

       进程池和线程池提交任务的方式:异步调用/ 回调机制

         异步调用:提交任务后,不会再原地等待任务执行完毕;程序并行。

         同步调用:提交完任务后,在原地等待任务执行结束,拿到结果,才能执行下一个任务;导致程序串行执行,效率低。

         回调机制:可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数

    from concurrent.futures import ThreadPoolExecutor
    import time
    import random
    
    
    def eat(name):
        print('%s is eating' %name)
        time.sleep(random.randint(3,5))
        res=random.randint(7,13)*'#'
        return {'name':name,'res':res}
    
    
    def weigh(baozi):
        baozi=baozi.result()  # 拿到结果
        name=baozi['name']
        size=len(baozi['res'])
        print('%s 吃了 《%s》个包子' %(name,size))
    
    
    if __name__ == '__main__':
        pool=ThreadPoolExecutor(13)
    
        pool.submit(eat,'AAA').add_done_callback(weigh)
    
        pool.submit(eat,'BBB').add_done_callback(weigh)
    
        pool.submit(eat,'CCC').add_done_callback(weigh)

    三、协程

      什么是协程: 是单线程下的并发,又称微线程,纤程。协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

      特点:1.必须在只有一个单线程里实现并发;2.修改共享数据不需要加锁;3.用户自己的程序控制调度;4.一个协程遇到IO操作自动切换到其他协程。

      greenlet模块,可轻易的实现多个任务之间的切换

    from greenlet import greenlet
    
    def eat(name):
        print('%s eat 1' %name)
        g2.switch('用户二')
        print('%s eat 2' %name)
        g2.switch()
    def play(name):
        print('%s play 1' %name)
        g1.switch()
        print('%s play 2' %name)
    
    g1=greenlet(eat)
    g2=greenlet(play)
    
    g1.switch('用户一')#可以在第一次switch时传入参数,以后都不需要

      gevent 模块(推荐使用)

    #用法
    g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
    
    g2=gevent.spawn(func2)
    
    g1.join() #等待g1结束
    
    g2.join() #等待g2结束
    
    #或者上述两步合作一步:gevent.joinall([g1,g2])
    
    g1.value#拿到func1的返回值

    遇到IO阻塞时自动切换任务

    from gevent import monkey;monkey.patch_all()  
    # 要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头
    import gevent import time def eat(): print('eat food 1') time.sleep(2) print('eat food 2') def play(): print('play 1') time.sleep(1) print('play 2') g1=gevent.spawn(eat) # 创建一个协程对象 g2=gevent.spawn(play_phone) gevent.joinall([g1,g2]) # 等待任务结束 print('')

    四、IO模型

    IO操作需要经历的两个阶段:1.等待数据  2.拷贝数据

    阻塞IO模型(blocking IO)-------等待数据和拷贝数据两个阶段都被阻塞了。

    非阻塞IO模型(Noblocking IO)-------处理了等待数据的阶段,拷贝数据阶段仍然存在。 

      缺点:

    1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
    2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。
    这会导致整体数据吞吐量的降低。

    多路复用IO模型(IO multiplexing)----------select 监听,可以处理多个连接,但不适用于单个连接。

    异步IO模型Asynchronous I/O------等待数据和拷贝数据两个阶段都优化了。

  • 相关阅读:
    DP 训练题目
    洛谷 P1736 创意吃鱼法
    树形背包
    树形DP
    轻松完爆Helm私有仓库
    轻松完爆Helm公共仓库
    一分钟轻松玩转Helm
    ceph -s 出现 mon is allowing insecure global_id reclaim
    Django下载与简介
    部署ceph集群 (Nautilus版)
  • 原文地址:https://www.cnblogs.com/Holmes-98/p/14674982.html
Copyright © 2011-2022 走看看