zoukankan      html  css  js  c++  java
  • Python之异步IO&RabbitMQ&Redis

    协程:

    1、单线程运行,无法实现多线程。

    2、修改数据时不需要加锁(单线程运行),子程序切换是线程内部的切换,耗时少。

    3、一个cpu可支持上万协程,适合高并发处理。

    4、无法利用多核资源,因为协程只有一个线程。

    使用yield实现协程:

    import time
    import Queue
    def consumer(name):
        print("--->starting eating baozi...")
        while True:
            new_baozi = yield
            print("[%s] is eating baozi %s" % (name,new_baozi))
            #time.sleep(1)
    
    def producer():
        r = con.next()#拥有yield的函数是迭代起,使用next()方法取值。
        r = con2.next()
        n = 0
        while n < 5:
            n +=1
            con.send(n)
            con2.send(n)
            print("33[32;1m[producer]33[0m is making baozi %s" %n )
    
    if __name__ == '__main__':
        con = consumer("c1")
        con2 = consumer("c2")
        p = producer()

    gevent:

    gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

    安装第三方库:

    sudo apt-get install libevent-dev
    sudo apt-get install python-dev
    sudo easy-install gevent

    eg:

    import gevent
    
    def foo():
        print('Running in foo')
        gevent.sleep(0)
        print('Explicit context switch to foo again')
    
    def bar():
        print('Explicit context to bar')
        gevent.sleep(0)
        print('Implicit context switch back to bar')
    
    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])
    
    
    
    #执行结果:
    Running in foo
    Explicit context to bar
    Explicit context switch to foo again
    Implicit context switch back to bar

    异步IO:

    程序在遇到IO操作时,会切换执行其他任务。

    from gevent import monkey; monkey.patch_all()
    import gevent
    from  urllib2 import urlopen
    
    def f(url):
        print('GET: %s' % url)
        resp = urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))
    
    gevent.joinall([
            gevent.spawn(f, 'https://www.python.org/'),
            gevent.spawn(f, 'https://www.yahoo.com/'),
            gevent.spawn(f, 'https://github.com/'),
    ])

    事件驱动与异步IO:

    单线程、多线程与事件驱动编程模型的比较如下图:

    这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。

    在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。

    在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。

    在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中(程序遇到IO操作时,会将该操作任务放到操作系统提供的高速队列中,切换去执行其他程序的非IO操作。待该IO操作结束后,需回调IO结果,这个过程中该任务需要不断的循环去询问队列中IO操作是否完成),然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。

    当我们面对如下的环境时,事件驱动模型通常是一个好的选择:

    1. 程序中有许多任务,而且…
    2. 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
    3. 在等待事件到来时,某些任务会阻塞。

    当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。

    网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。

     (参考连接:http://www.cnblogs.com/Anker/p/3254269.html)

    常用异步IO模型:Select、Poll和Epoll

    select:

    程序在向操作系统的队列中询问IO操作是否完成时,队列会返回给程序整个队列里所有注册的IO任务列表(程序将队列中所有IO列表copy一份)。此时程序会循环所有IO任务列表来寻找属于自己注册的那个,这样效率很低,消耗资源。系统默认打开文件个数为1024,若队列中IO个数超过此数量,还需要修改系统设置。

    poll:

    poll和select没有本质区别,只是将最大文件量的限制取消了。

    epoll:

    程序去队列中查找IO任务时,队列回返回程序一个队列的描述符(并不是返回整个队列IO列表)。epoll还采用了内存映射(mmap)技术,将内核内存(内核态)映射为一个文件,使得程序(用户态)可以直接访问,不需要再复制队列列表。

    关于这三者的比较,可以参考这篇文章:

    http://www.cnblogs.com/Anker/p/3265058.html

    Python MySQL API

    参考地址:http://www.cnblogs.com/wupeiqi/articles/5095821.html

    Redis

    下载安装:http://redis.io/

    tar zxvf redis-3.0.7.tar.gz 
    cd redis-3.0.7
    make

    启动服务端:(默认端口号为6379)

    cd src/
    ./redis-server

    启动客户端:

    ./redis-cli

    常用操作:

    127.0.0.1:6379> keys *#查看所有键值
    (empty list or set)
    127.0.0.1:6379> set name ahaii#添加值
    OK
    127.0.0.1:6379> get name#根据键查看值
    "ahaii"
    127.0.0.1:6379> set name ahaii ex 5#设置值的有效时间
    OK
    127.0.0.1:6379> get name
    "ahaii"
    127.0.0.1:6379> get name
    (nil)

     Python操作Redis:

    安装redis模块:

    pip install redis

    基本操作:

    #!/usr/bin/python
    
    import redis
    
    r = redis.Redis(host='localhost',port=6379)
    r.set('name','gom')
    print r.get('name')

    连接池:

    可以设置一个连接池,使多个redis实例共享一个连接池,避免每次建立、释放连接的开销。

    #!/usr/bin/python
    
    import redis
    
    r = redis.Redis(host='localhost',port=6379)
    r.set('name','gom')
    print r.get('name')

    常用方法:

    set:

    set(name, value, ex=None, px=None, nx=False, xx=False)

    redis中设置值时,默认若不存在则创建该值,若存在则修改该值。

      ex:过期时间(秒)

      px:过期时间(毫秒)

      nx:如果设置为True,则只有name不存在时,当前set操作才执行

      xx:如果设置为True,则只有name存在时,岗前set操作才执行

    setnx(name,value):只有name不存在时,执行添加操作

    setex(name,value,time):设置值和过期时间(秒)

    psetex(name,value,time):设置值和过期时间(毫秒)

    mset(*args, **kwargs):批量设置,如

    mset(k1='v1', k2='v2')
        或
    mget({'k1': 'v1', 'k2': 'v2'})

    get:取值

    getrange:截取一段,类似列表切片

    import redis
    
    r = redis.Redis(host='localhost',port=6379)
    r.set('id','qwerty')
    print r.getrange('id',2,4)#截取从第2到4个字符(从0开始)
    
    
    #执行结果:ert

    其他操作参考以下博客:

    http://www.cnblogs.com/wupeiqi/articles/5132791.html

    http://www.jb51.net/article/56448.htm

    Redis订阅与发布

    Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

    Redis 客户端可以订阅任意数量的频道。

    下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

    当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

    实例:

    创建名为redisChat的订阅频道:

    127.0.0.1:6379> SUBSCRIBE redisChar
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "redisChar"
    3) (integer) 1

    在另外一个redis客户端中发布消息:

    127.0.0.1:6379> PUBLISH redisChar 'hello ahaii'
    (integer) 1
    127.0.0.1:6379> PUBLISH redisChar 'learn redis'
    (integer) 1

    此时,第一个客户端中就会收到这两条消息:

    1) "message"
    2) "redisChar"
    3) "hello ahaii"
    1) "message"
    2) "redisChar"
    3) "learn redis"

    RabbitMQ:

    RabbitMQ 是信息传输的中间者。本质上,他从生产者(producers)接收消息,转发这些消息给消费者(consumers)。换句话说,他能够按根据你指定的规则进行消息转发、缓冲、和持久化。

    ConnectionFactory、Connection、Channel
    ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
    Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

    RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。

    多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

    RabbitMQ安装:

    sudo apt-get install rabbitmq-server

    启动服务:

    sudo /etc/init.d/rabbitmq-server start

    安装Python的RabbitMQ API:

    sudo easy_install pika

    生产者与消费者实例:

    生产者:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))#指定主机
    
    channel = connection.channel()#生成一个管道
    
    channel.queue_declare(queue='hello')#在管道中创建一个队列
    
    channel.basic_publish(exchange='',routing_key='hello',body='hello ahaii')
    #exchange:交换器,消息首先到达exchange,exchange会根据队列的名字,将消息转发到指定的队列。
    #routing_key:指定队列名字
    #body:消息体
    
    print 'send hello ahaii'
    connection.close()

    消费者:

    import  pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()#创建管道
    
    channel.queue_declare(queue='hello')#创建队列
    #消费者创建队列的目的是,当生产者(程序)晚于消费者启动时,该队列依然存在(由消费者创建),这样避免因队列不存在而报错
    
    def callback(ch,method,properties,body):#回调函数,参数固定
        print 'recived %s' %body
    
    channel.basic_consume(callback,queue='hello',no_ack=True)#当从hello收到消息后,调用回调函数(callback),no_ack表示是否需要确认该任务正常执行完毕
    print 'waiting for messaage:'
    channel.start_consuming()#开始阻塞,接收任务

    消息持久化:

    将消息持久化后,服务重启后,消息依然存在。

    持久化参数:

    channel.queue_declare(queue='hello', durable=True)

    参考:http://blog.csdn.net/column/details/rabbitmq.html

  • 相关阅读:
    topcoder srm 320 div1
    topcoder srm 325 div1
    topcoder srm 330 div1
    topcoder srm 335 div1
    topcoder srm 340 div1
    topcoder srm 300 div1
    topcoder srm 305 div1
    topcoder srm 310 div1
    topcoder srm 315 div1
    如何统计iOS产品不同渠道的下载量?
  • 原文地址:https://www.cnblogs.com/ahaii/p/5341289.html
Copyright © 2011-2022 走看看