zoukankan      html  css  js  c++  java
  • RabbitMQ的使用总结

    RabbitMQ介绍

    这里写图片描述
    说明:

    Consumer (消费者):使用队列 Queue 从 Exchange 中获取消息的应用。

    Exchange (交换机):负责接收生产者的消息并把它转到到合适的队列。

    Queue (队列):一个存储Exchange 发来的消息的缓冲,并将消息主动发送给Consumer,或者 Consumer 主动来获取消息。

    Binding (绑定):队列 和 交换机 之间的关系。Exchange 根据消息的属性和 Binding 的属性来转发消息。绑定的一个重要属性是 binding_key。

    Connection (连接)和 Channel (通道):生产者和消费者需要和 RabbitMQ 建立 TCP 连接。一些应用需要多个connection,为了节省TCP 连接,可以使用 Channel,它可以被认为是一种轻型的共享 TCP 连接的连接。连接需要用户认证,并且支持 TLS (SSL)。连接需要显式关闭。

    Message (消息): RabbitMQ 转发的二进制对象,包括Headers(头)、Properties (属性)和 Data (数据),其中数据部分不是必要的。Producer(生产者): 消息的生产者,负责产生消息并把消息发到交换机


    RabbitMQ安装

    第一种安装方式:
    官网下载地址:https://www.rabbitmq.com/download.html

    第二种安装方式:
    使用APT库

     deb http://www.rabbitmq.com/debian/ testing main
     wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
     sudo apt-key add rabbitmq-signing-key-public.asc
     apt-get update
     sudo apt-get install rabbitmq-server

    Python操作RabbitMQ

    实现简单消息队列:

    这里写图片描述
    图解:生产者(producer)把消息发送到一个名为“hello”的队列中。消费者(consumer)从这个队列中获取消息。

    生产者(producer.py)全部代码

    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
    print " [x] Sent 'Hello World!'"
    connection.close()

    消费者(consumer.py)全部代码

    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    print ' [*] Waiting for messages. To exit press CTRL+C'
    
    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
    
    channel.start_consuming()

    首先在终端中运行我们的 producer.py 程序:

    $ python producer.py
    [x] Sent 'Hello World!'

    然后,运行 consumer.py 程序

    $ python consumer.py
    [*] Waiting for messages. To exit press CTRL+C
    [x] Received 'Hello World!'

    工作队列(任务队列)

    这里写图片描述

    图解:当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。

    工作队列(任务队列-Task Queues)是为了避免等待一些占用大量资源、时间的操作,它会发送一些耗时的任务给多个工作者(Worker)。

    这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务。

    注意:此处使用默认交换机

    生产者(producer.py)全部代码

    #coding: utf-8
    '''
    循环调度:
    使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。
    若有多个消费者,即打开多个终端,运行消费者程序。默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。
    '''
    
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True) #durable=True 表示为了不让队列消失,需要把队列声明为持久化
    
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                            delivery_mode = 2,    #我们需要把我们的消息也要设为持久化——将delivery_mode的属性设为2。
                          ))
    
    print(" [x] Sent %r" % (message,))
    connection.close()

    消费者(consumer.py)全部代码

    #coding: utf-8
    import pika
    import time
    #使用time.sleep()函数来模拟占用大量资源、时间的操作
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    
    #如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。
    channel.queue_declare(queue='task_queue', durable=True) #durable=True 表示为了不让队列消失,需要把队列声明为持久化
                                                            #这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改。
                                                            #这时候,我们就可以确保在RabbitMq重启之后queue_declare队列不会丢失。
    print(" [*] Waiting for message. To exit press CTRL+C")
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % (body,))
        #time.sleep(body.count('.'))
        print(" [x] Done")
    
        #一个很容易犯的错误就是忘了basic_ack,后果很严重。
        #消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ就会占用越来越多的内存。
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)   #我们可以使用basic.qos方法,并设置prefetch_count=1。
                                          #这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。
    channel.basic_consume(callback, queue='task_queue', no_ack=False)  # no_ack=False 默认值,表示消息响应是开启的
    channel.start_consuming()

    发布/订阅

    这里写图片描述

    图解:为了描述这种模式,我们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。在这个日志系统中,所有正在运行的接收方程序都会接受消息。用其中一个消费者或者接收者把日志写入硬盘中,另外一个消费者或者接受者把日志输出到屏幕上。最终,日志消息被广播给所有的消费者或者接受者。

    注意:此处使用扇形交换机

    生产者(producer.py)全部代码

    #!/usr/bin/env python
    import pika
    import sys
    
    connection =pika.BlockingConnection(
                            pika.ConnectionParameters(host='localhost')
                            )
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs', type='fanout')
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message
                          )
    print (" [x] Sent %r" % (message,))
    connection.close()

    消费者(consumer.py)全部代码

    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(
                            pika.ConnectionParameters(host='localhost')
                            )
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs', type='fanout')  #扇型交换机
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    channel.queue_bind(exchange='logs', queue=queue_name)
    
    print (' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print " [x] %r" % (body,)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True
                          )
    
    channel.start_consuming()

    运行如下命令:

    首先,运行消费者(consumer.py)命令

    python3 consumer.py > consumer.log  #把日志保存到文件中
    
    python3 consumer.py    #在屏幕中查看日志

    然后,运行生产者(producer.py)命令

    python3 producer.py

    发布/订阅(改进版)

    下图能够很好的描述这个场景:

    这里写图片描述
    图解:在这个场景中,可以看到直连交换机 X和两个队列进行了绑定。第一个队列使用orange作为绑定键,第二个队列有两个绑定,一个使用black作为绑定键,另外一个使用green。这样以来,当路由键为orange的消息发布到交换机,就会被路由到队列Q1。路由键为black或者green的消息就会路由到Q2。其他的所有消息都将会被丢弃。

    发布/订阅(改进版)
    这里写图片描述

    注意:此处使用直接交换机

    生产者(producer.py)或发送者全部代码

    #!/usr/bin/env python
    import pika
    import sys
    '''
    发送者日志:把消息发送到一个直连交换机,把日志级别作为路由键。这样接收日志的脚本就可以根据严重级别来选择它想要处理的日志。
    '''
    connection = pika.BlockingConnection(
                            pika.ConnectionParameters(host='localhost')
                            )
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs', type='direct')
    
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,   #此处routing_key为路由键
                          body=message
                          )
    
    print (" [x] Sent %r:%r" % (severity, message))
    connection.close()

    消费者(consumer.py)或接收者全部代码

    #!/usr/bin/env python
    import pika
    import sys
    
    '''
    为每个严重级别分别创建一个新的绑定
    '''
    
    connection = pika.BlockingConnection(
                        pika.ConnectionParameters(host='localhost')
                        )
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs', type='direct')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    severities = sys.argv[1:]
    if not severities:
        print >> sys.stderr, "Usage: %s [info] [warning] [error]" % 
                             (sys.argv[0],)
        sys.exit(1)
    
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity  #此处routing_key为绑定键
                           )
    
    print (' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print (" [x] %r:%r" % (method.routing_key, body,))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()

    运行如下命令:

    首先,运行消费者(consumer.py)或者发送者命令

    python3 consumer.py warning error > consumer.log #此命令只是保存warningerror级别的日志到磁盘
    
     python3 consumer.py info warning error

    然后,运行生产者(producer.py)或者发送者命令

    python3 producer.py error "Run. Run. Or it will explode."

    发布/订阅(再改进版)

    从以上例子可以看出,直连交换机替代了扇型交换机,从只能盲目的广播消息改进为有可能选择性的接收日志。
    尽管直连交换机能够改善我们的系统,但是它也有它的限制 —— 没办法基于多个标准执行路由操作。通过使用主题交换机,可以监听来源于“cron”的严重程度为“critical errors”的日志,也可以监听来源于“kern”的所有日志。
    这里写图片描述

    图解:一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。

    如果我们违反约定,发送了一个携带有一个单词或者四个单词(”orange” or “quick.orange.male.rabbit”)的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。

    但是另一方面,即使 “lazy.orange.male.rabbit” 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

    注意:此处使用主题交换机

    生产者(producer.py)或发送者全部代码

    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(
                        pika.ConnectionParameters(host='localhost')
                        )
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs', type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message
                          )
    print (" [x] Sent %r:%r" % (routing_key, message))
    connection.close()

    消费者(consumer.py)或接收者全部代码

    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(
                        pika.ConnectionParameters(host='localhost')
                        )
    
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs', type='topic')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    binding_keys = sys.argv[1:]
    if not binding_keys:
        print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
        sys.exit(1)
    
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
    
    print (' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print (" [x] %r:%r" % (method.routing_key, body,))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True
                          )
    
    channel.start_consuming()

    运行如下命令:

    首先,运行消费者(consumer.py)或者发送者命令

    python3 consumer.py "#"   #接收所有日志
    
    python3 consumer.py "kern.*"    #接收来自kern设备的日志
    
    python3 consumer.py "*.critical"  #只接收严重程度为critical的日志
    
    python3 consumer.py "kern.*" "*.critical"  #建立多个绑定
    

    然后,运行生产者(producer.py)或者发送者命令

    python3 producer.py "kern.critical" "A critical kernel error"

    参考文档:
    http://www.zixuebook.cn/rabbitmq/rabbitmq-python-intro.html
    http://rabbitmq.mr-ping.com/tutorials_with_python/[2]Work_Queues.html

  • 相关阅读:
    js获取当前网址Url
    ajax解决跨域问题
    微信小程序传递URL中含有特殊字符
    layui中的tab切换
    layer.confirm等事件X关闭与取消监听
    Java中的API方法总结
    sublime安装插件
    LNMP的基本配置
    LNMP环境的搭建
    LAMP安装细则
  • 原文地址:https://www.cnblogs.com/jpfss/p/10313016.html
Copyright © 2011-2022 走看看