Learning Druid (Part 5): Druid Data Ingestion Task Types

    Author: Syn良子. Source: https://www.cnblogs.com/cssdongl/p/9885534.html. Please credit the source when reposting.

    Druid Data Ingestion Task Types


    Druid supports many types of data ingestion tasks. A task is submitted to the Overlord node with an HTTP POST (for example via curl) and is then assigned to a middle manager to run.
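
    As a concrete sketch: assuming an Overlord listening on the default localhost:8090 and a task spec saved as task.json (both are assumptions; adjust them to your deployment), a task can be submitted like this:

    # Submit a task spec to the Overlord's task endpoint.
    # localhost:8090 is the default Overlord port; change it for your cluster.
    curl -X POST -H 'Content-Type: application/json' \
      -d @task.json \
      http://localhost:8090/druid/indexer/v1/task

    On success the Overlord returns the new task id as JSON, in the form {"task": "<task_id>"}.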

    Segment creation task types


    Local batch indexing task

    An example spec for a local batch ingestion task:

    {
      "type" : "index",
      "spec" : {
        "dataSchema" : {
          "dataSource" : "wikipedia",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "format" : "json",
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
                "dimensionExclusions" : [],
                "spatialDimensions" : []
              }
            }
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "count"
            },
            {
              "type" : "doubleSum",
              "name" : "added",
              "fieldName" : "added"
            },
            {
              "type" : "doubleSum",
              "name" : "deleted",
              "fieldName" : "deleted"
            },
            {
              "type" : "doubleSum",
              "name" : "delta",
              "fieldName" : "delta"
            }
          ],
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "DAY",
            "queryGranularity" : "NONE",
            "intervals" : [ "2013-08-31/2013-09-01" ]
          }
        },
        "ioConfig" : {
          "type" : "index",
          "firehose" : {
            "type" : "local",
            "baseDir" : "examples/indexing/",
            "filter" : "wikipedia_data.json"
          }
        },
        "tuningConfig" : {
          "type" : "index",
          "targetPartitionSize" : 5000000,
          "maxRowsInMemory" : 75000
        }
      }
    }
    

    The above is the syntax of a local batch indexing task. Note that type must be "index". This task ingests the file examples/indexing/wikipedia_data.json on the local filesystem into Druid segments. It can be submitted to the Overlord with a curl POST and requires no additional Hadoop setup.
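
    Once a task is submitted you will usually want to track it. A small sketch, assuming the returned task id is stored in the shell variable TASK_ID and the same default Overlord address as above:

    # Check the status of a submitted task (e.g. RUNNING, SUCCESS, FAILED).
    curl http://localhost:8090/druid/indexer/v1/task/$TASK_ID/status

    # Fetch the task log, which is the first place to look when ingestion fails.
    curl http://localhost:8090/druid/indexer/v1/task/$TASK_ID/log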


    Hadoop indexing task

    {
      "type" : "index_hadoop",
      "spec" : {
        "dataSchema" : {
          "dataSource" : "wikipedia",
          "parser" : {
            "type" : "hadoopyString",
            "parseSpec" : {
              "format" : "json",
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
                "dimensionExclusions" : [],
                "spatialDimensions" : []
              }
            }
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "count"
            },
            {
              "type" : "doubleSum",
              "name" : "added",
              "fieldName" : "added"
            },
            {
              "type" : "doubleSum",
              "name" : "deleted",
              "fieldName" : "deleted"
            },
            {
              "type" : "doubleSum",
              "name" : "delta",
              "fieldName" : "delta"
            }
          ],
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "DAY",
            "queryGranularity" : "NONE",
            "intervals" : [ "2013-08-31/2013-09-01" ]
          }
        },
        "ioConfig" : {
          "type" : "hadoop",
          "inputSpec" : {
            "type" : "static",
            "paths" : "/MyDirectory/example/wikipedia_data.json"
          }
        },
        "tuningConfig" : {
          "type": "hadoop"
        }
      },
      "hadoopDependencyCoordinates": <my_hadoop_version>
    }
    

    The above is the syntax of a Hadoop indexing task. Note that type must be "index_hadoop". This task ingests the file /MyDirectory/example/wikipedia_data.json into Druid segments; note that this path is on HDFS. The task can be submitted to the Overlord with a curl POST, but it requires a properly configured Hadoop cluster, because the ingestion is ultimately executed as a MapReduce job.
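
    The <my_hadoop_version> placeholder stands for a JSON array of Maven coordinates for the Hadoop client your cluster runs. For illustration only (the version shown here is an assumption and must match your actual Hadoop installation):

    "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.7.3"]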


    Kafka indexing task

    {
      "type": "kafka",
      "dataSchema": {
        "dataSource": "metrics-kafka",
        "parser": {
          "type": "string",
          "parseSpec": {
            "format": "json",
            "timestampSpec": {
              "column": "timestamp",
              "format": "auto"
            },
            "dimensionsSpec": {
              "dimensions": [],
              "dimensionExclusions": [
                "timestamp",
                "value"
              ]
            }
          }
        },
        "metricsSpec": [
          {
            "name": "count",
            "type": "count"
          },
          {
            "name": "value_sum",
            "fieldName": "value",
            "type": "doubleSum"
          },
          {
            "name": "value_min",
            "fieldName": "value",
            "type": "doubleMin"
          },
          {
            "name": "value_max",
            "fieldName": "value",
            "type": "doubleMax"
          }
        ],
        "granularitySpec": {
          "type": "uniform",
          "segmentGranularity": "HOUR",
          "queryGranularity": "NONE"
        }
      },
      "tuningConfig": {
        "type": "kafka",
        "maxRowsPerSegment": 5000000
      },
      "ioConfig": {
        "topic": "metrics",
        "consumerProperties": {
          "bootstrap.servers": "localhost:9092"
        },
        "taskCount": 1,
        "replicas": 1,
        "taskDuration": "PT1H"
      }
    }
    

    The above is the syntax of a Kafka indexing spec. Note that type must be "kafka". This task consumes data from Kafka through the brokers at localhost:9092 and ingests it into Druid segments. Note that the Kafka indexing service was still experimental at the time of writing and requires Kafka 0.10.
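
    Unlike the batch tasks above, this spec defines a long-running supervisor rather than a one-shot task, so it is submitted to the Overlord's supervisor endpoint instead. A sketch, again assuming the default localhost:8090 and the spec saved as kafka-supervisor.json:

    # Submit the Kafka supervisor spec; the Overlord then creates and manages
    # the underlying Kafka indexing tasks (taskCount, replicas, rollover) itself.
    curl -X POST -H 'Content-Type: application/json' \
      -d @kafka-supervisor.json \
      http://localhost:8090/druid/indexer/v1/supervisor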


    Streaming push task types

    This task type uses Tranquility to automatically create realtime tasks and submit them to the Overlord to run. What is Tranquility? Its GitHub page is here:

    https://github.com/druid-io/tranquility

    With Tranquility we can consume real-time data and send an event stream to Druid, with partitioning, replication, and service discovery handled seamlessly. For example, you can integrate Tranquility with Storm, Spark Streaming, or Flink to consume a Kafka stream in real time, load it into Druid segments, and query the data as it arrives. This approach requires writing a fair amount of code, but it is relatively mature and offers a high degree of flexibility. I will cover it in detail in a separate post.
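
    If you would rather not embed Tranquility in a Storm/Spark/Flink job, Tranquility also ships a standalone HTTP server that accepts events over plain HTTP. A minimal sketch, assuming a Tranquility Server running on its default port 8200 with a datasource named metrics defined in its server.json (both the port and the datasource name are assumptions taken from your Tranquility configuration):

    # Push one JSON event; Tranquility wraps it into a realtime task on the Overlord.
    # Port 8200 and the "metrics" datasource come from the server.json config.
    curl -X POST -H 'Content-Type: application/json' \
      -d '{"timestamp": "2013-08-31T12:41:27Z", "value": 42}' \
      http://localhost:8200/v1/post/metrics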


    Compaction task type

    {
      "type": "compact",
      "id": <task_id>,
      "dataSource": <task_datasource>,
      "interval": <interval to specify segments to be merged>,
      "dimensions": <custom dimensionsSpec>,
      "tuningConfig": <index task tuningConfig>,
      "context": <task context>
    }
    

    Note that the task type must be "compact". This task type compacts the segments within the specified interval into a new segment, and it lets you specify the partition count and the dimension combination at the same time.
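
    A filled-in sketch (the datasource and interval are illustrative; id, dimensions, tuningConfig, and context are optional and may be omitted). Compaction tasks are submitted to the same task endpoint as the batch tasks above:

    cat > compact-wikipedia.json <<'EOF'
    {
      "type": "compact",
      "dataSource": "wikipedia",
      "interval": "2013-08-31/2013-09-01"
    }
    EOF

    curl -X POST -H 'Content-Type: application/json' \
      -d @compact-wikipedia.json \
      http://localhost:8090/druid/indexer/v1/task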

    Reference: the overview of Druid task types in the official documentation.
