zoukankan      html  css  js  c++  java
  • spark学习

    一、什么是spark?

    1.spark是大规模数据处理的统一分析引擎。

    2.组成(http://spark.apache.org/docs/latest/quick-start.html)

    SparkCore:将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。

    SparkSQL:Spark Sql 是Spark来操作结构化数据的程序包,可以让我使用SQL语句的方式来查询数据,Spark支持 多种数据源,包含Hive表,parquest以及JSON等内容。

    SparkStreaming: 是Spark提供的实时数据进行流式计算的组件。

    MLlib:提供常用机器学习算法的实现库。

    GraphX:提供一个分布式图计算框架,能高效进行图计算。

    ...

    二、RDD和wordcount

    spark-shell计算wordCount:

    1. 创建文本文件wordcount.txt放置hdfs某路径下( hadoop dfs -ls chenht)
    2. 启动spark-shell:./bin/spark-shell
    3. val lineRDD = sc.textFile("chenht/wordcount.txt")
    4. lineRDD.foreach(println)
    5. val wordRDD = lineRDD.flatMap(line => line.split(" "))
    6. wordRDD.collect
    7. val wordCountRDD = wordRDD.map(word => (word,1))
    8. wordCountRDD.collect
    9. val resultRDD = wordCountRDD.reduceByKey((x,y)=>x+y)
    10. resultRDD.collect
    11. val orderedRDD = resultRDD.sortByKey(false)
    12. orderedRDD.collect
    13. 简便写法:sc.textFile("chenht/wordcount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortByKey().collect

    java编写wordcount

      

     1     public static void main(String[] args) {
     2         if (args.length < 1) {
     3             System.err.println("Usage: JavaWordCount <file>");
     4             System.exit(1);
     5         }
     6         SparkConf conf = new SparkConf().setAppName("JavaWordCount");
     7         JavaSparkContext sc = new JavaSparkContext(conf);
     8 
     9         List<Tuple2<String, Integer>> output = sc.textFile(args[0]).flatMap(line -> {
    10             return Arrays.asList(line.split(" "));
    11         }).mapToPair(word -> {
    12             return new Tuple2<>(word, 1);
    13         }).reduceByKey((v1, v2) -> {
    14             return v1 + v2;
    15         }).collect();
    16 
    17         for (Tuple2<?, ?> tuple : output) {
    18             System.out.println(tuple._1() + ": " + tuple._2());
    19         }
    20         sc.stop();
    21     }

    spark-submit提交:

    #!/bin/sh
    
    textPath="/user/root/chenht/wordcount.txt"
    
    echo "----start wordcount" `date`
    
    /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/bin/spark-submit 
    --num-executors 3  
    --driver-cores 2 
    --driver-memory 1G 
    --executor-cores 1 
    --executor-memory 1G 
    --class com.chenht.spark01.WordCount2 
    --name wordCount 
    --master yarn /home/chenht/spark/spark.jar $textPath
    
    echo "----end wordcount" `date`
    

    三、广播变量和累加器

    1.广播变量:

    如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么知识每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

    2.累加器

    在spark应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

  • 相关阅读:
    FPGA STA(静态时序分析)
    Github实例教程-创建库、创建主页
    【JS】两种计时器/定时器
    【java】 获取计算机信息及Java信息
    【java 上传+下载】
    【POI xls】解析xls遇到的问题
    【jackson 异常】com.fasterxml.jackson.databind.JsonMappingException异常处理
    【tomcat 无法部署】svn上下载的maven项目无法部署到tomcat中
    【jQuery 冻结任意行列】冻结任意行和列的jQuery插件
    【JSON 注解】JSON循环引用2----JSON注解@JsonIgnoreProperties+JAVA关键字transient+后台对象与JSON数据的格式互相转化
  • 原文地址:https://www.cnblogs.com/chenhtblog/p/10530538.html
Copyright © 2011-2022 走看看