zoukankan      html  css  js  c++  java
  • mk kafka

    # coding:utf-8
    import json
    from pykafka import KafkaClient


    def readkafka():
    with open('oms.json', 'r') as f:
    msg = f.read()
    # msg_d=eval(msg)
    d = json.loads(msg)
    d["waybillNo"] = waybill
    return d

    def sendkafka():
    client = KafkaClient(hosts="10.202.xx.x:xxx")

    topic = client.topics[''] # xxx kafka
    producer = topic.get_producer()
    producer.start()

    with open('./requestfile/IBSE_package.json', 'r') as f:
    msg = f.read()
    msg_d = eval(msg)
    msg_f = json.dumps(msg_d, ensure_ascii=False, encoding='utf-8')
    producer.produce(msg_f)
    producer.stop()



    client = KafkaClient(hosts="10.202.xx.x:xxx")
    topic = client.topics[''] # xxx kafka
    producer = topic.get_producer()
    producer.start()
    with open('fckafka.json', 'r') as f:
    msg = f.read()
    msg_d = eval(msg)
    msg_f = json.dumps(msg_d, ensure_ascii=False, encoding='utf-8')
    producer.produce(msg_f)
    producer.stop()


    client = KafkaClient(hosts="10.202.xx.x:9095")
    topic = client.topics[''] # xxx kafka
    producer = topic.get_producer()
    producer.start()
    with open('oms1.json', 'w') as f1:
    f1.write(json.dumps(readkafka(), encoding="utf-8"))
    f1.close()
    with open('oms1.json', 'r') as f11:
    msg_d = f11.read()
    msg_f = json.dumps(msg_d, ensure_ascii=False, )
    producer.produce(msg_f)
    producer.stop()
  • 相关阅读:
    关于虚拟机链接本地磁盘文件的问题
    javaScript学习笔记
    html学习笔记
    eclipse svn插件安装
    python学习笔记一

    hive数据处理
    WordCount实验
    暑假第六周总结
    暑假第五周总结
  • 原文地址:https://www.cnblogs.com/yaohu/p/12597010.html
Copyright © 2011-2022 走看看