2019独角兽企业重金招聘Python工程师标准>>> hot3.png

druid 可以运行在单机环境下,也可以运行在集群环境下。简单起见,我们先从单机环境着手学习。

环境要求

  • java7 或者更高版本
  • linux, macOS或者其他unix系统(不支持windows系统)
  • 8G内存
  • 2核CPU

开始

下载并安装druid

curl -O http://static.druid.io/artifacts/releases/druid-0.9.1.1-bin.tar.gz
tar -xzf druid-0.9.1.1-bin.tar.gz
cd druid-0.9.1.1

文件夹中有如下几个目录:

  • LICENSE      许可证
  • bin/            可执行脚本
  • conf/*      在集群环境下的配置文件
  • conf-quickstart/*        quickstart的配置文件
  • extensions/*       druid所有的扩展文件
  • hadoop-dependencies/* druid的hadoop扩展文件
  • lib/*       druid 依赖的核心软件包
  • quickstart/*     quickstart的数据文件

ZK安装

druid的分布式协同需要依赖zookeeper,所以我们需要安装zk

curl http://www.gtlib.gatech.edu/pub/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz -o zookeeper-3.4.6.tar.gz
tar -xzf zookeeper-3.4.6.tar.gz
cd zookeeper-3.4.6
cp conf/zoo_sample.cfg conf/zoo.cfg
./bin/zkServer.sh start

启动druid服务

启动zk后,我们就可以启动druid的服务了。 首先进入到druid0.9.1.1的根目录,执行

bin/init

druid会自动创建一个var目录, 内含俩个目录,一个是druid, 用于存放本地环境下hadoop的临时文件,索引日志,segments文件及缓存 和 任务的临时文件。 另一个是tmp用于存放其他临时文件。

接下来就可以在控制台启动druid服务了。 在单机情况下,我们可以在一台机器上启动所有的druid服务进程,分终端进行。 在分布式生产集群的环境下, druid的服务进程同样也可以在一起启动。

ava `cat conf-quickstart/druid/historical/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/historical:lib/*" io.druid.cli.Main server historical
java `cat conf-quickstart/druid/broker/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/broker:lib/*" io.druid.cli.Main server broker
java `cat conf-quickstart/druid/coordinator/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/coordinator:lib/*" io.druid.cli.Main server coordinator
java `cat conf-quickstart/druid/overlord/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/overlord:lib/*" io.druid.cli.Main server overlord
java `cat conf-quickstart/druid/middleManager/jvm.config | xargs` -cp "conf-quickstart/druid/_common:conf-quickstart/druid/middleManager:lib/*" io.druid.cli.Main server middleManager

druid服务进程启动后,可以在控制台看到相应的日志信息。

我前一篇文章中提到过druid有几种节点, 上面的启动命令,对应的就是druid的各种节点

  • historical 为Historical Nodes节点进程。主要用于查询时从deepstroage 加载segments。
  • broker 为Broker Nodes 节点进程。 主要为接收客户端任务,任务分发,负载,以及结果合并等。
  • coordinator 为 Coordinator Nodes 节点进程。   主要负责segments的管理和分发。
  • overlord 为 Overload Nodes 节点进程。 middleManager 为 MiddleManager Nodes 节点进程。   overload 和 middleManager是创建索引的主要服务进程, 具体会在接下来的章节中详细介绍

如果想关闭服务,直接在控制台ctrl + c 就可以了。 如果你彻底清理掉之前的内容,重新开始,需要在关闭服务后,删除目录下的var 文件, 重新执行init脚本。

批量加载数据

服务启动之后,我们就可以将数据load到druid中进行查询了。在druid0.9.1.1的安装包中,自带了2015-09-12的wikiticker数据。我们可以用此数据来作为我们druid的学习实例。

首先我们看一下wikipedia的数据, 除了时间之外,包含的维度(dimensions)有:

  • channel
  • cityName
  • comment
  • countryIsoCode
  • countryName
  • isAnonymous
  • isMinor
  • isNew
  • isRobot
  • isUnpatrolled
  • metroCode
  • namespace
  • page
  • regionIsoCode
  • regionName
  • user

度量(measures) 我们可以设置如下:

  • count
  • added
  • deleted
  • delta
  • user_unique

确定了度量,维度之后,接下来我们就可以导入数据了。首先,我们需要向druid提交一个注入数据的任务,并将目录指向我们需要加载的数据文件wikiticker-2015-09-12-sampled.json

Druid是通过post请求的方式提交任务的, 上面我们也讲过,overload node 用于数据的加载,所以需要在overload节点上执行post请求, 目前单机环境,无需考虑这个。

在druid根目录下执行

curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/wikiticker-index.json localhost:8090/druid/indexer/v1/task

其中wikiticker-index.json 文件指明了数据文件的位置,类型,数据的schema(如度量,维度,时间,在druid中的数据源名称等)等信息, 之后我也会详细的介绍,大家也可以从官网上查

当控制台打印如下信息后,说明任务提交成功

{"task":"index_hadoop_wikipedia_2013-10-09T21:30:32.802Z"}

可以在overload控制台 http://localhost:8090/console.html来查看任务的运行情况, 当状态为“SUCCESS”时, 说明任务执行成功。

当数据注入成功后,historical node会加载这些已经注入到集群的数据,方便查询,这大概需要花费1-2分钟的时间。 你可以在coordinator 控制台http://localhost:8081/#/来查看数据的加载进度

当名为wikiticker的datasource 有个蓝色的小圈,并显示fully available时,说明数据已经可以了。可以执行查询操作了。

加载流数据

为了实现流数据的加载,我们可以通过一个简单http api来向druid推送数据,而tranquility就是一个不错的数据生产组件

下载并安装tranquility

curl -O http://static.druid.io/tranquility/releases/tranquility-distribution-0.8.0.tgz
tar -xzf tranquility-distribution-0.8.0.tgz
cd tranquility-distribution-0.8.0

druid目录中自带了一个配置文件 conf-quickstart/tranquility/server.json  启动tranquility服务进程, 就可以向druid的 metrics datasource 推送实时数据。

bin/tranquility server -configFile <path_to_druid_distro>/conf-quickstart/tranquility/server.json

这一部分向大家介绍了如何通过tranquility服务来加载流数据, 其实druid还可以支持多种广泛使用的流式框架, 包括Kafka, Storm, Samza, and Spark Streaming等

流数据加载中,维度是可变的,所以在schema定义的时候无需特别指明维度,而是将数据中任何一个字段都当做维度。而该datasource的度量则包含

  • count
  • value_sum (derived from value in the input)
  • value_min (derived from value in the input)
  • value_max (derived from value in the input)

我们采用了一个脚本,来随机生成度量数据,导入到这个datasource中

bin/generate-example-metrics | curl -XPOST -H'Content-Type: application/json' --data-binary @- http://localhost:8200/v1/post/metrics

执行完成后会返回

{"result":{"received":25,"sent":25}}

这表明http server 从你这里接收到了25条数据,并发送了这25条数据到druid。 在你第一次运行的时候,这个过程需要花一些时间,一段数据加载成功后,就可以查询了。

Query data

接下来就是数据查询了,我们可以采用如下几种方式来查询数据

Direct Druid queries      直接通过druid查询

druid提供了基于json的富文本查询方式。在提供的示例中,quickstart/wikiticker-top-pages.json  是一个topN的查询实例。

curl -L -H'Content-Type: application/json' -XPOST --data-binary @quickstart/wikiticker-top-pages.json http://localhost:8082/druid/v2/?pretty

Visualizing data 数据可视化

druid是面向用户分析应用的完美方案, 有很多开源的应用支持druid的数据可视化, 如pivot, caravel 和 metabase等

SQL and other query libraries 查询组件

有许多查询组件供我们使用,如sql引擎, 还有其他各种语言提供的组件,如python和ruby。 具体如下:

python:     druid-io/pydruid

R:                   druid-io/RDruid

JavaScript:  implydata/plywood

7eggs/node-druid-query

Clojure:      y42/clj-druid

Ruby:          ruby-druid/ruby-druid

redBorder/druid_config

SQL:           Apache Calcite

implydata/plyql

PHP:     pixelfederation/druid-php

本篇主要是讲了单机环境下druid的搭建以及使用, 并使用druid安装包自带的例子给大家做了展示。 下一篇我讲介绍在集群环境下Druid如何安装及使用。

Druid是一个为在大数据集之上做实时统计分析而设计的开源数据存储。这个系统集合了一个面向列存储的层,一个分布式、shared-nothing的架构,和一个高级的索引结构,来达成在秒级以内对十亿行级别的表进行任意的探索分析。

特性
为分析而设计——Druid是为OLAP工作流的探索性分析而构建。它支持各种filter、aggregator和查询类型,并为添加新功能提供了一个框架。用户已经利用Druid的基础设施开发了高级K查询和直方图功能。
交互式查询——Druid的低延迟数据摄取架构允许事件在它们创建后毫秒内查询,因为Druid的查询延时通过只读取和扫描优必要的元素被优化。Aggregate和 filter没有坐等结果。
高可用性——Druid是用来支持需要一直在线的SaaS的实现。你的数据在系统更新时依然可用、可查询。规模的扩大和缩小不会造成数据丢失。
可伸缩——现有的Druid部署每天处理数十亿事件和TB级数据。Druid被设计成PB级别。

使用场景:
第一:适用于清洗好的记录实时录入,但不需要更新操作
第二:支持宽表,不用join的方式(换句话说就是一张单表)
第三:可以总结出基础的统计指标,可以用一个字段表示
第四:对时区和时间维度(year、month、week、day、hour等)要求高的(甚至到分钟级别)
第五:实时性很重要
第六:对数据质量的敏感度不高
第七:用于定位效果分析和策略决策参考

2.安装
安装环境
本文使用Imply套件安装,该套件提供了稳定的druid和web访问接口,在安装之前需要先安装node,

node下载地址:https://nodejs.org/en/download/

imply下载地址:http://imply.io/download

node安装完成后使用下列命令检查:

node --version
安装过程
参考wiki:https://imply.io/docs/latest/quickstart

1.解压Imply

tar -xzf imply-2.0.0.tar

2.启动服务
nohup bin/supervise -c conf/supervise/quickstart.conf > test.log &
log日志记录:
[Sun Apr  2 23:32:09 2017] Running command[zk], logging to[/Users/ball/Downloads/imply-2.0.0/var/sv/zk.log]: bin/run-zk conf-quickstart
[Sun Apr  2 23:32:09 2017] Running command[coordinator], logging to[/Users/ball/Downloads/imply-2.0.0/var/sv/coordinator.log]: bin/run-druid coordinator conf-quickstart
[Sun Apr  2 23:32:09 2017] Running command[broker], logging to[/Users/ball/Downloads/imply-2.0.0/var/sv/broker.log]: bin/run-druid broker conf-quickstart
[Sun Apr  2 23:32:09 2017] Running command[historical], logging to[/Users/ball/Downloads/imply-2.0.0/var/sv/historical.log]: bin/run-druid historical conf-quickstart
[Sun Apr  2 23:32:09 2017] Running command[overlord], logging to[/Users/ball/Downloads/imply-2.0.0/var/sv/overlord.log]: bin/run-druid overlord conf-quickstart
[Sun Apr  2 23:32:09 2017] Running command[middleManager], logging to[/Users/ball/Downloads/imply-2.0.0/var/sv/middleManager.log]: bin/run-druid middleManager conf-quickstart
[Sun Apr  2 23:32:09 2017] Running command[pivot], logging to[/Users/ball/Downloads/imply-2.0.0/var/sv/pivot.log]: bin/run-pivot-quickstart conf-quickstart

服务停止与启动命令

./server --down 关闭
./server --restart ${服务名称} 重启
3.数据导入
quickstart/wikiticker-2016-06-27-sampled.json 维奇百科网站日志数据

数据格式:

{"isRobot":true,"channel":"#pl.wikipedia","timestamp":"2016-06-27T00:00:58.599Z","flags":"NB","isUnpatrolled":false,"page":"Kategoria:Dyskusje nad usunięciem artykułu zakończone bez konsensusu − lipiec 2016","diffUrl":"https://pl.wikipedia.org/w/index.php?oldid=46204477&rcid=68522573","added":270,"comment":"utworzenie kategorii","commentLength":20,"isNew":true,"isMinor":false,"delta":270,"isAnonymous":false,"user":"Beau.bot","deltaBucket":200.0,"deleted":0,"namespace":"Kategoria"}

quickstart/wikiticker-index.json定义了任务的数据源,时间信息,维度信息,指标信息,内容如下:

{
  "type" : "index_hadoop",
  "spec" : {
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "quickstart/wikiticker-2016-06-27-sampled.json"
      }
    },
    "dataSchema" : {
      "dataSource" : "wikiticker",
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2016-06-27/2016-06-28"]
      },
      "parser" : {
        "type" : "hadoopyString",
        "parseSpec" : {
          "format" : "json",
          "dimensionsSpec" : {
            "dimensions" : [
              "channel",
              "cityName",
              "comment",
              "countryIsoCode",
              "countryName",
              "isAnonymous",
              "isMinor",
              "isNew",
              "isRobot",
              "isUnpatrolled",
              "metroCode",
              "namespace",
              "page",
              "regionIsoCode",
              "regionName",
              "user",
              "commentLength",
              "deltaBucket",
              "flags",
              "diffUrl"
            ]
          },
          "timestampSpec" : {
            "format" : "auto",
            "column" : "timestamp"
          }
        }
      },
      "metricsSpec" : [
        {
          "name" : "count",
          "type" : "count"
        },
        {
          "name" : "added",
          "type" : "longSum",
          "fieldName" : "added"
        },
        {
          "name" : "deleted",
          "type" : "longSum",
          "fieldName" : "deleted"
        },
        {
          "name" : "delta",
          "type" : "longSum",
          "fieldName" : "delta"
        },
        {
          "name" : "user_unique",
          "type" : "hyperUnique",
          "fieldName" : "user"
        }
      ]
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : 5000000
      },
      "jobProperties" : {}
    }
  }
}

使用离线导入:

./bin/post-index-task --file quickstart/wikiticker-index.json
Task started: index_hadoop_wikiticker_2017-04-02T15:44:23.464Z
Task log:     http://localhost:8090/druid/indexer/v1/task/index_hadoop_wikiticker_2017-04-02T15:44:23.464Z/log
Task status:  http://localhost:8090/druid/indexer/v1/task/index_hadoop_wikiticker_2017-04-02T15:44:23.464Z/status
Task index_hadoop_wikiticker_2017-04-02T15:44:23.464Z still running...
Task index_hadoop_wikiticker_2017-04-02T15:44:23.464Z still running...
Task index_hadoop_wikiticker_2017-04-02T15:44:23.464Z still running...
Task index_hadoop_wikiticker_2017-04-02T15:44:23.464Z still running...
Task index_hadoop_wikiticker_2017-04-02T15:44:23.464Z still running...
Task index_hadoop_wikiticker_2017-04-02T15:44:23.464Z still running...
Task index_hadoop_wikiticker_2017-04-02T15:44:23.464Z still running...
Task index_hadoop_wikiticker_2017-04-02T15:44:23.464Z still running...
Task finished with status: SUCCESS

4.访问web地址

http://localhost:9095/,

查看刚刚导入数据

5.使用查询语句查询

/quickstart/wikiticker-top-pages.json

{
  "queryType" : "topN",
  "dataSource" : "wikiticker",
  "intervals" : ["2016-06-27/2016-06-28"],
  "granularity" : "all",
  "dimension" : "page",
  "metric" : "edits",
  "threshold" : 25,
  "aggregations" : [
    {
      "type" : "longSum",
      "name" : "edits",
      "fieldName" : "count"
    }
  ]
}

查询语句:
curl -L -H'Content-Type: application/json' -XPOST --data-binary @quickstart/wikiticker-top-pages.json http://localhost:8082/druid/v2/
结果数据:
[{"timestamp":"2016-06-27T00:00:11.080Z","result":[{"page":"Copa América Centenario","edits":29},{"page":"User:Cyde/List of candidates for speedy deletion/Subpage","edits":16},{"page":"Wikipedia:Administrators' noticeboard/Incidents","edits":16},{"page":"2016 Wimbledon Championships – Men's Singles","edits":15},{"page":"Wikipedia:Administrator intervention against vandalism","edits":15},{"page":"Wikipedia:Vandalismusmeldung","edits":15},{"page":"The Winds of Winter (Game of Thrones)","edits":12},{"page":"ولاية الجزائر","edits":12},{"page":"Copa América","edits":10},{"page":"Lionel Messi","edits":10},{"page":"Wikipedia:Requests for page protection","edits":10},{"page":"Wikipedia:Usernames for administrator attention","edits":10},{"page":"Википедия:Опросы/Унификация шаблонов «Не переведено»","edits":10},{"page":"Bailando 2015","edits":9},{"page":"Bud Spencer","edits":9},{"page":"User:Osterb/sandbox","edits":9},{"page":"Wikipédia:Le Bistro/27 juin 2016","edits":9},{"page":"Ветра зимы (Игра престолов)","edits":9},{"page":"Användare:Lsjbot/Namnkonflikter-PRIVAT","edits":8},{"page":"Eurocopa 2016","edits":8},{"page":"Mistrzostwa Europy w Piłce Nożnej 2016","edits":8},{"page":"Usuario:Carmen González C/Science and technology in China","edits":8},{"page":"Wikipedia:Administrators' noticeboard","edits":8},{"page":"Wikipédia:Demande de suppression immédiate","edits":8},{"page":"World Deaf Championships","edits":8}]}]

---------------------

1、环境和架构 
2、druid的安装 
3、druid的配置 
4、overlord json 
5、overlord csv

1、druid 环境和架构 
环境信息 
Centos6.5 
32GB 8C *5 
Zookeeper 3.4.5 
Druid 0.9.2 
Hadoop-2.6.5 
Jdk1.7 
架构 
10.20.23.42 Broker Real-time datanode NodeManager QuorumPeerMain 
10.20.23.29 middleManager datanode NodeManager 
10.20.23.38 overlord datanode NodeManager QuorumPeerMain 
10.20.23.82 coordinator namenode ResourceManager 
10.20.23.41 historical datanode NodeManager QuorumPeerMain

2、druid安装 
Hadoop的安装就不介绍了,之前一直用Hadoop2.3.0安装但是没有成功,所以换成了2.6.5 
和单机一样的流程

1、 先解压

2、 拷贝文件 
拷贝Hadoop的配置文件到 ${DRUID_HOME}/conf/druid/_common目录下面,拷贝4个core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml

3、 创建目录,拷贝jar包 
在${DRUID_HOME} /hadoop-dependencies/hadoop-client目录下面创建一个2.6.5(建议选择Hadoop的版本号)的文件夹,将Hadoop的jar包拷贝到这个目录下面

4、 修改配置文件

注意:配置文件特别繁琐,只要有一个地方配置错误任务就不能执行

#配置元数据信息,修改成druid-hdfs-storage和mysql-metadata-storage
druid.extensions.loadList=["druid-hdfs-storage","mysql-metadata-storage"]
#配置zookeeper的信息
druid.zk.service.host=10.20.23.82:2181
druid.zk.paths.base=/druid/cluster
#配置元数据MySQL的信息
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://10.20.23.42:3306/druid
druid.metadata.storage.connector.user=root
druid.metadata.storage.connector.password=123456

# 配置存储的信息
# Deep storage
#
# For HDFS (make sure to include the HDFS extension and that your Hadoop config files in the cp):
druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments

#配置日志存储的信息
# Indexing service logs
#
# For HDFS (make sure to include the HDFS extension and that your Hadoop config files in the cp):
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/druid/indexing-logs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
broker的配置

broker的配置,主要配置根据实际情况修改内存分配的大小。添加druid.host参数和修改Duser.timezone的值,因为druid默认的时区是Z。所以我们需要加上+0800

[hadoop@SZB-L0038784 broker]$ cat jvm.config 
-server
-Xms1g
-Xmx1g
-XX:MaxDirectMemorySize=4096m
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
[hadoop@SZB-L0038784 broker]$ cat runtime.properties 
druid.host=10.20.23.82
druid.service=druid/broker
druid.port=8082

# HTTP server threads
druid.broker.http.numConnections=5
druid.server.http.numThreads=25

# Processing threads and buffers
druid.processing.buffer.sizeBytes=536870912
druid.processing.numThreads=7

# Query cache
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
druid.cache.type=local
druid.cache.sizeInBytes=2000000000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
coordinator的配置

coordinator的配置,主要配置根据实际情况修改内存分配的大小。添加druid.host参数和修改Duser.timezone的值,因为druid默认的时区是Z。所以我们需要加上+0800

[hadoop@SZB-L0038784 coordinator]$ cat jvm.config 
-server
-Xms1g
-Xmx1g
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-Dderby.stream.error.file=var/druid/derby.log
[hadoop@SZB-L0038784 coordinator]$ cat runtime.properties 
druid.host=10.20.23.82
druid.service=druid/coordinator
druid.port=18091

druid.coordinator.startDelay=PT30S
druid.coordinator.period=PT30S
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
historical 的配置

historical的配置,主要配置根据实际情况修改内存分配的大小。添加druid.host参数和修改Duser.timezone的值,因为druid默认的时区是Z。所以我们需要加上+0800

[hadoop@SZB-L0038784 historical]$ cat jvm.config 
-server
-Xms1g
-Xmx1g
-XX:MaxDirectMemorySize=4960m
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
[hadoop@SZB-L0038784 historical]$ cat runtime.properties 
druid.host=10.20.23.82
druid.service=druid/historical
druid.port=8083

# HTTP server threads
druid.server.http.numThreads=25

# Processing threads and buffers
druid.processing.buffer.sizeBytes=536870912
druid.processing.numThreads=7

# Segment storage
druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize"\:130000000000}]
druid.server.maxSize=130000000000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
middleManager 的配置

middleManager的配置,主要配置根据实际情况修改内存分配的大小。添加druid.host参数和修改Duser.timezone的值,因为druid默认的时区是Z。所以我们需要加上+0800 
其中hadoop-client:2.6.5 这个2.6.5是和第3点中创建的路径名字是一样的,

[hadoop@SZB-L0038784 middleManager]$ cat jvm.config 
-server
-Xms64m
-Xmx64m
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
[hadoop@SZB-L0038784 middleManager]$ cat runtime.properties 
druid.service=druid/middleManager
druid.port=8091

# Number of tasks per middleManager
druid.worker.capacity=3

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xmx2g -Duser.timezone=UTC+0800 -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=var/druid/task

# HTTP server threads
druid.server.http.numThreads=25

# Processing threads and buffers
druid.processing.buffer.sizeBytes=536870912
druid.processing.numThreads=2

# Hadoop indexing
druid.host=10.20.23.82
druid.indexer.task.hadoopWorkingPath=/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.6.5"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
overlord 的配置

overlord的配置,主要配置根据实际情况修改内存分配的大小。添加druid.host参数和修改Duser.timezone的值,因为druid默认的时区是Z。所以我们需要加上+0800

[hadoop@SZB-L0038784 overlord]$ cat jvm.config 
-server
-Xms1g
-Xmx1g
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
[hadoop@SZB-L0038784 overlord]$ cat runtime.properties 
druid.host=10.20.23.82
druid.service=druid/overlord
druid.port=8090

druid.indexer.queue.startDelay=PT30S

druid.indexer.runner.type=remote
druid.indexer.storage.type=metadata
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
5、 在 通过scp拷贝到其他的机器上面去 
6、 在对应机器启动各个进程

java `cat conf/druid/historical/jvm.config | xargs` -cp "conf/druid/_common:conf/druid/historical:lib/*" io.druid.cli.Main server historical

java `cat conf/druid/broker/jvm.config | xargs` -cp "conf/druid/_common:conf/druid/broker:lib/*" io.druid.cli.Main server broker

java `cat conf/druid/coordinator/jvm.config | xargs` -cp "conf/druid/_common:conf/druid/coordinator:lib/*" io.druid.cli.Main server coordinator

java `cat conf/druid/overlord/jvm.config | xargs` -cp "conf/druid/_common:conf/druid/overlord:lib/*" io.druid.cli.Main server overlord

java `cat conf/druid/middleManager/jvm.config | xargs` -cp "conf/druid/_common:conf/druid/middleManager:lib/*" io.druid.cli.Main server middleManager
1
2
3
4
5
6
7
8
9
可以通过下面2个URL查看到对应的页面 
http://10.20.23.82:18091/#/

http://10.20.23.82:8090/console.html

---------------------

http://lxw1234.com/archives/2015/11/554.htm 海量数据实时OLAP分析系统-Druid.io安装配置和体验 
http://druid.io/docs/0.9.2/design/design.html Druid官网搭建

Druid.io 部署&使用文档
1.集群模式下部署
Prerequisites : Java 7 or higher & Zookeeper & mysql

下载Druid.io :
curl -O http://static.druid.io/artifacts/releases/druid-0.9.1.1-bin.tar.gz
tar -xzf druid-0.9.1.1-bin.tar.gz
cd druid-0.9.1.1
1
2
3
文件夹目录结构 :

LICENSE - the license files.
bin/ - scripts related to the single-machine quickstart.
conf/* - template configurations for a clustered setup.
conf-quickstart/* - configurations for the single-machine quickstart.
extensions/* - all Druid extensions.
hadoop-dependencies/* - Druid Hadoop dependencies.
lib/* - all included software packages for core Druid.
quickstart/* - files related to the single-machine quickstart.
所有配置文件均在 conf/* 目录下.

配饰HDFS为Druid.io的deep storage & 配置zk & 配置mysql
修改 conf/druid/_common/common.runtime.properties 文件.

#
# Extensions
#

# This is not the full list of Druid extensions, but common ones that people often use. You may need to change this list
# based on your particular setup.
#使用 "mysql-metadata-storage" 作为metadata的存储
#使用 "druid-hdfs-storage" 作为 deep storage
#使用 "druid-parquet-extensions" 向druid中插入parquet数据
druid.extensions.loadList=["druid-kafka-eight", "druid-histogram", "druid-datasketches",  "mysql-metadata-storage", "druid-hdfs-storage", "druid-avro-extensions", "druid-parquet-extensions"]

# If you have a different version of Hadoop, place your Hadoop client jar files in your hadoop-dependencies directory
# and uncomment the line below to point to your directory.
#druid.extensions.hadoopDependenciesDir=/my/dir/hadoop-dependencies

#
# Logging
#

# Log all runtime properties on startup. Disable to avoid logging properties on startup:
druid.startup.logging.logProperties=true

#
# Zookeeper
#

druid.zk.service.host=tagtic-slave01:2181,tagtic-slave02:2181,tagtic-slave03:2181
druid.zk.paths.base=/druid

#
# Metadata storage
#

# For MySQL:
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://tagtic-master:3306/druid
druid.metadata.storage.connector.user=root
druid.metadata.storage.connector.password=123456

#
# Deep storage
#

# For HDFS (make sure to include the HDFS extension and that your Hadoop config files in the cp):
druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments

#
# Indexing service logs
#

# For HDFS (make sure to include the HDFS extension and that your Hadoop config files in the cp):
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/druid/indexing-logs

#
# Service discovery
#

druid.selectors.indexing.serviceName=druid/overlord
druid.selectors.coordinator.serviceName=druid/coordinator

#
# Monitoring
#

druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]
druid.emitter=logging
druid.emitter.logging.logLevel=info
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
将 Hadoop 的配置文件(core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml) cp 到 conf/druid/_common 目录下

修改 conf/druid/middleManager/runtime.properties 文件.

druid.service=druid/middleManager
druid.port=18091

# Number of tasks per middleManager
druid.worker.capacity=3

# Task launch parameters
# **CDH版本添加 -Dhadoop.mapreduce.job.classloader=true 来解决hadoop indexer导入时jar包冲突问题**
druid.indexer.runner.javaOpts=-server -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dhadoop.mapreduce.job.classloader=true

druid.indexer.task.baseTaskDir=var/druid/task

# HTTP server threads
druid.server.http.numThreads=25

# Processing threads and buffers
druid.processing.buffer.sizeBytes=536870912
druid.processing.numThreads=2

# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=/tmp/druid-indexing
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.6.0"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Hadoop集群版本必须和Druid.io中版本同一,可以通过pull-deps下载相同hadoop-dependencies版本,e.g. : 
java -classpath "lib/*" io.druid.cli.Main tools pull-deps --defaultVersion 0.9.1.1 -c io.druid.extensions:mysql-metadata-storage:0.9.1.1 -c druid-hdfs-storage -h org.apache.hadoop:hadoop-client:2.6.0

项目中Druid.io配置端口号:

druid.service=druid/coordinator druid.port=18081
druid.service=druid/broker druid.port=18082
druid.service=druid/historical druid.port=18083
druid.service=druid/overlord druid.port=18090
druid.service=druid/middleManager druid.port=18091
2.启动Druid.io
java `cat conf/druid/coordinator/jvm.config | xargs` -cp conf/druid/_common:conf/druid/coordinator:lib/* io.druid.cli.Main server coordinator &>> logs/coordinator.log &

java `cat conf/druid/overlord/jvm.config | xargs` -cp conf/druid/_common:conf/druid/overlord:lib/* io.druid.cli.Main server overlord &>> logs/overlord.log &

java `cat conf/druid/historical/jvm.config | xargs` -cp conf/druid/_common:conf/druid/historical:lib/* io.druid.cli.Main server historical &>> logs/historical.log &

java `cat conf/druid/middleManager/jvm.config | xargs` -cp conf/druid/_common:conf/druid/middleManager:lib/* io.druid.cli.Main server middleManager &>> logs/middleManager.log &

java `cat conf/druid/broker/jvm.config | xargs` -cp conf/druid/_common:conf/druid/broker:lib/* io.druid.cli.Main server broker &>> logs/broker.log &
1
2
3
4
5
6
7
8
9
3.从HDFS导入数据到Druid.io
批量导入Batch Data Ingestion 
导入Parquet文件

Druid作业查看 Coordinator : http://tagtic-master:18090/console.html 
Druid集群查看Cluster : http://tagtic-master:18081/#/

解决传入数据时区问题 Hadoop Configuration,在conf/druid/_common/mapred-site.xml中添加

<property>
    <name>mapreduce.map.java.opts</name>
    <value>-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps</value>
</property>
<property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-server -Xmx2560m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps</value>
</property>
1
2
3
4
5
6
7
8
     
---------------------

Imply提供了一套完整的部署方式,包括依赖库,Druid,图形化的数据展示页面,SQL查询组件等。本文将基于Imply套件进行说明

单机部署
依赖
Java 8 or better
Node.js 4.5.x or better
Linux, Mac OS X, or other Unix-like OS (Windows is not supported)
At least 4GB of RAM
下载与安装
从https://imply.io/get-started 下载最新版本安装包
tar -xzf imply-2.3.9.tar.gz
cd imply-2.3.9
目录说明如下: 
- bin/ - run scripts for included software. 
- conf/ - template configurations for a clustered setup. 
- conf-quickstart/* - configurations for the single-machine quickstart. 
- dist/ - all included software. 
- quickstart/ - files related to the single-machine quickstart.

启动服务

bin/supervise -c conf/supervise/quickstart.conf
1
2
安装验证
导入测试数据
安装包中包含一些测试的数据,可以通过执行预先定义好的数据说明文件进行导入

bin/post-index-task --file quickstart/wikiticker-index.json
1
可视化控制台
overlord 控制页面:http://localhost:8090/console.html.
druid集群页面:http://localhost:8081
数据可视化页面:http://localhost:9095
数据展示与查询
数据展示:对渠道进行统计的柱状图

SQL数据查询:使用sql查询编辑次数最多的10个page

HTTP POST数据查询
命令:curl -L -H’Content-Type: application/json’ -XPOST –data-binary @quickstart/wikiticker-top-pages.json http://localhost:8082/druid/v2?pretty 
结果:

[ {
  "timestamp" : "2016-06-27T00:00:11.080Z",
  "result" : [ {
    "edits" : 29,
    "page" : "Copa América Centenario"
  }, {
    "edits" : 16,
    "page" : "User:Cyde/List of candidates for speedy deletion/Subpage"
  },
  ..........
  {
    "edits" : 8,
    "page" : "World Deaf Championships"
  } ]
} ]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
集群部署
集群配置的规划需要根据需求来定制,下面以一个开发环境机器搭建为例,描述如何搭建一个有HA特性的Druid集群.

集群部署有以下几点需要说明 
1. 为了保证HA,主节点部署两台 
2. 管理节点与查询节点可以考虑多核大内存的机器

部署规划
角色    机器    配置    集群角色
主节点    10.5.24.137    8C16G    Coordinator,Overlord
主节点    10.5.24.138    8C16G    Coordinator,Overlord
数据节点,查询节点    10.5.24.139    8C16G    Historical, MiddleManager, Tranquility,Broker,Pivot Web
数据节点,查询节点    10.5.24.140    8C16G    Historical, MiddleManager, Tranquility,(数据节点,查询节点)Broker
部署步骤
公共配置
编辑conf/druid/_common/common.runtime.properties 文件内容 
1. loadList配置:==此处需要统一在一个位置统一定义,否则会出现extension加载的问题==

druid.extensions.loadList=["mysql-metadata-storage","druid-hdfs-storage"]
1
Zookeeper
#
# Zookeeper
#

druid.zk.service.host=native-lufanfeng-2-5-24-138:2181,native-lufanfeng-3-5-24-139:2181,native-lufanfeng-4-5-24-140:2181
druid.zk.paths.base=/druid
1
2
3
4
5
6
7
MetaData:使用Mysql
# For MySQL:

druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://10.5.24.151:3306/druid
druid.metadata.storage.connector.user=root
druid.metadata.storage.connector.password=123456
1
2
3
4
5
6
Deepstorage:使用HDFS

#druid.storage.type=local
#druid.storage.storageDirectory=var/druid/segments

druid.storage.type=hdfs
druid.storage.storageDirectory=hdfs://10.5.24.137:9000/druid/segments

#druid.indexer.logs.type=file
#druid.indexer.logs.directory=var/druid/indexing-logs

druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=hdfs://10.5.24.137:9000/druid/indexing-logs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
主节点配置
创建配置文件:cp conf/supervise/master-no-zk.conf conf/supervise/master.conf
编辑master.conf 内容如下:
:verify bin/verify-java
:verify bin/verify-version-check

coordinator bin/run-druid coordinator conf
!p80 overlord bin/run-druid overlord conf
1
2
3
4
5
6
目前的版本中,mysql-metadata-storage没有包含在默认的安装包中,如果使用mysql存储元数据,需要单独安装下对应的扩展,是用下列命令在两个master节点上对需要用到的扩展进行安装:
root@native-lufanfeng-1-5-24-137:~/imply-2.3.8# java -classpath "dist/druid/lib/*"  -Ddruid.extensions.directory="dist/druid/extensions" io.druid.cli.Main tools pull-deps  -c io.druid.extensions:mysql-metadata-storage:0.10.1 -c io.druid.extensions.contrib:druid-rabbitmq:0.10.1 -h org.apache.hadoop:hadoop-client:2.7.0
1
==默认mysql-metadata-storage带的mysql驱动是针对Mysql 5.1的,如果使用Mysql的版本是5.5 或是其他版本,可能会出现”Communications link failure”的错误,此时需要更新Mysql的驱动。==

在10.5.24.137/138上启动master相关服务:nohup bin/supervise -c conf/supervise/master.conf > master.log &
数据节点与查询节点配置
安装NodeJS:apt-get install nodejs
创建配置文件:vim conf/supervise/data-with-query.conf
编辑data-with-query.conf 内容如下:
:verify bin/verify-java
:verify bin/verify-node
:verify bin/verify-version-check

broker bin/run-druid broker conf
imply-ui bin/run-imply-ui conf

historical bin/run-druid historical conf
middleManager bin/run-druid middleManager conf

# Uncomment to use Tranquility Server
#!p95 tranquility-server bin/tranquility server -configFile conf/tranquility/server.json

# Uncomment to use Tranquility Kafka
#!p95 tranquility-kafka bin/tranquility kafka -configFile conf/tranquility/kafka.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
对于集群模式,pivot的配置文件必须调整为mysql,sqllite会导致无法查看datasource,修改conf/pivot/config.xml文件
settingsLocation:
  location: mysql
  uri: 'mysql://root:123456@10.5.24.151:3306/druid'
  table: 'pivot_state'
  initialSettings:
  clusters:
    - name: druid
      type: druid
      host: localhost:8082
1
2
3
4
5
6
7
8
9
10
在10.5.24.139/140两台机器上分别执行:nohup bin/supervise -c conf/supervise/data-with-query.conf > data-with-query.log &
验证
可视化控制台
overlord 控制页面:http://10.5.24.138:8090/console.html.
druid集群页面:http://10.5.24.138:8081
数据可视化页面:http://10.5.24.139:9095
---------------------

下载:
http://druid.io/downloads.html

A version may be declared as a release candidate if it has been deployed to a sizable production cluster. Release candidates are declared as stable after we feel fairly confident there are no major bugs in the version. Check out the Versioning section for how we describe releases.

The current stable is tagged at version 0.10.0.
Druid: druid-0.10.0-bin.tar.gz.
MySQL metadata store extension: mysql-metadata-storage-0.10.0.tar.gz. (Due to licensing, we've separated MySQL metadata store extension from main Druid release. If you would like to use it, please untar this tarball and follow the steps in Include Extensions)

Tranquility: tranquility-distribution-0.8.2.tgz.

安装
http://druid.io/docs/0.10.0/tutorials/cluster.html

Select hardware
TheCoordinator and Overlord processes can be co-located on a single server that is responsible for handling the metadata and coordination needs of your cluster(处理元数据以及集群之间的协调). The equivalent of an AWS m3.xlarge is sufficient for most clusters. This hardware offers:
4 vCPUs
15 GB RAM
80 GB SSD storage
Historicalsand MiddleManagers can be colocated on a single server to handle the actual data in your cluster. These servers benefit greatly from CPU, RAM, and SSDs. The equivalent of an AWS r3.2xlarge is a good starting point. This hardware offers:
8 vCPUs
61 GB RAM
160 GB SSD storage
DruidBrokers accept queries and farm them out to the rest of the cluster. They also optionally maintain an in-memory query cache. These servers benefit greatly from CPU and RAM, and can also be deployed on the equivalent of an AWS r3.2xlarge. This hardware offers:
8 vCPUs
61 GB RAM
160 GB SSD storage
You can consider co-locating any open source UIs or query libraries on the same server that the Broker is running on.
Very large clusters should consider selecting larger servers.

Select OS
We recommend running your favorite Linux distribution. You will also need:
Java 8 or better
Download the distribution
First, download and unpack the release archive. It's best to do this on a single machine at first, since you will be editing the configurations and then copying the modified distribution out to all of your servers.
curl -O http://static.druid.io/artifacts/releases/druid-0.10.0-bin.tar.gztar -xzf druid-0.10.0-bin.tar.gzcd druid-0.10.0

步骤一:Configure deep storage
Druid relies on a distributed filesystem or large object (blob) store for data storage. The most commonly used deep storage implementations are S3 (popular for those on AWS) and HDFS (popular if you already have a Hadoop deployment).

S3 (略)

HDFS(我们使用)
手动替换所依赖的Hadoop的Jar包:
cd $DRUID_HOME
find -name "*hadoop*"

因为我们使用的Hadoop的版本是2.7.0。所以需要将Druid自带的Hadoop的jar包升级到2.7.0,替换2.3.0的jar包。
注意:extensions/druid-hdfs-storage/中默认有guava-16.0.1.jar,而不是Hadoop依赖的guava-11.0.2.jar。这个是没问题的。

将本地的Hadoop的依赖复制到DRUID下。hadoop-client没有,网上下载。
cd $DRUID_HOME/extensions/druid-hdfs-storage
wgethttp://central.maven.org/maven2/org/apache/hadoop/hadoop-client/2.7.0/hadoop-client-2.7.0.jar
cp $HADOOP_HOME/share/hadoop/tools/lib/hadoop-auth-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/common/hadoop-common-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/hdfs/hadoop-hdfs-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-app-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-shuffle-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-api-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-client-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-common-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-server-common-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.0.jar $DRUID_HOME/extensions/druid-hdfs-storage
cp $HADOOP_HOME/share/hadoop/common/lib/htrace-core-3.1.0-incubating.jar $DRUID_HOME/extensions/druid-hdfs-storage
mkdir $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cd $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
wgethttp://central.maven.org/maven2/org/apache/hadoop/hadoop-client/2.7.0/hadoop-client-2.7.0.jar

cp $HADOOP_HOME/share/hadoop/hdfs/hadoop-hdfs-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/common/hadoop-common-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/common/lib/hadoop-auth-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/common/lib/hadoop-annotations-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-app-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-shuffle-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-server-common-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-common-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-client-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-api-2.7.0.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
cp $HADOOP_HOME/share/hadoop/common/lib/htrace-core-3.1.0-incubating.jar $DRUID_HOME/hadoop-dependencies/hadoop-client/2.7.0
配置common.runtime.properties
配置 conf/druid/_common/common.runtime.properties文件。修改以下内容:
1)设置druid.extensions.hadoopDependenciesDir=/hadoop/haozhuo/druid/druid-0.10.0/hadoop-dependencies
(注意这里不带hadoop-client/2.7.0)
并且
在conf/druid/middleManager/runtime.properties修改druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.0"]
这样运行时,会自动找到/hadoop/haozhuo/druid/druid-0.10.0/hadoop-dependencies/hadoop-client/2.7.0这个路径

或者另一种写法是:
设置druid.extensions.hadoopDependenciesDir=/hadoop/haozhuo/druid/druid-0.10.0/hadoop-dependencies/hadoop-client/2.7.0
设置druid.indexer.task.defaultHadoopCoordinates=[]

这两种方式都是可以的,任选其一

2)将druid.extensions.loadList=["druid-s3-extensions"]修改成 
druid.extensions.loadList=["druid-hdfs-storage"].

3) 注释掉:
#druid.storage.type=local#druid.storage.storageDirectory=var/druid/segments解除注释:druid.storage.type=hdfsdruid.storage.storageDirectory=/druid/segments

4)注释掉:
#druid.indexer.logs.type=file#druid.indexer.logs.directory=var/druid/indexing-logs解除注释:druid.indexer.logs.type=hdfsdruid.indexer.logs.directory=/druid/indexing-logs

5)将Hadoop配置文件:core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml复制到 conf/druid/_common/目录下

步骤二:Configure addresses for Druid coordination
还是修改conf/druid/_common/common.runtime.properties这个文件:
1)设置ZooKeeper的地址:
druid.zk.service.host=192.168.1.150:2181

2)修改metadata store的配置。我这里使用mysql作为metadata store。注释掉derby的配置。然后使用mysql相关配置

#druid.metadata.storage.type=derby
#druid.metadata.storage.connector.connectURI=jdbc:derby://metadata.store.ip:1527/var/druid/metadata.db;create=true
#druid.metadata.storage.connector.host=metadata.store.ip
#druid.metadata.storage.connector.port=1527
# For MySQL:
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://192.168.1.162:3306/druid
druid.metadata.storage.connector.user=datag
druid.metadata.storage.connector.password=yjkdatag

3)从http://druid.io/downloads.html中下载mysql-metadata-storage-0.10.0.tar.gz ,然后解压到$DRUID_HOME/extensions目录下:

步骤三:Configure Tranquility Server (optional)需要配置
Data streams can be sent to Druid through a simple HTTP API powered by Tranquility Server. If you will be using this functionality, then at this point you should configure Tranquility Server.
http://druid.io/docs/0.10.0/ingestion/stream-ingestion.html#server
Loading streams
Streams can be ingested in Druid using either Tranquility (a Druid-aware client) and the indexing service or through standalone Realtime nodes. The first approach will be more complex to set up, but also offers scalability and high availability characteristics that advanced production setups may require. The second approach has some known limitations.
(使用 Tranquility这种方式摄入数据的特点是:部署起来更加复杂,但是提供高可用和可扩展的特点。适用于生产环境)

Stream push
If you have a program that generates a stream, then you can push that stream directly into Druid in real-time. With this approach, Tranquility is embedded in your data-producing application. Tranquility comes with bindings for the Storm and Samza stream processors.It also has a direct API that can be used from any JVM-based program, such as Spark Streaming or a Kafka consumer.
Tranquility handles partitioning, replication, service discovery, and schema rollover for you, seamlessly and without downtime. You only have to define your Druid schema.
For examples and more information, please see the Tranquility README.

Spark Streaming往Druid发送数据:
https://github.com/druid-io/tranquility/blob/master/docs/spark.md
有Spark2.10的包
https://mvnrepository.com/artifact/io.druid/tranquility-spark_2.10/0.7.2

Stream pull
If you have an external service that you want to pull data from, you have two options. The simplest option is to set up a "copying" service that reads from the data source and writes to Druid using the stream push method.
Another option is stream pull. With this approach, a Druid Realtime Node ingests data from a Firehose connected to the data you want to read. Druid includes builtin firehoses for Kafka, RabbitMQ, and various other streaming systems.
More information
For more information on loading streaming data via a push based approach, please see here.
For more information on loading streaming data via a pull based approach, please see here.

步骤四:Configure Tranquility Kafka (optional ) 暂时不配
Druid can consuming streams from Kafka through Tranquility Kafka. If you will be using this functionality, then at this point you should configure Tranquility Kafka.

步骤五:Configure for connecting to Hadoop(optional)配置
如果想要从Hadoop加载数据,那么就需要配置。注意,这里从HDFS加载数据与用HDFS作为deep storage是不同的概念。
1)修改$DRUID_HOME/conf/druid/middleManager/runtime.properties
druid.service=druid/middleManager
druid.port=8091

# Number of tasks per middleManager
druid.worker.capacity=3

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xmx2g -Duser.timezone=UTC+08:00 -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

druid.indexer.task.baseTaskDir=/hadoop/haozhuo/druid/var/task

# HTTP server threads
druid.server.http.numThreads=20

# Processing threads and buffers
# druid.processing.buffer.sizeBytes=536870912
druid.processing.buffer.sizeBytes=268435456
druid.processing.numThreads=2

# Hadoop indexing 是HDFS中的路径
druid.indexer.task.hadoopWorkingPath=/druid/tmp/druid-indexing

# 具体根据之前设置的druid.extensions.hadoopDependenciesDir而定。
# 也可能是 druid.indexer.task.defaultHadoopCoordinates=[]
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.0"]

3)将Hadoop配置文件:core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml复制到conf/druid/_common/下。(这步骤在步骤1时已经做了)

4)最最重要的一步,也是官网中没有。在从Hadoop导入数据到DRUID时出现了问题,折磨了我好长时间才解决的:
解决guice冲突问题:
DRUID依赖的谷歌的guice的版本是:
./lib/jersey-guice-1.19.jar
./lib/guice-multibindings-4.1.0.jar
./lib/guice-servlet-4.1.0.jar
./lib/guice-4.1.0.jar
而Hadoop依赖的guice版本是
./share/hadoop/yarn/lib/guice-servlet-3.0.jar
./share/hadoop/yarn/lib/guice-3.0.jar
./share/hadoop/yarn/lib/jersey-guice-1.9.jar
./share/hadoop/mapreduce/lib/guice-servlet-3.0.jar
./share/hadoop/mapreduce/lib/guice-3.0.jar
./share/hadoop/mapreduce/lib/jersey-guice-1.9.jar
这样会导致Druid提交job到Hadoop执行MapReduce时会报以下错误:
java.lang.NoSuchMethodError: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;
at com.google.inject.multibindings.Multibinder.collectionOfProvidersOf(Multibinder.java:202)
at com.google.inject.multibindings.Multibinder$RealMultibinder.<init>(Multibinder.java:283)
at com.google.inject.multibindings.Multibinder$RealMultibinder.<init>(Multibinder.java:258)
at com.google.inject.multibindings.Multibinder.newRealSetBinder(Multibinder.java:178)
at com.google.inject.multibindings.Multibinder.newSetBinder(Multibinder.java:150)
at io.druid.guice.LifecycleModule.getEagerBinder(LifecycleModule.java:131)
at io.druid.guice.LifecycleModule.configure(LifecycleModule.java:137)
at com.google.inject.spi.Elements$RecordingBinder.install(Elements.java:223)
at com.google.inject.spi.Elements.getElements(Elements.java:101)
at com.google.inject.spi.Elements.getElements(Elements.java:92)
我的解决方法是:
cp $DRUID_HOME/lib/*guice* $HADOOP_HOME/share/hadoop/yarn/lib/
cp $DRUID_HOME/lib/*guice* $HADOOP_HOME/share/hadoop/mapreduce/lib/
这样,Hadoop中的guice的版本就有:
./share/hadoop/yarn/lib/jersey-guice-1.19.jar
./share/hadoop/yarn/lib/jersey-guice-1.9.jar
./share/hadoop/yarn/lib/guice-multibindings-4.1.0.jar
./share/hadoop/yarn/lib/guice-3.0.jar
./share/hadoop/yarn/lib/guice-servlet-4.1.0.jar
./share/hadoop/yarn/lib/guice-4.1.0.jar
./share/hadoop/yarn/lib/guice-servlet-3.0.jar
./share/hadoop/mapreduce/lib/jersey-guice-1.19.jar
./share/hadoop/mapreduce/lib/jersey-guice-1.9.jar
./share/hadoop/mapreduce/lib/guice-multibindings-4.1.0.jar
./share/hadoop/mapreduce/lib/guice-3.0.jar
./share/hadoop/mapreduce/lib/guice-servlet-4.1.0.jar
./share/hadoop/mapreduce/lib/guice-4.1.0.jar
./share/hadoop/mapreduce/lib/guice-servlet-3.0.jar
Hadoop中有了guice-4.1.0的版本后执行Druid任务就不会报错了。

解决guava冲突问题:
出现以下问题:
2017-07-19 22:39:43,122 ERROR [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.NoSuchMethodError: com.google.common.base.Enums.getIfPresent(Ljava/lang/Class;Ljava/lang/String;)Lcom/google/common/base/Optional;
原因跟上面一样,Druid需要guava-16.0.1.jar,而Hadoop中只有guava-11.0.2.jar。将Druid中的guava-16.0.1.jar复制到Hadoop中。
cp $DRUID_HOME/lib/guava-16.0.1.jar $HADOOP_HOME/share/hadoop/yarn/lib/
cp $DRUID_HOME/lib/guava-16.0.1.jar $HADOOP_HOME/share/hadoop/common/lib/
cp $DRUID_HOME/lib/guava-16.0.1.jar $HADOOP_HOME/share/hadoop/hdfs/lib/
结果如下:
./share/hadoop/tools/lib/guava-11.0.2.jar
./share/hadoop/common/lib/guava-16.0.1.jar
./share/hadoop/common/lib/guava-11.0.2.jar
./share/hadoop/kms/tomcat/webapps/kms/WEB-INF/lib/guava-11.0.2.jar
./share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/guava-11.0.2.jar
./share/hadoop/yarn/lib/guava-16.0.1.jar
./share/hadoop/yarn/lib/guava-11.0.2.jar
./share/hadoop/hdfs/lib/guava-16.0.1.jar
./share/hadoop/hdfs/lib/guava-11.0.2.jar
步骤六:Tune Druid Coordinator and Overlord

注意:设置时区,其他配置文件中都要修改!
由于Druid是时间序列数据库,所以对时间非常敏感。Druid底层采用绝对毫秒数存储时间,如果不指定时区,默认输出为零时区时间,即ISO8601中yyyy-MM-ddThh:mm:ss.SSSZ。我们生产环境中采用东八区,也就是Asia/Hong Kong时区,所以需要将集群所有UTC时间调整为UTC+08:00;同时导入的数据的timestamp列格式必须为:yyyy-MM-ddThh:mm:ss.SSS+08:00

如果不设置,Hadoop batch ingestion会失败,出现“No buckets?? seems there is no data to index.”的错误

配置Coordinator:
cd $DRUID_HOME/conf/druid/coordinator
vi jvm.config
-server
-Xms3g
-Xmx3g
-Duser.timezone=UTC+08:00
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=/hadoop/haozhuo/druid/var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-Dderby.stream.error.file=/hadoop/haozhuo/druid/var/derby.log(这个配置感觉并没啥卵用。用的是mysql作为元数据库)

vi runtime.properties(并没做什么修改,都是默认值)
druid.service=druid/coordinator
druid.port=8081
druid.coordinator.startDelay=PT30S
druid.coordinator.period=PT30S

配置Overlord:
cd $DRUID_HOME/conf/druid/overlord
vi jvm.config
-server
-Xms3g
-Xmx3g
-Duser.timezone=UTC+08:00
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=/hadoop/haozhuo/druid/var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

vi runtime.properties(并没做什么修改,都是默认值)
druid.service=druid/overlord
druid.port=8090
druid.indexer.queue.startDelay=PT30S
druid.indexer.runner.type=remote
druid.indexer.storage.type=metadata

步骤七:Tune Druid processes that serve queries
Druid Historicals andMiddleManagers can be co-located on the same hardware. Both Druid processes benefit greatly from being tuned to the hardware they run on. If you are running Tranquility Server or Kafka, you can also colocate Tranquility with these two Druid processes. If you are using r3.2xlarge EC2 instances, or similar hardware, the configuration in the distribution is a reasonable starting point.
If you are using different hardware, we recommend adjusting configurations for your specific hardware. The most commonly adjusted configurations are:
-Xmx and -Xms
druid.server.http.numThreads
druid.processing.buffer.sizeBytes
druid.processing.numThreads
druid.query.groupBy.maxIntermediateRows
druid.query.groupBy.maxResults
druid.server.maxSize and druid.segmentCache.locations on Historical Nodes
druid.worker.capacity on MiddleManagers

Please see the Druidconfiguration documentation for a full description of all possible configuration options
cd $DRUID_HOME/conf/druid/historical
vi jvm.config
-server
-Xms4g
-Xmx4g
-XX:MaxDirectMemorySize=3072m
-Duser.timezone=UTC+08:00
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=/hadoop/haozhuo/druid/var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

vi runtime.properties
备注:端口本来应该是默认8023,但是该端口被其他程序占用了,所以修改成8283。druid.processing.numThreads和druid.processing.buffer.sizeBytes缩小了一半,内存不够用

druid.service=druid/historical
druid.port=8283
# HTTP server threads
#druid.server.http.numThreads=25
druid.server.http.numThreads=20
# Processing threads and buffers
#druid.processing.buffer.sizeBytes=536870912
druid.processing.buffer.sizeBytes=268435456
#druid.processing.numThreads=7
druid.processing.numThreads=3
# Segment storage
druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize"\:130000000000}]
druid.server.maxSize=130000000000

cd $DRUID_HOME/conf/druid/middleManager
vi jvm.config
-server
-Xms64m
-Xmx64m
-Duser.timezone=UTC+08:00
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=/hadoop/haozhuo/druid/var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

vi runtime.properties
druid.service=druid/middleManager
druid.port=8091
# Number of tasks per middleManager
druid.worker.capacity=3
# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xmx2g -Duser.timezone=UTC+08:00 -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=/hadoop/haozhuo/druid/var/task
# HTTP server threads
druid.server.http.numThreads=20
# Processing threads and buffers
druid.processing.buffer.sizeBytes=268435456
druid.processing.numThreads=2
# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=/druid/tmp/druid-indexing
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.0"]

步骤八:Tune Druid Brokers
Druid Brokers also benefit greatly from being tuned to the hardware they run on. If you are usingr3.2xlarge EC2 instances, or similar hardware, the configuration in the distribution is a reasonable starting point.
If you are using different hardware, we recommend adjusting configurations for your specific hardware. The most commonly adjusted configurations are:
-Xmx and -Xms
druid.server.http.numThreads
druid.cache.sizeInBytes
druid.processing.buffer.sizeBytes
druid.processing.numThreads
druid.query.groupBy.maxIntermediateRows
druid.query.groupBy.maxResults

Please see the Druidconfiguration documentation for a full description of all possible configuration options.

cd $DRUID_HOME/conf/druid/broker
vi jvm.config
-server
-Xms12g
-Xmx12g
-XX:MaxDirectMemorySize=3072m
-Duser.timezone=UTC+08:00
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=/hadoop/haozhuo/druid/var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

vi runtime.properties
备注:端口本来应该是默认8022,但是该端口被其他程序占用了,所以修改成8282。
druid.service=druid/broker
druid.port=8282
# HTTP server threads
druid.broker.http.numConnections=5
druid.server.http.numThreads=25
# Processing threads and buffers
druid.processing.buffer.sizeBytes=268435456
druid.processing.numThreads=3
# Query cache
druid.broker.cache.useCache=true
druid.broker.cache.populateCache=true
druid.cache.type=local
druid.cache.sizeInBytes=1000000000

步骤九:Open ports (if using a firewall)
If you're using a firewall or some other system that only allows traffic on specific ports, allow inbound connections on the following:
1527 (Derby on your Coordinator; not needed if you are using a separate metadata store like MySQL or PostgreSQL)
2181 (ZooKeeper; not needed if you are using a separate ZooKeeper cluster)
8081 (Coordinator)
8082 (Broker) 注意:此端口已被其他程序占用。在conf/druid/historical/runtime.properties中修改成8282。
8083 (Historical) 注意:此端口已被其他程序占用。在conf/druid/historical/runtime.properties中修改成8283。
8084 (Standalone Realtime, if used)
8088 (Router, if used)
8090 (Overlord)
8091, 8100–8199 (Druid Middle Manager; you may need higher than port 8199 if you have a very highdruid.worker.capacity)
8200 (Tranquility Server, if used)
In production, we recommend deploying ZooKeeper and your metadata store on their own dedicated hardware, rather than on the Coordinator server.

步骤十:启动
修改:-Djava.io.tmpdir=var/tmp
修改:conf/druid/coordinator/jvm.config 中的-Dderby.stream.error.file=var/druid/derby.log

确保Zookeeper和MySQL已经启动。
我有三台机子,每台机子如下
机子    官网推荐配置    druid启动部分    进程内存占用    总的内存占用
192.168.1.150    4 vCPUs, 15 GB RAM ,80 GB SSD storage    Coordinator    jvm:3g(默认3g)    jvm:6g
Overlord    jvm :3g(默认3g)
192.168.1.152    8 vCPUs,61 GB RAM,160 GB SSD storage    historical    jvm:4g(默认8g),DirectMemory:3g(默认4g)    jvm:10g+
middleManager    jvm:2g(默认2g),buffer:1g(默认1g)
Tranquility    所需内存另算
192.168.1.153    8 vCPUs,61 GB RAM,160 GB SSD storage    Broker    jvm:12g(默认24g),DirectMemory:3g(默认4g)    jvm:15g
注意historical中DirectMemory中的计算方式:
memoryNeeded[3,221,225,472] = druid.processing.buffer.sizeBytes[536,870,912] * (druid.processing.numMergeBuffers[2] + druid.processing.numThreads[3] + 1)

将druid-0.10.0复制到其他节点:
scp -r druid-0.10.0 hadoop@datanode152:/hadoop/haozhuo/druid
scp -r druid-0.10.0 hadoop@datanode153:/hadoop/haozhuo/druid

1) 在192.168.1.150上Start Coordinator, Overlord
启动coordinator
cd $DRUID_HOME;nohup java `cat conf/druid/coordinator/jvm.config | xargs` -cp conf/druid/_common:conf/druid/coordinator:lib/* io.druid.cli.Main servercoordinator &

tail -1000f nohup.out
启动 overlord
cd $DRUID_HOME; nohup java `cat conf/druid/overlord/jvm.config | xargs` -cp conf/druid/_common:conf/druid/overlord:lib/* io.druid.cli.Main serveroverlord &

2) 在192.168.1.152上Start Historicals and MiddleManagers
启动historical
cd $DRUID_HOME; nohup java `cat conf/druid/historical/jvm.config | xargs` -cp conf/druid/_common:conf/druid/historical:lib/* io.druid.cli.Main serverhistorical &

启动middleManagercd $DRUID_HOME; nohup java `cat conf/druid/middleManager/jvm.config | xargs` -cp conf/druid/_common:conf/druid/middleManager:lib/* io.druid.cli.Main server middleManager&

是否需要部署Tranquility Server?
You can add more servers with Druid Historicals and MiddleManagers as needed.

这部分注意下,后面Spark Streaming流式摄入时可能会用到:
If you are doing push-based stream ingestion with Kafka or over HTTP, you can also start Tranquility Server on the same hardware that holds MiddleManagers and Historicals. For large scale production, MiddleManagers and Tranquility Server can still be co-located. If you are running Tranquility (not server) with a stream processor, you can co-locate Tranquility with the stream processor and not require Tranquility Server.
curl -O http://static.druid.io/tranquility/releases/tranquility-distribution-0.8.0.tgztar -xzf tranquility-distribution-0.8.0.tgzcd tranquility-distribution-0.8.0bin/tranquility <server or kafka> -configFile <path_to_druid_distro>/conf/tranquility/<server or kafka>.json

3)在192.168.1.153上Start Druid Broker
cd $DRUID_HOME; nohup java `cat conf/druid/broker/jvm.config | xargs` -cp conf/druid/_common:conf/druid/broker:lib/* io.druid.cli.Main server broker &

自己写的脚本
所有机子添加:stopDruid.sh
pids=`ps -ef | grep druid | awk '{print $2}'`
for pid in $pids
do
kill -9 $pid
done

部署coordinator和overlord的机子,添加startDruid.sh
java `cat $DRUID_HOME/conf/druid/coordinator/jvm.config | xargs` -cp $DRUID_HOME/conf/druid/_common:$DRUID_HOME/conf/druid/coordinator:$DRUID_HOME/lib/* io.druid.cli.Main server coordinator >> $DRUID_HOME/start.log &

java `cat $DRUID_HOME/conf/druid/overlord/jvm.config | xargs` -cp $DRUID_HOME/conf/druid/_common:$DRUID_HOME/conf/druid/overlord:$DRUID_HOME/lib/* io.druid.cli.Main server overlord >> $DRUID_HOME/start.log &

部署historical和middleManager的机子,添加startDruid.sh
java `cat $DRUID_HOME/conf/druid/historical/jvm.config | xargs` -cp $DRUID_HOME/conf/druid/_common:$DRUID_HOME/conf/druid/historical:$DRUID_HOME/lib/* io.druid.cli.Main server historical >> $DRUID_HOME/start.log &

java `cat $DRUID_HOME/conf/druid/middleManager/jvm.config | xargs` -cp $DRUID_HOME/conf/druid/_common:$DRUID_HOME/conf/druid/middleManager:$DRUID_HOME/lib/* io.druid.cli.Main server middleManager >> $DRUID_HOME/start.log &

部署broker的机子,添加startDruid.sh
java `cat $DRUID_HOME/conf/druid/broker/jvm.config | xargs` -cp $DRUID_HOME/conf/druid/_common:$DRUID_HOME/conf/druid/broker:$DRUID_HOME/lib/* io.druid.cli.Main server broker >> $DRUID_HOME/start.log &

Loading data
Congratulations, you now have a Druid cluster! The next step is to learn about recommended ways to load data into Druid based on your use case. Read more aboutloading data.

Load streaming data
http://druid.io/docs/0.10.0/tutorials/quickstart.html

To load streaming data, we are going to push events into Druid over a simple HTTP API. To do this we will use [Tranquility], a high level data producer library for Druid.
To download Tranquility, issue the following commands in your terminal:
curl -O http://static.druid.io/tranquility/releases/tranquility-distribution-0.8.0.tgztar -xzf tranquility-distribution-0.8.0.tgzcd tranquility-distribution-0.8.0
We've included a configuration file in conf-quickstart/tranquility/server.json as part of the Druid distribution for a metricsdatasource. We're going to start the Tranquility server process, which can be used to push events directly to Druid.
bin/tranquility server -configFile <path_to_druid_distro>/conf-quickstart/tranquility/server.json
This section shows you how to load data using Tranquility Server, but Druid also supports a wide variety of other streaming ingestion options, including from popular streaming systems like Kafka, Storm, Samza, and Spark Streaming.
The dimensions (attributes you can filter and split on) for this datasource are flexible. It's configured for schemaless dimensions, meaning it will accept any field in your JSON input as a dimension.
The metrics (also called measures; values you can aggregate) in this datasource are:
count
value_sum (derived from value in the input)
value_min (derived from value in the input)
value_max (derived from value in the input)
We've included a script that can generate some random sample metrics to load into this datasource. To use it, simply run in your Druid distribution repository:
bin/generate-example-metrics| curl -XPOST -H'Content-Type: application/json' --data-binary @- http://localhost:8200/v1/post/metrics
Which will print something like:
{"result":{"received":25,"sent":25}}
This indicates that the HTTP server received 25 events from you, and sent 25 to Druid. Note that this may take a few seconds to finish the first time you run it, as Druid resources must be allocated to the ingestion task. Subsequent POSTs should complete quickly.
Once the data is sent to Druid, you can immediately query it.
Query data
Direct Druid queries
Druid supports a rich family of JSON-based queries. We've included an example topN query in quickstart/wikiticker-top-pages.json that will find the most-edited articles in this dataset:
curl -L -H'Content-Type: application/json' -XPOST --data-binary @quickstart/wikiticker-top-pages.json http://localhost:8082/druid/v2/?pretty
Visualizing data
Druid is ideal for power user-facing analytic applications. There are a number of different open source applications to visualize and explore data in Druid. We recommend trying Pivot, Superset, or Metabase to start visualizing the data you just ingested.
If you installed Pivot for example, you should be able to view your data in your browser at localhost:9090.
SQL and other query libraries
There are many more query tools for Druid than we've included here, including SQL engines, and libraries for various languages like Python and Ruby. Please see the list of libraries for more information.

测试从HDFS导入数据到Druid
overlord console:http://192.168.1.150:8090/console.html

cd $DRUID_HOME
curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/wikiticker-index.json 192.168.1.150:8090/druid/indexer/v1/task

查询:
curl -L -H'Content-Type: application/json' -XPOST --data-binary @quickstart/wikiticker-top-pages2.jsonhttp://192.168.1.153:8282/druid/v2/?pretty

流式摄入:
启动:
注意,必须在该目录下启动
cd /hadoop/haozhuo/druid/tranquility-distribution-0.8.2;

bin/tranquility server-configFile $DRUID_HOME/conf/tranquility/server.json

//测试插入
cd $DRUID_HOME;
bin/generate-example-metrics| curl -XPOST -H'Content-Type: application/json' --data-binary @-http://192.168.1.152:8200/v1/post/metrics

//查询
curl -L -H'Content-Type: application/json' -XPOST --data-binary @quickstart/wikiticker-top-pages.json http://192.168.1.152:8082/druid/v2/?pretty

curl -L -H'Content-Type: application/json' -XPOST --data-binary @quickstart/wikiticker-top-pages2.json http://192.168.1.153:8282/druid/v2/?pretty
curl -L -H'Content-Type: application/json' -XPOST --data-binary @quickstart/test-top-pages.json http://192.168.1.153:8282/druid/v2/?pretty
http://192.168.1.150:8081/#/
---------------------

druid.io实践分享之Realtime+kafka

但如何发挥到最好、最稳定,需要更多细节的调整。 
我会从整体部署、实时节点、历史节点、broker节点来依次介绍。

从整体部署来看: 
第一:druid.io 属于IO和CPU双重密集型引擎,所以对内存、CPU、硬盘IO都有特定要求,特别提醒,如果资金充足,可以直接上SSD(内存和CPU同理)。

第二:部署的整个过程需要多次练习,并记录成流程规范,因为整体集群涉及的层次较多,如果不进行流程规范化,会导致因人不同,使线上操作出现问题

第三:相应的监控机制,要配合到位,后期在整体部署中,逐步进行实施自动化方式

第四:各节点类型服务启动时,没有明确的先后顺序

整体完整结构如下:

目前作者成立的https://imply.io 提供了更加完善的解决方案,有兴趣的可以参考下。

首先我们来分析下实时节点。 
对实时节点(realtime)我考虑从两个方面来分享: 
一个方面:数据本身要求 
一个方面:realtime运行阶段出现的一些问题(如:segment堆积等)

首先我会对背景进行介绍下,我们是一个CPA广告联盟,有impression、click、conversion等类型的数据。对我们而言conversion数据最重要,因为我们是conversion来计费的。所以就有一个高标准conversion类型数据不能丢失且不能重复,对另外两类型数据在一定范围内可以接受少数丢失和少数重复。我们是基于6.0的版本来分析的。

初期druid的实时数据获取的方式是通过realtime节点结合kafka的方式,所以提供了不丢失数据的优势,如果对实时数据的各方面要求很高的前提下,realtime节点结合kafka的方式还是会带来诸多问题(这也是新版本中已经不推荐使用的原因,个人见解任何系统只要使用了kafka集群都会有这样的情况发生,只要努力去解决就好)。 
场景如下: 
场景一:realtime保证了不少数据,但没保证不多数据 
场景二:realtime节点是单点特性,这样一旦一个节点出问题,对数据敏感的话,会立马发现问题 
场景三:realtime节点没有提供安全关闭的逻辑(官方是提供直接kill方式) 
场景四:realtime在某些场景中,还是会掉数据 
另外也存在少量的bug,这个可以忽略下(在6.0版本之后都得到了修复)。我将一一对这些场景的产生及解决方式进行阐述。

场景一:
确实只要你的数据进入了kafka集群,数据是真的不丢失(当然如果kafka的硬盘已经满了这种情况,一般都会用监控的方式去规避吧)。但是为了支持多个realtime节点,kafka里的topic必须进行分区,不然无法整体提升并发的能力。如下图:

并且官方文档给出的kafka使用有一个参数: 
"auto.commit.enable": "false" 
也就是交由realtime节点自行管理。这个参数设置成false后,realtime会在把一个时间段的数据持久化之后,才会给kafka集群发一个commit命令。

正是因为这个逻辑,所以在线上运行阶段,当consumer组里某个一个消费者出现某种问题,会导致没有及时对topic消费进行响应(但是已经正常消费数据了),这时候kafka会对分区进行重新调整,导致其它消费者会根据上次的offset进行消费数据,从而最终导致数据重复。如下图:

场景二:
总体来说也是由于场景一带来的副作用,由于分区导致,每一个partition被一个cousumer消费,所以多个realtime节点就是单点特性,kill任意一台realtime节点反过来也会激活kafka集群对partition再分配的处理。所以会出现这样的场景一旦启动realtime节点,就永久无法正常shutdown(只能用kill方式)。并且多realtime节点一起启动时,在启动的过程中,每个节点启动间隔不能太长(原因大家可以想想,所以一般会使用一个远程启动脚本方式,统一进行远程启动)。首先我先启动realtime1节点,如下图:

3个partition先分配给realtime1,分别从offset=300、200、100开始消费。过1分钟后再启动realtime2节点,这样会触发partition动态分配事件,启动时假设realtime1已经消费到offset=312、222、133,但还没有提交commit(所以kafka那边的offset值是不变的) 
如下图:

假设重新分配后,partition-3分配给了realtime2,那么realtime2节点将从offset=100的位置开始进行消费,如下图:

这就是realtime节点间启动间隔不能太长的原因。

场景三:
跟场景二有关,就是没有提供安全关闭的命令(只能kill),这会让实际操作过程中,有种不安全感,虽然kill掉后,可以通过kafka来恢复,但是在这种场景下,还是会导致数据重复,例如:当realtime已经对数据持久化了,但还没来得及返回offset告知kafka集群(是将 auto.commit.enable = false),这时realtime节点被kill掉后。如下图:

kill命令发生在持久化之后,commit之前。如下图:

再重启realtime节点就会引发数据重复消费。 
可能大家会发现,如果kill命令发生在内存Cache到持久化之间,会不会重复?当auto.commit.enable = false情况下,这块的数据是会从kafka里恢复回来的。

解决方案
其实场景一、二和三,总结起来其实是一个类别的情况,需要整体一起考虑去解决。 
后面版本中,官方了给出了一个方案类似于双机热备,简单的说,就是两个realitme节点对应一个partition,但我没有去尝试过,这里主要介绍下我们在当时如何解决这样的场景的(特别是kafka、realtime集群在扩容或者删除节点的时候,也会带来这样的问题)。

首先给出一个最简单方式,在采集层做数据的backup,如果你的segment是设置的一个小时,那么就按小时进行check。发现异常,就用backup数据进行修复还原。 
我们尝试下来后,发现客户在使用的过程中,会反馈为什么上一个小时的数据有变化,给用户的体验不好(当然后期我们可以做到提前发消息通知用户告知),站在用户的角度出发希望数据不要经常变动,这样用户会质疑你的系统,而对我们开发人员来说,可以让数据补进当前这个时间段里,总体来说没有让数据丢失。其次按每小时check后台逻辑复杂,人工干预工作量很大。再次就是当我们开发在进行快速迭代和上线的时候,势必带来相关数据修复工作,影响效率(最常见的就是增加纬度信息时)。

后来我们讨论了后,重新订了一个新方案:

增加安全关闭逻辑
realtime集群增加一层Cache,用于重复数据过滤
改auto.commit.enable = true,并将zk同步时间稍微缩
定期记录partition的offset值,用于回滚
注:这些方式不一定是最通用最完美的方式,但在当时一定是适合我们需要的方式。

以下是安全关闭部分

启动时 RealtimeManager 为每个datasouce, 启动一个线程, 建立一个FireChief
FireChief 持有RealtimePlumber 和Firehose
FireChief消费数据逻辑 
plumber.startJob(); 
while(notSafeCloseFlag && firehose.hasNext()) 
定时 persistent 
plumber.finishJob();
plumber.finishJob逻辑 
basePersistent 下面的datasource下的文件夹表示sink, 现在系统中一个小时一个Sink 
一个plumber 持有多个sink. 
Sink持有多个FireHydrant, 但只有一个是激活状态, 用于接收数据
一个FireHydrant保存在0, 1, 2 文件夹下。 
对应一个IncrementalIndex和一个Segment 
index 表示内存中的对象 
segment 表示持久化对象 
hasSwapped表示十分已经持久化 
swap操作: 建立segment, 将index设置成null
stop逻辑, 见流程图 
RealtimeManager层次图:

stop关闭流程图:

关于Cache层解决方式,很多人都提过有单点故障,但这个时候需要具体分析了,首先我们这里主要解决是conversion问题,就数量级来说conversion比impression、click要小太多了;其次Cache的conversion不是永久的(属于一个区间内),不会出现数据量过于膨胀;再次逻辑简单因为conversion只有主键存储及判断,不会负载很重。要而且经过了时间证明,运行了两年多零故障率。结构如下图:

后面两点,属于小调整,这样的好处: 
一是重复数据量较小,在面对每天10多亿数据的流转过程中,不影响体验 
二是回滚offset的范围可控

通过此方式,我们进行不断的调整、测试、验证和细节优化,最终做到了整体数据丢失率99.99%(由测试部门给出的),conversion数据丢失率是0的效果。并且也让人工干预的操作减少很多。

场景四:
当数据量进来的速率高于消费的速率时,就会引发数据丢失,主要原因就是windowPeriod这个参数设置,假设segment是设置成1个小时,其windowPeriod设置成PT10m,当前时间是13点,那么就表示在13点10分之前(包含10分)还可以继续消费12点-13点之间的数据,大家应该知道在海量数据传输时,不可能在13点整点就能消费完成12点-13点之间的数据,所以druid在配置时,提供了一个缓冲区间,我们在整个运行过程中出现过两次,第一次是就是速率突然暴涨导致,第二次是因为数据的时间导致。

这样的场景在实际的运营过程中,不会常碰到,但不排除恶意攻击或者刷流量的情况。 
一般解决方式: 
第一:流控报警 
第二:时间重置

流控主要是定时监控Kafka集群里的offset的差值,发现差值变大就要报警。这时的策略就是数据采集服务层降低写入速度(这部分最后也可以变成自动化方式进行),让realtime节点集群的消费得到缓解。然后快速定位到底是达到了当前消费极限还是其它问题。 
时间重置通过读取windowPeriod的值,来进行换算,将此记录放到下一个时间段里, 
举例说明:windowPeriod设置成10分钟,当前的click数据的时间是17:59:44,当前系统时间是18:10:23,这时满足18:10:23-17:59:44>10分钟,将对此click数据的时间重置成18:10:23,进入下一个时间段。 
这种方式,特别适合自动化补数,提高效率。

以上就是数据本身的要求所带来的全部内容。

下一篇我将介绍realtime结合kafka在运行阶段会出现场景及解决方式。

本节重点介绍在运行过程中,这两个组件会出现什么问题及解决方式 
场景如下: 
场景1、第一次上线kafka的partition与realtime的个数关系 
场景2、kafka数据写入最优方式 
场景3、realtime配置文件在实际过程的变化及重点参数 
场景4、segment堆积产生的原因及如何避免 
场景5、realtime对JVM的要求 
场景6、多个dataSource,大小表拆分,多topic消费

场景一

初次上线要当前的数据量、集群规模等因素来综合考虑。基本定律是(以一个topic为例): 
partitions=N*realtimes 
partitions表示当前topic的分区数 
N表示每台realtime消费的partition的个数 
realtimes表示realtime节点总数

举例说明: 
3个partitions,3台realtime,那么N就是1。表示每台realtime节点消费1个partitions。 
6个partitions,3台realtime,那么N就是2。表示每台realtime节点消费2个partitions。 
同理都是类似方式计算。 
为撒这样计算?主要考虑就是均衡,充分利用好每台realtime节点的资源。

随着数据量的增加,会碰到当前topic的消费速率变慢的情况,这个时候就会对kafka集群增加节点或增加kafka节点硬盘或扩展partition个数。 
增加节点和增加硬盘都属于kafka本身的操作,不会影响到realtime这层。但扩展partition个数时,realtime节点在水平扩展的时候,就需要考虑下realtime节点增加的个数(参考上面的公式来算)。

场景二

kafka写入最优方式是均衡,如果不均衡,会影响到两个点:

  1. kafka节点有些硬盘会报警并且通过监控平台会发现有些机器负载高,有些负载低。
  2. 接着带来就是realtime消费的不均衡,也会导致有些机器负载高,有些负载低。

所以使用新的hash算法(这个调整后一定要进行测试验证),我们重新开发了分配算法。默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。 
但因为Key的关系默认情况下,写入各partition数据还是会出现不均衡的。 
调整成轮训方式即可。

场景三

这里先说的参数重点是指runtime.properties文件里属性druid.realtime.specFile所对应的json文件内容。涉及的参数如下: 
第一:shardSpec 
第二:rejectionPolicy 
第三:intermediatePersistPeriod ≤ windowPeriod < segmentGranularity 和 queryGranularity ≤ segmentGranularity

**shardSpec**

默认设置成

"shardSpec": {"type": "none"}

这种情况单机版还可以玩一玩。在多个realtime节点的情况下,这个不能这样设置了。 
现在官方文档提供:linear and numbered,但在源码中还提供了另一种SingleDimensionShardSpec,这种使用较少。 
这里可以给大家分享一下,当多个realtime节点时还配置成默认值的效果(这是当初文档没有现在这么详细导致我们疏忽了),那么就是当前小时范围内的数据是跳跃的,简单的说,就是同一个查询条件发多次,返回的结果集大小不同,有时候返回30多条,有时候又返回50多条。在当时对初学的人来说很紧张,后面通过源码分析和大家的努力,终于发现了是这个参数配置失误导致。 
但这个调整是有要求的,就是调整完后,必须对当前时间段realtime所有节点已经存储的数据要删除掉,不然查询的结果还是跳跃的。 
举例说明: 
当前时间段是12点-13点(假设segment是按小时进行),我是在12点20分调整此参数,那么再重启全部realtime节点之前,必须把12点-13点之间已经持久化的数据要删除掉,不然调整成linear后,是没有任何效果的。这个步骤非常重要。 
另外注意下: 
realtime 节点1设置为: 
“partitionNum” : 1 
realtime 节点2设置为: 
“partitionNum” : 2 
realtime 节点3设置为: 
“partitionNum” : 3 
依次类推。 
关于numbered和linear的区别很小,主要在查询上面的限制,大家查看官方文档再实践下就知道了。

**rejectionPolicy**

此参数要是不理解,带来的效果就是丢失数据,会让你误认为系统有bug。 
有三个值:serverTime(推荐使用)、messageTime、none。 
如果想不丢弃数据就用none,另外一个用系统时间作为参考,一个用event里的timestamp作为参考。如果此参数是none的话,windowPeriod这个参数就没有什么效果了。 
所以当数据有丢失的时候,首先检查时间戳和这个参数。也就是检查数据里面的timestamp和系统时间是否有较大的误差。(提醒下:首先要保证json数据格式要能正常解析)

**intermediatePersistPeriod、windowPeriod、segmentGranularity和queryGranularity**

在实际的使用过程中,这几个参数的关联关系一定要当心。 
首先介绍下intermediatePersistPeriod、windowPeriod、segmentGranularity三个是紧密关联的,特别windowPeriod这个参数不光控制数据消费延迟,也控制着segment合并持久化。 
并且intermediatePersistPeriod参与的逻辑部分和windowPeriod参与的逻辑部分,是两个线程分开进行的。

刘博宇,滴滴出行高级软件开发工程师,就职于滴滴基础平台大数据架构部。负责Druid集群维护与研发工作。

摘要:

Druid是一款支持数据实时写入、低延时、高性能的OLAP引擎,具有优秀的数据聚合能力与实时查询能力。在大数据分析、实时计算、监控等领域都有特定的应用场景,是大数据基础架构建设中重要的一环。Druid在滴滴承接了包括实时报表、监控、数据分析、大盘展示等应用场景的大量业务,作为大数据基础设施服务于公司多条业务线。本次演讲我们将介绍Druid的核心特性与原理,以及在滴滴内部大规模使用中积累的经验。

 分享大纲:

1、Druid特性简介

2、Druid在滴滴的应用

3、Druid平台化建设

4、展望

 正文:

一、Druid特性简介

Druid是针对时间序列数据提供的低延时数据写入以及快速交互式查询的分布式OLAP数据库。其两大关键点是:首先,Druid主要针对时间序列数据提供低延时数据写入和快速聚合查询;其次,Druid是一款分布式OLAP引擎。

针对第一个特点来看,Druid与典型的TSDB,比如InfluxDB、Graphite、OpenTSDB的部分特性类似。这些时序数据库具备一些共同特点,一是写入即可查,通过内存增量索引让数据写入便可查询;二是下采样或RDD,通过下采样或类似于RDD的操作减少数据量,而Druid在数据写入时就会对数据预聚合,进而减少原始数据量,节省存储空间并提升查询效率;三是可能会支持Schema less,在InfluxDB中,用户可任意增加tag,InfluxDB可对新增tag进行聚合查询,但Druid在这点上与InfluxDB略有差异,Druid需要预先定义Schema 。Druid的Schema数据打包在最后形成的数据文件中,数据文件按照时间分片,也就是说过去和未来数据的Schema可以不同,而不同schema的数据可以共存。所以,虽然Druid不是schema less的,但是Schema调整也是比较灵活。

另外,Druid作为一个OLAP数据库。OLAP数据库需要支持类似上卷、切块、切片、下钻等操作,但不适合明细查询。对于类似根据某主键ID定位唯一数据的任务,OLAP数据库并不能友好支持。常用的OLAP数据库实现方式以下几种:1)数据检索引擎,比如ES;2)预计算加KV存储实现,比如Kylin;3)SQL on Hadoop 引擎,比如 Presto、SparkSQL。

接下来,我们就以上中实现进行对比。首先是数据检索引擎的代表ES,ES可以存储结构化和非结构化数据,同时具备明细查询和聚合查询能力,由于其自身是一个数据检索引擎,其索引类型并不是针对聚合分析设计的,所以聚合查询方面开销较大;其次,ES不但要保存所有的原始数据,还需要生成较多的索引,所以存储空间开销会更大,数据的写入效率方面会比Druid差一些。

与ES相比,Druid只能处理结构化数据,因为它必须预定义Schema;其次,Druid会对数据进行预聚合以减少存储空间,同时对数据写入和聚合进行优化。但是,由于进行了预聚合,所以Druid抛弃掉了原始数据,导致其缺少原始明细数据查询能力。如果业务方有需求,可以关闭预聚合,但会丧失Druid的优势。

其次是预计算 + kv存储方式 ,KV存储需要通过预计算实现聚合,可以认为Key涵盖了查询参数,而值就是查询结果,由于直接从KV存储进行查询,所以速度非常快。缺点是因为需要在预计算中处理预设的聚合逻辑,所以损失了查询灵活性,复杂场景下的预计算过程可能会非常耗时,而且面临数据过于膨胀的情况;由于只有前缀拼配一种索引方式,所以在大数据量的复杂过滤条件下,性能下降明显;且缺少聚合下推能力。

与预计算+KV存储方式相比,Druid 是使用Bitmap索引的列式存储,查询速度肯定不如KV存储快; 但是由于使用内存增量索引,增量预聚合的模式,写入即可查,无需等待预计算生成Cube,所以实时性更强;其次,Druid可针对任意维度组合过滤、聚合,查询更加灵活;最后,Scatter & Gather模式支持一定的聚合下推。

最后是SQL on Hadoop, 这类引擎的SQL支持通常很强大,且无冗余数据,不需要预处理。缺点是因为其直接通过计算引擎对Hadoop上的文件进行操作,所以响应速度较慢且QPS相对较低。

与SQL on Hadoop方式相比,Druid的SQL支持有限,但在逐渐完善;必须预定义维度指标。其优势在于可达到亚秒级响应,并发较高。

二、Durid在滴滴的应用

Druid目前在滴滴使用规模大概为多个集群百余台机器,日原始数据写入量在千亿级别,日落盘数据在TB级别,数百实时数据源、千级实时写入任务,日查询量近千万级。主要承接业务有监控、实时报表,大屏展示等。

下图为滴滴实时业务监控案例:

u=2784587847,2969318735&fm=173&app=25&f=JPG?w=640&h=475&s=E0984C3A17DA75C8486584DB0300C0B3

我们的监控体系大概可以分为三层:顶层为业务监控,主要由业务方定义指标,然后配置相应的查询和报警。主要目的在于及时发现业务问题并告警;中层的监控体系是对各服务网关调用的监控日志,主要为了发现某业务问题造成的影响范围和具体影响对象;底层运维体系主要对网络、机器各方面指标进行监控。

之所以业务监控适用Druid,是因为业务指标通常具有较为复杂多变的业务逻辑。Druid本身是一个OLAP引擎,定义一个数据源就可衍生出众多聚合指标,所以很适合这类灵活查询的配置。

第二类应用是实时报表类应用(如下图),实时报表类应用主要用于运营数据分析,客户端网络性能分析以及客服应答实时统计等。这些用户通常是从Hive数据仓库迁移过来的,因为希望获得实时用户体验而Hive查询速度太慢,所以选择迁移。典型应用场景比如快速获取某下雨区域的用户单据,对用户进行优惠券投放进而刺激用户打车。

u=255204357,1146242818&fm=173&app=25&f=JPG?w=640&h=455&s=50187532D7DE65C8467599CF030080B3

第三类是大屏展示类应用(如下图),这类应用主要用于呈现业务性关键结果,通常是PV、UV或TOP N查询,非常适合Druid。

u=786807006,35117801&fm=173&app=25&f=JPG?w=640&h=477&s=3BC1E0008FBA3E8C7CF0E916030040E3

三、Druid平台化建设

在承接应用场景的过程中,我们做了很多平台化建设。简单啊介绍下平台化建设的背景: 业务数据主要来源是日志和binlog;公司统一数据通道是kafka;业务指标多样,逻辑复杂多变;Druid接入配置较复杂,除Schema配置外,还包括实时任务配置;数据进入Druid之前通常需要流计算处理,业务方自己开发既费时又很容易出现问题;Druid数据的对应关系以及数据源衍生指标链路较长,需要进行上下游关系梳理;由于Druid官方主要通过API查询,未提供数据可视化服务组件,因此业务方急需数据可视化相关服务组件。

在以上的背景下,我们构建了实时计算平台,架构图如下:

u=1298651380,803634241&fm=173&app=25&f=JPG?w=640&h=475&s=F398C0235A9E77C86E704143030080FA

底层引擎有三部分组成,流计算引擎包括Flink Streaming和Spark Streaming,存储部分主要依靠Druid;流计算引擎之上,我们主要开发了WebIDE和任务管理功能,WebIDE在Web内部集成,我们会为用户提供相应模板,用户根据相应的模板进行进行开发,即可形成自己的流计算任务,简化了一些逻辑较简单的常见ETL、Join任务的开发。任务完成后可通过任务管理平台直接提交,同时用户自己在本地开发的流计算任务也可以上传到平台,通过平台执行,平台对任务提供相应的指标检测。基于Druid引擎,配置 Druid数据源,通过数据源衍生指标,每一个指标其实就是对Druid的一个查询。用户可针对指标配置告警。上图右侧的DCube是一个拖拽式数据分析的前端工具,DSQL让用户可直接写SQL的方式输出Druid的即席查询能力。

根据配置的指标进行告警,分为两大类,一类是阈值告警;一类是模型告警。通常对规律性不太强的数值配置阈值告警,对规律性较强的指标配置模型告警。如滴滴每天的订单呼叫量基本上呈现一个早高峰、一个晚高峰,中间较平稳的状态。通常会选取过去一段时间的数据进行模型训练,由用户在得到的预测基线上设置置信区间。如果数据超过置信区间,就会报警。当然,也会存在一些较难处理的特殊情况,比如突然下雨、热门电影首映结束等导致的订单激增,需要额外去考虑一些情况。

u=1403659045,230423215&fm=173&app=25&f=JPG?w=640&h=483&s=3EAA78234B1475CAD268F4C00300F0B1

上图为基本工作流,整体工作流程为由MySQL的binlog和日志采集数据,形成原始topic,经过ETL或者多流Join进入清洗后的topic,此时用户可以使用平台提供的魔板功能或自行开发流计算任务,我们会为所有流计算任务定制一些默认的实时指标,这些指标和用户的业务数据都会在Druid中建立datasource。Druid也可通过Hive离线导入数据,离线数据源和实时数据源两部分组成了Druid的基本数据,之后根据基本数据构建业务指标、任务指标,完成报警配置。如果业务方具备一定开发能力,需要把数据接入到自己的系统,我们也会提供一些Open API。平台为用户提供了自助式的Druid数据源接入页面,极大简化地了Druid数据接入的复杂过程。

Druid查询采用100% SQL 的Web化配置。Druid原生查询是DSL,类似JSON格式,但学习成本较高。在支持SQL之后,除了部分用户自建服务外,平台所有查询全部迁移到SQL。

在平台化过程中,我们遇到了一些挑战:一是核心业务与非核心业务共享资源,存在一定风险;二是用户自助提交任务配置、查询不合理,造成异常情况,甚至影响整个集群的稳定性;三是随着业务的快速发展,Druid依赖的组件都需要热迁移到独立部署环境。在滴滴内部,由于Druid数据源基本都是用户自助接入,所以业务增长迅速,一年时间几乎涨了四倍,这对Druid依赖组件的热迁移提出了要求。

针对不同重要程度的业务共享资源问题,首先建设 Druid集群的异地双活机制,核心数据源的集群级双活。其次,通过统一网关对用户屏蔽多集群细节,同时根据用户身份进行查询路由,实现查询资源隔离。最后,进行业务分级,核心业务进行集群级双活,对查询资源需求较大但不过分要求实时性的业务分配独立的查询资源组,其他用户使用默认资源池。

u=3684987798,514022361&fm=173&app=25&f=JPG?w=640&h=480&s=1C867D33111E75CA5CC100CC0200C0B3

上图为基本架构图,首先我们会有多个查询节点分布在不同集群,非核心数据源单写到公共集群,核心数据源双写到两个集群,用户使用身份验证key通过网关路由进行查询。

针对用户配置与查询不合理造成的异常,我们主要做了以下三点:一是引擎层面进行bad case防范,例如,druid数据时间字段设置合理的时间窗口限制,如果数据时间范围异常,我们就会对它进行抛弃;二是对Druid原生API进行封装,提供更加合理的默认配置,主要针对实时任务时长、任务数量以及内存进行配置;三是完善指标监控体系与异常定位手段,保证捕捉到异常查询。Druid和网关日志通常会通过流计算任务进行处理,然后把它们分别写入Druid和ES,数值指标会上报到Graphite,通过Grafana进行展示,综合利用 Druid的聚合分析能力与和ES的明细查询能力定位异常。

针对依赖组件的热迁移问题,Druid主要依赖的组件有三个:ZooKeeper、MySQL、HDFS。在过去一年,滴滴完成了三大组件的迁移,主要过程如下:

1、ZooKeeper迁移原理:扩容-集群分裂-缩容

u=62835724,1100744389&fm=173&app=25&f=JPG?w=640&h=382&s=6C8A5D32114A55494AE0B1CA000090B1

在滴滴内部,ZK原来是与其他业务共用的,我们需要保证其他业务和Druid都不停服的情况下,把Druid的ZK集群单独迁移出来,所以我们采用了上述迁移方案。核心思路就是先扩容,随后利用两套集群配置,触发集群分裂,最后缩容掉不需要的节点。如图4所示,这七台ZK配置其实有两套,第一套是12347五台,第二套是567三台,但它们的leader都是ZK7,此时7个节点同属一个集群。当重启ZK7之后,两套配置的ZK节点会分别独立选取leader,此时进行集群分裂变成两个单独的ZK集群。

2、MySQL热迁移实践

u=2982821302,2396993739&fm=173&app=25&f=JPG?w=640&h=479&s=3A0A7423DF9E74C81465CCD00200C0B3

我们主要使用Kafka-indexing-service作为实时数据写入方式。在实时任务执行过程中,会元数据不断更新到MySQL中的。要想对Mysql进行迁移,首先需要保证元数据在迁移过程中不变,因此首先要了解该数据写入流程。在Kafka-indexing-service的实时任务执行过程中,元数据更新主要来自于实时任务执行状态的变化和数据消费相关的部分API调用。实时任务的生命周期包括读取数据和发布数据两个过程,读取数据是从kafka读取,在内存中建立增量索引;发布数据过程,就是把实时节点消费的数据下推到历史节点,其实是通过首先写到HDFS,然后再由历史节点加载。只有在实时任务状态发生改变时,才会产生元数据更新操作。因此,我们开发了实时任务状态冻结API,把所有实时任务的生命周期都冻结在数据reading的状态,此时进行MySQL迁移,迁移完成之后,重新启动Overlord(一个管理实时任务的节点,所有实时任务的数据状态变化都由Overlord触发),实时任务就会继续进行生命周期迭代。

3、HDFS迁移实践

u=2855690947,363320836&fm=173&app=25&f=JPG?w=640&h=478&s=039660231B0E64CA447450C6030080B3

刚才提到了HDFS在数据发布流程中的作用,数据发布就是指实时节点消费到的数据下推到历史节点的过程。下推的方式就是先推到HDFS,再由历史节点从HDFS拉取。Master节点会告诉历史节点那些数据是需要其拉取的,而从哪里拉取则是从元数据存储中获得的,我们需要做的是保证历史节点可以从两个HDFS读取数据,同时滚动重启实时节点,保证增量数据写到新的HDFS,历史节点从新的HDFS拉取数据。对于存量数据我们只需要更改元数据里历史数据的路径,就可以无缝替换原有HDFS。

接下来简单介绍Druid平台化建设性能优化部分的工作。首先简单介绍下Druid的三种数据写入方式:

1、Standalone Realtime Node

该模式属于单机数据消费,失败后无法恢复,且由于其内部实现机制,该模式无法实现HA,这种方式官方也不推荐使用。

2、Tranquility + indexing-service

该方式主要通过Tranquility 进程消费数据,把数据主动push给indexing- service服务,优势是Tranquility 进程可以帮助管理数据副本、数据实时任务以及生命周期等;其缺点是如果所有副本全部任务失败,无法恢复数据;必须设置数据迟到容忍窗口,该窗口与任务时长挂钩,由于任务时长有限,因此很难容忍长时间数据迟到。

3、Kafka-indexing-service

这是Druid比较新的写入方式,其优势主要体现在数据可靠性及任务恢复方面,弥补了前两种方式的不足。其缺点是数据消费过于依赖Overlord服务,Overlord单机性能将会成为集群规模瓶颈 ;由于Segment与Kafka topic的partition关联,因此容易造成元数据过度膨胀,进而引发性能问题。最终,我们选择Kafka-indexing-service 模式,并开始解决该模式存在的问题。

该写入方式面临的问题经过定位主要有以下三个:一是MySQL查询性能问题, Overlord提供的多个API需要直接对MySQL进行操作,所以MySQL查询性能直接影响Overlord并发水平;二是Druid里所有数据都是JSON存储格式,反序列化非常耗时;三是 Druid中实时任务状态都通过ZK发布,Overlord会监听ZK上面的节点,而这些监听器的回调线程执行函数中会涉及一些MySQL操作,当MySQL比较繁忙时, ZK watch回调单线程模型导致事件处理需要排队。

针对查询性能瓶颈,我们主要针对Druid元数据存储索引进行了优化。通过对Segment定时Merge,合理设置数据生命周期来合并精简元数据; 对Druid数据库连接池DBCP2参数进行优化。针对反序列化与watch回调问题,我们主要对Druid任务管理体系进行了较大修改,进行了Overlord Federation改造,引入namespace概念,每一个namespace下的任务又单独的Overlord管理。这样增加Overlord水平扩展能力,同时亦可做Overlord级别的资源隔离。

四、展望

目前Druid数据消费能力依赖Kafka topic的partition,未来我们希望引入流计算引擎提升单partition消费能力,解耦对Kafka topic partition的依赖,对数据消费和数据处理进行不同的并发度配置。其次,Overlord大量服务涉及对MySQL的直接操作,易导致单机性能瓶颈,后续将会对高并发服务进行内存化改造。数据下推到HDFS之后,Coordinator(Druid里数据分配角色)决定数据的加载位置,这是一个单线程运行模型,当Overload水平扩展之后,每个时间点产生的Segment数量会有很大提升,Coordinator任务处理单线程模型需要优化。最后, 我们希望把Druid部分组件On-yarn,进而提升资源利用率并简化运维操作。

转载于:https://my.oschina.net/hblt147/blog/3005235

druid安装与案例相关推荐

  1. MySQL 系列(一) 生产标准线上环境安装配置案例及棘手问题解决

    MySQL 系列(一) 生产标准线上环境安装配置案例及棘手问题解决 一.简介 MySQL是最流行的开放源码SQL数据库管理系统,它是由MySQL AB公司开发.发布并支持的.有以下特点: MySQL是 ...

  2. Py之lulu:lulu库的简介、安装、案例应用之详细攻略

    Py之lulu:lulu库的简介.安装.案例应用之详细攻略 目录 lulu库的简介 1.支持的站点 lulu库的装 lulu库的案例应用 1.下载音乐 2.下载视频 lulu库的简介 通过该库可下载各 ...

  3. Py之interpret:interpret的简介、安装、案例应用之详细攻略

    Py之interpret:interpret的简介.安装.案例应用之详细攻略 目录 interpret的简介 1.可解释性在ML过程中的重要作用 interpret的安装 interpret的案例应用 ...

  4. ansible 安装 mysql 案例记录

    文章目录 ansible 安装 mysql 案例记录 ansible 安装 mysql 案例记录 本次案例参考 二进制安装mysql 以下 yml 文件仍有很多不完善的地方,后续再继续修改 --- - ...

  5. Dreamweaver:Dreamweaver软件的界面简介、安装、案例应用之详细攻略

    Dreamweaver:Dreamweaver软件的界面简介.安装.案例应用之详细攻略 目录 Dreamweaver软件的简介 Dreamweaver软件的安装 Dreamweaver软件的界面简介 ...

  6. SQL Server 2012安装错误案例:Error while enabling Windows feature: NetFx3, Error Code: -2146498298...

    案例环境: 服务器环境 :    Windows Server 2012 R2 Standard 数据库版本 :    SQL Server 2012 SP1 案例介绍: 在Windows Serve ...

  7. Proxmox VE(PVE)+ceph+物理网络规划-超融合生产环境安装部署案例

    1 Proxmox Virtual Environment介绍 Proxmox VE 是用于企业虚拟化的开源服务器管理平台.它在单个平台上紧密集成了KVM虚拟机管理程序和LXC,软件定义的存储以及网络 ...

  8. CentOS下Druid安装详解

    中文地址 http://www.apache-druid.cn/ 1. 介绍 当前市面上主流的大数据实时分析数据库很多,我们为什么选择Apache Druid?我们先做个对比: Apache Drui ...

  9. ubuntu-hadoop配置安装-简单案例以及伪分布式搭建

    文章目录 linux 下创建 hadoop用户 hadoop的安装与环境配置 先卸载原有的 JDK 安装hadoop和JDK 安装JDK 安装hadoop Standalone Operation 模 ...

最新文章

  1. 自然语言推理和数据集
  2. 虚拟机linux 8.04汉化,在虚拟机中快速安装 Ubuntu 18.04
  3. 《机器学习》、《算法数据结构》、《LeetCode原创题解》开放下载!
  4. Fedora中安装 Shutter步骤介绍
  5. DIY穷人版谷歌眼镜,自定义手势操控,树莓派再一次被开发新玩法
  6. ITK:创建一个大小Size
  7. Java程序员:不要因未知而让云成本大涨
  8. linux 运维视频集合
  9. php的foreach
  10. 牛客14342 神奇的数字
  11. RabbitMQ的5种队列_路由模式_入门试炼_第8篇
  12. Java 200+ 面试题补充② Netty 模块
  13. 10年老分析师:数据分析不只是一个岗位,更是一种职场必备能力
  14. 使用HBuilderX将H5网页打包成APP
  15. [导入]代理猎手找大学代理
  16. 老泪纵横!伴随数代人成长的中国经典动画
  17. ckpt2npy和npy2ckpt转换
  18. 利用PPT制作不一样的动态文字技巧
  19. 分类问题——逻辑回归
  20. [深度学习]动手学深度学习笔记-11

热门文章

  1. 北洋 BTP-R150 打印机驱动
  2. matlab 端点检测 能零比法_端点检测方法
  3. Vue2Editor 中文API
  4. 将输入的单词按首字母排序
  5. 快速检测npm registry镜像网址下载的速度
  6. leetcode简单之613.直线上的最近距离
  7. 如何更改台式计算机屏显时间,电脑屏幕熄灭时间_电脑锁屏怎么设置时间
  8. docker网络连接——docker network connect命令
  9. MATLAB 3D玫瑰花绘制(内附旋转版本)
  10. 服务器安装与维护,服务器安装与维护 PPT课件