Druid是一个用于大数据实时查询和分析的高容错、高性能开源分布式系统,旨在快速处理大规模的数据,并能够实现快速查询和分析。尤其是当发生代码部署、机器故障以及其他产品系统遇到宕机等情况时,Druid仍能够保持100%正常运行。创建Druid的最初意图主要是为了解决查询延迟问题,当时试图使用Hadoop来实现交互式查询分析,但是很难满足实时分析的需要。而Druid提供了以交互方式访问数据的能力,并权衡了查询的灵活性和性能而采取了特殊的存储格式。
Druid功能介于PowerDrill和Dremel之间,它几乎实现了Dremel的所有功能,并且从PowerDrill吸收一些有趣的数据格式。Druid允许以类似Dremel和PowerDrill的方式进行单表查询,同时还增加了一些新特性,如为局部嵌套数据结构提供列式存储格式、为快速过滤做索引、实时摄取和查询、高容错的分布式体系架构等。从官方得知,Druid的具有以下主要特征:
- 为分析而设计——Druid是为OLAP工作流的探索性分析而构建,它支持各种过滤、聚合和查询等类;
- 快速的交互式查询——Druid的低延迟数据摄取架构允许事件在它们创建后毫秒内可被查询到;
- 高可用性——Druid的数据在系统更新时依然可用,规模的扩大和缩小都不会造成数据丢失;
- 可扩展——Druid已实现每天能够处理数十亿事件和TB级数据。
Druid应用最多的是类似于广告分析创业公司Metamarkets中的应用场景,如广告分析、互联网广告系统监控以及网络监控等。当业务中出现以下情况时,Druid是一个很好的技术方案选择:
- 需要交互式聚合和快速探究大量数据时;
- 需要实时查询分析时;
- 具有大量数据时,如每天数亿事件的新增、每天数10T数据的增加;
- 对数据尤其是大数据进行实时分析时;
- 需要一个高可用、高容错、高性能数据库时。
Historical节点 :对非实时数据进行处理存储和查询
Realtime节:实时摄取数据、监听输入数据流
Coordinator节点:监控Historical节点
Broker节点:接收来自外部客户端的查询和将查询转发到Realtime和Historical节点
Indexer节点:负责索引服务
一个Druid集群有各种类型的节点(Node)组成,每个节点都可以很好的处理一些的事情,这些节点包括对非实时数据进行处理存储和查询的Historical节点、实时摄取数据、监听输入数据流的Realtime节、监控Historical节点的Coordinator节点、接收来自外部客户端的查询和将查询转发到Realtime和Historical节点的Broker节点、负责索引服务的Indexer节点。
查询操作中数据流和各个节点的关系如下图所示:
如下图是Druid集群的管理层架构,该图展示了相关节点和集群管理所依赖的其他组件(如负责服务发现的ZooKeeper集群)的关系:
一、Druid简介
二、Druid架构组成及相关依赖
三、Druid集群配置
四、Druid集群启动
五、Druid查询
六、后记
一、Druid简介
Druid是一个为大型冷数据集上实时探索查询而设计的开源数据分析和存储系统,提供极具成本效益并且永远在线的实时数据摄取和任意数据处理。
主要特性:
- 为分析而设计——Druid是为OLAP工作流的探索性分析而构建。它支持各种filter、aggregator和查询类型,并为添加新功能提供了一个框架。用户已经利用Druid的基础设施开发了高级K查询和直方图功能。
- 交互式查询——Druid的低延迟数据摄取架构允许事件在它们创建后毫秒内查询,因为Druid的查询延时通过只读取和扫描有必要的元素被优化。Aggregate和 filter没有坐等结果。
- 高可用性——Druid是用来支持需要一直在线的SaaS的实现。你的数据在系统更新时依然可用、可查询。规模的扩大和缩小不会造成数据丢失。
- 可伸缩——现有的Druid部署每天处理数十亿事件和TB级数据。Druid被设计成PB级别。
就系统而言,Druid功能位于PowerDrill和Dremel之间。它实现几乎所有Dremel提供的工具(Dremel处理任意嵌套数据结构,而Druid只允许一个基于数组的嵌套级别)并且从PowerDrill吸收一些有趣的数据格式和压缩方法。
Druid对于需要实时单一、海量数据流摄取产品非常适合。特别是如果你面向无停机操作时,如果你对查询查询的灵活性和原始数据访问要求,高于对速度和无停机操作,Druid可能不是正确的解决方案。在谈到查询速度时候,很有必要澄清“快速”的意思是:Druid是完全有可能在6TB的数据集上实现秒级查询。
二、Druid架构组成及其他依赖
2.1 Overlord Node (Indexing Service)
Overlord会形成一个加载批处理和实时数据到系统中的集群,同时会对存储在系统中的数据变更(也称为索引服务)做出响应。另外,还包含了Middle Manager和Peons,一个Peon负责执行单个task,而Middle Manager负责管理这些Peons。
2.2 Coordinator Node
监控Historical节点组,以确保数据可用、可复制,并且在一般的“最佳”配置。它们通过从MySQL读取数据段的元数据信息,来决定哪些数据段应该在集群中被加载,使用Zookeeper来确定哪个Historical节点存在,并且创建Zookeeper条目告诉Historical节点加载和删除新数据段。
2.3 Historical Node
是对“historical”数据(非实时)进行处理存储和查询的地方。Historical节点响应从Broker节点发来的查询,并将结果返回给broker节点。它们在Zookeeper的管理下提供服务,并使用Zookeeper监视信号加载或删除新数据段。
2.4 Broker Node
接收来自外部客户端的查询,并将这些查询转发到Realtime和Historical节点。当Broker节点收到结果,它们将合并这些结果并将它们返回给调用者。由于了解拓扑,Broker节点使用Zookeeper来确定哪些Realtime和Historical节点的存在。
2.5 Real-time Node
实时摄取数据,它们负责监听输入数据流并让其在内部的Druid系统立即获取,Realtime节点同样只响应broker节点的查询请求,返回查询结果到broker节点。旧数据会被从Realtime节点转存至Historical节点。
2.6 ZooKeeper
为集群服务发现和维持当前的数据拓扑而服务;
2.7 MySQL
用来维持系统服务所需的数据段的元数据;
2.8 Deep Storage
保存“冷数据”,可以使用HDFS。
三、Druid集群配置
3.1 环境信息
我这里有两台机器,node1有32G内存,上面部署了Histotical Node和Coordinator Node;node2有72G内存,上面部署了其他四个服务。
3.2 通用配置(Common Configuration)
##创建MySQL数据库
CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
grant all on druid.* to druid@’%’ identified by ‘druid1234′ WITH GRANT OPTION;
flush privileges;
##配置文件
cd $DRUID_HOME/config/_common
vi common.runtime.properties(所有节点)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
##使用Mysql存储元数据 druid.extensions.coordinates=[ "io.druid.extensions:druid-examples" , "io.druid.extensions:druid-kafka-eight" , "io.druid.extensions:mysql-metadata-storage" ] ##zookeeper druid.zk.service.host=zkNode1:2181,zkNode2:2181,zkNode3:2181 ##Mysql配置 druid.metadata.storage. type =mysql druid.metadata.storage.connector.connectURI=jdbc:mysql: //node1 :3306 /druid druid.metadata.storage.connector.user=druid druid.metadata.storage.connector.password=diurd1234 ##配置deep storage到HDFS druid.storage. type =hdfs druid.storage.storageDirectory=hdfs: //cdh5/tmp/druid/storage ##配置查询缓存,暂用本地,可配置memcached druid.cache. type = local druid.cache.sizeInBytes=10737418240 ##配置监控 druid.monitoring.monitors=[ "com.metamx.metrics.JvmMonitor" ] ##配置Indexing service的名字 druid.selectors.indexing.serviceName=druid /overlord ## druid.emitter=logging |
3.3 Overlord Node(Indexing Service)
在运行Overlord Node节点上:
cd $DRUID_HOME/config/overlord
vi runtime.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
druid.host=node2 druid.port=8090 druid.service=druid /overlord # Only required if you are autoscaling middle managers druid.indexer.autoscale.doAutoscale= true druid.indexer.autoscale.strategy=ec2 druid.indexer.autoscale.workerIdleTimeout=PT90m druid.indexer.autoscale.terminatePeriod=PT5M druid.indexer.autoscale.workerVersion=0 # Upload all task logs to deep storage druid.indexer.logs. type =hdfs druid.indexer.logs.directory=hdfs: //cdh5/tmp/druid/indexlog # Run in remote mode druid.indexer.runner. type =remote druid.indexer.runner.minWorkerVersion=0 # Store all task state in the metadata storage druid.indexer.storage. type =metadata |
3.4 MiddleManager Node
在运行MiddleManager Node节点上:
cd $DRUID_HOME/config/middleManager
vi runtime.properties
1
2
3
4
5
6
7
8
9
10
|
druid.host=node2 druid.port=8091 druid.service=druid /middlemanager druid.indexer.logs. type =hdfs druid.indexer.logs.directory=hdfs: //cdh5/tmp/druid/indexlog # Resources for peons druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps druid.indexer.task.baseTaskDir= /tmp/persistent/task/ |
3.5 Coordinator Node
在运行Coordinator Node节点上:
cd $DRUID_HOME/config/coordinator
vi runtime.properties
1
2
3
4
5
|
druid.host=node1 druid.port=8081 druid.service=coordinator druid.coordinator.startDelay=PT5M |
3.6 Historical Node
在运行Historical Node节点上:
cd $DRUID_HOME/config/historical
vi runtime.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
druid.host=node1 druid.port=8082 druid.service=druid /historical druid.historical.cache.useCache= true druid.historical.cache.populateCache= true druid.processing.buffer.sizeBytes=1073741824 druid.processing.numThreads=9 druid.server.http.numThreads=9 druid.server.maxSize=300000000000 druid.segmentCache.locations=[{ "path" : " /tmp/druid/indexCache" , "maxSize" : 300000000000}] druid.monitoring.monitors=[ "io.druid.server.metrics.HistoricalMetricsMonitor" , "com.metamx.metrics.JvmMonitor" ] |
3.7 Broker Node
在运行Broker Node节点上:
cd $DRUID_HOME/config/broker
vi runtime.properties
1
2
3
4
5
6
7
8
9
10
11
|
druid.host=node2 druid.port=8092 druid.service=druid /broker druid.broker.http.numConnections=20 druid.broker.http.readTimeout=PT5M druid.processing.buffer.sizeBytes=2147483647 druid.processing.numThreads=11 druid.server.http.numThreads=20 |
3.8 Real-time Node
在运行Real-time Node节点上:
cd $DRUID_HOME/config/realtime
vi runtime.properties
1
2
3
4
5
6
7
8
9
10
11
|
druid.host=node2 druid.port=8093 druid.service=druid /realtime druid.processing.buffer.sizeBytes=1073741824 druid.processing.numThreads=5 # Override emitter to print logs about events ingested, rejected, etc druid.emitter=logging druid.monitoring.monitors=[ "io.druid.segment.realtime.RealtimeMetricsMonitor" , "com.metamx.metrics.JvmMonitor" ] |
四、Druid集群启动
首次启动时候,可以遵循下面的启动顺序。
4.1 Broker Node
cd $DRUID_HOME/
cp run_druid_server.sh run_broker.sh
vi run_broker.sh
替换以下内容:
1
2
3
4
5
6
7
|
SERVER_TYPE=broker # start process JAVA_ARGS= "${JAVA_ARGS} -Xmx10g -Xms5g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=24g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS= "${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS= "${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS= "${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17071 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Ddruid.extensions.localRepository=${MAVEN_DIR}" |
执行./run_broker.sh启动Broker Node:
4.2 Historical Node
cd $DRUID_HOME/
cp run_druid_server.sh run_historical.sh
vi run_historical.sh
替换以下内容:
1
2
3
4
5
6
7
|
SERVER_TYPE=historical # start process JAVA_ARGS= "${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=16g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS= "${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS= "${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS= "${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}" |
执行命令./run_historical.sh启动Historical Node:
4.3 Coordinator Node
cd $DRUID_HOME/
cp run_druid_server.sh run_coordinator.sh
vi run_coordinator.sh
替换以下内容:
1
2
3
4
5
6
7
|
SERVER_TYPE=coordinator # start process JAVA_ARGS= "${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS= "${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS= "${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS= "${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}" |
执行命令./run_coordinator.sh启动Coordinator Node.
4.4 Middle Manager
cd $DRUID_HOME/
cp run_druid_server.sh run_middleManager.sh
vi run_middleManager.sh
替换以下内容:
1
2
3
4
5
|
SERVER_TYPE=middleManager # start process JAVA_ARGS= "${JAVA_ARGS} -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir= /tmp/druid -Ddruid.extensions.localR epository=${MAVEN_DIR}" |
执行命令./run_middleManager.sh启动MiddleManager Node。
4.5 Overlord Node
cd $DRUID_HOME/
cp run_druid_server.sh run_overlord.sh
vi run_overlord.sh
替换以下内容:
1
2
3
4
5
6
|
SERVER_TYPE=overlord # start process JAVA_ARGS= "${JAVA_ARGS} -Xmx4g -Xms4g -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS= "${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS= "${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS= "${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}" |
执行命令./run_overlord.sh启动Overlord Node:
4.6 Real-time Node
cd $DRUID_HOME/
cp run_druid_server.sh run_realtime.sh
vi run_realtime.sh
替换以下内容:
1
2
3
4
5
6
7
8
9
10
11
|
SERVER_TYPE=realtime # start process JAVA_ARGS="${JAVA_ARGS} -Xmx13g -Xms13g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=9g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails - XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError" JAVA_ARGS= "${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS= "${JAVA_ARGS} -Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec" JAVA_ARGS= "${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS="${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17072 -Dcom.sun.management.jmxremote.authenticate= false -Dcom.sun.management.jmxremot e.ssl= false " JAVA_ARGS= "${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}" |
##特别需要注意参数:
-Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec
启动RealTime Node需要指定一个realtime数据源的配置文件,本文中使用example提供的wikipedia_realtime.spec,启动后,该数据源从irc.wikimedia.org获取实时数据。
关于RealTime Node的配置,后续文章将会详细介绍。
执行命令./run_realtime.sh启动RealTime Node。
五、Druid查询
第四部分中启动RealTime Node时候使用了例子中自带的配置文件wikipedia_realtime.spec,启动后,该RealTime Node会从irc.wikimedia.org获取实时数据,本章将以该数据源为例,学习几种最常见的查询。
5.1 select查询
首先编辑查询配置文件select_query.json
1
2
3
4
5
6
7
8
9
10
11
|
{ "queryType" : "select" , "dataSource" : "wikipedia" , "dimensions" :[], "metrics" :[], "granularity" : "all" , "intervals" : [ "2015-11-01/2015-11-20" ], "pagingSpec" :{ "pagingIdentifiers" : {}, "threshold" :10} } |
该配置文件的含义是从数据源”wikipedia”进行select查询所有列,时间区间为2015-11-01/2015-11-20,每10条记录一个分页。
执行命令查询:
curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @select_query.json
瞬间返回结果:
5.2 基于时间序列的查询Timeseries query
编辑查询配置文件timeseries.json
1
2
3
4
5
6
7
8
9
10
|
{ "queryType" : "timeseries" , "dataSource" : "wikipedia" , "intervals" : [ "2010-01-01/2020-01-01" ], "granularity" : "minute" , "aggregations" : [ { "type" : "longSum" , "fieldName" : "count" , "name" : "edit_count" }, { "type" : "doubleSum" , "fieldName" : "added" , "name" : "chars_added" } ] } |
该配置文件的含义是:从数据源” wikipedia”中进行时间序列查询,区间为2010-01-01/2020-01-01,按分钟汇总结果,汇总字段为count和added;
执行查询命令:
curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @timeseries.json
同样瞬间返回结果:
5.3 TopN查询
编辑查询文件topn.json
1
2
3
4
5
6
7
8
9
10
11
12
13
|
{ "queryType" : "topN" , "dataSource" : "wikipedia" , "granularity" : "all" , "dimension" : "page" , "metric" : "edit_count" , "threshold" : 10, "aggregations" : [ { "type" : "longSum" , "fieldName" : "count" , "name" : "edit_count" } ], "filter" : { "type" : "selector" , "dimension" : "country" , "value" : "United States" }, "intervals" : [ "2012-10-01T00:00/2020-01-01T00" ] } |
该文件含义是:从数据源” wikipedia”进行TopN查询,其中N=10,维度为page,指标为edit_count,也就是,在page维度上将edit_count汇总后取Top 10.
执行查询命令:
curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @topn.json
结果为:
六、后记
Druid目前已经有很多公司用于实时计算和实时OLAP,而且效果很好。虽然它的配置和查询都比较复杂和繁琐,但如果是真正基于海量数据的实时OLAP,它的威力还是很强大的。我将持续学习和分享Druid的相关技术,验证它在海量数据实时OLAP上的效果,敬请关注我的博客。
参考文章:
http://druid.io
http://www.csdn.net/article/2014-10-30/2822381/2
参考 http://www.infoq.com/cn/news/2015/04/druid-data/
http://druid.io/
http://www.open-open.com/lib/view/open1447852962978.html