zoukankan      html  css  js  c++  java
  • 协程--gevent模块(单线程高并发)

    协程--gevent模块(单线程高并发)

     

    先恶补一下知识点,上节回顾

    上下文切换:当CPU从执行一个线程切换到执行另外一个线程的时候,它需要先存储当前线程的本地的数据,程序指针等,然后载入另一个线程的本地数据,程序指针等,最后才开始执行。这种切换称为“上下文切换”(“context switch”)

          CPU会在一个上下文中执行一个线程,然后切换到另外一个上下文中执行另外一个线程,上下文切换并不廉价。如果没有必要,应该减少上下文切换的发生

    进程: 一个程序需要运行所需的资源的集合
    每个进程数据是独立的
    每个进程里至少有一个线程
    每个进程里有可以多有个线程
    线程数据是共享的
    进程间共享数据的代价是高昂的,所以要尽量避免进程间的数据共享
    线程间的数据本来就是共享的
    线程要修改同一份数据,必须加锁,互斥锁mutex
    生产者消费者:1.解耦2.提高程序的运行效率,把中间等待的时间省去

    多线程场景: IO密集型,因为IO操作基本不占用CPU,所以多用在web,爬虫,socket交互
    多进程场景:CPU密集型,大数据分析,金融分析,这样用的IO就很少,因为这个进程会进行大量的运算, 但是如果切换了进程,就会变慢
     


    协程

    协程:微线程, 协程是一种用户态的轻量级线程,CPU不知道它的存在,

    协程拥有自己的寄存器上下文和栈.协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,

    因此协程能保留上一次调用时的状态(即所有局部状态的一个特定组合,),每次过程重入时,就相当于上一次调用的状态, 也就是进入上一次离开时所处逻辑流的位置

    协程的好处:(是程序级别切换,CPU是不知道的.)

    1.无需线程上下文切换,

    2.无需原子操作锁定及同步开销  ,  什么是原子操作?  :是不需要同步的!!,是指不会被线程调度打断的操作;这种操作一旦开始,就运行到结束,中间不会有任何  context switch(切换到另一个线程,)

                             原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心

    3.方便切换控制流,简化编程模型

    4.高并发 + 高扩展 + 低成本 : 一个CPU支持上万的协程都不是问题,所以很适合用于高并发处理

    坏处-----:

    1.无法利用多核资源,协程的本质是个单线程,它不能同时将单个CPU的多个核用上, 协程需要配合进程才能在多CPU上,  适用于CPU密集型应用

    2.进程阻塞 (Blocking)  操作 如IO操作时,会阻塞掉整个程序

    ----什么条件符合才能称之为协程?

      A.必须在只有一个单线程里实现并发

      B.修改共享数据不需要加锁

      C.用户程序里自己保持多个控制流的上下文栈

      D.一个协程遇到IO操作自动切换到其他协程!!!!!!

    重点来了。。。。。 大量的模块知识点---我希望我以后还能记起来----汗颜!

    Greenlet模块

    greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator(生成器)

     
    from greenlet import greenlet
    
    def test1():
        print('test1:我是1')
        gr2.switch()    #切换到test2
        print('test1:我是1.1')
        gr2.switch()
    def test2():
        print('test2:我是2')
        gr1.switch()  #切换到test1
        print('test2:我是2.2')
    
    
    gr1=greenlet(test1)
    gr2=greenlet(test2)
    gr1.switch()      #先切换到test1
    >>
    test1:我是1
    test2:我是2
    test1:我是1.1
    test2:我是2.2
     

    swich()  就是切换,   按执行顺序--  但是遇到IO操作 好像并没有自动切换

    Gevent模块

    Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。  

     这里使用gevent.sleep 来获取控制权    

     
    import gevent
    
    def func1():
        print('33[31;1m我是func133[0m')
        gevent.sleep(3)
        print('33[31;1m我是func1.1--我上面有3秒33[0m')
    
    def func2():
        print('33[32;1m我是func2.33[0m')
        gevent.sleep(2)
        print('33[32;1m我是func2.1 我上面有2秒33[0m')
    
    def func3():
        print('33[32;1m我是func3.33[0m')
        gevent.sleep(2)
        print('33[32;1m我是func3.1我上面有2秒33[0m')
    
    gevent.joinall([gevent.spawn(func1),
                    gevent.spawn(func2),
                    gevent.spawn(func3),])
     

    这里会按照sleep 设置来执行      一定会先打印出func2-->func3-->func1  

    同步和异步的性能区别

     
    import gevent
    
    
    def task(pid):
        """
        Some non-deterministic task
        """
        gevent.sleep(0.5)
        print('Task %s done' % pid)
    
    
    def synchronous():
        for i in range(1, 10):
            task(i)
    
    
    def asynchronous():
        #threads = [gevent.spawn(task, i) for i in range(10)]
        threads=[]
        for i in range(10):
            threads.append(gevent.spawn(task,i))
        gevent.joinall(threads)    #执行流程只会在 所有greenlet执行完后才会继续向下走
    
    
    print('Synchronous:')
    synchronous()
    
    print('Asynchronous:')
    asynchronous()
     

    Synchrounous:定义了同步的函数:定义一个for循环。依次把内容传输给task函数,然后打印执行结果-----

    Aynchrounous:定义了异步的函数: 这里用到了一个gevent.spawn方法,就是产生的意思. gevent.joinall 也就是等待所以操作都执行完毕

                    gevent.spawn 可以调用函数

    可是我们一般也不会这么用。去故意的设置一个gevent.sleep来切换  ,下面就来在实际场景中应用

    遇到IO阻塞,自动切换任务

    这里就用到了简单的网页爬虫环境中,  操作IO的时候。自动切换。这里就用到了猴子补丁(monkey.patch_all(), 知道这是运行时,动态修改已有的代码,而不需要修改原始代码)

     
    from gevent import monkey
    import gevent
    import time
    from urllib.request import urlopen
    monkey.patch_all()
    #对比得出 协程 运行出的更快
    #IO阻塞 自动切换任务。。
    def say(url):
        print('get url',url)
        resp = urlopen(url)
        data = resp.read()
        print(len(data),url)
    t1_start = time.time()
    say('http://www.xiaohuar.com/')
    say('http://www.oldboyedu.com/')
    print("普通--time cost",time.time() - t1_start)
    
    t2_stat = time.time()
    gevent.joinall(
        [gevent.spawn(say,'http://www.xiaohuar.com/'),
         gevent.spawn(say,'http://www.oldboyedu.com/'),
         gevent.spawn(say,'http://weibo.com/MMbdzx?from=myfollow_all&is_all=1#_rnd1482040021384')]
    )
    print("gevent---time cost",time.time() - t2_stat)
     

    由于切换时再IO操作就自动完成,所以需要gevent修改py自带的标准库,这一过程在启动时通过monkey patch完成  -- 

    对比2次运行完毕的时间,很明显的看到gevent在处理上,更加有优势,

     到了这里简单的就算完了。。。来进入总结概念的部分--------http://www.cnblogs.com/zcqdream/p/6196948.html   

    通过gevent来实现单线程下的多socket并发

    server 端,采用gevent协程

     
     1 import sys
     2 import socket
     3 import time
     4 import gevent
     5  
     6 from gevent import socket,monkey
     7 monkey.patch_all()
     8  
     9  
    10 def server(port):
    11     s = socket.socket()
    12     s.bind(('0.0.0.0', port))
    13     s.listen(500)
    14     while True:
    15         cli, addr = s.accept()
    16         gevent.spawn(handle_request, cli)   #gevent.spwan调用handle参数并传参
    17  
    18  
    19  
    20 def handle_request(conn):
    21     try:
    22         while True:
    23             data = conn.recv(1024)
    24             print("recv:", data)
    25             conn.send(data)
    26             if not data:             
    27                 conn.shutdown(socket.SHUT_WR)
    28  
    29     except Exception as  ex:
    30         print(ex)
    31     finally:
    32         conn.close()
    33 if __name__ == '__main__':
    34     server(8001)
     

    client端

    单线程的客户端

     
     1 import socket
     2  
     3 HOST = 'localhost'    # The remote host
     4 PORT = 8001           # The same port as used by the server
     5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     6 s.connect((HOST, PORT))
     7 while True:
     8     msg = bytes(input(">>:"),encoding="utf8")
     9     s.sendall(msg)
    10     data = s.recv(1024)
    11     #print(data)
    12  
    13     print('Received', repr(data))
    14 s.close()
     

    多线程客户端去请求

     
    import socket
    import threading
    
    def sock_conn():
    
        client = socket.socket()
    
        client.connect(("localhost",8001))
        count = 0
        while True:
            #msg = input(">>:").strip()
            #if len(msg) == 0:continue
            client.send( ("hello %s" %count).encode("utf-8"))
    
            data = client.recv(1024)
    
            print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
            count +=1
        client.close()
    
    
    for i in range(100):
        t = threading.Thread(target=sock_conn)
        t.start()
     

    python3通过gevent.pool限制协程并发数量

  • 相关阅读:
    Xcode 增强开发效率的插件
    svn 常用指令
    初识JFinal
    企业竞争案例分析
    可行性研究5
    可行性研究习题4
    可行性研究课后题
    关于阅读软件工程和计算机科学技术区别的文章来谈谈自己看法
    关于中文编程是解决中国程序员效率的秘密武器的问题思考
    Java ForkJoinPool 用法及原理
  • 原文地址:https://www.cnblogs.com/leijiangtao/p/11957222.html
Copyright © 2011-2022 走看看