简介

Schema的定义,Druid摄入数据规范的核心是dataSchema,dataSchema定义了如何解析输入的数据,并将数据存储到Druid中。 ingestion spec 由三个部分组成:

{"dataSchema" : {...},"ioConfig" : {...},"tuningConfig" : {...}
}
字段 类型 描述 是否必须
dataSchema JSON Object 指定传入数据的Schema。所有Ingestion Spec都可以共享相同的dataSchema。
ioConfig JSON Object 指定数据的来源和去向。此对象将随摄取方法而变化。
tuningConfig JSON Object 指定如何调整各种摄取参数。此对象将随摄取方法而变化。

1.dataSchema

首先我们创建一个json的文件:kafka-index-day-roll-up.json,在该文件中添加空dataSchema;

"dataSchema" : {}

2.DataSource name

DataSource name指定,数据源名称由dataSchema中的datasource参数指定,在这里我们叫做kafka_to_druid,可以看作是数据库的表名;

"dataSchema" : {"dataSource" : "kafka_to_druid",
}

3.parser-解释器

dataSchema中有一个parser这个字段,它是解释输入数据的解析器,上面的案例中我们使用的是JSON格式的字符串,因此我们使用JSON格式的字符串解释器解析数据。

如果type未包含,则解析器默认为string

如果format未包含,则parseSpec默认为tsv

"dataSchema" : {"dataSource" : "kafka_to_druid","parser" : {"type" : "string","parseSpec" : {"format" : "json"}}}

4.Time column - 时间列

解释器parser需要知道数据中每条数据的产生时间(main timestamp),这个时间戳需要定义在 timestampSpec中。数据中有一列ts就是我们所需要的timestamp,因此我们将带有该信息的timestampSpec 添加到parseSpec中。

字段 类型 描述 是否必须
column String 时间戳的列。
format String iso,posix,millis,micro,nano,auto或任何Joda time格式。 否(默认== ‘auto’)
"dataSchema" : {"dataSource" : "kafka_to_druid","parser" : {"type" : "string","parseSpec" : {"format" : "json","timestampSpec" : {"format" : "auto","column" : "ts"}}}}

5.Column types

上面我们已经定义了time的列,接下来我们定义其它列的类型。Druid支持的column types: String, Long, Float, Double.我们将在接下来的小节中讨论以及如何使用它们。在我们去定义非时间序列之前,我们首先来讨论一下rollup。

6.Rollup

druid在通过roll-up处理后,会将原始数据在注入的时候就开始进行汇总处理。roll-up是在数据存储到segment之前进行的第一层聚合操作。

  1. 如果rollup设置成true,这个时候就需要我们把输入的columns进行分为两类,维度(dimensions)和度量(metrics).dimensions是我们进行group的时候需要的列,metrics是我们进行聚合时需要的列。
  2. 如果rollup设置成false,这个时候我们会将输入的所有columns当做dimensions处理,并且没有预聚合的发生。
"dataSchema" : {"dataSource" : "kafka_to_druid","parser" : {"type" : "string","parseSpec" : {"format" : "json","timestampSpec" : {"format" : "auto","column" : "ts"}}},"granularitySpec" : {"rollup" : true}}

7.选择dimension和metrics

①在上面给到的数据集中,很明显的就可以区分开 dimensions 和 metrics。

Dimensions: startIP |  startPort | endIP  | endPort | protocol
Metrics: packets | bytes | costTime

②接下来我们如何在摄入数据规范中定义这些 dimensions列 和 metrics列呢?Dimensions:使用dimensionsSpec在parseSpec中指定。

"dataSchema" : {"dataSource" : "kafka_to_druid","parser" : {"type" : "string","parseSpec" : {"format" : "json","timestampSpec" : {"format" : "auto","column" : "ts"},"dimensionsSpec" : {"dimensions": ["startIP",{ "name" : "startPort", "type" : "long" },{ "name" : "endIP", "type" : "string" },{ "name" : "endPort", "type" : "long" },{ "name" : "protocol", "type" : "string" }]}   }},"metricsSpec" : [{ "type" : "count", "name" : "count" },{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },{ "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" }],"granularitySpec" : {"rollup" : true}}

注:每个维度都有一个name 和 type,type的类型可能是:"long", "float", "double", "string"。我们注意到startIP这个"string"类型的维度,它仅仅只需要指定名字就可以了。

③.在druid中,string 类型是默认的。除此之外,我们注意一下protocol是一个数值型的。但是我们定义的时候将其定义为 string。Druid会强制将该类型进行转换。Metrics:使用metricsSpec 在dataSchema中指定。

"dataSchema" : {"dataSource" : "kafka_to_druid","parser" : {"type" : "string","parseSpec" : {"format" : "json","timestampSpec" : {"format" : "auto","column" : "ts"},"dimensionsSpec" : {"dimensions": ["startIP",{ "name" : "startPort", "type" : "long" },{ "name" : "endIP", "type" : "string" },{ "name" : "endPort", "type" : "long" },{ "name" : "protocol", "type" : "string" }]}   }},"metricsSpec" : [{ "type" : "count", "name" : "count" },{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },{ "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" }],"granularitySpec" : {"rollup" : true}}

注:当我们定义metric时,有必要指定在rollup期间对该列执行的聚合类型。我们将packets和bytes定义成long sum聚合操作,costTime定义成double sum聚合操作。 metricsSpec的嵌套级别与dimensionSpec或parseSpec不同,它和dataSchema属于同一嵌套级别。除此,我们还定义了一个count聚合操作器,它会在rollup过程中,记录输入的数据量总共有多少。支持的聚合器类型详情点击link

8.不使用rollup

如果不适用roolup所有输入的colums都被当做"dimensions",不再区分"dimensions" 和"metrics"。

"dimensionsSpec" : {"dimensions": ["startIP",{ "name" : "startPort", "type" : "long" },{ "name" : "endIP", "type" : "string" },{ "name" : "endPort", "type" : "long" },{ "name" : "protocol", "type" : "string" },{ "name" : "packets", "type" : "long" },{ "name" : "bytes", "type" : "long" },{ "name" : "startPort", "type" : "double" }]
}

9.Define Granularities-粒度的定义。

接下来还有一些其他的属性需要在granularitySpec中设置,granularitySpec支持2中类型(type):uniform和arbitrary。在这里,我们使用uniform这种类型,这会使所有的segment都有统一的间隔大小(比如:每个segment都保存一个小时内的值)。

①segment granularity这个属性是指一个segment应该包含多大时间间隔的数据,可以是: DAY, WEEK,HOUR , MINUTE...... 在这里,我们制定segment的粒度是HOUR。

"dataSchema" : {"dataSource" : "kafka_to_druid","parser" : {"type" : "string","parseSpec" : {"format" : "json","timestampSpec" : {"format" : "auto","column" : "ts"},"dimensionsSpec" : {"dimensions": ["startIP",{ "name" : "startPort", "type" : "long" },{ "name" : "endIP", "type" : "string" },{ "name" : "endPort", "type" : "long" },{ "name" : "protocol", "type" : "string" }]}      }},"metricsSpec" : [{ "type" : "count", "name" : "count" },{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },{ "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" }],"granularitySpec" : {"type" : "uniform","segmentGranularity" : "HOUR","rollup" : true}}

②.query granularity:查询的粒度通过queryGranularity配置在granularitySpec中,在这里我们使用minute粒度。

"dataSchema" : {"dataSource" : "realtime_kafka_to_druid","parser" : {"type" : "string","parseSpec" : {"format" : "json","timestampSpec" : {"format" : "auto","column" : "ts"},"dimensionsSpec" : {"dimensions": ["startIP",{ "name" : "startPort", "type" : "long" },{ "name" : "endIP", "type" : "string" },{ "name" : "endPort", "type" : "long" },{ "name" : "protocol", "type" : "string" }]}      }},"metricsSpec" : [{ "type" : "count", "name" : "count" },{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },{ "type" : "doubleSum", "name" : "costTime", "fieldName" : "costTime" }],"granularitySpec" : {"type" : "uniform","segmentGranularity" : "HOUR","queryGranularity" : "MINUTE""rollup" : true}}

③.Define an interval:定义时间间隔,在这个时间间隔之外的数据将不会被处理。注意,这个参数设置只在批处理中(batch)。interval需要在 granularitySpec中指定。

"granularitySpec" : {"intervals" : ["2019-01-17/2019-01-18"]
}

10.定义输入数据的数据源

IOConfig规范根据摄取任务类型而有所不同。

  1. 本机批量摄取:请参阅Native Batch IOConfig
  2. Hadoop Batch ingestion:请参阅Hadoop Batch IOConfig
  3. Kafka Indexing Service:请参阅Kafka Supervisor IOConfig
  4. Stream Push Ingestion:使用Tranquility进行Stream Push不需要IO配置
  5. Stream Pull Ingestion(已弃用):请参阅Stream pull ingestion

11.tuningConfig-额外的配置

每个摄入任务都有一个tuningConfig部分,让开发人员自行配置。在这里根据输入的数据源kafka来进行配置tuningConfig。type索引任务类型,此处是kafka 。reportParseExceptions默认是false,如果开启这个功能,当摄入数据过程中出现数据异常将会导致摄入数据停止。

"tuningConfig": {"type": "kafka","reportParseExceptions": false
}

提交我们的task,然后查询数据。

  • 1.需要在Overlord节点执行:

    curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/kafka-druid/kafka-index-day-roll-up.json http://host1:8090/druid/indexer/v1/supervisor
    

    2.此刻开启程序,往kafka的topic=druid-topic-book中发送数据,此代码不做重点。

    3.上面的步骤执行完之后,我们可以查看druid最终存入的数据。需要在broker节点执行。

    ①.rollup-select-sql.json内容,注意查询的DataSource名称

    {"query":"select * from \"realtime_kafka_to_druid\""
    }
    

    ② 执行

     curl -X 'POST' -H 'Content-Type:application/json' -d @rollup-select-sql.json http://host2:8082/druid/v2/sql
    

    扫一扫加入大数据公众号和技术交流群,了解更多大数据技术,还有免费资料等你哦

    扫一扫加入大数据公众号和技术交流群,了解更多大数据技术,还有免费资料等你哦

    扫一扫加入大数据公众号和技术交流群,了解更多大数据技术,还有免费资料等你哦

Druid实战--摄入数据规范Ingestion Spec相关推荐

  1. Druid实战--数据摄入案例

    Durid摄入数据的方式 通过imply的UI界面配置摄入数据规范Ingestion Spec 手动创建摄入数据规范Ingestion Spec,通过http请求执行 说明:下面以摄入kafka的数据 ...

  2. 快速了解Druid -- 实时大数据分析软件

    Druid 是什么 Druid 单词来源于西方古罗马的神话人物,中文常常翻译成德鲁伊.  本问介绍的Druid 是一个分布式的支持实时分析的数据存储系统(Data Store).美国广告技术公司Met ...

  3. R语言数据描述性统计(Descriptive statistics)实战:数据全局描述信息、数值数据的描述性统计(Numerical data)、离散型数据的描述性统计(Categorical)

    R语言数据描述性统计(Descriptive statistics)实战:数据全局描述信息.数值数据的描述性统计(Numerical data).离散型数据的描述性统计(Categorical) 目录

  4. Keras图像分割实战:数据整理分割、自定义数据生成器、模型训练

    Keras图像分割实战:数据整理分割.自定义数据生成器.模型训练 目录 Keras图像分割实战:数据整理分割.自定义数据生成器.模型训练

  5. HTTP API响应数据规范整理

    2019独角兽企业重金招聘Python工程师标准>>> 关于作者 马隆博(Lenbo Ma),Java,Javascript Blog: http://mlongbo.com E-M ...

  6. Tableau实战系列数据连接及数据准备

    前言 Tableau实战系列数据连接及数据准备 使用多个表存储数据提取数据 你可以将数据提取配置为将其数据存储在多个表中.将数据提取数据存储在多个表中可以 潜在地改善性能,并帮助缩小文件大小. 新的受 ...

  7. 数据连接池druid 和 大数据框架druid

    叫druid的有两个开源项目. 一个是: Druid是一个JDBC组件,它包括三部分:  DruidDriver 代理Driver,能够提供基于Filter-Chain模式的插件体系.  DruidD ...

  8. 赠送300家门店260亿销售额的零售企业Power BI实战示例数据

    焦棚子的文章目录 一背景 2022年即将到来之际,笔者准备在Power BI中做一个实战专题,作为实战专题最基础的就是demo数据,于是我们赠送大家一个300家门店,260亿+销售额,360万行+的零 ...

  9. js循环出来的数据补全_加推实战之数据预测

    加推实战之数据预测 ❝ 从数据中发现隐藏在背后的规律,形成知识? ❞ 从需求说起 对活动或成交数据进行未来一段周期的预测 分析并寻求方案 关于预测马上可以想到的是回归 现有的几个拟合方法可以立刻用上? ...

  10. 实战互联网公司数据存储解决方案

    实战互联网公司数据存储解决方案 参考文章: (1)实战互联网公司数据存储解决方案 (2)https://www.cnblogs.com/tanhualang/p/9283576.html 备忘一下.

最新文章

  1. 二极管7种应用电路详解之六
  2. 野指针出现的三种情况
  3. 用户 'sa' 登录失败。原因: 未与信任 SQL Server 连接相关联。
  4. linux nginx 安装stream,Centos7下Nginx简单搭建与stream模块简单配置
  5. OpenCV删除面积小的区域 实现图像二值化分割 标记连通区域
  6. solr 配置中文分词器
  7. XMPP 扎金花各种网络传送之音频发送
  8. Factory Method (工厂模式)
  9. python开发接口故障码_Python代码样例
  10. python爬虫教程-Python教父|廖雪峰老师官方爬虫教程,13个案例带你全面入门!
  11. indesign教程,如何在对象周围环绕文本?
  12. 冒泡排序和快速排序的区别
  13. DOS批处理文件加密文件夹
  14. 安装Ubuntu的几十次坑的经验
  15. 苹果微信换行怎么打_苹果手机微信怎么加密,教你几招快速加密
  16. 工程师的基本功是什么?如何练习?—学习心得分享
  17. java入门基础(四)
  18. 如何在Python中安装NumPy
  19. mysql 报错“发生系统错误 3.系统找不到指定路径”
  20. Kaggle注册以及问题解决

热门文章

  1. Objective C Bridging header —— swift MD5
  2. Laravel 使用Dingo API
  3. GlusterFS(上)
  4. emmc5.1, ufs2.0, ufs3.0
  5. 从游戏中学习产品设计05:反馈篇
  6. [BZOJ2177][最小/最大(曼哈顿距离)生成树]曼哈顿最小生成树
  7. 【运维】服务器的初步认识
  8. windows配置本地hosts
  9. python实现蜂鸣器演奏两只老虎
  10. 茶叶文化网站设计与实现 HTML+CSS学生网页设计作业源码