zoukankan      html  css  js  c++  java
  • 网络编程9_线程-条件,定时器,队列,线程池, 协程

     线程
    一. 条件
        使得线程等待,只有满足某条件时,才释放n个线程
        import time
        from threading import Thread,RLock,Condition,current_thread
        
        def func1(c):
            c.acquire(False) #固定格式
            # print(1111)
        
            c.wait()  #等待通知,
            time.sleep(3)  #通知完成后大家是串行执行的,这也看出了锁的机制了
            print('%s执行了'%(current_thread().getName()))
        
            c.release()
        
        if __name__ == '__main__':
            c = Condition()
            for i in range(5):
                t = Thread(target=func1,args=(c,))
                t.start()
        
            while True:
                num = int(input('请输入你要通知的线程个数:'))
                c.acquire() #固定格式
                c.notify(num)  #通知num个线程别等待了,去执行吧
                c.release()
        
        #结果分析: 
        # 请输入你要通知的线程个数:3
        # 请输入你要通知的线程个数:Thread-1执行了 #有时候你会发现的你结果打印在了你要输入内容的地方,这是打印的问题,没关系,不影响
        # Thread-3执行了
        # Thread-2执行了
    二. 定时器
        定时器,指定n秒后执行某个操作,这个做定时任务的时候可能会用到
        import time
        from threading import Timer,current_thread #这里就不需要再引入Timer
        import threading
        def hello():
            print(current_thread().getName())
            print("hello, world")
            # time.sleep(3) #如果你的子线程的程序执行时间比较长,那么这个定时任务也会乱,当然了,主要还是看业务需求
        t = Timer(10, hello)  #创建一个子线程去执行后面的函数
        t.start()  # after 1 seconds, "hello, world" will be printed
        # for i in range(5):
        #     t = Timer(2, hello)
        #     t.start()  
        #     time.sleep(3) #这个是创建一个t用的时间是2秒,创建出来第二个的时候,第一个已经过了两秒了,所以你的5个t的执行结果基本上就是2秒中,这个延迟操作。
        
        print(threading.active_count())
        print('主进程',current_thread().getName())
    三. 线程队列
        queue队列 :使用import queue,用法与进程Queue一样
        queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
        1. class queue.Queue(maxsize=0)    先进先出
        import queue
        q = queue.Queue(3)
        q.put(1)
        q.put(2)
        print("当前队列长度: ", q.qsize())
        q.put(3)
        print("查看队列状态: ", q.full())
        try:
            q.put_nowait(4)
        except Exception:
            print("当前队列已满")
        print(q.get())
        print(q.get())
        print(q.get())
        print("查看队列状态: ", q.empty())
        try:
            q.get_nowait()
        except Exception:
            print("队列已空")
        2. class queue.LifoQueue(maxsize=0)    后进先出
        q = queue.LifoQueue(3)
        q.put(1)
        q.put(2)
        q.put(3)
        print("查看队列状态: " ,q.full())
        print(q.get())
        print(q.get())
        print("查看当前队列长度: ", q.qsize())
        print(q.get())
        3. class queue.PriorityQueue(maxsize=0)    优先级队列
        def f1():
            pass
        class Animal:
            pass
        a = Animal()
        q = queue.PriorityQueue(3)
        q.put((1, Animal))
        q.put((3, f1))
        q.put((5, a))
        print(q.get())
        print(q.get())
        print(q.get())
        这三种队列都是线程安全的,不会出现多个线程抢占同一个资源或数据的情况。
    四. 线程池 concurrent.futures 模块
        早期的时候我们没有线程池,现在python提供了一个新的标准或者说内置的模块,这个模块里面提供了新的线程池和进程池,之前我们说的进程池是在multiprocessing里面的,现在这个在这个新的模块里面,他俩用法上是一样的。
        为什么要将进程池和线程池放到一起呢,是为了统一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一样,而且只要通过这个concurrent.futures导入就可以直接用他们两个了:
        concurrent.futures模块提供了高度封装的异步调用接口
        ThreadPoolExecutor:线程池,提供异步调用
        ProcessPoolExecutor: 进程池,提供异步调用
        Both implement the same interface, which is defined by the abstract Executor class.
        1. 基本方法
        #submit(fn, *args, **kwargs)
        异步提交任务
        #map(func, *iterables, timeout=None, chunksize=1) 
        取代for循环submit的操作
        #shutdown(wait=True) 
        相当于进程池的pool.close()+pool.join()操作
        wait=True,等待池内所有任务执行完毕回收完资源后才继续
        wait=False,立即返回,并不会等待池内的任务执行完毕
        但不管wait参数为何值,整个程序都会等到所有任务执行完毕
        submit和map必须在shutdown之前
        #result(timeout=None)
        取得结果
        #add_done_callback(fn)
        回调函数
        2. 线程池的简单使用
        import time, os, threading
        from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
        def func(n):
            time.sleep(1)
            print("%s打印的: %s"%(threading.get_ident(), n))
            return n*n
        # 默认一般起线程个数不超过CPU个数*5
        tpool = ThreadPoolExecutor(max_workers=5)
        # 异步执行
        t_list = []
        for i in range(5):
            # 提交执行函数,返回一个结果对象,i作为任务函数的参数 def submit(self, fn, *args, **kwargs):可以传任意形式的参数
            t = tpool.submit(func, i)
            t_list.append(t)
            # print(t.result())
            # 这个返回的结果对象t,不能直接去拿结果,不然又变成串行了,可以理解为拿到一个号码,等所有线程的结果都出来之后,我们再去通过结果对象t获取结果
        tpool.shutdown()
        # 起到原来的close阻止新任务进来 + join的作用,等待所有的线程执行完毕
        print("主线程")
        for ti in t_list:
            print(">>>", ti.result())
        # 我们还可以不用shutdown(),用下面这种方式
        # while 1:
        #     for n,ti in enumerate(t_list):
        #         print('>>>>', ti.result(),n)
        #     time.sleep(1)
            #每个两秒去去一次结果,哪个有结果了,就可以取出哪一个,想表达的意思就是说不用等到所有的结果都出来再去取,可以轮询着去取结果,因为你的任务需要执行的时间很长,那么你需要等很久才能拿到结果,通过这样的方式可以将快速出来的结果先拿出来。如果有的结果对象里面还没有执行结果,那么你什么也取不到,这一点要注意,不是空的,是什么也取不到,那怎么判断我已经取出了哪一个的结果,可以通过枚举enumerate来搞,记录你是哪一个位置的结果对象的结果已经被取过了,取过的就不再取了
        #结果分析: 打印的结果是没有顺序的,因为到了func函数中的sleep的时候线程会切换,谁先打印就没准儿了,但是最后的我们通过结果对象取结果的时候拿到的是有序的,因为我们主线程进行for循环的时候,我们是按顺序将结果对象添加到列表中的。
        # 6696打印的: 0
        # 5044打印的: 3
        # 4424打印的: 2
        # 8840打印的: 1
        # 1244打印的: 4
        # 主线程
        # >>> 0
        # >>> 1
        # >>> 4
        # >>> 9
        # >>> 16
        3. 线程池的简单使用
        只需要将这一行代码改为下面这一行就可以了,其他的代码都不用变
        # tpool = ThreadPoolExecutor(max_workers=5) 
        tpool = ProcessPoolExecutor(max_workers=4)
        #默认一般起进程的数据不超过CPU个数
        4. map的使用
        import time, os, random, threading
        from concurrent.futures import ThreadPoolExecutor
        def work(n):
            print("%s is running"%threading.get_ident())
            time.sleep(random.randint(1,3))
            return n**2
        if __name__ == '__main__':
            t = ThreadPoolExecutor(max_workers=4)
            # for i in range(10):
            #     t.submit(work, i)
            # 用map取代submit
            s = t.map(work, range(5))
            print([i for i in s])
        # 5792 is running
        # 5824 is running
        # 9208 is running
        # 1296 is running
        # 5792 is running
        # [0, 1, 4, 9, 16]
        5. 回调函数
    五. 协成
        1. 背景
        对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。
        协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:
        1). 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。
        2). 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换
        2. 协成介绍
        协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。、
        需要强调的是:
        1). python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
        2). 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
        对比操作系统控制线程的切换,用户在单线程内控制协程的切换
        优点如下:
        1). 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
        2). 单线程内就可以实现并发的效果,最大限度地利用cpu
        缺点如下:
        1). 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
        2). 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
        总结协程特点:
        1). 必须在只有一个单线程里实现并发
        2). 修改共享数据不需加锁
        3). 用户程序里自己保存多个控制流的上下文栈
        4). 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
        3. greenlet
        如果我们在单个线程内有20个任务,要想实现在多个任务之间切换,使用yield生成器的方式过于麻烦(需要先得到初始化一次的生成器,然后再调用send。。。非常麻烦),而使用greenlet模块可以非常简单地实现这20个任务直接的切换
        #真正的协程模块就是使用greenlet完成的切换
        from greenlet import greenlet
        
        def eat(name):
            print('%s eat 1' %name)  #2
            g2.switch('taibai')   #3
            print('%s eat 2' %name) #6
            g2.switch() #7
        def play(name):
            print('%s play 1' %name) #4
            g1.switch()      #5
            print('%s play 2' %name) #8
        
        g1=greenlet(eat)
        g2=greenlet(play)
        
        g1.switch('taibai')#可以在第一次switch时传入参数,以后都不需要  1
        #单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度
        #顺序执行
        import time
        def f1():
            res=1
            for i in range(100000000):
                res+=i
        
        def f2():
            res=1
            for i in range(100000000):
                res*=i
        
        start=time.time()
        f1()
        f2()
        stop=time.time()
        print('run time is %s' %(stop-start)) #10.985628366470337
        
        #切换
        from greenlet import greenlet
        import time
        def f1():
            res=1
            for i in range(100000000):
                res+=i
                g2.switch()
        
        def f2():
            res=1
            for i in range(100000000):
                res*=i
                g1.switch()
        
        start=time.time()
        g1=greenlet(f1)
        g2=greenlet(f2)
        g1.switch()
        stop=time.time()
        print('run time is %s' %(stop-start)) # 52.763017892837524
        greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时如果遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。
        虽然没有规避固有的I/O时间,但是我们使用这个时间来做别的事情了,一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,如果是4核的cpu,一般起5个进程,每个进程中20个线程(5倍cpu数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是一般一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个
        单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。
        4. gevent模块
        Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
        1). 用法
        g1=gevent.spawn(func,1,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的,spawn是异步提交任务
        g2=gevent.spawn(func2)
        g1.join() #等待g1结束,上面只是创建协程对象,这个join才是去执行
        g2.join() #等待g2结束  有人测试的时候会发现,不写第二个join也能执行g2,是的,协程帮你切换执行了,但是你会发现,如果g2里面的任务执行的时间长,但是不写join的话,就不会执行完等到g2剩下的任务了
        #或者上述两步合作一步:gevent.joinall([g1,g2])
        g1.value#拿到func1的返回值
        2). 遇到I/O阻塞会自动切换任务
        import gevent

        def eat(name):
            print("%s eat 1"% name)
            gevent.sleep(2)
            print("%s eat 2"% name)
        
        def play(name):
            print("%s play 1" % name)
            gevent.sleep(1)
            print("%s play 2" % name)
        
        g1 = gevent.spawn(eat, "egon")
        g2 = gevent.spawn(play, "egon")
        
        g1.join()
        g2.join()
        # gevent.spawn([g1, g2])
        
        print("主")
        # egon eat 1
        # egon play 1
        # egon play 2
        # egon eat 2
        # 主
        上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,
        而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了
        from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前
        或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

        我们可以用threading.current_thread().getName()来查看每个g1和g2,查看的结果为DummyThread-n,即假线程,虚拟线程,其实都在一个线程里面
        进程线程的任务切换是由操作系统自行切换的,你自己不能控制
        协程是通过自己的程序(代码)来进行切换的,自己能够控制,只有遇到协程模块能够识别的IO操作的时候,程序才会进行任务切换,实现并发效果,如果所有程序都没有IO操作,那么就基本属于串行执行了。
        5. 协成子同步与异步
        from gevent import spawn, joinall,monkey; monkey.patch_all()
        import time
        
        def task(pid):
            """
               Some non-deterministic task
            """
            time.sleep(0.5)
            print('Task %s done' % pid)
        
        # 同步
        def synchronous():
            for i in range(10):
                task(i)
        
        # spawn()异步提交任务
        def asynchronous():
            g_l = [spawn(task, i) for i in range(10)]
            joinall(g_l)
        
        
        if __name__ == '__main__':
            print("Synchronous:")
            synchronous()
        
            print("Asynchronous:")
            asynchronous()
            # 上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。
        6. gevent应用之一: 爬虫
        from gevent import monkey; monkey.patch_all()
        import gevent
        import requests
        import time
        
        def get_page(url):
            print("GET: %s"%url)
            response = requests.get(url)
            # print(response.status_code)
            if response.status_code == 200:
                print("%d bytes receved from %s"%(len(response.text), url))
                # print(response.text)
        
        s = time.time()
        gevent.joinall([
            gevent.spawn(get_page, 'https://www.python.org/'),
            gevent.spawn(get_page, 'https://www.yahoo.com/'),
            gevent.spawn(get_page, 'https://github.com/'),
        ])
        
        e = time.time()
        print("run time is %s" % (e-s))
        
        # GET: https://www.python.org/
        # GET: https://www.yahoo.com/
        # GET: https://github.com/
        # 48862 bytes receved from https://www.python.org/
        # 79878 bytes receved from https://github.com/
        # 518555 bytes receved from https://www.yahoo.com/
        # run time is 10.245655298233032
        
        # 将上面的程序最后加上一段串行的代码看看效率:如果你的程序不需要太高的效率,那就不用什么并发啊协程啊之类的东西。
        
        print('--------------------------------')
        s = time.time()
        requests.get('https://www.python.org/')
        requests.get('https://www.yahoo.com/')
        requests.get('https://github.com/')
        t = time.time()
        print('串行时间>>',t-s)
        
        # --------------------------------
        # 串行时间>> 13.477648973464966
        7. gevent应用之二: 实现单线程下的socket并发
        通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)
        一个网络请求里面经过多个时间延迟time

        1). 服务端:
        from gevent import monkey; monkey.patch_all()
        from socket import *
        import gevent
        
        #如果不想用money.patch_all()打补丁,可以用gevent自带的socket
        # from gevent import socket
        # s=socket.socket()
        
        def server(server_ip, port):
            s = socket(AF_INET, SOCK_STREAM)
            s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            s.bind((server_ip, port))
            s.listen(5)
            while 1:
                conn, addr = s.accept()
                gevent.spawn(talk, conn, addr)
        
        def talk(conn, addr):
            try:
                while 1:
                    res = conn.recv(1024).decode("utf-8")
                    print("client %s:%s msg: %s"%(addr[0], addr[1], res))
                    # msg = input(">>>: ").strip()
                    msg = res.upper()
                    conn.send(msg.encode("utf-8"))
            except Exception as e:
                print(e)
            finally:
                conn.close()
        
        if __name__ == '__main__':
            server("127.0.0.1", 8080)

        2). 客户端
        from socket import *

        c = socket(AF_INET, SOCK_STREAM)
        c.connect(("127.0.0.1", 8080))
        
        while 1:
            msg = input(">>>: ").strip()
            if not msg:
                continue
        
            c.send(msg.encode("utf-8"))
            msg2 = c.recv(1024)
        
            print("客户端: ", msg2.decode("utf-8"))

        3). 多线程并发多个客户端,去请求上面的服务端是没问题的
        from threading import Thread
        from socket import *
        import threading
        
        def client(server_ip, port):
            c = socket(AF_INET, SOCK_STREAM)
            c.connect((server_ip, port))
            count = 0
            while 1:
                c.send(("%s say hello %s" % (threading.current_thread().getName(), count)).encode("utf-8"))
                msg = c.recv(1024)
                print("server: ", msg.decode("utf-8"))
                count += 1
        
        if __name__ == '__main__':
            for i in range(500):
                t = Thread(target=client, args=("127.0.0.1", 8080))
                t.start()

  • 相关阅读:
    win 7 iis 的一些问题
    Google Maps JavaScript API V3 根据地址 加载地图
    ASP.NET 在程序中动态删除、修改配置文件节点值的方法
    Lambda 表达式的深化及使用
    NHibernate配置的总体流程
    Adobe Acrobat Automation和Aspose.Pdf添加文本印章和水印的对比
    如何导入Swagger文件到Eolinker
    接口文档规范
    对外接口安全性需要考虑什么?
    如何降低API文档对接成本
  • 原文地址:https://www.cnblogs.com/guyannanfei/p/10268936.html
Copyright © 2011-2022 走看看