zoukankan      html  css  js  c++  java
  • 异步、+回调机制、线程queue、线程Event、协程、单线程实现遇到IO切换

    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # import requests
    # import os
    # import time
    # import random
    #
    # def get(url):
    #     print('%s GET %s' %(os.getpid(),url))
    #     response=requests.get(url)
    #     time.sleep(random.randint(1,3))
    #
    #     if response.status_code == 200:
    #         return response.text
    #
    # def pasrse(res):
    #     print('%s 解析结果为:%s' %(os.getpid(),len(res)))
    #
    # if __name__ == '__main__':
    #     urls=[
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.python.org',
    #
    #     ]
    #
    #     pool=ProcessPoolExecutor(4)
    #     objs=[]
    #     for url in urls:
    #         obj=pool.submit(get,url)
    #         objs.append(obj)
    #
    #     pool.shutdown(wait=True)
    #     # 问题:
    #     # 1、任务的返回值不能得到及时的处理,必须等到所有任务都运行完毕才能统一进行处理
    #     # 2、解析的过程是串行执行的,如果解析一次需要花费2s,解析9次则需要花费18s
    #     for obj in objs:
    #         res=obj.result()
    #         pasrse(res)
    
    
    
    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # import requests
    # import os
    # import time
    # import random
    #
    # def get(url):
    #     print('%s GET %s' %(os.getpid(),url))
    #     response=requests.get(url)
    #     time.sleep(random.randint(1,3))
    #
    #     if response.status_code == 200:
    #         pasrse(response.text)
    #
    # def pasrse(res):
    #     print('%s 解析结果为:%s' %(os.getpid(),len(res)))
    #
    # if __name__ == '__main__':
    #     urls=[
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.python.org',
    #
    #     ]
    #
    #     pool=ProcessPoolExecutor(4)
    #     for url in urls:
    #         pool.submit(get,url)
    #
    
    
    
    
    
    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # import requests
    # import os
    # import time
    # import random
    #
    # def get(url):
    #     print('%s GET %s' %(os.getpid(),url))
    #     response=requests.get(url)
    #     time.sleep(random.randint(1,3))
    #
    #     if response.status_code == 200:
    #         # 干解析的活
    #         return response.text
    #
    # def pasrse(obj):
    #     res=obj.result()
    #     print('%s 解析结果为:%s' %(os.getpid(),len(res)))
    #
    # if __name__ == '__main__':
    #     urls=[
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.baidu.com',
    #         'https://www.python.org',
    #     ]
    #
    #     pool=ProcessPoolExecutor(4)
    #     for url in urls:
    #         obj=pool.submit(get,url)
    #         obj.add_done_callback(pasrse)
    #
    #     # 问题:
    #     # 1、任务的返回值不能得到及时的处理,必须等到所有任务都运行完毕才能统一进行处理
    #     # 2、解析的过程是串行执行的,如果解析一次需要花费2s,解析9次则需要花费18s
    #     print('主进程',os.getpid())
    
    
    
    #解决问题:
    
    
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import current_thread
    import requests
    import os
    import time
    import random
    
    def get(url):
        print('%s GET %s' %(current_thread().name,url))
        response=requests.get(url)
        time.sleep(random.randint(1,3))
    
        if response.status_code == 200:
            # 干解析的活
            return response.text
    
    def pasrse(obj):
        res=obj.result()
        print('%s 解析结果为:%s' %(current_thread().name,len(res)))
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.python.org',
        ]
    
        pool=ThreadPoolExecutor(4)
        for url in urls:
            obj=pool.submit(get,url)
            obj.add_done_callback(pasrse)
    
           print('主线程',current_thread().name)
    异步+回调机制

    线程queue:

    1、队列:先进先出

    # q=queue.Queue(3) #队列:先进先出
    # q.put(1)
    # q.put(2)
    # q.put(3)
    # # q.put(4)
    #
    # print(q.get())
    # print(q.get())
    # print(q.get())
    队列

    2、堆栈:后进先出

    # q=queue.LifoQueue(3) #堆栈:后进先出
    #
    # q.put('a')
    # q.put('b')
    # q.put('c')
    #
    # print(q.get())
    # print(q.get())
    # print(q.get())
    堆栈

    3、优先级队列:可以以小元组的形式往队列理存值,第一个元素代表优先级,数字越小优先级别越高

    q=queue.PriorityQueue(3)
    q.put((10,'user1'))
    q.put((-3,'user2'))
    q.put((-2,'user3'))
    
    
    print(q.get())
    print(q.get())
    print(q.get())
    优先级队列

    Event: 进程之间协同工作

    # from threading import Event,current_thread,Thread
    # import time
    #
    # event=Event()
    #
    # def check():
    #     print('%s 正在检测服务是否正常....' %current_thread().name)
    #     time.sleep(3)
    #     event.set()
    #
    #
    # def connect():
    #     print('%s 等待连接...' %current_thread().name)
    #     event.wait()
    #     print('%s 开始连接...' % current_thread().name)
    #
    # if __name__ == '__main__':
    #     t1=Thread(target=connect)
    #     t2=Thread(target=connect)
    #     t3=Thread(target=connect)
    #
    #     c1=Thread(target=check)
    #
    #     t1.start()
    #     t2.start()
    #     t3.start()
    #     c1.start()
    
    
    
    
    from threading import Event,current_thread,Thread
    import time
    
    event=Event()
    
    def check():
        print('%s 正在检测服务是否正常....' %current_thread().name)
        time.sleep(5)
        event.set()
    
    
    def connect():
        count=1
        while not event.is_set():
            if count ==  4:
                print('尝试的次数过多,请稍后重试')
                return
            print('%s 尝试第%s次连接...' %(current_thread().name,count))
            event.wait(1)
            count+=1
        print('%s 开始连接...' % current_thread().name)
    
    if __name__ == '__main__':
        t1=Thread(target=connect)
        t2=Thread(target=connect)
        t3=Thread(target=connect)
    
        c1=Thread(target=check)
    
        t1.start()
        t2.start()
        t3.start()
        c1.start()
    Event

    协程:

    1、单线程下实现并发:协程

        并发指的多个任务看起来是同时运行的

        并发实现的本质:切换+保存状态

        并发、并行、串行:

        并发:看起来是同时运行,切换+保存状态

        并行:真正意义上的同时运行,只有在多cpu的情况下才能

          实现并行,4个cpu能够并行4个任务

        串行:一个人完完整整地执行完毕才运行下一个任务

    # import time
    # def consumer():
    #     '''任务1:接收数据,处理数据'''
    #     while True:
    #         x=yield
    #
    #
    # def producer():
    #     '''任务2:生产数据'''
    #     g=consumer()
    #     next(g)
    #     for i in range(10000000):
    #         g.send(i)
    #
    # start=time.time()
    # #基于yield保存状态,实现两个任务直接来回切换,即并发的效果
    # #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
    # producer() #1.0202116966247559
    #
    #
    # stop=time.time()
    # print(stop-start)
    
    
    
    
    #
    # import time
    # def consumer(res):
    #     '''任务1:接收数据,处理数据'''
    #     pass
    #
    # def producer():
    #     '''任务2:生产数据'''
    #     res=[]
    #     for i in range(10000000):
    #         res.append(i)
    #
    #     consumer(res)
    #     # return res
    #
    # start=time.time()
    # #串行执行
    # res=producer()
    # stop=time.time()
    # print(stop-start)
    协程

    单线程下实现IO切换:

    # from greenlet import greenlet
    # import time
    #
    # def eat(name):
    #     print('%s eat 1' %name)
    #     time.sleep(30)
    #     g2.switch('alex')
    #     print('%s eat 2' %name)
    #     g2.switch()
    # def play(name):
    #     print('%s play 1' %name)
    #     g1.switch()
    #     print('%s play 2' %name)
    #
    # g1=greenlet(eat)
    # g2=greenlet(play)
    #
    # g1.switch('egon')
    
    
    # import gevent
    #
    # def eat(name):
    #     print('%s eat 1' %name)
    #     gevent.sleep(5)
    #     print('%s eat 2' %name)
    # def play(name):
    #     print('%s play 1' %name)
    #     gevent.sleep(3)
    #     print('%s play 2' %name)
    #
    # g1=gevent.spawn(eat,'egon')
    # g2=gevent.spawn(play,'alex')
    #
    # # gevent.sleep(100)
    # # g1.join()
    # # g2.join()
    # gevent.joinall([g1,g2])
    
    
    
    # from gevent import monkey;monkey.patch_all()
    # import gevent
    # import time
    #
    # def eat(name):
    #     print('%s eat 1' %name)
    #     time.sleep(5)
    #     print('%s eat 2' %name)
    # def play(name):
    #     print('%s play 1' %name)
    #     time.sleep(3)
    #     print('%s play 2' %name)
    #
    # g1=gevent.spawn(eat,'egon')
    # g2=gevent.spawn(play,'alex')
    #
    # # gevent.sleep(100)
    # # g1.join()
    # # g2.join()
    # gevent.joinall([g1,g2])
    
    
    
    
    
    
    from gevent import monkey;monkey.patch_all()
    from threading import current_thread
    import gevent
    import time
    
    def eat():
        print('%s eat 1' %current_thread().name)
        time.sleep(5)
        print('%s eat 2' %current_thread().name)
    def play():
        print('%s play 1' %current_thread().name)
        time.sleep(3)
        print('%s play 2' %current_thread().name)
    
    g1=gevent.spawn(eat)
    g2=gevent.spawn(play)
    
    # gevent.sleep(100)
    # g1.join()
    # g2.join()
    print(current_thread().name)
    gevent.joinall([g1,g2])
    代码
  • 相关阅读:
    线程数量与并行应用性能相关性的测试
    redis命令学习
    shell获取日期(昨天,明天,上月,下月)
    shell获取文件行数
    redis的备份和恢复
    redis使用Java学习
    kafka的一些常用命令
    查看kafka的group.id
    vim搜索后跳到下(上)一个
    redis批量执行
  • 原文地址:https://www.cnblogs.com/TF511/p/9954318.html
Copyright © 2011-2022 走看看