zoukankan      html  css  js  c++  java
  • 将 Spark Streaming 的结果保存到 S3

    将spark解析的结果保存到S3
    
    与保存到本地相比,区别在于需要先配置 AWS 的 access key、secret key,以及所在 region 对应的 S3 endpoint,代码如下:
    
    package com.alo7.spark
    
    import java.util.Properties
    import test07.DWReadS3LogToKafka_Tpuser.getProperties
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils
    import scala.util.parsing.json.JSON
    
    /**
      * Spark Streaming job that consumes messages from Kafka, runs them through the
      * project's parsing step and writes every batch to S3 via the `s3a://` connector.
      *
      * All connection settings (ZooKeeper quorum, consumer group, topic, S3 bucket path,
      * properties file location) are hard-coded below; the job ignores `args`.
      */
    object TestSaveDataToS3Time {

      /**
        * Entry point: builds a local streaming context with 10-second batches, wires the
        * Kafka -> parse -> filter -> S3 pipeline and blocks until the context terminates.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {

        // Turn off logging from Spark, Jetty and the Kafka consumer.
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF)

        val conf = new SparkConf().setAppName("ReadS3LogToKafka").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(10))

        // s3a credentials and endpoint. The two key values are placeholders that must be
        // replaced with real AWS credentials before running.
        val hadoopConf = ssc.sparkContext.hadoopConfiguration
        hadoopConf.set("fs.s3a.access.key","这里是你aws的key")
        hadoopConf.set("fs.s3a.secret.key","这里是你aws的密码")
        hadoopConf.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")

        val zkQuorum  = "192.168.1.112:2181"
        val group = "testgroup"
        val topics = "test"
        val numThreads = 2
        // Topic name -> number of receiver threads. The separator literal was broken
        // across two lines in the original (a compile error); "," is the conventional
        // separator for a topic list and yields the same map for the single topic above.
        val topicMap: Map[String, Int] =
          topics.split(",").map(topic => (topic, numThreads)).toMap

        // createStream yields (key, message) pairs; only the message is kept.
        val lines: DStream[String] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

        //lines.count().print()
        val analysisProps  = getProperties("/Users/huiliyang/config/tpuser_log_info_config.properties")

        // Resolve the filter pattern once, up front, so a missing key fails here with a
        // clear message instead of as a NullPointerException inside the filter closure.
        val defaultOutputPattern = analysisProps.getProperty("default_output")
        require(defaultOutputPattern != null, "property 'default_output' is missing from the analysis config")

        // getKeyValue() is the author's own parsing function (defined outside this file).
        // Records that match the "default_output" pattern are dropped.
        val formatResult: DStream[String] =
          getKeyValue(lines, "iclass-tpuser", analysisProps).filter(record => !record.matches(defaultOutputPattern))

        formatResult.count().print()
        // Save each batch to S3; the argument is used as the output path prefix.
        formatResult.saveAsTextFiles("s3a://alo7-dw/tmp/test/2017-10-26/log")

        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    Spark 与 S3 集成所需的 Maven 依赖(pom.xml 片段)
    <!-- Versions shared by the dependencies below. -->
    <properties>
      <scala.version>2.11.8</scala.version>
      <spark.version>2.2.0</spark.version>
      <hadoop.version>2.7.2</hadoop.version>
      <!-- Single place to bump all Jackson artifacts together. -->
      <jackson.version>2.6.5</jackson.version>
      <spark.pom.scope>compile</spark.pom.scope>
    </properties>
    <dependencies>
      <!-- Spark core, streaming and the Kafka 0.8 receiver; all pinned to spark.version. -->
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
        <!--<scope>${spark.pom.scope}</scope>-->
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>${spark.version}</version>
      </dependency>
      <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-redshift_2.11</artifactId>
        <version>3.0.0-preview1</version>
      </dependency>
      <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
      </dependency>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <!--<scope>${spark.pom.scope}</scope>-->
      </dependency>
      <!-- Jackson artifacts, kept on one version via jackson.version. -->
      <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>${jackson.version}</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson.version}</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>${jackson.version}</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.11</artifactId>
        <version>${jackson.version}</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-paranamer</artifactId>
        <version>${jackson.version}</version>
      </dependency>
      <!-- Hadoop client plus hadoop-aws, which provides the s3a:// filesystem. -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
      </dependency>
      <dependency>
        <groupId>net.java.dev.jets3t</groupId>
        <artifactId>jets3t</artifactId>
        <version>0.9.4</version>
      </dependency>
      <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.4</version>
      </dependency>
      <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.4</version>
      </dependency>
    </dependencies>
    

      

  • 相关阅读:
    sipp如何避免dead call
    6174问题
    笨小熊
    scanf 与 cin 的区别
    谁获得了最高奖学金
    _int64、long long 的区别
    小光棍数
    简单排序法
    La=LaULb (循环链表)
    删除重复的数(顺序有序表)
  • 原文地址:https://www.cnblogs.com/cloudrivers/p/13089870.html
Copyright © 2011-2022 走看看