zoukankan      html  css  js  c++  java
  • 消息队列持久化

    https://blog.csdn.net/appleyuchi/article/details/79190113

    队列和消息是两个概念?

     假如消息队列test里面还有消息等待消费者(consumers)去接收,但是这个时候服务器端宕机了,这个时候消息是否还在?

     1、队列消息非持久化

     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import pika
     
    # 声明一个socket 实例
    connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    # 声明一个管道
    channel = connect.channel()
    # 声明queue名称为test
    channel.queue_declare(queue="test")
     
    #RabbitMQ的消息永远不会被直接发送到队列中,它总是需要经过一次交换
    channel.basic_publish(exchange='',
                          routing_key="test",
                          body="hello word")
     
    print("Sent 'hello world'")
     
    connect.close()

     

     

     

    channel.queue_declare(queue="test") 

     

     

     

    ①队列持久化很简单,只需要在服务端(produce)声明queue的时候添加一个参数:

    channel.queue_declare(queue='shuaigaogao', durable=True# durable=True 持久化

     

    channel.basic_publish(exchange="",
                          routing_key="shuaigaogao"#queue的名字
                          body="hello world",   #body是要发送的内容
                          properties=pika.BasicProperties(delivery_mode=2,) # make message persistent=>使消息持久化的特性
                          )

     

    1
    channel.queue_declare(queue='shuaigaogao', durable=True)

    小结:

    RabbitMQ在服务端没有声明队列和消息持久化时,队列和消息是存在内存中的,服务端宕机了,队列和消息也不会保留。

    • 服务端声明持久化,客户端想接受消息的话,必须也要声明queue时,也要声明持久化,不然的话,客户端执行会报错。

     以上两句是整篇文章的重中之重!!!

     

    RabbitMQ 消息公平分发 

    可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

     

    channel.basic_qos(prefetch_count=1)

     

    服务端:

    import pika
     
    # 声明一个socket 实例
    connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    # 声明一个管道
    channel = connect.channel()
    # 声明queue名称为test
    channel.queue_declare(queue="test", durable=True# 队列持久化
     
    #RabbitMQ的消息永远不会被直接发送到队列中,它总是需要经过一次交换
    channel.basic_publish(exchange='',
                          routing_key="test",
                          body="hello word",
                          properties=pika.BasicProperties(delivery_mode=2,))  # 消息持久化
     
    print("Sent 'hello world'")
     
    connect.close()

     

     

    import pika
    import time
    # 声明socket实例
    connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    # 声明一个管道  虽然在之前的produce代码中声明过一次管道,
    # 但是在不知道produce中的管道是否运行之前(如果未运行,consumers中也不声明的话就会报错),
    # 在consumers中也声明一次是一种正确的做法
    channel = connect.channel()
     
    #声明queue
    channel.queue_declare(queue="test", durable=True)
     
     
    #回调函数
    def callback(ch, method, properites, body):
        time.sleep(30)
        print("-----", ch, method, properites, body)
        print("Received %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认收到消息,添加手动确认时,no_ack必须为False,不然就会报错
     
    channel.basic_qos(prefetch_count=1# 在消息消费之前加上消息处理配置
     
    channel.basic_consume(callback,
                          queue="test",
                          no_ack=False)
     
    print("Waiting for messages")
    #这个start只要一启动,就一直运行,它不止收一条,而是永远收下去,没有消息就在这边卡住
    channel.start_consuming()
  • 相关阅读:
    2018年左其盛读过评过的书(持续更新中)
    2星|《用场景营销引爆你的生意》:总共4个推荐案例,3个已经失败
    2018左其盛经管新书差评榜(持续更新中)
    3星|《十大全球CEO亲授企业高速成长的关键战略》:作为CEO,我也非常坦率地表明过家庭优先于工作
    2018左其盛好书榜(持续更新中)
    3星|《你的品牌需要一个讲故事的人》:有理论没案例
    《思考快与慢》前传,两位天才犹太心理学家的传奇人生与学术故事:4星|《思维的发现》
    C#如何在派生类中不显示父类的一些属性以及TypeDescriptor使用
    在XML里的XSD和DTD以及standalone的使用
    数据库操作之简单带参操作
  • 原文地址:https://www.cnblogs.com/fengff/p/8862967.html
Copyright © 2011-2022 走看看