zoukankan      html  css  js  c++  java
  • openstack 王者归来学习笔记

    rpc.call方法执行的流程:(下次看代码的时候可以根据这流程来看,注意:由于rpc服务器和客户端具有很松的耦合性,因此以上步骤并不是绝对的。)

    1.rpc服务器定义和启动rpc服务

    2.rpc服务器建立与rabbitmq服务器的连接

    3.rpc服务器创建和激活主题消费者

    4.rpc客户端向主交换机发送rpc请求

    5.rpc服务器接收和处理rpc请求

    6.rpc客户端创建和激活直接消费者,并等待rpc响应。

    目前有许多工具包实现了与rabbitmq的交互,openstack中使用的是kombu,Kombu is a messaging library for Python.(Kombu是一个针对Python的消息传递库。)

    RPC call代码学习

    server.py
    import service

    srv = service.Service() #创建RPC服务
    srv.start() 1 #启动RPC服务

    while True:
    srv.drain_events() #监听RPC请求,这个方法的功能是用来接收和处理PRC请求,调用BrokerConnection对象的drain_events方法,用的是
    impl_kombu.py文件中的 self.connection = kombu.connection.BrokerConnection(**rabbit_params),但是help(kombu.connection)却找不到BrokerConnection
    service.py
    import rpc
    import manager
    import dispatcher

    TOPIC = 'sendout_request'

    class Service(object):
    def __init__(self):
    self.topic = TOPIC
    self.manager = manager.Manager()

    def start(self): 2
    self.conn = rpc.create_connection() 3
    rpc_dispatcher = dispatcher.RpcDispatcher(self.manager)
    self.conn.create_consumer(self.topic, rpc_dispatcher)
    self.conn.consume()

    def drain_events(self):
    self.conn.drain_events()
    rpc.py
    import impl_kombu

    def create_connection():
    return impl_kombu.Connection() 4


    impl_kombu.py 这个文件实现了import kombu 接口,Kombu is a messaging library for Python.(Kombu是一个针对Python的消息传递库。)
    class Connection(object):

    def __init__(self):
    self.consumers = []
    self.connection = None
    self.reconnect() 5
    def reconnect(self):     6
    sleep_time = conf.get('interval_start', 1) conf这里是字典,get的方法的作用是如果有interval_start这个值就获取出来,如果没有就用默认值1
    stepping = conf.get('interval_stepping', 2)
    interval_max = conf.get('interval_max', 30)
    sleep_time -= stepping

    while True:
    try:
    self._connect() 7
    return
    except Exception, e:
    if 'timeout' not in str(e):
    raise

    sleep_time += stepping
    sleep_time = min(sleep_time, interval_max)
    print("AMQP Server is unreachable,"
    "trying to connect %d seconds later " % sleep_time)
    time.sleep(sleep_time)

    def _connect(self):   8
    hostname = rabbit_params.get('hostname')
    port = rabbit_params.get('port')

    if self.connection: #第一次调用这里是none所以会执行if外的语句
    print("Reconnecting to AMQP Server on "
    "%(hostname)s:%(port)d " % locals())
    self.connection.release()
    self.connection = None

    self.connection = kombu.connection.BrokerConnection(**rabbit_params) 9 **的意思是传的参是一个字典,https://www.cnblogs.com/omg-hxy/p/9081177.html 这是kombu的接口,不过用help(kombu.connection)没有找到
    self.consumer_num = itertools.count(1) # itertools.count(1) 这里会count()会创建一个无限的迭代器
        self.connection.connect()                   #这里self.connection是BrokerConnection的对象,Establish connection to server immediately.
    self.channel = self.connection.channel() #这里self.connection是channel()是BrokerConnection的对象,
    Create and return a new channel.

    for consumer in self.consumers: #第一次self.consumers没有值, 所以一次返回到 7 ,5,4,3接着会执行3以后的代码,然后返回到1下面的while true
    consumer.reconnect(self.channel)

    server.py

    while True:
        srv.drain_events()  到这里最终调用了哪里,为什么就停止在这里了?  Wait for a single event from the server.




    每天做好自己该做的事情,你就不会感到迷茫。
  • 相关阅读:
    node
    github
    [模块] pdf转图片-pdf2image
    python 15 自定义模块 随机数 时间模块
    python 14 装饰器
    python 13 内置函数II 匿名函数 闭包
    python 12 生成器 列表推导式 内置函数I
    python 11 函数名 迭代器
    python 10 形参角度 名称空间 加载顺序
    python 09 函数参数初识
  • 原文地址:https://www.cnblogs.com/sosogengdongni/p/10241283.html
Copyright © 2011-2022 走看看