zoukankan      html  css  js  c++  java
  • Python之异步IO&RabbitMQ&Redis

    协程:

    1、单线程运行,无法实现多线程。

    2、修改数据时不需要加锁(单线程运行),子程序切换是线程内部的切换,耗时少。

    3、一个cpu可支持上万协程,适合高并发处理。

    4、无法利用多核资源,因为协程只有一个线程。

    使用yield实现协程:

    import time
    import Queue
    def consumer(name):
        print("--->starting eating baozi...")
        while True:
            new_baozi = yield
            print("[%s] is eating baozi %s" % (name,new_baozi))
            #time.sleep(1)
    
    def producer():
        r = con.next()#拥有yield的函数是迭代起,使用next()方法取值。
        r = con2.next()
        n = 0
        while n < 5:
            n +=1
            con.send(n)
            con2.send(n)
            print("33[32;1m[producer]33[0m is making baozi %s" %n )
    
    if __name__ == '__main__':
        con = consumer("c1")
        con2 = consumer("c2")
        p = producer()

    gevent:

    gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

    安装第三方库:

    sudo apt-get install libevent-dev
    sudo apt-get install python-dev
    sudo easy-install gevent

    eg:

    import gevent
    
    def foo():
        print('Running in foo')
        gevent.sleep(0)
        print('Explicit context switch to foo again')
    
    def bar():
        print('Explicit context to bar')
        gevent.sleep(0)
        print('Implicit context switch back to bar')
    
    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])
    
    
    
    #执行结果:
    Running in foo
    Explicit context to bar
    Explicit context switch to foo again
    Implicit context switch back to bar

    异步IO:

    程序在遇到IO操作时,会切换执行其他任务。

    from gevent import monkey; monkey.patch_all()
    import gevent
    from  urllib2 import urlopen
    
    def f(url):
        print('GET: %s' % url)
        resp = urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))
    
    gevent.joinall([
            gevent.spawn(f, 'https://www.python.org/'),
            gevent.spawn(f, 'https://www.yahoo.com/'),
            gevent.spawn(f, 'https://github.com/'),
    ])

    事件驱动与异步IO:

    单线程、多线程与事件驱动编程模型的比较如下图:

    这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。

    在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。

    在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。

    在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中(程序遇到IO操作时,会将该操作任务放到操作系统提供的高速队列中,切换去执行其他程序的非IO操作。待该IO操作结束后,需回调IO结果,这个过程中该任务需要不断的循环去询问队列中IO操作是否完成),然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。

    当我们面对如下的环境时,事件驱动模型通常是一个好的选择:

    1. 程序中有许多任务,而且…
    2. 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
    3. 在等待事件到来时,某些任务会阻塞。

    当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。

    网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。

     (参考连接:http://www.cnblogs.com/Anker/p/3254269.html)

    常用异步IO模型:Select、Poll和Epoll

    select:

    程序在向操作系统的队列中询问IO操作是否完成时,队列会返回给程序整个队列里所有注册的IO任务列表(程序将队列中所有IO列表copy一份)。此时程序会循环所有IO任务列表来寻找属于自己注册的那个,这样效率很低,消耗资源。系统默认打开文件个数为1024,若队列中IO个数超过此数量,还需要修改系统设置。

    poll:

    poll和select没有本质区别,只是将最大文件量的限制取消了。

    epoll:

    程序去队列中查找IO任务时,队列回返回程序一个队列的描述符(并不是返回整个队列IO列表)。epoll还采用了内存映射(mmap)技术,将内核内存(内核态)映射为一个文件,使得程序(用户态)可以直接访问,不需要再复制队列列表。

    关于这三者的比较,可以参考这篇文章:

    http://www.cnblogs.com/Anker/p/3265058.html

    Python MySQL API

    参考地址:http://www.cnblogs.com/wupeiqi/articles/5095821.html

    Redis

    下载安装:http://redis.io/

    tar zxvf redis-3.0.7.tar.gz 
    cd redis-3.0.7
    make

    启动服务端:(默认端口号为6379)

    cd src/
    ./redis-server

    启动客户端:

    ./redis-cli

    常用操作:

    127.0.0.1:6379> keys *#查看所有键值
    (empty list or set)
    127.0.0.1:6379> set name ahaii#添加值
    OK
    127.0.0.1:6379> get name#根据键查看值
    "ahaii"
    127.0.0.1:6379> set name ahaii ex 5#设置值的有效时间
    OK
    127.0.0.1:6379> get name
    "ahaii"
    127.0.0.1:6379> get name
    (nil)

     Python操作Redis:

    安装redis模块:

    pip install redis

    基本操作:

    #!/usr/bin/python
    
    import redis
    
    r = redis.Redis(host='localhost',port=6379)
    r.set('name','gom')
    print r.get('name')

    连接池:

    可以设置一个连接池,使多个redis实例共享一个连接池,避免每次建立、释放连接的开销。

    #!/usr/bin/python
    
    import redis
    
    r = redis.Redis(host='localhost',port=6379)
    r.set('name','gom')
    print r.get('name')

    常用方法:

    set:

    set(name, value, ex=None, px=None, nx=False, xx=False)

    redis中设置值时,默认若不存在则创建该值,若存在则修改该值。

      ex:过期时间(秒)

      px:过期时间(毫秒)

      nx:如果设置为True,则只有name不存在时,当前set操作才执行

      xx:如果设置为True,则只有name存在时,岗前set操作才执行

    setnx(name,value):只有name不存在时,执行添加操作

    setex(name,value,time):设置值和过期时间(秒)

    psetex(name,value,time):设置值和过期时间(毫秒)

    mset(*args, **kwargs):批量设置,如

    mset(k1='v1', k2='v2')
        或
    mget({'k1': 'v1', 'k2': 'v2'})

    get:取值

    getrange:截取一段,类似列表切片

    import redis
    
    r = redis.Redis(host='localhost',port=6379)
    r.set('id','qwerty')
    print r.getrange('id',2,4)#截取从第2到4个字符(从0开始)
    
    
    #执行结果:ert

    其他操作参考以下博客:

    http://www.cnblogs.com/wupeiqi/articles/5132791.html

    http://www.jb51.net/article/56448.htm

    Redis订阅与发布

    Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

    Redis 客户端可以订阅任意数量的频道。

    下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

    当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

    实例:

    创建名为redisChat的订阅频道:

    127.0.0.1:6379> SUBSCRIBE redisChar
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "redisChar"
    3) (integer) 1

    在另外一个redis客户端中发布消息:

    127.0.0.1:6379> PUBLISH redisChar 'hello ahaii'
    (integer) 1
    127.0.0.1:6379> PUBLISH redisChar 'learn redis'
    (integer) 1

    此时,第一个客户端中就会收到这两条消息:

    1) "message"
    2) "redisChar"
    3) "hello ahaii"
    1) "message"
    2) "redisChar"
    3) "learn redis"

    RabbitMQ:

    RabbitMQ 是信息传输的中间者。本质上,他从生产者(producers)接收消息,转发这些消息给消费者(consumers)。换句话说,他能够按根据你指定的规则进行消息转发、缓冲、和持久化。

    ConnectionFactory、Connection、Channel
    ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
    Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

    RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。

    多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

    RabbitMQ安装:

    sudo apt-get install rabbitmq-server

    启动服务:

    sudo /etc/init.d/rabbitmq-server start

    安装Python的RabbitMQ API:

    sudo easy_install pika

    生产者与消费者实例:

    生产者:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))#指定主机
    
    channel = connection.channel()#生成一个管道
    
    channel.queue_declare(queue='hello')#在管道中创建一个队列
    
    channel.basic_publish(exchange='',routing_key='hello',body='hello ahaii')
    #exchange:交换器,消息首先到达exchange,exchange会根据队列的名字,将消息转发到指定的队列。
    #routing_key:指定队列名字
    #body:消息体
    
    print 'send hello ahaii'
    connection.close()

    消费者:

    import  pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()#创建管道
    
    channel.queue_declare(queue='hello')#创建队列
    #消费者创建队列的目的是,当生产者(程序)晚于消费者启动时,该队列依然存在(由消费者创建),这样避免因队列不存在而报错
    
    def callback(ch,method,properties,body):#回调函数,参数固定
        print 'recived %s' %body
    
    channel.basic_consume(callback,queue='hello',no_ack=True)#当从hello收到消息后,调用回调函数(callback),no_ack表示是否需要确认该任务正常执行完毕
    print 'waiting for messaage:'
    channel.start_consuming()#开始阻塞,接收任务

    消息持久化:

    将消息持久化后,服务重启后,消息依然存在。

    持久化参数:

    channel.queue_declare(queue='hello', durable=True)

    参考:http://blog.csdn.net/column/details/rabbitmq.html

  • 相关阅读:
    codevs2606 约数和问题
    UOJ150 运输计划
    codevs1279 Guard 的无聊
    codevs1997 守卫者的挑战
    codevs1291 火车线路
    codevs1217 借教室
    codevs1281 Xn数列
    codevs1218 疫情控制
    codevs1199 开车旅行
    BZOJ1941 [Sdoi2010]Hide and Seek
  • 原文地址:https://www.cnblogs.com/ahaii/p/5341289.html
Copyright © 2011-2022 走看看