zoukankan      html  css  js  c++  java
  • 02.Spark 标签生成(Java和Scala的两种实现)

    Spark 标签生成(Java和Scala的两种实现)

    气温数据多重聚合

    [Scala]实现聚合气温数据。聚合出Max,Min.AVG

    /**
     *  气温数据聚合应用
    */
    object TempAggDemo{
      def main(args:Array[String]):Unit={
        //配置一下
        val conf=new SparkConf()
        conf.setAppName("tempAgg")
        conf.setMaster("local")
        val sc=new SparkContext(conf)
        //1.加载文件
        val rdd1=sc.textFile("file:///d:/mr/temp3.dat")
        //2.切割成对(1930,54)
        val rdd2=rdd1.map(line=>{
          var arr=line.split(" ")
          (arr(0).toInt,arr(1).toInt)
        PairRDD
        //3.分组  按key(年度)分组RDD一但变成了二元组就变成了PairRDD
          //(19300->{23,34,67},1931->{.......})
        val rdd3=rdd2.groupByKey()
        //4.对组内元素进行统计聚合 mapValues()可以迭代的量   K不需要变,看V变
          val rdd4=rdd3.mapValues(it=>{
          val mx= it.max   //max[B >:Int](implicit cmp:Ordering[B]) 最大值
          val mn=it.min
          val sum=it.sun
          val size=it.size
          (mx,mn,sum.toFloat /size) //把每一个年度的气温数据聚合成了三元组
          })
         //5.按照年度排序
          val rdd5=rdd4.sortByKey(true) //true升序
         //6.输出
        rdd5.collect().foreach(println)
      }
    }
    

    reduceByKey的特点:reduceByKey不会改变K,V的类型。进来是什么类型,出去还是什么类型。

    如果要聚合三个值,max,min,(sum,size)通过reduceByKey一次出来。原来的K,V,由于reduceByKey不能改变类型。而我们要的结果至少包含三个结果。它们起码是三元组。
    [Scala]

    object TempAggDemo2{
      def main(args:Array[String]):Unit={
        val conf=new SparkConf()
        conf.setAppName("tempAgg")
        conf.setMaster("local")
        val sc=new SparkContext(conf)
        //1.加载文件
        val rdd1=sc.textFile("file:///d:/mr/temp3.dat")
        //2.切割成对(1930,54)
        val rdd2=rdd1.map(line=>{
        var arr=line.split(" ")
        //(mx,mn,sum,count)
        val year=arr(0).toInt
        val tmp=arr(1).toInt
        //这里又是一个元组(temp最高气温,temp最低气温,temp气温总和,)在变换的时候我们把V当成一个元组来代替,K就是年份。按K聚合,所以所有年份相同的V都要在一起聚合。而此刻的V不是一个单纯的数字了。它是一个元组。把它重向的捏在一起。reduceByKey的特点就是它不能改变V的类型。
        (year,(temp,temp,temp,1))
    })
        //每一元素后面都带1
        // rdd2.collect().foreach(println) //(1953,(23,23,23,1))(1951,(26,26,26,1))
        //3.聚合  reduceByKey(func:((Int,Int,Int,Int),(Int,Int,Int,Int))=>(Int,Int,Int,Int))  它是两个元组组成一个元组.聚合是纵向捏合的过程。两个元组,它捏合完后产生的新值。还要跟第三个再聚,不断的取最大值最小值和avg
        val rdd3=rdd2.rdd2.reduceByKey((a,b)=>{
          import scla.math._
          (max(a._1,b_1),min(a._2,b_2),a._3+b._3,a._4+b._4)
        })
        //rdd3.collect().foreach(println) //(1921,(48,-50,348,53))(1933,(41,-44,-38,49))
        //4.变换 K不需要变K是年度,把V变掉就行了 V是一个元组
        val  rdd4=rdd3.mapValue(t=>{
          (t._1,t._2,t._3.toFloat /t.-4)
        }).sortByKey()
      }
    }
    
    

    [Java]

    map(Function<String,R>f) JavaRDD<R>它就没有各种ByKey的操作
    mapToPair(PairFunction<String..>)JavaPairRDD<K2,V2>
    String 输入的行 Integer K 四元组Tuple4<>
    import scala.Float;
    import scala.Tuple2;
    import scala.Tuple3;
    import scala.Tuple4;
    public class TempAggDemeJava2{
       public static void main(String[] args){
            SparkConf conf=new SparkConf();
            conf.setAppName("tempAgg");
            conf.setMaster("local");
            JavaSparkContext sc=new JavaSparkContext(conf);
            //1.加载文件
           JavaRDD<String>rdd1=sc.textFile(path:"file:///d:/mr/temp3.dat");
           //2.变换(1903,(32,23,23,1))
           JavaRDD<Integer,Tuple4<Integer,Integer,Integer,Integer>>rdd2=
           rdd1.mapToPair(newPairFunction<String,Integer,
           Tuple4<Integer,Integer,Integer,Integer>>(){
           public Tuple2<Integer,Tuple4<Integer,Integer,Integer,Integer>>
           call(String s)throws Exception{
           String[] arr=s.plit(regex:" ");
           int year=Integer.parseInt(arr[0]);
           int tmp=Integer.parseInt(arr[1]);
           Tuple4<Integer,Integer,Integer,Integer> v=new                              Tuple4<Integer,Integer,Integer,Integer>(tmp,tmp,temp,_4:1);
           
          return new Tuple2<Integer,Tuple4<Integer,Integer,Integer,Integer>>(year,v);
           }
          });
          //3.聚合
          JavaPairRDD<Integer,Tuple4<Integer,Integer,Integer,Integer>>redd2.reduceByKey(
                 new Function2<Tuple4<Integer,Integer,Integer,Integer>,
                 Tuple4<Integer,Integer,InInteger,Integer,Integer,Integer>,
                 Tuple4<Integer,Integer,Integer,Integer>>(){
                 public Tuple4<Integer,Integer,Integer,Integer>call
                 (Tuple4<Integer,Integer,Integer,Integer>v1,
                  Tuple4<Integer,Integer,Integer,Integer>v2)throws Exception{
                     //v1和v2是返回一个新的元组
                     int mx=Math.max(v1._1(),v2._1());
                     int mn=Math.min(v1._2(),v2._2());
                     int sum=v1._3()+v2._3();
                     int count=v1._4()+v2._4();
                     return new Tuple4<Integer,Integer,Integer,Integer>(mx,mn,sum,count);
                   }
                 });
           //4.map取出avg K是年度  V变成了三元组
                JavaPairRDD rdd4=rdd3.mapValues(new Function<Tuple4<Integer,Integer,Integer,Integer>,
                Tuple3<Integer,Integer,Float>>(){
                public Tuple3<Integer,Integer,Float>call
                (Tuple4<Integer,Integer,Integer>v1)throws Exception{
                return new Tuple3<Integer,Integer,Float>(v1._1(),v1._2(),_3:         (float)v1._3() /v1._4());
                }
           });
         //5.排序
         JavaPairRDD<Integer,Tuple3<Integer,Integer,Float>>rdd5=rdd4.sortByKey();
         //6.列表
         List<Tuple2<Integer,Tuple3<Integer,Integer,Float>>> list=rdd5.collect();
         for(Tuple2<Integer,Tuple3<Integer,Integer,Float>> t:list){
           System.out.println(t);
         }
       }
    }
    

    [标签生成Scala版本]

    [TaggejScala.scala]标签生成Scala
    ​```scala
    /**
    *
    *标签生成
    */
    object TaggenScala{
      def main(args:Array[String]):Unit={
        val conf=new SparkConf()
        conf.setAppName("Taggen")
        conf.setMaster("local")
        val sc=new SparkContext(conf)
        //1.加载文件,将泛型加全
        val rdd1:RDD[(String,util.List[String])]=
        sc.textFile("file:///d:/mr/temptags.txt")
        //2.解析每行的json数据成为集合  //map这种是变换
        val rdd2:RDD[(String)]=rdd1.map(line=>{
          val arr:Array[String]=line.split("	")
          //提取商家id
          val busii:String=arr(0)
          //json对象
          val json:String=arr(1)
          val list:java.util.List[String]=TagUtil.extractTag(json)
          Tuple2[String,java.util.List[String]](busid,list)
        })
        //3.过滤空集合  filter(f:((String,util.List[String]))=>Boolean),(String,util.List[String])这个参数是二元组,返回值是Boolean,rdd2.filter()这里可以用一个高阶函数传进去。传清楚的话它就是一个元组rdd2.filter(t=>{ }),要把这个元组定义齐全的话。类型可以声明,它的类型是一个二元组rdd2.filter((t:Tuple2[String,java.util.List[String]])=>
        val rdd3:RDD[(String,util.List[String])]=
        rdd2.filter((t:Tuple2[String,java.util.List[String]])=>{
          !t._2.isEmpty  //二元组一旦进来就可以拿到它第二个元素,看它是否为空
        })
        //4.将值压扁 flatMapValues[U](f:(util.List[String]))=>TraversableOnce[U]):这个是对值进行压扁,如果对值一旦压扁。值压扁之后值压扁之后,每一值都会跟K从新组合新的K。 flatMap:
        val rdd4:RDD[(String,String)]rdd3.flatMapValues((list:java.util.List[String])=>{
          //导入隐式转换
          import scala.collection.JavaConversions._
          //返回list的话它需要返回可以迭代的量,需要导入隐式转换
        })
        //5.滤掉数组的tag,它的类型没有变还是元组
        val rdd5:RDD[(String,String)]=rdd4.filter((t:Tuple2[String,String])=>{
          try{
            //
            Integer.pparseInt(t._2)
            false 
          }catch{
            case _=>true
          }
        })
        //6.标一成对,第5步已经是一对了但是不是想要的。需要重新配对成新的一对。
        val rdd6:RDD[Tuple2[Tuple2[String,String],Int]]=
        rdd5.map((t:Tuple2[String,String])=>{
              Tuple2[Tuple2[String,String],Int](t,1)
        })              //新的二元组是一个嵌套的二元组,元组是Optin的方法
        //7.聚合
        val rdd7:RDD[Tuple2[Tuple2[String,String],Int]]=
        rdd6.reduceByKey((a:Int,b:Int)=>{
          a+b
        })
        //8.重组
        val rdd8:RDD[Tuple2[String,Tuple2[String,Int]]]=
        rdd7.map((t:Tuple2[Tuple2[String,String],Int])=>{
          Tuple2[String,List[Tuple2[String,Int]]](t._1._1,Tuple2[String,Int](t._1._2,t._2)::Nil)  //List里面的元素是二元组
          //Tuple2[String,Tuple2[String,Int]](t._1._1,Tuple2[String,Int](t._1._2,t._2)::Nil) //在reduceByKey它是纵向捏合,把一个K下所有的V聚合在一起形成一个值。但又不能改变V的类型。如果原来的K是一个二元组。最终把所有的V聚完之后还是一个二元组。这是不合适的。此刻的目的是把它同一商家下的所有评论都放一起。每一个二元组都做到集合里面去。这样一来两个集合就可以聚到一个集里面去了。当我们变换的时候,我们不会把它变成一个二元组。 而是把它变成集合。(Tuple2[String,Int](t._1._2,t._2)::Nil)    
        })     //
        //9.reduceByKey
        val rdd9:RDD[Tuple2[String,List[Tuple2[String,Int]]]] =
        rdd8.reduceByKey((a:List[Tuple2[String,Int]],b:List[Tuple2[String,Int]])=>{
          a:::b  //::是一个数字和一个集合的添加,[B >:(String,Int)](x:B):它是一个元组结尾的操作符是右操作符。
                //:::[B >: (String,Int)](prefix: List[B])这是一个界定,B是List 它是柯里化函数
        })
        //10.分组内排序
        val rdd10:RDD[Tuple2[String,List[Tuple2[String,Int]]]] =
        rdd9.mapValues((list:List[Tuple2[String,Int]]))=>{
          //sortBy[B](f:((String,Int))=>B)(implicit ord:Ordering[B]),sortBy它要排序它的元素是一个元组,把它的类型(二元组)给它。  List集合是不可变集合(immutable)  源码  typeList[+A]=scala.collection.immutable.List[A] val List=scala.collection.immutable.List  所以它排序完之后产生的是新集合。
          val list2:List[Tuple2[String,Int]]=list.sortBy((t:Tuple2[String,Int])=>{
            -t._2
          })
          //取前5
          list2.take(5) 
        })
        //11.商家间排序  sortBy[K](f:((String,List[(String,Int)]))=>K,A,ASCENDING:Boolean=true,numpartitions:Int...)它是一个高阶
        val rdd11:RDD[Tuple2[String,List[Tuple2[String,Int]]]] =
        rdd10.sortBy((t:Tuple2[String,List[Tuple2[String,Int]]])=>{
          t.-2(0)._2  //t.-2就是集合(0)就是商家的评论最大数。再把它取出来。
        },false)
        //收集  打印
        rdd11 .collect().foreach(println)
      }
    }
    总结:把所有泛型类型都定义上。
    
    public class TagUtil{
      /*
      *从json中抽取评论集合
      */
      public static List<String>extractTag(String json){
        List<String>list=new ArrayList<String>();
        //将字符串解析成json对象
        JSONObject obj=JSON.parseObject(json);
        JSONArray arr=obj.getJSONArray(key:"extInfoList");
        if(arr !=null && arr.size()>0){
           //得到数组的第一json对象
          JSONObject firstObj=arr.getJSONObject(index:0);
          JSONArray values=firstObj.getJSONArray(key:"values");
          if(arr !=null &&arr.size()>0){
            //得到数组的第一个json对象
            JSONObject firstObj=arr.getJSONObject(index:0);
            JSONArray values=firstObj.getJSONArray(key:"values");
            if(values !=null && arr.size()>0){
              //得到字符串解析成json对象
              JSONObject firstObj=arr.getJSONObject(index:0);
              JSONArray values=firstObj.getJSONArray(key:"values");
              if(values !=null &&values.size()>0){
                for(int i=0;i<values.size();i++){
                  String tag=values.getString(i);
                  list.add(tag);
                }
              }
            }
            return list;
          }
          
      }
    }
    

    [TagUtil.java]

    public class TagUtil{
      //从json中抽取评论集合
      public static List<String>extractTag(String json){
        List<Stromg>list=new ArrayList<String>();
        //将字符串解析成json对象
        JSONObject obj=JSON.parseObject(json);
        JSONArray arr=obj.getJSONArray("extInfoList");
        if(arr !=null && arr.size()>0){
          //得到数组的第一个json对象
          JSONObject firstObj=arr.getJSONObject(0);
          JSONArray values=firstObj.getJSONArray("values");
          if(arr !=null && arr.size()>0){
            //得到数组的第一个json对象
            JSONObject firstObj=arr.getJSONObject(0);
            JSONArray values=firstObj.getJSONArray("values");
            if(values !=null &&values.size()>0){
              for(int i=0;i<values.size();i++){
                String tag=values,getString(i);
                list.add(tag);
              }
            }
          }
          return list;
        }
      }
    }
    
    //java方法定义
    public staic <T>T getMiddle(List<T>list){
      return list.get(list.size() /2);
    }
    @Test
    public void testMethod(){
      List<Integer>list=new ArrayList<Integer>();
      list.add(1);
      list.add(2);
      list.add(3);
      System.out.println(getMiddle(list));
    }
    

    Maven

    <dependency>
      <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId> fastjson</artifactId>
        <version>1.2.24</version>
      </dependency>
    </dependency>
    

    [标签生成Java版本]

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    public class TaggenJava2{
      public static void main(String[] args){
        SparkConf conf=new SparkConf();
        conf.setAppName("tempAgg");
        conf,setMaster("local");
        JavaSparkContext sc=new JavaSparkContext(conf);
        //1.加载文件
        JavaRDD<String>rdd1=sc.textFile(path:"file:///d:/tenotags.txt");
        //变换   2. 切割
        JavaPairRDD<String,List<String>>rdd2=
        rdd1.mapToPair(new PairFunction<String,String,List<String>>(){
          public Tuple2<String,List<String>>call(String s)throws Exception{
            String[] arr=s.split(regex:"	");
            String busid=arr[0];
            List<String>tags=TagUtil.extractTag(arr[1]);
            return new Tuple2<String,List<String>>(busid,tags); //现在已经把它变成集合了,空集合要把它滤掉
          }
        });
        //注意:操作过程期间,集合过滤越往前越好。
        //3.过滤空集合
        JavaPairRDD<String,List<String>>rdd3=
          rdd2.filter(new Function<Tuple2<String,List<String>>,Boolean>(){
            public Boolean call(Tuple2<String,List<String>>t)throws Exception{
              return !t._2().isEmpty();
            }
          });
         //4.压扁值
        JavaPairRDD<String,String>rdd4=
          rdd3.flatMapValues(new Function<List<String>,Iterable<String>>(){
          public Iterable<String> call(List<String> v1)throws Exception{
            return v1;
          }
        });
        //5.过滤掉数字标签,过滤的话需要把K和V传进来,它是过滤不会改变内容
        rdd4.filter(new Function<Tuple2<String,String>,Boolean>(){
          public Boolean call(Tuple2<String,String>t)throws Exception{
            try{
              Integer.parseInt(t._2());
              return false;
            }catch(Exception e){
              e.printStackTrace();
            }
            return true;
          }
        });
         //6.重组,标1成对 新K是二元组
        JavaPairRDD<Tuple2<String,String>,Integer> rdd6=
          rdd5.mapToPair(new PairFunction<Tuple2<String,String>,
          Tuple2<String,String>,Integer>(){
                public Tuple2<Tuple2<String,String>,Integer>call
                   (Tuple2<String,String>t)throws Exception{
                      return new Tuple2<Tuple2<String,String>,Integer>(t,1);
                    }
            });
          //7.聚合值
        JavaPairRDD<Tuple2<String,String>,Integer> rdd7=
          rdd6.reduceByKey(new Function2<Integer,Integer,Integer>(){
            public Integer call(Integer v1,Integer v2)throws Exception{
             return v1+v2;
          }
        });
        //8.重组组员(busid,(tag,num))  不是把它变成元组是把它变成集合
        JavaPairRDD<String,List<Tuple2<String,Integer>>>rdd8=
        rdd7.mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>,
        String,Tuple2<String,Integer>>(){
          public Tuple2<String,List<Tuple2<String,Integer>>>call
           (Tuple2<Tuple2<String,String>,Integer>t)thros Exception{
             List<Tuple2<String,Integer>>list=new ArrayList<Tuple2<String,Integer>>();
              list.add(new Tuple2<String,Integer>(t._1._2,t._2));
                return new Tuple2<String,Tuple2<String,Integer>>(t._1._1,
                       new Tuple2<String,Integer>(t._1._2,t._2));            
          } 
        });
        //9.聚合  变成集合了,把两个集合变成一个集合
        JavaPairRDD<String,List<Tuple2<String,Integer>>> rdd9=rdd8.reduceByKey(
          new Function2<List<Tuple2<String,Integer>>,List<Tuple2<String,Integer>>,
          List<Tuple2<String,Integer>>>(){
            public List<Tuple2<String,Integer>>call(List<Tuple2<String,Intger>>
            v1,List<Tuple2<String,Integer>>v2)throws Exception{
               v1.addAll(v2); //v1的所有集合添加所有v2的集合,没有返回值
                return v1;
            }
          });
        //10.商家内排序,对V变换,此时list是V
        JavaPairRDD<String,List<Tuple2<String,Integer>>> rdd10=
        rdd9.mapValues(new Funciton<List<Ti[le2<String,Integer>>,
        List<Tuple2<String,Integer>>>(){
        public List<Tuple2<String,Integer>>call(List<Tuple2<String,Integer>>v1)
          throws Exception{
         //Comparator对比器二元元组sort(Comparator<?super Tuple2<String,Integer>>>c])
          v1.sort(new Comparator<Tuple2<String,Integer>>(){
            //比较集合中的两个元素的大小有三家评论和数量
            public int compare(Tuple2<String,Intger>o1,Tuple2<String,Integer>o2){
              return -(o1._2-o2._2); //-()降序
            }
          });
          
          //subList返回的列表是不能串行化的,要用一个串行化的方法返回这个值。
          List<Tuple2<String,Integer>>newList=new ArrayList<Tuple2<String,Integer>>();
          newList.addAll(v1.subList(0,v1.size()>5?5:v1.size()));
          
          //subList(int fromIndex,int toIndex)
         // return v1;//sort是没有返回值的,它是对V本身排序的
         //return v1.subList(0,4);//前5  有些集合里面可能没有5个元素会报错要先判断一下
          //return v1.subList(0,v1.size()>5?5:v1.size());//>5就5否则的话就v1.size
          return newList;
        }
        }); 
       //11.变换pairRDD到普通RDD,否则没有sortBy方法,这里先把它转成非JavaPairRDD的  JavaPairRDD它没有排序的方法,只有按Key
         //第一个是二元组,第二个是List 这个是评论数量<Tuple2<String,Ingerger>                 
         JavaRDD<Tuple2<String,List<Tuple2<String,Ingerger>>>>rdd11=
         rdd10.map(newFunction<Tuple2<String,List<Tuple2<String,
         Integer>>>,Tuple2<String,List<Tuple2<String,Integer>>>>(){
           public Tuple2<String,List<Tuple2<String,Integer>>>call
          (Tuple2<String,List<Tuple2<String,Integer>>>v1)throws Exception{
             return t;
           }                                 
          });
          //12.商家间排序                                 
         //Object这是你要返回的值,Function这是一个元组,它返回的是一个整数
         //按照每一个商家它第一个评论的数量,来倒排
         rdd11.sortBy(new Function<Tuple2<String,List<Tuple2<String,Integer>>>,
         Integer(){
           public Integer call(Tuple2<String,List<Tuple2<String,Integer>>>t)
           throw Exception{
             return -t._2.get(0)._2;
           }
          },true,2);// sortBy 
                                                                             
          List<Tuple2<String,String>>list=rdd5.collect();
           for(Tuple2 t:list){
                System.out.println(t);
          }
        
        //输出rdd7
         List list=rdd8.collect();
           for(Object o:list){
                System.out.println(o);
          }
      }
    }
                                            
    [sort源码]//sort没有返回值
    default void sort(Comparator<?super E>c){
       Object[] a=this.toArray();
       Arrays.sort(a,(Comparator)c);
       ListIterator<E>i=this.listIterator();
      for(Object e : a){
        i.next();
        i.set((E)e);
      }
    }  
                                            
                                            JavaRDD<Tuple2<String,List<Tuple2<String>>>>.....      
    [sortBy]
    //JFunction[T,S]参数,ascending:升降序  numPartition:分区                             
    def sortBy[S](f:JFunction[T,S],ascending:Boolean,numPartition:Int):JavaRDD[T]
    
  • 相关阅读:
    一些业内有名的网站收集
    WCF重载
    FCKEditor fckconfig.js配置,添加字体和大小 附:中文字体乱码问题解决
    查询第几条到第几条的数据的SQL语句
    SPOJ 9939 Eliminate the Conflict
    UVA 10534 Wavio Sequence
    HDU 3474 Necklace
    POJ 2823 Sliding Window
    UVA 437 The Tower of Babylon
    UVA 825 Walking on the Safe Side
  • 原文地址:https://www.cnblogs.com/SteveDZC/p/9779451.html
Copyright © 2011-2022 走看看