zoukankan      html  css  js  c++  java
  • python+spark程序代码片段

    处理如此的字符串:
    time^B1493534543940^Aid^B02CD^Aasr^B叫爸爸^Anlp^B{"domain":"com.abc.system.chat","intent":"chat","slots":{"tts":"爸爸","asr":"叫爸爸"},"voice":"叫爸爸","confidence":1.0,"cloud":false,"posStart":0,"posEnd":0}^Adomain^Bcom.abc.chat^Aintent^Bchat
    
    python版spark代码如下
    
    from operator import add
    import time
    
    
    def getInfo(str, sep1, sep2):
        thedate = 'today'
        sn = 'default'
        if str is not None:
            fields = str.split(sep1)
            if len(fields) > 1:
                for field in fields:
                    if field is not None:
                        kv = field.split(sep2)
                        if len(kv) == 2:
                            if kv[0] == 'time':
                                timestamp = int(kv[1]) / 1000
                                time_local = time.localtime(timestamp)
                                thedate = time.strftime("%Y-%m-%d", time_local)
                            if kv[0] == 'id':
                                sn = kv[1]
        if thedate is not None and sn is not None:
            res = thedate + "|" + sn
        return res
    
    rdd1 = sc.textFile("/Users/zhangzhenghai/example.log")
    rdd2 = rdd1.map(lambda x: (getInfo(x,'u0001','u0002'),1))
    rdd3 = rdd2.reduceByKey(add)
    rdd4 = rdd3.map(lambda x: (x[1],x[0]))
    rdd5 = rdd4.sortByKey(False)
    rdd6 = rdd5.map(lambda x:(x[1],x[0]))
    rdd6.collect()

    以上仅供学习参考

  • 相关阅读:
    用icas下载文件报错
    jboss7.1.1相关error及解决办法
    Base-64编码介绍
    上传文件路径问题
    ZooKeeper安装(Windows)
    DBCP连接池配置参数说明
    Linux普通用户使用sudo权限启停apache服务
    线程池中的队列
    java线程池原理及实现方式
    https基础流程
  • 原文地址:https://www.cnblogs.com/zhzhang/p/6794377.html
Copyright © 2011-2022 走看看