zoukankan      html  css  js  c++  java
  • python浅学【网络服务中间件】之RabbitMQ

    一、关于AMQP:

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议。

    AMQP使符合要求的客户端应用程序能够与符合要求的消息传递中间件代理进行通信。

    AMQP是一种新的消息传递中间件开放标准。它解决了如何跨LANS和WANS连接应用程序的问题。在AMQP之前,没有相互连接应用程序的标准,这是大型软件体系结构中比较昂贵的部分之一。AMQP是一个线级协议,外加一个用于路由和消息排队的模型。它包括非常高性能的发布-订阅(通过单个代理的速度可达每秒150k条消息)和高可靠性的消息传递(无论如何都保证交付)。

    AMQP是一种可编程的协议:

    定义了三种实体(对象)
      queues, exchanges, bindings
      queues :队列,存储信息,用于消费
      exchanges : 消息中转站,包含多种类型
      bindings :消息转发规则,定义了route,规定怎么把消息发送到队列

    二、关于RabbitMQ:

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

    消息代理的应用:

      (1)将消息路由到一个或多个目的地
      (2)将消息转换为替代表示
      (3)执行消息聚合,将消息分解为多个消息并将其发送到目的地,然后将响应重新组合成一条消息以返回给用户
      (4)与外部的存储库交互以扩充消息或存储消息
      (5)调用web服务以检索数据
      (6)回应事件或错误
      (7)使用发布订阅模式提供内容和基于主题的消息路由

    三、RabbitMQ交换机exchanges的四种类型:

    exchanges 的属性:
      name 名字
      durability 持久化
      auto-delete 自动删除(所有队列都解除绑定的时候)

    exchanges 类型: direct exchange,fanout exchange,topic exchange,headers exchange

    1、direct exchange:
      queue创建时,绑定一个同名的routing key
      用途:把任务分配给多个workers, 每个work做特定工作,比如写日志

     2、fanout exchange:
      传递消息到每一个queue,忽略routeing key
      用途:多处记录日志,统一发布通知,球赛更新比分, 分布式系统更新配置信息

    3、topic exchange:
      根据规则匹配相应的queue, 实现订阅发布
      用途:根据不同标签更新新闻,根据位置信息提供商品

    4、Headers Exchange
      根据多个属性当做消息头,忽略routing key, 需要开发者定义更多内容
      用途:当direct 和routeing key 不是字符串时,可使用这个自定义属性匹配

    四、python简单操作RabbitMQ:

    client端:消费信息

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # msg_worker.py
    # Author:Riy
    
    
    import time
    import pika
    
    
    # 连接服务
    config = {
        "username": "root",
        "password": "root",
        "host": "127.0.0.1",
        "port": ""
    }
    creds = pika.PlainCredentials(config["username"], config["password"])
    params = pika.ConnectionParameters(config["host"], credentials=creds)
    connection = pika.BlockingConnection(params)
    
    # 创建队列
    channel = connection.channel()
    channel.queue_declare(queue="msg_queue", durable=True)
    
    # 接收,消费消息
    def callback(ch, method, properties, body):
        print(f"收到消息:{body}")
        time.sleep(body.count(b"-"))
        print('ok')
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    # 均衡任务
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume("msg_queue", callback)
    
    channel.start_consuming()

    server端:发送信息

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # msg_pub.py
    # Author:Riy
    
    
    import sys
    import pika
    
    
    # 连接服务
    config = {
        "username": "root",
        "password": "root",
        "host": "127.0.0.1",
        "port": ""
    }
    creds = pika.PlainCredentials(config["username"], config["password"])
    params = pika.ConnectionParameters(config["host"], credentials=creds)
    connection = pika.BlockingConnection(params)
    
    # 创建队列
    channel = connection.channel()
    channel.queue_declare(queue="msg_queue", durable=True)
    
    # 发送消息
    msg = " ".join(sys.argv[1:]) or "new msg from Riy"
    channel.basic_publish(exchange="", 
                        routing_key="msg_queue",
                        body=msg,
                        properties=pika.BasicProperties(
                            delivery_mode=2 # TODO
                        ))
    
    print(f"发送消息:{msg}")
    connection.close()
  • 相关阅读:
    js实现对身份证校验
    zip解压缩
    zip压缩
    文件内容编解码
    文件的操作
    Mysql账号管理
    深度优先算法DFS
    Java常见知识问答
    Hibernate的单向OneToMany、单向ManyToOne
    Angularjs在线编辑器
  • 原文地址:https://www.cnblogs.com/riyir/p/12555182.html
Copyright © 2011-2022 走看看