zoukankan      html  css  js  c++  java
  • 消息队列-rabbitMQ

    消息队列两个用处:服务间解耦,缓解压力(削峰平谷),以前用过ZMQ、狼厂内部的NMQ,现在接触了java开源的kafka和RabbitMQ。目前先不求甚解,有个大概的认识。

    RabbitMQ的安装和入门例子见http://www.rabbitmq.com/,挺全的。安装前需要安装erlang,启动方便。

    RabbitMQ是一个强壮的消息队列,安装使用都很容易,支持常用的发布/订阅、消息分发功能。

    架构


    exchange是消息队列的核心,producer只向exchange发送消息,exchange根据routing key向不同的绑定队列发送消息,consumer从各自的队列接收消息。

    使用实例(python):

    producer

    import pika
    import sys
    #producer/consumer通用,与rabbitmq建立连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs', type='fanout')#声明创建一个名为logs的exchange,类型为fanout,type有fanout/direct/topic/headers
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs', routing_key='', body=message) #p向某个exchange发送消息,并指出路由key
    print " [x] Sent %r" % (message,)
    connection.close()

    consumer

    #同上,建立连接、声明exchange
    result = channel.queue_declare(exclusive=True)#声明一个队列,名字由系统产生,exclusive指连接断开后删除队列,queue_declare(queue="xx")可以自己命名队列
    queue_name = result.method.queue
    channel.queue_bind(exchange='logs', queue=queue_name) #将队列与名为logs的exchange绑定,不指定exchange则与默认的exchange绑定
    
    print ' [*] Waiting for logs. To exit press CTRL+C'
    
    def callback(ch, method, properties, body):
        print " [x] %r" % (body,)
    
    channel.basic_consume(callback, queue=queue_name, no_ack=True) #consumer接收消息不需向rabbit发送确认
    channel.start_consuming()
    
    

    消息的负载均衡

    • round-robin dispatching:多个consumer订阅同一个队列,会按消息到来的顺序逐个分发给consumers,确保消息个数的平衡
    • fair dispatch:每个consumer处理资源不同(cpu/memory)、每条消息消耗的时间也不同,按照条数平均分配不能充分利用consumer资源。
    #解决办法,consumer加如下代码
    channel.basic_qos(prefetch_count=1)#每次向consumer发一条消息,直到收到consumer的确认再发下一条
    #consumer每次处理完信息后向rabbitmq发送确认。在callback函数结尾处加上ch.basic_ack(delivery_tag=method.delivery_tag)
    channel.basic_consume(callback,queue=queue_name)#默认no_ack=false,rabbitmq会一直等待直到收到确认消息
    

    术语解释

    Broker:简单来说就是消息队列服务器实体。
    Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
    Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
    Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
    Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
    vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
    producer:消息生产者,就是投递消息的程序。
    consumer:消息消费者,就是接受消息的程序。
    channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

    通信协议AMQP(Advanced Message Queuing Protocol)

    AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概如下:

    • 客户端连接到消息队列服务器,打开一个channel。
    • 客户端声明一个exchange,并设置相关属性。
    • 客户端声明一个queue,并设置相关属性。
    • 客户端使用routing key,在exchange和queue之间建立好绑定关系。
    • 客户端投递消息到exchange。

    exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

    exchange支持的分发类型

    • fanout:向所有队列广播消息,不会根据routing-key来分发
    • direct: 根据routing-key分发不同的消息到相应的订阅队列
    #如上文
    #p:
    channel.basic_public(exchange='logs',routing_key='error',body=message)
    #c:订阅routing_key为error的消息,一个队列可以绑定多次,订阅多个routing_key
    channel.queue_bind(exchange='logs',queue=queue_name, routing_key='error')
    • topic: toipc是direct的升级版,routing-key支持通配符匹配。* 一个词,#0或多个词,如*.orange会接收a.orange/b.orange消息,#.organge会接收a.b.orange/a.orange消息。一个单#,该队列会接收所有的消息,等同于fanout
    • headers
  • 相关阅读:
    获取其他线程的数据用 queue, 多进程Q
    self: 限制并发量asyncio
    asyncio 中给running 的loop 动态添加 Future Task
    雾里看花之 Python Asyncio
    python协程之动态添加任务
    异步IO( asyncio) 协程
    加快phpstorm、rubymine、pycharm系列IDE运行速度的方法
    scrapy 'fcntl' has no attribute 'F_GETFD
    sitemap index
    Django模板系统 运算
  • 原文地址:https://www.cnblogs.com/whuqin/p/4981966.html
Copyright © 2011-2022 走看看