zoukankan      html  css  js  c++  java
  • python并发编程之多线程(操作篇)

    目录:

    一、threading模块介绍

    multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍

    二、开启线程的两种方式:

    方式一:

    from threading import Thread
    import time
    
    #方式一
    def talk(name):
        time.sleep(1)
        print("{} age is 18".format(name))
    
    
    if __name__ == '__main__':
        t1=Thread(target=talk,args=("egon",))
        t1.start()
    
        print("")
        
    '''
    主
    egon age is 18
    '''

    方式二:

    from threading import Thread
    import time
    
    class PrintAge(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
    
        def run(self):
            time.sleep(1)
            print("{} age is 18".format(self.name))
    
    if __name__ == '__main__':
        t1=PrintAge("egon")
        t1.start()
        print("")
    
    """
    主
    egon age is 18
    """

    三 、在一个进程下开启多个线程与在一个进程下开启多个子进程的区别

    1 谁的开启速度快

    from threading import Thread
    from multiprocessing import Process
    import os
    
    def work():
        print('hello')
    
    if __name__ == '__main__':
        #在主进程下开启线程
        t=Thread(target=work)
        t.start()
        print('主线程/主进程')
        '''
        打印结果:
        hello
        主线程/主进程
        '''
    
        #在主进程下开启子进程
        t=Process(target=work)
        t.start()
        print('主线程/主进程')
        '''
        打印结果:
        主线程/主进程
        hello
        '''

    2 瞅一瞅pid

    from threading import Thread
    from multiprocessing import Process
    import os
    
    def work():
        print('hello',os.getpid())
    
    if __name__ == '__main__':
        #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
        t1=Thread(target=work)
        t2=Thread(target=work)
        t1.start()
        t2.start()
        print('主线程/主进程pid',os.getpid())
    
        #part2:开多个进程,每个进程都有不同的pid
        p1=Process(target=work)
        p2=Process(target=work)
        p1.start()
        p2.start()
        print('主线程/主进程pid',os.getpid())

    3 同一进程内的线程共享该进程的数据?

    from  threading import Thread
    from multiprocessing import Process
    import os
    def work():
        global n
        n=0
    
    if __name__ == '__main__':
        # n=100
        # p=Process(target=work)
        # p.start()
        # p.join()
        # print('主',n) #毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100
    
    
        n=1
        t=Thread(target=work)
        t.start()
        t.join()
        print('',n) #查看结果为0,因为同一进程内的线程之间共享进程内的数据

    四、练习

    TCP服务端实现并发的效果

    1、用多进程实现:

    #多进程
    from multiprocessing import Process
    import socket
    
    phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    
    phone.bind(("127.0.0.1",9000))
    phone.listen(5)
    
    #通信循环函数
    def talk(conn,client_addr):
        while True:
            try:
                msg=conn.recv(1024)
                if not msg:break
                conn.send(msg.upper())
            except Exception:
                break
    
    if __name__ == '__main__':
        while True:
            conn,client_addr=phone.accept()
            p=Process(target=talk,args=(conn,client_addr))
            p.start()
    server
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
    
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))
    client

    2、多线程实现

    from threading import Thread
    import socket
    
    phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    
    phone.bind(("127.0.0.1",9000))
    phone.listen(5)
    
    #通信循环函数
    def talk(conn,client_addr):
        while True:
            try:
                msg=conn.recv(1024)
                if not msg:break
                conn.send(msg.upper())
            except Exception:
                break
    
    if __name__ == '__main__':
        while True:
    
            #多线程
            conn, client_addr = phone.accept()
            t=Thread(target=talk,args=(conn,client_addr))
            t.start()
    server
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',9000))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
    
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))
    client

    五、线程相关的其他方法

    Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
    from threading import Thread
    import threading
    from multiprocessing import Process
    import os
    
    def work():
        import time
        time.sleep(3)
        print(threading.current_thread().getName())
    
    
    if __name__ == '__main__':
        #在主进程下开启线程
        t=Thread(target=work)
        t.start()
    
        print(threading.current_thread().getName())
        print(threading.current_thread()) #主线程
        print(threading.enumerate()) #连同主线程在内有两个运行的线程
        print(threading.active_count())
        print('主线程/主进程')
    
        '''
        打印结果:
        MainThread
        <_MainThread(MainThread, started 140735268892672)>
        [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
       2 主线程/主进程 Thread-1
    '''

    主线程等待子线程结束:(join方法)

    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.start()
        t.join()
        print('主线程')
        print(t.is_alive())
        '''
        egon say hello
        主线程
        False
        '''

    六、守护线程

    无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁

    需要强调的是:运行完毕并非终止运行

    #1.对主进程来说,运行完毕指的是主进程代码运行完毕
    
    #2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

    详细解释:

    #1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
    
    #2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.setDaemon(True) #必须在t.start()之前设置
        t.start()
    
        print('主线程')
        print(t.is_alive())
        '''
        主线程
        True
        '''

    迷惑人的例子

    from threading import Thread
    import time
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    
    t1=Thread(target=foo)
    t2=Thread(target=bar)
    
    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")
    
    '''
    123
    456
    main-------
    end123
    end456
    '''

    七、同一个进程下的多个线程数据是共享的

    from threading import Thread
    import time
    
    
    money = 100
    
    
    def task():
        global money
        money = 666
        print(money)
    
    
    if __name__ == '__main__':
        t = Thread(target=task)
        t.start()
        t.join()
        print(money)

    八、Python GIL(Global Interpreter Lock)

    链接:http://www.cnblogs.com/linhaifeng/articles/7449853.html

    """
    In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
    native threads from executing Python bytecodes at once. This lock is necessary mainly 
    because CPython’s memory management is not thread-safe. (However, since the GIL 
    exists, other features have grown to depend on the guarantees that it enforces.)
    """
    
    """
    python解释器其实有多个版本
        Cpython
        Jpython
        Pypypython
    但是普遍使用的都是CPython解释器
    
    在CPython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多个线程的同时执行
        同一个进程下的多个线程无法利用多核优势!!!
        同一进程下的线程启动时会先抢GIL,当这个线程执行结束,释放GIL(在这个线程运行的时候其他线程进入等待状态)
        
    因为cpython中的内存管理不是线程安全的
    内存管理(垃圾回收机制)
        1.应用计数
        2.标记清楚
        3.分代回收
        
    """

    重点:
    1.GIL不是python的特点而是CPython解释器的特点
    2.GIL是保证解释器级别的数据的安全
    3.GIL会导致同一个进程下的多个线程的无法同时执行即无法利用多核优势(******)
    4.针对不同的数据还是需要加不同的锁处理
    5.解释型语言的通病:同一个进程下多个线程无法利用多核优势

    九、互斥锁

    需要注意的点:
    #1.线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来
    
    #2.join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高

    实例:

    没加互斥锁前

    from threading import Thread, Lock
    import time
    
    money = 100
    
    
    def task():
        global money
    
        tmp = money
        time.sleep(0.1)
        money = tmp - 1
    
    
    if __name__ == '__main__':
    
        t_list = []
    
        for i in range(100):
            t = Thread(target=task)
            t.start()
            t_list.append(t)
        for t in t_list:
            t.join()
        print(money)
    
    """
    99
    """

    加上互斥锁:

    from threading import Thread, Lock
    import time
    
    money = 100
    
    
    def task(mutex):
        global money
        mutex.acquire()
        tmp = money
        time.sleep(0.1)
        money = tmp - 1
        mutex.release()
    
    
    if __name__ == '__main__':
    
        t_list = []
        mutex = Lock()
        for i in range(100):
            t = Thread(target=task, args=(mutex,))
            t.start()
            t_list.append(t)
        for t in t_list:
            t.join()
        print(money)
    '''
    0
    '''

    总结:

    GIL与普通互斥锁的区别

    GIL 与普通Lock是两把锁,保护的数据不一样,前者是保护解释器级别的,后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即普通互斥锁Lock

    十、同一个进程下的多线程无法利用多核优势,是不是就没有用了

    """
    多线程是否有用要看具体情况
    单核:四个任务(IO密集型计算密集型)
    多核:四个任务(IO密集型计算密集型)
    """
    # 计算密集型   每个任务都需要10s
    单核(不用考虑了)
        多进程:额外的消耗资源
      多线程:介绍开销
    多核
        多进程:总耗时 10+
      多线程:总耗时 40+
    # IO密集型  
    多核
        多进程:相对浪费资源
      多线程:更加节省资源

    代码验证:

    #计算密集型

    # 计算密集型
    from multiprocessing import Process
    from threading import Thread
    import os,time
    
    
    def work():
        res = 0
        for i in range(10000000):
            res *= i
    
    if __name__ == '__main__':
        l = []
        print(os.cpu_count())  # 获取当前计算机CPU个数
        start_time = time.time()
        for i in range(12):
            p = Process(target=work)  # 1.4679949283599854
            t = Thread(target=work)  # 5.698534250259399
            t.start()
            # p.start()
            # l.append(p)
            l.append(t)
        for p in l:
            p.join()
        print(time.time()-start_time)

    #IO密集型

    # IO密集型
    from multiprocessing import Process
    from threading import Thread
    import os,time
    
    
    def work():
        time.sleep(2)
    
    if __name__ == '__main__':
        l = []
        print(os.cpu_count())  # 获取当前计算机CPU个数
        start_time = time.time()
        for i in range(4000):
            # p = Process(target=work)  # 21.149890184402466
            t = Thread(target=work)  # 3.007986068725586
            t.start()
            # p.start()
            # l.append(p)
            l.append(t)
        for p in l:
            p.join()
        print(time.time()-start_time)

    总结:

    """
    多进程和多线程都有各自的优势
    并且我们后面在写项目的时候通常可以
        多进程下面再开设多线程
    这样的话既可以利用多核也可以介绍资源消耗
    """

     十一、死锁现象与递归锁

    死锁:

    所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

    from threading import Thread, Lock
    import time
    
    
    mutexA = Lock()
    mutexB = Lock()
    # 类只要加括号多次 产生的肯定是不同的对象
    # 如果你想要实现多次加括号等到的是相同的对象 单例模式
    
    
    class MyThead(Thread):
        def run(self):
            self.func1()
            self.func2()
    
        def func1(self):
            mutexA.acquire()
            print('%s 抢到A锁'% self.name)  # 获取当前线程名
            mutexB.acquire()
            print('%s 抢到B锁'% self.name)
            mutexB.release()
            mutexA.release()
            
        def func2(self):
            mutexB.acquire()
            print('%s 抢到B锁'% self.name)
            time.sleep(2)
            mutexA.acquire()
            print('%s 抢到A锁'% self.name)  # 获取当前线程名
            mutexA.release()
            mutexB.release()
    
    
    if __name__ == '__main__':
        for i in range(10):
            t = MyThead()
            t.start()
    
    '''
    Thread-1 拿到A锁
    Thread-1 拿到B锁
    Thread-1 拿到B锁
    Thread-2 拿到A锁
    然后就卡住,死锁了
    '''

    递归锁:

    """
    递归锁的特点    
        可以被连续的acquire和release
        但是只能被第一个抢到这把锁执行上述操作
        它的内部有一个计数器 每acquire一次计数加一 每realse一次计数减一
        只要计数不为0 那么其他人都无法抢到该锁
    """

    将上述死锁的代码中:

    #导入RLock模块
    mutexA = Lock()
    mutexB = Lock()
    # 换成
    mutexA = mutexB = RLock()
    from threading import Thread, RLock
    import time
    
    mutexA = mutexB = RLock()
    
    
    class MyThead(Thread):
        def run(self):
            self.func1()
            self.func2()
    
        def func1(self):
            mutexA.acquire()
            print('%s 抢到A锁' % self.name)  # 获取当前线程名
            mutexB.acquire()
            print('%s 抢到B锁' % self.name)
            mutexB.release()
            mutexA.release()
    
        def func2(self):
            mutexB.acquire()
            print('%s 抢到B锁' % self.name)
            time.sleep(2)
            mutexA.acquire()
            print('%s 抢到A锁' % self.name)  # 获取当前线程名
            mutexA.release()
            mutexB.release()
    
    
    if __name__ == '__main__':
        for i in range(10):
            t = MyThead()
            t.start()
    '''
    Thread-1 抢到A锁
    Thread-1 抢到B锁
    Thread-1 抢到B锁
    Thread-1 抢到A锁
    Thread-2 抢到A锁
    Thread-2 抢到B锁
    Thread-2 抢到B锁
    Thread-2 抢到A锁
    Thread-4 抢到A锁
    Thread-4 抢到B锁
    Thread-4 抢到B锁
    Thread-4 抢到A锁
    Thread-6 抢到A锁
    Thread-6 抢到B锁
    Thread-6 抢到B锁
    Thread-6 抢到A锁
    Thread-8 抢到A锁
    Thread-8 抢到B锁
    Thread-8 抢到B锁
    Thread-8 抢到A锁
    Thread-10 抢到A锁
    Thread-10 抢到B锁
    Thread-10 抢到B锁
    Thread-10 抢到A锁
    Thread-5 抢到A锁
    Thread-5 抢到B锁
    Thread-5 抢到B锁
    Thread-5 抢到A锁
    Thread-9 抢到A锁
    Thread-9 抢到B锁
    Thread-9 抢到B锁
    Thread-9 抢到A锁
    Thread-7 抢到A锁
    Thread-7 抢到B锁
    Thread-7 抢到B锁
    Thread-7 抢到A锁
    Thread-3 抢到A锁
    Thread-3 抢到B锁
    Thread-3 抢到B锁
    Thread-3 抢到A锁
    '''
    执行结果

    十二、信号量

    同进程的一样

    Semaphore管理一个内置的计数器, 每当调用acquire()时内置计数器-1; 调用release() 时内置计数器+1; 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

    from threading import Thread, Semaphore
    import time
    import random
    
    sm = Semaphore(5)  # 括号内写数字 写几就表示开设几个坑位
    
    
    def task(name):
        sm.acquire()
        print('%s 正在蹲坑'% name)
        time.sleep(random.randint(1, 5))
        sm.release()
    
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=task, args=('伞兵%s号'%i, ))
            t.start()
    '''
    伞兵0号 正在蹲坑
    伞兵1号 正在蹲坑
    伞兵2号 正在蹲坑
    伞兵3号 正在蹲坑
    伞兵4号 正在蹲坑
    伞兵5号 正在蹲坑
    伞兵7号 正在蹲坑
    伞兵6号 正在蹲坑
    伞兵8号 正在蹲坑
    伞兵9号 正在蹲坑
    伞兵10号 正在蹲坑
    伞兵11号 正在蹲坑
    伞兵12号 正在蹲坑
    伞兵13号 正在蹲坑
    伞兵14号 正在蹲坑
    伞兵15号 正在蹲坑
    伞兵16号 正在蹲坑
    伞兵17号 正在蹲坑
    伞兵18号 正在蹲坑
    伞兵19号 正在蹲坑
    '''
    执行结果

    与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

    十三、Event事件

    同进程的一样

    线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

    '''
    一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行,类似于发射信号一样
    '''

    常用方法:

    event.isSet():返回event的状态值;
    
    event.wait():如果 event.isSet()==False将阻塞线程;一直等待,直到事件状态为真
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    
    event.clear():恢复event的状态值为False。

    示例:等红绿灯

    from threading import Thread, Event
    import time
    
    
    event = Event()  # 造了一个红绿灯
    
    
    def light():
        print('红灯亮着的')
        time.sleep(3)
        print('绿灯亮了')
        # 告诉等待红灯的人可以走了
        event.set()
    
    
    def car(name):
        print('%s 车正在等红灯'%name)
        event.wait()  # 等待别人给你发信号
        print('%s 车加油门飙车走了'%name)
    
    
    if __name__ == '__main__':
        t = Thread(target=light)
        t.start()
    
        for i in range(20):
            t = Thread(target=car, args=('%s'%i, ))
            t.start()

    十四、线程q

    """
    同一个进程下多个线程数据是共享的
    为什么先同一个进程下还会去使用队列呢
    因为队列是
        管道 + 锁
    所以用队列还是为了保证数据的安全
    """
    import queue
    
    # 我们现在使用的队列都是只能在本地测试使用
    
    # 1 队列q  先进先出
    # q = queue.Queue(3)
    # q.put(1)
    # q.get()
    # q.get_nowait()
    # q.get(timeout=3)
    # q.full()
    # q.empty()
    
    
    # 后进先出q
    # q = queue.LifoQueue(3)  # last in first out
    # q.put(1)
    # q.put(2)
    # q.put(3)
    # print(q.get())  # 3
    
    # 优先级q   你可以给放入队列中的数据设置进出的优先级
    q = queue.PriorityQueue(4)
    q.put((10, '111'))
    q.put((100, '222'))
    q.put((0, '333'))
    q.put((-5, '444'))
    print(q.get())  # (-5, '444')
    # put括号内放一个元祖  第一个放数字表示优先级
    # 需要注意的是 数字越小优先级越高!!!

    十五、进程池与线程池

    无论是开设进程也好还是开设线程也好 都需要消耗资源
    只不过开设线程的消耗比开设进程的稍微小一点而已

    我们是不可能做到无限制的开设进程和线程的 因为计算机硬件的资源更不上!!!
    硬件的开发速度远远赶不上软件呐

    我们的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它
    """
    # 池的概念
    """
    什么是池?
    池是用来保证计算机硬件安全的情况下最大限度的利用计算机
    它降低了程序的运行效率但是保证了计算机硬件的安全 从而让你写的程序能够正常运行

    进程池:

    同步状态:

    from concurrent.futures import  ProcessPoolExecutor
    import time
    import os
    
    pool = ProcessPoolExecutor(5)
    # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
    """
    池子造出来之后 里面会固定存在五个线程
    这个五个线程不会出现重复创建和销毁的过程
    池子造出来之后 里面会固定的几个进程
    这个几个进程不会出现重复创建和销毁的过程
    
    池子的使用非常的简单
    你只需要将需要做的任务往池子中提交即可 自动会有人来服务你
    """
    
    
    def task(n):
        print(n,os.getpid())
        time.sleep(2)
        return n**n
    
    def call_back(n):
        print('call_back>>>:',n.result())
    
    if __name__ == '__main__':
        # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
        # print('主')
        t_list = []
        for i in range(20):  # 朝池子中提交20个任务
            res = pool.submit(task, i)  # <Future at 0x100f97b38 state=running>
            print(res.result())  # result方法   同步提交,得到任务的返回结果,打印的是任务函数task的返回值
            t_list.append(res)
        # 等待线程池中所有的任务执行完毕之后再继续往下执行
        pool.shutdown()  # 关闭线程池  等待线程池中所有的任务运行完毕
        for t in t_list:
            print('>>>:',t.result())  # 肯定是有序的
        print('')
    0 9896
    1
    1 10492
    1
    2 12200
    4
    3 6356
    27
    4 16560
    256
    5 9896
    3125
    6 10492
    46656
    7 12200
    823543
    8 6356
    16777216
    9 16560
    387420489
    10 9896
    10000000000
    11 10492
    285311670611
    12 12200
    8916100448256
    13 6356
    302875106592253
    14 16560
    11112006825558016
    15 9896
    437893890380859375
    16 10492
    18446744073709551616
    17 12200
    827240261886336764177
    18 6356
    39346408075296537575424
    19 16560
    1978419655660313589123979
    >>>: 1
    >>>: 1
    >>>: 4
    >>>: 27
    >>>: 256
    >>>: 3125
    >>>: 46656
    >>>: 823543
    >>>: 16777216
    >>>: 387420489
    >>>: 10000000000
    >>>: 285311670611
    >>>: 8916100448256
    >>>: 302875106592253
    >>>: 11112006825558016
    >>>: 437893890380859375
    >>>: 18446744073709551616
    >>>: 827240261886336764177
    >>>: 39346408075296537575424
    >>>: 1978419655660313589123979
    执行结果

    异步状态:(通过回调机制)

    from concurrent.futures import  ProcessPoolExecutor
    import time
    import os
    
    pool = ProcessPoolExecutor(5)
    # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
    
    def task(n):
        print(n,os.getpid())
        time.sleep(2)
        return n**n
    
    def call_back(n):
        print('call_back>>>:',n.result())
    
    if __name__ == '__main__':
        # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
        t_list = []
        for i in range(20):  # 朝池子中提交20个任务
    
            res = pool.submit(task, i).add_done_callback(call_back)    #给每一个异步提交的任务绑定一个方法,一旦任务有结果了会立刻触发该方法
        print('')
    主
    0 1120
    1 6056
    2 17080
    3 15964
    4 12360
    5 1120
    call_back>>>: 1
    6 17080
    7 6056
    call_back>>>: 4
    call_back>>>: 1
    8 15964
    call_back>>>: 27
    9 12360
    call_back>>>: 256
    10 1120
    call_back>>>: 3125
    11 6056
    12 17080
    call_back>>>: 823543
    call_back>>>: 46656
    13 15964
    call_back>>>: 16777216
    14 12360
    call_back>>>: 387420489
    15 1120
    call_back>>>: 10000000000
    16 17080
    17 6056
    call_back>>>: 8916100448256
    call_back>>>: 285311670611
    18 15964
    call_back>>>: 302875106592253
    19 12360
    call_back>>>: 11112006825558016
    call_back>>>: 437893890380859375
    call_back>>>: 18446744073709551616
    call_back>>>: 827240261886336764177
    call_back>>>: 39346408075296537575424
    call_back>>>: 1978419655660313589123979
    执行结果

    线程池:

    同步状态:

    from concurrent.futures import  ThreadPoolExecutor
    import time
    import os
    
    pool = ThreadPoolExecutor(5)
    # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
    
    def task(n):
        print(n,os.getpid())
        time.sleep(2)
        return n**n
    
    def call_back(n):
        print('call_back>>>:',n.result())
    
    if __name__ == '__main__':
        # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
        # print('主')
        t_list = []
        for i in range(20):  # 朝池子中提交20个任务
            res = pool.submit(task, i)  # <Future at 0x100f97b38 state=running>
            print(res.result())  # result方法   同步提交,得到任务的返回结果,打印的是任务函数task的返回值
            t_list.append(res)
        # 等待线程池中所有的任务执行完毕之后再继续往下执行
        pool.shutdown()  # 关闭线程池  等待线程池中所有的任务运行完毕
        for t in t_list:
            print('>>>:',t.result())  # 肯定是有序的
        print('')
    0 4032
    1
    1 4032
    1
    2 4032
    4
    3 4032
    27
    4 4032
    256
    5 4032
    3125
    6 4032
    46656
    7 4032
    823543
    8 4032
    16777216
    9 4032
    387420489
    10 4032
    10000000000
    11 4032
    285311670611
    12 4032
    8916100448256
    13 4032
    302875106592253
    14 4032
    11112006825558016
    15 4032
    437893890380859375
    16 4032
    18446744073709551616
    17 4032
    827240261886336764177
    18 4032
    39346408075296537575424
    19 4032
    1978419655660313589123979
    >>>: 1
    >>>: 1
    >>>: 4
    >>>: 27
    >>>: 256
    >>>: 3125
    >>>: 46656
    >>>: 823543
    >>>: 16777216
    >>>: 387420489
    >>>: 10000000000
    >>>: 285311670611
    >>>: 8916100448256
    >>>: 302875106592253
    >>>: 11112006825558016
    >>>: 437893890380859375
    >>>: 18446744073709551616
    >>>: 827240261886336764177
    >>>: 39346408075296537575424
    >>>: 1978419655660313589123979
    执行结果

    异步状态:(通过回调机制)

    from concurrent.futures import  ThreadPoolExecutor
    import time
    import os
    
    pool = ThreadPoolExecutor(5)
    # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
    
    def task(n):
        print(n,os.getpid())
        time.sleep(2)
        return n**n
    
    def call_back(n):
        print('call_back>>>:',n.result())
    
    if __name__ == '__main__':
        # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
        t_list = []
        for i in range(20):  # 朝池子中提交20个任务
    
            res = pool.submit(task, i).add_done_callback(call_back)
        print('')
    0 14396
    1 14396
    2 14396
    3 14396
    4 14396主
    
    call_back>>>: 1
    5 14396
    call_back>>>: 27call_back>>>: 256
    6 
    7 14396
    14396
    call_back>>>:call_back>>>: 4 1
    8 
    143969 
    14396
    call_back>>>: 3125
    10 14396
    call_back>>>: 823543call_back>>>: 46656
    11 
    12 14396
    14396
    call_back>>>:call_back>>>: 16777216 387420489
    13 
    14 14396
    14396
    call_back>>>: 10000000000
    15 14396
    call_back>>>:call_back>>>: 285311670611 8916100448256
    16 
    17 14396
    14396
    call_back>>>: 302875106592253
    call_back>>>: 1111200682555801618 
    19 14396
    14396
    call_back>>>: 437893890380859375
    call_back>>>: 827240261886336764177call_back>>>: 18446744073709551616
    
    call_back>>>: 39346408075296537575424call_back>>>:
     1978419655660313589123979
    执行结果

    总结:

    ```
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    pool = ProcessPoolExecutor(5)
    pool.submit(task, i).add_done_callback(call_back)
    ```

    十六、协程

    """
    进程:资源单位
    线程:执行单位
    协程:这个概念完全是程序员自己意淫出来的 根本不存在
            单线程下实现并发
            我们程序员自己再代码层面上检测我们所有的IO操作
            一旦遇到IO了 我们在代码级别完成切换
            这样给CPU的感觉是你这个程序一直在运行 没有IO
            从而提升程序的运行效率
        
    多道技术
        切换+保存状态
        CPU两种切换
            1.程序遇到IO
            2.程序长时间占用
    
    TCP服务端 
        accept
        recv
        
    代码如何做到
        切换+保存状态
    
    切换
        切换不一定是提升效率 也有可能是降低效率
        IO切            提升
        没有IO切 降低
            
    保存状态
        保存上一次我执行的状态 下一次来接着上一次的操作继续往后执行
        yield
    """

    验证切换是否就一定提升效率

    """
    计算密集型:降低效率
    I/O密集型:提高效率
    """

    gevnet模块

    import time
    
    # 串行执行计算密集型的任务   1.085108757019043
    def func1():
        for i in range(10000000):
            i + 1
    
    def func2():
        for i in range(10000000):
            i + 1
    
    start_time = time.time()
    func1()
    func2()
    print(time.time() - start_time)
    
    # 切换 + yield  1.3264529705047607
    import time
    
    
    def func1():
        while True:
            10000000 + 1
            yield
    
    
    def func2():
        g = func1()  # 先初始化出生成器
        for i in range(10000000):
            i + 1
            next(g)
    
    start_time = time.time()
    func2()
    print(time.time() - start_time)

    gevent方法介绍

    gevent.spawn会对传入的子任务集合进行调度,gevent.joinall 方法会阻塞当前程序,除非所有的greenlet都执行完毕,才会退出程序
    公有方法
    gevent.spawn(cls, *args, **kwargs) 创建一个Greenlet对象,其实调用的是Greenlet.spawn(需要from gevent import Greenlet),返回greenlet对象
    gevent.joinall(greenlets, timeout=None, raise_error=False, count=None) 等待所有greenlets全部执行完毕, greenlets为序列,timeout为超时计时器对象,返回执行完毕未出错的的greenlet序列
    greenlet
    g.join() 等待此协程执行完毕后

     导入方法:

    #### 安装
    
    '''
    pip3 install gevent
    '''
    
    from gevent import monkey;monkey.patch_all()
    import time
    from gevent import spawn   #为了不加前缀gevent.spawn
    import gevent
    
    """
    gevent模块本身无法检测常见的一些io操作
    在使用的时候需要你额外的导入一句话,其他的正常使用,import gevent
    from gevent import monkey
    monkey.patch_all()
    又由于上面的两句话在使用gevent模块的时候是肯定要导入的
    所以还支持简写
    from gevent import monkey;monkey.patch_all()
    """

    使用gevent模块join

    #### 安装
    
    '''
    pip3 install gevent
    '''
    
    from gevent import monkey;monkey.patch_all()
    import gevent
    import time from gevent import spawn """ gevent模块本身无法检测常见的一些io操作 在使用的时候需要你额外的导入一句话 from gevent import monkey monkey.patch_all() 又由于上面的两句话在使用gevent模块的时候是肯定要导入的 所以还支持简写 from gevent import monkey;monkey.patch_all() """ def heng(): print('') time.sleep(2) print('') def ha(): print('') time.sleep(3) print('') def heiheihei(): print('heiheihei') time.sleep(5) print('heiheihei') start_time = time.time() g1 = spawn(heng) g2 = spawn(ha) g3 = spawn(heiheihei) g1.join() g2.join() # 等待被检测的任务执行完毕 再往后继续执行 g3.join() print(time.time() - start_time) # 5.0066752433776855

    #执行结果:


    heiheihei


    heiheihei
    5.0066752433776855
     

    使用joinall()

    from gevent import monkey;monkey.patch_all()
    import gevent
    import time
    from gevent import spawn
    
    """
    gevent模块本身无法检测常见的一些io操作
    在使用的时候需要你额外的导入一句话
    from gevent import monkey
    monkey.patch_all()
    又由于上面的两句话在使用gevent模块的时候是肯定要导入的
    所以还支持简写
    from gevent import monkey;monkey.patch_all()
    """
    
    
    def heng():
        print('')
        time.sleep(2)
        print('')
    
    
    def ha():
        print('')
        time.sleep(3)
        print('')
    
    def heiheihei():
        print('heiheihei')
        time.sleep(5)
        print('heiheihei')
    
    
    start_time = time.time()
    
    gevent.joinall([
    spawn(heng),
    spawn(ha),
    spawn(heiheihei)
    ]
    )
    
    print(time.time() - start_time)  #5.0037407875061035

    协程实现TCP服务端的并发

    # 服务端
    from gevent import monkey;monkey.patch_all()
    import socket
    from gevent import spawn
    
    
    def communication(conn):
        while True:
            try:
                data = conn.recv(1024)
                if len(data) == 0: break
                conn.send(data.upper())
            except ConnectionResetError as e:
                print(e)
                break
        conn.close()
    
    
    def server(ip, port):
        server = socket.socket()
        server.bind((ip, port))
        server.listen(5)
        while True:
            conn, addr = server.accept()
            spawn(communication, conn)
    
    
    if __name__ == '__main__':
        g1 = spawn(server, '127.0.0.1', 8080)
        g1.join()
    # 客户端
    from threading import Thread, current_thread
    import socket
    
    
    def x_client():
        client = socket.socket()
        client.connect(('127.0.0.1',8080))
        n = 0
        while True:
            msg = '%s say hello %s'%(current_thread().name,n)
            n += 1
            client.send(msg.encode('utf-8'))
            data = client.recv(1024)
            print(data.decode('utf-8'))
    
    
    if __name__ == '__main__':
        for i in range(500):
            t = Thread(target=x_client)
            t.start()

    总结:

    """
    理想状态:
        我们可以通过
        多进程下面开设多线程
        多线程下面再开设协程序
        从而使我们的程序执行效率提升
    """
  • 相关阅读:
    Leetcode 乘积最大子数组 (两种思路)
    C++string函数库-->to_string
    Zigzags CodeForces
    石子游戏(Leetcode每日一题)
    树形dp入门题(Leetcode 337. 打家劫舍 III)
    E
    背包九讲
    通过树状dp来求树的直径
    329. 矩阵中的最长递增路径(Leetcode每日一题)
    关于图的匹配,边覆盖,独立集,顶点覆盖
  • 原文地址:https://www.cnblogs.com/baicai37/p/12769378.html
Copyright © 2011-2022 走看看