设置关键字,交换机根据消费者传递的关键字判断是否与生产者的一致,一致则将数据传递给消费者
可以实现对消息分组
生产者:
# coding:utf8 # __author: Administrator # date: 2018/3/15 0015 # /usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost' )) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') key = 'hahaha' key = sys.argv[1] if len(sys.argv)>1 else 'info' message = "Hello World" channel.basic_publish(exchange='direct_logs', routing_key=key,#客户端必须与这个密匙相同才会允许从交换机中取走数据 body=message ) print("Send message") connection.close()
消费者:
# coding:utf8 # __author: Administrator # date: 2018/3/15 0015 # /usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost' )) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue key = 'hahaha' # key=sys.argv[1:] # if not key: # sys.stderr.write('Key: %s is error'%key) # sys.exit(1) channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=key)#当消费者和生产者两个的关键字一致,才会接收数据
#注意可以绑定多个关键字,都是以字符串形式 ,如果传递参数是列表形式key = sys.argv[1:],我们需要循环绑定,就是将所有输入的key都绑定到消费者上 print('Waiting for message') def callback(ch, method, properties, body): print('recv: %s'%ch) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()