一、Rabbitmq
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。RabbitMQ使用的是AMQP协议,它是一种二进制协议。默认启动端口 5672。在 RabbitMQ 中,如下图结构:
- 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。生产者需要完成的任务:
-
1 创建RabbitMQ连接 2 获取信道 3 声明交换器 4 创建消息 5 发布消息 6 关闭信道 7 关闭RabbitMQ连接
- 中间即是 RabbitMQ,其中包括了 交换机 和 队列。
- 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。消费者需要完成的任务:
1 创建RabbitMQ连接 2 获取信道 3 声明交换器 4 声明队列 5 队列和交换器绑定 6 消费信息 7 关闭信道 8 关闭RabbitMQ连接
- Exchange: 接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,Exchange分为direct, fanout, topic三种。
- Binding: 连接Exchange和Queue,包含路由规则。
- Queue: 消息队列,存储还未被消费的消息。
- Message: Header+Body
- Channel: 通道,执行AMQP的命令;一个连接可创建多个通道以节省资源
1. dircted exchange
路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。rabbitmq内部默认有一个特殊的dircted exchange,该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。
生产者:
1 import pika 2 credentials = pika.PlainCredentials('admin', '123456') 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 '192.168.170.134',5672,'/',credentials)) 5 channel = connection.channel() 6 7 # 声明queue 8 channel.queue_declare(queue='hello') 9 10 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 11 channel.basic_publish(exchange='', 12 routing_key='hello', 13 body='Hello World!') 14 print(" [x] Sent 'Hello World!'") 15 connection.close()
消费者:
1 import pika 2 credentials = pika.PlainCredentials('admin', '123456') 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 '192.168.170.134',5672,'/',credentials)) 5 channel = connection.channel() 6 def callback(ch, method, properties, body): 7 print(" [x] Received %r" % body) 8 9 10 channel.basic_consume(callback, 11 queue='hello', 12 no_ack=True) 13 14 print(' [*] Waiting for messages. To exit press CTRL+C') 15 channel.start_consuming()
队列绑定关键字,发送者将数据关键字发送到消息Exchange,Exchange根据关键字判定应该将数据发送至指定队列。
生产者:
1 import pika,sys 2 3 credentials = pika.PlainCredentials('admin', '123456') 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 '192.168.170.134',5672,'/',credentials)) 6 channel = connection.channel() 7 channel.exchange_declare(exchange='direct_logs', 8 type='direct') 9 10 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' 11 message = ' '.join(sys.argv[2:]) or 'Hello World!' 12 channel.basic_publish(exchange='direct_logs', 13 routing_key=severity, 14 body=message) 15 print(" [x] Sent %r:%r" % (severity, message)) 16 connection.close()
消费者:
1 import pika,sys 2 3 credentials = pika.PlainCredentials('admin', '123456') 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 '192.168.170.134',5672,'/',credentials)) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='direct_logs', 9 type='direct') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 severities = sys.argv[1:] 15 if not severities: 16 sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) 17 sys.exit(1) 18 19 for severity in severities: 20 channel.queue_bind(exchange='direct_logs', 21 queue=queue_name, 22 routing_key=severity) 23 24 print(' [*] Waiting for logs. To exit press CTRL+C') 25 26 27 def callback(ch, method, properties, body): 28 print(" [x] %r:%r" % (method.routing_key, body)) 29 30 31 channel.basic_consume(callback, 32 queue=queue_name, 33 no_ack=True) 34 35 channel.start_consuming()
运行结果:
2. fanout exchange
发布/订阅exchange ,发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,二发布者发布消息时,会将消息放置在所有相关队列中。任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
生产者:
1 import pika,sys 2 3 credentials = pika.PlainCredentials('admin', '123456') 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 '192.168.170.134',5672,'/',credentials)) 6 channel = connection.channel() 7 # 声明queue 8 channel.exchange_declare(exchange='logs', 9 type='fanout') 10 11 message = ' '.join(sys.argv[1:]) or "info: Hello World!" 12 channel.basic_publish(exchange='logs', 13 routing_key='', 14 body=message) 15 print(" [x] Sent %r" % message) 16 connection.close()
消费者:
1 import pika 2 3 credentials = pika.PlainCredentials('admin', '123456') 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 '192.168.170.134',5672,'/',credentials)) 6 channel = connection.channel() 7 8 result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 9 queue_name = result.method.queue 10 11 channel.queue_bind(exchange='logs', 12 queue=queue_name) 13 14 print(' [*] Waiting for logs. To exit press CTRL+C') 15 16 17 def callback(ch, method, properties, body): 18 print(" [x] %r" % body) 19 20 21 channel.basic_consume(callback, 22 queue=queue_name, 23 no_ack=True) 24 25 channel.start_consuming()
3. topic exchange
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入"路由值"和"关键字"进行匹配,匹配成功,则将数据发送到指定队列。
-
# :表示可以匹配0个或多个单词;
-
* :表示只能匹配一个单词。
生产者:
1 import pika,sys 2 3 credentials = pika.PlainCredentials('admin', '123456') 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 '192.168.170.134',5672,'/',credentials)) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='topic_logs', 9 type='topic') 10 11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 12 message = ' '.join(sys.argv[2:]) or 'Hello World!' 13 channel.basic_publish(exchange='topic_logs', 14 routing_key=routing_key, 15 body=message) 16 print(" [x] Sent %r:%r" % (routing_key, message)) 17 connection.close()
消费者:
1 import pika,sys 2 3 credentials = pika.PlainCredentials('admin', '123456') 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 '192.168.170.134',5672,'/',credentials)) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='topic_logs', 9 type='topic') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 binding_keys = sys.argv[1:] 15 if not binding_keys: 16 sys.stderr.write("Usage: %s [binding_key]... " % sys.argv[0]) 17 sys.exit(1) 18 19 for binding_key in binding_keys: 20 channel.queue_bind(exchange='topic_logs', 21 queue=queue_name, 22 routing_key=binding_key) 23 24 print(' [*] Waiting for logs. To exit press CTRL+C') 25 26 27 def callback(ch, method, properties, body): 28 print(" [x] %r:%r" % (method.routing_key, body)) 29 30 31 channel.basic_consume(callback, 32 queue=queue_name, 33 no_ack=True) 34 35 channel.start_consuming()
二、基于rabbitmq的RPC
基于rabbitmq的rpc实现流程:
(1)首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;
(2)服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致
(3)客户端从回调Queue中得到先前correlation_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。
对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类。
服务端:
1 import pika 2 3 cre_publiser = pika.PlainCredentials('admin', '123456') 4 conn_para = pika.ConnectionParameters('192.168.170.134',5672,'/',cre_publiser) 5 connection = pika.BlockingConnection(conn_para) 6 7 # 建立会话 8 channel = connection.channel() 9 10 # 声明RPC请求队列 11 channel.queue_declare(queue='rpc_queue') 12 13 # 数据处理方法 14 def fib(n): 15 if n == 0: 16 return 0 17 elif n == 1: 18 return 1 19 else: 20 return fib(n-1) + fib(n-2) 21 22 # 对RPC请求队列中的请求进行处理 23 def on_request(ch, method, props, body): 24 n = int(body) 25 26 print(" [.] fib(%s)" % n) 27 28 # 调用数据处理方法 29 response = fib(n) 30 31 # 将处理结果(响应)发送到回调队列 32 ch.basic_publish(exchange='', 33 routing_key=props.reply_to, 34 properties=pika.BasicProperties(correlation_id=props.correlation_id), 35 body=str(response)) 36 ch.basic_ack(delivery_tag = method.delivery_tag) 37 38 # 负载均衡,同一时刻发送给该服务器的请求不超过一个 39 channel.basic_qos(prefetch_count=1) 40 41 channel.basic_consume(on_request, 42 queue='rpc_queue') 43 44 print(" [x] Awaiting RPC requests") 45 channel.start_consuming()
客户端:
1 import pika 2 import uuid 3 class FibonacciRpcClient(object): 4 def __init__(self): 5 self.cre_publiser = pika.PlainCredentials('admin', '123456') 6 self.conn_para = pika.ConnectionParameters('192.168.170.134',5672,'/',self.cre_publiser) 7 self.connection = pika.BlockingConnection(self.conn_para) 8 9 self.channel = self.connection.channel() 10 11 result = self.channel.queue_declare(exclusive=True) 12 self.callback_queue = result.method.queue 13 14 self.channel.basic_consume(self.on_response, 15 no_ack=True, 16 queue=self.callback_queue) 17 18 def on_response(self, ch, method, props, body): 19 if self.corr_id == props.correlation_id: 20 self.response = body 21 22 def call(self, n): 23 self.response = None 24 self.corr_id = str(uuid.uuid4()) 25 self.channel.basic_publish(exchange='', 26 routing_key='rpc_queue', 27 properties=pika.BasicProperties( 28 reply_to=self.callback_queue, 29 correlation_id=self.corr_id, 30 ), 31 body=str(n)) 32 while self.response is None: 33 self.connection.process_data_events() 34 return int(self.response) 35 36 37 fibonacci_rpc = FibonacciRpcClient() 38 39 print(" [x] Requesting fib(6)") 40 response = fibonacci_rpc.call(6) 41 print(" [.] Got %r" % response)