zoukankan      html  css  js  c++  java
  • python16_day11【MQ、Redis、Memcache】

    一、RabbitMQ

      是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

      1.RabbitMQ install

    1 安装配置epel源
    2    $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
    3  
    4 安装erlang
    5    $ yum -y install erlang
    6  
    7 安装RabbitMQ
    8    $ yum -y install rabbitmq-server
      

    注意:service rabbitmq-server start/stop

      2. Python API install  

    1 pip install pika
    2 or
    3 easy_install pika
    4 or
    5 源码
    6 https://pypi.python.org/pypi/pika

      3.基于QUEUE实现生产消费模型

     1 import Queue
     2 import threading
     3 
     4 
     5 message = Queue.Queue(10)
     6 
     7 
     8 def producer(i):
     9     while True:
    10         message.put(i)
    11 
    12 
    13 def consumer(i):
    14     while True:
    15         msg = message.get()
    16 
    17 
    18 for i in range(12):
    19     t = threading.Thread(target=producer, args=(i,))
    20     t.start()
    21 
    22 for i in range(10):
    23     t = threading.Thread(target=consumer, args=(i,))
    24     t.start()

      4.基于RabbitMQ

      对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

     1 import pika
     2  
     3 # ######################### 生产者 #########################
     4  
     5 connection = pika.BlockingConnection(pika.ConnectionParameters(
     6         host='localhost'))
     7 channel = connection.channel()
     8  
     9 channel.queue_declare(queue='hello')
    10  
    11 channel.basic_publish(exchange='',
    12                       routing_key='hello',
    13                       body='Hello World!')
    14 print(" [x] Sent 'Hello World!'")
    15 connection.close()
    16 
    17 
    18  
    19 # ########################## 消费者 ##########################
    20 import pika
    21 connection = pika.BlockingConnection(pika.ConnectionParameters(
    22         host='localhost'))
    23 channel = connection.channel()
    24  
    25 channel.queue_declare(queue='hello')
    26  
    27 def callback(ch, method, properties, body):
    28     print(" [x] Received %r" % body)
    29  
    30 channel.basic_consume(callback,
    31                       queue='hello',
    32                       no_ack=True)
    33  
    34 print(' [*] Waiting for messages. To exit press CTRL+C')
    35 channel.start_consuming()

      5.消费者ack 

     1 import pika
     2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
     3 channel = connection.channel()
     4 channel.queue_declare(queue='hello')
     5 
     6 
     7 def callback(ch, method, properties, body):
     8     print(" [x] Received %r" % body)
     9 
    10 channel.basic_consume(callback, queue='hello', no_ack=False)
    11 # no_ack: acknowledgment 消息不丢失,MQ判读出现异常,没有消费,没有ack,则把消息放回队列.
    12 channel.start_consuming()
    消息ack

      6.durable消息持久化

     1 import pika
     2 
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4         host='127.0.0.1', port=5672))
     5 channel = connection.channel()
     6 
     7 channel.queue_declare(queue='hello1', durable=True)        # 创建通道, 持久化修改1:durable=True
     8 
     9 channel.basic_publish(exchange='',
    10                       routing_key='hello',
    11                       body='Hello World!',
    12                       properties=pika.BasicProperties(delivery_mode=2)  # 持久化修改2
    13                       )
    14 connection.close()
    生产者
     1 import pika
     2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
     3 channel = connection.channel()
     4 
     5 
     6 def callback(ch, method, properties, body):
     7     print(" [x] Received %r" % body)
     8     import time
     9     time.sleep(10)
    10     print('ok')
    11     ch.basic_ack(delivery_tag=method.delivery_tag)  # 持久化:修改2
    12 
    13 channel.basic_consume(callback,
    14                       queue='hello',
    15                       no_ack=False)         # 持久化:修改1
    16 
    17 channel.start_consuming()
    消费者

      7.消息获取顺序

     1 import pika
     2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
     3 channel = connection.channel()
     4 
     5 
     6 def callback(ch, method, properties, body):
     7     print(" [x] Received %r" % body)
     8     import time
     9     time.sleep(10)
    10     print('ok')
    11     ch.basic_ack(delivery_tag=method.delivery_tag)
    12 channel.basic_qos(prefetch_count=1)         # 默认消息队列里的数据是按照顺序被消费者拿走,
    13                                             # 例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
    14                                             # 表示谁来谁取,不再按照奇偶数排列
    15 channel.basic_consume(callback,
    16                       queue='hello',
    17                       no_ack=False)
    18 
    19 channel.start_consuming()
    消费者

      8.发布订阅

      exchange type = fanout

     1 import pika
     2 import sys
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='localhost'))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='logs',
     9                          type='fanout')
    10 
    11 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    12 channel.basic_publish(exchange='logs',
    13                       routing_key='',
    14                       body=message)
    15 print(" [x] Sent %r" % message)
    16 connection.close()
    发布者
     1 import pika
     2 
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4         host='localhost'))
     5 channel = connection.channel()
     6 
     7 channel.exchange_declare(exchange='logs',
     8                          type='fanout')
     9 
    10 result = channel.queue_declare(exclusive=True)
    11 queue_name = result.method.queue
    12 
    13 channel.queue_bind(exchange='logs',
    14                    queue=queue_name)
    15 
    16 print(' [*] Waiting for logs. To exit press CTRL+C')
    17 
    18 
    19 def callback(ch, method, properties, body):
    20     print(" [x] %r" % body)
    21 
    22 channel.basic_consume(callback,
    23                       queue=queue_name,
    24                       no_ack=True)
    25 
    26 channel.start_consuming()
    订阅者

      9.发布订阅(关键字)

      exchange type = direct

     1 import pika
     2 import sys
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='localhost'))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='direct_logs',
     9                          type='direct')
    10 
    11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    13 channel.basic_publish(exchange='direct_logs',
    14                       routing_key=severity,
    15                       body=message)
    16 print(" [x] Sent %r:%r" % (severity, message))
    17 connection.close()
    发布者
     1 import pika
     2 import sys
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='localhost'))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='direct_logs',
     9                          type='direct')
    10 
    11 result = channel.queue_declare(exclusive=True)
    12 queue_name = result.method.queue
    13 
    14 severities = sys.argv[1:]
    15 if not severities:
    16     sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
    17     sys.exit(1)
    18 
    19 for severity in severities:
    20     channel.queue_bind(exchange='direct_logs',
    21                        queue=queue_name,
    22                        routing_key=severity)
    23 
    24 print(' [*] Waiting for logs. To exit press CTRL+C')
    25 
    26 def callback(ch, method, properties, body):
    27     print(" [x] %r:%r" % (method.routing_key, body))
    28 
    29 channel.basic_consume(callback,
    30                       queue=queue_name,
    31                       no_ack=True)
    32 
    33 channel.start_consuming()
    订阅者

      10.发布订阅(模糊匹配)

      exchange type = topic

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    • # 表示可以匹配 0 个 或 多个 单词
    • *  表示只能匹配 一个 单词
    发送者路由值              队列中
    old.boy.python          old.*  -- 不匹配
    old.boy.python          old.#  -- 匹配
     1 import pika
     2 import sys
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='localhost'))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='topic_logs',
     9                          type='topic')
    10 
    11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    13 channel.basic_publish(exchange='topic_logs',
    14                       routing_key=routing_key,
    15                       body=message)
    16 print(" [x] Sent %r:%r" % (routing_key, message))
    17 connection.close()
    发布者
     1 import pika
     2 import sys
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='localhost'))
     6 channel = connection.channel()
     7 
     8 channel.exchange_declare(exchange='topic_logs',
     9                          type='topic')
    10 
    11 result = channel.queue_declare(exclusive=True)
    12 queue_name = result.method.queue
    13 
    14 binding_keys = sys.argv[1:]
    15 if not binding_keys:
    16     sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
    17     sys.exit(1)
    18 
    19 for binding_key in binding_keys:
    20     channel.queue_bind(exchange='topic_logs',
    21                        queue=queue_name,
    22                        routing_key=binding_key)
    23 
    24 print(' [*] Waiting for logs. To exit press CTRL+C')
    25 
    26 def callback(ch, method, properties, body):
    27     print(" [x] %r:%r" % (method.routing_key, body))
    28 
    29 channel.basic_consume(callback,
    30                       queue=queue_name,
    31                       no_ack=True)
    32 
    33 channel.start_consuming()
    订阅者

    二、Memcached

      1.安装API

        pip3 install python-memcached

      2.基本使用

    import memcache
     
    mc = memcache.Client(['10.211.55.4:12000'], debug=True)
    mc.set("foo", "bar")
    ret = mc.get('foo')
    print ret
    

      3.支持集群

    • 根据算法将 k1 转换成一个数字
    • 将数字和主机列表长度求余数,得到一个值 N( 0 <= N < 列表长度 )
    • 在主机列表中根据 第2步得到的值为索引获取主机,例如:host_list[N]
    • 连接 将第3步中获取的主机,将 k1 = "v1" 放置在该服务器的内存中
    mc = memcache.Client([('1.1.1.1:12000', 1), ('1.1.1.2:12000', 2), ('1.1.1.3:12000', 1)], debug=True)
     
    mc.set('k1', 'v1')
    

      4.add命令

      添加一条键值对,如果已经存在的 key,重复执行add操作异常

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import memcache
     
    mc = memcache.Client(['10.211.55.4:12000'], debug=True)
    mc.add('k1', 'v1')
    # mc.add('k1', 'v2') # 报错,对已经存在的key重复添加,失败!!!
    

      5.replace命令

      replace 修改某个key的值,如果key不存在,则异常

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import memcache
     
    mc = memcache.Client(['10.211.55.4:12000'], debug=True)
    # 如果memcache中存在kkkk,则替换成功,否则一场
    mc.replace('kkkk','999')
    

      6.set 和 set_multi

      set            设置一个键值对,如果key不存在,则创建,如果key存在,则修改!
      set_multi   设置多个键值对,如果key不存在,则创建,如果key存在,则修改!

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import memcache
     
    mc = memcache.Client(['10.211.55.4:12000'], debug=True)
     
    mc.set('key0', 'wupeiqi')
     
    mc.set_multi({'key1': 'val1', 'key2': 'val2'})
    

      7.delete 和 delete_multi

      delete             在Memcached中删除指定的一个键值对
      delete_multi    在Memcached中删除指定的多个键值对

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import memcache
     
    mc = memcache.Client(['10.211.55.4:12000'], debug=True)
     
    mc.delete('key0')
    mc.delete_multi(['key1', 'key2'])
    

      8.get 和 get_multi

      get            获取一个键值对
      get_multi   获取多一个键值对

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import memcache
     
    mc = memcache.Client(['10.211.55.4:12000'], debug=True)
     
    val = mc.get('key0')
    item_dict = mc.get_multi(["key1", "key2", "key3"])
    

      9.append 和 prepend

      append    修改指定key的值,在该值 后面 追加内容
      prepend   修改指定key的值,在该值 前面 插入内容

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import memcache
     
    mc = memcache.Client(['10.211.55.4:12000'], debug=True)
    # k1 = "v1"
     
    mc.append('k1', 'after')
    # k1 = "v1after"
     
    mc.prepend('k1', 'before')
    # k1 = "beforev1after"
    

      10.decr 和 incr

      incr  自增,将Memcached中的某一个值增加 N ( N默认为1 )
      decr 自减,将Memcached中的某一个值减少 N ( N默认为1 )

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import memcache
     
    mc = memcache.Client(['10.211.55.4:12000'], debug=True)
    mc.set('k1', '777')
     
    mc.incr('k1')
    # k1 = 778
     
    mc.incr('k1', 10)
    # k1 = 788
     
    mc.decr('k1')
    # k1 = 787
     
    mc.decr('k1', 10)
    # k1 = 777
    

      11.gets 和 cas

       如商城商品剩余个数,假设改值保存在memcache中,product_count = 900

      A用户刷新页面从memcache中读取到product_count = 900
      B用户刷新页面从memcache中读取到product_count = 900

      如果A、B用户均购买商品

      A用户修改商品剩余个数 product_count=899
      B用户修改商品剩余个数 product_count=899

      如此一来缓存内的数据便不在正确,两个用户购买商品后,商品剩余还是 899
      如果使用python的set和get来操作以上过程,那么程序就会如上述所示情况!

      如果想要避免此情况的发生,只要使用 gets 和 cas 即可,如:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import memcache
    mc = memcache.Client(['10.211.55.4:12000'], debug=True, cache_cas=True)
     
    v = mc.gets('product_count')
    # ...
    # 如果有人在gets之后和cas之前修改了product_count,那么,下面的设置将会执行失败,剖出异常,从而避免非正常数据的产生
    mc.cas('product_count', "899")
    

      Ps:本质上每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值时,会携带之前获取的自增值和memcache中的自增值进行比较,如果相等,则可以提交,如果不想等,那表示在gets和cas执行之间,又有其他人执行了gets(获取了缓冲的指定值), 如此一来有可能出现非正常数据,则不允许修改。

    三、Redis

      1.安装API

        pip3 install redis

      2.功能介绍

    • 连接方式
    • 连接池
    • 操作
      • String 操作
      • Hash 操作
      • List 操作
      • Set 操作
      • Sort Set 操作
    • 管道
    • 发布订阅

      3.基本操作

    import redis
     
    r = redis.Redis(host='10.211.55.4', port=6379)
    r.set('foo', 'Bar')
    print r.get('foo')
    

      4.连接池

      redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。

    import redis
     
    pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
     
    r = redis.Redis(connection_pool=pool)
    r.set('foo', 'Bar')
    print r.get('foo')
    

      5.操作

        参考:http://www.cnblogs.com/wupeiqi/articles/5132791.html

  • 相关阅读:
    Infopath Notify 弹出提示信息
    window.showModalDialog 返回值
    【转】获得正文内容中的所有img标签的图片路径
    Json Datable Convert
    Sharepoint 列表 附件 小功能
    Surgey 权限更改
    SQL 触发器用于IP记录转换
    Caml语句 查询分配给当前用户及当前组
    jquery 1.3.2 auto referenced when new web application in VSTS2010(DEV10)
    TFS diff/merge configuration
  • 原文地址:https://www.cnblogs.com/weibiao/p/6664553.html
Copyright © 2011-2022 走看看