之前测试的一些spark案例都是采用离线处理,spark streaming的流处理一样可以运行经典的wordcount。
基本环境:
spark-2.0.0
scala-2.11.0
IDEA-15.0.6
创建项目,贴上代码:
package org.iie import org.apache.log4j.{Level,Logger} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext} /** * Created by hbwxcw on 2016/12/9. */ object NetworkWordCount { def main(args: Array[String]) { import org.apache.log4j.{Level,Logger} Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN) Logger.getLogger("org.apache.spark.streaming").setLevel(Level.WARN) val sparkConf = new SparkConf().setAppName("nwc") val ssc = new StreamingContext(sparkConf,Seconds(1)) val lines = ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x,1)).reduceByKey(_+_) wordCounts.print() ssc.start() ssc.awaitTermination() } }
记得在pom.xml下引入依赖:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.0.0</version> </dependency>
再生成jar包传到集群上,用spark-submit进行提交。
但是得注意后面得加上主机名和端口号。。
你就会看到下面这个家伙= =。
表示上面那个ERROR对结果没什么影响啊,不管它= =。。
再在另外一个窗口运行:
nc -l -p 9999
我用的是9999端口。。你们随意。。。
网上好多博客都用的是nc -lk 9999,反正我是没用,疑似版本问题。。。
顺便贴一下结果:
在一端用nc输入:o o a a ss s aa aa
另外一端出现: