zoukankan      html  css  js  c++  java
  • Python + Spark Streaming + Kafka 写入本地文件案例（可执行）

    从 Kafka 中读取指定的 topic，根据消息内容（name 字段）的不同，写入不同的文件中。

    输出文件按照日期区分。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/4/9 11:49
    # @Author  : baoshan
    # @Site    : 
    # @File    : readTraceFromKafkaStreamingToJson.py
    # @Software: PyCharm Community Edition
    
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    import datetime
    import json
    import time
    from collections import defaultdict
    
    import subprocess
    
    
    class KafkaMessageParse:
        """Helpers that turn raw Kafka trace records into flat JSON strings.

        The streaming job maps these over the stream in order:
        ``extractFromKafka`` -> ``lineFromLines`` -> ``eval`` ->
        ``messageFromLine`` -> ``extractFromMessage``.
        """

        # Span-level keys copied verbatim into every emitted record, in the
        # order they appear in the output JSON.
        _SPAN_KEYS = ("trace_id", "parent_id", "span_id", "name",
                      "sa", "sr", "ss", "ret")

        def extractFromKafka(self, kafkainfo):
            """Return the value of a ``(key, value)`` Kafka pair.

            Returns ``None`` when ``kafkainfo`` is not a 2-tuple.
            """
            if isinstance(kafkainfo, tuple) and len(kafkainfo) == 2:
                return kafkainfo[1]
            return None

        def lineFromLines(self, lines):
            """Split a Kafka payload into lines, stripping surrounding whitespace.

            Returns ``[]`` for ``None`` or empty input.
            """
            if not lines:
                # flatMap iterates the return value, so never return None.
                return []
            return lines.strip().split("\n")

        def messageFromLine(self, line):
            """Return ``line["message"]`` if ``line`` is a dict holding it, else ``None``."""
            if isinstance(line, dict) and "message" in line:
                return line["message"]
            return None

        def extractFromMessage(self, message):
            """Turn one trace span (a JSON string) into a flat JSON record string.

            Spans whose ``name`` contains "speech" keep the span fields plus
            NLP/TTS and session/device details. Spans whose ``name`` contains
            "tts" keep the span fields plus the TTS debug-log fields.

            Returns:
                A JSON string (``ensure_ascii=False``) on success.
                ``''`` if ``message`` is not valid JSON, has no usable ``name``,
                or a "speech" span lacks any expected field.
                ``None`` if a "tts" span lacks an expected field, or the span is
                neither "speech" nor "tts".
            """
            try:
                jline = json.loads(message)
                name = jline.get("name")
                if "speech" in name:
                    return self._speechRecord(jline)
                if "tts" in name:
                    try:
                        return self._ttsRecord(jline)
                    except Exception:
                        return None
                return None
            except Exception:
                # Unparseable message, unusable name, or malformed "speech" span.
                return ''

        def _spanFields(self, jline):
            """Return a dict of the span-level fields of ``jline`` (missing -> None)."""
            return {key: jline.get(key) for key in self._SPAN_KEYS}

        def _speechRecord(self, jline):
            """Flatten a "speech" span to JSON; raises if a required field is missing."""
            result = self._spanFields(jline)

            # NLP/TTS details come from the "nlp" annotation; if several are
            # present, the last one overwrites the earlier values.
            for anno in jline.get("annotation"):
                if anno.get("name") != "nlp":
                    continue
                debug_log_anno = anno.get("debug_log")[0]
                nlp = debug_log_anno.get("nlp")
                jaction = json.loads(debug_log_anno.get("action"))
                directive = jaction.get("response").get("action").get("directives")[0]
                jnlp = json.loads(nlp)

                result['tts'] = directive.get("item").get("tts")
                result['app_id'] = jnlp.get('appId')
                result['intent'] = jnlp.get('intent')
                result['cloud'] = jnlp.get("cloud")
                result['asr'] = debug_log_anno.get("asr")
                result['nlp'] = nlp
                result['slots'] = jnlp.get("slots")

            debug_log0 = jline.get("debug_log")[0]
            # A missing session_id must not drop the record: use "" instead.
            session_id = debug_log0.get("session_id") or ""
            codec = debug_log0.get("codec")

            # The session id presumably ends in "-<unix seconds>"; use that as
            # the ASR time and fall back to the current time if it won't parse.
            try:
                st = time.localtime(float(session_id.split("-")[-1]))
            except (ValueError, OverflowError, OSError):
                st = time.localtime()

            result['session_id'] = session_id
            result['device_id'] = debug_log0.get("device_id")
            result['device_key'] = debug_log0.get("device_key")
            result['device_type'] = debug_log0.get("device_type")
            result['thedate'] = time.strftime("%Y-%m-%d %H:%M:%S", st)
            result['wavfile'] = session_id + ".wav"
            # NOTE(review): a missing codec (None) raises TypeError here, so the
            # whole record is dropped -- confirm that is intended.
            result['codecfile'] = session_id + "." + codec
            result['asrthedate'] = time.strftime("%Y%m%d", st)
            return json.dumps(result, ensure_ascii=False)

        def _ttsRecord(self, jline):
            """Flatten a "tts" span to JSON; raises if ``debug_log`` is missing."""
            result = self._spanFields(jline)
            debug_log_tts = jline.get("debug_log")[0]
            for key in ("text", "codec", "declaimer", "logs", "params"):
                result[key] = debug_log_tts.get(key)
            return json.dumps(result, ensure_ascii=False)
    
    def tpprint(val, num=10000, out_dir="/mnt/data/trace"):
        """Print and persist the first ``num`` records of every batch of ``val``.

        Registers a ``foreachRDD`` callback on the DStream ``val``. For each batch
        it takes up to ``num`` records (JSON strings), prints them, and appends
        each one to a per-day file under ``out_dir``, chosen by its ``name``:

            trace.rt.speech.YYYYMMDD  -- name contains "speech"
            trace.rt.tts.YYYYMMDD     -- name contains "tts"
            trace.rt.other.YYYYMMDD   -- anything else (including no name)

        ``None`` and records of length <= 2 (e.g. ``''``) are skipped.

        @param val: the DStream of JSON strings to print and persist.
        @param num: the number of elements from the first will be printed.
        @param out_dir: directory the daily files are appended to.
        """
        import os

        def takeAndPrint(batch_time, rdd):
            # Take one extra element only to know whether to print "...".
            taken = rdd.take(num + 1)
            print("########################")
            print("Time: %s" % batch_time)
            print("########################")
            today = datetime.datetime.now().strftime('%Y%m%d')

            def _path(kind):
                return os.path.join(out_dir, "trace.rt." + kind + "." + today)

            # ``with`` closes all three files even if a record fails to parse.
            # NOTE(review): no explicit encoding is set; non-ASCII records rely on
            # the platform default (and ``str()`` of non-ASCII unicode fails on
            # Python 2) -- consider writing UTF-8 explicitly.
            with open(_path("speech"), "a") as speechfile, \
                    open(_path("tts"), "a") as ttsfile, \
                    open(_path("other"), "a") as otherfile:
                for record in taken[:num]:
                    if record is None or len(record) <= 2:
                        continue
                    print(record)
                    # A record without "name" goes to the "other" file instead
                    # of raising TypeError on the ``in`` test.
                    name = json.loads(record).get("name") or ""
                    if "speech" in name:
                        target = speechfile
                    elif "tts" in name:
                        target = ttsfile
                    else:
                        target = otherfile
                    target.write(str(record) + "\n")

            if len(taken) > num:
                print("...")

        val.foreachRDD(takeAndPrint)
    
    
    if __name__ == '__main__':
        # --- job configuration ---------------------------------------------
        zkQuorum = 'datacollect-1:2181,datacollect-2:2181,datacollect-3:2181'
        # topic name -> per-topic count passed to KafkaUtils.createStream
        topic = {
            'trace-open-gw-5': 1,
            'trace-open-gw-6': 1,
            'trace-open-gw-7': 1,
            'trace-open-gw-8': 1,
            'trace-open-gw-9': 1,
        }
        groupid = "rokid-trace-rt"
        master = "local[*]"
        appName = "SparkStreamingRokidTrace"
        timecell = 5  # batch interval passed to StreamingContext

        # --- streaming pipeline --------------------------------------------
        sc = SparkContext(master=master, appName=appName)
        ssc = StreamingContext(sc, timecell)
        kmp = KafkaMessageParse()

        kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
        # (key, value) -> value -> individual lines
        lines = kvs.map(lambda kv: kmp.extractFromKafka(kv))
        lines1 = lines.flatMap(lambda payload: kmp.lineFromLines(payload))
        # SECURITY(review): eval() executes arbitrary Python taken straight from
        # Kafka. If each line is only a Python literal (dict), ast.literal_eval
        # would be a safe drop-in -- confirm the payload format and replace.
        valuedict = lines1.map(lambda text: eval(text))
        # dict -> "message" field -> flattened JSON string (see KafkaMessageParse)
        message = valuedict.map(lambda record: kmp.messageFromLine(record))
        rdd2 = message.map(lambda msg: kmp.extractFromMessage(msg))

        tpprint(rdd2)

        ssc.start()
        ssc.awaitTermination()

    还请各位大仙不吝赐教!

  • 相关阅读:
    How to configure security of ActiveMQ ?
    CentOS 搭建 nginx + tomcat
    25个 Git 进阶技巧
    写给Git初学者的7个建议
    my links
    Shell scripts to Create a local dir base on the time.
    81For全栈技术网
    一款可视化的在线制作H5
    在线制作h5
    在线制作h5——上帝的礼物
  • 原文地址:https://www.cnblogs.com/zhzhang/p/8762908.html
Copyright © 2011-2022 走看看