  • Spark Getting Started (Word Count)

    1. Features

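    1. Lightning-fast cluster computing: the Spark project claims up to 100x faster than Hadoop MapReduce for in-memory computation and 10x faster on disk
    2. A fast, general-purpose engine for large-scale data processing: APIs in Java, Scala, Python, and R; 80+ high-level operators that make parallel applications easy to build; SQL, streaming, and complex analytics can be combined in one application
    3. Deployment: runs on Hadoop YARN, Mesos, standalone mode, and others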

    2. Spark modules

    1. Spark Core: the core module
    2. Spark SQL
    3. Spark Streaming: stream processing
    4. Spark MLlib: machine learning
    5. Spark GraphX: graph computation

    3. Installing Spark

    1. When configuring environment variables, remember to add both the bin and sbin directories to PATH.

    4. Starting Spark

    1. Go to Spark's bin directory and start spark-shell.

    5. Spark programming basics

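    1. sc: the SparkContext, the entry point of a Spark program. spark-shell creates it automatically, and it provides the methods for creating RDDs, such as textFile.
    2. RDD: Resilient Distributed Dataset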

    6. Starter example: word count

    scala> val rdd1 = sc.textFile("/home/test.txt")
    rdd1: org.apache.spark.rdd.RDD[String] = /home/test.txt MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> val rdd2 = rdd1.flatMap(line => line.split(" "))
    rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26
    
    scala> val rdd3 = rdd2.map(word => (word,1))
    rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:28
    
    scala> val rdd4 = rdd3.reduceByKey(_+_)
    rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:30
    
    scala> rdd4.collect()
    res0: Array[(String, Int)] = Array((world2,1), (world1,1), (hello,3), (world3,1))
    

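    Now let's condense the code above into a single line using Scala's placeholder syntax: _.split(" ") is shorthand for line => line.split(" "), (_,1) for word => (word,1), and _+_ for (a, b) => a + b: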
    scala> sc.textFile("/home/test.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
    res3: Array[(String, Int)] = Array((world2,1), (world1,1), (hello,3), (world3,1))
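    The pipeline has four steps. textFile yields one element per line. flatMap splits each line into words and flattens the results. map pairs each word with 1, and reduceByKey(_+_) sums the 1s for each word. The sketch below is a rough local illustration only: it uses plain Java streams with no Spark, so nothing is distributed. It runs the same steps on a hypothetical three-line input chosen to reproduce the counts in res0. The post does not show the actual contents of /home/test.txt.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class LocalWordCount {
    // Mirrors rdd1 -> rdd2 -> rdd3 -> rdd4 from the spark-shell session, but on a local List
    static Map<String, Integer> wordCount(List<String> lines) {
        return lines.stream()
                // flatMap: split each line on spaces and flatten into one stream of words
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // map: pair every word with an initial count of 1
                .map(word -> new SimpleEntry<>(word, 1))
                // reduceByKey(_+_): sum the 1s for each distinct word (TreeMap keeps keys sorted)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        // Hypothetical file contents; the real /home/test.txt is not shown
        List<String> lines = Arrays.asList("hello world1", "hello world2", "hello world3");
        System.out.println(wordCount(lines));
    }
}
```

    Spark's collect() returns the pairs in no guaranteed order, as res0 shows. The TreeMap here only sorts the local output, so main prints {hello=3, world1=1, world2=1, world3=1}.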
    

    7. Filtering words

    scala> sc.textFile("/home/test.txt").flatMap(_.split(" ")).filter(_.contains("wor")).map((_,1)).reduceByKey(_+_).collect
    res4: Array[(String, Int)] = Array((world2,1), (world1,1), (world3,1))
    

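    filter keeps only the elements for which the predicate returns true, so here only words containing "wor" are counted.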
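    The sketch below is a local plain-Java version of the same filtered count, again without Spark. It places the filter between the split and the counting, just like the Scala chain. The input is a hypothetical three-line file ("hello world1", "hello world2", "hello world3") chosen to match res4. The post does not show the real file contents.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class LocalFilteredWordCount {
    static Map<String, Integer> countWordsContaining(List<String> lines, String fragment) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" "))) // flatMap(_.split(" "))
                .filter(word -> word.contains(fragment))          // filter(_.contains("wor"))
                // map((_,1)) + reduceByKey(_+_): each word starts at 1, duplicates are summed
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello world1", "hello world2", "hello world3");
        System.out.println(countWordsContaining(lines, "wor"));
    }
}
```

    Because "hello" does not contain "wor", it never reaches the counting step. main prints {world1=1, world2=1, world3=1}.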

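    8. Running the Spark word count in IntelliJ (Scala)

    First, a problem I ran into: the compiler reported "is already defined as object". The cause was that, when setting the Sources Root, I marked the entire project as a source root, so the object ended up being defined twice.

    Here is the code: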
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Demo111 {
      def main(args: Array[String]): Unit = {
    
        /**
          * SparkConf overview
          */
        /*
         /** Create a SparkConf that loads defaults from system properties and the classpath */
            def this() = this(true)
        */
    
        /**
          * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
          *
          * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
          * values from any `spark.*` Java system properties set in your application as well. In this case,
          * parameters you set directly on the `SparkConf` object take priority over system properties.
          *
          * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
          * get the same configuration no matter what the system properties are.
          *
          * All setter methods in this class support chaining. For example, you can write
          * `new SparkConf().setMaster("local").setAppName("My app")`.
          *
          * @param loadDefaults whether to also load values from Java system properties
          *
          * @note Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
          * by the user. Spark does not support modifying the configuration at runtime.
          */
    
        // Create a Spark configuration object
        val conf = new SparkConf()
        conf.setAppName("wordCount")
        // Set the master: "local" runs Spark in this process with one worker thread
        conf.setMaster("local")
    
        // Create the SparkContext from conf
        val sc = new SparkContext(conf)
        // Start the word count
        /**
          * textFile()
          * Read a text file from HDFS, a local file system (available on all nodes), or any
          * Hadoop-supported file system URI, and return it as an RDD of Strings.
          * @param path path to the text file on a supported file system
          * @param minPartitions suggested minimum number of partitions for the resulting RDD
          * @return RDD of lines of the text file
          */
        // rdd1 holds one element per line; the backslash is escaped because "\t" would be a tab
        val rdd1 = sc.textFile("D:\\test.txt")
        //System.setProperty("hadoop.home.dir", "E:\\hadoop\\hadoop-2.6.5")
        /** flatMap()
          *  Return a new RDD by first applying a function to all elements of this
          *  RDD, and then flattening the results.
          */
        // Meant to split on spaces, but the empty string "" splits each line into single characters
        val rdd2 = rdd1.flatMap(_.split(""))
        //    println(rdd2)
        // Pair every element with an initial count of 1
        val rdd3 = rdd2.map((_,1))
        /**
          * Merge the values for each key using an associative and commutative reduce function. This will
          * also perform the merging locally on each mapper before sending results to a reducer, similarly
          * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
          */
        // Aggregate the counts for each key (word)
        val rdd4 = rdd3.reduceByKey(_ + _)
        /**
          * Return an array that contains all of the elements in this RDD.
          *
          * @note This method should only be used if the resulting array is expected to be small, as
          * all the data is loaded into the driver's memory.
          */
        // Collect the RDD into an Array on the driver and print each (word, count) pair
        val result = rdd4.collect()
        for (i <- result) println(i)
        sc.stop()
    
      }
    
    }


    
    
    But the result counts how many times each letter occurs instead:
    (d,1)
    (w,1)
    (e,1)
    (h,1)
    ( ,1)
    (o,2)
    (r,1)
    (l,3)
    

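    So the error must be in how the words are split: the space is missing from split(" ").
    With an empty string, split("") breaks each line into individual characters. Here is the corrected code: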
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Demo111 {
      def main(args: Array[String]): Unit = {
    
        /**
          * SparkConf overview
          */
        /*
         /** Create a SparkConf that loads defaults from system properties and the classpath */
            def this() = this(true)
        */
    
        /**
          * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
          *
          * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
          * values from any `spark.*` Java system properties set in your application as well. In this case,
          * parameters you set directly on the `SparkConf` object take priority over system properties.
          *
          * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
          * get the same configuration no matter what the system properties are.
          *
          * All setter methods in this class support chaining. For example, you can write
          * `new SparkConf().setMaster("local").setAppName("My app")`.
          *
          * @param loadDefaults whether to also load values from Java system properties
          *
          * @note Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
          * by the user. Spark does not support modifying the configuration at runtime.
          */
    
        // Create a Spark configuration object
        val conf = new SparkConf()
        conf.setAppName("wordCount")
        // Set the master: "local" runs Spark in this process with one worker thread
        conf.setMaster("local")
    
        // Create the SparkContext from conf
        val sc = new SparkContext(conf)
        // Start the word count
        /**
          * textFile()
          * Read a text file from HDFS, a local file system (available on all nodes), or any
          * Hadoop-supported file system URI, and return it as an RDD of Strings.
          * @param path path to the text file on a supported file system
          * @param minPartitions suggested minimum number of partitions for the resulting RDD
          * @return RDD of lines of the text file
          */
        // rdd1 holds one element per line; the backslash is escaped because "\t" would be a tab
        val rdd1 = sc.textFile("D:\\test.txt")
        //System.setProperty("hadoop.home.dir", "E:\\hadoop\\hadoop-2.6.5")
        /** flatMap()
          *  Return a new RDD by first applying a function to all elements of this
          *  RDD, and then flattening the results.
          */
        // Split each line on a single space
        val rdd2 = rdd1.flatMap(line => line.split(" "))
        //    println(rdd2)
        // Pair every word with an initial count of 1
        val rdd3 = rdd2.map(word => (word, 1))
        /**
          * Merge the values for each key using an associative and commutative reduce function. This will
          * also perform the merging locally on each mapper before sending results to a reducer, similarly
          * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
          */
        // Aggregate the counts for each key (word)
        val rdd4 = rdd3.reduceByKey(_ + _)
        /**
          * Return an array that contains all of the elements in this RDD.
          *
          * @note This method should only be used if the resulting array is expected to be small, as
          * all the data is loaded into the driver's memory.
          */
        // Collect the RDD into an Array on the driver and print each (word, count) pair
        val result = rdd4.collect()
        for (i <- result) println(i) // equivalent to result.foreach(println)
        sc.stop()
      }
    
    }


    The output is now:
    (hello,1)
    (world,1)
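    You can reproduce both outputs without Spark. Scala's String.split here is java.lang.String.split(String regex). An empty regex matches between every pair of characters; since Java 8, the leading empty string is dropped. The sketch below counts the tokens of a single line. It assumes, as both printed results suggest, that the test file holds the single line "hello world". countTokens is a hypothetical helper name used only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

class SplitDemo {
    // Counts the tokens from line.split(regex), like flatMap(_.split(regex)) + map + reduceByKey
    static Map<String, Integer> countTokens(String line, String regex) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String token : line.split(regex)) {
            counts.merge(token, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String line = "hello world"; // assumed content of the test file
        // Empty regex: one token per character, including the space
        System.out.println(countTokens(line, ""));
        // Single space: one token per word
        System.out.println(countTokens(line, " "));
    }
}
```

    The first call yields the same eight counts as the broken run: l=3, o=2, and 1 for each of the other characters, including the space. The second call yields hello=1 and world=1.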

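    Another strange issue: I had to create a Java project and add Scala support to it before I could import Spark. If I created a Scala project directly, Spark could not be imported.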

    One more small point to remember:
        // Create the SparkContext from conf
        val sc = new SparkContext(conf)
    The SparkContext is built from the Spark configuration object conf, so conf must be passed as the argument.


    9. Spark word count in Java
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    public class WordCount {
        public static void main(String[] args) {
    
            // As in Scala, first create a configuration object conf
            SparkConf conf = new SparkConf();
            conf.setAppName("wordcount_java");
            conf.setMaster("local");
            // Create a Java Spark context from conf
            JavaSparkContext sc = new JavaSparkContext(conf);
            // Read the text file; each element of rdd1 is one line
            // (the backslash must be escaped: "\t" would be a tab character)
            JavaRDD<String> rdd1 = sc.textFile("D:\\test.txt");
            /** FlatMapFunction.call() returns an Iterator:
             * @FunctionalInterface
            public interface FlatMapFunction<T, R> extends Serializable {
            Iterator<R> call(T t) throws Exception;
            }
             */
            // Split each line on spaces and flatten; call() returns an iterator over the words
            JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });
    
            /**
             * @FunctionalInterface
            public interface PairFunction<T, K, V> extends Serializable {
            Tuple2<K, V> call(T t) throws Exception;
            }
             */
            // Map each word to the pair (word, 1)
            JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
            /**
             *   def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
                fromRDD(reduceByKey(defaultPartitioner(rdd), func))
             }
             */
    
            // Reduce: sum the counts for each word
            JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    
            /**
             * Return an array that contains all of the elements in this RDD.
             *
             * @note this method should only be used if the resulting array is expected to be small, as
             * all the data is loaded into the driver's memory.
             */
            // Collect rdd4 to the driver and print it
            List<Tuple2<String, Integer>> list = rdd4.collect();
            for (Tuple2<String, Integer> t : list) {
                System.out.println(t._1() + "===" + t._2());
            }
    
            sc.stop();
        }
    }
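    The Function2 passed to reduceByKey must be associative and commutative. The doc comment quoted in the Scala code of section 8 explains why. Spark first merges values locally within each partition, like a MapReduce combiner, and only then merges the partial results. The plain-Java sketch below imitates those two stages on two hypothetical partitions. It is a simplified model for intuition, not Spark's actual implementation. combineLocally and mergePartials are illustrative names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class CombinerSketch {
    // Stage 1: merge the values for each key inside one partition (the "combiner" step)
    static Map<String, Integer> combineLocally(List<String> partition, BinaryOperator<Integer> f) {
        Map<String, Integer> partial = new HashMap<>();
        for (String word : partition) {
            partial.merge(word, 1, f); // like pairing (word, 1) and reducing by key
        }
        return partial;
    }

    // Stage 2: merge the per-partition results for each key (after the shuffle)
    static Map<String, Integer> mergePartials(List<Map<String, Integer>> partials, BinaryOperator<Integer> f) {
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((word, count) -> result.merge(word, count, f));
        }
        return result;
    }

    public static void main(String[] args) {
        BinaryOperator<Integer> sum = Integer::sum; // same logic as v1 + v2 in the Function2
        // Two hypothetical partitions of the word RDD
        List<String> p0 = Arrays.asList("hello", "world", "hello");
        List<String> p1 = Arrays.asList("world", "hello");
        Map<String, Integer> counts = mergePartials(
                Arrays.asList(combineLocally(p0, sum), combineLocally(p1, sum)), sum);
        System.out.println(counts.get("hello") + " " + counts.get("world"));
    }
}
```

    main prints "3 2". Addition can be regrouped and reordered freely, so the partial sums add up to the same total as summing all the 1s at once. A non-associative function such as subtraction would give results that depend on how the data happened to be partitioned.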
    






    Follow my WeChat official account: 小秋的博客 (Xiaoqiu's blog). CSDN blog: https://blog.csdn.net/xiaoqiu_cr  GitHub: https://github.com/crr121  Email: rongchen633@gmail.com  Feel free to leave me a message if you have any questions.
  • Original post: https://www.cnblogs.com/flyingcr/p/10327065.html