zoukankan      html  css  js  c++  java
  • 02.Spark 标签生成(Java和Scala的两种实现)

    Spark 标签生成(Java和Scala的两种实现)

    气温数据多重聚合

    [Scala]实现聚合气温数据。聚合出Max,Min.AVG

    /**
     *  气温数据聚合应用
    */
    object TempAggDemo{
      def main(args:Array[String]):Unit={
        //配置一下
        val conf=new SparkConf()
        conf.setAppName("tempAgg")
        conf.setMaster("local")
        val sc=new SparkContext(conf)
        //1.加载文件
        val rdd1=sc.textFile("file:///d:/mr/temp3.dat")
        //2.切割成对(1930,54)
        val rdd2=rdd1.map(line=>{
          var arr=line.split(" ")
          (arr(0).toInt,arr(1).toInt)
        PairRDD
        //3.分组  按key(年度)分组RDD一但变成了二元组就变成了PairRDD
          //(19300->{23,34,67},1931->{.......})
        val rdd3=rdd2.groupByKey()
        //4.对组内元素进行统计聚合 mapValues()可以迭代的量   K不需要变,看V变
          val rdd4=rdd3.mapValues(it=>{
          val mx= it.max   //max[B >:Int](implicit cmp:Ordering[B]) 最大值
          val mn=it.min
          val sum=it.sun
          val size=it.size
          (mx,mn,sum.toFloat /size) //把每一个年度的气温数据聚合成了三元组
          })
         //5.按照年度排序
          val rdd5=rdd4.sortByKey(true) //true升序
         //6.输出
        rdd5.collect().foreach(println)
      }
    }
    

    reduceByKey的特点:reduceByKey不会改变K,V的类型。进来是什么类型,出去还是什么类型。

    如果要聚合三个值,max,min,(sum,size)通过reduceByKey一次出来。原来的K,V,由于reduceByKey不能改变类型。而我们要的结果至少包含三个结果。它们起码是三元组。
    [Scala]

    object TempAggDemo2{
      def main(args:Array[String]):Unit={
        val conf=new SparkConf()
        conf.setAppName("tempAgg")
        conf.setMaster("local")
        val sc=new SparkContext(conf)
        //1.加载文件
        val rdd1=sc.textFile("file:///d:/mr/temp3.dat")
        //2.切割成对(1930,54)
        val rdd2=rdd1.map(line=>{
        var arr=line.split(" ")
        //(mx,mn,sum,count)
        val year=arr(0).toInt
        val tmp=arr(1).toInt
        //这里又是一个元组(temp最高气温,temp最低气温,temp气温总和,)在变换的时候我们把V当成一个元组来代替,K就是年份。按K聚合,所以所有年份相同的V都要在一起聚合。而此刻的V不是一个单纯的数字了。它是一个元组。把它重向的捏在一起。reduceByKey的特点就是它不能改变V的类型。
        (year,(temp,temp,temp,1))
    })
        //每一元素后面都带1
        // rdd2.collect().foreach(println) //(1953,(23,23,23,1))(1951,(26,26,26,1))
        //3.聚合  reduceByKey(func:((Int,Int,Int,Int),(Int,Int,Int,Int))=>(Int,Int,Int,Int))  它是两个元组组成一个元组.聚合是纵向捏合的过程。两个元组,它捏合完后产生的新值。还要跟第三个再聚,不断的取最大值最小值和avg
        val rdd3=rdd2.rdd2.reduceByKey((a,b)=>{
          import scla.math._
          (max(a._1,b_1),min(a._2,b_2),a._3+b._3,a._4+b._4)
        })
        //rdd3.collect().foreach(println) //(1921,(48,-50,348,53))(1933,(41,-44,-38,49))
        //4.变换 K不需要变K是年度,把V变掉就行了 V是一个元组
        val  rdd4=rdd3.mapValue(t=>{
          (t._1,t._2,t._3.toFloat /t.-4)
        }).sortByKey()
      }
    }
    
    

    [Java]

    map(Function<String,R>f) JavaRDD<R>它就没有各种ByKey的操作
    mapToPair(PairFunction<String..>)JavaPairRDD<K2,V2>
    String 输入的行 Integer K 四元组Tuple4<>
    import scala.Float;
    import scala.Tuple2;
    import scala.Tuple3;
    import scala.Tuple4;
    public class TempAggDemeJava2{
       public static void main(String[] args){
            SparkConf conf=new SparkConf();
            conf.setAppName("tempAgg");
            conf.setMaster("local");
            JavaSparkContext sc=new JavaSparkContext(conf);
            //1.加载文件
           JavaRDD<String>rdd1=sc.textFile(path:"file:///d:/mr/temp3.dat");
           //2.变换(1903,(32,23,23,1))
           JavaRDD<Integer,Tuple4<Integer,Integer,Integer,Integer>>rdd2=
           rdd1.mapToPair(newPairFunction<String,Integer,
           Tuple4<Integer,Integer,Integer,Integer>>(){
           public Tuple2<Integer,Tuple4<Integer,Integer,Integer,Integer>>
           call(String s)throws Exception{
           String[] arr=s.plit(regex:" ");
           int year=Integer.parseInt(arr[0]);
           int tmp=Integer.parseInt(arr[1]);
           Tuple4<Integer,Integer,Integer,Integer> v=new                              Tuple4<Integer,Integer,Integer,Integer>(tmp,tmp,temp,_4:1);
           
          return new Tuple2<Integer,Tuple4<Integer,Integer,Integer,Integer>>(year,v);
           }
          });
          //3.聚合
          JavaPairRDD<Integer,Tuple4<Integer,Integer,Integer,Integer>>redd2.reduceByKey(
                 new Function2<Tuple4<Integer,Integer,Integer,Integer>,
                 Tuple4<Integer,Integer,InInteger,Integer,Integer,Integer>,
                 Tuple4<Integer,Integer,Integer,Integer>>(){
                 public Tuple4<Integer,Integer,Integer,Integer>call
                 (Tuple4<Integer,Integer,Integer,Integer>v1,
                  Tuple4<Integer,Integer,Integer,Integer>v2)throws Exception{
                     //v1和v2是返回一个新的元组
                     int mx=Math.max(v1._1(),v2._1());
                     int mn=Math.min(v1._2(),v2._2());
                     int sum=v1._3()+v2._3();
                     int count=v1._4()+v2._4();
                     return new Tuple4<Integer,Integer,Integer,Integer>(mx,mn,sum,count);
                   }
                 });
           //4.map取出avg K是年度  V变成了三元组
                JavaPairRDD rdd4=rdd3.mapValues(new Function<Tuple4<Integer,Integer,Integer,Integer>,
                Tuple3<Integer,Integer,Float>>(){
                public Tuple3<Integer,Integer,Float>call
                (Tuple4<Integer,Integer,Integer>v1)throws Exception{
                return new Tuple3<Integer,Integer,Float>(v1._1(),v1._2(),_3:         (float)v1._3() /v1._4());
                }
           });
         //5.排序
         JavaPairRDD<Integer,Tuple3<Integer,Integer,Float>>rdd5=rdd4.sortByKey();
         //6.列表
         List<Tuple2<Integer,Tuple3<Integer,Integer,Float>>> list=rdd5.collect();
         for(Tuple2<Integer,Tuple3<Integer,Integer,Float>> t:list){
           System.out.println(t);
         }
       }
    }
    

    [标签生成Scala版本]

    [TaggejScala.scala]标签生成Scala
    ​```scala
    /**
    *
    *标签生成
    */
    object TaggenScala{
      def main(args:Array[String]):Unit={
        val conf=new SparkConf()
        conf.setAppName("Taggen")
        conf.setMaster("local")
        val sc=new SparkContext(conf)
        //1.加载文件,将泛型加全
        val rdd1:RDD[(String,util.List[String])]=
        sc.textFile("file:///d:/mr/temptags.txt")
        //2.解析每行的json数据成为集合  //map这种是变换
        val rdd2:RDD[(String)]=rdd1.map(line=>{
          val arr:Array[String]=line.split("	")
          //提取商家id
          val busii:String=arr(0)
          //json对象
          val json:String=arr(1)
          val list:java.util.List[String]=TagUtil.extractTag(json)
          Tuple2[String,java.util.List[String]](busid,list)
        })
        //3.过滤空集合  filter(f:((String,util.List[String]))=>Boolean),(String,util.List[String])这个参数是二元组,返回值是Boolean,rdd2.filter()这里可以用一个高阶函数传进去。传清楚的话它就是一个元组rdd2.filter(t=>{ }),要把这个元组定义齐全的话。类型可以声明,它的类型是一个二元组rdd2.filter((t:Tuple2[String,java.util.List[String]])=>
        val rdd3:RDD[(String,util.List[String])]=
        rdd2.filter((t:Tuple2[String,java.util.List[String]])=>{
          !t._2.isEmpty  //二元组一旦进来就可以拿到它第二个元素,看它是否为空
        })
        //4.将值压扁 flatMapValues[U](f:(util.List[String]))=>TraversableOnce[U]):这个是对值进行压扁,如果对值一旦压扁。值压扁之后值压扁之后,每一值都会跟K从新组合新的K。 flatMap:
        val rdd4:RDD[(String,String)]rdd3.flatMapValues((list:java.util.List[String])=>{
          //导入隐式转换
          import scala.collection.JavaConversions._
          //返回list的话它需要返回可以迭代的量,需要导入隐式转换
        })
        //5.滤掉数组的tag,它的类型没有变还是元组
        val rdd5:RDD[(String,String)]=rdd4.filter((t:Tuple2[String,String])=>{
          try{
            //
            Integer.pparseInt(t._2)
            false 
          }catch{
            case _=>true
          }
        })
        //6.标一成对,第5步已经是一对了但是不是想要的。需要重新配对成新的一对。
        val rdd6:RDD[Tuple2[Tuple2[String,String],Int]]=
        rdd5.map((t:Tuple2[String,String])=>{
              Tuple2[Tuple2[String,String],Int](t,1)
        })              //新的二元组是一个嵌套的二元组,元组是Optin的方法
        //7.聚合
        val rdd7:RDD[Tuple2[Tuple2[String,String],Int]]=
        rdd6.reduceByKey((a:Int,b:Int)=>{
          a+b
        })
        //8.重组
        val rdd8:RDD[Tuple2[String,Tuple2[String,Int]]]=
        rdd7.map((t:Tuple2[Tuple2[String,String],Int])=>{
          Tuple2[String,List[Tuple2[String,Int]]](t._1._1,Tuple2[String,Int](t._1._2,t._2)::Nil)  //List里面的元素是二元组
          //Tuple2[String,Tuple2[String,Int]](t._1._1,Tuple2[String,Int](t._1._2,t._2)::Nil) //在reduceByKey它是纵向捏合,把一个K下所有的V聚合在一起形成一个值。但又不能改变V的类型。如果原来的K是一个二元组。最终把所有的V聚完之后还是一个二元组。这是不合适的。此刻的目的是把它同一商家下的所有评论都放一起。每一个二元组都做到集合里面去。这样一来两个集合就可以聚到一个集里面去了。当我们变换的时候,我们不会把它变成一个二元组。 而是把它变成集合。(Tuple2[String,Int](t._1._2,t._2)::Nil)    
        })     //
        //9.reduceByKey
        val rdd9:RDD[Tuple2[String,List[Tuple2[String,Int]]]] =
        rdd8.reduceByKey((a:List[Tuple2[String,Int]],b:List[Tuple2[String,Int]])=>{
          a:::b  //::是一个数字和一个集合的添加,[B >:(String,Int)](x:B):它是一个元组结尾的操作符是右操作符。
                //:::[B >: (String,Int)](prefix: List[B])这是一个界定,B是List 它是柯里化函数
        })
        //10.分组内排序
        val rdd10:RDD[Tuple2[String,List[Tuple2[String,Int]]]] =
        rdd9.mapValues((list:List[Tuple2[String,Int]]))=>{
          //sortBy[B](f:((String,Int))=>B)(implicit ord:Ordering[B]),sortBy它要排序它的元素是一个元组,把它的类型(二元组)给它。  List集合是不可变集合(immutable)  源码  typeList[+A]=scala.collection.immutable.List[A] val List=scala.collection.immutable.List  所以它排序完之后产生的是新集合。
          val list2:List[Tuple2[String,Int]]=list.sortBy((t:Tuple2[String,Int])=>{
            -t._2
          })
          //取前5
          list2.take(5) 
        })
        //11.商家间排序  sortBy[K](f:((String,List[(String,Int)]))=>K,A,ASCENDING:Boolean=true,numpartitions:Int...)它是一个高阶
        val rdd11:RDD[Tuple2[String,List[Tuple2[String,Int]]]] =
        rdd10.sortBy((t:Tuple2[String,List[Tuple2[String,Int]]])=>{
          t.-2(0)._2  //t.-2就是集合(0)就是商家的评论最大数。再把它取出来。
        },false)
        //收集  打印
        rdd11 .collect().foreach(println)
      }
    }
    总结:把所有泛型类型都定义上。
    
    public class TagUtil{
      /*
      *从json中抽取评论集合
      */
      public static List<String>extractTag(String json){
        List<String>list=new ArrayList<String>();
        //将字符串解析成json对象
        JSONObject obj=JSON.parseObject(json);
        JSONArray arr=obj.getJSONArray(key:"extInfoList");
        if(arr !=null && arr.size()>0){
           //得到数组的第一json对象
          JSONObject firstObj=arr.getJSONObject(index:0);
          JSONArray values=firstObj.getJSONArray(key:"values");
          if(arr !=null &&arr.size()>0){
            //得到数组的第一个json对象
            JSONObject firstObj=arr.getJSONObject(index:0);
            JSONArray values=firstObj.getJSONArray(key:"values");
            if(values !=null && arr.size()>0){
              //得到字符串解析成json对象
              JSONObject firstObj=arr.getJSONObject(index:0);
              JSONArray values=firstObj.getJSONArray(key:"values");
              if(values !=null &&values.size()>0){
                for(int i=0;i<values.size();i++){
                  String tag=values.getString(i);
                  list.add(tag);
                }
              }
            }
            return list;
          }
          
      }
    }
    

    [TagUtil.java]

    public class TagUtil{
      //从json中抽取评论集合
      public static List<String>extractTag(String json){
        List<Stromg>list=new ArrayList<String>();
        //将字符串解析成json对象
        JSONObject obj=JSON.parseObject(json);
        JSONArray arr=obj.getJSONArray("extInfoList");
        if(arr !=null && arr.size()>0){
          //得到数组的第一个json对象
          JSONObject firstObj=arr.getJSONObject(0);
          JSONArray values=firstObj.getJSONArray("values");
          if(arr !=null && arr.size()>0){
            //得到数组的第一个json对象
            JSONObject firstObj=arr.getJSONObject(0);
            JSONArray values=firstObj.getJSONArray("values");
            if(values !=null &&values.size()>0){
              for(int i=0;i<values.size();i++){
                String tag=values,getString(i);
                list.add(tag);
              }
            }
          }
          return list;
        }
      }
    }
    
    //java方法定义
    public staic <T>T getMiddle(List<T>list){
      return list.get(list.size() /2);
    }
    @Test
    public void testMethod(){
      List<Integer>list=new ArrayList<Integer>();
      list.add(1);
      list.add(2);
      list.add(3);
      System.out.println(getMiddle(list));
    }
    

    Maven

    <dependency>
      <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId> fastjson</artifactId>
        <version>1.2.24</version>
      </dependency>
    </dependency>
    

    [标签生成Java版本]

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    public class TaggenJava2{
      public static void main(String[] args){
        SparkConf conf=new SparkConf();
        conf.setAppName("tempAgg");
        conf,setMaster("local");
        JavaSparkContext sc=new JavaSparkContext(conf);
        //1.加载文件
        JavaRDD<String>rdd1=sc.textFile(path:"file:///d:/tenotags.txt");
        //变换   2. 切割
        JavaPairRDD<String,List<String>>rdd2=
        rdd1.mapToPair(new PairFunction<String,String,List<String>>(){
          public Tuple2<String,List<String>>call(String s)throws Exception{
            String[] arr=s.split(regex:"	");
            String busid=arr[0];
            List<String>tags=TagUtil.extractTag(arr[1]);
            return new Tuple2<String,List<String>>(busid,tags); //现在已经把它变成集合了,空集合要把它滤掉
          }
        });
        //注意:操作过程期间,集合过滤越往前越好。
        //3.过滤空集合
        JavaPairRDD<String,List<String>>rdd3=
          rdd2.filter(new Function<Tuple2<String,List<String>>,Boolean>(){
            public Boolean call(Tuple2<String,List<String>>t)throws Exception{
              return !t._2().isEmpty();
            }
          });
         //4.压扁值
        JavaPairRDD<String,String>rdd4=
          rdd3.flatMapValues(new Function<List<String>,Iterable<String>>(){
          public Iterable<String> call(List<String> v1)throws Exception{
            return v1;
          }
        });
        //5.过滤掉数字标签,过滤的话需要把K和V传进来,它是过滤不会改变内容
        rdd4.filter(new Function<Tuple2<String,String>,Boolean>(){
          public Boolean call(Tuple2<String,String>t)throws Exception{
            try{
              Integer.parseInt(t._2());
              return false;
            }catch(Exception e){
              e.printStackTrace();
            }
            return true;
          }
        });
         //6.重组,标1成对 新K是二元组
        JavaPairRDD<Tuple2<String,String>,Integer> rdd6=
          rdd5.mapToPair(new PairFunction<Tuple2<String,String>,
          Tuple2<String,String>,Integer>(){
                public Tuple2<Tuple2<String,String>,Integer>call
                   (Tuple2<String,String>t)throws Exception{
                      return new Tuple2<Tuple2<String,String>,Integer>(t,1);
                    }
            });
          //7.聚合值
        JavaPairRDD<Tuple2<String,String>,Integer> rdd7=
          rdd6.reduceByKey(new Function2<Integer,Integer,Integer>(){
            public Integer call(Integer v1,Integer v2)throws Exception{
             return v1+v2;
          }
        });
        //8.重组组员(busid,(tag,num))  不是把它变成元组是把它变成集合
        JavaPairRDD<String,List<Tuple2<String,Integer>>>rdd8=
        rdd7.mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>,
        String,Tuple2<String,Integer>>(){
          public Tuple2<String,List<Tuple2<String,Integer>>>call
           (Tuple2<Tuple2<String,String>,Integer>t)thros Exception{
             List<Tuple2<String,Integer>>list=new ArrayList<Tuple2<String,Integer>>();
              list.add(new Tuple2<String,Integer>(t._1._2,t._2));
                return new Tuple2<String,Tuple2<String,Integer>>(t._1._1,
                       new Tuple2<String,Integer>(t._1._2,t._2));            
          } 
        });
        //9.聚合  变成集合了,把两个集合变成一个集合
        JavaPairRDD<String,List<Tuple2<String,Integer>>> rdd9=rdd8.reduceByKey(
          new Function2<List<Tuple2<String,Integer>>,List<Tuple2<String,Integer>>,
          List<Tuple2<String,Integer>>>(){
            public List<Tuple2<String,Integer>>call(List<Tuple2<String,Intger>>
            v1,List<Tuple2<String,Integer>>v2)throws Exception{
               v1.addAll(v2); //v1的所有集合添加所有v2的集合,没有返回值
                return v1;
            }
          });
        //10.商家内排序,对V变换,此时list是V
        JavaPairRDD<String,List<Tuple2<String,Integer>>> rdd10=
        rdd9.mapValues(new Funciton<List<Ti[le2<String,Integer>>,
        List<Tuple2<String,Integer>>>(){
        public List<Tuple2<String,Integer>>call(List<Tuple2<String,Integer>>v1)
          throws Exception{
         //Comparator对比器二元元组sort(Comparator<?super Tuple2<String,Integer>>>c])
          v1.sort(new Comparator<Tuple2<String,Integer>>(){
            //比较集合中的两个元素的大小有三家评论和数量
            public int compare(Tuple2<String,Intger>o1,Tuple2<String,Integer>o2){
              return -(o1._2-o2._2); //-()降序
            }
          });
          
          //subList返回的列表是不能串行化的,要用一个串行化的方法返回这个值。
          List<Tuple2<String,Integer>>newList=new ArrayList<Tuple2<String,Integer>>();
          newList.addAll(v1.subList(0,v1.size()>5?5:v1.size()));
          
          //subList(int fromIndex,int toIndex)
         // return v1;//sort是没有返回值的,它是对V本身排序的
         //return v1.subList(0,4);//前5  有些集合里面可能没有5个元素会报错要先判断一下
          //return v1.subList(0,v1.size()>5?5:v1.size());//>5就5否则的话就v1.size
          return newList;
        }
        }); 
       //11.变换pairRDD到普通RDD,否则没有sortBy方法,这里先把它转成非JavaPairRDD的  JavaPairRDD它没有排序的方法,只有按Key
         //第一个是二元组,第二个是List 这个是评论数量<Tuple2<String,Ingerger>                 
         JavaRDD<Tuple2<String,List<Tuple2<String,Ingerger>>>>rdd11=
         rdd10.map(newFunction<Tuple2<String,List<Tuple2<String,
         Integer>>>,Tuple2<String,List<Tuple2<String,Integer>>>>(){
           public Tuple2<String,List<Tuple2<String,Integer>>>call
          (Tuple2<String,List<Tuple2<String,Integer>>>v1)throws Exception{
             return t;
           }                                 
          });
          //12.商家间排序                                 
         //Object这是你要返回的值,Function这是一个元组,它返回的是一个整数
         //按照每一个商家它第一个评论的数量,来倒排
         rdd11.sortBy(new Function<Tuple2<String,List<Tuple2<String,Integer>>>,
         Integer(){
           public Integer call(Tuple2<String,List<Tuple2<String,Integer>>>t)
           throw Exception{
             return -t._2.get(0)._2;
           }
          },true,2);// sortBy 
                                                                             
          List<Tuple2<String,String>>list=rdd5.collect();
           for(Tuple2 t:list){
                System.out.println(t);
          }
        
        //输出rdd7
         List list=rdd8.collect();
           for(Object o:list){
                System.out.println(o);
          }
      }
    }
                                            
    [sort源码]//sort没有返回值
    default void sort(Comparator<?super E>c){
       Object[] a=this.toArray();
       Arrays.sort(a,(Comparator)c);
       ListIterator<E>i=this.listIterator();
      for(Object e : a){
        i.next();
        i.set((E)e);
      }
    }  
                                            
                                            JavaRDD<Tuple2<String,List<Tuple2<String>>>>.....      
    [sortBy]
    //JFunction[T,S]参数,ascending:升降序  numPartition:分区                             
    def sortBy[S](f:JFunction[T,S],ascending:Boolean,numPartition:Int):JavaRDD[T]
    
  • 相关阅读:
    GET 请求和 POST 请求
    爬虫
    模板继承
    静态文件配置
    终端cmd创建django
    商城商品分类导航效果
    css样式
    视图部分
    django初识和路由
    【源码分析】cocos2dx的Action
  • 原文地址:https://www.cnblogs.com/SteveDZC/p/9779451.html
Copyright © 2011-2022 走看看