zoukankan      html  css  js  c++  java
  • Spark Streaming实现实时流处理

    一、Streaming与Flume的联调

    Spark 2.2.0 对应于 Flume 1.6.0
     
    两种模式:
     
    1. Flume-style push-based approach:
     
    Flume推送数据給Streaming
     
    Streaming的receiver作为Flume的Avro agent
     
    Spark workers应该跑在Flume这台机器上
     
    Streaming先启动,receiver监听Flume push data的端口
     
     
    实现:
     
    写flume配置文件:
    netcat source -> memory channel -> avro sink
     
    IDEA开发:
    添加Spark-flume依赖
    对应的API是FlumeUtils
     
    开发代码:
     
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
     
    /*
    * Spark Streaming整合Flume的第一种方式
    * */
    object FlumePushWordCount {
      def main(args: Array[String]): Unit = {
     
        //外部传入参数
        if (args.length != 2) {
          System.out.println("Usage: FlumePushWordCount <hostname> <port>")
          System.exit(1)
        }
     
        val Array(hostname, port) = args  //外部args数组
     
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
     
        //选择输入ssc的createStream方法,生成一个InputDStream
        val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
     
        //由于flume的内容有head有body, 需要先把内容拿出来, 并去掉空值
        flumeStream.map(x => new String(x.event.getBody.array()).trim)
            .flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_+_).print()
     
        ssc.start()
        ssc.awaitTermination()
      }
    }
     
     
    注意:为了不hard-core,选择外部传入hostname和port
     
    在IDEA测试时,可以在
    里面的program argument输入运行参数
     
    在本地测试时:
     
    先启动Streaming作业,然后启动flume agent,最后通过telnet输入数据,观察IDEA的控制台输出
     
     
    在服务器测试时:
     
    submit时一定要把maven依赖中在--packages加上,自动会在网络上下载依赖
    当不能下载时,需要--jars才能把预先下载好的jar包加上
     
     
     
    2. Pull-based approach using a custom sink:
     
    Streaming拉数据
     
    Flume推送的数据先放到sink缓冲区
     
    Streaming使用一个reliable flume receiver,确保了数据的接收和备份
     
    可靠性更高,支持容错,生产上面常用
     
    一台机器运行Flume agent,Spark集群其他机器可访问这台机器的custom sink
     
    实现:
     
    Flume配置:
    使用相关jars包,配置依赖:(参考Spark官网)
    sink是一个独特的type
     
    IDEA开发:
    对应上面Flume的依赖,使用的是createPollStream,区别于第一种模式
    其他地方都一样,体现了Spark代码的复用性
     
    本地测试:
    先启动flume!!后启动Streaming作业
     
     
     
    二、Streaming与Kafka的联调
     
    Spark2.2.0对应于Kafka 0.8.2.1或更新(本次使用的是0.9.0.0)
     
    两种模式:
     
    1. Receiver-based approach
     
    使用Kafka高级用户API
    为了确保零数据丢失,需要用到Write Ahead Logs(出现于Spark 1.2)
    同步地保存接收到的数据到日志当中,出错时可以恢复(容错机制)
    这是传统的方式,在ZK server中消费数据
     
    用KafkaUtils和Streaming对接,一样需要加入kafka的各种依赖(见官网)
    使用的API是createStream
     
    注意:
    1. 此处的topic分区和RDD的分区不同概念
    2. 多个Kafka DStream可以并行接收
    3. 用write ahead logs时需要配置StorageLevel.MEMORY_AND_DISK_SER
     
     
    准备工作:
     
    启动ZK server
    启动kafka
    ./bin/kafka-server-start.sh -daemon ./config/server.properties
    创建topic
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
    测试topic能否正确生产和消费
    kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic
    kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_streaming_topic
     
    IDEA代码:
     
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
     
    /*
    * SparkStreaming对接Kafka其中的Receiver-based方式
    * */
    object KafkaReceiverWordCount {
      def main(args: Array[String]): Unit = {
     
        if (args.length != 4) {
          System.out.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }
     
        val Array(zkQuorum, group, topics, numThreads) = args
     
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
     
        //createStream需要传入的其中一个参数是一个Map,就是topics对应的线程数
        val topicsMap = topics.split(",").map((_, numThreads.toInt)).toMap
     
        val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicsMap)
     
        //一定要取Stream的第二位才是数据,可以print出来看看,在实际生产中只是更改这一行的业务逻辑!!!
        message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print()
     
        ssc.start()
        ssc.awaitTermination()
      }
    }
     
     
    本地测试/服务器测试:
     
    从IDEA中输入参数,即可看到结果
     
    从服务器测试也是打包submit就行,看web UI的时候留意验证receiver是占有一个Job的,证实了前面的理论
     
     
     
    2. Direct Approach
     
    No receiver!!!
     
    Spark 1.3 版本开始有
     
    没有了Receiver,而是周期性地检测Kafka的offset,用了kafka simple consumer API
     
    优点:
    1. 简化了并行度,不需要创建多个input stream
    2. 性能更好,达到零数据丢失,且不需要保存副本于write ahead logs中
    3.  一次语义Exactly-once semantics
     
    缺点:不能在zookeeper中更新offset,但可以自己设置让其更新
     
     使用的API是createDirectStream
     
    准备工作和上面一样。
     
    IDEA代码:
     
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
     
     
    /*
    * SparkStreaming对接Kafka其中的Direct方式
    * */
    object KafkaDirectWordCount {
      def main(args: Array[String]): Unit = {
     
        if (args.length != 4) {
          System.out.println("Usage: KafkaReceiverWordCount <brokers> <topics>")
          System.exit(1)
        }
     
        val Array(brokers, topics) = args
     
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
     
        //createDirectStream需要传入kafkaParams和topicsSet
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val topicsSet = topics.split(",").toSet
     
        val message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet
        )
     
     
        //一定要取Stream的第二位才是数据,可以print出来看看
        message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print()
     
        ssc.start()
        ssc.awaitTermination()
      }
    }
     
     
    注意:StringDecoder有可能因为前面写Kafka java API时的包冲突而导入失败
     
    在IDEA运行时报错:
    这是由于之前在Kafka基础学习中我设置的kafka的依赖是0.9.0.0,和我们IDEA冲突,所以要把这一个依赖注释掉才能执行
     
    调优时就是配置createDirectStream的参数嘛!!
     
     
     
    三、Flume + Kafka + Spark Streaming常用流处理架构
     
    实现的需求:实时(到现在为止)的日志访问统计操作
     
    由于本人缺乏日志采集来源,故使用python语言来实现一个日志生成器,模拟生产环境中服务器不断生成日志的过程
    本生成器产生的日志内容包括ip、time、url、status、referer
     
    根据前面的知识,我们在实现的过程中有以下步骤:
    1. Flume的选型,在本例中设为exec-memory-kafka
    2. 打开kafka一个消费者,再启动flume读取日志生成器中的log文件,可看到kafka中成功读取到日志产生器的实时数据
    3. 让Kafka接收到的数据传输到Spark Streaming当中,这样就可以在Spark对实时接收到的数据进行操作了
     
    由于与前面一、二的操作基本一致,此处不再重复列出详细操作过程
     
     
    下面直接进入Spark中对实时数据的操作:
     
    分为数据清洗过程、统计功能实现过程两个步骤!其中统计功能的实现基本上和Spark SQL中的操作一致,这又体现了Spark的代码复用性,即能通用于多个框架中
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
  • 相关阅读:
    关于工作流程引擎中的岗位的设置的问题
    将要发布的开源CCOA的照片一览
    关于多个checkbox在IE6下自由表单设计器中的兼容问题
    ccflow流程自动发起功能增加,如何按指定的时间触发方式发起流程?
    Windows 如何远程强行关机
    Report bulder
    微软sample and code down web address
    如何查看sql server的死锁情况
    如何读取数据的所有用户表
    复制和数据库镜像
  • 原文地址:https://www.cnblogs.com/kinghey-java-ljx/p/8544405.html
Copyright © 2011-2022 走看看