  • kafka + spark Streaming + Tranquility Server: sending data to druid

      I spent a long time trying the Tranquility embedded-code approach described on the Druid website for sending data to Druid in real time, and it failed for all sorts of reasons I still have not tracked down: it runs fine in IDEA, but refuses to work once deployed. If anyone has gotten it working, please post a link so I can learn from it. I then tried sending data from Kafka to Druid in real time, which also hit some errors (now resolved; I will write that up later). So in the end there was nothing for it but to use Tranquility Server _ _!

    For Tranquility Server configuration and startup, see: https://github.com/druid-io/tranquility/blob/master/docs/server.md

    (1) After starting your customized server, you can generate test data with the generate-example-metrics script in Druid's bin directory (the customized server.json is shown below).

    server.json configuration

    {
      "dataSources" : {
        "reynold_metrics" : {
          "spec" : {
            "dataSchema" : {
              "dataSource" : "reynold_metrics",
              "parser" : {
                "type" : "string",
                "parseSpec" : {
                  "timestampSpec" : {
                    "column" : "timestamp",
                    "format" : "auto"
                  },
                  "dimensionsSpec" : {
                    "dimensions" : [],
                    "dimensionExclusions" : [
                      "timestamp",
                      "value"
                    ]
                  },
                  "format" : "json"
                }
              },
              "granularitySpec" : {
                "type" : "uniform",
                "segmentGranularity" : "hour",
                "queryGranularity" : "none"
              },
              "metricsSpec" : [
                {
                  "type" : "count",
                  "name" : "count"
                },
                {
                  "name" : "value_sum",
                  "type" : "doubleSum",
                  "fieldName" : "value"
                },
                {
                  "fieldName" : "value",
                  "name" : "value_min",
                  "type" : "doubleMin"
                },
                {
                  "type" : "doubleMax",
                  "name" : "value_max",
                  "fieldName" : "value"
                }
              ]
            },
         "tuningConfig" : { "type" : "realtime", "maxRowsInMemory" : "100000", "intermediatePersistPeriod" : "PT10M", "windowPeriod" : "PT10M" } }, "properties" : { "task.partitions" : "1", "task.replicants" : "1" } } }, "properties" : { "zookeeper.connect" : "reynold-master:2181,reynold-slave02:2181,reynold-slave03:2181", "druid.discovery.curator.path" : "/druid/discovery", "druid.selectors.indexing.serviceName" : "druid/overlord", "http.port" : "8200", "http.threads" : "16" } }

    (2) Create the Kafka topic and send data into it

    Delete the topic: kafka-topics  --delete --topic reynold --zookeeper localhost:2181
    Create the topic: kafka-topics  --create --topic reynold --zookeeper localhost:2181 --partitions 10 --replication-factor 1
    Consume data: kafka-console-consumer --topic reynold --zookeeper localhost:2181 --from-beginning
    Produce data: kafka-console-producer --broker-list reynold-master:9092 --topic reynold

    {"count": 1, "value_min": 74.0, "timestamp": "2017-03-09T02:34:24.000Z", "value_max": 74.0, "metricType": "request/latency", "server": "www5.example.com", "http_method": "GET", "value_sum": 74.0, "http_code": "200", "unit": "milliseconds", "page": "/"}
    {"count": 1, "value_min": 75.0, "timestamp": "2017-03-09T02:34:24.000Z", "value_max": 75.0, "metricType": "request/latency", "server": "www5.example.com", "http_method": "GET", "value_sum": 75.0, "http_code": "200", "unit": "milliseconds", "page": "/list"}
    {"count": 1, "value_min": 143.0, "timestamp": "2017-03-09T02:38:06.000Z", "value_max": 143.0, "metricType": "request/latency", "server": "www2.example.com", "http_method": "GET", "value_sum": 143.0, "http_code": "200", "unit": "milliseconds", "page": "/"}

     (3) Use Spark Streaming to consume the data from Kafka and send it to Druid over HTTP

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    object SparkDruid {
    
      val kafkaParam = Map[String, String](
        "metadata.broker.list" -> "reynold-master:9092,reynold-slave01:9092,reynold-slave02:9092,reynold-slave03:9092",
        "auto.offset.reset" -> "smallest"
      )
    
      def main(args: Array[String]): Unit = {
        val sparkContext = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("SparkDruid"))
        val ssc = new StreamingContext(sparkContext, Seconds(3))
        val topic: String = "reynold" // name of the topic to consume
        val topics: Set[String] = Set(topic) // set of topic names used when creating the stream
    
        var kafkaStream: InputDStream[(String, String)] = null
    
        kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)
    
        kafkaStream.map(msg => msg._2).foreachRDD { rdd =>
          rdd.foreach(strJson => Https.post("http://reynold-master:8200/v1/post/reynold_metrics", strJson))
        }
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
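
    Posting one HTTP request per record works, but it is chatty. Below is a hedged variant of the foreachRDD above that opens one request per partition instead; it assumes the running Tranquility Server accepts an array of events per POST (its server docs describe the accepted formats).

    // Drop-in replacement for the foreachRDD block above: send each partition's
    // events as a single JSON array instead of one POST per record.
    kafkaStream.map(_._2).foreachRDD { rdd =>
      rdd.foreachPartition { events =>
        val batch = events.mkString("[", ",", "]")
        if (batch != "[]") {
          Https.post("http://reynold-master:8200/v1/post/reynold_metrics", batch)
        }
      }
    }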

    The Https class is as follows:

    import java.io.InputStreamReader
    
    import com.google.common.io.CharStreams
    import org.apache.http.client.methods.{HttpGet, HttpPost}
    import org.apache.http.entity.StringEntity
    import org.apache.http.impl.client.HttpClients
    
    /**
      * Via HTTP requests, this object can:
      * 1. send data into druid
      * 2. provide some methods for querying druid
      * 3. additionally, query hbase data
      */
    object Https {
      private val httpClient = HttpClients.createDefault()
    
      def get(url: String): String = {
        val _get = new HttpGet(url)
        val resp = httpClient.execute(_get)
        try {
          if (resp.getStatusLine.getStatusCode != 200) {
            throw new RuntimeException("error: " + resp.getStatusLine)
          }
          CharStreams.toString(new InputStreamReader(resp.getEntity.getContent))
        } finally {
          resp.close()
        }
      }
    
      // can both send data and request data (returning the result)
      def post(url: String, content: String): String = {
        val _post = new HttpPost(url)
        _post.setHeader("Content-Type", "application/json")
        _post.setEntity(new StringEntity(content,"utf-8"))
        val resp = httpClient.execute(_post)
        try {
          CharStreams.toString(new InputStreamReader(resp.getEntity.getContent))
        } finally {
          resp.close()
        }
    
      }
    
      object MapTypeRef extends com.fasterxml.jackson.core.`type`.TypeReference[Map[String, Any]]
    
      object ListMapTypeRef extends com.fasterxml.jackson.core.`type`.TypeReference[List[Map[String, Any]]]
    
      def queryHBase(sql: String): List[Map[String, Any]] = {
        // serialize the request as JSON
        val request = new String(Mapper.mapper.writeValueAsBytes(Map("action" -> "query", "sql" -> sql)))
        // send the JSON-formatted request
        val resp = post("http://reynold-master:8209/api", request)
        val rs = Mapper.mapper.readValue[Map[String, Any]](resp, MapTypeRef)
        // val rs = Mapper.mapper.readValue[Map[String, Any]](resp, classOf[Map[String, Any]]) also works
        rs("result").asInstanceOf[List[Map[String, Any]]]
      }

      def queryDruid(json: String): String = {
        post("http://reynold-master:18082/druid/v2", json)
      }

      private def getDruid(path: String): String = {
        get("http://reynold-master:18082/druid/v2" + path)
      }

      def druidDataSources(): List[String] = {
        // /datasources returns a plain JSON array of datasource names
        Mapper.mapper.readValue(getDruid("/datasources"), classOf[List[String]])
      }

      def druidDimension(datasource: String): String = {
        getDruid(s"/datasources/$datasource/dimensions")
      }

      def druidMetrics(datasource: String): String = {
        getDruid(s"/datasources/$datasource/metrics")
      }

      def main(args: Array[String]): Unit = {
        println(queryHBase("select * from user_tags limit 2"))
      }
    }
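
    A small usage sketch of the helpers above (not in the original post): push one event to the Tranquility endpoint and run a simple native Druid timeseries query against the broker. The HttpsExample name and the query interval are just placeholders.

    object HttpsExample {
      def main(args: Array[String]): Unit = {
        // Send a single event to Tranquility Server (same endpoint as SparkDruid uses).
        val event =
          s"""{"timestamp": "${java.time.Instant.now()}", "metricType": "request/latency", "server": "www5.example.com", "value": 74.0}"""
        println(Https.post("http://reynold-master:8200/v1/post/reynold_metrics", event))

        // Run a native Druid timeseries query through Https.queryDruid.
        val query =
          """{
            |  "queryType": "timeseries",
            |  "dataSource": "reynold_metrics",
            |  "granularity": "hour",
            |  "aggregations": [{"type": "doubleSum", "name": "value_sum", "fieldName": "value_sum"}],
            |  "intervals": ["2017-03-09T00:00:00Z/2017-03-10T00:00:00Z"]
            |}""".stripMargin
        println(Https.queryDruid(query))

        // List the datasources the broker knows about.
        println(Https.druidDataSources())
      }
    }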

    Mapper.scala

    package com.donews.util
    
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    
    
    /**
      * Created by reynold
      */
    
    object Mapper {
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)
    }
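
    With DefaultScalaModule registered, plain Scala Maps serialize straight to the JSON shape used throughout this post, so events can be built as Maps instead of hand-written strings. A tiny sketch (the MapperExample name and the field values are illustrative):

    object MapperExample {
      def main(args: Array[String]): Unit = {
        // Build a JSON event from a Scala Map instead of hand-writing the string.
        val json = Mapper.mapper.writeValueAsString(Map(
          "timestamp"  -> "2017-03-09T02:34:24.000Z",
          "metricType" -> "request/latency",
          "server"     -> "www5.example.com",
          "value"      -> 74.0
        ))
        println(json)
        // {"timestamp":"2017-03-09T02:34:24.000Z","metricType":"request/latency","server":"www5.example.com","value":74.0}
      }
    }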

    Note: add the following dependencies to the pom file:

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.3.3</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>

     
