zoukankan      html  css  js  c++  java
  • oslo_messaging中kombu实现

    分析kombu

    oslo_messaging是对kombu的封装,kombu是对amqp的封装。这篇文章对oslo_messaging的分析停留在kombu层面。

    amqp中有几个概念,connections建立rabbitmq连接,channel一次连接会话,Exchange交换消息,消息通过channel发送到Exchange,由于Exchange绑定Queue和routing_key。消息会被转发到Queue中匹配routing_key的Queue中。发送消息的对象,称为生产者Producer。在Queue一侧的消费者Consumer,存在对Queue进行监听,一旦Queue中存在数据,则调用callback方法处理消息。

    在olso_messaging中对kombu的调用,在server端,模拟如下。1.建立连接2.创建Exchange 3.创建Queue,并将Exchange与Queue绑定,Queue的名称为routing_key 4.创建Consumer对Queue监听:

    from kombu.entity import Exchange, Queue
    from kombu.messaging import Consumer
    from kombu.connection import Connection
    
    
    def process_media(body, message):
        print body
        message.ack()
    
    connection = Connection('amqp://guest:guest@localhost:5672//')
    channel = connection.channel()
    
    media_exchange = Exchange('caesar', 'topic', channel)
    routing_key='caesar'
    video_queue = Queue(routing_key, exchange=media_exchange, routing_key=routing_key, channel=channel)
    
    
    consumer =  Consumer(channel, queues=[video_queue], callbacks=[process_media])
    consumer.consume()
    
    while True:
        connection.drain_events()
    
    consumer.cancel()
    

        

    在clinet端,通过Producer对Exchange发送数据,数据自动会被消费

    from kombu import Connection,Exchange
    
    exchange=Exchange('caesar','topic',durable=True)
    with Connection('amqp://guest:guest@localhost//') as conn:
        # produce
        chann = conn.channel()
        exchange(chann).declare()
        producer = conn.Producer(chann, serializer='json')
        producer.publish({'name': 'caesar'}, exchange=exchange, routing_key='caesar')
    

    分析oslo_messaging

     客户端                                                                                                                                                          

    服务端

     

    1. kombu = oslo_messaging._drivers.impl_rabbit:RabbitDriver

    在ampqdriver.py的AMQPDriverBase类中使用kombu,定义send,listen,send_notification,listen_for_notification,用于消息的发送与监听。以send_notification为例,调用_send方法,在_send中又调用notify_send,创建exchange,创建默认Queue(使用routing_key,用于没有消费的情况下,如果将来使用者将consumer绑定到这个queue上,可以获取数据)并绑定exchange和channel,创建Producer,并向channel发送数据。

    其中ensure_publishing 在发送失败时,进行retry次重试,直到成功为止。

    在listen_for_notification中建立连接后,创建topic.priority的queue。PollStyleListenerAdapter启动一个线程对获取到的数据进行处理,此处返回此类

     

    以上方法又封装在oslo_messaging的Transport中,而Target中封装topic和exchange等信息

    2.  messaging = oslo_messaging.notify.messaging:MessagingDriver

    在MessagingDriver中调用_send_notification发送消息

    3.Notifier和server

    在Notifier.py中调用messaging中的notifer发送不同级别的消息

     

     在server.py中MessageHandingServer中继承service,ServiceBase启动一个进程服务,定义的start,stop,wait方法作用 PollStyleListenerAdapter,用于获取Queue中的数据。定义_process_incoming。在listener.py中NotificationServer继承MessageHandingServer,实现_process_incoming,将获取的数据进行分发

    在分发的时候,根据endpoint类中的方法级别和filter_rule(endpoint为普通类),根据消息中的ctxt,publisher_id,event_type等,如果与filter_rule匹配,则将调用此endpoint中的级别方法。可参考https://www.cnblogs.com/CaesarLinsa/p/8591847.html。

     在get_notification_listener 返回数据处理server,其中存在dispatcher,用于分发到指定的endpoint的级别方法

  • 相关阅读:
    多元高斯分布(斯坦福machine learning week 9)
    异常检测(斯坦福machine learning week 9)
    Python编码透析
    nlp Python库之pynlpir
    降维(斯坦福machine learning week 8)
    主成分分析PCA之协方差矩阵的理解
    聚类(斯坦福machine learning week 8)
    svm之使用SVM(斯坦福machine learning week 7)
    java泛型总结
    Java之IO流学习总结
  • 原文地址:https://www.cnblogs.com/CaesarLinsa/p/10926348.html
Copyright © 2011-2022 走看看