zoukankan      html  css  js  c++  java
  • openstack 王者归来学习笔记

    rpc.call方法执行的流程:(下次看代码的时候可以根据这流程来看,注意:由于rpc服务器和客户端具有很松的耦合性,因此以上步骤并不是绝对的。)

    1.rpc服务器定义和启动rpc服务

    2.rpc服务器建立与rabbitmq服务器的连接

    3.rpc服务器创建和激活主题消费者

    4.rpc客户端向主交换机发送rpc请求

    5.rpc服务器接收和处理rpc请求

    6.rpc客户端创建和激活直接消费者,并等待rpc响应。

    目前有许多工具包实现了与rabbitmq的交互,openstack中使用的是kombu,Kombu is a messaging library for Python.(Kombu是一个针对Python的消息传递库。)

    RPC call代码学习

    server.py
    import service

    srv = service.Service() #创建RPC服务
    srv.start() 1 #启动RPC服务

    while True:
    srv.drain_events() #监听RPC请求,这个方法的功能是用来接收和处理PRC请求,调用BrokerConnection对象的drain_events方法,用的是
    impl_kombu.py文件中的 self.connection = kombu.connection.BrokerConnection(**rabbit_params),但是help(kombu.connection)却找不到BrokerConnection
    service.py
    import rpc
    import manager
    import dispatcher

    TOPIC = 'sendout_request'

    class Service(object):
    def __init__(self):
    self.topic = TOPIC
    self.manager = manager.Manager()

    def start(self): 2
    self.conn = rpc.create_connection() 3
    rpc_dispatcher = dispatcher.RpcDispatcher(self.manager)
    self.conn.create_consumer(self.topic, rpc_dispatcher)
    self.conn.consume()

    def drain_events(self):
    self.conn.drain_events()
    rpc.py
    import impl_kombu

    def create_connection():
    return impl_kombu.Connection() 4


    impl_kombu.py 这个文件实现了import kombu 接口,Kombu is a messaging library for Python.(Kombu是一个针对Python的消息传递库。)
    class Connection(object):

    def __init__(self):
    self.consumers = []
    self.connection = None
    self.reconnect() 5
    def reconnect(self):     6
    sleep_time = conf.get('interval_start', 1) conf这里是字典,get的方法的作用是如果有interval_start这个值就获取出来,如果没有就用默认值1
    stepping = conf.get('interval_stepping', 2)
    interval_max = conf.get('interval_max', 30)
    sleep_time -= stepping

    while True:
    try:
    self._connect() 7
    return
    except Exception, e:
    if 'timeout' not in str(e):
    raise

    sleep_time += stepping
    sleep_time = min(sleep_time, interval_max)
    print("AMQP Server is unreachable,"
    "trying to connect %d seconds later " % sleep_time)
    time.sleep(sleep_time)

    def _connect(self):   8
    hostname = rabbit_params.get('hostname')
    port = rabbit_params.get('port')

    if self.connection: #第一次调用这里是none所以会执行if外的语句
    print("Reconnecting to AMQP Server on "
    "%(hostname)s:%(port)d " % locals())
    self.connection.release()
    self.connection = None

    self.connection = kombu.connection.BrokerConnection(**rabbit_params) 9 **的意思是传的参是一个字典,https://www.cnblogs.com/omg-hxy/p/9081177.html 这是kombu的接口,不过用help(kombu.connection)没有找到
    self.consumer_num = itertools.count(1) # itertools.count(1) 这里会count()会创建一个无限的迭代器
        self.connection.connect()                   #这里self.connection是BrokerConnection的对象,Establish connection to server immediately.
    self.channel = self.connection.channel() #这里self.connection是channel()是BrokerConnection的对象,
    Create and return a new channel.

    for consumer in self.consumers: #第一次self.consumers没有值, 所以一次返回到 7 ,5,4,3接着会执行3以后的代码,然后返回到1下面的while true
    consumer.reconnect(self.channel)

    server.py

    while True:
        srv.drain_events()  到这里最终调用了哪里,为什么就停止在这里了?  Wait for a single event from the server.




    每天做好自己该做的事情,你就不会感到迷茫。
  • 相关阅读:
    android websocket推送
    proguardgui.bat来混淆已有的jar包
    android raw与assets区别
    Eclipse开发Android报错Jar mismatch! Fix your dependencies
    gc overhead limit exceeded
    如何签名apk,并让baidu地图正常显示
    Eclipse--Team--SVN--URL修改
    监听EditText
    android 注销
    从Android手机中取出已安装的app包,导出apk
  • 原文地址:https://www.cnblogs.com/sosogengdongni/p/10241283.html
Copyright © 2011-2022 走看看