zoukankan      html  css  js  c++  java
  • 060 SparkStream 的wordcount示例

    1.SparkStream

      入口:StreamingContext

      抽象:DStream

    2.SparkStreaming内部原理  

      当一个批次到达的时候,会产生一个rdd,这个rdd的数据就是这个批次所接收/应该处理的数据内容,内部具体执行是rdd job的调度

      batchDuration: 产生RDD的间隔时间(定时任务,间隔给定时间后会生产一个RDD),产生的RDD会缓存到一个Map<Time, RDD>;RDD的调度当集合中有一个rdd的time时间超过当前时间的时候(>=),对应的rdd被触发操作

      

    一:安装nc

    1.说明

      netcat(nc)是一个简单而有用的工具,被誉为网络安全界的“瑞士均道”。

      不仅可以通过使用TCP或UDP协议的网络连接读写数据,同时还是一个功能强大的网络调试和探测工具,能够建立你需要的几乎所有类型的网络连接。

    2.检测nc

      

    3.安装

      sudo yum install -y nc

      

    4.检查是否可以使用

      

    5.使用数据进行测试

      在一个终端输入数据:

        

    6.解决问题

      因为,这里安装了高版本的nc,centos在6.4不适合nc。

      不建议使用nc这种yum的方式。

    7.卸载

      

    8.重新安装

      下载合适的版本

      wget   http://vault.centos.org/6.4/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm

      rpm   -iUv    nc-1.84-22.el6.x86_64.rpm

      

    9.测试接受数据

      发送:

      

      接收:

      

    10.yum install nc.x86_64

      这样下载的nc版本是nc-1.84-24.e

      版本还是高,和直接yum install nc的版本一样。

    二:程序

    1.程序

     1 package com.stream.it
     2 
     3 import org.apache.spark.streaming.{Seconds, StreamingContext}
     4 import org.apache.spark.{SparkConf, SparkContext}
     5 
     6 object SparkStreamWordcount {
     7   def main(args: Array[String]): Unit = {
     8     val conf=new SparkConf()
     9         .setAppName("spark-streaming-wordcount")
    10           .setMaster("local[*]")
    11     val sc=SparkContext.getOrCreate(conf)
    12     val ssc=new StreamingContext(sc,Seconds(15))
    13     val hostname="linux-hadoop01.ibeifeng.com"
    14     val port=9999
    15     val dstream=ssc.socketTextStream(hostname,port)
    16 
    17     /**
    18       * 80%的RDD上的方法可以在DStream上直接使用
    19       */
    20     val resultWordcount=dstream
    21       .filter(line=>line.nonEmpty)
    22         .flatMap(line=>line.split(" ").map((_,1)))
    23         .reduceByKey(_+_)
    24     resultWordcount.foreachRDD(rdd=>{
    25       rdd.foreachPartition(iter=>iter.foreach(println))
    26     })
    27 
    28     //启动
    29     ssc.start()
    30     //等到
    31     ssc.awaitTermination()
    32   }
    33 }

    2.发送数据

      

    3.控制台

      

      

  • 相关阅读:
    Shell变量、函数
    Shell
    springMVC基础讲解
    Spring(二)--IoC&AOP
    Spring(一)--简介
    Mybatis(二)--SqlMapConfig.xml配置文件
    Mybatis(一)--简介
    框架的介绍
    博文推荐 | 下一代消息平台 Pulsar 到底是什么
    Pulsar 2.7.0 新增特性概览:事务支持、Topic 级别策略配置等
  • 原文地址:https://www.cnblogs.com/juncaoit/p/9445974.html
Copyright © 2011-2022 走看看