  • Spark new-word discovery

    package com.icklick.spark.wordSegment

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import com.iclick.spark.wordSegment.util.CounterMap
    import scala.collection.mutable.ArrayBuffer
    import com.google.common.collect.Maps
    import java.text.SimpleDateFormat
    import scala.collection.JavaConversions._
    import scala.collection.JavaConverters._
    import scala.collection.mutable.Map
    import com.iclick.spark.wordSegment.util.AtomsUitl
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SaveMode
    import com.iclick.spark.wordSegment.util.ConterHashSet
    import org.apache.commons.lang.StringUtils
    import com.mysql.jdbc.Driver
    // /tmp/yuming/webtable/ds=16-04-17   Hadoop input directory
    object WordSegment {
      def main(args: Array[String]): Unit = {
        // Silence unnecessary logging
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

        // Cluster (master) configuration
        if (args.length < 5) {
          System.err.println("Usage: path, maxLen, pmi, info, shuffle_count [save_path]")
          System.exit(1)
        }

        val path = args(0).toString
        val maxLen = args(1).toInt
        val pmi = args(2).toDouble
        val info = args(3).toDouble
        val shuffle_count = args(4).toInt
        val save_path_result = if (args.length >= 6) args(5).toString else "/tmp/wilson/"

        val conf = new SparkConf().set("spark.driver.maxResultSize", "10g").
          set("spark.sql.shuffle.partitions", s"${shuffle_count}").set("spark.network.timeout", "850s").
          set("spark.shuffle.compress", "true").set("spark.shuffle.spill.compress", "true").set("spark.shuffle.manager", "sort")
        if (System.getProperty("local") != null) {
          conf.setMaster("local").setAppName("wordSegname")
        }
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        // Local-mode configuration
        /* val conf = new SparkConf().setAppName("wordSegname").setMaster("local[4]").
          set("spark.sql.shuffle.partitions", "10").set("spark.network.timeout", "30s")
          .set("spark.shuffle.compress", "true").set("spark.shuffle.spill.compress", "true")
          .set("spark.shuffle.manager", "sort")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val path = "D:\\wilson.zhou\\Downloads\\西游记.txt"
        val maxLen = 6
        val path1 = "D:\\temp\\text.txt"
        val pmi = 0
        val info = 0
        val save_path_result = "/tmp/wilson/" */

    //  val word = scala.io.Source.fromFile("D:\\wilson.zhou\\Downloads\\红楼梦.txt").getLines().mkString("")
        val sdf = new java.text.SimpleDateFormat("yyyy-MM-dd:HH:mm:ss")
        var start = sdf.format(System.currentTimeMillis())
        // Strip stopwords, punctuation, and whitespace/control characters.
        val word1 = sc.textFile(path).map { x =>
          val x_filter = x.replaceAll("[" + AtomsUitl.stopwords + "]", " ").replaceAll("\\p{Punct}", " ").replaceAll("\\pP", " ")
            .replaceAll(" ", " ").replaceAll("\\p{Blank}", " ").replaceAll("\\p{Space}", " ").replaceAll("\\p{Cntrl}", " ")
          x_filter
        }
        val sum_document = word1.count()

        // Tag each token with the index of the document (line) it came from.
        val word_document = word1.zipWithIndex.filter { x => !StringUtils.isBlank(x._1) }.flatMap { x =>
          val arr = ArrayBuffer[(String, Int)]()
          val line = x._1.split(" ")
          for (i <- line) {
            arr += ((i, x._2.toInt))
          }
          arr
        }.map { x => (x._1.trim, x._2) }.filter(x => !StringUtils.isBlank(x._1))

        println("Calculate the terms' document frequency")

        val word_document_caculate = word_document.map { x => ("$" + x._1 + "$", x._2) }.flatMap { x =>
          var arr = ArrayBuffer[(String, Int)]()
          for (y <- 1 to AtomsUitl.len(x._1) - 2) {
            arr += ((AtomsUitl.substring(x._1, y, Math.min(maxLen + y, AtomsUitl.len(x._1))), x._2))
          }
          arr
        }.sortBy(x => x._1)
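        // Each token is wrapped in "$" sentinels, and every window of at most
        // maxLen characters starting at each interior position becomes a
        // candidate string, still tagged with its source-document index.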
        println("Document-frequency calculation will start")

        val word_document_result = word_document_caculate.map { x =>
          val first = AtomsUitl.substring(x._1, 0, 1)
          (first, x._1, x._2)
        }.groupBy((f: (String, String, Int)) => f._1).map { x =>
          x._2
        }.flatMap { x =>
          val documnet = Maps.newHashMap[String, ConterHashSet]
          var arrBuff = ArrayBuffer[(String, Int)]()
          for (curr <- x) {
            for (ii <- 1 to AtomsUitl.len(curr._2) - 1) {
              val w1 = AtomsUitl.substring(curr._2, 0, ii)
              if (documnet.containsKey(w1)) {
                documnet.get(w1).addelment(curr._3.asInstanceOf[java.lang.Integer])
              } else {
                val cm = new ConterHashSet()
                cm.addelment(curr._3.asInstanceOf[java.lang.Integer])
                documnet.put(w1, cm)
              }
            }
          }
          val documnet_iter = documnet.keySet.iterator
          while (documnet_iter.hasNext()) {
            val w = documnet_iter.next()
            val freq = documnet.get(w).getsize()
            arrBuff += ((w, freq))
          }
          arrBuff
        }
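        // word_document_result holds (candidate, document frequency): each prefix
        // w1 of each window accumulates the distinct document IDs it occurred in
        // (ConterHashSet), and getsize() yields that distinct count.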
    //  word_document_result.take(20).foreach(println)
    //  println("word_document_result's count:" + word_document_result.count())

        println("Computing the information entropy")
        val word = word1.flatMap { x =>
          val line = x.split(" ")
          line
        }.filter(x => !StringUtils.isBlank(x))

        // Prepare to compute the left-neighbor information entropy
        println("Calculate the left-neighbor information entropy .....")

        val wordleft = word.map(x => AtomsUitl.reverse(x)).map { x => "$" + x + "$" }.flatMap { x =>
          var arr = ArrayBuffer[String]()
          for (y <- 1 to AtomsUitl.len(x) - 2) {
    //      arr += x.substring(y, Math.min(maxLen + y, x.length()))
            arr += AtomsUitl.substring(x, y, Math.min(maxLen + y, AtomsUitl.len(x)))
          }
          arr
        }.sortBy(x => x)
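        // Reversing each token lets the left-neighbor pass reuse the same
        // prefix-grouping logic as the right-neighbor pass below; the words are
        // reversed back (AtomsUitl.reverse) when results are emitted.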
        val wordleft_caculate = wordleft.map { s =>
    //    val first = s.substring(0, 1).toString()
          val first = AtomsUitl.substring(s, 0, 1).toString
          (first, s)
        }.groupBy((f: (String, String)) => f._1).map { x =>
          x._2
        }.flatMap { x =>
          val stat = Maps.newHashMap[String, CounterMap]()
          var arrBuff = ArrayBuffer[(String, Double)]()
          for (curr <- x) {
            for (ii <- 1 to AtomsUitl.len(curr._2) - 1) {
    //        val w = curr._2.substring(0, ii)
              val w = AtomsUitl.substring(curr._2, 0, ii)
    //        val suffix = curr._2.substring(ii).substring(0, 1)
              val suffix = AtomsUitl.substring(AtomsUitl.substring(curr._2, ii), 0, 1)
              if (stat.containsKey(w)) {
                stat.get(w).incr(suffix)
              } else {
                val cm = new CounterMap()
                cm.incr(suffix)
                stat.put(w, cm)
              }
            }
          }
          var iterator_stat = stat.keySet().iterator()
          while (iterator_stat.hasNext()) {
            var w = iterator_stat.next()
            var cm = stat.get(w)
            var freq = 0
            var re = 0.0

            var cm_iter = cm.countAll().keySet().iterator()
            while (cm_iter.hasNext()) {
              freq += cm.get(cm_iter.next())
            }
            var cm_iter1 = cm.countAll().keySet().iterator()
            while (cm_iter1.hasNext()) {
              var p = cm.get(cm_iter1.next()) * 1.0 / freq
              re += -1 * Math.log(p) * p
            }
    //      print("freq = " + freq + "    ")
    //      println("re = " + re)

            arrBuff += ((AtomsUitl.reverse(w), re))
          }
          arrBuff
        }
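        // re is the Shannon entropy -sum(p * ln p) of the neighbor-character
        // distribution; because the input was reversed, these "suffixes" are the
        // characters that precede the word in the original text.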
    //  wordleft_caculate.take(20).foreach(println)
    //  println("left-neighbor entropy record count: " + wordleft_caculate.count())
    //  println(wordleft_caculate.map(x => x._1).distinct().count())

    //  println("wordleft's count -----> " + wordleft.count)

        // Prepare to compute the right-neighbor information entropy
        println("Calculate the right-neighbor information entropy .....")
        val wordright = word.map { x => "$" + x + "$" }.flatMap { x =>
          var arr = ArrayBuffer[String]()
    //    AtomsUitl.len(x) - 2
          for (y <- 1 to AtomsUitl.len(x) - 2) {
    //      arr += x.substring(y, java.lang.Math.min(maxLen + y, x.length()))
            arr += (AtomsUitl.substring(x, y, Math.min(maxLen + y, AtomsUitl.len(x))))
          }
          arr
        }.sortBy(x => x)
        // Compute the right-neighbor character entropy
        val wordright_caculate = wordright.map { s =>
    //    val first = s.substring(0, 1).toString()
          val first = AtomsUitl.substring(s, 0, 1).toString()
          (first, s)
        }.groupBy((f: (String, String)) => f._1).map { x =>
          x._2
        }.flatMap { x =>
          var stat = Maps.newHashMap[String, CounterMap]()
          var arrBuff = ArrayBuffer[(String, Int, Double)]()
          for (curr <- x) {
            for (i <- 1 to AtomsUitl.len(curr._2) - 1) {
    //        val w = curr._2.substring(0, i)
              val w = AtomsUitl.substring(curr._2, 0, i)
    //        val suffix = curr._2.substring(i).substring(0, 1)
              val suffix = AtomsUitl.substring(AtomsUitl.substring(curr._2, i), 0, 1).toString
              if (stat.containsKey(w)) {
                stat.get(w).incr(suffix)
              } else {
                val cm = new CounterMap()
                cm.incr(suffix)
                stat.put(w, cm)
              }
            }
          }

          var iterator_stat = stat.keySet().iterator()
          while (iterator_stat.hasNext()) {
            var w = iterator_stat.next()
            var cm = stat.get(w)
            var freq = 0
            var re = 0.0

            var cm_iter = cm.countAll().keySet().iterator()
            while (cm_iter.hasNext()) {
              freq += cm.get(cm_iter.next())
            }
            var cm_iter1 = cm.countAll().keySet().iterator()
            while (cm_iter1.hasNext()) {
              var p = cm.get(cm_iter1.next()) * 1.0 / freq
              re += -1 * Math.log(p) * p
            }

    //      print("w = " + w + " ")
    //      print("freq = " + freq + " ")
    //      println("re = " + re)

            arrBuff += ((w, freq, re))
          }
          arrBuff
        }
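        // Unlike the left pass, the right pass also emits freq (the candidate's
        // total occurrence count), so wordright_caculate yields
        // (word, freq, right entropy) triples while wordleft_caculate yields only
        // (word, left entropy) pairs.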
    //  println("first 20 right-neighbor entropy records")
    //  wordright_caculate.take(20).foreach(println)
    //  println("right-neighbor table size: " + wordright_caculate.count())

    //  wordright_caculate.
        // Merge the left and right results
        println(" Merge will begin to be calculated..............")
        import sqlContext.implicits._
    /*  val word_caculate_total1 = wordright_caculate.union(wordleft_caculate).sortBy(x => x).groupBy((f: (String, Int, Double)) => f._1, 20).map(x => x._2)
        val word_caculate_total = word_caculate_total1.map { x =>
          val hashtable = new java.util.Hashtable[String, String]()
          hashtable.put("name", "null")
          hashtable.put("freq", "0")
          hashtable.put("e", java.lang.Double.MAX_VALUE.toString())
          for (str <- x) {
            hashtable.put("name", str._1)

            if (str._2 != -20) {
              hashtable.put("freq", String.valueOf(str._2))
            }

            if (str._3 < java.lang.Double.parseDouble(hashtable.get("e"))) {
              hashtable.put("e", String.valueOf(str._3))
            }
          }

          (hashtable.get("name"), hashtable.get("freq").toInt, hashtable.get("e").toDouble)
        }.filter(x => !StringUtils.isBlank(x._1) && x._1.length > 1) */
        val wordright_caculate_todf = wordright_caculate.toDF("right_name", "freq", "right_info")
        val wordleft_caculate_todf = wordleft_caculate.toDF("left_name", "left_info")
        // Keep the smaller of the left/right entropies as the word's entropy score.
        val udf_get_min: ((Double, Double) => Double) = (arg1: Double, arg2: Double) => Math.min(arg1, arg2)
        val sqlfunctin = udf(udf_get_min)
        val word_caculate_total = wordright_caculate_todf.join(wordleft_caculate_todf, wordright_caculate_todf("right_name") === wordleft_caculate_todf("left_name"), "left").
          withColumn("info", sqlfunctin(col("right_info"), col("left_info"))).drop("right_info").
          drop("left_name").drop("left_info").filter(length(wordright_caculate_todf("right_name")) > 1).rdd

    //  wordright_caculate.union(wordleft_caculate).groupBy((f: (String, Int, Double)) => f._1).map(x => x._2).take(20).foreach(println)

        println("Computing the cohesion (PMI)")
        val size_pmi = wordright_caculate.count()
        println("total candidate count in the final step: " + size_pmi)
        println("map_total is done")
        // Compute the cohesion
        val last = word_caculate_total.flatMap { x =>
          var w = x.apply(0).toString
          var f = x.apply(1).toString.toInt
          var e = x.apply(2).toString.toDouble

    //    var w = x._1
    //    var f = x._2
    //    var e = x._3
          var arr = ArrayBuffer[(String, Int, Double, String, String)]()
          for (s <- 1 to AtomsUitl.len(w) - 1) {
    //      var lw = w.substring(0, s)
            try {
              var lw = AtomsUitl.substring(w, 0, s)
    //        var rw = w.substring(s)
              var rw = AtomsUitl.substring(w, s)
              arr += ((w, f, e, lw, rw))
            } catch {
              case e: Exception => arr += (("", 0, 0.0, "", ""))
            }
          }
          arr
        }.filter(f => !StringUtils.isBlank(f._4) && !StringUtils.isBlank(f._5))
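        // Each candidate w is split at every position s into a left half lw and a
        // right half rw; the PMI below compares w's frequency against the
        // frequencies of these two halves.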
        println("dataframe merge will begin to be calculated..............")
    //  last.take(30).foreach(println)

        val df = last.toDF("w_total", "f", "e", "lw", "rw")
        val df1 = wordright_caculate.toDF("w", "freq", "re")

        val df2_drop = df.join(df1, df("lw") === df1("w"), "left").drop("re").drop("w").withColumnRenamed("freq", "lw_freq")
    //  val df2_drop = df2.drop("re").drop("w").withColumnRenamed("freq", "lw_freq")
        val df3_drop = df2_drop.join(df1, df2_drop("rw") === df1("w"), "left").drop("re").drop("w").withColumnRenamed("freq", "rw_freq")
    //  val df3_drop = df3.drop("re").drop("w").withColumnRenamed("freq", "rw_freq")
    //  948014
        // Cohesion (PMI) computed with the RDD API (superseded by the DataFrame version below)
        /* val result = df3_drop.rdd.groupBy { f => f(0) }.map { x =>
          val map = new java.util.HashMap[String, String]()
          map.put("max", "1")
          for (i <- x._2) {
            map.put("w_total", i.apply(0).toString)
            map.put("f", i.apply(1).toString)
            map.put("e", i.apply(2).toString)

            var ff: java.lang.Long = try {
              i.apply(5).toString.toLong * i.apply(6).toString.toLong
            } catch {
              case e: Exception => 1L
            }
            if (ff > map.get("max").toLong) {
              map.put("max", ff.toString)
            }
          }
          var pf = map.get("f").toLong * size_pmi * 1.0 / map.get("max").toLong
          var pmi = Math.log(pf)

          var w_total = map.get("w_total")
          var f = map.get("f").toInt
          var e = map.get("e").toDouble
          map.clear()
          (w_total, f, pmi, e, 0)
    //    (map.get("w_total"), map.get("f").toInt, pmi, map.get("e").toDouble, 0)
        }.filter(f => f._3 > pmi && f._4 > info && !StringUtils.isBlank(f._1))

        val resultToDf = result.toDF("name", "freq", "pmi", "info", "zero")
        */
        println("dataframe join is done")

        // Compute the cohesion with the DataFrame API instead
        val udf_get_pmi = (arg1: Int, arg2: Int, arg3: Int) => Math.log((arg1.toLong * size_pmi.toLong * 1.0) / (arg2.toLong * arg3.toLong))
        val udf_get_pmi_udf = udf(udf_get_pmi)

        val resultToDf = df3_drop.withColumn("pmi", udf_get_pmi_udf(col("f"), col("rw_freq"), col("lw_freq"))).withColumn("zero", col("f") * 0).
          drop("rw_freq").drop("lw_freq").drop("lw").drop("rw").sort($"w_total", $"pmi".desc).dropDuplicates(Array("w_total")).
          filter($"pmi" > pmi && $"e" > info).withColumnRenamed("w_total", "name").withColumnRenamed("f", "freq").withColumnRenamed("e", "info")
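        // pmi = ln(freq(w) * N / (freq(rw) * freq(lw))) with N = size_pmi. The
        // sort-then-dropDuplicates is intended to keep one split per candidate,
        // but note that Spark's dropDuplicates does not guarantee which row of a
        // sorted DataFrame survives.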
        println("The final result will be calculated")
        val word_document_resultToDf = word_document_result.toDF("name1", "document")
        val resultToDf2 = resultToDf.join(word_document_resultToDf, word_document_resultToDf("name1") === resultToDf("name"), "left").
          withColumn("documentcount", col("zero") + sum_document).drop("zero").drop("name1")
    //  val resultToDf2 = resultToDf1.withColumn("documentcount", col("zero") + sum_document).drop("zero").drop("name1")
    //  resultToDf2.show(20)
    //  mutual information: cohesion (pmi column)
    //  left/right entropy: e (info column)
        // Store the results in HDFS
        println("Results will be stored into HDFS.")
        val sdf1 = new SimpleDateFormat("yy-MM-dd")
        val save_path = save_path_result + sdf1.format(System.currentTimeMillis())
        try {
          resultToDf2.rdd.map { x =>
            var name = x.apply(0).toString
            var freq = x.apply(1).toString
            var entropy = x.apply(2).toString
            var info = x.apply(3).toString
            var document = x.apply(4).toString
            var documenttotal = x.apply(5).toString
            s"${name},${freq},${info},${entropy},${document},${documenttotal}"
          }.saveAsTextFile(save_path)
          println("....................success.............")
    //    resultToDf2.rdd.repartition(1).saveAsTextFile(save_path)
        } catch {
          case e: Exception => println("some errors happened when saving the final data")
        }
        // Insert the results into a MySQL database
    /*  val driver = "com.mysql.jdbc.Driver"
        Class.forName(driver)
        val url = "jdbc:mysql://10.1.1.28:3306/spark"
        val pro = new java.util.Properties
        pro.setProperty("user", "usr_dba")
        pro.setProperty("password", "4rfv%TGB^YHN")
        pro.setProperty("use_unicode", "true")
        pro.setProperty("characterEncoding", "utf8")
        resultToDf2.write.mode(SaveMode.Overwrite).jdbc(url, "wordsegment", pro)
        */

        println(start)
        println(sdf.format(System.currentTimeMillis()))
        sc.stop()
      }
    }
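To make the scoring concrete, here is a minimal, self-contained sketch (plain Scala, no Spark; `ScoreSketch` and the toy numbers are illustrative, not part of the job above) of the two quantities the pipeline computes per candidate: the neighbor-character entropy accumulated in the CounterMap loops, and the PMI-style cohesion from the udf_get_pmi step. The job itself takes the five arguments named in its usage check (path, maxLen, pmi, info, shuffle_count) plus an optional output path.

    object ScoreSketch {
      // Shannon entropy (natural log) of a neighbor-character distribution,
      // mirroring the while-loops over CounterMap in the job.
      def entropy(neighborCounts: Map[Char, Int]): Double = {
        val total = neighborCounts.values.sum.toDouble
        neighborCounts.values.map { c =>
          val p = c / total
          -p * math.log(p)
        }.sum
      }

      // Cohesion as in udf_get_pmi: ln(freq(w) * n / (freq(lw) * freq(rw))).
      def pmi(freqW: Long, freqLw: Long, freqRw: Long, n: Long): Double =
        math.log(freqW.toDouble * n / (freqLw.toDouble * freqRw))

      def main(args: Array[String]): Unit = {
        // Toy numbers: a candidate seen 50 times among 100000 windows, whose
        // halves occur 120 and 90 times; a high value suggests strong cohesion.
        println(pmi(50, 120, 90, 100000))
        // A neighbor distribution skewed toward one character gives low entropy,
        // i.e. weak evidence of a free-standing word.
        println(entropy(Map('a' -> 9, 'b' -> 1)))
      }
    }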
  • Original source: https://www.cnblogs.com/think90/p/6379693.html