  • spark

    1. Running Spark jobs in YARN mode

    (1) To run a Spark job on YARN, you must first set HADOOP_CONF_DIR or YARN_CONF_DIR in the spark-env.sh configuration file. When a job is submitted with spark-submit, Spark first has to send a request to YARN's ResourceManager, and it reads spark-env.sh to locate that cluster configuration.

    (2) When submitting in yarn-cluster mode, use the following syntax:

    ./bin/spark-submit \
      --class path.to.your.Class \
      --master yarn-cluster \
      [options] \
      <app jar> \
      [app options]

    For example, a submit script might look like this:

    $ ./bin/spark-submit --class org.leo.spark.study.WordCount \
      --master yarn-cluster \
      --num-executors 1 \
      --driver-memory 100m \
      --executor-memory 100m \
      --executor-cores 1 \
      --queue <yarn-queue-name> \
      /usr/local/spark-study/spark-study.jar

    • The mapPartitions operator

    mapPartitions works like map, but the supplied function is called once per partition with an iterator over that partition's elements, rather than once per element.

    package mapPartitions.xls;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;

    public class TransFormation09_partitionBy {

        public static void main(String[] args) {
            partitionBy01();
        }

        public static void partitionBy01() {
            SparkConf conf = new SparkConf().setAppName("MapPartitions").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            List<String> studentNames = Arrays.asList("durant", "westbrook", "george", "wade");
            JavaRDD<String> studentNamesRDD = sc.parallelize(studentNames, 2);

            final Map<String, Double> studentScoreMap = new HashMap<String, Double>();
            studentScoreMap.put("durant", 278.5);
            studentScoreMap.put("westbrook", 290.0);
            studentScoreMap.put("george", 301.0);
            studentScoreMap.put("wade", 205.0);

            // the function is invoked once per partition with an iterator over that partition's elements
            JavaRDD<Double> studentScoresRDD = studentNamesRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Double>() {

                @Override
                public Iterator<Double> call(Iterator<String> iterator) throws Exception {
                    // look up the score of every student in this partition
                    List<Double> studentScoreList = new ArrayList<Double>();
                    while (iterator.hasNext()) {
                        String studentName = iterator.next();
                        Double studentScore = studentScoreMap.get(studentName);
                        studentScoreList.add(studentScore);
                    }
                    return studentScoreList.iterator();
                }

            });

            for (Double studentScore : studentScoresRDD.collect()) {
                System.out.println(studentScore);
            }

            sc.close();
        }
    }

    Output:

    278.5
    290.0
    301.0
    205.0

    • The mapPartitionsWithIndex operator

    Each time a partition's data is passed in, the partition index is passed along with it (the number of partitions is set by parallelize), so this operator lets us find out which partition each element belongs to.

    package mapPartitions.xls;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;

    public class mapPartitionsWithIndex {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            List<String> studentNames = Arrays.asList("durant", "westbrook", "george", "wade", "kobe");

            JavaRDD<String> studentNamesRDD = sc.parallelize(studentNames, 9);

            // the function receives the partition index together with an iterator over the partition's elements
            JavaRDD<String> studentWithClassRDD = studentNamesRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

                @Override
                public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                    List<String> studentWithClassList = new ArrayList<String>();
                    while (iterator.hasNext()) {
                        String studentName = iterator.next();
                        // tag each name with its (1-based) partition number
                        String studentWithClass = studentName + "_" + (index + 1);
                        studentWithClassList.add(studentWithClass);
                    }
                    return studentWithClassList.iterator();
                }

            }, true);

            for (String studentWithClass : studentWithClassRDD.collect()) {
                System.out.println(studentWithClass);
            }

            sc.close();
        }
    }

    Output (the five names are spread across 9 partitions, so some partitions are empty and the printed partition numbers are not consecutive):

    durant_2
    westbrook_4
    george_6
    wade_8
    kobe_9

    • The sample operator

    sample(withReplacement, fraction, seed) draws a random subset of an RDD; the example below samples roughly half of the elements without replacement.

    package mapPartitions.xls;

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class Sample {

        public static void main(String[] args) {
            sample01();
        }

        public static void sample01() {
            SparkConf conf = new SparkConf().setAppName("Sample").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            List<String> staffList = Arrays.asList("name01", "name02", "name03", "name04", "name05", "name06", "name07", "name08", "name09", "name010");
            JavaRDD<String> staffRDD = sc.parallelize(staffList);

            // sample(withReplacement, fraction, seed): draw roughly 50% of the elements without replacement
            JavaRDD<String> luckyStaffRDD = staffRDD.sample(false, 0.5, System.currentTimeMillis());

            for (String staff : luckyStaffRDD.collect()) {
                System.out.println(staff);
            }

            sc.close();
        }
    }

    Output (varies from run to run, because the seed is taken from System.currentTimeMillis()):

    name01
    name02
    name03
    name04
    name08
    name09

    • The union operator

    Merges two RDDs into one; duplicates are not removed.

    package mapPartitions.xls;

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class union {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("union").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            List<String> department1StaffList = Arrays.asList("name01", "name02", "name03", "name04");
            JavaRDD<String> department1StaffRDD = sc.parallelize(department1StaffList);

            List<String> department2StaffList = Arrays.asList("name05", "name06", "name07", "name08");
            JavaRDD<String> department2StaffRDD = sc.parallelize(department2StaffList);

            JavaRDD<String> departmentStaffRDD = department1StaffRDD.union(department2StaffRDD);

            for (String staff : departmentStaffRDD.collect()) {
                System.out.println(staff);
            }

            sc.close();
        }
    }

    Output:

    name01
    name02
    name03
    name04
    name05
    name06
    name07
    name08

    • The intersection operator

    Computes the intersection of two RDDs, keeping only the elements that appear in both.

    package spark.rdd.xls;

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class Intersection {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("Intersection").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            List<String> project1MemberList = Arrays.asList("name01", "name02", "name03", "name04");
            JavaRDD<String> project1MemberRDD = sc.parallelize(project1MemberList);

            List<String> project2MemberList = Arrays.asList("name01", "name06", "name02", "name07");
            JavaRDD<String> project2MemberRDD = sc.parallelize(project2MemberList);

            JavaRDD<String> projectIntersectionRDD = project1MemberRDD.intersection(project2MemberRDD);

            for (String member : projectIntersectionRDD.collect()) {
                System.out.println(member);
            }

            sc.close();
        }
    }

    Output:

    name01
    name02

    • The distinct operator

    Removes duplicate elements from an RDD.

    package spark.rdd.xls;

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;

    public class distinct {

        public static void main(String[] args) {
            distinct01();
        }

        public static void distinct01() {
            SparkConf conf = new SparkConf().setAppName("Distinct").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // distinct operator: removes duplicates from the RDD

            // UV counting example
            // UV (unique visitors): each user may click the site several times a day,
            // so we deduplicate the users and count how many distinct users visited
            // the site each day, rather than the total number of visits (PV).

            List<String> accessLogs = Arrays.asList(
                    "user1 2016-01-01 23:58:42",
                    "user1 2016-01-01 23:58:43",
                    "user1 2016-01-01 23:58:44",
                    "user2 2016-01-01 12:58:42",
                    "user2 2016-01-01 12:58:46",
                    "user3 2016-01-01 12:58:42",
                    "user4 2016-01-01 12:58:42",
                    "user5 2016-01-01 12:58:42",
                    "user6 2016-01-01 12:58:42",
                    "user6 2016-01-01 12:58:45");
            JavaRDD<String> accessLogsRDD = sc.parallelize(accessLogs);

            // extract the user id (the first space-separated field) from each log line
            JavaRDD<String> useridsRDD = accessLogsRDD.map(new Function<String, String>() {

                @Override
                public String call(String accessLog) throws Exception {
                    String userid = accessLog.split(" ")[0];
                    return userid;
                }

            });

            JavaRDD<String> distinctUseridsRDD = useridsRDD.distinct();
            int uv = distinctUseridsRDD.collect().size();
            System.out.println("uv: " + uv);

            sc.close();
        }
    }

    Output:

    uv: 6

    • The aggregateByKey operator

    aggregateByKey is similar to reduceByKey, but it takes an explicit initial (zero) value plus two functions: one that merges values into the accumulator within each partition, and one that merges the per-partition accumulators. In the word-count example below both functions are simple addition; a further sketch after the output shows them computing something other than a sum.

    package spark.rdd.xls;

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    public class aggregateByKey {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("AggregateByKey").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> lines = sc.textFile("/Users/xls/Desktop/code/bigdata/data/test", 3);

            // split each line into words
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }

            });

            // map each word to a (word, 1) pair
            JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }

            });

            // aggregateByKey(zeroValue, seqFunc, combFunc):
            // seqFunc merges values into the accumulator within each partition,
            // combFunc merges the per-partition accumulators
            JavaPairRDD<String, Integer> wordCounts = pairs.aggregateByKey(
                    0,
                    new Function2<Integer, Integer, Integer>() {

                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }

                    },
                    new Function2<Integer, Integer, Integer>() {

                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }

                    });

            List<Tuple2<String, Integer>> wordCountList = wordCounts.collect();
            for (Tuple2<String, Integer> wordCount : wordCountList) {
                System.out.println(wordCount);
            }

            sc.close();
        }
    }

    Output:

    (xuelisheng,1)
    (spark,2)
    (hadoop,1)
    (scala,1)
    (java,1)
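
    As a further illustration (a minimal sketch, not from the original post; it assumes the same JavaSparkContext sc and imports as the example above), the within-partition and cross-partition functions need not be identical sums. Here aggregateByKey computes the highest score per subject, with hypothetical sample data:

    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(Arrays.asList(
            new Tuple2<String, Integer>("math", 60),
            new Tuple2<String, Integer>("math", 95),
            new Tuple2<String, Integer>("english", 80),
            new Tuple2<String, Integer>("english", 72)), 2);

    JavaPairRDD<String, Integer> maxScorePerSubject = scores.aggregateByKey(
            Integer.MIN_VALUE, // zero value: lower than any real score
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer acc, Integer score) throws Exception {
                    // within a partition: keep the larger of the accumulator and the score
                    return Math.max(acc, score);
                }
            },
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer acc1, Integer acc2) throws Exception {
                    // across partitions: merge the per-partition maxima
                    return Math.max(acc1, acc2);
                }
            });

    for (Tuple2<String, Integer> subjectMax : maxScorePerSubject.collect()) {
        System.out.println(subjectMax); // e.g. (math,95) and (english,80)
    }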

    • The cartesian operator

    Pairs every element of one RDD with every element of another, producing the Cartesian product, much like a cross join in SQL.
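
    The original post gives no code for this operator, so here is a minimal sketch in the same style as the examples above (the class name, package, and sample data are illustrative assumptions):

    package spark.rdd.xls;

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class Cartesian {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("Cartesian").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            List<String> clothes = Arrays.asList("shirt01", "shirt02");
            List<String> trousers = Arrays.asList("trousers01", "trousers02");

            JavaRDD<String> clothesRDD = sc.parallelize(clothes);
            JavaRDD<String> trousersRDD = sc.parallelize(trousers);

            // cartesian pairs every element of the first RDD with every element of the second
            JavaPairRDD<String, String> combinationsRDD = clothesRDD.cartesian(trousersRDD);

            for (Tuple2<String, String> combination : combinationsRDD.collect()) {
                System.out.println(combination);
            }

            sc.close();
        }
    }

    Note that the result has |A| x |B| elements, so cartesian should be used with care on large RDDs.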
