1. 生产者
#coding:utf8 import pika import json import sys severity = sys.argv[1] if len(sys.argv) > 2 else 'info' message = ' '.join(sys.argv[2:]) or "hello word" connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') # 创建交换机 channel.basic_publish(exchange='direct_logs', routing_key= severity, body=message) print " [x] Sent 'Hello World!'" + message connection.close()
2. 订阅者
# coding:utf8 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()