  • Common Transformation Operators

    map produces key-value pairs as Tuple2 objects, while split returns an Array.
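
    For example (a minimal standalone Scala snippet, not taken from the original code, runnable in the Scala REPL):

    val line = "class1,90"
    val fields: Array[String] = line.split(",")             // split -> Array("class1", "90")
    val pair: (String, Int) = (fields(0), fields(1).toInt)  // mapping to a key-value pair gives a Tuple2
    println(fields.mkString(" "))                           // class1 90
    println(pair)                                           // (class1,90)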

    1. Common Transformation operators (map, flatMap, filter, groupByKey, reduceByKey, sortByKey, join, cogroup)

         JAVA:

    package day2;
    
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    
    /**
     * Demonstrates the commonly used Transformation operators
     */
    public class TransformationDemo {
    
    	public static void main(String[] args) {
    		distinctDemo();
    //		filterDemo();
    //		mapDemo();
    	}
    	public static void distinctDemo(){
    		SparkConf conf=new SparkConf().setAppName("distinctDemo")
    				.setMaster("local");
    		JavaSparkContext sc=new JavaSparkContext(conf);
    		// Mock data with duplicates and create the initial RDD
    		JavaRDD<Integer> datasRDD = sc.parallelize(Arrays.asList(1,2,3,3,4,2,1));
    		JavaRDD<Integer> resultRDD = datasRDD.distinct();
    		List<Integer> list = resultRDD.collect();
    		for (Integer value : list) {
    			System.out.println(value);
    		}
    		sc.close();
    	}
    	/**
    	 * filter example
    	 * The filter operator keeps or drops elements based on a condition.
    	 * If the condition holds, call returns true and the element is kept.
    	 * If it does not hold, call returns false and the element is filtered out.
    	 */
    	public static void filterDemo(){
    		SparkConf conf=new SparkConf().setAppName("filterDemo")
    						.setMaster("local");
    		JavaSparkContext sc=new JavaSparkContext(conf);
    		// Mock data and create the initial RDD
    		JavaRDD<Integer> datasRdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7));
    		JavaRDD<Integer> resultRdd = datasRdd.filter(new Function<Integer, Boolean>() {
    
    			private static final long serialVersionUID = 1L;
    
    			public Boolean call(Integer num) throws Exception {
    				// keep only odd numbers (returning true keeps the element)
    				return num%2!=0;
    			}
    		});
    		List<Integer> list = resultRdd.collect();
    		for (Integer value : list) {
    			System.out.println(value);
    		}
    		sc.close();
    				
    	}
    	/**
    	 * map example
    	 * In the Java API there are two flavors of the map operator:
    	 * map() when each element maps to a single value,
    	 * mapToPair() when each element maps to a key-value pair.
    	 */
    	public static void mapDemo(){
    		SparkConf conf=new SparkConf().setAppName("mapDemo")
    				.setMaster("local");
    		JavaSparkContext sc=new JavaSparkContext(conf);
    		
    		// Mock data: multiply every element by 2
    		List<Integer> datas = Arrays.asList(1,2,3,4,5,6,7);
    		// Create the initial RDD
    		JavaRDD<Integer> datasRdd = sc.parallelize(datas);
    		JavaRDD<Integer> result = datasRdd.map(new Function<Integer, Integer>() {
    			private static final long serialVersionUID = 1L;
    
    			public Integer call(Integer num) throws Exception {
    				// double each element
    				return num*2;
    			}
    		});
    		
    		List<Integer> resultInfo = result.collect();
    		for (Integer data : resultInfo) {
    			System.out.println(data);
    		}
    		sc.close();
    	}

    }

     

    // The following snippets additionally need: scala.Tuple2,
    // org.apache.spark.api.java.JavaPairRDD, org.apache.spark.api.java.function.Function2
    // and org.apache.spark.api.java.function.VoidFunction
    public static void joinDemo(){
    		SparkConf conf=new SparkConf().setAppName("joinDemo")
    						.setMaster("local");
    		JavaSparkContext sc=new JavaSparkContext(conf);
    		// Mock data
    		@SuppressWarnings({ "unused", "unchecked" })
    		List<Tuple2<Integer, String>> stusInfo = Arrays.asList(
    			new Tuple2<Integer, String>(1,"张三"),
    			new Tuple2<Integer, String>(2,"李四"),
    			new Tuple2<Integer, String>(3,"王五"),
    			new Tuple2<Integer, String>(4,"小六")
    				);
    		@SuppressWarnings({ "unchecked", "unused" })
    		List<Tuple2<Integer, Integer>> scoresInfo = Arrays.asList(
    			new Tuple2<Integer, Integer>(1, 90),
    			new Tuple2<Integer, Integer>(2, 80),
    			new Tuple2<Integer, Integer>(3, 96)
    				);
    		// Create the initial pair RDDs
    		JavaPairRDD<Integer, String> stusRDD = sc.parallelizePairs(stusInfo);
    		JavaPairRDD<Integer, Integer> scoresRDD = sc.parallelizePairs(scoresInfo);
    		// Join the two RDDs on the key (student id)
    		JavaPairRDD<Integer, Tuple2<String, Integer>> resultRDD = stusRDD.join(scoresRDD);
    		resultRDD.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {
    
    			private static final long serialVersionUID = 1L;
    
    			public void call(Tuple2<Integer, Tuple2<String, Integer>> info)
    					throws Exception {
    				System.out.println("Student ID: "+info._1 +" Name: "
    				                  +info._2._1+" Score: "+info._2._2);
    			}
    			}
    		});
    		sc.close();
    	}
    
    	/**
    	 * reduceByKey example
    	 */
    	public static void reduceByKeyDemo(){
    		SparkConf conf=new SparkConf().setAppName("reduceByKeyDemo")
    					.setMaster("local");
    		JavaSparkContext sc=new JavaSparkContext(conf);
    		// Mock data as (class, score); compute the total score of each class
    		@SuppressWarnings("unchecked")
    		List<Tuple2<String, Integer>> scores = Arrays.asList(
    			new Tuple2<String, Integer>("class1",90),
    			new Tuple2<String, Integer>("class2",99),
    			new Tuple2<String, Integer>("class1",92),
    			new Tuple2<String, Integer>("class1",93),
    			new Tuple2<String, Integer>("class2",80),
    			new Tuple2<String, Integer>("class1",90)
    		);
    		// Create the initial pair RDD
    		JavaPairRDD<String, Integer> datasRDD = sc.parallelizePairs(scores);
    		// Use reduceByKey to compute the total score of each class
    		/**
    		 * reduceByKey first groups the records by key, then reduces the values within each group
    		 */
    		JavaPairRDD<String, Integer> resultRDD = datasRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
    			
    			private static final long serialVersionUID = 1L;
    
    			public Integer call(Integer num1, Integer num2) throws Exception {
    				// sum two values that share the same key
    				return num1+num2;
    			}
    		});
    		List<Tuple2<String, Integer>> list = resultRDD.collect();
    		for (Tuple2<String, Integer> tuple2 : list) {
    			System.out.println("Class: "+tuple2._1+", total score: "+tuple2._2);
    		}
    		sc.close();
    	}
    	/**
    	 * groupByKey example
    	 */
    	public static void groupByKeyDemo(){
    		SparkConf conf=new SparkConf().setAppName("groupByKey")
    						.setMaster("local");
    		JavaSparkContext sc=new JavaSparkContext(conf);
    		
    		// Mock data as (class, score); group the scores of each class
    		@SuppressWarnings("unchecked")
    		List<Tuple2<String, Integer>> scores = Arrays.asList(
    			new Tuple2<String, Integer>("class1",90),
    			new Tuple2<String, Integer>("class2",99),
    			new Tuple2<String, Integer>("class1",92),
    			new Tuple2<String, Integer>("class1",93),
    			new Tuple2<String, Integer>("class2",80),
    			new Tuple2<String, Integer>("class1",90)
    				);
    		// Create the initial RDD
    		// Note: sc.parallelizePairs produces a key-value JavaPairRDD
    		JavaPairRDD<String, Integer> datasRDD = sc.parallelizePairs(scores);
    		
    		/**
    		 * groupByKey groups by key, collecting all values with the same key into one Iterable
    		 */
    		JavaPairRDD<String, Iterable<Integer>> resultRDD = datasRDD.groupByKey();
    		
    		
    		resultRDD.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
    			private static final long serialVersionUID = 1L;
    
    			public void call(Tuple2<String, Iterable<Integer>> infos) throws Exception {
    				System.out.println("Class: "+infos._1);
    				System.out.println(infos._2.toString());
    				System.out.println("===================================");
    			}
    			}
    		});
    		sc.close();
    	}

      

    // cogroup() 
    public static void cogroup(){
    	SparkConf conf=new SparkConf().setAppName("cogroup").setMaster("local");
    	JavaSparkContext sc=new JavaSparkContext(conf);
    	@SuppressWarnings("unchecked")
    	List<Tuple2<Integer, String>> asList = Arrays.asList(
    		new Tuple2<Integer, String>(1, "张三"),
    		new Tuple2<Integer, String>(2, "李四"),
    		new Tuple2<Integer, String>(3, "刘伟"),
    		new Tuple2<Integer, String>(4, "凌风")
    	);
    	@SuppressWarnings("unchecked")
    	List<Tuple2<Integer, Integer>> asList2 = Arrays.asList(
    		new Tuple2<Integer, Integer>(1, 90),
    		new Tuple2<Integer, Integer>(2, 60),
    		new Tuple2<Integer, Integer>(3, 80)
    	);
    	JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(asList);
    	JavaPairRDD<Integer, Integer> parallelizePairs2 = sc.parallelizePairs(asList2);
    	// cogroup collects, for every key, the values from both RDDs into two Iterables
    	JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroup =
    			parallelizePairs.cogroup(parallelizePairs2);
    	cogroup.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {

    		private static final long serialVersionUID = 1L;

    		public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
    			System.out.println(t._1+" "+t._2._1+" "+t._2._2);
    		}
    	});
    	sc.close();
    }

      

     Scala:

    import org.apache.spark.{SparkConf, SparkContext}

    object Transformation {
      def main(args: Array[String]): Unit = {
     //   mapDemo
        filter
      }
        def mapDemo: Unit = {
          val conf = new SparkConf().setAppName("mapDemo").setMaster("local")
          val sc = new SparkContext(conf)
          val datasRdd = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7), 2)
          val result = datasRdd.map(x => x * 2)
          result.foreach(println(_))
          sc.stop()
        }
        def filter: Unit ={
          val conf = new SparkConf().setAppName("filterDemo").setMaster("local")
          val sc = new SparkContext(conf)
          val datasRdd = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7))
          val result = datasRdd.filter(x=>x%2!=0)
          result.foreach(println(_))
          sc.stop()
        }

    }
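
    The operator list at the top also names flatMap, which none of the snippets above demonstrate on its own. A minimal word-count style sketch (a hypothetical flatMapDemo method, same local-mode setup as the other Scala methods):

    def flatMapDemo: Unit ={
      val conf=new SparkConf().setAppName("flatMapDemo").setMaster("local")
      val sc=new SparkContext(conf)
      val lines=sc.parallelize(Array("hello spark","hello scala"))
      // flatMap splits each line and flattens all resulting words into a single RDD
      val words=lines.flatMap(line=>line.split(" "))
      words.map(word=>(word,1)).reduceByKey(_+_).foreach(println(_))
      sc.stop()
    }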

     

    def groupBykey: Unit ={
          val conf=new SparkConf().setAppName("groupByKeyDemo").setMaster("local")
          val sc=new SparkContext(conf)
          val dataRDD=sc.parallelize(Array(Tuple2("class1",90),Tuple2("class1",91)
            ,Tuple2("class2",91),Tuple2("class2",93)))
          val result=dataRDD.groupByKey()
          result.foreach(it=>{
            println(it._1)
            println(it._2.toString())
          })
          sc.stop()
        }
    def reduceBykey: Unit ={
          val conf=new SparkConf().setAppName("reduceByKeyDemo").setMaster("local")
          val sc=new SparkContext(conf)
          val dataRDD=sc.parallelize(Array(Tuple2("class1",90),Tuple2("class1",91)
            ,Tuple2("class2",91),Tuple2("class2",93)))
          val result=dataRDD.reduceByKey(_+_)
          result.foreach(it=>{
            println(it._1)
            println(it._2)
          })
          sc.stop()
        }
        def join: Unit ={
          val conf=new SparkConf().setAppName("joinDemo").setMaster("local")
          val sc=new SparkContext(conf)
          val stuRDD=sc.parallelize(Array(Tuple2(1,"zhangsan"),Tuple2(2,"lisi"),Tuple2(3,"liuwe"),Tuple2(4,"liu")))
          val scoreRDD=sc.parallelize(Array(Tuple2(1,80),Tuple2(2,89),Tuple2(3,90)))
          val result=stuRDD.join(scoreRDD)
          result.foreach(it=>{
            println(it._1+" "+it._2._1+" "+it._2._2)
          })
          sc.stop()
        }

       

    // cogroup operator
    def cogroup: Unit ={
      val conf=new SparkConf().setAppName("cogroup").setMaster("local")
      val sc=new SparkContext(conf)
      val stuRDD=sc.parallelize(Array(Tuple2(1,"zhangsan"),Tuple2(2,"lisi"),Tuple2(3,"liuwe"),Tuple2(4,"liu")))
      val scoreRDD=sc.parallelize(Array(Tuple2(1,80),Tuple2(2,89),Tuple2(3,90)))
      // for every key, cogroup collects the values from both RDDs into two Iterables
      val result=stuRDD.cogroup(scoreRDD)
      result.foreach(it=>{
        println(it._1+" "+it._2._1+" "+it._2._2)
      })
      sc.stop()
    }

      

        2. join() comes in several flavors: join (equivalent to an inner join), leftOuterJoin (left outer join), and rightOuterJoin (right outer join).

            In Hive a missing value comes back as NULL; in Spark the outer joins wrap the value in an Option, so it comes back as None (see the sketch below).

            cogroup is similar to leftOuterJoin in that keys missing from the other RDD still appear, paired with an empty Iterable.
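
    A minimal sketch of that None behavior (a hypothetical leftOuterJoinDemo method, reusing the student/score data from the join examples above):

    def leftOuterJoinDemo: Unit ={
      val conf=new SparkConf().setAppName("leftOuterJoinDemo").setMaster("local")
      val sc=new SparkContext(conf)
      val stuRDD=sc.parallelize(Array(Tuple2(1,"zhangsan"),Tuple2(2,"lisi"),Tuple2(3,"liuwe"),Tuple2(4,"liu")))
      val scoreRDD=sc.parallelize(Array(Tuple2(1,80),Tuple2(2,89),Tuple2(3,90)))
      // leftOuterJoin keeps every key from stuRDD; the score side is an Option[Int],
      // so student 4, who has no score, shows up as None instead of NULL
      val result=stuRDD.leftOuterJoin(scoreRDD)   // RDD[(Int, (String, Option[Int]))]
      result.foreach(it=>println(it._1+" "+it._2._1+" "+it._2._2.getOrElse("None")))
      sc.stop()
    }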
     

    Supplement: 1. Agricultural-products project (count the total number of farm-produce markets in each province)

    package SparkCore.day1
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by tg on 3/21/17.
      */
    object ProductDemo {
      def main(args: Array[String]): Unit = {
        marketCount
      }
    
      /**
        * Count the total number of farm-produce markets in each province
        */
      def marketCount: Unit ={
        val conf=new SparkConf().setAppName("marketCount")
          .setMaster("local")
        val sc=new SparkContext(conf)
        val lines=sc.textFile("file:///home/tg/datas/product.txt")
        /**
          * 1. map splits each line into fields
          * 2. filter keeps only lines with the expected number of fields
          * 3. map picks out the province and the farm-produce market
          * 4. distinct de-duplicates the (province, market) combinations
          * 5. groupByKey groups by province, putting the markets of one province
          *    into a single collection; the size of that collection is the market count
          */
        lines.map(x=>x.split("\t"))
            .filter(x=>x.length==6)
            .map(x=>x(4)+":"+x(3)) // e.g. 山东:济南农产品市场 (province:market)
            .distinct() // de-duplicate the "province:market" strings
            .map(x=>{
              val info=x.split(":")
              val prov=info(0)
              val market=info(1)
              (prov,market)  // build the key-value pair (province, market)
            }).groupByKey() // group by the province key
              .map(x=>{
                val prov=x._1
                val mcount=x._2.size
                (prov,mcount) // build the key-value pair (province, market count)
              })
              .foreach(item=>{
                println(item._1+" province, number of farm-produce markets: "+item._2)
              })
        sc.stop()
      }
    }

    Hive implementation: select p.province, count(*) as act from (select province, market from products group by province, market) p group by p.province order by act desc;

     

    /**
        * Find the provinces that have no farm-produce market
        */
      def noMarketProvince: Unit ={
        val conf=new SparkConf().setAppName("noMarketProvince")
                  .setMaster("local")
        val sc=new SparkContext(conf)
        // the full data set: provinces that do have farm-produce markets
        val haveMarketProvince=sc.textFile("file:///home/tg/datas/product.txt")
        // all provinces in the country
        val allProvince=sc.textFile("file:///home/tg/datas/allprovince.txt")

        // work on the full data set of provinces that have markets
        val haveMarketRDD=haveMarketProvince.map(_.split("\t"))
              .filter(x=>x.length==6 && x(4)!=null)
              .map(x=>{
                val prov=x(4).trim
            (prov,prov) // build a key-value pair
          }).distinct() // de-duplicate the provinces
        // turn the list of all provinces into key-value pairs
        val allProvinceRDD=allProvince.map(x=>(x,x))
        // left outer join: every province is kept, and the market side may be None
        val resultRDD=allProvinceRDD.leftOuterJoin(haveMarketRDD)
        resultRDD.filter(x=>x._2._2==None)
          .foreach(x=>println(x._1))
    
        sc.stop()
      }
     

    Hive implementation:
    select a.provinceName from (select province from products group by province) p right outer join allprovince a on p.province = a.provinceName where p.province is null;
    Alternative:
     select provinces from allprovinces where provinces not in (select distinct(province) province from products);

        

    //Top 3 provinces by number of distinct farm-produce types
    def top3: Unit ={
      val conf =new SparkConf().setAppName("top3").setMaster("local")
      val sc=new SparkContext(conf)
      val lines=sc.textFile("file:///home/hadoop/product.txt")
      val lines1=sc.textFile("file:///home/hadoop/allprovince.txt")
      lines.map(_.split(","))
        .filter(x=>x.length==6&&x(0)!=null&&x(4)!=null)
        .map(x=>(x(4),x(0)))
        .distinct()
        .groupByKey()
        .map(x=>{
          val sheng=x._1
          val count=x._2.size
          (count,sheng)
        })
        .sortByKey(false)
        .take(3)
        .foreach(item=>{
          println(item._2+" "+item._1)
        })
      sc.stop()
    }

    Hive implementation: select p.province,count(*) as cnt from (select province,name from products group by province,name) p group by p.province order by cnt desc limit 3;

      

    //Top 3 farm-produce markets within each province
    def ofodaan: Unit ={
        val conf =new SparkConf().setAppName("ofodaan").setMaster("local")
        val sc=new SparkContext(conf)
        val lines=sc.textFile("file:///home/hadoop/product.txt")
        lines.map(_.split(",")).filter(x=>x.length==6&&x(0)!=null&&x(3)!=null&&x(4)!=null).map(x=>{
          val name=x(0).trim
          val market=x(3).trim
          val prov=x(4).trim
          (prov+"-"+market,name)
        }).groupByKey().map(x=>{
          val info=x._1.split("-")
          val prov=info(0)
          val market=info(1)
          val mcount=x._2.size
          (prov,(market,mcount))
        }).groupByKey().map(x=>{
          val prov=x._1
          val datas=x._2.toArray.sortWith(_._2>_._2).take(3)
          (prov,datas)
        }).foreach(it=>{
          println(it._1)
          it._2.foreach(x=>{
            println(x._1+" "+x._2)
          })
        })
        sc.stop()
    }

      

    //Price trend of each farm product in Shanxi (山西) province, i.e. the average price per day
    
    Hive implementation:
    select name ,times,
    if(count(price)>2,round((sum(price)-max(price)-min(price))/(count(price)-2),2),round(sum(price)/count(price),2))
    from product where province='山西'
    group by name,times;
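
    The original gives only the Hive version. A minimal Spark sketch of the same rule, assuming the file is comma-separated with name in column 0 and province in column 4 (as in the other snippets) and, purely as an assumption, price in column 1 and date in column 2:

    def shanxiPriceTrend: Unit ={
      val conf=new SparkConf().setAppName("shanxiPriceTrend").setMaster("local")
      val sc=new SparkContext(conf)
      val lines=sc.textFile("file:///home/hadoop/product.txt")
      lines.map(_.split(","))
        .filter(x=>x.length==6&&x(4)!=null&&x(4).trim=="山西")
        // ASSUMPTION: price is column 1 and the date is column 2; adjust to the real layout
        .map(x=>((x(0).trim,x(2).trim),x(1).trim.toDouble))
        .groupByKey()
        .map(x=>{
          val prices=x._2.toArray
          // same rule as the Hive query: with more than 2 quotes, drop one max and one min
          val avg=
            if(prices.length>2) (prices.sum-prices.max-prices.min)/(prices.length-2)
            else prices.sum/prices.length
          (x._1._1,x._1._2,math.round(avg*100)/100.0)
        })
        .foreach(println(_))
      sc.stop()
    }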

    //Product types shared by all of the top-3 provinces
    Hive implementation:

    select c.name, count(*) as cont from
    (select a.province, a.name from
    (select province, name from product group by province, name) a
    left semi join
    (select p.province, count(*) as cnt from
    (select province, name from product group by province, name) p
    group by p.province order by cnt desc limit 3) b
    on a.province = b.province) c
    group by c.name having cont > 2;



      

    //Product types shared by all of the top-3 provinces
    //Uses ArrayBuffer, which requires: import scala.collection.mutable.ArrayBuffer
    def top_3: Unit ={
        val conf =new SparkConf().setAppName("top_3").setMaster("local")
        val sc=new SparkContext(conf)
        val lines=sc.textFile("file:///home/hadoop/product.txt")
        val result=lines.map(_.split(",")).filter(x=>x.length==6&&x(0)!=null&&x(4)!=null)
          .map(x=>{
            val prov=x(4).trim
            val name=x(0).trim
            (prov,name)
          })
          .distinct()
          .groupByKey()
          .map(x=>{
            val prov=x._1
            val names=x._2
            val ncount=names.toArray.length
            (ncount,(prov,names))
          }).sortByKey(false).take(3)
          .map(x=>{
            val prov=x._2._1
            val names=x._2._2.toArray
            (prov,names)
          })
        val provs=new ArrayBuffer[String]() // the top-3 provinces

        val array1=new ArrayBuffer[String]() // products of the 1st-ranked province
        val array2=new ArrayBuffer[String]() // products of the 2nd-ranked province
        val array3=new ArrayBuffer[String]() // products of the 3rd-ranked province

        val numTest=sc.accumulator(0) // create an accumulator used as a counter
        result.foreach(item=>{
          provs+=item._1
          numTest+=1
          if(numTest.value==1){
            array1++=item._2 // products of the 1st-ranked province
          }else if(numTest.value==2){
            array2++=item._2 // products of the 2nd-ranked province
          }else{
            array3++=item._2 // products of the 3rd-ranked province
          }
        })
        // intersect the three sets to get the product types shared by all top-3 provinces
        val produts=(array1.intersect(array2)).intersect(array3)
    
        println("The top-3 provinces are:")
        for(elem <- provs) println(elem)
        println("Product types shared by all of the top-3 provinces:")
        for(elem <- produts) println(elem)
    
        sc.stop()
      }
    

      

          2. Files in other formats copied onto Linux may show up as garbled text (mojibake):

             Save the file as plain text and convert it to UTF-8 in Notepad; if that still fails, open Options -> Sessions -> Appearance in CRT and choose the UTF-8 encoding.

          3. sortByKey defaults to true (ascending order); see the short sketch after these notes.

          4. Why introduce HBase when Hive already exists:

                Hive: offline batch processing

                HBase: real-time data access and computation

          5. Hadoop ecosystem components, roughly in order of importance:

                Hive

                MR, HBase (Storm + HBase + Kafka)

                HDFS, ZooKeeper

               Middleware: Kafka, Flume, Sqoop, Azkaban, Oozie

              Spark:

               Scala, Spark Core, Spark Streaming

               Spark SQL, Spark MLlib

               Spark GraphX

         6. In the runtime model of the Spark architecture:

             main processes: Driver, Master, Worker, Executor

             threads: Task
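
    A tiny sketch of the sortByKey default from note 3 (a hypothetical sortByKeyDemo method, same local-mode setup as the earlier Scala methods):

    def sortByKeyDemo: Unit ={
      val conf=new SparkConf().setAppName("sortByKeyDemo").setMaster("local")
      val sc=new SparkContext(conf)
      val scores=sc.parallelize(Array(Tuple2(90,"class1"),Tuple2(99,"class2"),Tuple2(80,"class3")))
      scores.sortByKey().foreach(println(_))      // ascending=true is the default
      scores.sortByKey(false).foreach(println(_)) // pass false for descending order
      sc.stop()
    }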
