  • SparkStreaming WordCount demo: a basic example

    To demonstrate Spark Streaming's second-level, near-real-time processing, we need something that can feed it data continuously.

    1. Install nc (netcat) on CentOS, e.g. with yum install -y nc.

    Create a Scala project and add the required dependencies to its pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.shiao</groupId>
        <artifactId>spark-01</artifactId>
        <version>1.0</version>
    
        <packaging>jar</packaging>
    
        <properties>
            <scala.version>2.11.8</scala.version>
            <hadoop.version>2.7.4</hadoop.version>
            <spark.version>2.0.2</spark.version>
        </properties>
    
        <dependencies>
            <!-- Scala library -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <!-- Spark core -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- Hadoop client -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.30</version>
            </dependency>
    
    
            <!-- Spark Streaming -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
        </dependencies>
    
    
    
    
        <!-- Build plugins -->
        <build>
            <plugins>
                <!-- Scala compiler plugin -->
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <!-- Packaging plugin (builds a jar with dependencies) -->
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>SparkStreamingWordCount</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
    
        </build>
    
        
    </project>
    

    Create an object.

    Write the following code:

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkStreamingWordCount {
      def main(args: Array[String]): Unit = {
    
        // Create the SparkContext
        val conf = new SparkConf().setAppName("SparkStreamingWordCount").setMaster("local[2]")
        val sc = new SparkContext(conf)
    
        // Create the StreamingContext with a 5-second batch interval
        val ssc = new StreamingContext(sc, Seconds(5))
    
        // Reduce log noise so the output is easier to read
        sc.setLogLevel("WARN")
    
        // Create a receiver that reads lines of text from the socket
        val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.52.110", 9999)
    
        // Word count: split each line on commas, map each word to a (word, 1) tuple,
        // add up the counts for identical words, then print each batch's result
        val value: DStream[(String, Int)] = lines.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
        value.print()
    
        // Start the job and block so it keeps receiving data
        ssc.start()
        ssc.awaitTermination()
      }
    }
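
    For reference, the same job is often written by passing the SparkConf straight to the StreamingContext, which then creates the SparkContext internally; a minimal equivalent sketch (same host, port, and batch interval as above):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object SparkStreamingWordCount2 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkStreamingWordCount").setMaster("local[2]")
        // StreamingContext builds its own SparkContext from the SparkConf
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.sparkContext.setLogLevel("WARN")
    
        ssc.socketTextStream("192.168.52.110", 9999)
          .flatMap(_.split(","))
          .map((_, 1))
          .reduceByKey(_ + _)
          .print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }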

    Run it.

    Use nc to open a socket on port 9999 and send data, e.g. nc -lk 9999, then type comma-separated words such as hello,spark,hello.

    Test: every 5 seconds the console should print the word counts for that batch.
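
    If nc is not available, a tiny Scala stand-in can play its role by listening on port 9999 and writing one comma-separated line per second. This is a purely illustrative sketch (the object name FakeNc and the sample text are made up here; the port matches the one read by socketTextStream above):

    import java.io.PrintWriter
    import java.net.ServerSocket
    
    // Hypothetical nc replacement: accepts one connection and keeps sending lines.
    object FakeNc {
      def main(args: Array[String]): Unit = {
        val server = new ServerSocket(9999)          // listen on the port the streaming job reads from
        val socket = server.accept()                 // blocks until socketTextStream connects
        val out = new PrintWriter(socket.getOutputStream, true)  // autoflush so lines arrive immediately
        while (true) {
          out.println("hello,spark,hello,streaming") // comma-separated, matching split(",")
          Thread.sleep(1000)
        }
      }
    }

    Start it before the streaming job; each 5-second batch should then print counts roughly like (hello,10) and (spark,5).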

  • Original post: https://www.cnblogs.com/BigDataBugKing/p/11227899.html