zoukankan      html  css  js  c++  java
  • day38——线程queue、事件event、协程

    day38

    线程queue

    多线程抢占资源

    只能让其串行——用到互斥锁

    线程queue
    • 队列——先进先出(FIFO)
    import queue
    q = queue.Queue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    # q.put(4)  # 阻塞等其他进程或者线程来拿
    print(q.get())
    print(q.get())
    print(q.get())
    # print(q.get(block=False))  # 没有值就直接报错
    # q.get(timeout=2)  # 阻塞2s,还没有值直接报错
    
    • 堆栈——先进后出(LIFO)

    import queue
    q = queue.LifoQueue(4)
    q.put(1)
    q.put(2)
    q.put("alex")
    q.put("太白")
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    结果:
    太白
    alex
    2
    1
    
    • 优先级队列——自己设置优先级
    import queue
    q = queue.PriorityQueue(4)
    q.put((5, "元宝"))
    q.put((-2, "狗狗"))
    q.put((0, "2李业"))
    q.put((0, "1刚哥"))
    print(q.get())
    print(q.get())
    print(q.get())  # 数字越小就先出去,相同数字按照asill码来排序
    

    事件event

    开启两个线程,一个线程运行到中间的某个阶段,触发另个线程执行,两个线程增加了耦合性

    版本一
    from threading import Thread
    from threading import current_thread
    import time
    
    flag = False
    
    
    def check():
        print(f"{current_thread().name} 监测服务器是否开启。。。")
        time.sleep(3)
        global flag
        flag = True
        print("服务器已经开启。。。")
    
    
    def connect():
        while 1:
            print(f"{current_thread().name} 等待连接。。。")
            time.sleep(0.5)
            if flag:
                print(f"{current_thread().name} 连接成功。。。")
                break
    
    
    t1 = Thread(target=check)
    t2 = Thread(target=connect)
    t1.start()
    t2.start()
    
    版本二——事件event
    from threading import Thread
    from threading import current_thread
    from threading import Event
    import time
    
    event = Event()
    
    
    def check():
        print(f"{current_thread().name} 监测服务器是否开启。。。")
        time.sleep(3)
        # print(event.is_set())
        event.set()
        # print(event.is_set())
        print("服务器已经开启。。。")
    
    
    def connect():
        print(f"{current_thread().name} 等待连接。。。")
        event.wait()  # 阻塞直到event.set() 方法之后
        # event.wait(1)  # 只阻塞1s,1s之后如果还没有进行set 直接进行下一步操作
        print(f"{current_thread().name} 连接成功。。。")
    
    
    t1 = Thread(target=check)
    t2 = Thread(target=connect)
    t1.start()
    t2.start()
    

    要求:一个线程监测服务器是否开始,另一个线程判断如果开始了,则显示连接成功,此线程只尝试连接3次,1s一次,如果超过3次,还没有连接成功,则显示连接失败

    from threading import Thread
    from threading import current_thread
    from threading import Event
    import time
    
    event = Event()
    
    
    def check():
        print(f"{current_thread().name} 监测服务器是否开启")
        time.sleep(2)
        event.set()
    
    
    def connect():
        print(f"{current_thread().name} 等待连接,,,")
        for i in range(3):
            event.wait(1)
            if event.is_set():
                print("服务器已经开启")
                print(f"{current_thread().name} 连接成功")
                break
            else:
                print(f"{current_thread().name} 连接失败{i+1}次")
    
    
    t1 = Thread(target=check)
    t2 = Thread(target=connect)
    t1.start()
    t2.start()
    

    协程

    协程详情:https://www.cnblogs.com/jin-xin/articles/11245654.html

    一个线程并发的处理任务

    • 串行:一个线程执行一个任务,执行完毕之后,执行下一个任务
    • 并行:多个CPU执行多个任务,4个CPU执行4个任务
    • 并发:一个CPU执行多个任务,看起来像是同时运行

    并发真正的核心:切换CPU+保持状态

    多线程的并发:3个线程处理10个任务,如果线程1处理的这个任务遇到阻塞,CPU被操作系统切换到另一个线程

    一个线程能否并发的处理任务??

    一个线程处理三个任务

    单个CPU:10个任务,让你给我并发执行这10个任务:

    • 方式一:开启多进程并发执行,操作系统切换+保持状态
    • 方式二:开启多线程并发执行,操作系统切换+保持状态
    • 方式三:开启协程并发的执行,自己的程序把控着CPU在3个任务之间来回切换+保持状态

    对3详细解释:协程他切换速度非常快,蒙蔽操作系统的眼睛,让操作系统认为CPU一直再运行你这个线程(协程)

    协程方式最好,为什么?

    优点:

    • 开销小
    • 运行速度快
    • 协程会长期霸占CPU只执行我程序里面的所有任务

    缺点:

    协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程

    协程处理IO密集型好,计算密集型还是串行好

    什么是协程?

    单个线程并发的处理多个任务,程序控制协程的切换+保持状态

    协程的特点
    • 必须在只有一个单线程里实现并发
    • 修改共享数据不需加锁
    • 用户程序里自己保存多个控制流的上下文栈(保持状态)
    • 一个协程遇到IO会自动切换到其他任务
    工作中

    一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,如果是4核的cpu,一般起5个进程,每个进程中20个线程(5倍cpu数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是一般一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。

    之前学的代码有没有切换:
    def func1():
        print("in func1")
    
    
    def func2():
        print("in func2")
        func1()
        print("end")
    
    
    func2()
    
    切换 + 保持状态:遇到IO不会主动切换
    def gen():
        while 1:
            yield 1
            print(333)
    
    
    def func():
        obj = gen()
        for i in range(10):
            print(next(obj))
    
    
    func()
    
    greenlet——协程底层技术
    from greenlet import greenlet
    import time
    
    
    def eat(name):
        print(f"{name} eat 1")  # 2
        g2.switch("taibai")  # 3
        # time.sleep(3)
        print(f"{name} eat 2")  # 6
        g2.switch()  # 7
    
    
    def play(name):
        print(f"{name} play 1")  # 4
        g1.switch()  # 5
        print(f"{name} play 2")  # 8
    
    
    g1 = greenlet(eat)
    g2 = greenlet(play)
    
    g1.switch("taibai")  # 切换到eat任务 1
    
    协程low版
    • 模拟的阻塞
    import gevent
    import time
    from threading import current_thread
    
    
    def eat(name):
        print(f"{name} eat 1")  # 2
        print(current_thread().name)  # 3
        gevent.sleep(2)
        # time.sleep(2)
        print(f"{name} eat 2")  # 7
    
    
    def play(name):
        print(f"{name} play 1")  # 4
        print(current_thread().name)  # 5
        gevent.sleep(1)
        # time.sleep(1)
        print(f"{name} play 2")  # 6
    
    
    g1 = gevent.spawn(eat, "egon")
    g2 = gevent.spawn(play, "egon")
    print(f"主{current_thread().name}")  # 1
    g1.join()
    g2.join()
    结果:
    主MainThread
    egon eat 1
    MainThread
    egon play 1
    MainThread
    egon play 2
    egon eat 2
    
    • 真正的阻塞
    import gevent
    import time
    from threading import current_thread
    
    
    def eat(name):
        print(f"{name} eat 1")  # 2
        print(current_thread().name)  # 3
        # gevent.sleep(2)
        time.sleep(2)
        print(f"{name} eat 2")  # 4
    
    
    def play(name):
        print(f"{name} play 1")  # 5
        print(current_thread().name)  # 6
        # gevent.sleep(1)
        time.sleep(1)
        print(f"{name} play 2")  # 7
    
    
    g1 = gevent.spawn(eat, "egon")
    g2 = gevent.spawn(play, "egon")
    print(f"主{current_thread().name}")  # 1
    g1.join()
    g2.join()
    结果:
    主MainThread
    egon eat 1
    MainThread
    egon eat 2
    egon play 1
    MainThread
    egon play 2
    
    最终版本
    import gevent
    import time
    from gevent import monkey
    monkey.patch_all()  # 打补丁:将下面所有任务的阻塞都打上标记
    
    
    def eat(name):
        print(f"{name} eat 1")  # 1
        time.sleep(2)
        print(f"{name} eat 2")  # 4
    
    
    def play(name):
        print(f"{name} play 1")  # 2
        time.sleep(1)
        print(f"{name} play 2")  # 3
    
    
    g1 = gevent.spawn(eat, "egon")
    g2 = gevent.spawn(play, "egon")
    
    # g1.join()
    # g2.join()
    gevent.joinall([g1, g2])
    结果:
    egon eat 1
    egon play 1
    egon play 2
    egon eat 2
    
    协程的应用

    爬虫

    from gevent import monkey;monkey.patch_all()
    import gevent
    import requests
    import time
    
    def get_page(url):
        print('GET: %s' %url)
        response=requests.get(url)
        if response.status_code == 200:
            print('%d bytes received from %s' %(len(response.text),url))
    
    
    start_time=time.time()
    gevent.joinall([
        gevent.spawn(get_page,'https://www.python.org/'),
        gevent.spawn(get_page,'https://www.yahoo.com/'),
        gevent.spawn(get_page,'https://github.com/'),
    ])
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))
    
    结果:
    GET: https://www.python.org/
    GET: https://www.yahoo.com/
    GET: https://github.com/
    48919 bytes received from https://www.python.org/
    87845 bytes received from https://github.com/
    515896 bytes received from https://www.yahoo.com/
    run time is 2.729017734527588
    

    通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)

    一个网络请求里面经过多个时间延迟time

    server

    from gevent import monkey;monkey.patch_all()
    from socket import *
    import gevent
    
    #如果不想用money.patch_all()打补丁,可以用gevent自带的socket
    # from gevent import socket
    # s=socket.socket()
    
    def server(server_ip,port):
        s=socket(AF_INET,SOCK_STREAM)
        s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
        s.bind((server_ip,port))
        s.listen(5)
        while True:
            conn,addr=s.accept()
            gevent.spawn(talk,conn,addr)
    
    def talk(conn,addr):
        try:
            while True:
                res=conn.recv(1024)
                print('client %s:%s msg: %s' %(addr[0],addr[1],res))
                conn.send(res.upper())
        except Exception as e:
            print(e)
        finally:
            conn.close()
    
    if __name__ == '__main__':
        server('127.0.0.1',8080)
    
    

    client

    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
    
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
    

    或多线程并发多个客户端,去请求上面的服务端是没问题的

    from threading import Thread
    from socket import *
    import threading
    
    def client(server_ip,port):
        c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了
        c.connect((server_ip,port))
    
        count=0
        while True:
            c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
            msg=c.recv(1024)
            print(msg.decode('utf-8'))
            count+=1
    if __name__ == '__main__':
        for i in range(500):
            t=Thread(target=client,args=('127.0.0.1',8080))
            t.start()
    

    协程的另外一个模块asyncio

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    # import asyncio
    
    # 起一个任务.
    # async def demo():   # 协程方法
    #     print('start')
    #     await asyncio.sleep(1)  # 阻塞
    #     print('end')
    
    # loop = asyncio.get_event_loop()  # 创建一个事件循环
    # loop.run_until_complete(demo())  # 把demo任务丢到事件循环中去执行
    
    # 启动多个任务,并且没有返回值
    # async def demo():   # 协程方法
    #     print('start')
    #     await asyncio.sleep(1)  # 阻塞
    #     print('end')
    #
    # loop = asyncio.get_event_loop()  # 创建一个事件循环
    # wait_obj = asyncio.wait([demo(),demo(),demo()])
    # loop.run_until_complete(wait_obj)
    
    # 启动多个任务并且有返回值
    # async def demo():   # 协程方法
    #     print('start')
    #     await asyncio.sleep(1)  # 阻塞
    #     print('end')
    #     return 123
    #
    # loop = asyncio.get_event_loop()
    # t1 = loop.create_task(demo())
    # t2 = loop.create_task(demo())
    # tasks = [t1,t2]
    # wait_obj = asyncio.wait([t1,t2])
    # loop.run_until_complete(wait_obj)
    # for t in tasks:
    #     print(t.result())
    
    # 谁先回来先取谁的结果
    # import asyncio
    # async def demo(i):   # 协程方法
    #     print('start')
    #     await asyncio.sleep(10-i)  # 阻塞
    #     print('end')
    #     return i,123
    #
    # async def main():
    #     task_l = []
    #     for i in range(10):
    #         task = asyncio.ensure_future(demo(i))
    #         task_l.append(task)
    #     for ret in asyncio.as_completed(task_l):
    #         res = await ret
    #         print(res)
    #
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(main())
    
    
    
    # import asyncio
    #
    # async def get_url():
    #     reader,writer = await asyncio.open_connection('www.baidu.com',80)
    #     writer.write(b'GET / HTTP/1.1
    HOST:www.baidu.com
    Connection:close
    
    ')
    #     all_lines = []
    #     async for line in reader:
    #         data = line.decode()
    #         all_lines.append(data)
    #     html = '
    '.join(all_lines)
    #     return html
    #
    # async def main():
    #     tasks = []
    #     for url in range(20):
    #         tasks.append(asyncio.ensure_future(get_url()))
    #     for res in asyncio.as_completed(tasks):
    #         result = await res
    #         print(result)
    #
    #
    # if __name__ == '__main__':
    #     loop = asyncio.get_event_loop()
    #     loop.run_until_complete(main())  # 处理一个任务
    
    
    # python原生的底层的协程模块
        # 爬虫 webserver框架
        # 题高网络编程的效率和并发效果
    # 语法
        # await 阻塞 协程函数这里要切换出去,还能保证一会儿再切回来
        # await 必须写在async函数里,async函数是协程函数
        # loop 事件循环
        # 所有的协程的执行 调度 都离不开这个loop
    
    
  • 相关阅读:
    【hibernate】自定义转换器
    【hibernate】存储图片
    【hibernate】映射可嵌入式组件
    【hibernate】应用程序级别的视图
    adb shell模拟点击事件(input tap)
    Android UIAutomator 定位
    adb devices连接不上设备
    获取元素属性get_attribute
    wait_activity
    webview定位 & native和webview切换
  • 原文地址:https://www.cnblogs.com/NiceSnake/p/11432238.html
Copyright © 2011-2022 走看看