zoukankan      html  css  js  c++  java
  • day53-线程池、协程、IO

    #1、from concurrent import futures可以开启进程池和线程池。concurrent是包,futures是模块,ThreadPoolExecutor是类,submit是方法。
    #submit创建和开启子线程:
    from concurrent import futures
    import time
    import random
    
    def func(n):
        print(n)
        time.sleep(random.randint(1,3))#看效果:一开始先执行5个线程,后来谁先执行完就结束,轮到下一个线程执行。
    
    p = futures.ThreadPoolExecutor(5) #线程池里有5个线程。
    for i in range(10):              #开启10个子线程。
        p.submit(func,'hello,world')  #submit合并了创建线程对象和start的功能。
    
    #2、result获取返回值,shutdown封装了close和join:
    #主线程传参给子线程处理数据,子线程把值返回给主线程。
    from concurrent import futures
    def func(n):
        print(n)
        return n + 1
    
    thread_pool = futures.ThreadPoolExecutor(5)
    t_lst = []
    for i in range(10):
        t = thread_pool.submit(func,1) #submit提交任务
        t_lst.append(t)
    thread_pool.shutdown() #shutdown封装了close()和join(),意思是线程池关闭继续放线程的功能,
                           # 主线程阻塞在这里,等待子线程全部结束之后才变成非阻塞,下面的代码才能继续执行。
                           #这样操作的结果就是,先打印出n最后才打印n+1。
    for t in t_lst:
        print(t.result()) #获取返回值
    
    #3、map创建和开启子线程,后面必须是可迭代的,不可以接收返回值,所以如果func有return返回值,是无法接收的。
    from concurrent import futures
    def func(i):
        print(i)
    
    thread_pool = futures.ThreadPoolExecutor(5)
    thread_pool.map(func,range(10))
    
    #4、回调函数:add_done_callback(call)
    #call的args接收func的返回值
    from concurrent import futures
    def func(i):
        return i*'*'
    
    def call(args):
        print(args.result())
    
    thread_pool = futures.ThreadPoolExecutor(5)
    for i in range(10):
        t = thread_pool.submit(func,i)
        t.add_done_callback(call)

    #5、from concurrent import futures 还可以开启进程,只需要把上面的ThreadPoolExecutor修改为ProcessPoolExecutor,再加上
    #if __name__ == '__main__'就可以了,其他代码都不需要修改。
    if __name__ == '__main__':
    p = futures.ProcessPoolExecutor()

    #6、进程和线程都不能无限开启,进程数量 = CPU+1 ,线程数量 = CPU*5

    协程:
    
    #1、greenlet:在单线程中切换状态的模块
    from greenlet import greenlet
    def eat():
        print('a')
        g2.switch()
        print('c')
        g2.switch()
    
    def eat2():
        print('b')
        g1.switch()
        print('d')
    
    g1 = greenlet(eat)
    g2 = greenlet(eat2)
    g1.switch()
    
    #2、gevent的底层是greenlet
    import gevent
    def func():
        print(1)
        gevent.sleep(1) #如果这里是time.sleep(1)是无法切换的,因为gevent只能识别它内置的IO才会自动切换。
        print(3)
    
    def func1():
        print(2)
        gevent.sleep(1)
        print(4)
    
    g1 = gevent.spawn(func)
    g2 = gevent.spawn(func1)
    # g1.join()
    # g2.join()
    gevent.joinall([g1,g2]) #相当于g1.join()和g2.join(),主协程等待子协程的结束而结束。
    # 1
    # 2
    # 3
    # 4
    
    #3、第一二句代码必须写在最上面,意思是把IO都打成一个包,这样下面的time.sleep就可以识别了,然后就可以切换了。
    #第一二句代码能识别gevent内置当中的IO,以及导入模块当中的IO,导入的模块目前学习到的是:time,socket,urllib,requests。
    from gevent import monkey
    monkey.patch_all()
    import time
    import gevent
    def func(n):
        print(n)
        time.sleep(1)
        print(3)
    
    def func1():
        print(2)
        time.sleep(1)
        print(4)
    
    g1 = gevent.spawn(func,1)#创建协程对象,g1 = gevent.spawn(func,1,2,3,x=1),可以是位置实参或关键字实参。
    g2 = gevent.spawn(func1)
    gevent.joinall([g1,g2])
    # 1
    # 2
    # 3
    # 4
    
    #4、协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。
    
    #5、协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:
    # 协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
    
    #6、需要强调的是:
    #1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
    #2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
    
    #7、对比操作系统控制线程的切换,用户在单线程内控制协程的切换,
    #优点如下:1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
    #         2. 单线程内就可以实现并发的效果,最大限度地利用cpu
    #缺点如下:1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
    #         2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
    
    #8、server:
    from gevent import monkey
    monkey.patch_all()
    import socket
    import gevent
    
    def talk(conn):
        while True:
            ret = conn.recv(1024).decode('utf-8')
            print(ret)
            conn.send(ret.upper().encode('utf-8'))
        conn.close()
    
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    while True:                  #每接收一个连接就开启一个协程。
        conn,addr = sk.accept()
        gevent.spawn(talk,conn)
    
    sk.close()
    
    #client:
    import socket
    from threading import Thread
    
    def talk():
        sk = socket.socket()
        sk.connect(('127.0.0.1', 8080))
        while True:
            sk.send(b'hi')
            ret = sk.recv(1024).decode('utf-8')
            print(ret)
        sk.close()
    
    for i in range(500):
        Thread(target=talk).start()
    
    
    IO

    #阻塞IO:工作效率低
    #非阻塞IO:工作效率高,CPU负担大,不建议使用。
    #IO多路复用:在有多个对象需要IO阻塞的时候,能够有效的减少阻塞带来的时间损耗,在一定程度上减少CPU的负担。选择这个比较好。
    #异步IO:工作效率高,CPU负担小

    #IO多路复用:select:选择
    #步骤:1、select监视read_lst里的每个对象,当发现client十个线程请求连接,sk会收到信号,每循环一次增加一个conn,
    # 十次就是10个conn,这时read_lst列表里面有一个sk和10个conn,sk是不变的。
    # 2、接着client每一次发消息过来,conn收到信号,而select每一次都会选择把10个conn中的一个或者多个放在rl里面,不会放sk,
    # 接着就执行else下面的代码。
    #server:
    import socket
    import select
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()

    read_lst = [sk]
    count = 1
    while True:
    rl,wl,xl = select.select(read_lst,[],[])#select阻塞,rl读列表,wl写列表,xl修改列表。
    print(count,rl)
    count += 1
    for item in rl:
    if item == sk:
    conn,addr = item.accept()
    print(conn)
    read_lst.append(conn)
    print(read_lst)
    else:
    ret = item.recv(1024).decode('utf-8')
    if not ret: #如果ret是空的
    item.close() #关闭conn
    read_lst.remove(item) #列表删除这个conn,避免下次重复被使用
    else: #ret不是空的
    print(ret)
    item.send('recv succeed'.encode('utf-8'))


    #client:
    import socket
    from threading import Thread
    import time
    def talk(args):
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    for i in range(10):
    time.sleep(2)
    sk.send(('线程%s,第%s次对话'%(args,i)).encode('utf-8'))
    ret = sk.recv(1024).decode('utf-8')
    print(ret)
    sk.close()
    for i in range(10):
    Thread(target=talk,args=(i,)).start()
    
    
     
  • 相关阅读:
    《网络是怎样连接的》读书笔记一
    移植mplayer到开发板
    解决ubuntu安装ssh服务无法打开解析包问题
    嵌入式软件工程师面经
    c语言-数组、指针面试题
    Linux命令- echo、grep 、重定向、1>&2、2>&1的介绍
    回调函数的作用
    数据结构-单链表回环函数判断
    算法-一步步教你如何用c语言实现堆排序(非递归)
    算法-快速排序
  • 原文地址:https://www.cnblogs.com/python-daxiong/p/12142818.html
Copyright © 2011-2022 走看看