python使用pika扩展库操作RabbitMQ的流程梳理。

  1. 客户端连接到消息队列服务器,打开一个channel。
  2. 客户端声明一个exchange,并设置相关属性。(可以省略,不过必须保证所指定的交换器存在)
  3. 客户端声明一个queue,并设置相关属性。(可以省略,不过必须保证所指定的队列存在)
  4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。(可以省略,不过必须保证其绑定关系)

作为生产者
客户端投递消息到exchange。

作为消费者
客户端通过队列接收消息,处理后进行应答。

producerConn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
producerChannel = consumerConn.channel()
producerChannel.exchange_declare(exchange = 'exchangeName',
    type = 'topic',
    durable=True,
    auto_delete=False)
producerChannel.queue_declare(queue = 'queueName', durable=True)
producerChannel.queue_bind(exchange = 'exchangeName',
    queue = 'queueName',
    routing_key = 'key')
msg_props = pika.BasicProperties()
msg_props.content_type = "application/json"
producerChannel.basic_publish(exchange = 'exchangeName',
    routing_key = 'key',
    body = '{"name": "android"}',
    properties = msg_props)

  

作为消费者需要定义一个回调函数

def callback(ch, method, properties, body):
    mainLogger.info(body)
    ch.basic_ack(delivery_tag = method.delivery_tag)

consumerConn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
consumerChannel = consumerConn.channel()
consumerChannel.exchange_declare(exchange = 'exchangeName',
    type = 'topic',
    durable=True,
    auto_delete=False)
consumerChannel.queue_declare(queue = 'queueName', durable=True)
consumerChannel.queue_bind(exchange = 'exchangeName',
    queue = 'queueName',
    routing_key = 'key')
consumerChannel.basic_qos(prefetch_count=1)
consumerChannel.basic_consume(callback , queue = 'queueName')
consumerChannel.start_consuming()

  

官方文档:
http://www.rabbitmq.com/tutorials/tutorial-one-python.html