zoukankan      html  css  js  c++  java
  • RabbitMQ基本使用

     
    一、基本使用
    生产者(producter):
    import pika
    import json
    
    # 生成证书
    credentials = pika.PlainCredentials('shampoo', '123456')  # mq用户名和密码
    # 创建一个实例(虚拟队列需要指定参数 virtual_host,如果是默认的可以不填)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
    # 生成一个管道
    channel=connection.channel()
    # 声明消息队列,消息将在这个队列传递,如不存在,则创建
    result = channel.queue_declare(queue = 'python-test')
    
    for i in range(10):
        message=json.dumps({'OrderId':"1000%s"%i})
        # 向队列插入消息
        channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)   # routing_key是队列名
        print(message)
    connection.close()
    消费者(consumer):
    import pika
    credentials = pika.PlainCredentials('shampoo', '123456')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials))
    channel = connection.channel()
    # 申明消息队列,如果不存在,则创建队列
    channel.queue_declare(queue = 'python-test', durable = False)
    # 定义一个回调函数来处理消息队列中的消息
    def callback(ch, method, properties, body):   # 四个参数为标准格式
         print(body.decode())
         ch.basic_ack(delivery_tag = method.delivery_tag)  # 告诉生成者,消息处理完成
       
    # 收到消息,就用callback来处理消息
    channel.basic_consume('python-test',callback)
    # 开始接收信息,并进入阻塞状态,队列里有消息才会调用callback进行处理
    channel.start_consuming()
    二、分发机制
    1.上面的只是一个生产者、一个消费者,能不能一个生产者多个消费者呢?
    可以用上面的例子,多启动几个消费者consumer,看一下消息的接收情况。
    RabbitMQ会采用轮询机制把消息依次分发
     
    2.假如消费者处理消息需要15秒,如果当机了,那这个消息处理明显还没处理完,怎么处理?
    你没给我回复确认,就代表消息没处理完。
    上面的效果消费端断了就转到另外一个消费端去了,但是生产者怎么知道消费端断了呢?
    因为生产者和消费者是通过socket连接的,socket断了,就说明消费端断开了。
     
    3.上面的模式只是依次分发,实际情况是机器配置不一样。怎么设置类似权重的操作?
    就是RabbitMQ给消费者发消息的时候检测下消费者里的消息数量,如果超过指定值(比如1条),就不给你发了。
    只需要在消费者端,channel.basic_consume前加上如下配置:
    channel.basic_qos(prefetch_count=1) # 类似权重,按能力分发,如果有一个消息,就不在给你发
    channel.basic_consume( # 消费消息
     
    三、RabbitMq 持久化
    MQ默认建立的是临时 queue 和 exchange,如果不声明持久化,一旦 rabbitmq 挂掉,queue、exchange 将会全部丢失。
    所以我们一般在创建 queue 或者 exchange 的时候会声明 持久化。
    1.queue 声明持久化
    # 声明消息队列,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储
    result = channel.queue_declare(queue = 'python-test',durable = True)
    注意每个队列都得写,客户端、服务端声明的时候都得写。
     
    2.exchange 声明持久化
    # 声明exchange,如不存在,则创建. durable = True 代表exchange持久化存储,False 非持久化存储
    channel.exchange_declare(exchange = 'python-test', durable = True)
    注意:如果已存在一个非持久化的 queue 或 exchange ,执行上述代码会报错,因为当前状态不能更改 queue 或 exchange 存储属性,需要删除重建。
    如果 queue 和 exchange 中一个声明了持久化,另一个没有声明持久化,则不允许绑定。
     
     3.消息持久化
    虽然 exchange 和 queue 都申明了持久化,但如果消息只存在内存里,rabbitmq 重启后,内存里的东西还是会丢失。所以必须声明消息也是持久化,从内存转存到硬盘。
    # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息持久化,delivery_mod = 1 消息非持久化
    channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))
     
    4.acknowledgement 消息不丢失
    消费者(consumer)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息丢失。但是也可以选择消费者处理失败时,将消息回退给 rabbitmq ,重新再被消费者消费,这个时候需要设置确认标识。
    channel.basic_consume(callback,queue = 'python-test', no_ack = False) # no_ack 设置成 False,未收到确认标识,消息会重回队列。True,如果接收消息后,机器宕机消息就丢了
     
     

  • 相关阅读:
    POJ 1948 Triangular Pastures
    2018ACM/ICPC 青岛现场赛 E题 Plants vs. Zombies
    三大博弈
    ACM-ICPC 2018年北京网络赛 D-80 days
    hdu 2062 Subset sequence
    转-利用 Python 练习数据挖掘
    内联函数
    C++中冒号(:)和双冒号(::)的用法
    理性,感性和爱
    修改IE8搜索框为指定搜索引擎,如CSDN、百度知道等
  • 原文地址:https://www.cnblogs.com/absoluteli/p/13986010.html
Copyright © 2011-2022 走看看