zoukankan      html  css  js  c++  java
  • python 之 RabbitMQ

    介绍

    RabbitMQ是一个消息代理:它接受和转发消息。您可以将其视为邮局:当您将要发布的邮件放在邮箱中时,您可以确定邮递员最终会将邮件发送给您的收件人。在这个类比中,RabbitMQ是一个邮箱,邮局和邮递员。RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制blob数据。RabbitMQ所使用的 AMQP 0-9-1版本协议

    专业术语

    • Broker: 简单来说就是消息队列服务器实体

    • Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列

    • Queue: 消息队列载体,每个消息都会被投入到一个或多个队列

    • Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来

    • Routing Key: 路由关键字,exchange根据这个关键字进行消息投递

    • VHost: 虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

    • Producer: 消息生产者,就是投递消息的程序

    • Consumer: 消息消费者,就是接受消息的程序

    • Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

          由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。

    python 实现Rabbitmq 消息的产生和消费

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    # @Author  : Yunhgu
    # @File    : announcer.py
    # @Software: Vscode
    # @Time    : 2021-04-07 11:11:47
    
    import pika
    import json
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。
    # durable = True 代表exchange持久化存储,False 非持久化存储
    channel.exchange_declare(exchange='python-exchange-direct',
                             durable=True, exchange_type='direct')
    
    for i in range(100):
        message = json.dumps({'OrderId': "1000%s" % i})
    # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,
    # delivery_mod = 1 消息非持久化。routing_key 不需要配置
        channel.basic_publish(exchange='python-exchange-direct',
                              routing_key='OrderId',
                              body=message,
                              properties=pika.BasicProperties(delivery_mode=2))
        print(message)
        time.sleep(2)
    connection.close()
    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    # @Author  : Yunhgu
    # @File    : subscriber.py
    # @Software: Vscode
    # @Time    : 2021-04-07 11:15:13
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    # 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
    result = channel.queue_declare('python-direct', exclusive=True)
    # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。
    # durable = True 代表exchange持久化存储,False 非持久化存储
    channel.exchange_declare(exchange='python-exchange-direct',
                             durable=True, exchange_type='direct')
    # 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去
    channel.queue_bind(exchange='python-exchange-direct',
                       queue=result.method.queue,
                       routing_key='OrderId')
    # 定义一个回调函数来处理消息队列中的消息,这里是打印出来
    
    
    def callback(ch, method, properties, body):
        print(properties)
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print(body.decode())
    
    
    # auto_ack设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。
    # True,无论调用callback成功与否,消息都被消费掉
    channel.basic_consume(result.method.queue, callback,
                          auto_ack=False)
    channel.start_consuming()
    不论你在什么时候开始,重要的是开始之后就不要停止。 不论你在什么时候结束,重要的是结束之后就不要悔恨。
  • 相关阅读:
    Unix命令大全
    vs2008 与 IE8出现的兼容性问题
    Java 创建文件、文件夹以及临时文件
    如何修改Wamp中mysql默认空密码
    PAT 乙级真题 1003.数素数
    Tags support in htmlText flash as3
    DelphiXE4 FireMonkey 试玩记录,开发IOS应用 还是移植
    10 Great iphone App Review sites to Promote your Apps!
    HTML tags in textfield
    Delphi XE4 IOS 开发, "No eligible applications were found“
  • 原文地址:https://www.cnblogs.com/yunhgu/p/14708228.html
Copyright © 2011-2022 走看看