zoukankan      html  css  js  c++  java
  • Spark Streaming 简单示例

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build for the spark-streaming module of the qinfeng.zheng:bigdata parent project. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <!-- Parent POM; it has to supply the dependency versions (see the note on <dependencies>). -->
        <parent>
            <artifactId>bigdata</artifactId>
            <groupId>qinfeng.zheng</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <groupId>qinfeng.zheng</groupId>
        <artifactId>spark-streaming</artifactId>
        <version>1.0-SNAPSHOT</version>
    
    
        <!-- No <version> is declared for any dependency below, so each version (and scope, if any)
             must come from the parent's dependencyManagement, which is not visible here.
             NOTE(review): the _2.10 artifact suffixes mean the Spark jars are built for Scala 2.10,
             so the managed scala-library version must be 2.10.x; confirm in the parent POM.
             NOTE(review): unless the parent marks Spark/Hadoop as provided, they get the default
             compile scope and are bundled into the shaded jar; when running via spark-submit they
             are usually scoped provided. -->
        <dependencies>
            <!-- Scala runtime for the Scala sources in this module. -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
            </dependency>
            <!-- Spark core API (SparkConf, SparkContext). -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
            </dependency>
            <!-- Hadoop client libraries, needed to read hdfs:// paths. -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
            </dependency>
            <!-- Spark Streaming API (StreamingContext, DStream). -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
            </dependency>
    
            <!-- Kafka input support for Spark Streaming; presumably used by KafkaReceiverWordCount
                 (the shade main class below). The file-based HDFSWordCount does not use it. -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka_2.10</artifactId>
            </dependency>
    
        </dependencies>
    
        <build>
            <!-- Scala sources live in src/main/scala instead of Maven's default src/main/java. -->
            <sourceDirectory>src/main/scala</sourceDirectory>
            <plugins>
                <!-- Compiles the main and test Scala sources (compile / testCompile goals). -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.0</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <!-- scalac incremental-recompilation flags; the dependency file is
                                         written into the build directory.
                                         NOTE(review): -make was deprecated in Scala 2.10 and removed in 2.11
                                         (scalac then fails with "bad option"); the related -dependencyfile
                                         presumably goes with it. Drop these args when upgrading Scala. -->
                                    <arg>-make:transitive</arg>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <!-- Runs tests whose file names match *Test.* or *Suite.*; results are printed to the
                     console (useFile=false) and no XML reports are written. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.18.1</version>
                    <configuration>
                        <useFile>false</useFile>
                        <disableXmlReport>true</disableXmlReport>
                        <includes>
                            <include>**/*Test.*</include>
                            <include>**/*Suite.*</include>
                        </includes>
                    </configuration>
                </plugin>
    
                <!-- Builds a single shaded (uber) jar containing all dependencies in the package phase. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <!-- Drop signature files of signed dependencies: they no longer match the
                                     merged jar, and the JVM would reject it with a SecurityException. -->
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <!-- Writes Main-Class into the jar manifest.
                                         NOTE(review): this is KafkaReceiverWordCount, not HDFSWordCount;
                                         other programs in the jar must be launched by naming their class
                                         explicitly (e.g. spark-submit's class option). Confirm this is intended. -->
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>qinfeng.zheng.java.KafkaReceiverWordCount</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Streaming word count over text files that appear in a monitored directory.
      *
      * Created: 10:57 2018/7/8
      * Author:  ZhengQf
      * Version: 0.0.1
      *
      * Every 10 seconds, files newly added to the watched directory are read, their lines
      * are split on single spaces, and the word counts for that batch are printed.
      * A file is picked up at most once per file name: changing the contents of a file
      * that has already been processed does not cause it to be read again.
      *
      * Usage: HDFSWordCount [inputDir]
      *  - inputDir: directory to watch. Defaults to the local path `D:\tmp\wc`; an HDFS
      *    URI such as `hdfs://hdp01:9000/wc/` also works (reading HDFS from Windows
      *    may be problematic).
      *
      * The master defaults to `local` only when none is configured, so the same jar can be
      * run locally or submitted with spark-submit. The author recommends running the packaged
      * jar on a Linux server: on Windows the batch output is sometimes not printed.
      */
    object HDFSWordCount {

      /** Directory watched when no argument is given. Backslashes must be escaped: "\t" is a tab. */
      private val DefaultInputDir: String = "D:\\tmp\\wc"

      /** Length of each streaming micro-batch. */
      private val BatchInterval = Seconds(10)

      def main(args: Array[String]): Unit = {
        // setIfMissing rather than setMaster: a hard-coded master would override the one passed
        // by spark-submit. One local thread is enough because file streams need no receiver.
        // If HDFS access is denied, setting HADOOP_USER_NAME (e.g. to "root") may be needed.
        val conf = new SparkConf()
          .setAppName("HDFSWordCount")
          .setIfMissing("spark.master", "local")

        val inputDir = args.headOption.getOrElse(DefaultInputDir)

        val ssc = new StreamingContext(conf, BatchInterval)
        // Only new files are processed; a file name that was already seen is never re-read,
        // even if its content changes.
        val lines: DStream[String] = ssc.textFileStream(inputDir)

        val wordCounts: DStream[(String, Int)] = lines
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)

        // To keep the results, also write each batch out, e.g.
        // wordCounts.saveAsTextFiles("./stream/")
        wordCounts.print() // output operation; Spark needs at least one to run the stream

        ssc.start()
        // awaitTermination rethrows errors raised by the stream; release the context either way.
        try ssc.awaitTermination()
        finally ssc.stop()
      }
    }
  • 相关阅读:
    jQuery的遍历方法
    xampp配置host和httpd可以随意访问任何本机的地址
    JavaScript的this简单实用
    移动端rem布局和百分比栅格化布局
    你知道用AngularJs怎么定义指令吗?
    谈谈Angular关于$watch,$apply 以及 $digest的工作原理
    深入了解Angularjs指令中的ngModel
    如何将angularJs项目与requireJs集成
    requireJS(二)
    requireJS(一)
  • 原文地址:https://www.cnblogs.com/z-qinfeng/p/11861854.html
Copyright © 2011-2022 走看看