zoukankan      html  css  js  c++  java
  • spark运行wordcount程序

    首先提一下spark rdd的五大核心特性:

    1、rdd由一系列的分片组成,比如说128m一片,类似于hadoop中的split
    2、每一个分区都有一个函数去迭代/运行/计算
    3、一系列的依赖,比如:rdda转换为rddb,rddb转换为rddc,那么rddc依赖于rddb,rddb依赖于rdda。
    lineage:保存了一些列的转换
    4、对于每个k-v的rdd可以指定一个partition,告诉它如何分区,常用分区规则有hash和range
    5、处理rdd split的数据在哪里,尽量在哪里做计算(移动计算而非移动数据),这里选择最优位置,为什么存在选择,因为hdfs默认存储3个副本,每个副本都是一个选择。

    RDD的两种创建方式:

    parallelist

    外部数据源

    RDD的两种操作方式:

    transformation:从一个RDD转化为另一个RDD

    action:输出结果集

    RDD依赖关系:

    窄依赖(narrow dependencies):n——>1
    子RDD的每个分区依赖于常数个父分区(即与数据规模无关)
    输入输出一对一的算子,且结果RDD的分区结构不变,主要是map,flatmap
    输入输出一对一,但结果集RDD的分区结构发生了变化,如union、coalesce
    从输入中选择部分元素的算子,如filter、distinct、subtract、sample

    宽依赖(wide dependencies):1——>n
    子RDD的每个分区依赖于所有父RDD分区
    对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey
    对两个RDD基于key进行join和重组,如join

    spark的shuffer过程类似于mapreduce shuffer过程。

    创建spark应用模板:

    1)创建SparkConf

    2)创建SparkContact

    3)加工逻辑

    4)stop()关闭资源

    ---------------------------------------------------------------------------------------------------------------------

    接下来通过wordcont程序熟悉一下:

    检查需要分析的文本文件:

    # bin/hdfs dfs -ls /user/hadoop/wordcount/input/
    Found 1 items
    -rw-r--r--   3 root supergroup         63 2017-05-22 14:48 /user/hadoop/wordcount/input/wc.input
    [root@db02 hadoop-2.5.0]# bin/hdfs dfs -text /user/hadoop/wordcount/input/wc.input
    hadoop hdfs mapreduce
    zookeeper
    spark hive hbase
    spark hadoop


    编辑scala程序实现wordcount功能:
    1)读取文本文件
    scala> val linesRdd = sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input")
    2)按空格切分文件
    #scala> val wordRdd = linesRdd.map(line => line.split(" "))
    scala> val wordRdd = linesRdd.flatMap(line => line.split(" "))
    3)map函数统计单词
    scala> val keyvalRdd = wordRdd.map(word => (word,1))
    4)统计单词
    scala> val countRdd = keyvalRdd.reduceByKey((a,b) => (a+b))
    5)输出结果集
    scala> countRdd.collect

    6)将以上程序整合成一行scala程序,结果如下:
    sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).collect

    7)spark默认输出结果是没有排序的,如果想要wordcount输出结果按照key排序可以使用sortByKey()函数:

    升序:sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).sortByKey(true).collect

    降序:sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).sortByKey(false).collect

    8)如果想要输出结果按照value排序可以使用sortByKey的如下技巧:

    sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).collect

    9)wordcount结果按值降序排序,可以使用take(n)函数输出前n个结果:

    sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).take(3)

  • 相关阅读:
    剑指offer系列0:替换空格&从头到尾打印链表
    算法1:动态规划
    设计模式2:策略模式
    NPOI导出xls、xlsx和csv
    EF6
    oracle导出数据字典
    oracle分组函数
    oracle分析函数中的开窗函数
    Oracle 列转行&行转列
    Oracle基本函数总结
  • 原文地址:https://www.cnblogs.com/wcwen1990/p/6892652.html
Copyright © 2011-2022 走看看