Saving Spark parsing results to S3

The difference from saving to the local filesystem is that you need to configure your AWS access key and secret, as well as the S3 endpoint for your region. The code is as follows:

package com.alo7.spark

import java.util.Properties
import test07.DWReadS3LogToKafka_Tpuser.getProperties
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.util.parsing.json.JSON

object TestSaveDataToS3Time {

  def main(args: Array[String]): Unit = {
    // silence the noisier loggers
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("ReadS3LogToKafka").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // S3 credentials and the endpoint of the cn-north-1 region
    ssc.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "your AWS access key here")
    ssc.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "your AWS secret key here")
    ssc.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")

    val zkQuorum = "192.168.1.112:2181"
    val group = "testgroup"
    val topics = "test"
    val numThreads = 2
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lines: DStream[String] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    //lines.count().print()

    val analysisProps = getProperties("/Users/huiliyang/config/tpuser_log_info_config.properties")
    // getKeyValue() is my log-parsing function
    val formatResult: DStream[String] = getKeyValue(lines, "iclass-tpuser", analysisProps)
      .filter(!_.matches(analysisProps.getProperty("default_output")))
    formatResult.count().print()

    // save the results to S3
    formatResult.saveAsTextFiles("s3a://alo7-dw/tmp/test/2017-10-26/log")

    ssc.start()
    ssc.awaitTermination()
  }
}
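A side note on the credentials: hardcoding the key and secret in source code is easy to leak. Below is a minimal sketch, assuming the standard AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables are exported; this helper is illustrative and not part of the original program:

import org.apache.hadoop.conf.Configuration

object S3Config {
  // Illustrative helper (not in the original post): pull the S3 credentials
  // from the standard AWS environment variables instead of hardcoding them.
  // sys.env(...) throws NoSuchElementException if a variable is not set.
  def configure(hadoopConf: Configuration): Unit = {
    hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
    hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
    hadoopConf.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")
  }
}

// Inside main, this one call would replace the three hardcoded set(...) calls:
//   S3Config.configure(ssc.sparkContext.hadoopConfiguration)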
The JAR packages required for integrating Spark with S3 (Maven pom.xml):

<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.7.2</hadoop.version>
    <spark.pom.scope>compile</spark.pom.scope>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
        <!--<scope>${spark.pom.scope}</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-redshift_2.11</artifactId>
        <version>3.0.0-preview1</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <!--<scope>${spark.pom.scope}</scope>-->
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.11</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-paranamer</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>net.java.dev.jets3t</groupId>
        <artifactId>jets3t</artifactId>
        <version>0.9.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.4</version>
    </dependency>
</dependencies>
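Once the streaming job has run for a while, you can check that the files actually landed in S3 by reading them back with a plain batch job. Below is a minimal sketch, assuming the same bucket, path, and credentials as above; saveAsTextFiles writes one log-<timestamp> directory per batch interval, so a glob is used to pick them all up:

import org.apache.spark.{SparkConf, SparkContext}

object CheckS3Output {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("CheckS3Output").setMaster("local[*]"))

    // Same S3 configuration as the streaming job; credentials are assumed
    // to be exported as environment variables.
    sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
    sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")

    // saveAsTextFiles produced directories named log-<timestamp>,
    // so the glob matches every batch that was written.
    val saved = sc.textFile("s3a://alo7-dw/tmp/test/2017-10-26/log-*")
    saved.take(5).foreach(println)

    sc.stop()
  }
}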