zoukankan      html  css  js  c++  java
  • 线程池,协程

    1.线程池:

      实例化线程池 ThreadPoolExecutor,一般起 5*cpu_count 个线程

      异步提交任务 submit/map

      阻塞直到任务结束 shutdown

      获取子线程的返回值 result

      回调函数 add_done_callback

    起线程池推荐使用 concurrent.futures 模块

    工作中提高效率,往往需要:

      多进程 cpu_count

      多线程 cpu_count * 5

      起单个线程

    import time
    from concurrent.futures import ThreadPoolExecutor
    
    def func(i):
        """Print a start marker for task *i*, sleep 1s, then print an end marker."""
        print('thread',i)
        time.sleep(1)
        # Fixed: interpolate i into the message. The original passed the format
        # string and i as separate print() arguments, printing a literal "%s".
        print('thread %s end' % i)
    # Leaving the with-block calls tp.shutdown() (wait=True), so the main
    # thread still waits for task 1 before printing.
    with ThreadPoolExecutor(5) as tp:
        tp.submit(func, 1)
    print('主线程')
    

      起多个线程

    import time
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    def func(i):
        """Print start/end markers for task *i*; the end line names the worker thread."""
        print('thread',i)
        time.sleep(1)
        # Fixed: format i into the message instead of printing a literal "%s".
        print('thread %s end' % i, current_thread())
    # 5 worker threads share 20 tasks. Leaving the with-block shuts the pool
    # down and waits for every task before the main thread continues.
    with ThreadPoolExecutor(5) as tp:
        for task_no in range(20):
            tp.submit(func, task_no)
    print('主线程')
    

      获取线程池返回值

    import time
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    def func(i):
        """Run task *i*: print markers, sleep 1s, and return a string of i asterisks."""
        print('thread',i)
        time.sleep(1)
        # Fixed: interpolate i (the original printed a literal "%s").
        print('thread %s end' % i, current_thread())
        return i*'*'
    tp = ThreadPoolExecutor(5)  # 5 worker threads
    # submit() returns a Future (a "receipt", e.g. <Future ... state=pending>)
    # immediately. Calling .result() inside the submit loop would block and turn
    # the async work back into sync, so collect all futures first.
    ret_1 = [tp.submit(func, i) for i in range(20)]  # 20 tasks
    for ret in ret_1:
        print(ret.result())  # blocks per future; results come in submission order
    tp.shutdown()
    print('主线程')
    

      map简化操作

    import time
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    def func(i):
        """Run task *i*: print markers, sleep 1s, and return a string of i asterisks."""
        print('thread',i)
        time.sleep(1)
        # Fixed: interpolate i (the original printed a literal "%s").
        print('thread %s end' % i, current_thread())
        return i*'*'
    # Executor.map() submits every item up front and yields results in input
    # order. Unlike submit(), it hands back no Future, so no callback can be
    # attached. The with-block shuts the pool down (as the other examples do
    # explicitly) once every result has been printed.
    with ThreadPoolExecutor(5) as tp:  # 5 worker threads
        res = tp.map(func, range(10))
        for i in res:
            print(i)

      线程池回调函数:与进程池不同,线程池的回调函数由执行该任务的子线程调用(进程池的回调函数则在主进程中执行)

    import time
    from concurrent.futures import  ThreadPoolExecutor
    from threading import current_thread
    def func(i):
        """Task body: report start (with the worker thread), sleep 1s, return i asterisks."""
        print('thread',i,current_thread())
        time.sleep(1)
        print('thread %s end' % i)
        stars = i * '*'
        return stars
    
    def call_back(arg):
        """Done-callback: print the result carried by the finished future *arg*.

        With a ThreadPoolExecutor this is invoked in the worker thread that
        completed the task (or immediately, if the future was already done).
        """
        value = arg.result()
        print(value)
    
    tp = ThreadPoolExecutor(5)
    for i in range(20):
        # add_done_callback() returns None, so there is no receipt worth storing
        # (the original kept that None in `ret`).
        tp.submit(func,i).add_done_callback(call_back)
    print('主线程')  # printed right away: nothing above waits for the tasks
    tp.shutdown()  # then wait for all tasks (and their callbacks) to finish
    

      进程池回调函数

    import time
    from concurrent.futures import  ThreadPoolExecutor,ProcessPoolExecutor
    from threading import current_thread
    def func(i):
        """Task executed inside a worker *process* of the ProcessPoolExecutor.

        The 'thread' labels are carried over from the thread-pool example;
        current_thread() here reports a thread of the worker process, not of
        the parent process.
        """
        print('thread',i,current_thread())
        time.sleep(1)
        print('thread %s end' % i)
        return '*' * i
    
    def call_back(arg):
        """Print the finished future's result; invoked back in the submitting (parent) process."""
        outcome = arg.result()
        print(outcome)
    if __name__ == '__main__':
        # The guard is required: on spawn-based platforms (e.g. Windows) worker
        # processes re-import this module and must not create pools of their own.
        tp = ProcessPoolExecutor(5)
        for i in range(20):
            # add_done_callback() returns None; nothing useful to keep.
            tp.submit(func,i).add_done_callback(call_back)
        print('主线程')  # printed before the tasks finish
        tp.shutdown()  # wait for all tasks and their callbacks, release workers
    

    2.协程:又称纤程,是比线程更小的执行单位,CPU(操作系统)感知不到它

      一条线程在多个任务之间来回切换

      切换这个动作是耗费时间的

      对于cpu,操作系统来说,协程是不存在的

      它们只能看到线程

    yield

    def consumer():
        # Pull values one at a time. Each next() resumes producer() exactly where
        # its previous yield paused it: a hand-driven switch between two tasks.
        gen = producer()
        while True:
            try:
                num = next(gen)
            except StopIteration:
                break
            print(num)
    def producer():
        # Lazily hand out 0..999, one per resume.
        yield from range(1000)
    consumer()
    

    协程的优势:

    更好的利用协程 

    1

    把一个线程的执行明确地切分开

    例如有两个任务,协程会帮你记住哪个任务执行到了哪个位置,并且实现安全的切换

    当一个任务不得不陷入阻塞的时候,在这个任务阻塞的时候,切换到另一个任务继续执行

    你的程序只要还有任务需要执行,你的当前线程永远不会阻塞

    2

     利用协程在多个任务陷入阻塞的时候进行切换来保证一个线程在处理多个任务的时候总是忙碌

    能够更加充分地利用CPU,抢占更多的时间片

     3

    无论是进程或是线程,都是由操作系统来进行切换的,开启过多的进程或线程,会给操作系统的调度带来压力

    如果开启的是协程,协程之间的切换操作系统感知不到,无论开启多少个协程,对于操作系统来说都只是开启了一个线程

    系统的调度不会感到有压力

    协程的本质就是一条线程,任务只会在遇到IO阻塞等明确的切换点时才让出执行权,因此一般不会出现多线程那样的数据安全问题

    3,协程模块:

      greenlet   gevent的底层,负责协程切换的模块(需要手动 switch)

      gevent   直接使用的模块,在greenlet基础上提供了更全面的功能(如遇到IO阻塞自动切换)

    from greenlet import greenlet
    def eat():
        # Runs inside greenlet g1.
        print('eatting1')
        g2.switch()  # explicitly switch to g2 (play); eat() is suspended here
        print('eatting2')  # only reached if something switches back to g1
    
    def play():
        # Runs inside greenlet g2.
        print('playing1')
        print('playing2')
        # g1.switch()  # uncomment to resume eat() so that 'eatting2' is printed
    # Create both greenlets from the main greenlet; nothing runs until the
    # first switch().
    g1 = greenlet(eat)
    g2 = greenlet(play)
    # Start eat(). When play() returns, greenlet passes control to g2's parent
    # (the main greenlet), not back to g1. With g1.switch() commented out,
    # 'eatting2' is therefore never printed.
    g1.switch() 
    

        gevent 切换

    # Fixed: this snippet used time and gevent without importing them.
    import time
    import gevent
    
    def eat():
        print('eatting1')
        time.sleep(1)  # unpatched: blocks the whole thread, gevent cannot switch
        print('eatting2')
    
    def play():
        print('palying1')
        time.sleep(1)
        print('palying2')
    # spawn() schedules the greenlet; it only starts once the main greenlet
    # blocks (e.g. in join()). gevent switches on blocking events it
    # recognises, but an unpatched time.sleep() is not one of them. Here eat()
    # therefore runs to completion before play() starts.
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    g1.join()  # block until g1 finishes
    g2.join()
    

      from gevent import monkey

      monkey.patch_all()

      给标准库中的阻塞模块(如 time、socket)打补丁,使gevent能识别这些阻塞操作并自动切换

    from gevent import monkey #记录一下模块,使gevent都认识
    monkey.patch_all()
    import time
    import gevent
    
    def eat():
        print('eatting1')
        time.sleep(1)  # patched by monkey.patch_all(): yields to other greenlets
        print('eatting2')
    
    def play():
        print('palying1')
        time.sleep(1)  # patched: cooperative sleep, lets eat() run meanwhile
        print('palying2')
    g1 = gevent.spawn(eat) # gevent detects blocking events and switches on them; a spawned task does not run until the main greenlet blocks
    g2 = gevent.spawn(play)  # some blocking calls are not recognised by gevent unless patched (hence patch_all above)
    g1.join()  # block until g1 finishes
    g2.join()
    

      

    spawn(函数名) 产生了一个协程任务,在遇到IO操作的时候帮助我们在多任务之间进行切换

    join() 阻塞,直到某个任务被执行完毕

    joinall([g1,g2]) 阻塞,直到列表中的所有任务都执行完毕

    value属性  获取返回值

    from gevent import monkey;monkey.patch_all()
    import time
    import gevent
    def eat():
        """Print two markers around a 1s (monkey-patched, cooperative) sleep."""
        print('eat 1')
        time.sleep(1)
        print('eat 2')
        return 'eating end'
    def play():
        """Same shape as eat(); its return value is read back via .value."""
        print('play 1')
        time.sleep(1)
        print('play 2')
        return 'play finished'
    # Spawn both greenlets, wait for all of them with joinall(), then read each
    # greenlet's return value from its .value attribute.
    g1, g2 = gevent.spawn(eat), gevent.spawn(play)
    gevent.joinall([g1, g2])
    for finished in (g1, g2):
        print(finished.value)
    

      

    爬虫

    from gevent import monkey;monkey.patch_all()
    import gevent
    import time
    import requests
    url_lst = ['http://www.baidu.com',
                'http://www.4399.com',
                'http://www.7k7k.com',
                'http://www.sogou.com',
                'http://www.sohu.com',
                'http://www.sina.com',
                'http://www.jd.com']
    def get_url(url, timeout=10):
        """Fetch *url* and print its body length if the response is 200 OK.

        ``timeout`` (seconds, new optional parameter) bounds the connect/read
        wait. requests applies no timeout by default, so a single unresponsive
        site could otherwise stall gevent.joinall() forever. On timeout,
        requests raises an exception inside this greenlet.
        """
        response = requests.get(url, timeout=timeout)
        if response.status_code == 200:
            print(url,len(response.text))
    
    start = time.time()
    # One greenlet per URL; with sockets monkey-patched, the network waits overlap.
    g_lst = [gevent.spawn(get_url, url) for url in url_lst]
    gevent.joinall(g_lst)
    print(time.time() - start)  # total wall-clock seconds for all downloads
    

    协程来实现一个并发的socketserver

    # from gevent import monkey;monkey.patch_all()
    #  import sockt
    from gevent import socket #相当于monkey....
    import gevent
    sk = socket.socket()
    sk.bind(('192.168.16.33',8088))
    sk.listen()
    
    def server_talk(conn):
        """Talk to one connected client inside its own greenlet.

        Sends b'byebye', then prints whatever the client sends back, in a loop.
        Fixed: stops when the client disconnects (recv() returns b'') instead
        of printing b'' forever, and always closes *conn* so finished greenlets
        do not leak sockets.
        """
        try:
            while True:
                conn.send(b'byebye')
                data = conn.recv(1024)
                if not data:  # empty read: the peer closed the connection
                    break
                print(data)
        finally:
            conn.close()
    
    while True:
        # The socket comes from gevent, so accept() yields to the other greenlets
        # while waiting; every client is served by its own greenlet.
        conn,addr = sk.accept()
        gevent.spawn(server_talk,conn)
    
    # The main greenlet spends almost all of its time blocked in accept().
    

      

    import socket
    from threading import Thread
    
    def client():
        """Connect to the demo server and answer each message with b'hello'.

        Fixed: returns when the server closes the connection (recv() returns
        b'') instead of spinning on empty reads. The with-block closes the
        socket on every exit path. Connection errors still propagate, as before.
        """
        with socket.socket() as sk:
            sk.connect(('192.168.16.33', 8088))
            while True:
                msg = sk.recv(1024)
                if not msg:  # server closed the connection
                    break
                print(msg)
                sk.send(b'hello')
    # 300 concurrent clients, one OS thread each.
    for i in range(300):
        Thread(target=client).start()
    

      

  • 相关阅读:
    Oracle学习笔记--Oracle启动过程归纳整理
    Oracle 11g rac开启归档
    Oracle 11g rac中关于crsctl stop cluster/crs/has的区别
    Linux环境下安装MySQL 5.7.28
    将root用户权限赋予普通用户
    用Navicat Premium 连接mysql数据库时报错 -- 1130 Host xxxx is not allowed to connect to this MySQL server
    PCoIP vs HDX (CITRIX ICA)远程访问协议对比
    systemctl
    组播查询命令
    默认路由ip default-network和ip route 0.0.0.0 0.0.0.0区别
  • 原文地址:https://www.cnblogs.com/lijinming110/p/9708252.html
Copyright © 2011-2022 走看看