一、安装RabbitMQ
1. 安装erlang
1 2 3 4 | tar xf otp_src_18.3. tar .gz cd otp_src_18.3 . /configure --prefix= /mapbar/app/erlang make && make install |
2. 安装rabbitMQ
1 2 3 4 5 6 | tar xf rabbitmq-server-generic-unix-3.6.0. tar .xz mv rabbitmq_server-3.6.0 /mapbar/app/ ln -s /mapbar/app/rabbitmq_server-3 .6.0 /mapbar/app/rabbitmq 启动: cd /mapbar/app/rabbitmq/sbin/ . /rabbitmq-server -detached |
3.安装API
1 | pip install pika |
二、Python操作RabbitMQ
1,基本用法
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | mport pika connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.10.131' )) #创建一个链接对象,对象中绑定rabbitmq的IP地址 channel = connection.channel() #创建一个频道 channel.queue_declare(queue = 'name1' ) #通过这个频道来创建队列,如果MQ中队列存在忽略,没有则创建 channel.basic_publish(exchange = '', routing_key = 'name1' , #指定队列名称 body = 'Hello World!' ) #往该队列中发送一个消息 print ( " [x] Sent 'Hello World!'" ) connection.close() #发送完关闭链接 |
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host= '192.168.10.131' )) #创建一个链接对象,对象中绑定rabbitmq的IP地址 channel = connection.channel() #创建一个频道 channel.queue_declare(queue= 'name1' ) #通过这个频道来创建队列,如果MQ中队列存在忽略,没有则创建 def callback(ch, method, properties, body): #callback函数负责接收队列里的消息 print( " [x] Received %r" % body) channel.basic_consume(callback, #从队列里去消息 queue= 'name1' , #指定队列名 no_ack=True) print( ' [*] Waiting for messages. To exit press CTRL+C' ) channel.start_consuming() |
2,发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,二发布者发布消息时,会将消息放置在所有相关队列中。
在RabbitMQ中,所有生产者提交的消息都有Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储,RabbitMQ提供了四种Exchange:fanout、direct、topic、header。由于header模式在实际工作中用的比较少。
发布者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host= 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange= 'test_fanout' , type = 'fanout' ) message = '4456' channel.basic_publish(exchange= 'test_fanout' , routing_key= '' , body=message) print( ' [x] Sent %r' % message) connection.close() |
订阅者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host= 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange= 'test_fanout' , #创建一个exchange type = 'fanout' ) #任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上 #随机创建队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #绑定 channel.queue_bind(exchange= 'test_fanout' , queue=queue_name) #exchange绑定后端队列 print( '<------------->' ) def callback(ch,method,properties,body): print( ' [x] %r' % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() |