RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。
1 安装配置epel源 2 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 3 4 安装erlang 5 $ yum -y install erlang 6 7 安装RabbitMQ 8 $ yum -y install rabbitmq-server
安装rabbitmq API
1 pip install pika 2 or 3 easy_install pika 4 or 5 源码 6 7 https://pypi.python.org/pypi/pika
使用API操作RabbitMQ
基于Queue实现生产者消费者模型
1 #!/usr/bin/env python 2 import pika 3 4 # ######################### 生产者 ######################### 5 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host='localhost')) 8 channel = connection.channel() 9 10 channel.queue_declare(queue='hello') 11 12 channel.basic_publish(exchange='', 13 routing_key='hello', 14 body='Hello World!') 15 print(" [x] Sent 'Hello World!'") 16 connection.close()
1 #!/usr/bin/env python 2 import pika 3 4 # ########################## 消费者 ########################## 5 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host='localhost')) 8 channel = connection.channel() 9 10 channel.queue_declare(queue='hello') 11 12 def callback(ch, method, properties, body): 13 print(" [x] Received %r" % body) 14 15 channel.basic_consume(callback, 16 queue='hello', 17 no_ack=True) 18 19 print(' [*] Waiting for messages. To exit press CTRL+C') 20 channel.start_consuming()
1、acknowledgment 消息不丢失(订阅端消息不丢失)
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 import pika 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq 8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错 9 10 def callback(ch,method,properties,body): 11 print("[x] Received %r" %body)#打印获得消息的内容 12 ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息 13 14 channel.basic_consume(callback,queue='hai',no_ack=False) 15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息, 16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认, 17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失 18 19 print('[*]Waiting for messages to exit press CTRL+C') 20 channel.start_consuming()
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 import pika 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq 8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai 9 channel.basic_publish(exchange='',routing_key='hai',body='hello world') 10 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容 11 print("[x] Sent 'hello world' ") 12 connection.close()
2、durable 消息不丢失(服务端消息不丢失)
1 # time: 2 # Auto:PANpan 3 # func: 4 import pika 5 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器 6 channel=connection.channel()#创建频道,通过频道操作rabbitmq 7 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化 8 9 def callback(ch,method,properties,body): 10 print("[x] Received %r" %body)#打印获得消息的内容 11 ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息 12 13 channel.basic_consume(callback,queue='hai',no_ack=True) 14 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息, 15 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认, 16 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失 17 18 print('[*]Waiting for messages to exit press CTRL+C') 19 channel.start_consuming()
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 import pika 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq 8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai 9 channel.basic_publish(exchange='',routing_key='hai',body='hello world', 10 properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化 11 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容 12 print("[x] Sent 'hello world' ") 13 connection.close()
3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照索引排列
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 import pika 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq 8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化 9 10 def callback(ch,method,properties,body): 11 print("[x] Received %r" %body)#打印获得消息的内容 12 ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息 13 channel.basic_qos(prefetch_count=1)#客户端按顺序去取,默认为奇偶数 14 channel.basic_consume(callback,queue='hai',no_ack=True) 15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息, 16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认, 17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失 18 19 print('[*]Waiting for messages to exit press CTRL+C') 20 channel.start_consuming()
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 import pika 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq 8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai 9 channel.basic_publish(exchange='',routing_key='hai',body='hello world', 10 properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化 11 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容 12 print("[x] Sent 'hello world' ") 13 connection.close()
4、发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
exchange type = fanout
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 import pika 6 import sys 7 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接消息队列服务器 8 channel=connection.channel()#创建频道,通过频道对rabbitmq进行操作 9 10 channel.exchange_declare(exchange='logs',type='fanout')#创建exchange,名称为logs,若果该消息队列已经创建,可以省略 11 message=''.join(sys.argv[1:]) or "info: Hello Wrold" 12 13 channel.basic_publish(exchange='logs',routing_key='',body=message)#将消息添加今年队列 14 print('[x] sent %r'%message) 15 connection.close()
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 import pika 6 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8')) #连接消息队列服务器 7 channel = connection.channel()#创建频道,通过频道对rabbitmq进行操作 8 9 channel.exchange_declare(exchange='logs',#创建exchange,名称为logs 10 type='fanout')#type='fanout'作用为凡是和exchange相关联的队列,在用户给exchange发消息时,所有关联队列都会受到消息 11 12 result=channel.queue_declare(exclusive=True)#不指定队列名,有系统随机创建 13 queue_name=result.method.queue 14 15 channel.queue_bind(exchange='logs',queue=queue_name)#将exchange和当前的消息队列做一个绑定 16 print(' [*] Waiting for logs. To exit press CTRL+C') 17 18 19 def callback(ch,method,properties,body): 20 print('[x] %r' %body) 21 22 channel.basic_consume(callback,queue=queue_name,no_ack=True)#在队列中获取消息 23 24 channel.start_consuming()
5、关键字发送
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 import pika 6 import sys 7 8 connection = pika.BlockingConnection(pika.ConnectionParameters( 9 host='192.168.11.138')) 10 channel = connection.channel() 11 12 channel.exchange_declare(exchange='direct_logs', 13 type='direct') 14 15 #severity = sys.argv[1] if len(sys.argv) > 1 else 'info' 16 #message = ' '.join(sys.argv[2:]) or 'Hello World!' 17 severity='info' 18 message='test' 19 channel.basic_publish(exchange='direct_logs', 20 routing_key=severity, 21 body=message) 22 print(" [x] Sent %r:%r" % (severity, message)) 23 connection.close()
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 import pika 6 import sys 7 8 connection = pika.BlockingConnection(pika.ConnectionParameters( 9 host='10.0.0.8')) 10 channel = connection.channel() 11 12 channel.exchange_declare(exchange='direct_logs', 13 type='direct')#设置exchange类型为direct 14 15 result = channel.queue_declare(exclusive=True) #创建随机队列 16 queue_name = result.method.queue 17 18 # severities = sys.argv[1:] 19 # if not severities: 20 # sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) 21 # sys.exit(1) 22 severities=['error'] 23 for severity in severities: 24 channel.queue_bind(exchange='direct_logs', 25 queue=queue_name, 26 routing_key=severity)#绑定关键字 27 28 print(' [*] Waiting for logs. To exit press CTRL+C') 29 30 def callback(ch, method, properties, body): 31 print(" [x] %r:%r" % (method.routing_key, body)) 32 33 channel.basic_consume(callback, 34 queue=queue_name, 35 no_ack=True) 36 37 channel.start_consuming()
1 #!/usr/bin/env python 2 # time: 3 # Auto:PANpan 4 # func: 5 #!/usr/bin/env python 6 # time: 7 # Auto:PANpan 8 # func: 9 import pika 10 import sys 11 12 connection = pika.BlockingConnection(pika.ConnectionParameters( 13 host='10.0.0.8')) 14 channel = connection.channel() 15 16 channel.exchange_declare(exchange='direct_logs', 17 type='direct') 18 19 result = channel.queue_declare( ) 20 #声明queue,确认要从中接收message的queue 21 #queue_declare函数是幂等的,可运行多次,但只会创建一次 22 #若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue 23 #但在producer和consumer中重复声明queue是一个好的习惯 24 #例如: channel.queue_declare(queue='hello') 25 queue_name = result.method.queue 26 27 # severities = sys.argv[1:] 28 # if not severities: 29 # sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) 30 # sys.exit(1) 31 severities=['error','info'] 32 for severity in severities: 33 channel.queue_bind(exchange='direct_logs', 34 queue=queue_name, 35 routing_key=severity) 36 37 print(' [*] Waiting for logs. To exit press CTRL+C') 38 39 def callback(ch, method, properties, body): 40 print(" [x] %r:%r" % (method.routing_key, body)) 41 42 channel.basic_consume(callback, 43 queue=queue_name, 44 no_ack=True) 45 46 channel.start_consuming()
6、模糊匹配
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
- # 表示可以匹配 0 个 或 多个 单词
- * 表示只能匹配 一个 单词
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='topic_logs', 10 type='topic') 11 12 result = channel.queue_declare(exclusive=True) 13 queue_name = result.method.queue 14 15 binding_keys = sys.argv[1:] 16 if not binding_keys: 17 sys.stderr.write("Usage: %s [binding_key]... " % sys.argv[0]) 18 sys.exit(1) 19 20 for binding_key in binding_keys: 21 channel.queue_bind(exchange='topic_logs', 22 queue=queue_name, 23 routing_key=binding_key) 24 25 print(' [*] Waiting for logs. To exit press CTRL+C') 26 27 def callback(ch, method, properties, body): 28 print(" [x] %r:%r" % (method.routing_key, body)) 29 30 channel.basic_consume(callback, 31 queue=queue_name, 32 no_ack=True) 33 34 channel.start_consuming()
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='topic_logs', 10 type='topic') 11 12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 13 message = ' '.join(sys.argv[2:]) or 'Hello World!' 14 channel.basic_publish(exchange='topic_logs', 15 routing_key=routing_key, 16 body=message) 17 print(" [x] Sent %r:%r" % (routing_key, message)) 18 connection.close()
注: