zoukankan      html  css  js  c++  java
  • (转)Python操作RabbitMq详解

    原文:https://blog.csdn.net/weixin_45144837/article/details/104335115

    一、简介:
    RabbitMq是实现了高级消息队列协议(AMQP)的开源消息代理中间件。消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递与队列,由另一应用程序读取完成通信,而作为中间件的RabbitMq无疑是目前最流行的消息队列之一

    二、VirtualHost
    RabbitMq的VirtualHost(虚拟消息服务器),每个VirtualHost相当于一个相对独立的RabbitMQ服务器;每个VirtualHost之间是相互隔离的,exchange、queue、message不能互通。

    添加用户

    下面给大家介绍权限也就是方框中的内容,

    超级管理员(admin):可登录管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作
    监控者(monitoring):可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
    策略制定者(policymaker):可登陆管理控制台
    拿数据库(用MySQL)来类比:RabbitMq相当于MySQL,RabbitMq中的VirtualHost就相当于MySQL中的一个库。
    管理管理者(management):仅可登录管理控制台,无法看到节点信息,也无法对策略进行管理
    其他:无法登录管理控制台,通常就是普通的生产者和消费者
    三、RabbitMq的应用场景
    系统的高可用:日常生活当中各种商城秒杀,高流量、高并发的场景,当服务器接受到如此大量请求处理业务时,有宕机的风险,某些业务可能极其复杂,但这部分不是高时效性,不需要立即反馈给用户,我们可以将这部分处理请求抛给队列,让程序后置去处理,减轻服务器在高并发场景下的压力
    分布式系统,集成系统,子系统之间的对接,以及架构设计中常常需要考虑消息队列的应用
    四、RabbitMq中的Connection和Channel
    我们知道无论是生产者还是消费者,都需要RabbitMq Broker 建立连接,这个连接就是一条TCP连接,也就是Connection
    一旦TCP连接建立起来,客户端紧接着就可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID
    信道是建立在Connection之上的虚拟连接,RabbitMq处理的每条AMQP指令都是通过信道完成的。


    后续。。。。

    五、RabbitMq生产者消费者模型
    生产者(producter) 队列消息的产生者,复制生产消息,并将消息传入队列
    生产者代码:

    import pika
    import json

    credentials = pika.PlainCredentials('lvhua','123456')#mq用户名和密码,用于认证
    #虚拟队列需要指定参数virtual_host,如果是默认的可以不填
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672,virtual_host='/',credentials=credentials))
    channel = connection.channel()# 创建一个AMQP信道

    #声明队列,并设置durable为True,为了避免rabbitMq-server挂掉数据丢失,将durable设为True
    channel.queue_declare(queue='1',durable=True)
    for i in range(10): # 创建10个q
    message = json.dumps({'OrderId':"1000%s"%i})
    # exchange表示交换器,可以精确的指定消息应该发到哪个队列中,route_key设置队列的名称,body表示发送的内容
    channel.basic_publish(exchange='',routing_key='1',body=message)
    print(message)
    connection.close()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    消费者(consumer):队列消息的接收者,扶着接收并处理消息队列中的消息

    import pika
    credentials = pika.PlainCredentials('lvhua','123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials
    ))
    channel = connection.channel()
    #声明消息队列,消息在这个队列中传递,如果不存在,则创建队列
    channel.queue_declare(queue='1',durable=True)
    # 定义一个回调函数来处理消息队列中消息,这里是打印出来
    def callback(ch,method,properties,body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())
    #告诉rabbitmq,用callback来接收消息
    channel.basic_consume('1',callback)
    #开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
    channel.start_consuming()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    六、RabbitMq持久化
    MQ默认建立的临时的queue和exchange,如果不声明持久化,一旦rabbitmq挂掉,queue,exchange将会全部丢失,所以我们一般在创建queue或者exchange的时候会声明持久化
    1.queue声明持久化

    # 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储
    result = channel.queue_declare(queue = 'python-test',durable = True)
    1
    2
    exchange声明持久化
    # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储
    channel.exchange_declare(exchange = 'python-test', durable = True)
    1
    2
    注意:如果已存在一个非持久化的queue或exchange,执行上述代码会报错,因为当前状态不能更该queue 或 exchange存储属性,需要删除重建,如果queue和exchange中一个声明了持久化,另一个没有声明持久化,则不允许绑定

    消息持久化
    虽然exchange和queue都声明了持久化,但如果消息只存在内存里,rabbitmq重启后,内存里的东西还是会丢失,所以必须声明消息也是持久化,从内存转存到到硬盘

    # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))
    1
    2
    acknowledgement消息不丢失
    消费者(consume)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息会丢失,但是也可以选择消费者处理失败时,将消息回退给rabbitmq,重新再被消费者消费,这个时候需要设置确认标识。
    channel.basic_consume(callback,queue = 'python-test',
    # no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
    no_ack = False)
    1
    2
    3
    七、RabbitMq发布与订阅
    在上一章中,我们创建了一个工作队列,工作队列模式的设想是每一条消息只会被转发给一个消费者。本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式。
    RabbitMq消息模式的核心思想是:一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。
    实际上,生产者只能把消息发送给一个exchange,exchange只做一件简单的事情:一方面它们接收从生产者发送过来的消息,另一方面,它们把接收到的消息推送给队列。一个exchage必须清楚地知道如何处理一条消息.  
    rabbitmq的发布与订阅要借助交换机(Exchange)的原理实现:

    Exchange 一共有三种工作模式:fanout, direct, topicd

    模式一:fanout
    这种模式下,传递到exchange的消息将会==转发到所有于其绑定的queue上

    不需要指定routing_key,即使指定了也是无效的。
    需要提前将exchange和queue绑定,一个exchange可以绑定多个queue,一个queue可以绑定多个exchange。
    需要先启动订阅者,此模式下的队列是consume随机生成的,发布者仅仅发布消息到exchange,由exchange转消息至queue。
    exchange交换器
    首先我们创建一个fanout类型的交换器,我们称之为:python-test:

    channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
    1
      广播模式交换器很简单,从字面意思也能理解,它其实就是把接收到的消息推送给所有它知道的队列。
      想查看当前系统中有多少个exchange,可以从控制台查看
      可以看到有很多以amq.*开头的交换器,以及(AMQP default)默认交换器,这些是默认创建的交换器。
      在前面,我们并不知道交换器的存在,但是依然可以将消息发送到队列中,那其实并不是因为我们可以不使用交换器,实际上是我们使用了默认的交换器(我们通过指定交换器为字字符串:""),回顾一下我们之前是如何发送消息的:

    channel.basic_publish(exchange='',routing_key='1',body=message)
    1
      第一个参数是交换器的名字,空字符串表示它是一个默认或无命名的交换器,消息将会由指定的路由键(第二个参数,routingKey,后面会讲)转发到队列。
      你可能会有疑问:既然exchange可以指定为空字符串(""),那么可否指定为null?
        答案是:不能!

      通过跟踪发布消息的代码,在AMQImpl类中的Publish()方面中,可以看到,不光是exchange不能为null,同时routingKey路由键也不能为null,否则会抛出异常:

    临时队列
    在前面的例子中,我们使用的队列都是有具体的队列名,创建命名队列是很必要的,因为我们需要将消费者指向同一名字的队列。因此,要想在生产者和消费者中间共享队列就必须要使用命名队列。

    发布者:
    import pika
    import json

    credentials = pika.PlainCredentials('lvhua', '123456') # mq用户名和密码
    # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost',port = 5672,virtual_host = '/',credentials = credentials))
    channel=connection.channel()
    # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
    channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
    for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
    # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置
    channel.basic_publish(exchange = 'python-test',routing_key = '',body = message,
    properties=pika.BasicProperties(delivery_mode = 2))
    print(message)
    connection.close()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    订阅者1
    import pika

    credentials = pika.PlainCredentials('lvhua', '123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost',port = 5672,virtual_host = '/',credentials = credentials))
    channel = connection.channel()
    # 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
    result = channel.queue_declare('4')
    # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
    channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
    # 绑定exchange和队列 exchange 使我们能够确切地指定消息应该到哪个队列去
    channel.queue_bind(exchange = 'python-test',queue = "4")
    # 定义一个回调函数来处理消息队列中的消息,这里是打印出来
    def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())

    channel.basic_consume(result.method.queue,callback,# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
    auto_ack = False)
    channel.start_consuming()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    订阅者2
    import pika

    credentials = pika.PlainCredentials('lvhua', '123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost',port = 5672,virtual_host = '/',credentials = credentials))
    channel = connection.channel()
    # 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
    result = channel.queue_declare('2')
    # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
    channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
    # 绑定exchange和队列 exchange 使我们能够确切地指定消息应该到哪个队列去
    channel.queue_bind(exchange = 'python-test',queue = "2")
    # 定义一个回调函数来处理消息队列中的消息,这里是打印出来
    def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())

    channel.basic_consume(result.method.queue,callback,# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
    auto_ack = False)
    channel.start_consuming()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    模式二:direct
    这种工作模式的原理是消息发送至exchange,exchange根据**路由键(routing_key)**转发到相对应的queue上。

    可以使用默认exchange=’ ',也可以自定义exchange
    这种模式下不需要将exchange和任何进行绑定,当然绑定也是可以的,可以将exchange和queue,routing_key和queue进行绑定
    传递或接收消息时,需要指定routing_key
    需要先启动订阅者,此模式下队列是consumer随机生成的,发布者仅仅发布消息到exchange,由exchange转发消息至queue。
    发布者:
    import pika
    import json

    credentials = pika.PlainCredentials('shampoo', '123456') # mq用户名和密码
    # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
    channel=connection.channel()
    # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
    channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')

    for i in range(10):
    message=json.dumps({'OrderId':"1000%s"%i})
    # 指定 routing_key。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
    channel.basic_publish(exchange = 'python-test',routing_key = 'OrderId',body = message,
    properties=pika.BasicProperties(delivery_mode = 2))
    print(message)
    connection.close()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    订阅者:
    import pika

    credentials = pika.PlainCredentials('shampoo', '123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
    channel = connection.channel()
    # 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
    result = channel.queue_declare('',exclusive=True)
    # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
    channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct')
    # 绑定exchange和队列 exchange 使我们能够确切地指定消息应该到哪个队列去
    channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId')
    # 定义一个回调函数来处理消息队列中的消息,这里是打印出来
    def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print(body.decode())


    #channel.basic_qos(prefetch_count=1)
    # 告诉rabbitmq,用callback来接受消息
    channel.basic_consume(result.method.queue,callback,
    # 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
    auto_ack = False)
    channel.start_consuming()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    模式三:topicd
      这种模式和第二种差不多,exchange也是通过路由键routing_key来转发消息到指定的queue。不同之处在于:
    **routing_key使用正则表达式支持模糊匹配,**但匹配规则又与常规正则表达式不同,比如"#"是匹配全部,“*”是匹配一个词。
    举例:routing_key =“#orderid#”,意思是将消息转发至所有 routing_key 包含 “orderid” 字符的队列中。代码和模式二 类似,

  • 相关阅读:
    JAVA版SqlHelper
    JAVA中的继承特点1
    C# SqlBulkCopy类批量导入数据
    动态注册HttpModule管道,实现global.asax功能
    实现自己的前端模板轻量级框架
    事务消息中心-TMC
    Win10应用设计的那些事儿
    考拉定时任务框架kSchedule
    如何玩转基于风险的测试
    谈谈Java异常处理这件事儿
  • 原文地址:https://www.cnblogs.com/liujiacai/p/15489453.html
Copyright © 2011-2022 走看看