zoukankan      html  css  js  c++  java
  • 4月28日 python学习总结 线程与协程

    一、 异步与回调机制  

     问题:

     1、任务的返回值不能得到及时的处理,必须等到所有任务都运行完毕才能统一进行处理

     2、解析的过程是串行执行的,如果解析一次需要花费2s,解析9次则需要花费18s

    解决一: (线程实现异步,回调解析结果)    

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import current_thread
    import requests
    import os
    import time
    import random
    
    def get(url):
        print('%s GET %s' %(current_thread().name,url))
        response=requests.get(url)
        time.sleep(random.randint(1,3))
    
        if response.status_code == 200:
            # 干解析的活
            return response.text
    
    def pasrse(obj):
        res=obj.result()
        print('%s 解析结果为:%s' %(current_thread().name,len(res)))
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.python.org',
        ]
        pool=ThreadPoolExecutor(4)
        for url in urls:
            obj=pool.submit(get,url)             #放入进程池,实现异步操作
            obj.add_done_callback(pasrse)    #回调,将线程执行结果当作参数传递给pasrse函数,线程是谁先空闲谁执行结果处理,不存在主次之分
    
        print('主线程',current_thread().name)

       

         解决二: (进程实现异步,回调解析结果)

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import requests
    import os
    import time
    import random
    
    def get(url):
        print('%s GET %s' %(os.getpid(),url))
        response=requests.get(url)
        time.sleep(random.randint(1,3))
    
        if response.status_code == 200:
            # 干解析的活
            return response.text
    
    def pasrse(obj):
        res=obj.result()
        print('%s 解析结果为:%s' %(os.getpid(),len(res)))
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.python.org',
        ]
    
        pool=ProcessPoolExecutor(4)
        for url in urls:
            obj=pool.submit(get,url)           #放入进程池,实现异步操作
            obj.add_done_callback(pasrse)      #回调,将进程执行结果当作参数传递给pasrse函数,由主进程执行
    
        print('主进程',os.getpid())

    二、线程queue

    import queue
    
    q=queue.Queue(3) #队列:先进先出
    q.put(1)
    q.put(2)
    q.put(3)
    # q.put(4)
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    q=queue.LifoQueue(3) #堆栈:后进先出
    
    q.put('a')
    q.put('b')
    q.put('c')
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    q=queue.PriorityQueue(3) #优先级队列:可以以小元组的形式往队列里存值,第一个元素代表优先级,数字越小优先级越高
    q.put((10,'user1'))
    q.put((-3,'user2'))
    q.put((-2,'user3'))
    
    print(q.get())
    print(q.get())
    print(q.get())

    三、线程Event   

    from threading import Event,current_thread,Thread
    import time
    
    event=Event()      # 监听信号 初始值为False
    
    def check():
        print('%s 正在检测服务是否正常....' %current_thread().name)
        time.sleep(5)
        event.set()         #set 方法将信号值 置为True
    
    
    def connect():
        count=1
        while not event.is_set():        #判断标记为是否为True
            if count ==  4:
                print('尝试的次数过多,请稍后重试')
                return
            print('%s 尝试第%s次连接...' %(current_thread().name,count))
            event.wait(1)             #括号里的是等待时间,程序想继续运行,除非标志位为True或者超时,此处超时不会报错,是继续执行
            count+=1
        print('%s 开始连接...' % current_thread().name)
    
    if __name__ == '__main__':
        t1=Thread(target=connect)
        t2=Thread(target=connect)
        t3=Thread(target=connect)
    
        c1=Thread(target=check)
    
        t1.start()
        t2.start()
        t3.start()
        c1.start()

    四、协程    

    1、单线程下实现并发:协程

    并发指的多个任务看起来是同时运行的

    并发实现的本质:切换+保存状态

     

    2、并发、并行、串行:

    并发:看起来是同时运行,切换+保存状态
    并行:真正意义上的同时运行,只有在多cpu的情况下才能
    实现并行,4个cpu能够并行4个任务

    串行:一个人完完整整地执行完毕才运行下一个任务

    import time
    def consumer():
         '''任务1:接收数据,处理数据'''
         while True:
             x=yield
     
     
    def producer():
         '''任务2:生产数据'''
         g=consumer()
         next(g)
         for i in range(10000000):
            g.send(i)
    
     start=time.time()
     #基于yield保存状态,实现两个任务直接来回切换,即并发的效果
     #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
     producer() #1.0202116966247559
    
     stop=time.time()
     print(stop-start)

        并不是所有协程都能提升效率,如果是IO密集型的,协程会提高执行效率,然而计算密集型的切换并不能提高效率,反而会降低效率                                                                        

    五、单线程下实现遇到IO切换

        1、greentlet可以切换,但不能遇到IO切  

    from greenlet import greenlet
    import time
    
    def eat(name):
        print('%s eat 1' %name)
        time.sleep(30)
        g2.switch('alex')        #遇到switch切换
        print('%s eat 2' %name)
        g2.switch()
    def play(name):
        print('%s play 1' %name)
        g1.switch()
        print('%s play 2' %name)
    
    g1=greenlet(eat)
    g2=greenlet(play)
    
    g1.switch('egon')

    ·    

        2、gevent切换,只能识别自己的IO操作,无法数别系统定义的IO,如time.sleep()

    import gevent
    
    def eat(name):
        print('%s eat 1' %name)
        gevent.sleep(5)        #gevent自定义的IO  可切换
        print('%s eat 2' %name)
    def play(name):  
        print('%s play 1' %name)
        gevent.sleep(3)
        print('%s play 2' %name)
    
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,'alex')
    
    # g1.join()
    # g2.join()
    gevent.joinall([g1,g2])
    
    
    
    #无法识别,不能切换
    from gevent import monkey;monkey.patch_all()
    import gevent
    import time
    
    def eat(name):
        print('%s eat 1' %name)
        time.sleep(5)         #无法识别,不能切换
        print('%s eat 2' %name)
    def play(name):
        print('%s play 1' %name)
        time.sleep(3)
        print('%s play 2' %name)
    
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,'alex')
    
    # g1.join()
    # g2.join()
    gevent.joinall([g1,g2])

    3、若想要实现系统定义的IO切换需加上       

    import monkey;monkey.patch_all()

     

    eg:

    from gevent import monkey;monkey.patch_all()
    from threading import current_thread
    import gevent
    import time
    
    def eat():
        print('%s eat 1' %current_thread().name)
        time.sleep(5)
        print('%s eat 2' %current_thread().name)
    def play():
        print('%s play 1' %current_thread().name)
        time.sleep(3)
        print('%s play 2' %current_thread().name)
    
    g1=gevent.spawn(eat)
    g2=gevent.spawn(play)
    
    # gevent.sleep(100)
    # g1.join()
    # g2.join()
    print(current_thread().name)
    gevent.joinall([g1,g2])
  • 相关阅读:
    网管必备网站地址
    数组是否包含某个元素
    Thinking in java(八)
    Thinking in java(八)
    Java8系列之重新认识HashMap
    Java8系列之重新认识HashMap
    MarkdownPad2.5/2 注册码
    MarkdownPad2.5/2 注册码
    java8函数式编程(2)
    java8函数式编程(2)
  • 原文地址:https://www.cnblogs.com/95lyj/p/8997575.html
Copyright © 2011-2022 走看看