zoukankan      html  css  js  c++  java
  • 消息队列持久化

    https://blog.csdn.net/appleyuchi/article/details/79190113

    队列和消息是两个概念?

     假如消息队列test里面还有消息等待消费者(consumers)去接收,但是这个时候服务器端宕机了,这个时候消息是否还在?

     1、队列消息非持久化

     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import pika
     
    # 声明一个socket 实例
    connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    # 声明一个管道
    channel = connect.channel()
    # 声明queue名称为test
    channel.queue_declare(queue="test")
     
    #RabbitMQ的消息永远不会被直接发送到队列中,它总是需要经过一次交换
    channel.basic_publish(exchange='',
                          routing_key="test",
                          body="hello word")
     
    print("Sent 'hello world'")
     
    connect.close()

     

     

     

    channel.queue_declare(queue="test") 

     

     

     

    ①队列持久化很简单,只需要在服务端(produce)声明queue的时候添加一个参数:

    channel.queue_declare(queue='shuaigaogao', durable=True# durable=True 持久化

     

    channel.basic_publish(exchange="",
                          routing_key="shuaigaogao"#queue的名字
                          body="hello world",   #body是要发送的内容
                          properties=pika.BasicProperties(delivery_mode=2,) # make message persistent=>使消息持久化的特性
                          )

     

    1
    channel.queue_declare(queue='shuaigaogao', durable=True)

    小结:

    RabbitMQ在服务端没有声明队列和消息持久化时,队列和消息是存在内存中的,服务端宕机了,队列和消息也不会保留。

    • 服务端声明持久化,客户端想接受消息的话,必须也要声明queue时,也要声明持久化,不然的话,客户端执行会报错。

     以上两句是整篇文章的重中之重!!!

     

    RabbitMQ 消息公平分发 

    可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

     

    channel.basic_qos(prefetch_count=1)

     

    服务端:

    import pika
     
    # 声明一个socket 实例
    connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    # 声明一个管道
    channel = connect.channel()
    # 声明queue名称为test
    channel.queue_declare(queue="test", durable=True# 队列持久化
     
    #RabbitMQ的消息永远不会被直接发送到队列中,它总是需要经过一次交换
    channel.basic_publish(exchange='',
                          routing_key="test",
                          body="hello word",
                          properties=pika.BasicProperties(delivery_mode=2,))  # 消息持久化
     
    print("Sent 'hello world'")
     
    connect.close()

     

     

    import pika
    import time
    # 声明socket实例
    connect = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    # 声明一个管道  虽然在之前的produce代码中声明过一次管道,
    # 但是在不知道produce中的管道是否运行之前(如果未运行,consumers中也不声明的话就会报错),
    # 在consumers中也声明一次是一种正确的做法
    channel = connect.channel()
     
    #声明queue
    channel.queue_declare(queue="test", durable=True)
     
     
    #回调函数
    def callback(ch, method, properites, body):
        time.sleep(30)
        print("-----", ch, method, properites, body)
        print("Received %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认收到消息,添加手动确认时,no_ack必须为False,不然就会报错
     
    channel.basic_qos(prefetch_count=1# 在消息消费之前加上消息处理配置
     
    channel.basic_consume(callback,
                          queue="test",
                          no_ack=False)
     
    print("Waiting for messages")
    #这个start只要一启动,就一直运行,它不止收一条,而是永远收下去,没有消息就在这边卡住
    channel.start_consuming()
  • 相关阅读:
    Java 常提到的自然序(Natural Ordering)
    设计模式(三)行为模式
    设计模式(二)结构模式
    设计模式(一)建造者模式
    设计模式的概念以及面向对象设计原则
    Java源码 HashMap<K,V>
    mybatis注解使用
    spring整合mybatis
    数据库中的表批量映射为对象
    返回用户提交的图像工具类
  • 原文地址:https://www.cnblogs.com/fengff/p/8862967.html
Copyright © 2011-2022 走看看