rpc.call方法执行的流程:(下次看代码的时候可以根据这流程来看,注意:由于rpc服务器和客户端具有很松的耦合性,因此以上步骤并不是绝对的。)
1.rpc服务器定义和启动rpc服务
2.rpc服务器建立与rabbitmq服务器的连接
3.rpc服务器创建和激活主题消费者
4.rpc客户端向主交换机发送rpc请求
5.rpc服务器接收和处理rpc请求
6.rpc客户端创建和激活直接消费者,并等待rpc响应。
目前有许多工具包实现了与rabbitmq的交互,openstack中使用的是kombu,Kombu is a messaging library for Python.(Kombu是一个针对Python的消息传递库。)
RPC call代码学习
server.py
import service
srv = service.Service() #创建RPC服务
srv.start() 1 #启动RPC服务
while True:
srv.drain_events() #监听RPC请求,这个方法的功能是用来接收和处理PRC请求,调用BrokerConnection对象的drain_events方法,用的是
impl_kombu.py文件中的 self.connection = kombu.connection.BrokerConnection(**rabbit_params),但是help(kombu.connection)却找不到BrokerConnection
service.py
import rpc
import manager
import dispatcher
TOPIC = 'sendout_request'
class Service(object):
def __init__(self):
self.topic = TOPIC
self.manager = manager.Manager()
def start(self): 2
self.conn = rpc.create_connection() 3
rpc_dispatcher = dispatcher.RpcDispatcher(self.manager)
self.conn.create_consumer(self.topic, rpc_dispatcher)
self.conn.consume()
def drain_events(self):
self.conn.drain_events()
rpc.py
import impl_kombu
def create_connection():
return impl_kombu.Connection() 4
impl_kombu.py 这个文件实现了import kombu 接口,Kombu is a messaging library for Python.(Kombu是一个针对Python的消息传递库。)
class Connection(object):
def __init__(self):
self.consumers = []
self.connection = None
self.reconnect() 5
def reconnect(self): 6
sleep_time = conf.get('interval_start', 1) conf这里是字典,get的方法的作用是如果有interval_start这个值就获取出来,如果没有就用默认值1
stepping = conf.get('interval_stepping', 2)
interval_max = conf.get('interval_max', 30)
sleep_time -= stepping
while True:
try:
self._connect() 7
return
except Exception, e:
if 'timeout' not in str(e):
raise
sleep_time += stepping
sleep_time = min(sleep_time, interval_max)
print("AMQP Server is unreachable,"
"trying to connect %d seconds later " % sleep_time)
time.sleep(sleep_time)
def _connect(self): 8
hostname = rabbit_params.get('hostname')
port = rabbit_params.get('port')
if self.connection: #第一次调用这里是none所以会执行if外的语句
print("Reconnecting to AMQP Server on "
"%(hostname)s:%(port)d " % locals())
self.connection.release()
self.connection = None
self.connection = kombu.connection.BrokerConnection(**rabbit_params) 9 **的意思是传的参是一个字典,https://www.cnblogs.com/omg-hxy/p/9081177.html 这是kombu的接口,不过用help(kombu.connection)没有找到
self.consumer_num = itertools.count(1) #itertools.count(1) 这里会
count()
会创建一个无限的迭代器
self.connection.connect() #这里self.connection是BrokerConnection的对象,Establish connection to server immediately.
self.channel = self.connection.channel() #这里self.connection是channel()是BrokerConnection的对象,Create and return a new channel.
for consumer in self.consumers: #第一次self.consumers没有值, 所以一次返回到 7 ,5,4,3接着会执行3以后的代码,然后返回到1下面的while true
consumer.reconnect(self.channel)
server.py
while True:
srv.drain_events() 到这里最终调用了哪里,为什么就停止在这里了? Wait for a single event from the server.