zoukankan      html  css  js  c++  java
  • Spark程序进行单元测试-使用scala

    Spark 中进行一些单元测试技巧:
    最近刚写了一点Spark上的单元测试,大概整理了一些

    rdd测试

    spark程序一般从集群中读取数据然后通过rdd进行转换,这其中涉及到集群,每次修改bug,上传到集群再运行测试,代价还是挺大;所以尽可能先本地进行单元测试,以减少在集群上运行时错误,特别是map等各种tranforms动作的逻辑错误;以下示例用于测试本地返回rdd相关的方法(利用spark本地模式进行单元测试)
    Tips:

    //定义一个简单的wordcount
    object WordCount extends Serializable{ 
      def count(lines:RDD[String]): RDD[(String,Int)]={
        val rdd=lines.flatMap(line=>line.split("\s")).map(word=>(word,1)).reduceByKey(_ + _)
        rdd
      }
    }
    //引入scalatest建立一个单元测试类,混入特质BeforeAndAfter,在before和after中分别初始化sc和停止sc,
    //初始化SparkContext时只需将Master设置为local(local[N],N表示线程)即可,无需本地配置或搭建集群,
    
    class WordCountTests extends FlatSpec with BeforeAndAfter{
      val master="local" //sparkcontext的运行master 
      var sc:SparkContext=_
      it should("test success") in{
       //其中参数为rdd或者dataframe可以通过通过简单的手动构造即可
        val seq=Seq("the test test1","the test","the")
        val rdd=sc.parallelize(seq)
        val wordCounts=WordCount.count(rdd)
        wordCounts.map(p=>{
           p._1 match {
             case "the"=>
               assert(p._2==3)
             case "test"=>
               assert(p._2==2)
             case "test1"=>
               assert(p._2==1)
             case _=>
               None
           }
        }).foreach(_=>())
      }
      //这里before和after中分别进行sparkcontext的初始化和结束,如果是SQLContext也可以在这里面初始化
      before{
        val conf=new SparkConf()
          .setAppName("test").setMaster(master)
        sc=new SparkContext(conf)
      }
    
      after{
        if(sc!=null){
          sc.stop()
        }
      }
    }

    无返回值方法测试

    有时候一个方法起到一个调用流程的作用,最后可能是输出或者写入某个文件而没有返回值,一般单元测试可能是查看最后有没有输出文件,但是ide在本地可能不太好进行测试 
    例如:

    trait WriterHandle{
       def writer(records:Seq[GenericRecord]):Unit={
         val parquetWriter=...
         records.foreach(parquetWriter.writer(..)) 
       }
    }
    //一个类处混入这个特质,经过一定转换后将结果数据写入parquet中
    class ProcessHandle(objects:Iterator[T]) extends Serializable with WriterHandle{
      def process():Unit={
         val records:Seq[GenericRecord]=build(objects)={
            ...
         }
         //这里调用了特质writer中的writer方法,实际单元测试运行到这里可能写入的时候会出错,不能正常测试通过
         writer(records)
      }
    }
    class Writertests extends FlatSpec {
      it should("write success") in{
        val objects=Seq(object1,object2..).toIterator 
        //在new处理类,混入原先特质的一个子特质
        val process=new ProcessHandle(objects) with Writerhandletest 
      }
    } 
    //可以自定义一个trait继承自原先的特质,通过将原先的方法覆盖,然后在重写后的方法里面的根据传入值定义所需要断言即可
    trait Writerhandletest extends WriterHandle{
      override def writer(records:Seq[GenericRecord]):Unit={
         assert(records.length==N)
         assert(records(0).XX=="xxx")
       }
    }

    如有必要也可以测试下私有方法:

    理论上来说,私有方法都会被公有方法调用,调用公有方法也可以验证私有方法,不过如果公有方法不方便测试也可以对某个私有方法进行测试,就看是否有必要 
    可以测试如下:

    class  MyTest(s:String){
      //此公有方法可能不方便测试
      def ():Unit={
         ...
         doSth(s)
      }
      //这里私有方法,可能是逻辑关键所在,有必要测试
      private def doSth(s:String):String={
         ...
      }
    }

    编写单元测试

    //要混入PrivateMethodTester特质
    class MytestTests extends FlatSpec with PrivateMethodTester{
      it should("write success") in{
    
            //首先new一个要测试的类
        val myTest=new MyTest("string") 
           //其中通过PrivateMethod修饰,[]中为返回值, ('method)单引号后跟私有方法名 
        val dosth=PrivateMethod[String]('doSth)
           //通过invokePrivate 委托调用私有方法,注意参数要对,貌似传null的话会找不到对应的方法
        val str=myTest invokePrivate dosth("string")
           //最后断言相应的至即可
        asset(str=="string") 
      }
    } 
    
    
  • 相关阅读:
    gitlab搭建
    java数组
    安裝nextcloud
    Spring的定时任务@Scheduled(cron = "0 0 1 * * *")
    java内存结构(下)
    java内存结构(上)
    多线程的三个特性
    @RequestBody用法
    SwaggerAPI注解详解(转载)
    在jpanel中添加jbutton并自由设置按钮大小和位置
  • 原文地址:https://www.cnblogs.com/itboys/p/11107860.html
Copyright © 2011-2022 走看看