ubuntu安装rabbitmq-server和python安装pika:
sudo apt update
sudo apt install rabbitmq-server
sudo netstat -tulnp  # 确认 RabbitMQ 正在监听 5672 端口
ps -ef|grep rabbitmq-server
pip install pika
1、队列、消息持久化,接收端确认
#send.py
import sys

import pika

# Open a blocking connection to the broker on localhost and get a channel.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# durable=True: declare the queue as persistent.
channel.queue_declare(queue='task_queue', durable=True)

# The message is the command-line words joined by spaces, or a fixed default
# when no words were given.
words = sys.argv[1:]
message = ' '.join(words) or "Hello World!"

# delivery_mode=2: make message persistent.
persistent = pika.BasicProperties(delivery_mode=2)
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=persistent,
)
print(" [x] Sent %r" % (message,))

connection.close()
#receive.py
import time

import pika

# Open a blocking connection to the broker on localhost and get a channel.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare the same durable queue the sender publishes to.
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Process one delivered message and acknowledge it.

    The work is simulated by sleeping one second for every b'.' byte in
    ``body``. The acknowledgement is sent only after the work is done.
    """
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # Consumer acknowledgement for this delivery.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Receive one message at a time.
channel.basic_qos(prefetch_count=1)
# pika >= 1.0 signature: the queue comes first and the callback is passed as
# on_message_callback (the old ``basic_consume(callback, queue=...)`` order
# raises TypeError there).
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
2、发布订阅模式:fanout
#send.py
import sys

import pika

# Open a blocking connection to the broker on localhost and get a channel.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare the 'logs' exchange with type 'fanout'.
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# The message is the command-line words joined by spaces, or a fixed default
# when no words were given.
words = sys.argv[1:]
message = ' '.join(words) or "info: Hello World!"

# Publish to the 'logs' exchange with an empty routing key.
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % (message,))

connection.close()
#receive.py
import pika

# Open a blocking connection to the broker on localhost and get a channel.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare the 'logs' exchange with type 'fanout'.
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Let the broker generate a unique queue name for this consumer.
# pika >= 1.0 requires the ``queue`` argument; an empty string asks the
# broker to pick the name.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the body of each delivered message."""
    print(" [x] %r" % body)


# pika >= 1.0 signature: queue first, callback as on_message_callback, and
# ``no_ack`` was renamed to ``auto_ack``.
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)
channel.start_consuming()
3、 exchange:direct 组播
# emit_log_direct.py
import sys

import pika

# Establish the connection and get a channel.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Exchange name: direct_logs
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# argv[1] is the severity (argv[0] is the script name); default to 'info'.
if len(sys.argv) > 1:
    severity = sys.argv[1]
else:
    severity = 'info'

# The message: remaining arguments joined with an EMPTY separator, so
# "this is info" on the command line is sent as 'thisisinfo'.
words = sys.argv[2:]
message = ''.join(words) or 'hello world'

channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message,
)
print('[x] sent %r:%r' % (severity, message))

connection.close()
# receive_log_direct.py
import sys

import pika

# Establish the connection and get a channel.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Exclusive: used by only one connection, and the queue is deleted when that
# connection closes. The broker picks a random queue name for this consumer,
# so no two consumers share a queue name.
# pika >= 1.0 requires the ``queue`` argument; an empty string asks the
# broker to generate the name.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Severities to subscribe to come from the command line; at least one is
# required.
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error] " % sys.argv[0])
    sys.exit(1)

# Bind the queue to the 'direct_logs' exchange once per severity, using the
# severity as the routing key.
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)


def callback(ch, method, properties, body):
    """Print the routing key (the severity) and body of each message."""
    print(" [x] %r:%r" % (method.routing_key, body))


# pika >= 1.0 signature: queue first, callback as on_message_callback, and
# ``no_ack`` was renamed to ``auto_ack``.
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)
print('[*] waiting for msg.to exit press ctrl+c')
channel.start_consuming()  # blocks in an endless consume loop
用法:命令行中
生产者:
python emit_log_direct.py info this is info
[x] sent 'info':'thisisinfo'
消费者1:接收error
python receive_log_direct.py error
[*] waiting for msg.to exit press ctrl+c
消费者2:接收info error
python receive_log_direct.py info error
[*] waiting for msg.to exit press ctrl+c
[x] 'info':b'thisisinfo'
消费者3:接收info
python receive_log_direct.py info
[*] waiting for msg.to exit press ctrl+c
[x] 'info':b'thisisinfo'
4、exchange:topic
- * (star) can substitute for exactly one word.
- # (hash) can substitute for zero or more words.
# emit_log_topic.py
import sys

import pika

# Establish the connection and get a channel.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# argv[1] is the routing key; default to 'anonymous.info' when absent.
if len(sys.argv) > 1:
    routing_key = sys.argv[1]
else:
    routing_key = 'anonymous.info'

# Remaining arguments are joined with an EMPTY separator (no spaces between
# the words); fall back to a fixed default when none are given.
words = sys.argv[2:]
message = ''.join(words) or 'hello world'

channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body=message,
)
print('[x] sent %r:%r' % (routing_key, message))

connection.close()
#receive_log_topic.py
import sys

import pika

# Establish the connection and get a channel.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# Exclusive: used by only one connection, and the queue is deleted when that
# connection closes. The broker picks a random queue name for this consumer,
# so no two consumers share a queue name.
# pika >= 1.0 requires the ``queue`` argument; an empty string asks the
# broker to generate the name.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Binding keys come from the command line; at least one is required.
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage:%s [bingding_keys]... " % sys.argv[0])
    sys.exit(1)

# Bind the queue to the 'topic_logs' exchange once per binding key.
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)


def callback(ch, method, properties, body):
    """Print the routing key and body of each delivered message."""
    print(" [x] %r:%r" % (method.routing_key, body))


# pika >= 1.0 signature: queue first, callback as on_message_callback, and
# ``no_ack`` was renamed to ``auto_ack``.
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)
print('[*] waiting for msg.to exit press ctrl+c')
channel.start_consuming()  # blocks in an endless consume loop