rabbitmq_server_topic topic模式
1 #!/usr/bin/env python 2 #{data} {time} 3 #_*_coding:utf-8_*_ 4 5 import pika,sys,time 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 'localhost')) 8 channel = connection.channel()#管道 9 10 11 12 #severity = sys.argv[1] if len(sys.argv) > 1 else 'info'#启动参数 默认无参数为 info 级别 13 routing_key= sys.argv[1] if len(sys.argv) > 1 else 'anorrymous.info'#启动参数 默认无参数为 info 级别 14 msg=''.join(sys.argv[2:]) or 'info:消息默认发送………'#启动参数 为空,发默认消息 15 for i in range(10): 16 time.sleep(1) 17 channel.basic_publish(exchange='direct_logs',#绑定频道 18 #routing_key=severity,#默认的消息队列级别 19 routing_key=routing_key,#默认的消息队列级别 20 body=msg+str(i), 21 #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用 22 ) 23 #print(msg,severity) 24 print(msg,routing_key) 25 connection.close() 26 #channel.close()
topic_client: 可按级别来接收 广播
1 #!/usr/bin/env python 2 #{data} {time} 3 #_*_coding:utf-8_*_ 4 5 import pika,time,sys 6 7 connection = pika.BlockingConnection(pika.ConnectionParameters( 8 'localhost' )) 9 channel = connection.channel() 10 11 channel.exchange_declare(exchange='direct_logs',#定义一个接收的频道 12 type='topic') 13 14 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除 15 queuename=reult.method.queue#队列名 自动生成 16 17 18 #severities = sys.argv[1:] 19 binding_key = sys.argv[1:] 20 #if not severities: 21 if not binding_key: 22 sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0])#启动接收的消息级别 23 sys.exit(1) 24 25 #for severity in severities:#循环接收各级别的消息 26 for severity in binding_key:#循环接收各级别的消息 27 channel.queue_bind(exchange='direct_logs', 28 queue=queuename, 29 routing_key=severity) 30 31 def callback(ch, method, properties, body):#回调函数 32 print('接收消息中…………') 33 #time.sleep(5) 34 print(" [x] Received %r" % body.decode()) 35 ch.basic_ack(delivery_tag=method.delivery_tag) 36 37 38 channel.basic_qos(prefetch_count=1)#同时只处理一个消息 39 channel.basic_consume(callback,#接收到消息调用回调函数 callback 40 queue=queuename, 41 #no_ack=True 42 ) 43 44 print(' [*] 接收消息中. To exit press CTRL+C') 45 46 channel.start_consuming()#启动消息接收