  • [imooc Hands-On] Spark Streaming Real-Time Stream Processing Project Notes 11: Upgraded Inscription Edition

    Inscription level 1:

    Chapter 8: Advanced Spark Streaming and Hands-On Cases

    Blacklist filtering

    Access logs ==> DStream
    20180808,zs
    20180808,ls
    20180808,ww
    ==> (zs: 20180808,zs)(ls: 20180808,ls)(ww: 20180808,ww)

    Blacklist ==> RDD
    zs
    ls
    ==>(zs: true)(ls: true)

    Desired output ==> 20180808,ww

    leftOuterJoin
    (zs: [<20180808,zs>, <true>])  x  (dropped)
    (ls: [<20180808,ls>, <true>])  x  (dropped)
    (ww: [<20180808,ww>, <false>]) ==> keep, then take the log line (tuple._2._1)
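
    The DStream version of this logic appears in the level-2 notes below; as a minimal standalone sketch of the join semantics on plain RDDs (the object name and local master are illustrative), note that leftOuterJoin yields Some(true) for blacklisted names and None for everyone else:

    import org.apache.spark.{SparkConf, SparkContext}

    object BlacklistJoinDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("BlacklistJoinDemo"))

        val logs = sc.parallelize(Seq("20180808,zs", "20180808,ls", "20180808,ww"))
          .map(x => (x.split(",")(1), x))                          // (zs, "20180808,zs") ...
        val blacks = sc.parallelize(Seq("zs", "ls")).map((_, true)) // (zs, true) (ls, true)

        // leftOuterJoin keeps every log record; blacklisted names carry Some(true),
        // everyone else carries None, so filtering on getOrElse(false) drops zs and ls
        logs.leftOuterJoin(blacks)
          .filter(x => !x._2._2.getOrElse(false))
          .map(x => x._2._1)
          .collect()
          .foreach(println)                                        // prints: 20180808,ww

        sc.stop()
      }
    }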

    Chapter 9: Integrating Spark Streaming with Flume

    Push-based integration

    Writing the Flume agent config: flume_push_streaming.conf

    simple-agent.sources = netcat-source
    simple-agent.sinks = avro-sink
    simple-agent.channels = memory-channel

    simple-agent.sources.netcat-source.type = netcat
    simple-agent.sources.netcat-source.bind = hadoop000
    simple-agent.sources.netcat-source.port = 44444

    simple-agent.sinks.avro-sink.type = avro
    simple-agent.sinks.avro-sink.hostname = 192.168.199.203
    simple-agent.sinks.avro-sink.port = 41414

    simple-agent.channels.memory-channel.type = memory

    simple-agent.sources.netcat-source.channels = memory-channel
    simple-agent.sinks.avro-sink.channel = memory-channel

    flume-ng agent \
    --name simple-agent \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/flume_push_streaming.conf \
    -Dflume.root.logger=INFO,console


    hadoop000: the address of the server running the Flume agent
    The Spark Streaming code is tested in local mode, on 192.168.199.203

    Local test summary
    1) Start the Spark Streaming job
    2) Start the Flume agent
    3) Send data in via telnet and watch the output in the IDEA console

    spark-submit \
    --class com.imooc.spark.FlumePushWordCount \
    --master local[2] \
    --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
    /home/hadoop/lib/sparktrain-1.0.jar \
    hadoop000 41414

    Inscription level 2:

    Chapter 8: Advanced Spark Streaming and Hands-On Cases

    Copy NetworkWordCount and rename it TransformApp:

    1. Build the blacklist

    val blacks = List("zs","ls")

    val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x=>(x,true))

    Incoming data: 20180808,zs

    The shapes we need to build: (zs: 20180808,zs)(ls: 20180808,ls)(ww: 20180808,ww)

    Blacklist: (zs: true)(ls: true)

    Joined RDD = (zs: [<20180808,zs>, <true>]) x
    (ls: [<20180808,ls>, <true>]) x
    (ww: [<20180808,ww>, <false>])

    val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
      // join each (name, logLine) pair against the blacklist RDD
      rdd.leftOuterJoin(blacksRDD)
        // blacklisted names carry Some(true); keep only the rest
        .filter(x => !x._2._2.getOrElse(false))
        // recover the original log line
        .map(x => x._2._1)
    })

    clicklog.print()    // print to check the result

    Hands-on: integrating Spark Streaming with Spark SQL

    Copy the official example source code directly for testing

    Import the required packages

    Add the Spark SQL dependency to pom.xml (just change the Spark Streaming artifact to the SQL one)

    Key code from the official example:

    // Convert RDD[String] to RDD[case class] to DataFrame
    val wordsDataFrame = rdd.map(w => Record(w)).toDF()
    // Creates a temporary view using the DataFrame
    wordsDataFrame.createOrReplaceTempView("words")
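
    For context, a sketch of how this snippet sits inside foreachRDD, modeled on the official SqlNetworkWordCount example (the wrapper object and function names here are illustrative; the official example additionally wraps the SparkSession in a lazily-instantiated singleton):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.dstream.DStream

    /** Case class used to convert RDD[String] to a DataFrame */
    case class Record(word: String)

    object SqlWordCount {
      /** words: the DStream[String] of split words, e.g. lines.flatMap(_.split(" ")) */
      def wordCountBySql(words: DStream[String]): Unit = {
        words.foreachRDD { rdd =>
          // Reuse (or lazily create) a SparkSession built from the RDD's SparkConf
          val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
          import spark.implicits._

          // Convert RDD[String] to RDD[case class] to DataFrame
          val wordsDataFrame = rdd.map(w => Record(w)).toDF()
          // Register a temporary view so this micro-batch can be queried with SQL
          wordsDataFrame.createOrReplaceTempView("words")
          // Word count over the batch, expressed in SQL
          spark.sql("select word, count(*) as total from words group by word").show()
        }
      }
    }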

    Run it and watch the output

    Chapter 9: Integrating Spark Streaming with Flume (push and pull approaches)

    Push approach (see the official docs):

    1) Flume config -> 2) Add the dependency -> 3) FlumeUtils -> 4) Submit with spark-submit

    1) cp exec-memory-avro.conf flume-push-streaming.conf

    Rename the agent, source, channel, and sink (see the official docs)

    Change the exec source to a netcat source, since data will come in through a port shortly

    Set its type, bind, and port (44444)

    Change the sink to an avro sink:

    Set its type, hostname, and port (41414)

    2) Add the dependency (template from the official docs):

    Dependency artifacts for reference:

    Source     Artifact
    Kafka      spark-streaming-kafka-0-8_2.11
    Flume      spark-streaming-flume_2.11
    Kinesis    spark-streaming-kinesis-asl_2.11 [Amazon Software License]
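
    For the Flume integration, the corresponding pom.xml entry would look like this (version 2.2.0 taken from the --packages coordinates used in these notes):

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>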

    3) FlumeUtils (arguments are passed in via Edit Configurations); in the Scala API, createStream returns a ReceiverInputDStream[SparkFlumeEvent]:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Spark Streaming + Flume integration, approach one (push)
      */
    object FlumePushWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length != 2) {
          System.err.println("Usage: FlumePushWordCount <hostname> <port>")
          System.exit(1)
        }
        val Array(hostname, port) = args
        // setMaster/setAppName are left out here and supplied by spark-submit
        val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("FlumePushWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // Integrate Spark Streaming with Flume: receive the events Flume pushes over avro
        val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

        // A Flume event body is raw bytes; decode it, then run a word count
        flumeStream.map(x => new String(x.event.getBody.array()).trim)
          .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    Testing the code end to end on the local machine:

    1) Change the sink's hostname to the local machine's IP

    2) In the local test code, bind to 0.0.0.0, with port 41414

    3) Startup order:

    start the Spark program -> start Flume -> telnet localhost 44444

    4) Submitting to production with spark-submit:

    Package it: mvn clean package -DskipTests

    which yields the jar: sparktrain-1.0.jar

    Command to copy the file to the virtual machine (for Mac users):

    scp sparktrain-1.0.jar hadoop@hadoop000:~/lib

    Full command:

    spark-submit \
    --class com.imooc.spark.FlumePushWordCount \
    --master local[2] \
    --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
    /home/hadoop/lib/sparktrain-1.0.jar \
    hadoop000 41414

