# -*- coding: UTF-8 -*-
#!/bin/env python3
# filename readFromKafkaStreamingGetLocation.py
"""Spark Streaming job that reads Kafka records and appends location data.

Each record's "message" field is parsed, the IP value inside it is expanded
into IP/country/province/city, and the result is printed and appended to a
per-day file under /data/speech/.
"""
import datetime

import IP
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


class KafkaMessageParse:
    """Stateless helpers used by the streaming pipeline, one per map step."""

    def extractFromKafka(self, kafkainfo):
        """Return the second element of a 2-tuple record.

        Returns None when ``kafkainfo`` is not a tuple of length 2.
        """
        if isinstance(kafkainfo, tuple) and len(kafkainfo) == 2:
            return kafkainfo[1]
        return None

    def lineFromLines(self, lines):
        """Split a payload into a list of lines.

        Returns an empty list for None or empty input; the result is fed to
        ``flatMap``, which needs an iterable rather than None.
        """
        if not lines:
            return []
        # NOTE(review): the separator is a single space exactly as found in
        # the file. It may originally have been an escape such as "\n" that
        # was lost -- confirm against real payloads before changing it.
        return lines.strip().split(" ")

    def messageFromLine(self, line):
        """Return ``line["message"]``, or None if it is absent.

        Also returns None when ``line`` is not a dict, so one malformed
        record does not raise.
        """
        if isinstance(line, dict) and "message" in line:
            return line["message"]
        return None

    def ip2location(self, ip):
        """Look up ``ip`` and return ``[ip, country, province, city]``.

        Parts that the lookup does not provide keep the placeholder strings
        'country', 'province' and 'city'. The first element is ``ip`` as
        passed in (only the lookup key is stripped).
        """
        country, province, city = 'country', 'province', 'city'
        ipinfo = IP.find(ip.strip())
        try:
            # NOTE(review): single-space separator kept exactly as found; it
            # may originally have been an escape such as "\t" -- confirm
            # against the format IP.find actually returns.
            location = ipinfo.split(" ")
        except (AttributeError, TypeError):
            # Best effort: the lookup result could not be split (presumably
            # None for an unknown address -- confirm), keep placeholders.
            location = []
        if len(location) == 3:
            country, province, city = location
        elif len(location) == 2:
            country, province = location
        return [ip, country, province, city]

    def vlistfromkv(self, strori, sep1, sep2):
        """Return the value part of every key/value field in ``strori``.

        ``strori`` is split into fields on ``sep1``; each field is split on
        ``sep2`` and its second piece is collected.

        Raises:
            IndexError: if a field does not contain ``sep2``.
        """
        return [field.split(sep2)[1] for field in strori.split(sep1)]

    def extractFromMessage(self, message):
        """Rebuild ``message`` with location fields inserted after the IP.

        The last two values of the message are taken as source and IP (in
        that order from the end). The output is the remaining values, then
        ip/country/province/city, then source, joined with "x01".

        Returns None when ``message`` is None, shorter than two characters,
        or does not split into exactly 8 parts.
        """
        if message is None or len(message) <= 1:
            return None
        # NOTE(review): "u0001", "x01" and "x02" are kept exactly as found.
        # They look like "\u0001", "\x01" and "\x02" with the backslash lost
        # (the count check and the field split would then use the same
        # character) -- confirm against real messages before changing them.
        if len(message.split("u0001")) != 8:
            return None
        values = self.vlistfromkv(message, "x01", "x02")
        source = values.pop()
        ip = values.pop()
        values.extend(self.ip2location(ip))
        values.append(source)
        return "x01".join(values)


def tpprint(val, num=10000):
    """Print and persist the first ``num`` elements of each RDD in ``val``.

    For every RDD the elements are printed to stdout and appended to
    ``/data/speech/speech.<YYYYMMDD>`` (date taken when the batch is handled).
    A trailing "..." is printed when the RDD holds more than ``num`` elements.

    Args:
        val: the DStream to register the output operation on.
        num: maximum number of elements printed and written per RDD.
    """
    def takeAndPrint(time, rdd):
        # Take one extra element only to detect that more data exists.
        taken = rdd.take(num + 1)
        print("########################")
        print("Time: %s" % time)
        print("########################")
        DATEFORMAT = '%Y%m%d'
        today = datetime.datetime.now().strftime(DATEFORMAT)
        # Context manager closes the file even if a write or print fails.
        with open("/data/speech/speech." + today, "a") as myfile:
            for record in taken[:num]:
                print(record)
                # NOTE(review): single-space terminator kept as found; it may
                # originally have been "\n" -- confirm the file format.
                myfile.write(str(record) + " ")
        if len(taken) > num:
            print("...")
        print("")

    val.foreachRDD(takeAndPrint)


if __name__ == '__main__':
    zkQuorum = 'datacollect-1:2181,datacollect-2:2181,datacollect-3:2181'
    topic = {'speech-1': 1, 'speech-2': 1, 'speech-3': 1, 'speech-4': 1, 'speech-5': 1}
    groupid = "rokid-speech-get-location"
    master = "local[*]"
    appName = "SparkStreamingRokid"
    timecell = 5  # batch interval passed to StreamingContext

    sc = SparkContext(master=master, appName=appName)
    ssc = StreamingContext(sc, timecell)
    # ssc.checkpoint("checkpoint_"+time.strftime("%Y-%m-%d", time.localtime(time.time())))

    kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
    kmp = KafkaMessageParse()

    lines = kvs.map(lambda x: kmp.extractFromKafka(x))
    lines1 = lines.flatMap(lambda x: kmp.lineFromLines(x))
    # SECURITY NOTE(review): eval() executes whatever text arrives from Kafka.
    # If the topics can carry untrusted data this is arbitrary code execution;
    # consider ast.literal_eval or json.loads once the payload format is
    # confirmed.
    valuedict = lines1.map(lambda x: eval(x))
    message = valuedict.map(lambda x: kmp.messageFromLine(x))
    rdd2 = message.map(lambda x: kmp.extractFromMessage(x))

    # rdd2.pprint()
    tpprint(rdd2)
    # rdd2.fileprint(filepath="result.txt")
    # rdd2.foreachRDD().saveAsTextFiles("/home/admin/agent/spark/result.txt")
    # sc.parallelize(rdd2.cache()).saveAsTextFile("/home/admin/agent/spark/result", "txt")
    # rdd2.repartition(1).saveAsTextFiles("/home/admin/agent/spark/result.txt")

    ssc.start()
    ssc.awaitTermination()