  • Spark HelloWorld program (Scala version)

    To run in local mode there is no need to install Spark; just pull in the relevant JARs:

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.2.0</version>
            </dependency> 
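
    If you build with sbt instead of Maven, the equivalent dependencies would look like this (a sketch; it assumes scalaVersion is set to a 2.11.x release so that %% resolves to the _2.11 artifacts above):

            // build.sbt -- same artifacts and versions as the Maven block above
            scalaVersion := "2.11.8"
            libraryDependencies ++= Seq(
                "org.apache.spark" %% "spark-core" % "2.2.0",
                "org.apache.spark" %% "spark-sql"  % "2.2.0"
            )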

    Create the Spark session:

            val sparkUrl = "local"
            val conf = new SparkConf()
                    //.setJars(Seq("/home/panteng/IdeaProjects/sparkscala/target/spark-scala.jar"))
                    .set("fs.hdfs.impl.disable.cache", "true")
                    .set("spark.executor.memory", "8g")
    
            val spark = SparkSession
                    .builder()
                    .appName("Spark SQL basic example")
                    .config(conf)
                    .config("spark.some.config.option", "some-value")
                    .master(sparkUrl)
                    .getOrCreate()    
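
    As an aside: a master URL of "local" runs Spark with a single worker thread. To use one thread per core instead (still purely local), you could set:

            val sparkUrl = "local[*]"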

    Load a local file:

    val parquetFileDF = spark.read.parquet("/home/panteng/下载/000001_0")
    // or read from HDFS: spark.read.parquet("hdfs://10.38.164.80:9000/user/root/000001_0")

    Query the data:

    parquetFileDF.createOrReplaceTempView("parquetFile")
    
    // Note: Spark SQL's substring is 1-based; a start position of 0 behaves the same as 1.
    val descDF = spark.sql("SELECT substring(description, 0, 3) AS pre, description FROM parquetFile LIMIT 100000")
    val diffDesc = descDF.distinct().sort("description")
    diffDesc.createOrReplaceTempView("pre_desc")
    val zhaoshang = spark.sql("select * from pre_desc")
    zhaoshang.printSchema()
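
    Since both columns are produced from string expressions, printSchema should print something along these lines:

    root
     |-- pre: string (nullable = true)
     |-- description: string (nullable = true)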

    Iterate over the rows:

    zhaoshang.foreach(row => clustering(row))
    val regexRdd = spark.sparkContext.parallelize(regexList)
    regexRdd.repartition(1).saveAsTextFile("/home/panteng/下载/temp6")
    
    spark.stop()
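
    Note that clustering (defined below) accumulates results in driver-side vars (textPre, textList, regexList). This only works because local mode runs everything in the driver JVM; on a real cluster, foreach executes on remote executors and the mutations would be lost. A cluster-safe sketch is to bring the (already LIMITed) rows back to the driver first:

    // Cluster-safe variant: collect the rows to the driver, then run the
    // stateful clustering there instead of inside executor tasks.
    zhaoshang.collect().foreach(row => clustering(row))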

    Appendix, the helper functions:

    // textPre, textList and regexList are mutable driver-side vars defined elsewhere.
    def clustering(row: Row): String = {
            try {
                var tempRegex = new Regex("null")
                if (textPre.equals(row.getAs[String]("pre"))) {
                    textList = row.getAs[String]("description").replaceAll("\\d", "0") :: textList
                    return "continue"
                } else {
                    if (textList.size > 2) {
                        tempRegex = ScalaClient.getRegex(textList)
                        regexList = tempRegex :: regexList
                    }
                    if (row.getAs[String]("pre") != null && row.getAs[String]("description") != null) {
                        textPre = row.getAs[String]("pre")
                        textList = Nil // clear the accumulated descriptions
                        textList = row.getAs[String]("description") :: textList
                    }
                    return "ok - " + tempRegex.toString()
                }
            } catch {
                case e: Exception => println("clustering failed: " + e)
            }
            return "error"
        }
    ScalaClient lives in a separate source file:

    package scala.learn
    
    import top.letsgogo.rpc.ThriftProxy
    
    import scala.util.matching.Regex
    
    object ScalaClient {
        def main(args: Array[String]): Unit = {
            val client = ThriftProxy.client
            // Sample bank-notification SMS texts used to induce a common regex.
            val seqList = List("您尾号9081的招行账户入账人民币689.00元",
                "您尾号1234的招行一卡通支出人民币11.00元",
                "您尾号2345的招行一卡通支出人民币110.00元",
                "您尾号5432的招行一卡通支出人民币200.00元",
                "您尾号5436的招行一卡通入账人民币142.00元")
            var words: List[String] = List()
            for (seq <- seqList) {
                val list = client.splitSentence(seq)
                for (wordIndex <- 0 until list.size()) {
                    words = list.get(wordIndex) :: words
                }
            }
            val wordlist = words.map(word => (word, 1))
            // Method 1: groupBy first, then map
            var genealWords: List[String] = List()
            wordlist.groupBy(_._1).map {
                case (word, list) => (word, list.size)
            }.foreach((row) => {
                if (row._2 >= seqList.size) genealWords = row._1 :: genealWords
            })
    
            val list = client.splitSentence("您尾号1234的招行一卡通支出人民币200.00元")
            val regexSeq: StringBuilder = new StringBuilder
            val specialChar = List("[", "]", "(", ")")
            for (wordIndex <- 0 until list.size()) {
                var word = list.get(wordIndex)
                if (genealWords.contains(word) && !("*".equals(word))) {
                    if (specialChar.contains(word)) {
                        word = "\\" + word // escape regex metacharacters like [ ] ( )
                    }
                    }
                    regexSeq.append(word)
                } else {
                    regexSeq.append("(.*)")
                }
            }
            println(regexSeq)
            val regex = new Regex(regexSeq.mkString)
            for (seq <- seqList) {
                println(regex.findAllIn(seq).isEmpty)
            }
        }
    
        def getRegex(seqList: List[String]): Regex = {
            val client = ThriftProxy.client
            var words: List[String] = List()
            for (seq <- seqList) {
                val list = client.splitSentence(seq)
                for (wordIndex <- 0 until list.size()) {
                    words = list.get(wordIndex) :: words
                }
            }
            val wordlist = words.map(word => (word, 1))
            // Method 1: groupBy first, then map
            var genealWords: List[String] = List()
            wordlist.groupBy(_._1).map {
                case (word, list) => (word, list.size)
            }.foreach((row) => {
                if (row._2 >= seqList.size) genealWords = row._1 :: genealWords
            })
    
            val list = client.splitSentence(seqList(0))
            val regexSeq: StringBuilder = new StringBuilder
            val specialChar = List("[", "]", "(", ")")
            for (wordIndex <- 0 until list.size()) {
                var word = list.get(wordIndex)
                if (genealWords.contains(word) && !("*".equals(word))) {
                    if (specialChar.contains(word)) {
                        word = "\\" + word // escape regex metacharacters like [ ] ( )
                    }
                    }
                    regexSeq.append(word)
                } else {
                    if(regexSeq.size > 4) {
                        val endStr = regexSeq.substring(regexSeq.size - 4, regexSeq.size)
                        if (!"(.*)".equals(endStr)) {
                            regexSeq.append("(.*)")
                        }
                    }else{
                        regexSeq.append("(.*)")
                    }
                }
            }
            println(regexSeq + "  " + seqList.size)
            // replaceAll also processes backslashes in the replacement string, so a
            // literal \d+ must be written with four backslashes in source.
            val regex = new Regex(regexSeq.mkString.replaceAll("0+", "\\\\d+"))
            //for (seq <- seqList) {
            //    println(regex.findAllIn(seq).isEmpty)
            //}
            regex
        }
    }
    The code above induces regular expressions from batches of similar strings.
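
    For the five sample messages above, the induced pattern would look roughly like 您尾号(.*)的招行(.*)人民币(.*)元; the exact form depends on how splitSentence segments the text, and getRegex additionally rewrites runs of zeros into \d+.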

    Overwrite the output directory:

    spark.hadoop.validateOutputSpecs false
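
    The same flag can be set when building the SparkConf (a sketch; spark.hadoop.* keys are forwarded to the underlying Hadoop configuration):

    val conf = new SparkConf()
        .set("spark.hadoop.validateOutputSpecs", "false") // skip the output-directory existence check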

    A map over a Dataset must be given an Encoder, otherwise the code does not compile. For some types DataTypes provides no encoder; the only option then is to convert to an RDD, map there, and convert the RDD back to a DataFrame:

    val schema = StructType(Seq(
        StructField("pre", StringType),
        StructField("description", StringType)
    ))
    val encoder = RowEncoder(schema)
    val replaceRdd = diffDesc.map(row => myReplace(row))(encoder).sort("description")
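
    When no encoder exists at all, the RDD detour mentioned above might look like this sketch (myReplace is the same per-row function as above):

    // Map on the underlying RDD, where no Encoder is required, then rebuild
    // a DataFrame from the result using the explicit schema.
    val replacedRdd = diffDesc.rdd.map(row => myReplace(row))
    val replacedDF = spark.createDataFrame(replacedRdd, schema)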


    Job submission:
    ./spark-2.2.0-bin-hadoop2.7/bin/spark-submit --name panteng --num-executors 100 --executor-cores 4 ./spark-scala.jar spark://domain:7077

    Silence part of the logging:

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    spark.sparkContext.setLogLevel("WARN")
    Common configuration:

    spark-submit --java 8 \
    --cluster xxx --master yarn-cluster \
    --class xx.xx.xx.xx.Xxx \
    --queue default \
    --conf spark.yarn.appMasterEnv.JAVA_HOME=/opt/soft/jdk1.8.0 \
    --conf spark.executorEnv.JAVA_HOME=/opt/soft/jdk1.8.0 \
    --conf spark.yarn.user.classpath.first=true \
    --num-executors 128 \
    --conf spark.yarn.job.owners=panteng \
    --conf spark.executor.memory=10G \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=2 \
    --conf spark.yarn.executor.memoryOverhead=4000 \
    --conf spark.yarn.driver.memoryOverhead=6000 \
    --conf spark.driver.memory=10G \
    --conf spark.driver.maxResultSize=4G \
    --conf spark.rpc.message.maxSize=512 \
    --driver-class-path hdfs://c3prc-hadoop/tmp/u_panteng/lda-lib/guava-14.0.1.jar \
    xx-1.0-SNAPSHOT.jar parm1 parm2
