zoukankan      html  css  js  c++  java
  • spark定制之六:sql版start.scala

    上个版本的start.scala用的是HiveContext,这个版本用的是SQLContext,不需要编译。

    # cat testperson.txt #字段用tab键分隔

    zs 10 30.0
    li 12 32.0

    # spark-shell -i start.scala

    scala> help

    依据提示逐步执行

    import org.apache.spark.sql.SchemaRDD

    // Column delimiter used when parsing input lines and when printing/saving rows.
    // FIX: the scraped source rendered the escape as a literal TAB character;
    // restored to the "\t" escape so the intent is visible and the file parses.
    var FIELD_SEPERATOR = "\t"
    // Row terminator. FIX: the scrape turned "\n" into a raw newline inside a
    // single-quoted string, which is not valid Scala; restored the escape.
    var RECORD_SEPERATOR = "\n"
    // Remembers the SchemaRDD of the most recently executed sql(...) command.
    var lastrdd : SchemaRDD = null
      
    /** Path helpers shared by the save/check commands: scheme normalization,
      * FileSystem selection, guarded recursive delete, and recursive listing
      * over both local ("file:") and HDFS ("hdfs:") paths. */
    object MyFileUtil extends java.io.Serializable {
        import org.apache.hadoop.fs.Path
        import org.apache.hadoop.fs.FileSystem
        import org.apache.hadoop.fs.FileStatus
        import scala.collection.mutable.ListBuffer

        /** Normalizes a user-supplied path: "hdfs:"/"file:" prefixed paths (and
          * the empty string) pass through untouched; bare absolute paths gain a
          * "file://" scheme; relative paths are anchored at the working dir. */
        def regularFile(filepath:String):String = {
            if(filepath == "" || filepath.startsWith("hdfs:") || filepath.startsWith("file:")) {
                filepath
            } else if(filepath.startsWith("/")) {
                "file://" + filepath
            } else {
                val workdir = System.getProperty("user.dir")
                "file://" + workdir + "/" + filepath
            }
        }

        // Refuse to delete suspiciously short paths (e.g. "/", "hdfs:/") —
        // a cheap safety net against wiping a filesystem root by accident.
        var SAFEMINPATH_LENGTH : Int = 24

        /** Returns the Hadoop FileSystem matching the path's scheme.
          * @throws Exception when the path carries neither "hdfs:" nor "file:". */
        def getFileSystem(filepath:String) = {
            if(filepath.startsWith("hdfs:")) {
                FileSystem.get(new org.apache.hadoop.conf.Configuration());
            } else if(filepath.startsWith("file:")) {
                FileSystem.getLocal(new org.apache.hadoop.conf.Configuration());
            } else {
                throw new Exception("file path invalid")
            }
        }

        /** Recursively deletes filepath if it exists; rejects short paths. */
        def deletePath(filepath:String) = {
            if(filepath.length < SAFEMINPATH_LENGTH)
                throw new Exception("file path is too short")
            val fs : FileSystem = getFileSystem(filepath)
            if (fs.exists(new Path(filepath))) {
                fs.delete(new Path(filepath), true);
            }
        }

        /** Recursively collects every regular file under path into pathlist.
          * When statuslist is given, the statuses of path's DIRECT children
          * (files and directories) are appended to it; recursion below the
          * first level deliberately passes no statuslist. */
        def listFile(fs:FileSystem, path:Path, pathlist:ListBuffer[Path], statuslist:ListBuffer[FileStatus]=null) {
            if ( fs.exists(path) ) {
                val substatuslist =  fs.listStatus(path);
                for(substatus <- substatuslist){
                    if(statuslist != null)
                        statuslist.append(substatus)
                    if(substatus.isDir()){
                        listFile(fs,substatus.getPath(),pathlist);
                    }else{
                        pathlist.append(substatus.getPath());
                    }
                }
            }
        }

        /** True when the path exists and its direct children hold data
          * (e.g. a Spark output dir whose part-files are non-empty). */
        def hasContext(filepath:String) = {
            val realpath = regularFile(filepath)
            val fs = getFileSystem(realpath)
            val pathlist = ListBuffer[Path]()
            val statuslist = ListBuffer[FileStatus]()
            // BUG FIX: walk the normalized realpath — the original walked the
            // raw filepath, which breaks for relative/bare local paths after
            // the FileSystem was chosen from realpath.
            listFile(fs,new Path(realpath),pathlist,statuslist)
            var length:Long = 0
            for( status <- statuslist )
                length += status.getLen()
            length > 0
        }
    }
      
    // Compile MySchemaRDD inside the running REPL so its implicit conversion is
    // visible to commands the user types later in the same session.
    org.apache.spark.repl.Main.interp.command("""
    class MySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) extends java.io.Serializable {

        // Prints every row to stdout: fields joined by FIELD_SEPERATOR, rows
        // terminated by RECORD_SEPERATOR.
        def go() = {
            var startstr = ""
            var endstr = RECORD_SEPERATOR
            val result = rdd.collect
            result.foreach( x =>
                print(x.mkString(startstr,FIELD_SEPERATOR,endstr))
              )
        }

        // Materializes the query result on the driver as Array[Row].
        def result() = {
            rdd.collect
        }

        // Saves the rows to output: as Hadoop text part-files for an "hdfs:"
        // path, otherwise as a single local text file. Existing output is
        // deleted first in both cases.
        def saveto(output: String) = {
            import org.apache.hadoop.io.{NullWritable,Text}
            var startstr = ""
            var endstr = RECORD_SEPERATOR
            // Normalize + clear the target once, for both branches.
            val outputpath = MyFileUtil.regularFile(output)
            MyFileUtil.deletePath(outputpath)
            if(output.startsWith("hdfs:")) {
                rdd.map(x =>
                      (NullWritable.get(), new Text(x.mkString(FIELD_SEPERATOR)))
                    ).saveAsHadoopFile[
                      org.apache.hadoop.mapred.TextOutputFormat[NullWritable, Text]
                    ](outputpath)
            } else {
                val result = rdd.collect()
                val writer = new java.io.FileWriter(output)
                try {
                    result.foreach(x =>
                        writer.write(x.mkString(startstr,FIELD_SEPERATOR,endstr))
                      )
                } finally {
                    // BUG FIX: close the writer even when a write throws,
                    // so the file handle is never leaked.
                    writer.close()
                }
            }
        }
    }
    object MySchemaRDD {
        implicit def toMySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) = new MySchemaRDD(rdd)
    }
    """)
      
    // SQL entry point over the SparkContext `sc` that spark-shell provides.
    val ssc = new org.apache.spark.sql.SQLContext(sc)
    // Brings sql(...) and the implicit SchemaRDD -> MySchemaRDD conversion into scope.
    import ssc._
    import MySchemaRDD._
    
    /** Builds REPL source code that (1) declares a case class `classname` from the
      * comma-separated "name type" pairs in `tabledef`, (2) splits each line of the
      * text RDD `rddname` on FIELD_SEPERATOR and maps it into that case class, and
      * (3) registers the result as SQL table `tablename`.
      * Type names are capitalized ("string" -> "String") to resolve as Scala types. */
    def getRegisterString(rddname:String,classname:String,tablename:String,tabledef:String) : String = {
        // "name string, age int" -> Array(("name","String"), ("age","Int")).
        // FIX: split on any whitespace run (the original split(" ") broke when
        // name and type were separated by a tab or several spaces).
        val members = tabledef.trim.split(",").map(_.trim.split("\\s+").filter(_.nonEmpty)).map(x => (x(0).trim, x(1).trim.capitalize))
        val classmemberdef = members.map(x => x._1+":"+x._2).mkString(",")
        // Positional field conversions, e.g. "t(0).toString,t(1).toInt".
        val convertstr = members.map(_._2).zipWithIndex.map(x => "t("+x._2+").to"+x._1).mkString(",")
        s"""
            case class ${classname}(${classmemberdef})
            val schemardd = ${rddname}.map(_.split("${FIELD_SEPERATOR}")).map(t=>${classname}(${convertstr}))
            ssc.registerRDDAsTable(schemardd,"${tablename}")
        """
    }
    
    // Compile MyCommandTranslator in the REPL: it gives plain strings (SQL text,
    // table names, file paths) the small DSL used by this script
    // (go / saveto / result / defineas / from / isok).
    org.apache.spark.repl.Main.interp.command("""
    class MyCommandTranslator(cmd:String) extends java.io.Serializable {

        // Runs cmd as SQL, prints the rows, and remembers the RDD in lastrdd.
        def go()(implicit f: SchemaRDD => MySchemaRDD) = {
            lastrdd = sql(cmd)
            lastrdd.go()
        }

        // Runs cmd as SQL and writes the rows to output (local file or hdfs: path).
        def saveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
            lastrdd = sql(cmd)
            lastrdd.saveto(output)
        }

        // Runs cmd as SQL and returns the collected rows.
        def result()(implicit f: SchemaRDD => MySchemaRDD) = {
            lastrdd = sql(cmd)
            lastrdd.result()
        }

    // Kept from the HiveContext version of this script; inactive here.
    //    def hqlgo()(implicit f: SchemaRDD => MySchemaRDD) = {
    //        lastrdd = hql(cmd)
    //        lastrdd.go()
    //    }
    //
    //    def hqlsaveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
    //        lastrdd = hql(cmd)
    //        lastrdd.saveto(output)
    //    }
    //
    //    def hqlresult()(implicit f: SchemaRDD => MySchemaRDD) = {
    //        lastrdd = hql(cmd)
    //        lastrdd.result()
    //    }

        // Registers the RDD named by cmd as a SQL table. A non-empty tabledef
        // ("name string,age int,...") parses the rows into a generated case
        // class first; an empty one registers the RDD as-is under its own name.
        def defineas(tabledef:String) = {
            if( tabledef != "" ) {
                org.apache.spark.repl.Main.interp.command(
                    getRegisterString(cmd,cmd.toUpperCase,cmd,tabledef)
                )
            } else {
                // FIX: the scraped source lost the backslash-escaped quotes here
                // (and ${cmd} cannot interpolate in a plain string); rebuilt the
                // generated command with explicit concatenation.
                org.apache.spark.repl.Main.interp.command(
                    "ssc.registerRDDAsTable("+cmd+",\""+cmd+"\")"
                )
            }
        }

        // Implements: "create table <name> (<fields>)" from "<file>".
        // Loads the file as a text RDD bound to <name> and registers the table.
        def from(filepath:String) {
            if( cmd.trim.startsWith("create table ") ) {
                val tablename = cmd.trim.substring(13).trim().split(" ")(0)
                val leftstr = cmd.substring(13).trim().substring(tablename.length).trim()
                // Strip the parentheses wrapping the field definition list.
                val tabledef = leftstr.substring(1,leftstr.length-1).trim()
                val realfile = MyFileUtil.regularFile(filepath)
                // FIX: restored the escaped quotes the scrape dropped.
                org.apache.spark.repl.Main.interp.command(
                    "val "+tablename+" = sc.textFile(\""+realfile+"\")"
                )
                new MyCommandTranslator(tablename).defineas(tabledef)
            } else {
                println("usage:")
                println("\"create table sometablename (field1 string,field2 int...)\" from \"somefile or hdfs:somepath\"")
            }
        }

        // For a path (cmd contains "." or "/"): true when its files hold data.
        // Otherwise cmd is treated as a table name and checked for rows.
        def isok() = {
            if(cmd.contains(".") || cmd.contains("/")) {
                MyFileUtil.hasContext(cmd)
            } else {
                val res = sql(s"select count(*) from ${cmd}").result()
                val count = res(0).getLong(0)
                count > 0
            }
        }
    }
    object MyCommandTranslator {
        implicit def stringToTranslator(cmd:String) = new MyCommandTranslator(cmd)

        // Pretty-prints rows previously captured with `result`.
        def show(tabledata:Array[org.apache.spark.sql.Row]) = {
            tabledata.foreach( x => println(x.mkString("\t")))
        }
    }
    """)
      
    // Expose the companion as "to" so REPL usage reads naturally: `to show data`.
    def to = MyCommandTranslator
    import MyCommandTranslator._

    // A tiny always-present table, handy for smoke-testing the SQL pipeline.
    val onetable = sql("select 1 as id")
    ssc.registerRDDAsTable(onetable,"onetable")
      
    // Prints a cheat sheet of the string DSL defined above.
    // FIX: removed an HTML <span> fragment the blog scrape had injected into the
    // string, and corrected the "enveronment" typo.
    def help = {
        println("""example:
            "create table testperson (name string,age int,weight double)" from "testperson.txt"
            "select * from testperson" go
            "select * from testperson" saveto "somelocalfile.txt"
            "select * from testperson" saveto "hdfs:/basedir/parentdir/testperson"
            "testperson" isok
            "somelocalfile.txt" isok
            "hdfs:/basedir/parentdir/testperson" isok
            val data = "select * from testperson" result
            to show data
            val somerdddata = sc.textFile("hdfs:/basedir/parentdir/testperson")
            "somerdddata" defineas "name string,age int,weight double"
            "select * from somerdddata" go
            if you want to see the help of environment, please type :help
            """)
    }
    


  • 相关阅读:
    币圈惊现门罗币挖矿新家族「罗生门」
    5步告诉你QQ音乐的完美音质是怎么来的,播放器的秘密都在这里
    【云+社区极客说】新一代大数据技术:构建PB级云端数仓实践
    Android P的APP适配总结,让你快人一步
    C++11用于计算函数对象返回类型的统一方法
    C++11用于元编程的类别属性
    C++11多态函数对象包装器
    C++11包装引用
    C++11能用智能指针
    C++正则表达式
  • 原文地址:https://www.cnblogs.com/mqxnongmin/p/10921305.html
Copyright © 2011-2022 走看看