zoukankan      html  css  js  c++  java
  • Spark实战1

    1. RDD-(Resilient Distributed Dataset)弹性分布式数据集
          Spark以RDD为核心概念开发的,它的运行也是以RDD为中心。有两种RDD:第一种是并行Collections,它是Scala collection,可以进行并行计算;第二种是Hadoop数据集,它是并行计算HDFS文件的每条记录,凡是Hadoop支持的文件系统,都可以进行操作。这两种RDD都以同样的方式处理。

        1.1 RDD之并行Collections
             并行Collections由SparkContext的parallelize方法,在一个已经存在的Scala collection上创建。这个collection上的成员会被copy成分布式数据库,也就是copy到所有节点,于是就可以进行       并行计算了。举例如下: 
             

           #scala的collection
           scala> val data = Array(1, 2, 3, 4, 5)
           data: Array[Int] = Array(1, 2, 3, 4, 5) 
           #并行collection
           scala> val distData = sc.parallelize(data)
           distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e 

            第一条语句创建一个Scala collection,第二条语句将它转化成并行collection。并行collection有一个重要参数,就是slices数,spark在进行计算的时候,每个slice对应一个task。通常,一个       CPU对应2~4个slice。一般情况下,Spark会根据集群的状况,自动计算slice也可以手动指定,比如说,paralize(data,10)就是指定了10个slice。 
         

        1.2 RDD之Hadoop数据集 
             Spark支持在任何Hadoop能处理的文件系统上创建分布式数据集,包括本地文件系统,Amazon S3,Hypertable,HBase等等。Spark支持文本文件,序列文件,以及任何Hadoop的                   InputFormat。 比如,从文本文件创建数据集的方式如下: 

           scala> val distFile = sc.textFile("data.txt")
           distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08 

             如果给distFile设置slice数量,形如sc.textFile("data.txt",5)。默认情况下,spark为data.txt的每个block块设置一个slice。

        Note: 手工设置的slice数,只能比文件的block块数量大,不能比它小。 
             对于SequenceFile序列文件,SparkContext的sequenceFile[k, v]函数将它转化成RDD。 对其他的Hadoop InputFormat,SparkContext.hadoopRDD方法处理。

    2.  RDD运算
            RDD支持两种运算:变换transformation-从已有的RDD创建一个新的RDD,如map;或者从action中创建RDD,如reduce。 Spark的transformation都是lazy的,Spark会记下这些transformation,不立刻计算结果直到action需要返回结果的时候再进行计算。
        Note: 默认情况下,每个RDD的transformation都会重新计算,但如果将RDD用persist持久化到内存里,或者缓存到内存里,它就不重新计算了,由此加快查询速度。 

    3. RDD持久化

          如果一个RDD被持久化了,那么,每个节点都会存放这个RDD的所有slice,于是可以在内存进行计算,可以重用,这样可以让后来的action计算更快,通常会把速度提高至少十倍。对迭代式计算来说,持久化非常关键。RDD的persist方法和cache方法都可以进行持久化。RDD是容错的--如果它的任何部分丢失了,都会重新计算创建。 
        Note:RDD有不同的存储方式,可以存在硬盘,或者内存,或者复制到所有节点。而cache函数只有一个默认的存储方式就是内存。 

    4. 共享变量-广播变量、累计量
         4.1 广播变量

              即在集群的每个节点机器上都缓存一个只读的变量,比如说,每个节点都保存一份输入数据的只读缓存。 
         广播变量的使用方式:

            scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
            broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) 
            scala> broadcastVar.value
            res0: Array[Int] = Array(1, 2, 3) 

          Note:创建了广播变量之后,就不能使用broadcastVar了,要使用broadcastVar.value。
         

         4.2 累计量
              只能是用作计数器counter或者求和sum,只能做add运算,例如:

            scala> val accum = sc.accumulator(0)
            accum: spark.Accumulator[Int] = 0 
            scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
            ...
            10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 
            scala> accum.value
            res2: Int = 10 
    

    Spark实战1:计算某段时间内卖的最火的Item

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.husor.Project
    
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.SparkContext._
    
    /**
     * Created by Kelvin Lee on 2014/12/10.
     */
    
    /* Test cases
          log=item&au=0&id=470158&sid=&_t=1417674589632
          log=item&au=0&id=357332&sid=&_t=1417737534715
          log=item&au=0&id=431796&sid=&_t=1417739107530
          log=item&au=0&id=488016&sid=&_t=1417780009676
          log=item&au=0&id=468117&sid=&_t=1417780024422
          log=item&au=0&id=468117&sid=&_t=1417780025946
          log=item&au=0&id=468117&sid=&_t=1417780025946
          log=item&au=0&id=468117&sid=&_t=1417780024422
          log=item&au=0&id=141073&sid=&_t=1418054075319
          log=item&au=0&id=141073&sid=&_t=1418054264602
        * */
    object Hot_Product_TopK {
      def main(args: Array[String]) {
    
        println("Test is starting......")
    
        System.setProperty("hadoop.home.dir", "d:\winutil\")
    
        /*if (args.length < 5) {
          System.err.println("Usage: Input <directory> , Output <directory>")
          System.exit(1)
        }
    
        val inputFile  = args(0)
        val outDir     = args(1)
        val start_time = args(2).split("_")(0) + " " + args(2).split("_")(1)
        val end_time   = args(3).split("_")(0) + " " + args(3).split("_")(1)
        val kNum       = args(4).toInt*/
    
        val inputFile  = "SparkTest/TestData/order.txt"
        val inputFile1  = "SparkTest/TestData/order1.txt"
        val outDir  = "SparkTest/Output5"
        val start_time = "2014-12-04 14:20:14"
        val end_time   = "2014-12-08 23:59:14"
        val kNum       = 2
    
        // Checks argument formats
        val logPattern = """^log=(.+)&au=(.+)&id=(.+)&sid=&_t=(.+)""".r
    
        val conf = new SparkConf().setAppName("Hot_Product_TopK").setMaster("local")
        // Create the context
        val sc = new SparkContext(conf)
    
        val orderlog1s = sc.textFile(inputFile)
        val orderlogs = orderlog1s.union(sc.textFile(inputFile1))
    
        val transferOrderLogs = orderlogs.map( (line:String) => {
    
                   // Matches related Data By Regex logPattern
                   val logPattern(itemType,userType,itemId,orderTime) = line
    
                   // Converts unixTimeStamp type to Date
                   val createdTime = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(orderTime.substring(0,10).toLong*1000))
    
                   // Outputs related data what you want given the order log
                   (itemType,userType,itemId,createdTime)
        })
    
        // Gets related data between start_time and end_time
        val givenDateLogs = transferOrderLogs.filter( info => info._4 >= start_time && info._4 <= end_time )
    
        // Counts the related item Id
        val itemIdCounts = givenDateLogs.map( info1 => (info1._3,1)).reduceByKey(_ + _)
    
        //itemIdCounts.saveAsTextFile(outDir)
    
        // Sorts related item Ids according to the counts of Item Id
        val sorted = itemIdCounts.map {
          //exchange key and value
          case(key, value) => (value, key)
        }.sortByKey(true, 1)
    
        println("sorted: " + sorted)
    
        // Gets the top K's Item Ids
        val topK = sorted.top(kNum)
        // Outputs Value and Key to the Console
        topK.foreach(println)
    
        val ex_VK_KV = topK.map {
        //exchange key and value
          case(value, key) => (key, value)
        }
        // Outputs Key and Value to the Console
        ex_VK_KV.foreach(println)
    
        // Transfers Tuple's to RDD's type, storing result to the file system(such as HDFS or local file)
        sc.parallelize(ex_VK_KV,2).saveAsTextFile(outDir)
    
        sc.stop()
    
        println("Test is Succeed!!!")
    
      }
    }
  • 相关阅读:
    web接口文档apidoc的使用
    python 文件重命名
    sort()排序
    JavaScript自定义事件和触发(createEvent, dispatchEvent)
    Sql 2016 安装到sql_shared_mrconfigaction-install-confignonrc-cpu64卡住不动,是什么原因呢?
    基础提供程序在Open上失败
    win10 Hyper-v 虚拟机监控程序灰色 尝试检索虚拟交换机列表时出错【转载】
    相对路径获取文件名
    省市区三级联动
    反射==>不明确的匹配
  • 原文地址:https://www.cnblogs.com/likai198981/p/4157725.html
Copyright © 2011-2022 走看看