  • Saving Spark Streaming Results to S3

    Save the results that Spark parses out to S3.
    
    The difference from saving locally is that you need to configure your AWS access key and secret key, as well as the endpoint for your region. The code is as follows:
    
    package com.alo7.spark
    
    import java.util.Properties
    import test07.DWReadS3LogToKafka_Tpuser.getProperties
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils
    import scala.util.parsing.json.JSON
    
    object TestSaveDataToS3Time {
    
      def main(args: Array[String]): Unit = {
    
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
        Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF)
    
        // Run locally with a 10-second batch interval
        val conf = new SparkConf().setAppName("ReadS3LogToKafka").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(10))
    
        // Configure S3 credentials and the endpoint for your region
        // (this endpoint is for AWS China, cn-north-1 / Beijing)
        ssc.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "your-aws-access-key")
        ssc.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "your-aws-secret-key")
        ssc.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")
    
    
        // Kafka consumer settings: ZooKeeper quorum, consumer group, topics
        val zkQuorum = "192.168.1.112:2181"
        val group = "testgroup"
        val topics = "test"
        val numThreads = 2
        // Map each topic name to the number of receiver threads; the split
        // delimiter is assumed to be "," (the string literal was broken
        // across lines in the original post)
        val topicMap = topics.split(",").map((_, numThreads)).toMap

        // Each Kafka message is a (key, value) pair; keep only the value
        val lines: DStream[String] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    
        //lines.count().print()
        // Load the field-parsing config used by getKeyValue below
        val analysisProps = getProperties("/Users/huiliyang/config/tpuser_log_info_config.properties")
    
        // getKeyValue() is my data-parsing function (a hypothetical sketch
        // follows the listing); drop lines matching the default_output sentinel
        val formatResult: DStream[String] = getKeyValue(lines, "iclass-tpuser", analysisProps).filter(!_.matches(analysisProps.getProperty("default_output")))
    
        formatResult.count().print()
        // Save the data to S3; saveAsTextFiles writes each batch to a
        // directory named <prefix>-<batchTimeInMillis>
        formatResult.saveAsTextFiles("s3a://alo7-dw/tmp/test/2017-10-26/log")
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
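
    getKeyValue() is not shown in the post; it is the author's own parsing
    function. Given the scala.util.parsing.json.JSON import above, it
    presumably parses each line as JSON and emits a formatted string, falling
    back to the configured default_output sentinel when parsing fails (which
    the filter in the listing then drops). A hypothetical sketch, reusing the
    file's imports:

    def getKeyValue(lines: DStream[String],
                    logType: String,
                    props: Properties): DStream[String] = {
      lines.map { line =>
        JSON.parseFull(line) match {
          // Flatten a parsed JSON object into tab-separated key=value pairs,
          // tagging each record with its log type
          case Some(m: Map[String, Any] @unchecked) =>
            (Map("log_type" -> logType) ++ m)
              .map { case (k, v) => s"$k=$v" }
              .mkString("\t")
          // Anything unparseable becomes the sentinel the caller filters out
          case _ => props.getProperty("default_output")
        }
      }
    }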
    
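    getProperties() is imported from test07.DWReadS3LogToKafka_Tpuser, which
    is also not shown. A minimal sketch, assuming it simply loads a
    java.util.Properties file from a local path:

    import java.io.FileInputStream
    import java.util.Properties

    def getProperties(path: String): Properties = {
      val props = new Properties()
      val in = new FileInputStream(path)
      // Make sure the stream is closed even if load() throws
      try props.load(in) finally in.close()
      props
    }
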
    The JAR dependencies needed for Spark to integrate with S3:
    <properties>
      <scala.version>2.11.8</scala.version>
      <spark.version>2.2.0</spark.version>
      <hadoop.version>2.7.2</hadoop.version>
      <spark.pom.scope>compile</spark.pom.scope>
    </properties>
    <dependencies>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
        <!--<scope>${spark.pom.scope}</scope>-->
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.2.0</version>
      </dependency>
      <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-redshift_2.11</artifactId>
        <version>3.0.0-preview1</version>
      </dependency>
      <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
      </dependency>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <!--<scope>${spark.pom.scope}</scope>-->
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.5</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.5</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.5</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.11</artifactId>
        <version>2.6.5</version>
      </dependency>
      <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-paranamer</artifactId>
        <version>2.6.5</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
      </dependency>
      <dependency>
        <groupId>net.java.dev.jets3t</groupId>
        <artifactId>jets3t</artifactId>
        <version>0.9.4</version>
      </dependency>
      <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.4</version>
      </dependency>
      <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.4</version>
      </dependency>
    </dependencies>
    
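    If you build an assembly jar from the pom above, hadoop-aws (which
    provides the s3a:// filesystem) and its matching hadoop-client version
    are bundled in. Alternatively, you can let spark-submit fetch the S3
    connector at launch time via --packages; a sketch of such an invocation
    (the jar name target/spark-s3-demo.jar is hypothetical, substitute your
    own build artifact):

    spark-submit \
      --class com.alo7.spark.TestSaveDataToS3Time \
      --master "local[*]" \
      --packages org.apache.hadoop:hadoop-aws:2.7.2 \
      target/spark-s3-demo.jar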

      

  • Original post: https://www.cnblogs.com/cloudrivers/p/13089870.html