zoukankan      html  css  js  c++  java
  • spark算子介绍

    1.spark的算子分为转换算子和Action算子,Action算子将形成一个job,转换算子RDD转换成另一个RDD,或者将文件系统的数据转换成一个RDD

    2.Spark的算子介绍地址:http://spark.apache.org/docs/2.3.0/rdd-programming-guide.html

    3.Spark操作基本步骤【java版本,其他语言可以根据官网的案例进行学习】

    (1)创建配置文件,将集群的运行模式设置好,给作业起一个名字,可以使用set方法其他配置设入。

    SparkConf sparkConf = new SparkConf().setAppName("Demo").setMaster("local");
    这里使用的是local的运行模式,起的名字是Demo

    (2)创建SparkContext

    JavaSparkContext javaContext = new JavaSparkContext(sparkConf);

    (3)使用算子,操作数据

         JavaRDD<String> javaRdd = sparkContext.textFile("logfile.txt",1);
            javaRdd = javaRdd.cache();//这一句必须这样写,我们在数据计算很费时的时候,将数据缓存
            long line = javaRdd.count();
            System.out.println(line);

    (4)关闭资源

    sparkContext.close();

    上面以一个求出数据行数的例子,看一下代码操作的流程。

    4.Action算子和介绍和举例

    (1)map算子;将数据读取使用map进行操作,使用foreach算子计算出 结果。 每一次读取partition中的一条数据进行分析

      案例:将数据乘以10,在输出,测试算子。

    package kw.test.demo;
    
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    
    /*
     * 本案例:将数据 值乘以一个数,然后将数据的值返回。
     */
    public class MapApp {
        public static void main(String[] args) {
            
            SparkConf conf = new SparkConf().setMaster("local").setAppName("MapTest");
            JavaSparkContext jsc= new JavaSparkContext(conf);
            List<Integer> list = Arrays.asList(1,2,3,4,5) ; 
            JavaRDD<Integer> javaRdd = jsc.parallelize(list);
            JavaRDD<Integer> result = javaRdd.map(new Function<Integer,Integer>() {
                @Override
                public Integer call(Integer list) throws Exception {
                    // TODO Auto-generated method stub
                    return list*10;
                }
            });
            result.foreach(new  VoidFunction<Integer>() {
                @Override
                public void call(Integer result) throws Exception {
                    // TODO Auto-generated method stub
                    System.out.println(result);
                }
            });
            jsc.close();
        }
    }

    (2)MapPartition:将一整块的数据放入然后处理,他和map的区别就是,map将一部分数据放入然后计算,MapPartition将一整块的数据一起放入计算。

    如果数据量小的时候,可以是Mappartition中,如果数据量比较大的时候使用Map会比较好,因为可以防止内存溢出。

    package kw.test.demo;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.hadoop.hive.metastore.api.Function;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.VoidFunction;
    
    public class MapPartitionApp {
        public static void main(String[] args) {
            /*
             * 创建配置文件
             * 创建出RDD
             */
            SparkConf sparkconf = new SparkConf().setMaster("local").setAppName("mapPartition");
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkconf);
            /*
             * mapPartition的使用是将一个块一起放入到算子中操作。
             * 
             * 假如说RDD上的数据不是太多的时候,可以使用mapPartition 来操作,如果一个RDD的数据比较多还是使用map好
             * 返回了大量数据,容易曹成内存溢出。
             */
        /*    准备数据集*/
            List <String> list= Arrays.asList("kangwang","kang","wang");
            JavaRDD<String> javaRDD = javaSparkContext.parallelize(list);
            
            final Map<String,Integer> sore = new HashMap<String ,Integer>();
            sore.put("kangwang", 0);
            sore.put("kang", 13);
            sore.put("wang", 454);
            
            JavaRDD<Integer> sRDD= javaRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
    
                @Override
                public Iterator<Integer> call(Iterator<String> it) throws Exception {
                    // TODO Auto-generated method stub
                    List list = new ArrayList();
                    
                    while(it.hasNext())
                    {
                        String name = it.next();
                        Integer so = sore.get(name);
                        list.add(so);
                    }
                    Iterator i =list.iterator();
                    return  i;
                }
            });        
            sRDD.foreach(new VoidFunction<Integer>() {
                
                @Override
                public void call(Integer it) throws Exception {
                    // TODO Auto-generated method stub
                    System.out.println("it的值是"+it);
                }
            });
        }
    }

    (3)MapPartitionWithIndex:

    本案例:
      查看将数据的分配到具体的快上的信息。
      我们可以指定partition的个数,默认是2
      parallelize并行集合的时候,指定了并行度,也就是partition的个数是2
      具体他们的数据怎样分,我们并不知道,由spark自己分配
     如果想要知道,就可以使用此算子,将数据的值打印出来。

    package kw.test.demo;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.VoidFunction;
    public class MapPartionWithIndex {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("MapPartitionWithIndex").setMaster("local");
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
            
            //准备数据
            List<String> list =new ArrayList<String>();
            list.add("Demo1");
            list.add("Demo2");
            list.add("Demo3");
            list.add("Demo4");
            list.add("Demo5");
            list.add("Demo6");
            list.add("Demo7");
            list.add("Demo8");
            list.add("Demo9");
            list.add("Demo10");
            list.add("Demo11");
            list.add("Demo12");
            //创建RDD,指定map的个数4
            JavaRDD<String> javaRDD = javaSparkContext.parallelize(list, 2);
            JavaRDD<String> javaRDD1 =javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    
                @Override
                public Iterator<String> call(Integer index, Iterator<String> it2) throws Exception {
                    // TODO Auto-generated method stub
                    //index是partition的个数
                    List<String> list = new ArrayList<String>();
                    while(it2.hasNext())
                    {
                        String name = it2.next();
                        String info = "partition是:"+index+"数据的name是:"+name;
                        list.add(info);
                    }
                    return list.iterator();
                }
                
            }, true);
            
            javaRDD1.foreach(new VoidFunction<String>() {
                
                @Override
                public void call(String infos) throws Exception {
                    // TODO Auto-generated method stub
                    System.out.println(infos);
                }
            });       
        }
    }

    (4)coalesce算子,是架构RDD的partition的数量缩减

       将一定数量的partition压缩到更少的partition分区中去 

      使用的场景,很多时候在filter算子应用之后会优化一下到使用coalesce算子。 

      filter算子应用到RDD上面,说白了会应用到RDD对应到里面的每个partition上

      数据倾斜,换句话说就是有可能的partition里面就剩下了一条数据 建议使用coalesce算子,

      从前各个partition中 数据都更加的紧凑就可以减少它的 个数

    package kw.test.demo;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.VoidFunction;
    
    public class CoalesceOpter {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("coalesceDemo").setMaster("local");
            JavaSparkContext javaContext = new JavaSparkContext(sparkConf);
    
            List<String> list = Arrays.asList("kw1","djf","kw1","fgf",
                    "djf","kw1","djf","sdsds","kw1","ssdu","djf");
            
            JavaRDD<String>javaRDD = javaContext.parallelize(list,6);
            JavaRDD<String> info = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    
                @Override
                public Iterator<String> call(Integer arg0, Iterator<String> arg1) throws Exception {
                    // TODO Auto-generated method stub
                    List<String> list =new ArrayList<String>();
                    while(arg1.hasNext())
                    {
                        String name = arg1.next();
                        String info = arg0 +"^^^^^……………………………………………………………………"+ name;
                        list.add(info);
                    }
                    return list.iterator();
                }
            }, true);
            info.foreach(new VoidFunction<String>() {
                
                @Override
                public void call(String arg0) throws Exception {
                    // TODO Auto-generated method stub
                    System.out.println(arg0);
                }
            });
            info.coalesce(3);
            JavaRDD<String> javaRDD1 = info.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    
                @Override
                public Iterator<String> call(Integer arg0, Iterator<String> arg1) throws Exception {
                    // TODO Auto-generated method stub
                    List<String> list = new ArrayList<String>();
                    while(arg1.hasNext())
                    {
                        String name = arg1.next();
                        String info2 ="        " +name +"………………………………" +arg0;
                        list.add(info2);
                    }
                    return list.iterator();
                }
            }, true);
            javaRDD1.foreach(new VoidFunction<String>() {
                
                @Override
                public void call(String arg0) throws Exception {
                    // TODO Auto-generated method stub
                    System.out.println(arg0);
                }
            });
        }
    }
    * 

    (5)filter此案例将数据的值过滤出来。使用的是filter算子

    package kw.test.demo;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    
    /*
     * 此案例将数据的值过滤出来。使用的是filter算子
     */
    public class APPFilter {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Filter");
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            
            List<Integer> list = new ArrayList<Integer>();
            list.add(2);
            list.add(26);
            list.add(23);
            list.add(24);
            list.add(256);
            list.add(278);
            list.add(2543);
            list.add(23);
            list.add(26);
            
            JavaRDD<Integer> javaRDD = jsc.parallelize(list,2);
            //返回值  将返回true的数据返回
            JavaRDD<Integer> num= javaRDD.filter(new Function<Integer, Boolean>() {
                
                @Override
                public Boolean call(Integer it) throws Exception {
                    // TODO Auto-generated method stub
                    return it%2==0;
                }
            });
            num.foreach(new VoidFunction<Integer>() {
                
                @Override
                public void call(Integer arg0) throws Exception {
                    // TODO Auto-generated method stub
                    System.out.println(arg0);
                }
            });
        }
    }

     spark程序可以在本地运行,也可以在集群中运行,可以大成jar,放到真实的集群环境中运行程序。

     

  • 相关阅读:
    【MSSQL】MSSQL还原单mdf文件报1813错误
    【JSP】JSP基础学习记录(二)—— JSP的7个动作指令
    【JSP】JSP基础学习记录(一)—— 基础介绍以及3个编译指令
    【Other】U盘FAT32转NTFS且无数据丢失
    python map()
    python关于分割与拼接的那些事
    python shutil.copy()用法
    python enumerate用法
    工作中遇到一些难题1_5
    廖雪峰读书笔记_文件读写总结_2016_12_23
  • 原文地址:https://www.cnblogs.com/kw28188151/p/8570728.html
Copyright © 2011-2022 走看看