zoukankan      html  css  js  c++  java
  • python使用消息队列RabbitMq(进阶)

    import pika
      
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   'localhost'))
    channel = connection.channel()
      
    #声明queue
    channel.queue_declare(queue='hello')
      
    # RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    connection.close()
    发送
    __author__ = 'hardy'
    import pika
      
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   'localhost'))
    channel = connection.channel()
      
      
    #You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    # We could avoid that if we were sure that the queue already exists. For example if send.py program
    #was run before. But we're not yet sure which program to run first. In such cases it's a good
    # practice to repeat declaring the queue in both programs.
    channel.queue_declare(queue='hello')
      
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
      
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
      
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    接收

    消息队列的发送端流程

      1、连接

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    

      2、声明queue

    channel.queue_declare(queue='hello')
    

      队列持久化

    channel.queue_declare(queue='hello', durable=True)

      

      3、发送消息

    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
    

      消息持久化(必须队列持久化)

    channel.basic_publish(exchange='',
                          routing_key="hello",
                          body=message,
                          properties=pika.BasicProperties(
                             delivery_mode = 2, # make message persistent
                          ))
    

      4、关闭

    connection.close()
    

    消息队列接收端流程

      1、连接

    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   'localhost'))
    channel = connection.channel()
    

      2、声明queue

    channel.queue_declare(queue='hello')
    

      3、创建回调函数(处理数据)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
    

      4、设置

    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
    

      5、开始接收数据

    channel.start_consuming()
    

      6、确认消息被消费

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)
    

      

    channel.basic_consume(callback,
                           queue='task_queue',
                           no_ack=True  #no_ack=True消息不需要确认,默认no_ack=false,消息需要确认
                           )
    

      

  • 相关阅读:
    Windows开启telnet服务 + 连接失败处理
    注册表比较工具
    wmic命令
    python netifaces模块
    【转】wireshark基本用法及过虑规则
    设置Intel网卡以抓取报文的vlan tag
    【转】 中兴OLT-C300常用命令
    Iris分类以及数组reshape想到的
    关于plot画图的原理
    Python的rand vs randn以及linspace
  • 原文地址:https://www.cnblogs.com/hardykay/p/10207953.html
Copyright © 2011-2022 走看看