<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>bigdata</artifactId>
        <groupId>qinfeng.zheng</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <groupId>qinfeng.zheng</groupId>
    <artifactId>spark-streaming</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>qinfeng.zheng.java.KafkaReceiverWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created:     10:57 2018/7/8
 * Modified:
 * Author:      ZhengQf
 * Version:     0.0.1
 * Description: Streams the files under hdfs://hdp01:9000/wc/ and computes a word count.
 *              It is best to package this as a jar and run it on a Linux server;
 *              on Windows the output is sometimes not printed.
 */
object HDFSWordCount {
  def main(args: Array[String]): Unit = {
    // System.setProperty("HADOOP_USER_NAME", "root")
    val conf = new SparkConf().setAppName("HDFSWordCount").setMaster("local")
    // val sc = new SparkContext(conf)
    // val rdd = sc.textFile("hdfs://hdp01:9000/wc/wc.txt")
    // rdd.foreach(print)

    val scc = new StreamingContext(conf, Seconds(10))

    // A file with the same name is only read once; even if its content changes it is not re-read.
    val lines = scc.textFileStream("D:\\tmp\\wc") // read a local directory (backslashes must be escaped in the path)
    // Read from HDFS instead; reading HDFS from Windows may not work reliably
    // val lines = scc.textFileStream("hdfs://hdp01:9000/wc/")

    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordPairs: DStream[(String, Int)] = words.map((_, 1))
    val wc: DStream[(String, Int)] = wordPairs.reduceByKey(_ + _)

    // wc.saveAsTextFiles("./stream/")  // optionally persist the results to this path
    wc.print() // print is an output (action) operator
    scc.start()
    scc.awaitTermination()
    scc.stop()
  }
}
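The POM above also pulls in spark-streaming-kafka_2.10 and points the shade plugin's mainClass at qinfeng.zheng.java.KafkaReceiverWordCount, a class that is not shown here. As a rough idea of what a receiver-based Kafka word count of that era looks like, the following is a minimal sketch using the old KafkaUtils.createStream API; the ZooKeeper address, consumer group, and topic name are assumptions, not taken from the original project.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical sketch only; not the original qinfeng.zheng.java.KafkaReceiverWordCount.
object KafkaReceiverWordCountSketch {
  def main(args: Array[String]): Unit = {
    // local[2] (or more) is needed because the receiver itself occupies one thread
    val conf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // createStream(ssc, zkQuorum, groupId, topics -> receiver threads per topic)
    // "hdp01:2181", "wc-group" and "wc" are assumed values for illustration
    val kafkaStream = KafkaUtils.createStream(ssc, "hdp01:2181", "wc-group", Map("wc" -> 1))

    // The stream yields (key, message) pairs; count words in the message value
    val wc = kafkaStream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    wc.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

After building with mvn package, the shaded jar produced by the maven-shade-plugin can be submitted to a cluster with spark-submit, matching the advice in the HDFSWordCount comment to run the job on a Linux server rather than on Windows.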