zoukankan      html  css  js  c++  java
  • RabbitMq与Redis的使用

    RabbitMQRedisMysql

    RabbitMQ 消息队列
    python里有threading QUEUE 只用于线程间交互,进程QUEUE 用于父进程与子进程或者是兄弟进程
    RabbitMQ采用消息轮询的方式发送消息。一个一个的给每个消费者
    应用之间使用socket实现数据共享
    链接几个应用的中间商著名的有:
    1. RabbitMQ 2. ZeroMQ 3. ActiveMQ
    RabbitMQ使用
    生产者:
    1. 引用pika模块
    import pika
    2. 建立socket
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    3. 声明一个管道
    channel=connection.channel()
    4. 在管道中声明队列
    channel.queue_declare(queue='队列名')
    5. 通过管道发送消息 rountingkey就是队列名字
    channel.basic_publish(exchange='',routing_key='队列名',body='Hello World!')
    6. 关闭队列,不用关闭管道
    connection.close()
    消费者(可能是其他机器,可以跨机器) 1. 引用pika模块
    import pika
    2. 建立socket
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    3. 声明一个管道
    channel=connection.channel()
    4. 在管道中声明队列
    channel.queue_declare(queue='队列名')
    5. 定义函数,标准的处理消息的函数都会带下面四个参数ch管道声明的对象地址,method包含你要发消息给谁的信息,properties包含发消息端的设置信息
    def callback(ch,method,properties,body): print(ch,method,properties,body) print("[x] Received %r"%body) ch.basic_ack(delivery_tag=method.delivery_tag)手动确认消息处理完,不然消息一直不销毁
    6. 消费消息,定义函数的目的,如果收到消息,就用定义的函数处理消息no
    ack参数消息确认,当为True时消息不等消费者确认消息队列就销毁消息,为False时需要等待消费者处理完消息的确认消息队消息队列才销毁消息(判断的是socket是否断开)
    channel.basic_consume('定义的函数名',queue='队列名',no_ack=True)
    7. 启动管道接收消息,启动后一直处于开启状态,没有消息就等待。
    channel.start_consuming()

    RabbitMQ消息分发轮询

    采用轮询的方式,依次的发给每个消费者
    生产者会等待消费者确定处理完消息的回复才会销毁消息。
    当消息执行到一半的时候,消费者断开,消息会保留发送给下一个消费者处理

    消息持久化

    消息必须等消费者手动确定后,才销毁ch.basicack(deliverytag=method.delivery_tag)手动确认消息处理完,不然消息一直不销毁
    当RabbitMQ服务停止,服务里的消息队列会销毁。
    如果想保持消息队列的持久化,必须在声明队列的时候设置,durable=True。这样当RabbitMQ服务断开再重启,消息队列还是存在,消息会销毁
    channel.queue_declare(queue='队列名',durable=True) 消息也持久化
    channel.basic_publish(exchange='',routing_key='队列名',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2))

    广播模式

    消费者端加channel.basicqos(prefetchcount=1),加过这句话实现了,不是按顺序分发,而是看哪一个是空闲的,才分发给空闲的消费者消息。多大本事干多少活。
    广播是生产者发消息,所有消费者都收到。
    用exchange实现广播。 fanout:所有bind到此exchange的queue都可以接收消息
    direct:通过routingkey和exchange决定的那个唯一的queue可以接收消息
    topic:所有符合routingkey(此时可以是一个表达式)的routingkey所bind的queue可以接受消息。

    fanout纯广播

    设置管道的时候设置
    channel.exchange_declare(exchange='logs',type='fanout')
    不声明queue
    生产者:
    1. 引用pika模块
    import pika
    2. 建立socket
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    3. 声明一个管道
    channel=connection.channel()
    4. 设置管道的时候设置
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    5. 通过管道发送消息,广播不需要管道名
    channel.basic_publish(exchange='logs',routing_key='',body='Hello World!')
    6. 关闭队列,不用关闭管道
    connection.close()
    消费者(可能是其他机器,可以跨机器) 1. 引用pika模块
    import pika
    2. 建立socket
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    3. 声明一个管道
    channel=connection.channel()
    4. 设置管道的时候设置
    channel.exchange_declare(exchange='logs',exchange_type='fanout') 5. 生成随机queue与exchange转发器绑定。 ``` result=channel.queuedeclare(exclusive=True)exclusive排他的,单独的,生成随街queue,绑定再exchange上 queuename=result.method.queue
    channel.queuebind(exchange='logs',queue=queuename)绑定转发器,接受转发器里的消息,exchange与随机生成的queue绑定

    6. 定义函数,标准的处理消息的函数都会带下面四个参数ch管道声明的对象地址,method包含你要发消息给谁的信息,properties def callback(ch,method,properties,body): print(ch,method,properties,body) print("[x] Received %r"%body)
    ch.basicack(deliverytag=method.deliverytag)手动确认消息处理完,不然消息一直不销毁 ``` 7. 消费消息,定义函数的目的,如果收到消息,就用定义的函数处理消息noack参数消息确认,当为True时消息不等消费者确认消息队列就销毁消息,为False时需要等待消费者处理完消息的确认消息队消息队列才销毁消息(判断的是socket是否断开)
    channel.basic_consume('定义的函数名',queue='队列名',no_ack=True)
    8. 启动管道接收消息,启动后一直处于开启状态,没有消息就等待。
    channel.start_consuming()

    direct广播 info warning error 划分消息

    生产者:
    1. 引用pika模块
    import pika
    2. 建立socket
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    3. 声明一个管道
    channel=connection.channel()
    4. 设置管道的时候设置
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct') 5. 接受分发消息的级别
    severity=sys.argv[1] if len(sys.argv)>1 else 'info' message=' '.join(sys.argv[2:]) or 'hello world!' 5. 通过管道发送消息,广播不需要管道名
    channel.basic_publish(exchange='direct_logs', routing_key=severity,#类似指定queue body=message)
    6. 关闭队列,不用关闭管道
    connection.close()
    消费者
    1. 引用pika模块
    import pika
    2. 建立socket
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    3. 声明一个管道
    channel=connection.channel()
    4. 设置管道的时候设置
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct') 5. 生成随机queue与exchange转发器绑定。 ``` result=channel.queuedeclare(exclusive=True)exclusive排他的,单独的,生成随街queue,绑定再exchange上 queuename=result.method.queue
    severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) sys.exit(1)

    for severity in severities: channel.queuebind(exchange='directlogs',queue=queuename,routingkey=severity)

    6. 定义函数,标准的处理消息的函数都会带下面四个参数ch管道声明的对象地址,method包含你要发消息给谁的信息,properties def callback(ch,method,properties,body): print(ch,method,properties,body) print("[x] Received %r"%body)
    ch.basicack(deliverytag=method.deliverytag)手动确认消息处理完,不然消息一直不销毁 ``` 7. 消费消息,定义函数的目的,如果收到消息,就用定义的函数处理消息noack参数消息确认,当为True时消息不等消费者确认消息队列就销毁消息,为False时需要等待消费者处理完消息的确认消息队消息队列才销毁消息(判断的是socket是否断开)
    channel.basic_consume('定义的函数名',queue='队列名',no_ack=True)
    8. 启动管道接收消息,启动后一直处于开启状态,没有消息就等待。
    channel.start_consuming()

    topic广播

    更细致的消息过滤,包括应用程序,#收所有消息,.info接受带有.info的消息,mysql.接受带mysql的消息
    生产者
    ``` import pika import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()

    channel.exchangedeclare(exchange='topiclogs', type='topic')

    routingkey = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basicpublish(exchange='topiclogs', routingkey=routingkey, body=message) print(" [x] Sent %r:%r" % (routingkey, message)) connection.close() 消费者 import pika import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()

    channel.exchangedeclare(exchange='topiclogs', type='topic')

    result = channel.queuedeclare(exclusive=True) queuename = result.method.queue

    bindingkeys = sys.argv[1:] if not bindingkeys: sys.stderr.write("Usage: %s [binding_key]... " % sys.argv[0]) sys.exit(1)

    for bindingkey in bindingkeys: channel.queuebind(exchange='topiclogs', queue=queuename, routingkey=binding_key)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body))

    channel.basicconsume(callback, queue=queuename, no_ack=True)

    channel.start_consuming()
    ```

    RabbitMQ rpc(remote procedure call)远程调用一个方法

    即使生产者又是消费者
    startcosuming为阻塞模式,rpc不用阻塞,rpc是执行一会这个再去执行另一个。processdata_events()非阻塞方法,能收到消息就收,没有消息不阻塞继续往下执行
    服务器端
    ```

    coding:utf-8

    author = 'Alex Li' import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))

    channel = connection.channel()

    channel.queuedeclare(queue='rpcqueue')

    def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2)

    def on_request(ch, method, props, body): n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',

    routing_key=props.reply_to,

    properties=pika.BasicProperties(correlation_id = <br/>
    props.correlation_id),

    body=str(response))

    ch.basic_ack(delivery_tag = method.delivery_tag)


    channel.basicqos(prefetchcount=1) channel.basic_consume(onrequest, queue='rpcqueue')

    print(" [x] Awaiting RPC requests") channel.start_consuming() 客户端 import pika import uuid

    class FibonacciRpcClient(object): def init(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))

        self.channel = self.connection.channel()

    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,<br/>
                               queue=self.callback_queue)<br/>
    

    def on_response(self, ch, method, props, body):

    if self.corr_id == props.correlation_id:

    self.response = body

    def call(self, n):

    self.response = None

    self.corr_id = str(uuid.uuid4())

    self.channel.basic_publish(exchange='',

    routing_key='rpc_queue',

    properties=pika.BasicProperties(

    replyto = self.callbackqueue,#客户端发送消息是,把接收返回消息的管道也告诉给服务器端 correlationid = self.corrid,#用于判断服务器端返回的结果和我的请求是否是同一条。目的就是为了客户端可以同时发两条消息,当服务器端返回结果时,需要有判断关于哪条请求的结果 ), body=str(n)) while self.response is None: self.connection.processdataevents() return int(self.response)

    fibonacci_rpc = FibonacciRpcClient()

    print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response) ```

    Redis

    缓存中间商,用socket,例如:mongodb,redis,memcache
    * 连接方式
    * 连接池
    * 操作
    - String 操作
    - Hash 操作
    - List 操作
    - Set 操作
    - Sort Set 操作
    * 管道
    * 发布订阅

    String操作
    1. set(name, value, ex=None, px=None, nx=False, xx=False)
      在Redis中设置值,默认,不存在则创建,存在则修改
      参数:
      ex,过期时间(秒)
      px,过期时间(毫秒)
      nx,如果设置为True,则只有name不存在时,当前set操作才执行
      xx,如果设置为True,则只有name存在时,岗前set操作才执行
    2. setnx(name, value) 设置值,只有name不存在时,执行设置操作(添加)
    3. setex(name, value, time) 设置值 参数:time,过期时间(数字秒 或 timedelta对象)
    4. psetex(name, timems, value) 设置值 参数:timems,过期时间(数字毫秒 或 timedelta对象)
    5. mset(*args, **kwargs) 批量设置值 如:mset(k1='v1', k2='v2')或mget({'k1': 'v1', 'k2': 'v2'})
    6. get(name) 获取值
    7. mget(keys, *args) 批量获取 如:mget('ylr', 'wupeiqi')或r.mget(['ylr', 'wupeiqi'])
    8. getset(name, value) 设置新值并获取原来的值
    9. getrange(key, start, end) 获取子序列(根据字节获取,非字符) 参数: name,Redis 的 name start,起始位置(字节) end,结束位置(字节) 如: "武沛齐" ,0-3表示 "武"
    10. setrange(name, offset, value) 修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加) 参数: offset,字符串的索引,字节(一个汉字三个字节) value,要设置的值
    11. setbit(name, offset, value) BITCOUNT 统计二进制有多少个1 对name对应值的二进制表示的位进行操作 参数: name,redis的name offset,位的索引(将值变换成二进制后再进行索引) value,值只能是 1 或 0 注:如果在Redis中有一个对应: n1 = "foo", 那么字符串foo的二进制表示为:01100110 01101111 01101111 所以,如果执行 setbit('n1', 7, 1),则就会将第7位设置为1, 那么最终二进制则变成 01100111 01101111 01101111,即:"goo" 扩展,转换二进制表示: source = "武沛齐" source = "foo" for i in source: num = ord(i) print bin(num).replace('b','') 特别的,如果source是汉字 "武沛齐"怎么办? 答:对于utf-8,每一个汉字占 3 个字节,那么 "武沛齐" 则有 9个字节 对于汉字,for循环时候会按照 字节 迭代,那么在迭代时,将每一个字节转换 十进制数,然后再将十进制数转换成二进制 11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000 -------------------------- ----------------------------- ----------------------------- 武 沛 齐
    12. getbit(name, offset) 获取name对应的值的二进制表示中的某位的值 (0或1)
    13. bitcount(key, start=None, end=None) 获取name对应的值的二进制表示中 1 的个数 参数: key,Redis的name start,位起始位置 end,位结束位置
    14. bitop(operation, dest, *keys) 获取多个值,并将值做位运算,将最后的结果保存至新的name对应的值 参数: operation,AND(并) 、 OR(或) 、 NOT(非) 、 XOR(异或) dest, 新的Redis的name *keys,要查找的Redis的name 如: bitop("AND", 'newname', 'n1', 'n2', 'n3') 获取Redis中n1,n2,n3对应的值,然后讲所有的值做位运算(求并集),然后将结果保存 newname 对应的值中
    15. strlen(name) 返回name对应值的字节长度(一个汉字3个字节)
    16. incr(self, name, amount=1) 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。 参数: name,Redis的name amount,自增数(必须是整数) 注:同incrby
    17. incrbyfloat(self, name, amount=1.0) 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。 参数: name,Redis的name amount,自增数(浮点型)
    18. decr(self, name, amount=1) 自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减。 参数: name,Redis的name amount,自减数(整数)
    19. append(key, value) 在redis name对应的值后面追加内容 参数: key, redis的name value, 要追加的字符串   
  • 相关阅读:
    如何提高工作效率,重复利用时间
    好记性不如烂笔头
    如何应对面试中关于“测试框架”的问题
    通宵修复BUG的思考
    工作方法的思考
    别认为那是一件简单的事情
    开发人员需要熟悉缺陷管理办法
    不了解系统功能的思考
    如何布置任务
    事事有回音
  • 原文地址:https://www.cnblogs.com/dcotorbool/p/8508703.html
Copyright © 2011-2022 走看看