  • Spark Streaming integration with Flume + Kafka wordCount

    Flume configuration file: flume_to_kafka.conf

    a1.sources = r1 
    a1.sinks = k1 
    a1.channels = c1 
    
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/hadoop/logs/
    a1.sources.r1.fileHeader = true
    
    a1.channels.c1.type = memory 
    a1.channels.c1.capacity = 10000 
    a1.channels.c1.transactionCapacity = 10000 
    a1.channels.c1.byteCapacityBufferPercentage = 20 
    a1.channels.c1.byteCapacity = 800000 
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = spark
    a1.sinks.k1.brokerList = m1:9092,m2:9092,m3:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
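
    Note that the spooling-directory source expects the directory to exist before the agent starts (Flume will not create it). Assuming the path from the config above:

    mkdir -p /home/hadoop/logs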
    

    Kafka

    1. Start Kafka

    ./bin/kafka-server-start.sh ./config/server.properties
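
    Kafka needs a running ZooKeeper ensemble first (the commands here assume it listens on m1:2181), and the broker should be started on each node in the brokerList (m1, m2, m3). If no standalone ZooKeeper is available, the one bundled with Kafka can be started:

    ./bin/zookeeper-server-start.sh ./config/zookeeper.properties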

    2. Create the spark topic

    bin/kafka-topics.sh --create --zookeeper m1:2181 --replication-factor 2 --partitions 2 --topic spark
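
    To confirm the topic was created with the expected partition and replica counts, it can be described through the same ZooKeeper address:

    bin/kafka-topics.sh --describe --zookeeper m1:2181 --topic spark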

    3. Start Flume

    flume-ng agent -c conf/ -f conf/flume_to_kafka.conf -n a1
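
    Once the agent is up, dropping a complete file into the spoolDir pushes its lines through the channel to the Kafka sink (Flume renames ingested files with a .COMPLETED suffix). For example, with a hypothetical test file:

    echo "hello spark hello kafka" > /home/hadoop/logs/test.log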

    4. Verify that messages can be consumed from the topic

    bin/kafka-console-consumer.sh --bootstrap-server m1:9092,m2:9092,m3:9092 --from-beginning --topic spark

    If the console consumer on your Kafka version does not accept --bootstrap-server (pre-0.9 releases), use --zookeeper m1:2181 instead.

    Code implementation

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object SparkStreamDemo {
      def main(args: Array[String]): Unit = {

        val conf = new SparkConf()
        conf.setAppName("spark_streaming")
        conf.setMaster("local[*]")

        val sc = new SparkContext(conf)
        // updateStateByKey needs a checkpoint directory to persist running state
        sc.setCheckpointDir("D:/checkpoints")
        sc.setLogLevel("ERROR")

        // 5-second micro-batches
        val ssc = new StreamingContext(sc, Seconds(5))

        // receiver-based 0.8 API: topic name -> number of receiver threads
        val topics = Map("spark" -> 2)
        // connect through ZooKeeper with consumer group "spark"; keep only the message value
        val lines = KafkaUtils.createStream(ssc, "m1:2181,m2:2181,m3:2181", "spark", topics).map(_._2)

        // split each line into words and pair each word with a count of 1
        val ds1 = lines.flatMap(_.split(" ")).map((_, 1))

        // running word count: add this batch's counts to the accumulated state per key
        val ds2 = ds1.updateStateByKey[Int]((x: Seq[Int], y: Option[Int]) => {
          Some(x.sum + y.getOrElse(0))
        })

        ds2.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
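
    The createStream call above comes from the receiver-based Kafka 0.8 integration, so the job needs the matching artifact on the classpath. A minimal build.sbt sketch, assuming Spark 1.6.x (on Spark 2.x the artifact is named spark-streaming-kafka-0-8 instead):

    // hypothetical build.sbt fragment; align versions with your cluster
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "1.6.3" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
    )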