
    03. Spark API Programming in IDEA

    3.1 Implementing Word Count

    3.1.1 Create a Scala module

    3.1.2 Add Maven support and introduce the Spark dependency

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.oldboy</groupId>
    <artifactId>myspark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.11</artifactId>
       <version>2.1.0</version>
     </dependency>
    </dependencies>
    </project>
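
    Note: the spark-core_2.11 artifact is built against Scala 2.11, so the module's Scala SDK should be a 2.11.x release; mixing Scala binary versions typically fails at runtime with binary-incompatibility errors.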
    

    3.1.3 Write the Scala program

    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Created by Administrator on 2018/5/8.
      */
    object WordCountScala {
      def main(args: Array[String]): Unit = {
        //1. Create the Spark configuration object
        val conf = new SparkConf()

        //Set the application name
        conf.setAppName("wcApp")
        //Set the master
        conf.setMaster("local")

        //2. Create the Spark context object
        val sc = new SparkContext(conf)

        //3. Load the file
        val rdd1 = sc.textFile("d:/mr/1.txt")

        //4. Flatten each line into words
        val rdd2 = rdd1.flatMap(_.split(" "))

        //5. Map each word to a (word, 1) pair
        val rdd3 = rdd2.map(w => (w, 1))

        //6. Reduce by key, summing the counts
        val rdd4 = rdd3.reduceByKey(_ + _)

        //Collect the results to the driver
        val arr = rdd4.collect()

        arr.foreach(println)
      }
    }
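
    For reference, the same pipeline can be written as one chained expression. This is a sketch that assumes the same local input path as above:

    import org.apache.spark.{SparkConf, SparkContext}

    object WordCountChained {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("wcApp").setMaster("local"))
        sc.textFile("d:/mr/1.txt")        //assumed sample input path, as above
          .flatMap(_.split(" "))          //flatten lines into words
          .map((_, 1))                    //pair each word with a count of 1
          .reduceByKey(_ + _)             //sum counts per word
          .collect()
          .foreach(println)
        sc.stop()
      }
    }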
    

    3.1.4 Write the Java program

    package com.oldboy.spark.java;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    /**
      * Word count in the Java API
      */
    public class WordCountJava {
      public static void main(String[] args) {
        //1. Create the configuration object
        SparkConf conf = new SparkConf();
        conf.setAppName("wcApp");
        conf.setMaster("local");

        //2. Create the Java version of the context
        JavaSparkContext sc = new JavaSparkContext(conf);

        //3. Load the file
        JavaRDD<String> rdd1 = sc.textFile("d:/mr/1.txt");

        //4. Flatten each line into words
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
          public Iterator<String> call(String s) throws Exception {
            String[] arr = s.split(" ");
            return Arrays.asList(arr).iterator();
          }
        });

        //5. Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> rdd3
          = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<String, Integer>(s, 1);
          }
        });

        //6. Reduce by key, summing the counts
        JavaPairRDD<String, Integer> rdd4
          = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
          }
        });

        //7. Collect
        List<Tuple2<String, Integer>> list = rdd4.collect();

        for (Tuple2<String, Integer> t : list) {
          System.out.println(t._1() + " : " + t._2());
        }
      }
    }
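
    Either version prints one pair per line when run locally. For example, assuming 1.txt contains the single line "hello world hello", the output would include (hello,2) and (world,1); the Java version prints them as "hello : 2" and "world : 1".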
    

    3.2 Implementing Temperature Statistics

    3.2.1 Create a Scala module

    3.2.2 Add Maven and introduce the Spark dependency

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.oldboy</groupId>
      <artifactId>myspark</artifactId>
      <version>1.0-SNAPSHOT</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>2.1.0</version>
        </dependency>
      </dependencies>
    </project>
    

    3.2.3 Write the Scala program

    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Created by Administrator on 2018/5/8.
      */
    object TempAggScala {
      def main(args: Array[String]): Unit = {
        //1. Create the Spark configuration object
        val conf = new SparkConf()

        //Set the application name
        conf.setAppName("TempAgg")
        //Set the master
        conf.setMaster("local")

        //2. Create the Spark context object
        val sc = new SparkContext(conf)

        //3. Load the file
        val rdd1 = sc.textFile("d:/mr/temps.dat")

        //4. Transform each line into a (year, (max, min)) nested tuple
        //   (map, not flatMap: each line yields exactly one pair)
        val rdd2 = rdd1.map(line => {
          val arr = line.split(" ")
          (arr(0).toInt, (arr(1).toInt, arr(1).toInt))
        })

        //5. Reduce by key, keeping the max and min per year
        //   (tuple fields are accessed with _1/_2, not apply)
        val rdd3 = rdd2.reduceByKey((a, b) => {
          val max = math.max(a._1, b._1)
          val min = math.min(a._2, b._2)
          (max, min)
        })

        //6. Collect the results
        val arr = rdd3.collect()
        arr.foreach(println)
      }
    }
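
    The reduction above carries a (max, min) pair per year. If an average is also wanted, the same pattern extends by carrying (max, min, sum, count) through the reduction; this is a sketch under the same assumed input format (year and temperature separated by a space):

    //(max, min, sum, count) per year, then derive the average
    val rddStats = sc.textFile("d:/mr/temps.dat")
      .map(line => {
        val arr = line.split(" ")
        val t = arr(1).toInt
        (arr(0).toInt, (t, t, t.toDouble, 1))
      })
      .reduceByKey((a, b) => (
        math.max(a._1, b._1),
        math.min(a._2, b._2),
        a._3 + b._3,
        a._4 + b._4))
      .mapValues(v => (v._1, v._2, v._3 / v._4))   //(max, min, avg)
    rddStats.collect().foreach(println)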
    

    3.2.4 Write the Java program

    package com.oldboy.spark.java;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    import java.util.List;

    /**
      * Aggregate temperature data
      */
    public class TempAggJava {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("tempAggJava");
        conf.setMaster("local");

        //Create the JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        //1. Load the file, returning a JavaRDD
        JavaRDD<String> rdd1 = sc.textFile("d:/mr/temp.dat");

        //2. Transform each line into a (year, (max, min)) pair
        //   (the value type is Tuple2<Integer, Integer>, matching what call() returns)
        JavaPairRDD<Integer, Tuple2<Integer, Integer>> rdd2
          = rdd1.mapToPair(new PairFunction<String, Integer, Tuple2<Integer, Integer>>() {
          public Tuple2<Integer, Tuple2<Integer, Integer>> call(String s)
            throws Exception {
            String[] arr = s.split(" ");
            int year = Integer.parseInt(arr[0]);
            int temp = Integer.parseInt(arr[1]);
            return new Tuple2<Integer, Tuple2<Integer, Integer>>(year,
                new Tuple2<Integer, Integer>(temp, temp));
          }
        });

        //3. Aggregate: keep the max and min per year
        JavaPairRDD<Integer, Tuple2<Integer, Integer>> rdd3
          = rdd2.reduceByKey(
            new Function2<Tuple2<Integer, Integer>,
                Tuple2<Integer, Integer>,
                Tuple2<Integer, Integer>>() {
          public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1,
                                               Tuple2<Integer, Integer> v2)
            throws Exception {
            int max = Math.max(v1._1(), v2._1());
            int min = Math.min(v1._2(), v2._2());
            return new Tuple2<Integer, Integer>(max, min);
          }
        });

        //Collect
        List<Tuple2<Integer, Tuple2<Integer, Integer>>> list = rdd3.collect();
        for (Tuple2<Integer, Tuple2<Integer, Integer>> t : list) {
          System.out.println(t);
        }
      }
    }
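
    reduceByKey requires the reduced value to have the same type as the input value, which is why both programs pre-map each temperature into a (temp, temp) pair. An alternative is aggregateByKey, which separates the zero value from the merge functions; a Scala sketch under the same input assumptions:

    //Zero value (Int.MinValue, Int.MaxValue) so any real reading replaces it
    val byYear = sc.textFile("d:/mr/temps.dat")
      .map(line => {
        val arr = line.split(" ")
        (arr(0).toInt, arr(1).toInt)
      })
      .aggregateByKey((Int.MinValue, Int.MaxValue))(
        (acc, t) => (math.max(acc._1, t), math.min(acc._2, t)),   //merge a value into a partition accumulator
        (x, y) => (math.max(x._1, y._1), math.min(x._2, y._2)))   //merge two partition accumulators
    byYear.collect().foreach(println)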
    