zoukankan      html  css  js  c++  java
  • Python操作RabbitMQ

    什么叫消息队列

    消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

    消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

    为何用消息队列

    从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?

    以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。

    RabbitMQ 

    RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。需要在机算计上先安装RabbitMQ 

    rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。

    官方文档 

    rabbitMQ工作模型

    简单模式

    # 生产者
    import pika
    # 创建连接
    connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
    # 创建通信对象
    channel = connection.channel()
    # 创建一个名为hello的队列,队列存在于rabbitMQ服务器中
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',
                          routing_key='hello', # 指定队列
                          body='Hello World!') # 发送消息
    
    print(" [x] Sent 'Hello World!'")
    # 关闭连接
    connection.close()
    

    消费者

    # 消费者
    import pika
    # 创建连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    # 创建通信对象
    channel = connection.channel()
    # 创建名为hello的队列,因为不知道是生产者先启动还是消费者先启动,
    # 所以两边都去创建队列,后启动的不会重复创建
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        # body就是从队列中取到的数据
        print(" [x] Received %r" % body)
    
    
    channel.basic_consume(callback,      # 回调函数
                          queue='hello', # 选择队列
                          no_ack=True)   # 是否不回应,默认True
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming() # 开始接收
    

    相关参数

    (1)
    no_ack=False
    # 消费者从队列中取出数据时,队列中的数据不会删除,直到消费者处理完数据发送信号才删除
    # 避免消费者死机数据丢失
    
    # 回调函数中发送信号
    ch.basic_ack(delivery_tag=method.delivery_tag)
    
    (2)
    durable = True
    # 生产者将消息发送至服务器时,服务器将数据保存到本地
    # 避免服务器死机队列中数据丢失
    在创建队列时设置durable = True
    在向对列中发送数据时添加
     properties=pika.BasicProperties(
                              delivery_mode=2, # make message persistent
                          )
    

    完成1,2的生产者消费者

    # 生产者
    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='hello', durable=True)
    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!',
                          properties=pika.BasicProperties(
                              delivery_mode=2, # make message persistent
                          ))
    print(" [x] Sent 'Hello World!'")
    connection.close()
    
    
    # 消费者
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='hello', durable=True)
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print 'ok'
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    

    (3) 消息获取顺序

    默认消息队列里的数据是按照顺序被消费者拿走,消费者1,2,3只能依次取数据,但是会由于1,2,3处理数据的速度不同,导致后面的消费者处于等待状态

    channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按顺序取

    # 消费者模型
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print 'ok'
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1) # 消费者不再按顺序取
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

    exchange模型

    发布订阅

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

    exchange_type = fanout
    

    生产者

    # 生产者
    import pika
    import sys
    # 创建连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    # 创建通信对象
    channel = connection.channel()
    # 创建一个名为logs的交换机,模式为fanout
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs', # 交换模式
                          routing_key='', # 关键字为空
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    

    消费者

    # 消费者
    import pika
    # 创建连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    # 创建通信对象
    channel = connection.channel()
    # 创建一个名为logs的交换机,模式为fanout,与创建队列相同不知道谁先启动
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    # 生成一个随机队列
    result = channel.queue_declare(exclusive=True)
    # 拿到队列名
    queue_name = result.method.queue
    
    channel.queue_bind(exchange='logs', # 绑定交换机与队列的关系
                       queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    

    关键字发送

    之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列

     exchange_type = direct
    

    消费者

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    severities = "info"
    
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity) # 队列的关键字,可以为一个队列绑定多个关键字
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    
    # 在生产者发送消息时,可以通过关键字发送给指定队列
    channel.basic_publish(exchange='logs',
                          routing_key='info',
                          body=message)
    

    模糊匹配

    exchange_type = topic
    

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    • # 表示可以匹配 0 个 或 多个 单词
    • *  表示只能匹配 一个 单词
      import pika
      import sys
      
      connection = pika.BlockingConnection(pika.ConnectionParameters(
              host='localhost'))
      channel = connection.channel()
      
      channel.exchange_declare(exchange='direct_logs',
                               exchange_type='topic')
      
      result = channel.queue_declare(exclusive=True)
      queue_name = result.method.queue
      
      severities = "info.#"
      
      channel.queue_bind(exchange='topic_logs',
                         queue=queue_name,
                         routing_key=severity) # 发送是关键字,而是发送一个可匹配的字符
      
      print(' [*] Waiting for logs. To exit press CTRL+C')
      
      def callback(ch, method, properties, body):
          print(" [x] %r:%r" % (method.routing_key, body))
      
      channel.basic_consume(callback,
                            queue=queue_name,
                            no_ack=True)
      
      channel.start_consuming()
      

        

  • 相关阅读:
    C# 图片与Base64的相互转化
    LeetCode 303. Range Sum Query – Immutable
    LeetCode 300. Longest Increasing Subsequence
    LeetCode 292. Nim Game
    LeetCode 283. Move Zeroes
    LeetCode 279. Perfect Squares
    LeetCode 268. Missing Number
    LeetCode 264. Ugly Number II
    LeetCode 258. Add Digits
    LeetCode 257. Binary Tree Paths
  • 原文地址:https://www.cnblogs.com/wwg945/p/8678542.html
Copyright © 2011-2022 走看看