zoukankan      html  css  js  c++  java
  • Python与RabbitMQ交互

    RabbitMQ 消息队列

      成熟的中间件RabbitMQ、ZeroMQ、ActiveMQ等等

      RabbitMQ使用erlang语言开发,使用RabbitMQ前要安装erlang语言

      RabbitMQ允许不同应用、程序间交互数据

      python中的Threading queue只能允许单进程内多线程交互的

      python中的MultiProcessing queue只能允许父进程与子进程或同父进程的多个子进程交互


    RabbitMQ启动:
      1.windows中默认安装成功,在服务列表中会显示自动启动
      2.Linux中使用命令rabbitmq-server start

    RabbitMQ支持不同的语言,对于不同语言有相应的模块,这些模式支持使用开发语言连接RabbitMQ
    Python连接RabbitMQ模块有:
      1.pika主流模块
      2.Celery分布式消息队列
      3.Haigha提供了一个简单的使用客户端库来与AMQP代理进行交互的方法


    使用RabbitMQ前,首先阅读开始文档: http://www.rabbitmq.com/getstarted.html


    简单的发送接收实例
      默认情况下,使用同一队列的进程,接收消息方使用轮询的方式,依次获取消息
      对于一条消息的接收来说,只有当接收方收到消息,并处理完消息,给RabbitMQ发送ack,队列中的消息才会删除
      如果在处理的过程中socket断开,那么消息自动转接到下一个接收方

     producer.py

    __author__ = 'Cq'
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   'localhost'))
    
    #声明一个管道
    channel = connection.channel()
    
    #声明queue,这个队列在RabbitMQ中生成,发送方和接收方使用同一个队列
    channel.queue_declare(queue='hello2', durable=True)
    
    #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    channel.basic_publish(exchange='',
                          routing_key='hello2',#队列名称
                          body='Hello World!',
                          properties=pika.BasicProperties(
                             delivery_mode = 2, # make message persistent
                          )
                        )#body消息内容
    print(" [x] Sent 'Hello World!'")
    connection.close()
    View Code

    consumer.py

    __author__ = 'Cq'
    
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   'localhost'))
    channel = connection.channel()
    
    
    #You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    # We could avoid that if we were sure that the queue already exists. For example if send.py program
    #was run before. But we're not yet sure which program to run first. In such cases it's a good
    # practice to repeat declaring the queue in both programs.
    #发送方和接收方不知道谁首先连接到RabbitMQ,双方连接上来都先声明一个队列
    channel.queue_declare(queue='hello2', durable=True)
    
    def callback(ch, method, properties, body):
        print("recived message...")
        # time.sleep(30)
        print(" [x] Received %r" % body)
        #处理完成消息后,主动要向RabbitMQ发送ack
        ch.basic_ack(delivery_tag=method.delivery_tag)
        #ch -->  管道内存对象的地址
        #method --> 指定各种参数
        #properties -->
        #python3 socket等发送网络包都是byte格式
    
    #如果队列里还有1条消息未处理完,将不能接收新的消息
    channel.basic_qos(prefetch_count=1)
    
    #声明接收收消息变量
    channel.basic_consume(callback,#收到消息后执行的回调函数
                          queue='hello2',)
                         #no_ack=True)#执行完callback函数后,默认会发送ack给RabbitMQ
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    #开始接收消息,不停循环接收,没有消息挂起等待
    channel.start_consuming()
    View Code

    在RabbitMQ中查看当前队列数
      1.windows中查看队列
      在RabbitMQ安装目录下,sbin下有个管理工具rabbitmqctl.bat可以查看队列和队列中的消息数
      E:RabbitMQ Server abbitmq_server-3.6.14sbin>rabbitmqctl.bat list_queues
      Listing queues
      hello 1

    消息持久化
    如果当RabbitMQ服务器宕机了,不允许为处理的消息丢失时
      1.需要在声明队列时,声明为持久队列,只是队列持久化,消息未能持久化
        channel.queue_declare(queue='hello',durable=True)

      2.需要在发送端发送消息时声明
        channel.basic_publish(exchange='',
        routing_key='hello', #队列名称
        body='Hello World!', #body消息内容
        properties=pika.BasicProperties(
        delivery_mode = 2, # make message persistent
        #..这里可以添加附带参数,客户的通过回调函数的位置参数prop.参数名获取
        ))

    消息处理配置
      对于不同性能的机器,处理消息量大小不同
      判断接收方消息队列里是否有未处理的消息,如果队列里还有1条消息未处理完,将不能接收新的消息
      channel.basic_qos(prefetch_count=1)

    发送广播消息
      使用exchange,exchange的类型决定如果发送广播消息,它就是一个转发器
        类型:
          fanout: 所有bind到此exchange的queue都可以接收消息
          direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
          topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
          headers: 通过headers 来决定把消息发给哪些queue


      fanout纯广播,只要bind到exchange的queue都能收到广播消息
        ☆发送的消息只广播发送一次
        channel.exchange_declare(exchange='log', type='fanout')
        channel.basic_publish(exchange='log',
        routing_key='',
        body=message)

      实例:

      fanout_producer.py

    __author__ = 'Cq'
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    View Code

      fanout_consumer.py

    __author__ = 'Cq'
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    #此queue名唯一,且只接收广播消息,当不需要接收时,能自动销毁
    result = channel.queue_declare(exclusive=True)
    #不需要queue名,只要绑定到转发器就能接收消息
    
    queue_name = result.method.queue
    
    channel.queue_bind(exchange='logs',
                       queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    View Code

      topic过滤内容广播,队列只接收关心的消息

      实例:

      topic_producer.py

    __author__ = 'Cq'
    
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs1',
                             exchange_type='topic')
    
    #默认发送的消息格式为xxx.info
    severity = sys.argv[1] if len(sys.argv) > 1 else 'test_message.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs1',
                          routing_key=severity,
                          body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    View Code

      topic_consumer.py

    __author__ = 'Cq'
    
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs1',
                             exchange_type='topic')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    
    for severity in severities:
        channel.queue_bind(exchange='topic_logs1',
                           queue=queue_name,
                           routing_key=severity)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    View Code

           过滤条件设置

    To receive all the logs run:
    python receive_logs_topic.py "#"
    
    To receive all logs from the facility "kern":
    python receive_logs_topic.py "kern.*"
    
    Or if you want to hear only about "critical" logs:
    python receive_logs_topic.py "*.critical"
    
    You can create multiple bindings:
    python receive_logs_topic.py "kern.*" "*.critical"
    
    And to emit a log with a routing key "kern.critical" type:
    python emit_log_topic.py "kern.critical" "A critical kernel error"
    View Code
    发送端
        python topic_producer.py xxx.info         messagexxxx
        python topic_producer.py xxx.warngin   messagexxxx
        python topic_producer.py xxx.error       messagexxxx
    
    接收端
        python topic_consumer.py *.info
        python topic_consumer.py *.warngin
        python topic_consumer.py *.error
        python topic_consumer.py *.*
    View Code

    参考博客:http://www.cnblogs.com/alex3714/articles/5248247.html

  • 相关阅读:
    关于观察者模式和发布/订阅模式
    git:error: Your local changes to the following files would be overwritten by merge:
    node中几个路径的梳理
    centOS 开启服务器后无法访问(大坑啊)
    文件上传简记
    自建nodejs服务器(一:有个服务器)
    nodejs上使用sql
    express笔记
    windows下node配置npm全局路径(踩坑)
    DropMaster
  • 原文地址:https://www.cnblogs.com/cq146637/p/8075211.html
Copyright © 2011-2022 走看看