zoukankan      html  css  js  c++  java
  • spark

    spark的安装

    • 系统:Ubuntu

    下载地址

    tar zxf spark-2.2.1-bin-hadoop2.7.tgz
    mkdir spark-2.2.1
    mv spark-2.2.1-bin-hadoop2.7 spark-2.2.1
    cd spark-2.2.1
    ln -s spark-2.2.1-bin-hadoop2.7 spark
    • 在bash里面添加下面的命令
    export SPARK_HOME=/home/yueyao/spark-2.2.1/spark
    export PATH=$SPARK_HOME/bin:$PATH
    • 由于系统自带的java版本比较低,因此下载java的jdk重新进行安装
    tar zxf jdk-8u131-linux-x64.tar.gz
    export JAVA_HOME=/home/yueyao/java-1.8/jdk1.8.0_131/
    export PATH=$JAVA_HOME/bin:$PATH
    • 写一个简单的Spark程序
    #首先初始化SparkContext,导入Spark包并且创建SparkContext
    from pyspark import SparkConf, SparkContext
    #创建一个SparkConf 对象,设置应用的名称
    conf = SparkConf().setMaster("local").setAppName("My App")
    #基于SparkConf 对象创建一个SparkContext对象
    sc = SparkContext(conf = conf)

    • java版本的单词数统计应用
    // 创建一个Java版本的Spark Context
    SparkConf conf = new SparkConf().setAppName("wordCount");
    JavaSparkContext sc = new JavaSparkContext(conf)
    // 读取我们输入的数据
    JavaRDD<String> input = sc.textFile(inputFile);
    //切分为单词
    JavaRDD<String> words = input.flatMap(
        new FlatMapFunction<String,String>(){
            public Iterable<String> call(String x){
                return Arrays.asList(x.split(" "));
            }
        }
    );
    // 抓换位键值对并计数
    JavaPairRDD<String,Integer> counts = word.mapToPair(
        new PairFunction<String, String, Integer>(){
            public Tuple2<String, Integer> call(String x){
                return new Tuple2(x,1);
            }
        }
    ).reduceByKey( new Function2<Integer,Integer,Integer>(){
        public Integer call(Integer x, Integer y){
            return x + y;
        }
        }
    );
    // 将统计出来的单词总数存入一个文本文件,引发求值
    counts.saveAsTextFile(outputFile);
    • 创建一个Scala版本的单词数统计应用
    //创建一个Scala版本的Spark Context
    val conf = new SparkConf().setAppName("wordCount")
    val sc = new SparkContext(conf)
    
    //读取我们的输入数据
    val input = sc.textFile(inputFile)
    //把它切分成一个个单词
    val words = input.flatMap(line => line.split(" "))
    // 转换键值对并计数
    val counts = words.map(word => (word, 1)).reduceByKey{case (x,y) => x+y}
    //将统计出来的单词总数存入一个文本文件,引发求值
    counts.saveAsTextFile(outputFile)
    • 创建一个python版本的wordcount,有个疑问,自己建立的hadoop小集群不能读取本地文件,只能上传到hdfs再读写文件
    #导入所需要的模块
    from pyspark import SparkConf, SparkContext
    import os
    #初始化一个sparkconf对象和一个sparkcontext对象
    conf = SparkConf().setMaster("local").setAppName("Yue Yao app")
    sc = SparkContext(conf = conf)
    #调用hadoop命令在hdfs上建立文件夹,同时上传文件到hdfs,这一步可以忽略
    os.system('hadoop fs -mkdir /user/yueyao/Spark/input/')
    os.system('hadoop fs -put Test.md /user/yueyao/Spark/input')
    os.system('hadoop fs -mkdir /user/yueyao/Spark/output/')
    #设置输入的文件路径和输出路径
    input_file = "/user/yueyao/Spark/input/Test.md"
    output_file = "/user/yueyao/Spark/output/out2"
    
    #这里是读入文件
    lines = sc.textFile(input_file)
    #对数据进行分割
    split_file = lines.flatMap(lambda line:line.split(" "))
    #对单词进行计数
    word_count = split_file.map(lambda word:(word,1))
    #通过reduce合并计数
    total_count = word_count.reduceByKey(lambda a,b:a+b)
    #输出文件,这里是写到了hdfs上面
    total_count.saveAsTextFile(output_file)
    
    #上面的步骤可以简写成下面的部分
    #counts = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
    #couts.saveAsTextFile(output_file)
  • 相关阅读:
    RDS 工作笔记
    网站测试需要提供的参数和结果分析
    php 安全编程
    留住青春的格子
    保持工作精力旺盛的方法
    百万格子的标签认领可以提高你在alexa的排名的格子
    老电影,似水流年的记忆
    五行 八字 计算
    iis6.0 的 性能比较
    各种情绪和调节方法
  • 原文地址:https://www.cnblogs.com/raisok/p/10917680.html
Copyright © 2011-2022 走看看