生产者:fanout_publiser.py
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
#有的机器type不行,换成exchange_type就没问题了
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message
)
print("[x] Sent %r"%message)
connection.close()
消费者:fanout_consumer.py
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
#exclusive 不指定queue名字 ,rabbitmq会随机分一个,exclusive会在使用此queue的消费者
#断开后,自动将queue删除
result =channel.queue_declare(exclusive=True)
queue_name=result.method.queue#取到queue名字
print("random queuename",queue_name)
#绑定转发器,让发送端知道是哪个queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs, To exit press CRTL +C')
def callback(ch,method,properties,body):
print("[x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)#不确认消息
channel.start_consuming()