zoukankan      html  css  js  c++  java
  • Python-RabbitMQ(持久化)

    生产者:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import pika
     
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()  # 声明一个管道,在管道里发消息
    # 声明queue
    channel.queue_declare(queue='hello', durable=True)  # 在管道里还得声明一个队列
    # durable只是把队列持久化,消息不持久化
     
    channel.basic_publish(exchange='',
                          routing_key='hello',  # 就是列队queue名字
                          body='Hello World' # 消息内容
                          properties=pika.BasicProperties(
                                delivery_mode=2,#消息持久化如果队列没有设置durable=True的话消息是没有办法持久化的
                          )
                          )
    print(" [x] Sent 'Hello World!'")
    connection.close()  # 不用关闭管道,关闭连接就行
     

     消费者:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    import pika
     
    # 建立到达RabbitMQ Server的connection
    # 此处RabbitMQ Server位于本机-localhost
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
     
    # 声明queue,确认要从中接收message的queue
    # queue_declare函数是幂等的,可运行多次,但只会创建一次
    # 若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue
    # 但在producer和consumer中重复声明queue是一个好的习惯
    channel.queue_declare(queue='hello',durable=True)
     
    print(' [*] Waiting for messages. To exit press CTRL+C')
     
    channel.basic_qos(prefetch_count=1)#如果有一个消息,服务器就不发,没消息就发
    # 定义回调函数
    # 一旦从queue中接收到一个message回调函数将被调用
    # ch:channel
    # method:
    # properties:
    # body:message
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)#执行完后确认,client执行完后给rabbitmq返回的一个标识,收到这个标识后rabbitmq认为这个消息处理完了,不会在重复发送给其他client继续执行
     
    # 从queue接收message的参数设置
    # 包括从哪个queue接收message,用于处理message的callback,是否要确认message
    # 默认情况下是要对消息进行确认的,以防止消息丢失。
    # 此处将no_ack明确指明为True,不对消息进行确认。
    channel.basic_consume(callback,
                          queue="hello",
                         #no_ack=True#不对消息确认
                           )
     
    # 开始循环从queue中接收message并使用callback进行处理
    channel.start_consuming()
  • 相关阅读:
    四套读写方案
    如何保证ArrayList线程安全
    异常总结<经典例题>
    java.移位运算符
    java反射机制
    面试题:return和finally执行
    Spring_通过注解配置 Bean(1)
    Spring_通过 FactoryBean 配置 Bean
    Spring_通过工厂方法配置 Bean
    Spring_管理 Bean 的生命周期
  • 原文地址:https://www.cnblogs.com/394510636-ff/p/9282164.html
Copyright © 2011-2022 走看看