zoukankan      html  css  js  c++  java
  • python之线程和进程(并发编程)

    python的GIL

    In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

    上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行

    线程

    1同步锁

    2死锁递归锁

    3:信号量和同步对象(了解)

    4队列------生产者消费者模型

    5进程

    线程的基本调用

    <python的线程与threading模块>

    import threading  # 线程
    import time
    
    
    def Hi(num):
        print('hello %d' % num)
        time.sleep(3)
    
    if __name__ == '__main__':
        # 第一个参数是要执行的函数名,第二个是函数的参数(必须是可迭代对象)
        t1 = threading.Thread(target=Hi, args=(10, ))  # 创建一个线程对象
        t1.start()  # 开启线程
        t2 = threading.Thread(target=Hi, args=(9, ))  # 创建一个线程对象
        t2.start()  # 开启线程
        print('ending....')
    #   这就是并发的现象
    
    #   并行:指的是两个或者多个事件在同一时刻发生(同时调用多核)
    #   并发:指的是两个或者多个事件在同一时间间隔内发生(在一个核内快速的切换)

    第二种调用方式

    import threading
    import time
    
    
    class MyThread(threading.Thread):
        def __init__(self,num):
            threading.Thread.__init__(self)
            self.num = num
    
        def run(self):#定义每个线程要运行的函数
    
            print("running on number:%s" %self.num)
    
            time.sleep(3)
    
    if __name__ == '__main__':
    
        t1 = MyThread(1)
        t2 = MyThread(2)
        t1.start()
        t2.start()
        
        print("ending......")

    join和setDaemon

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 18-5-22 下午8:45
    # @Author  : LK
    # @File    : lesson2.py
    # @Software: PyCharm
    
    import threading
    import time
    
    def music():
        print('begin listen music %s'%time.ctime())
        # t = input('请输入内容>>>')
        # time.sleep(3)
        print('stop listen music %s'%time.ctime())
        # print(t)
    
    
    def game():
        print('begin to game %s'%time.ctime())
        time.sleep(5)
        print('stop to game %s'%time.ctime())
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=music)
        t2 = threading.Thread(target=game)
    
        t1.start()
        t2.start()
    
        # t1.join()   #   join就是等待的意思,让该线程执行完毕后,在执行主线程
        # t2.join()   # 注意如果注释这一句,结果是什么
    
        print('ending....')
    
        print(t1.getName())  #  获取线程名,
    join
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 18-5-22 下午9:26
    # @Author  : LK
    # @File    : 守护线程.py
    # @Software: PyCharm
    
    #   守护线程 就是:和主线程一起退出,如果主线程结束了,那么不管守护的线程,有没有执行完毕,都退出
    import threading
    import time
    
    
    def music():
        print('begin listen music %s' % time.ctime())
        time.sleep(3)
        print('stop listen music %s' % time.ctime())
    
    
    def game():
        print('begin to game %s' % time.ctime())
        time.sleep(5)
        print('stop to game %s' % time.ctime())
    
    
    t1 = threading.Thread(target=music)
    t2 = threading.Thread(target=game)
    threads = []
    threads.append(t1)
    threads.append(t2)
    if __name__ == '__main__':
    
        # t1.setDaemon(True)
        t2.setDaemon(True)
        for t in threads:
            t.start()
            # t.setDaemon(True) # 守护线程, 就是和主线程一块结束
        print('ending....')
    setDaemon

    join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。

    setDaemon(True):

             将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。

             当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成

             想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程

             完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦

    其他方法

    # run():  线程被cpu调度后自动执行线程对象的run方法
    # start():启动线程活动。
    # isAlive(): 返回线程是否活动的。
    # getName(): 返回线程名。
    # setName(): 设置线程名。
    
    threading模块提供的一些方法:
    # threading.currentThread(): 返回当前的线程变量。
    # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    同步锁lock

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 18-5-23 下午4:42
    # @Author  : LK
    # @File    : 同步锁.py
    # @Software: PyCharm
    import threading, time
    
    #   用多线程去调用一个每次减1的函数,里面让他停留一会就会切换cpu,模拟线程安全问题,引出同步锁(互斥锁)
    
    
    def sub():
        global num
        #  这样坐是0,因为num-=1 这个执行的很快,没有到达时间片切换就执行完了,每次取的都是不同的值
        # num -= 1
        # 如果这里让他停一会,就会发生,前面的一些拿到的值为100,然后开始停下来,让其他线程取值
        # 取得值还可能是100, 再过几个线程,取的时候就是前面的线程执行完了(到达时间片切换了)
        #  取得是就可能是其他值(前面几个线程执行后的结果),最终结果就不在是0
        #   这就是线程安全问题,多个线程都在处理同一个数据,
        # 如果处理的太慢(sleep)就会发生多个线程处理的值都是原始的值(多个线程处理一个值)
    
        #   处理方法: 就是在执行这三条语句时,不让cpu切换,知道处理完成时,再切换(同步锁)
        '''
        定义一个锁:lock = threading.Lock()
        lock.acquire()  #   锁起来(获取)
        这里是要执行的语句,
        lock.release()   #释放
        这里的不执行完,不切换cpu
        '''
        lock.acquire()
        temp = num
        time.sleep(0.001)
        num = temp - 1
        lock.release()
    
    
    num = 100
    l = []
    lock = threading.Lock()
    for i in range(100):
        t = threading.Thread(target=sub)
        l.append(t)
        t.start()
    
    for t in l:
        t.join()
    print(num)
    同步锁(互斥锁)

    死锁(递归锁)

    #   产生死锁:
    '''
    第一个线程执行doLockA, 然后lockB, 执行完actionA, 都释放了
    开始执行actionB,对B上锁,不允许切换cpu,与此同时第二个线程开始执行actionA,对A上锁
    不允许切换cpu,两个线程同时需要对方的资源释放但是都没有释放,所以死锁
    (就像你问我要个香蕉, 我问你要个苹果, 你说我先给你,我说你先给我,于是就死锁了)
    '''
    '''
    解决方法:
    用threading.RLock() 去替换锁A和锁B
    rlock就是在内部实现一个计数器,在同一个线程中,加锁就+1,释放就减一,只要锁数大于0,
    其他线程就不能申请锁,
    就是执行过程就是:当A函数执行完之后,所有线程开始抢占资源,谁抢到谁开始执行,
    比如线程2抢到, 线程2开始执行A函数,A没有执行完之前其他线程不能够继续执行
    '''
     
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 18-5-23 下午8:33
    # @Author  : LK
    # @File    : 递归锁_死锁.py
    # @Software: PyCharm
    import threading, time
    
    #   产生死锁:
    '''
    第一个线程执行doLockA, 然后lockB, 执行完actionA, 都释放了
    开始执行actionB,对B上锁,不允许切换cpu,与此同时第二个线程开始执行actionA,对A上锁
    不允许切换cpu,两个线程同时需要对方的资源释放但是都没有释放,所以死锁
    (就像你问我要个香蕉, 我问你要个苹果, 你说我先给你,我说你先给我,于是就死锁了)
    '''
    
    
    class myThread(threading.Thread):
        def actionA(self):
            lockA.acquire()
            print(self.name, 'doLockA', time.ctime())
            time.sleep(3)
            lockB.acquire()
            print(self.name, 'doLockB', time.ctime())
            lockB.release()
            lockA.release()
    
        def actionB(self):
            lockB.acquire()
            print(self.name, 'doLockB', time.ctime())
            time.sleep(1)
            lockA.acquire()
            print(self.name, 'doLockA', time.ctime())
            lockA.release()
            lockB.release()
    
        def run(self):
            self.actionA()
            time.sleep(0.5)
            self.actionB()
    
    
    if __name__ == '__main__':
        lockA = threading.Lock()
        lockB = threading.Lock()
        r_lock = threading.RLock()
        '''
        解决方法:
        用threading.RLock() 去替换锁A和锁B
        rlock就是在内部实现一个计数器,在同一个线程中,加锁就+1,释放就减一,只要锁数大于0,
        其他线程就不能申请锁,
        就是执行过程就是:当A函数执行完之后,所有线程开始抢占资源,谁抢到谁开始执行,
        比如线程2抢到, 线程2开始执行A函数,A没有执行完之前其他线程不能够继续执行
        '''
    
        threads = []
        for i in range(5):
            '''创建5个线程对象'''
            threads.append(myThread())
        for t in threads:
            t.start()
        for i in threads:
            t.join()
        print('ending.....')
    死锁(递归锁)

    信号量与同步对象

    信号量, 也是一种锁的机制, 在那个递归锁中的同时事件的安全问题中
    里面是多个线程同时抢占资源, 但是这个是只允许指定的个数,去抢占
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 18-5-24 下午4:45
    # @Author  : LK
    # @File    : 信号量.py
    # @Software: PyCharm
    
    '''
    信号量, 也是一种锁的机制, 在那个递归锁中的同时事件的安全问题中
    里面是多个线程同时抢占资源, 但是这个是只允许指定的个数,去抢占
    '''
    import threading, time
    class myThread(threading.Thread):
        def run(self):
            if semaphore.acquire():
                print(self.name)
                time.sleep(3)
                semaphore.release()
    
    if __name__ == '__main__':
    
        #   信号量, 参数的意思是同时允许几个线程去执行
        semaphore = threading.Semaphore(5)
    
        threads = []
        for i in range(10):
            threads.append(myThread())
    
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    信号量

    同步对象

    event

    '''
    event 同步对象标志位,就是在一个线程中设定后,在其他线程中也能捕获到
    需求:一个boss类,一个worker类,
    当boss对象执行后,worker才执行,就需要设置一个同步的标志位,用来判断
    该执行那个线程
    '''
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 18-5-23 下午9:38
    # @Author  : LK
    # @File    : 同步对象.py
    # @Software: PyCharm
    
    '''
    event 同步对象标志位,就是在一个线程中设定后,在其他线程中也能捕获到
    需求:一个boss类,一个worker类,
    当boss对象执行后,worker才执行,就需要设置一个同步的标志位,用来判断
    该执行那个线程
    '''
    '''
    执行过程:
    刚开始有5个worker线程和1个boss线程执行,
    当worker执行时,wait没有被设定,一直在阻塞,但是boss可以执行
    于是设定event并开始等5秒, 同时5个worker中的标志位变了,可以继续执行了
    1秒后,worker清除event标志,继续阻塞,又过了4秒boss又设置了,
    同时worker又可以执行了
    '''
    import threading, time
    class Boss(threading.Thread):
        def run(self):
            print(self.name,'boss今天大家要加班到22点')
            print(event.isSet())  # false,判断是否标志
            event.set()
            time.sleep(5)
            print(self.name,'boss: <22:00>可以下班了')
            print(event.isSet())
            event.set()
    
    
    class Worker(threading.Thread):
        def run(self):
            event.wait()  # 一旦event被设定 就等同于pass, 不设定就阻塞等待被设定
            print(self.name,'worker:哎呀生活苦呀,')
            time.sleep(1)
            event.clear()
            event.wait()
            print(self.name,'worker: oyear下班了')
    
    
    #   如果不用event的话,就可能导致worker先说话,但是需求是让boss先说话,
    #   因为cpu是抢占的方式执行的,不会按照需求走,所以就要引入同步标志位event
    if __name__ == '__main__':
        event = threading.Event()
        threads =[]
    
        #   创建5个worker线程,1个boss线程
        for i in range(5):
            threads.append(Worker())
        threads.append(Boss())
    
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print('ending....')
    同步对象

    队列----多线程常用方法

    队列的引入

    '''
    用多个线程同时对列表进行删除最后一个值,
    会报错, 是因为可能会有多个线程同时取到最后一个值,都进行删除,
    就会报错, 解决方法:可以用锁,将其锁住,(相当于串行的执行), 或者用队列
    '''
    '''
    l = [1,2,3,4,5]
    def fun():
        while 1:
            if not l:
                break
            a = l[-1]
            print(a)
            time.sleep(1)
            l.remove(a)
    if __name__ == '__main__':
        t1 = threading.Thread(target=fun, args=())
        t2 = threading.Thread(target=fun, args=())
        t1.start()
        t2.start()
    '''
    队列的引入
    Python Queue模块有三种队列及构造函数:
    1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
    2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)
    3、还有在一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)
    import queue
    
    #   创建一个队列,里面最多可以存放3个数据
    q = queue.Queue(maxsize=3)  # FIFO,先进先出
    # 将一个值入队
    q.put(10)
    q.put(9)
    q.put(8)
    #   如果这里在加上一个put, 就是有4个数据入队,但是空间不够,就会一直阻塞,直到,队列有空间时(从队列中取出get)
    #   block参数默认是True, 如果改成False,当队列满的时候如果继续入队,就不会堵塞而是报错queue.Full
    # q.put(9, block=False)
    while 1:
        '''
        将一个值从队列中取出
        q.get()
        调用队列对象的get()
        方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,
        get()
        就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
        '''
        # 取完之后将会阻塞, block=Fales,如果队列为空时,继续出队,就会报错,queue.Empty
        #   get类似与put
        data = q.get(block=False)
        print(data)
    调用一
    # 先进后出  LIFO 类似与栈
    import queue
    # 先进后出  LIFO 类似与栈
    q = queue.LifoQueue(maxsize=4)
    q.put('你好')
    q.put(123)
    q.put({"name":"lucky"})
    
    while 1:
        data = q.get()
        print(data)
    调用二
    # 还有在一种是优先级队列级别越低越先出来1级最低。class queue.PriorityQueue(maxsize)
    import queue
    q = queue.PriorityQueue(maxsize=3)
    q.put([2,'你好'])
    q.put([3,123,'aa'])
    q.put([1,{"name":"lucky"}])
    
    while 1:
        data = q.get()
        # print(data)
        print(data[1])  #   只显示数据,
    调用三
    import queue
    q = queue.Queue(maxsize=4)
    q.put('a')
    q.put('b')
    q.put('c')
    print(q.qsize())  # 打印当前队列的大小,现在队列中有几个值
    print(q.full())  # 判断是否满
    print(q.empty())  # 判断是否为空
    # q.put_nowait(3)  #  相当于q.put(3, block=Flase)
    # q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    # q.join() 实际上意味着等到队列为空,再执行别的操作
    while 1:
        data = q.get()
        print(data)
        print(q.qsize())  # 打印当前队列的大小,现在队列中有几个值
        print(q.full())   # 判断是否满
        print(q.empty())  # 判断是否为空
    队列的其他方法

    进程的基本使用

    多进程模块 multiprocessing

    进程的使用和线程的使用和调用的方法基本一样

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 18-5-25 下午5:31
    # @Author  : LK
    # @File    : 测试进程和线程.py
    # @Software: PyCharm
    from threading import Thread
    from multiprocessing import Process
    import time
    
    def fun():
        print(time.ctime())
    if __name__ == '__main__':
        threads = []
        for i in range(1000):
            # t = Thread(target=fun)  # 线程
            t = Process(target=fun)   # 进程
            threads.append(t)
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    from  multiprocessing import Process
    import time
    
    def f(name):
        time.sleep(1)
        print('hello  %s'%name, time.ctime())
    
    if __name__ == '__main__':
        p_list = []
        for t in range(3):
            p = Process(target=f, args=('luck',))
            p_list.append(p)
    
        #   三个进程时在同一时刻开启的
        for p in p_list:
            p.start()
        for p in p_list:
            p.join()
    
        print('ending....')
    进程的调用方法一
    class myProcess(Process):
        def __init__(self):
            # 父类初始化, 不传参数不用写
            super(myProcess, self).__init__()
        def run(self):
            time.sleep(1)
            print(self.name,'hello ',time.ctime())
    
    if __name__ == '__main__':
        p_list = []
        for i in range(3):
            p_list.append(myProcess())
        for p in p_list:
            p.start()
        for p in p_list:
            p.join()
    进程的调用方法二

    查看进程号

    #   查查看进程的id
    #   os.getppid()获得当前进程的父进程,os.getpid()获得当前进程
    #   os.getppid 是pycharm的进程号
    '''
    from multiprocessing import Process
    import os
    import time
    
    
    def info(title):
        print("title:", title)
        print('父进程 id:', os.getppid())
        print('当前进程 id:', os.getpid())
        # time.sleep(3)
    
    
    if __name__ == '__main__':
        info('主进程')
        time.sleep(1)
        print("------------------")
        p = Process(target=info, args=('lucky子进程',))
        p.start()
        p.join()
        p_list = []
        for i in range(3):
            p = Process(target=info, args=('子进程%d' % i,))
            p_list.append(p)
        for p in p_list:
            p.start()
        for p in p_list:
            p.join()
    查看进程号,进程之间的关系理解
    Process 类
    构造方法:
    
    Process([group [, target [, name [, args [, kwargs]]]]])
    
      group: 线程组,目前还没有实现,库引用中提示必须是None; 
      target: 要执行的方法; 
      name: 进程名; 
      args/kwargs: 要传入方法的参数。
    
    实例方法:
    
      is_alive():返回进程是否在运行。
    
      join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
    
      start():进程准备就绪,等待CPU调度
    
      run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
    
      terminate():不管任务是否完成,立即停止工作进程
    
    属性:
    
      daemon:和线程的setDeamon功能一样
    
      name:进程名字。
    
    
    '''
    
    from multiprocessing import Process
    import time
    
    class myProcess(Process):
        def __init__(self, args):
            super(myProcess,self).__init__()
            self.args = args
        def fun(self):
            time.sleep(1)
            print(self.name, self.is_alive(), self.args, self.pid)
            time.sleep(1)
    
        def run(self):
            self.fun()
    
    
    
    if __name__ == '__main__':
        p_list = []
        for i in range(10):
            # p = Process(target=fun, args=('子进程%d' % i,))
            p_list.append(myProcess('子进程%d' % i))
        # p_list[-2].daemon = True
        # 因为这三个进程是同时执行的,所以这个属性不明显
        for p in p_list:
            p.start()
        for p in p_list:
            p.join()
    进程模块的其他方法

    进程间的通信

    队列,这里的队列和线程里的队列调用方法不一样

    #   什么是进程通信,为什么要进程通信
    #   因为进程与进程之间是相互独立的,不像线程可以信息共享,要想让进程间相互信息共享,就要传参数(利用队列)
    
    from multiprocessing import Process, Queue # 进程队列
    import time
    import queue   # 线程队列
    
    #   主进程get数据,子进程put数据,
    #   阻塞的原因是,这个队列的数据不共享,在子进程中队列和主进程的队列信息不共享,没有任何关系
    #   结局方法,将队列作为参数传过去
    #   注意在win上面不传参数不能运行,但是在linux下能运行
    #  所以轻易不要用多进程,进行进程间通信时,会涉及到copy资源
    # def fun(q):
    # '''  队列
    def fun():
        q.put(1)
        q.put(2)
        q.put(3)
        print(id(q))
        # print(q.empty())
    # def fun2():
    def fun2(q):
        while 1:
            # print(q.empty())
            data = q.get()
            print(data)
            print(id(q))
    
    if __name__ == '__main__':
        # q = queue.Queue()  # 线程队列,
        q = Queue()          # 进程队列
        # p = Process(target=fun, args=(q,))  # 传参进行了copy占用资源
        p = Process(target=fun)
        p.start()
        p.join()
        p2 = Process(target=fun2)
        p2 = Process(target=fun2, args=(q,))
        p2.start()
        p2.join()

    管道

    from multiprocessing import Pipe
    def fun(child_conn):
        child_conn.send([12, {"name":"yuan"}, 'hello'])
        print('主进程说:',child_conn.recv())
        print('子进程id',id(child_conn))
    
    if __name__ == '__main__':
        prepare_conn, child_conn = Pipe()  # 双向管道
        print('id:',id(prepare_conn), id(child_conn))
        p = Process(target=fun, args=(child_conn,))
        p.start()
        print('子进程说:',prepare_conn.recv())
        prepare_conn.send('你好')
        print('主进程id',id(prepare_conn))
        p.join()

    Managers

    Queue和pipe只是实现了数据交互,并没有实现数据共享,即一个进程去更改另外一个进程的数据

     
    Managers支持的数据类型
    # A manager returned by Manager() will support types list, dict, Namespace,
    # Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier,
    #  Queue, Value and Array. For example:
    
    from multiprocessing import Manager
    # Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。
    # 所以利用Managers 进行数据共享, 同样也需要传参
    
    def fun(d, l, i):
        d[i]='字典'+ str(i)
        l.append(i)
        # pass
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()  #  d = {} 创建一个字典
            l = manager.list(range(5)) # l = range(5)
            p_list = []
            for i in range(5):
                p = Process(target=fun,args=(d,l,i,))
                p.start()
                p_list.append(p)
            for p in p_list:
                p.join()
    
            print(d)
            print(l)
    Managers模块

    进程同步

    #   进程锁, lock
    from multiprocessing import Process, Lock
    import time
    
    def fun(i):
        lock.acquire()
        print('%d'%i)
        lock.release()
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            p = Process(target=fun, args=(i,)).start()
    进程同步,锁

    进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,
    如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
    进程池中有两个方法:
    apply 同步
    apply_async 异步(常用)

    在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:

    1. 很明显需要并发执行的任务通常要远大于核数
    2. 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
    3. 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

    例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

    我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数... 
    ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

     

    '''
    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,
    如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
    进程池中有两个方法:
    apply   同步
    apply_async   异步(常用)
    '''
    from multiprocessing import Process, Pool
    import time, os
    
    def foo(i):
        time.sleep(1)
        print('子进程:',os.getpid(),i)
        return 'Hello %s'%i
    def bar(args):
        # print('',os.getpgid())
        print(args)
        print('主进程:',os.getpid())
    
    if __name__ == '__main__':
        pool = Pool(5)   #  进程池中开5个进程, 有100个任务需要执行, 每次执行5个
        print('main :',os.getpid())
        for i in range(100):
            #   回调函数:某个动作或者 函数执行成功后,再去执行的函数
            # pool.apply_async(func=foo, args=(i,))
            #   bar作为回调函数, 每次执行一个进程后都会执行回调函数, 回调函数是主进程中调用的
            #   func中函数的返回值,传给回调函数做参数
            pool.apply_async(func=foo, args=(i,), callback=bar)
        #   close和join必须加,而且顺序固定
        pool.close()
        pool.join()
    进程池

    回调函数

     回掉函数:

    需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

    我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

    from multiprocessing import Pool
    import requests
    import json
    import os
    
    def get_page(url):
        print('<进程%s> get %s' %(os.getpid(),url))
        respone=requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    
    def pasrse_page(res):
        print('<进程%s> parse %s' %(os.getpid(),res['url']))
        parse_res='url:<%s> size:[%s]
    ' %(res['url'],len(res['text']))
        with open('db.txt','a') as f:
            f.write(parse_res)
    
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
    
        p=Pool(3)
        res_l=[]
        for url in urls:
            res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
            res_l.append(res)
    
        p.close()
        p.join()
        print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了
    进程池应用

    协程

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 18-5-26 上午10:19
    # @Author  : LK
    # @File    : 协程.py
    # @Software: PyCharm
    from greenlet import greenlet
    
    
    #   优缺点
    '''
    def test1():
        print(12)
        g2.switch()  #  切换到g2 
        print(24)
        g2.switch()  #  切换到g2
    def test2():
        print(56)
        g1.switch()  #  切换到g1
        print(65)
        g1.switch()
    
    if __name__ ==  '__main__':
        g1 = greenlet(test1)
        g2 = greenlet(test2)
        g2.switch()
        
    '''
    '''
    import gevent
    
    import requests,time
    
    
    start=time.time()
    
    def f(url):
        print('GET: %s' % url)
        resp =requests.get(url)
        data = resp.text
        f = open('new.html', 'w', encoding='utf-8')
        f.write(data)
        f.close()
        print('%d bytes received from %s.' % (len(data), url))
    
    #   这就相当于一个协程, 每次进行读写操作阻塞时,都会切换cpu
    gevent.joinall([
    
            gevent.spawn(f, 'https://www.python.org/'),
            gevent.spawn(f, 'https://www.yahoo.com/'),
            gevent.spawn(f, 'https://www.baidu.com/'),
            gevent.spawn(f, 'https://www.sina.com.cn/'),
    
    ])
    
    # f('https://www.python.org/')
    #
    # f('https://www.yahoo.com/')
    #
    # f('https://baidu.com/')
    #
    # f('https://www.sina.com.cn/')
    
    print("cost time:",time.time()-start)
    
    '''
    # import threading, gevent
    # def fun():
    #     a = input('>>>')
    #     print(a)
    # def fun2():
    #     print('
    hello')
    # # gevent.joinall([
    # #
    # #         gevent.spawn(fun()),
    # #         gevent.spawn(fun2()),
    # # ])
    # if __name__ == '__main__':
    #     # t1 = threading.Thread(target=fun).start()
    #     # t2 = threading.Thread(target=fun2).start()
    #     gevent.joinall([
    # 
    #             gevent.spawn(fun),
    #             gevent.spawn(fun2),
    #     ])
    未完整

    协程参考:

    http://www.cnblogs.com/linhaifeng/articles/7429894.html

    协程,协作式----- 非抢占式的程序

    自己调动 

    Yield(协程)

    用户态的切换呢

    关键点在哪一步切换

    协程主要解决的是io操作

    协程:本质上就是一个线程

    协程的优势:

    1:没有切换的消耗

    2:没有锁的概念

    但是不能用多核,所以用多进程+协程,是一个很好的解决并发的方案

    驱动事件

    @font-face{ font-family:"Times New Roman"; } @font-face{ font-family:"宋体"; } p.MsoNormal{ mso-style-name:正文; mso-style-parent:""; margin:0pt; margin-bottom:.0001pt; mso-pagination:none; text-align:left; font-family:'Times New Roman'; font-size:12.0000pt; mso-font-kerning:1.0000pt; } span.msoIns{ mso-style-type:export-only; mso-style-name:""; text-decoration:underline; text-underline:single; color:blue; } span.msoDel{ mso-style-type:export-only; mso-style-name:""; text-decoration:line-through; color:red; } @page{mso-page-border-surround-header:no; mso-page-border-surround-footer:no;}@page Section0{ } div.Section0{page:Section0;}

    Io多路复用

    Select 触发方式

    1:水平处罚

    2:边缘触发

    3:IO多路复用优势:同时可以监听多个链接

    Select  负责监听

    IO多路复用:

    Select  那个平台都有

    Poll

    Epoll 效率最高

  • 相关阅读:
    经典论文翻译导读之《Finding a needle in Haystack: Facebook’s photo storage》
    Etcd源码解析(转)
    etcd集群故障处理(转)
    etcd集群部署与遇到的坑(转)
    LeetCode All in One 题目讲解汇总(转...)
    pyinstaller-python->exe
    3个方法解决百度网盘限速(转)
    Tensorflow 教程系列 | 莫烦Python
    分布式存储Seaweedfs源码分析
    解决Java Web项目中Word、Excel等二进制文件编译后无法打开的问题
  • 原文地址:https://www.cnblogs.com/xiaokang01/p/9096437.html
Copyright © 2011-2022 走看看