zoukankan      html  css  js  c++  java
  • 线程回调,线程中的队列,事件,greenlet模块,gevent模块,自定义补丁, 单线程实现并发,协程

    1.线程回调

    在线程池/进程池
    每次提交任务,都会返回一个表示任务的对象,Future对象
    Future对象具备一个绑定方法,add_done_callback 用于指定回调函数
      add 意味着可以添加多个回调函数
    如果直接使用Thread的话,如何完成回调

    from threading import Thread
    import time
    
    
    def call_back(res):
        print('任务结果拿到了:%s' % res)
    
    
    def parser(res):
        print('任务结果拿到了:%s' % res)
    
    
    def task(parser):
        print('run')
        time.sleep(1)
        res = 100  # 表示任务结果
        parser(res)  # 执行回调函数,并传入任务结果
    
    
    t = Thread(target=task, args=(call_back,)) # 在这里指定parser也可以
    t.start()
    
    print('over')
    View Code

    2.线程中的队列

    from queue import Queue,LifoQueue,PriorityQueue
    
    # 与进程中的Joinablequeue使用方法一模一样,但是不具备IPC
    
    # Queue 队列
    # q = Queue()
    #
    # # 可以往里面添加数据
    # q.put('123')
    # q.put('456')
    #
    # # 可以获取数据
    # print(q.get())
    # print(q.get())
    #
    # # 获取值,设定取值时间,超时报错
    # # print(q.get(block=True,timeout=3))
    #
    # # 告诉队列取值已经处理完毕
    # q.task_done()
    # q.task_done()
    # # 等待队列为空后结束队列
    # q.join()
    # 输出结果
    # 123
    # 456
    
    
    
    # LifoQueue,翻译为last in first out 后进先出,先进后出,模拟堆栈的模式
    
    # 除了与Queue的队列不一样之外,其他的都一样
    # lq = LifoQueue()
    #
    # lq.put('123')
    # lq.put('456')
    #
    # print(lq.get())
    # print(lq.get())
    # 输出结果
    # 456
    # 123
    
    
    # 具备优先级的队列
    # PriorityQueue
    # 可以存储一个可以比较大小的对象,对象越小,优先级越高,自定义对象不能使用比较运算符,所以不能存储
    
    
    # q = PriorityQueue()
    # 
    # q.put('b')
    # q.put('a')
    # 
    # print(q.get())  # 会优先得到a
    View Code

    3.事件 Event()

    了解Event之前,我们先了解一个案例:

    from threading import Thread, Event
    
    import time
    
    #
    # is_running = False
    
    # def boot_server():
    #     global is_running
    #     print('正在启动服务器。。。')
    #     time.sleep(3)
    #     print('服务器启动成功!')
    #     is_running = True
    #
    #
    # def connect_server():
    #     while True:
    #         if is_running:
    #             print('连接服务器成功!')
    #             break
    #         else:
    #             time.sleep(0.5)
    #             print('error,服务器未启动')
    #
    #
    # t1 = Thread(target=boot_server)
    # t1.start()  
    #
    # t2 = Thread(target=connect_server)
    # t2.start()
    View Code

    案例:比如说我们做一个客户端连接服务器的模型,服务器与客户端同时启动 而客户端需要访问服务器,

    而服务器启动需要时间,但是两个端口却是同时开启 所以客户端不能及时访问到服务器。

    Event方法

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法wait、clear、set

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么执行event.wait 方法时便不再阻塞。

    • clear:将“Flag”设置为False

    • set:将“Flag”设置为True

    用 threading.Event 实现线程间通信,使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,

    Event默认内置了一个标志,初始值为False。一旦该线程通过wait()方法进入等待状态,直到另一个线程调用该Event的set()方法将内置标志设置为True时,该Event会通知所有等待状态的线程恢复运行。

    使用Event的案例:

    from threading import Thread, Event
    
    import time
    # 而使用Event方法就可以解决这种问题
    boot_event = Event()
    
    
    # boot_event.clear()    恢复事件的状态为False
    # boot_event.is_set()   返回事件的状态
    # boot_event.wait()     等待事件发生,就是等待事件被设置为True
    # boot_event.set()      设置事件为True
    
    
    def boot_server():
        print('正在启动服务器。。。')
        time.sleep(3)
        print('服务器启动成功')
        boot_event.set()  # 标记事件已经发生了,将状态设置为True
    
    
    def connect_server():
        boot_event.wait()  # 等待事件发生,如果状态成为了True,会执行下面的代码
        print('连接服务器成功!')
    
    
    t1 = Thread(target=boot_server)
    t1.start()
    
    t2 = Thread(target=connect_server)
    t2.start()
    View Code

    引子

    上一节中我们知道GIL锁将导致CPython无法利用多核CPU的优势,只能使用单核并发的执行。很明显效率不高,那有什么办法能够提高效率呢?

    效率要高只有一个方法就是让这个当前线程尽可能多的占用CPU时间,如何做到?

    任务类型可以分为两种 IO密集型 和 计算密集型

    对于计算密集型任务而言 ,无需任何操作就能一直占用CPU直到超时为止,没有任何办法能够提高计算密集任务的效率,除非把GIL锁拿掉,让多核CPU并行执行。

    对于IO密集型任务任务,一旦线程遇到了IO操作CPU就会立马切换到其他线程,而至于切换到哪个线程,应用程序是无法控制的,这样就导致了效率降低。

    如何能提升效率呢?想一想如果可以监测到线程的IO操作时,应用程序自发的切换到其他的计算任务,是不是就可以留住CPU?的确如此

    一、单线程实现并发

    单线程实现并发这句话乍一听好像在瞎说

    首先需要明确并发的定义

    并发:指的是多个任务同时发生,看起来好像是同时都在进行

    并行:指的是多个任务真正的同时进行

    早期的计算机只有一个CPU,既然CPU可以切换线程来实现并发,那么为何不能再线程中切换任务来并发呢?

    上面的引子中提到,如果一个线程能够检测IO操作并且将其设置为非阻塞,并自动切换到其他任务就可以提高CPU的利用率,指的就是在单线程下实现并发。

    如何能够实现并发呢

    并发 = 切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,那就可以实现单线程并发

    python中的生成器就具备这样一个特点,每次调用next都会回到生成器函数中执行代码,这意味着任务之间可以切换,并且是基于上一次运行的结果,这意味着生成器会自动保存执行状态!

     

    案例:yiled实现并发效果

    # 使用生成器来实现单线并发多个任务
    
    import time
    
    
    def func1():
        a = 1
        for i in range(1000000):
            a += 1
            print('a run')
            time.sleep(3)
            yield
    
    
    def func2():
        res = func1()
        a = 1
        for i in range(1000000):
            a += 1
            print('b run')
            next(res)
    st = time.time()
    func2()
    print(time.time() - st)
    
    '''
    对于纯计算的任务而言,单线程并发反而使执行效率下降了一半左右。所以这样的方案对于纯计算任务而言是没有必要的
    我们暂且不考虑这样的并发对程序的好处是什么,在上述代码中。使用yield来切换代码结构非常混乱,如果任务太多,
    而且每个都需要切换的话,那么会大大的增加时间。降低了效率,因此就有人专门对yield进行了封装,于是便有了greenlet模块
    '''
    View Code

    2.greenlet模块

    '''
    一个'greenlet’是一个小型的独立伪线程,可以把它想象成一些栈帧,栈是初始调用的函数,而栈顶是当前'greenlet'的暂停位置
    你使用'greenlet' 创建一堆这样的堆栈,然后在他们之间跳转执行。跳转必须显式声明的:一个greenlet必须选择要跳转到的另一个greenlet,
    这会让前一个挂起,而后一个在此前挂起处恢复执行。不同greenlets之间的跳转称为切换(switching) 。
    
      当你创建一个greenlet时,它得到一个开始时为空的栈;当你第一次切换到它时,它会执行指定的函数,这个函数可能会调用其他函数、切换跳出greenlet等等。
    最终栈底的函数执行结束出栈时,这个greenlet的栈又变成空的,这个greenlet也就死掉了。greenlet也会因为一个未捕捉的异常死掉。
    '''
    import greenlet
    import time
    
    
    def task1():
        print('task1,run')
        g2.switch()  # 切换任务至g2
        print('task1 over')
        g2.switch()  # 切换任务至g2
    
    
    def task2():
        print('task2,run')
        g1.switch()  # 切换任务至g1
        time.sleep(2)
        print('task2,over')
    
    
    g1 = greenlet.greenlet(task1)
    g2 = greenlet.greenlet(task2)
    
    
    g1.switch()
    
    print('主 over')
    
    '''
    该模块简化了yield复杂的代码结构,实现了单线程下多任务并发,但是无论直接使用yield还是greenlet都不能检测到IO操作
    如果遇到IO时同样进入阻塞状态,所以此时的并发是没有任何意义的
    因此就延申出了gevent模块,既能检测IO,又能实现单线程并发,注意的是gevent模块自身无法检测IO
    '''
    View Code

     

      协程·

    协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

    需要强调的是:

    1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
    2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)

    对比操作系统控制线程的切换,用户在单线程内控制协程的切换

    优点如下:

    1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
    2. 单线程内就可以实现并发的效果,最大限度地利用cpu

    缺点如下:

    1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程来尽可能提高效率
    2. 协程本质是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

     

    1.gevent模块 协程的使用

    import time
    
    '''
    gevent 不具备检测IO的能力,需要为它打补丁,打上补丁之后就能检测IO
    注意补丁一定需要打在最上面,必须保证导入模块前就打好补丁
    '''
    
    from gevent import monkey
    
    monkey.patch_all()
    
    from threading import current_thread
    import gevent
    
    
    def task1():
        # print(current_thread(), 1)
        print('task1,run')
    
        time.sleep(3)
        print('task1 over')
    
    
    def taks2():
        # print(current_thread(), 2)
        print('task2 run')
        print('task2 over')
    
    
    
    # spawn 用于创建一个协程任务
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(taks2)
    
    
    # 任务要执行,必须保证主线程没挂,因为所有协程任务都是主线在执行
    # 必须调用join来等待协程任务,理论上等待执行时间最长的任务,
    # 但是我们在执行过程中并不知道那个任务执行的时间最长,所有全部join
    # 这里有一个方法可以全部join,
    gevent.joinall([g1,g2])
    print('over')
    View Code

    monkey () 补丁

    在Python语言中,monkey patch 指的是对于一个类或者模块所进行的动态修改

    2.自定义补丁练习

    import json
    # 自定义json补丁
    
    def dumps(obj):
        print('这是打完补丁后的dumps函数 哈哈哈哈!')
    
    
    def loads(json_str):
        print('这是打完补丁后的loads函数 嘻嘻嘻嘻!')
    
    
    def patch_json():
        json.dumps = dumps
        json.loads = loads
    
    # 打补丁
    patch_json()
    
    # 再次调用会执行覆盖的dumps与loads方法
    json.dumps('123123')  # 输出结果  '这是打完补丁后的dumps函数 哈哈哈哈!'
    json.loads('123321')  # 输出结果  '这是打完补丁后的loads函数 嘻嘻嘻嘻!’
    View Code
  • 相关阅读:
    Android中实现定时器的三种方法 分类: Android 2015-07-14 18:04 11人阅读 评论(0) 收藏
    java构造器内部多态方法
    java继承方法覆盖
    java对象实例化 静态块,对象块,构造函数执行顺序
    Linux 的系统运行级别
    Jmeter启动jmeter-server.bat 报java.io.FileNotFoundException:rmi_keystore.jks 解决方法
    jmeter中JSON Extractors使用
    CentOS6.5下安装jenkins
    day12接口自动化测试框架
    day10 python接口开发、mock接口、网络编程
  • 原文地址:https://www.cnblogs.com/liguodeboke/p/10986975.html
Copyright © 2011-2022 走看看