  • Spark Streaming with logstash + Kafka: wordCount

    1. Install logstash; extracting the archive is all that is needed

    Test that logstash runs correctly

    bin/logstash -e 'input { stdin { } } output { stdout {codec => rubydebug } }'

    Output only the message itself

    bin/logstash -e 'input { stdin { } } output { stdout {codec => plain { format => "%{message}" } } }'

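    2. Write the logstash configuration file
    2.1 Create a conf directory under the logstash directory
    2.2 In the conf directory, create the file logstash.conf with the following content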

    input {
      file {
        type => "logs"
        path => "/home/hadoop/logs/*.log"
        discover_interval => 10
        start_position => "beginning"
      }
    }

    output {
      kafka {
        codec => plain {
          format => "%{message}"
        }
        topic_id => "spark"
      }
    }

    logstash input: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
    logstash output: https://www.elastic.co/guide/en/logstash/current/output-plugins.html

    3. Start logstash to collect data

    bin/logstash -f conf/logstash.conf
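
    Once logstash is running, the file input only has something to ship when lines arrive in a file matching /home/hadoop/logs/*.log. The following is a minimal sketch of a test-data writer. The object name LogLineWriter, the default file name test.log and the sample lines are illustrative assumptions, not part of the original setup.

    ```scala
    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Path, Paths, StandardOpenOption}

    object LogLineWriter {
      // Appends each line, newline-terminated, creating the file if it does not exist.
      def appendLines(file: Path, lines: Seq[String]): Unit = {
        val payload = lines.map(_ + "\n").mkString.getBytes(StandardCharsets.UTF_8)
        Files.write(file, payload, StandardOpenOption.CREATE, StandardOpenOption.APPEND)
      }

      def main(args: Array[String]): Unit = {
        // Assumed default file name; any *.log file under /home/hadoop/logs
        // matches the path pattern in logstash.conf.
        val target = Paths.get(args.headOption.getOrElse("/home/hadoop/logs/test.log"))
        appendLines(target, Seq("hello spark", "hello kafka"))
      }
    }
    ```

    Each appended line should then reach the "spark" topic as one Kafka message, since the output codec formats every event as %{message}.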

    4. Code

    package bigdata.spark

    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Created by Administrator on 2017/4/28.
      */
    object SparkStreamDemo {
      def main(args: Array[String]): Unit = {

        val conf = new SparkConf()
        conf.setAppName("spark_streaming")
        conf.setMaster("local[*]")

        val sc = new SparkContext(conf)
        // updateStateByKey requires a checkpoint directory
        sc.setCheckpointDir("D:/checkpoints")
        sc.setLogLevel("ERROR")

        // 5-second batch interval
        val ssc = new StreamingContext(sc, Seconds(5))

        // topic name -> number of consumer threads
        val topics = Map("spark" -> 2)
        // Receiver-based stream: ZooKeeper quorum, consumer group id, topics.
        // Each record is a (key, message) pair; keep only the message.
        val lines = KafkaUtils.createStream(ssc, "m1:2181,m2:2181,m3:2181", "spark", topics).map(_._2)

        // Split each line into words and pair every word with a count of 1
        val ds1 = lines.flatMap(_.split(" ")).map((_, 1))

        // For each word, add this batch's counts (x) to the running total (y)
        val ds2 = ds1.updateStateByKey[Int]((x: Seq[Int], y: Option[Int]) => {
          Some(x.sum + y.getOrElse(0))
        })

        ds2.print()

        ssc.start()
        ssc.awaitTermination()

      }
    }
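
    The function passed to updateStateByKey is what turns per-batch counts into a running word count. The sketch below replays that logic on plain Scala collections, without Spark or Kafka, to show how the state evolves across two batches. The object UpdateStateSketch, the helper step and the sample batches are hypothetical and exist only for illustration; step approximates the per-key bookkeeping that Spark performs and is not Spark's implementation.

    ```scala
    object UpdateStateSketch {
      // Same shape as the update function in the listing above:
      // newValues holds this batch's counts for one key, running is the previous total.
      def updateCount(newValues: Seq[Int], running: Option[Int]): Option[Int] =
        Some(newValues.sum + running.getOrElse(0))

      // Hypothetical helper: applies updateCount to every known key for one batch.
      // Keys that do not occur in the batch are updated with an empty Seq.
      def step(state: Map[String, Int], batch: Seq[String]): Map[String, Int] = {
        val counts: Map[String, Seq[Int]] =
          batch.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).map { case (k, v) => k -> v.map(_._2) }
        val keys = state.keySet ++ counts.keySet
        keys.flatMap { k =>
          updateCount(counts.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
        }.toMap
      }

      def main(args: Array[String]): Unit = {
        val afterBatch1 = step(Map.empty, Seq("hello spark", "hello kafka"))
        val afterBatch2 = step(afterBatch1, Seq("hello logstash"))
        println(afterBatch2)
      }
    }
    ```

    After the second batch the count for "hello" is 3, while "spark" and "kafka" keep their count of 1 even though they did not appear again, which is why the stream keeps printing words from earlier batches.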
    

      

  • Original article: https://www.cnblogs.com/heml/p/6796131.html