  • 【Spark】--- Transformation operators in Spark

    I. Overview

    By default Spark has two main categories of operators: Transformations, which are lazily evaluated, and Actions, which execute immediately. Each action triggers one job.

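    Put simply, an operator that turns an RDD into another RDD is a Transformation. An operator that turns an RDD into something other than an RDD is an Action.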
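    A Spark demo needs a Spark runtime, so here is a loose analogy in plain Java instead. It uses `java.util.stream` and no Spark; the class and method names are made up for illustration. Intermediate stream operations are lazy like transformations, and the terminal operation plays the role of an action. The counter shows that the mapping function has not run at all until `collect()` is called.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: java.util.stream, not Spark.
// Intermediate operations (map, filter) are lazy, like Spark transformations;
// the terminal operation (collect) runs the pipeline, like a Spark action.
class LazyPipelineDemo {

    // Returns {map calls before the terminal op, map calls after it, number of results}.
    static int[] callsBeforeAndAfter(List<String> lines) {
        AtomicInteger calls = new AtomicInteger();

        // "Transformations": this only describes the work, nothing runs yet.
        Stream<String> pipeline = lines.stream()
                .map(line -> {
                    calls.incrementAndGet();
                    return line.toUpperCase();
                })
                .filter(line -> !line.contains("HADOOP"));
        int before = calls.get();

        // "Action": collect() pulls every element through map and filter.
        List<String> result = pipeline.collect(Collectors.toList());
        int after = calls.get();

        return new int[] {before, after, result.size()};
    }

    public static void main(String[] args) {
        int[] r = callsBeforeAndAfter(Arrays.asList("hello spark", "hello hadoop", "hello java"));
        System.out.println("map calls before collect: " + r[0]);
        System.out.println("map calls after collect:  " + r[1]);
        System.out.println("lines kept: " + r[2]);
    }
}
```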

    II. Common Transformation operators

     Assume the dataset is as follows:

    1. filter

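         Filters records by a condition: records for which the function returns true are kept, and those for which it returns false are filtered out.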

    Java version:

    package com.spark.spark.transformations;
    
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    /**
     * filter
     * Filters records by a condition: records for which the function returns true are kept, false are dropped.
     *
     */
    public class Operator_filter {
        public static void main(String[] args) {
            /**
             * SparkConf holds the parameters of the Spark runtime environment, mainly:
             * 1. the run mode (master)
             * 2. the application name
             * 3. the resources the application requires
             */
            SparkConf conf = new SparkConf();
            conf.setMaster("local");
            conf.setAppName("filter");
            /**
             * JavaSparkContext is the context of a Spark application and its only gateway to the cluster.
             */
            JavaSparkContext jsc = new JavaSparkContext(conf);
            JavaRDD<String> lines = jsc.textFile("./words.txt");
            JavaRDD<String> resultRDD = lines.filter(new Function<String, Boolean>() {
    
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Boolean call(String line) throws Exception {
                    return !line.contains("hadoop");// keep lines that do NOT contain "hadoop"
                }
                
            });
            
            resultRDD.foreach(new VoidFunction<String>() {
                
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(String line) throws Exception {
                    System.out.println(line);
                }
            });
            jsc.stop();
        }
    }

    Scala version:

    Function explanation:

    It takes a String in and returns a Boolean.
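    Spark's Java `Function` has a single abstract `call` method. On Java 8+ the anonymous class above can therefore also be written as a lambda, e.g. `lines.filter(line -> !line.contains("hadoop"))`. The sketch below is not Spark code. It is a plain-Java, in-memory model of the same String-to-Boolean contract, using a hypothetical `FilterSketch` helper over a `List` instead of an RDD.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// In-memory model of RDD.filter (plain Java, no Spark): records for which the
// predicate returns true are kept, the rest are dropped.
class FilterSketch {

    static <T> List<T> filter(List<T> records, Predicate<T> keep) {
        return records.stream().filter(keep).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello spark", "hello hadoop", "hello java");
        // Same condition as the Spark example: keep lines that do NOT contain "hadoop".
        List<String> kept = filter(lines, line -> !line.contains("hadoop"));
        kept.forEach(System.out::println);
    }
}
```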

    Result:

     

     

    2. map

    Maps each element of an RDD to a new element through the function passed to map.

    Characteristic: one record in, one record out.
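    To make the one-in, one-out property concrete, here is a plain-Java sketch (no Spark; `MapSketch` is a hypothetical name). It applies the same "append ~" function as the Spark code to an in-memory list. The output list always has exactly as many elements as the input.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// In-memory model of RDD.map (plain Java, no Spark): every input record produces
// exactly one output record, so the output has the same size as the input.
class MapSketch {

    static <T, R> List<R> map(List<T> records, Function<T, R> f) {
        return records.stream().map(f).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello spark", "hello java");
        // Same function as the Spark example: append "~" to each line.
        List<String> result = map(lines, s -> s + "~");
        result.forEach(System.out::println);
    }
}
```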

     

     

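    package com.spark.spark.transformations;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    
    /**
     * map
     * Processes each element with the function passed in and returns a new dataset.
     * Characteristic: one record in, one record out.
     * 
     * @author root
     *
     */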
    public class Operator_map {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local");
            conf.setAppName("map");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            JavaRDD<String> line = jsc.textFile("./words.txt");
            JavaRDD<String> mapResult = line.map(new Function<String, String>() {
    
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public String call(String s) throws Exception {
                    return s+"~";
                } 
            });
            
            mapResult.foreach(new VoidFunction<String>() {
                
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(String t) throws Exception {
                    System.out.println(t);
                }
            });
            
            jsc.stop();
        }
    }

    Function explanation:

    It takes a String in and returns a String.

    Result:

     

    3. flatMap (flattened output: one record in, zero or more records out)

    A map followed by a flatten. It is similar to map, but each input element can be mapped to zero or more output elements.
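    The following plain-Java sketch (no Spark; `FlatMapSketch` is a hypothetical name) models flatMap on an in-memory list. Each line becomes several words and the results are flattened into one list. Note that in Java `"".split(" ")` returns a single empty string, so a blank line fed to the Spark code produces one empty record. The sketch drops empty tokens so that a blank line yields zero outputs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// In-memory model of RDD.flatMap (plain Java, no Spark): each line is split into
// words and the per-line results are flattened into one list, so one input line
// can yield zero, one or many output records.
class FlatMapSketch {

    static List<String> splitWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // "".split(" ") returns one empty string, so drop empty tokens
                // to get the zero-output case for blank lines.
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello spark", "", "hello java");
        System.out.println(splitWords(lines)); // [hello, spark, hello, java]
    }
}
```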

     

    package com.spark.spark.transformations;
    
    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    /**
     * flatMap
     * One record in, zero or more records out.
     * @author root
     *
     */
    public class Operator_flatMap {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local");
            conf.setAppName("flatMap");
    
            JavaSparkContext jsc = new JavaSparkContext(conf);
            JavaRDD<String> lines = jsc.textFile("./words.txt");
            JavaRDD<String> flatMapResult = lines.flatMap(new FlatMapFunction<String, String>() {
    
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
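                @Override
                public java.util.Iterator<String> call(String s) throws Exception {
                    // Spark 2.x and later expect an Iterator here (Spark 1.x expected an Iterable)
                    return Arrays.asList(s.split(" ")).iterator();
                }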
                
            });
            flatMapResult.foreach(new VoidFunction<String>() {
                
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(String t) throws Exception {
                    System.out.println(t);
                }
            });
            
            jsc.stop();
        }
    }

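    Function explanation:

    It takes one String in and returns a collection of output elements.

    Iterable: the collection itself.
    Iterator: the object that walks over its elements.

    Result: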

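    4. sample (random sampling)

    A random sampling operator. It samples the data with or without replacement, in the proportion given by the decimal passed in. It takes three parameters: (withReplacement: boolean, fraction: double, seed: long).

    withReplacement: true samples with replacement, false samples without replacement.

    fraction: the expected proportion to sample. The actual sample size is only approximate, and it gets closer to the target as the dataset grows.

    seed (third parameter): the random seed. The same seed yields the same sample, which makes testing easier.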
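    The sketch below shows the idea behind the three parameters. It is an illustration in plain Java, not Spark's implementation, and `SampleSketch` is a hypothetical name. Without replacement, each record is kept independently with probability `fraction` (Bernoulli sampling). With replacement, each record is emitted a Poisson-distributed number of times with mean `fraction`. A fixed seed makes the result reproducible, which is why the sample size is only approximately `fraction × n`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Plain-Java illustration of the idea behind sample(withReplacement, fraction, seed);
// this is not Spark's implementation.
//  - without replacement: each record is kept independently with probability `fraction`
//  - with replacement: each record is emitted k times, k ~ Poisson(mean = fraction)
// The same seed always produces the same sample.
class SampleSketch {

    static <T> List<T> sample(List<T> records, boolean withReplacement, double fraction, long seed) {
        Random rnd = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T record : records) {
            if (withReplacement) {
                int copies = poisson(rnd, fraction);
                for (int i = 0; i < copies; i++) {
                    out.add(record);
                }
            } else if (rnd.nextDouble() < fraction) {
                out.add(record);
            }
        }
        return out;
    }

    // Knuth's method for drawing a Poisson-distributed count; fine for small means.
    static int poisson(Random rnd, double mean) {
        double limit = Math.exp(-mean);
        double product = 1.0;
        int k = 0;
        do {
            k++;
            product *= rnd.nextDouble();
        } while (product > limit);
        return k - 1;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // About 7 * 0.3 = 2.1 records are expected; the exact count depends on the seed.
        System.out.println(sample(data, true, 0.3, 4L));
        System.out.println(sample(data, false, 0.3, 4L));
    }
}
```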

    package com.spark.spark.transformations;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import scala.Tuple2;
    
    public class Operator_sample {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local");
            conf.setAppName("sample");
            
            JavaSparkContext jsc = new JavaSparkContext(conf);
            JavaRDD<String> lines = jsc.textFile("./words.txt");
            JavaPairRDD<String, Integer> flatMapToPair = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
    
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public java.util.Iterator<Tuple2<String, Integer>> call(String t)
                        throws Exception {
                    // Spark 2.x and later expect an Iterator here (Spark 1.x expected an Iterable)
                    List<Tuple2<String,Integer>> tupleList = new ArrayList<Tuple2<String,Integer>>();
                    tupleList.add(new Tuple2<String,Integer>(t,1));
                    return tupleList.iterator();
                }
            });
        JavaPairRDD<String, Integer> sampleResult = flatMapToPair.sample(true,0.3,4);// 7 records in the dataset, so about 7 * 0.3 ≈ 2 are expected; the exact count varies
            sampleResult.foreach(new VoidFunction<Tuple2<String,Integer>>() {
                
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(Tuple2<String, Integer> t) throws Exception {
                    System.out.println(t);
                }
            });
            
            jsc.stop();
        }
    }

    Result:

    5. reduceByKey

    Combines the values that share the same key, using the logic given by the function passed in.
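    The plain-Java sketch below models reduceByKey for a word count on an in-memory list. It uses no Spark, and `ReduceByKeySketch` is a hypothetical name. `Map.merge` either stores the first value seen for a key or combines the old and new values with the supplied function. Spark documents that the reduce function must be associative and commutative, because values may be combined in any order across partitions. Addition satisfies both requirements.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// In-memory model of reduceByKey (plain Java, no Spark): values that share a key
// are combined pairwise with `func`.
class ReduceByKeySketch {

    static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> pairs, BinaryOperator<V> func) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            // merge() stores the value for a new key, or applies func(old, new) otherwise.
            result.merge(pair.getKey(), pair.getValue(), func);
        }
        return result;
    }

    // Word count: split lines into words, map each word to (word, 1), then reduceByKey(+).
    static Map<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                if (!word.isEmpty()) {
                    pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
                }
            }
        }
        return reduceByKey(pairs, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("hello spark", "hello java"))); // {hello=2, spark=1, java=1}
    }
}
```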

     

    package com.spark.spark.transformations;
    
    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import scala.Tuple2;
    
    public class Operator_reduceByKey {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("reduceByKey");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            JavaRDD<String> lines = jsc.textFile("./words.txt");
            JavaRDD<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {
    
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
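                @Override
                public java.util.Iterator<String> call(String t) throws Exception {
                    // Spark 2.x and later expect an Iterator here (Spark 1.x expected an Iterable)
                    return Arrays.asList(t.split(" ")).iterator();
                }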
            });
            JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() {
    
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Tuple2<String, Integer> call(String t) throws Exception {
                    return new Tuple2<String,Integer>(t,1);
                }
                
            });
            
            JavaPairRDD<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer,Integer,Integer>(){
    
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1+v2;
                }
                
            },10); // second argument: number of partitions of the resulting RDD
            reduceByKey.foreach(new VoidFunction<Tuple2<String,Integer>>() {
                
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;
    
                @Override
                public void call(Tuple2<String, Integer> t) throws Exception {
                    System.out.println(t);
                }
            });
            
            jsc.stop();
        }
    }

    Function explanation:

    Result:

     

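    6. sortByKey/sortBy

    Works on RDDs in (K, V) format and sorts them by key in ascending or descending order.

    In the Java API, sortBy is defined on JavaRDD, where it takes a key function, an ascending flag and a number of partitions. It is not available on JavaPairRDD, so the Java example below sorts with sortByKey.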

     

    package com.spark.spark.transformations;
    
    import java.util.Arrays;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import scala.Tuple2;
    
    public class Operator_sortByKey {
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf();
    		conf.setMaster("local");
    		conf.setAppName("sortByKey");
    		JavaSparkContext jsc = new JavaSparkContext(conf);
    		JavaRDD<String> lines = jsc.textFile("./words.txt");
    		JavaRDD<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
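    			@Override
    			public java.util.Iterator<String> call(String t) throws Exception {
    				// Spark 2.x and later expect an Iterator here (Spark 1.x expected an Iterable)
    				return Arrays.asList(t.split(" ")).iterator();
    			}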
    		});
    		JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String, String, Integer>() {
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Tuple2<String, Integer> call(String s) throws Exception {
    				return new Tuple2<String, Integer>(s, 1);
    			}
    		});
    		
    		JavaPairRDD<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
    			
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Integer call(Integer v1, Integer v2) throws Exception {
    				return v1+v2;
    			}
    		});
    		reduceByKey.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
    					throws Exception {
    				return new Tuple2<Integer, String>(t._2, t._1);
    			}
    		}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {// swap key and value first, then swap them back after sorting; false = descending, true = ascending
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Tuple2<String, Integer> call(Tuple2<Integer, String> t)
    					throws Exception {
    				return new Tuple2<String,Integer>(t._2,t._1);
    			}
    		}).foreach(new VoidFunction<Tuple2<String,Integer>>() {
    			
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public void call(Tuple2<String, Integer> t) throws Exception {
    				System.out.println(t);
    			}
    		});
    		jsc.stop();
    	}
    }
    

     Code explanation: swap key and value first, sort, then swap them back.
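    The swap / sort / swap-back pattern can be modelled on an in-memory map in plain Java. This sketch uses no Spark, and `SwapSortSwapSketch` is a hypothetical name. The counts become keys, the list is sorted by key in descending order (the role of `sortByKey(false)`), and the pairs are then swapped back to (word, count).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the swap / sortByKey(false) / swap-back pattern (plain Java, no Spark).
class SwapSortSwapSketch {

    static List<Map.Entry<String, Integer>> sortByCountDesc(Map<String, Integer> counts) {
        // 1. Swap: (word, count) -> (count, word), so the count becomes the key.
        List<Map.Entry<Integer, String>> swapped = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            swapped.add(new AbstractMap.SimpleEntry<>(e.getValue(), e.getKey()));
        }
        // 2. Sort by key in descending order, like sortByKey(false).
        swapped.sort(Map.Entry.<Integer, String>comparingByKey().reversed());
        // 3. Swap back: (count, word) -> (word, count).
        List<Map.Entry<String, Integer>> result = new ArrayList<>();
        for (Map.Entry<Integer, String> e : swapped) {
            result.add(new AbstractMap.SimpleEntry<>(e.getValue(), e.getKey()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("spark", 1);
        counts.put("hello", 3);
        counts.put("java", 2);
        System.out.println(sortByCountDesc(counts)); // [hello=3, java=2, spark=1]
    }
}
```

    In plain Java you could sort by value directly with `Map.Entry.comparingByValue()`. The swap is only needed because `sortByKey` orders pairs by their key.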

     Result:

  • Original article: https://www.cnblogs.com/LHWorldBlog/p/8401163.html