zoukankan      html  css  js  c++  java
  • Python:Rocketmq消息队列使用

    rocketmq可以与kafka等一起使用,用于实时消息处理。

    安装rocketmq

    pip install rocketmq [-i https://pypi.tuna.tsinghua.edu.cn/simple]

    生产消息producer

    from rocketmq.client import Producer, Message
    import json
    
    producer = Producer('PID-test')
    producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')  #rocketmq队列接口地址(服务器ip:port)
    producer.start()
    
    msg_body = {"id":"001","name":"test_mq","message":"abcdefg"}
    ss = json.dumps(msg_body).encode('utf-8')
    
    msg = Message('topic_name')   #topic名称
    msg.set_keys('xxxxxx')
    msg.set_tags('xxxxxx')
    msg.set_body(ss)      #message body
    
    retmq = producer.send_sync(msg)
    print(retmq.status, retmq.msg_id, retmq.offset)
    producer.shutdown()

    其中:

    • 设置ip:port的位置:producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx') 

    当只有单一服务器时,格式是上面这个;

    当有多个服务器地址(集群模式)时,可以使用:producer.set_namesrv_addr("xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx")

     不过以下这种方式本人测试不通过:producer.set_namesrv_addr(["xxx.xxx.xxx.xxx:xxxxx","xxx.xxx.xxx.xxx:xxxxx","xxx.xxx.xxx.xxx:xxxxx"])

    • 如果使用pandas数据,pandas数据可以直接转换

    some_df.to_json(orient='records').encode('utf-8'),然后放入body中发送。

    消费消息consumer

    可以使用 PushConsumer 和  PullConsumer,同样来自 rocketmq.client。

    # 使用PullConsumer时
    from rocketmq.client import PullConsumer
    consumer = PullConsumer('CID_test')
    consumer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')
    consumer.start()
    
    for msg in consumer.pull('topic_name'):
        print(msg.id, msg.body)
    consumer.shutdown()
    
    # PushConsumer与此类似
    from rocketmq.client import PushConsumer

    :目前rocketmq库只支持linux和mac。

    #

    参考:

    https://www.oschina.net/p/rocketmq-python

    https://github.com/apache/rocketmq-client-python

  • 相关阅读:
    10.17 作业
    10.12 classmethod,staticmethod,反射,魔法方法
    10.11 组合,封装,多态
    10.11 作业
    day20 作业
    10.10 类的继承,继承关系,派生,新式类,经典类
    10.9 类,对象,查找顺序,对象绑定方法
    day 55小结
    day 54小结
    day 53小结
  • 原文地址:https://www.cnblogs.com/qi-yuan-008/p/14022378.html
Copyright © 2011-2022 走看看