目录

  • 1. 测试数据
  • 2. 定义dataSchema
    • 2.1 指定数据源名称
    • 2.2 指定__time时间戳
    • 2.3 定义Rollup相关
      • 2.3.1 "rollup" : true
      • 2.3.2 "rollup" : false
  • 2.4 定义granularitySpec
  • 3. 定义task type
  • 4. 定义数据输入源
  • 5. 其它优化参数
  • 6. 提交任务
  • 7. 查询结果数据

1. 测试数据

我们要插入的测试是一批网络流数据,如下所示:

{"ts":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":10, "bytes":1000, "cost": 1.4}
{"ts":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":20, "bytes":2000, "cost": 3.1}
{"ts":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":30, "bytes":3000, "cost": 0.4}
{"ts":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":40, "bytes":4000, "cost": 7.9}
{"ts":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":50, "bytes":5000, "cost": 10.2}
{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}
{"ts":"2018-01-01T02:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":100, "bytes":10000, "cost": 22.4}
{"ts":"2018-01-01T02:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":200, "bytes":20000, "cost": 34.5}
{"ts":"2018-01-01T02:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":300, "bytes":30000, "cost": 46.3}

将其保存到quickstart/tutorial/ingestion-tutorial-data.json文件中

2. 定义dataSchema

ingestion spec中最核心的是dataSchema,定义了如何将输入数据解析为一组列

在quickstart/tutorial目录下新建文件ingestion-tutorial-index.json,在其中添加一个空的dataSchema,内容如下:

[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {}
[root@bigdata001 apache-druid-0.22.1]#

2.1 指定数据源名称

通过dataSource指定

[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {"dataSource" : "ingestion-tutorial"
}
[root@bigdata001 apache-druid-0.22.1]#

2.2 指定__time时间戳

通过timestampSpec指定__time时间戳。通过column定义__time时间戳的来源时间字段,通过format定义来源时间字段的时间类型

[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {"dataSource" : "ingestion-tutorial","timestampSpec" : {"column" : "ts","format" : "iso"}
}
[root@bigdata001 apache-druid-0.22.1]#

2.3 定义Rollup相关

rollup定义在granularitySpec下面

  1. 如果启用了rollup,输入字段分为两类,“dimensions"和"metrics”。“dimensions” 是用于rollup的聚合字段,"metrics"是计算的指标字段
  2. 如果关闭了rollup,则所有字段都是"dimensions",不会发生预聚合

2.3.1 “rollup” : true

本示例,我们开启rollup

[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {"dataSource" : "ingestion-tutorial","timestampSpec" : {"column" : "ts","format" : "iso"},"granularitySpec" : {"rollup" : true}
}
[root@bigdata001 apache-druid-0.22.1]#

dimension和Metrics的字段划分

  • dimension:srcIP、srcPort、dstIP、dstPort、protocol
  • Metrics:packets、bytes、cost

dimension的指定

dimensions定义在dimensionsSpec下面

[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {"dataSource" : "ingestion-tutorial","timestampSpec" : {"column" : "ts","format" : "iso"},"granularitySpec" : {"rollup" : true},"dimensionsSpec" : {"dimensions" : ["srcIP",{"name" : "srcPort", "type" : "long"},{"name" : "dstIP", "type" : "string"},{"name" : "dstPort", "type" : "long"},{"name" : "protocol", "type" : "string"}]}
}
[root@bigdata001 apache-druid-0.22.1]#
  • 字段类型可以是long、float、double、string
  • 如果字段类型是string类型,可以只指定字段名称,因为默认的字段类型就是string。如"srcIP"
  • protocol字段在数据文件中是数值类型,本应定义为long类型,但这里我们定义为string类型。会强制将long类型转换为string类型
  • 数据文件中的字段为数值类型,定义为数值类型可以减少磁盘空间,降低处理开销,Metrics字段推荐用数值类型。定义为string类型可以使用位图索引,dimension字段推荐用string类型

Metrics的指定

使用metricsSpec进行定义

[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {"dataSource" : "ingestion-tutorial","timestampSpec" : {"column" : "ts","format" : "iso"},"granularitySpec" : {"rollup" : true},"dimensionsSpec" : {"dimensions" : ["srcIP",{"name" : "srcPort", "type" : "long"},{"name" : "dstIP", "type" : "string"},{"name" : "dstPort", "type" : "long"},{"name" : "protocol", "type" : "string"}]},"metricsSpec" : [{"type" : "count", "name" : "count"},{"type" : "longSum", "name" : "packets", "fieldName" : "packets"},{"type" : "longSum", "name" : "bytes", "fieldName" : "bytes"},{"type" : "doubleSum", "name" : "cost", "fieldName" : "cost"}]
}
[root@bigdata001 apache-druid-0.22.1]#

这里我们定义了一个count聚合字段,用来计算每个分组中,原始数据有多少行数据

2.3.2 “rollup” : false

如果Rollup为false,所有字段都定义在dimensionsSpec。例子如下:

"dataSchema" : {"dataSource" : "ingestion-tutorial","timestampSpec" : {"column" : "ts","format" : "iso"},"granularitySpec" : {"rollup" : false},"dimensionsSpec" : {"dimensions" : ["srcIP",{"name" : "srcPort", "type" : "long"},{"name" : "dstIP", "type" : "string"},{"name" : "dstPort", "type" : "long"},{"name" : "protocol", "type" : "string"},{"name" : "packets", "type" : "long"},{"name" : "bytes", "type" : "long"},{"name" : "srcPort", "type" : "double"}]}
}

2.4 定义granularitySpec

  • type的可选值有uniform、arbitrary

    1. uniform:所有segment都有统一的时间间隔,例如所有segment都包含一小时的数据
  • segmentGranularity:指定每个segment的时间间隔大小,例如:HOUR、DAY、WEEK
  • queryGranularity:指定segment中__time字段的保存粒度,也称bucketing粒度
  • intervals:对于批处理任务,不在该intervals时间范围内的数据,不会被保存
[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
"dataSchema" : {"dataSource" : "ingestion-tutorial","timestampSpec" : {"column" : "ts","format" : "iso"},"granularitySpec" : {"rollup" : true,"type" : "uniform","segmentGranularity" : "HOUR","queryGranularity" : "MINUTE","intervals" : ["2018-01-01/2018-01-02"]},"dimensionsSpec" : {"dimensions" : ["srcIP",{"name" : "srcPort", "type" : "long"},{"name" : "dstIP", "type" : "string"},{"name" : "dstPort", "type" : "long"},{"name" : "protocol", "type" : "string"}]},"metricsSpec" : [{"type" : "count", "name" : "count"},{"type" : "longSum", "name" : "packets", "fieldName" : "packets"},{"type" : "longSum", "name" : "bytes", "fieldName" : "bytes"},{"type" : "doubleSum", "name" : "cost", "fieldName" : "cost"}]
}
[root@bigdata001 apache-druid-0.22.1]#
  • 我们的数据时间有1点和2点的,所以会分成两个segment
  • __time会被保存为分钟粒度,比如"2018-01-01T01:01:35Z"保存为"2018-01-01T01:01:00Z"

3. 定义task type

  • dataSchema在所有task type中都一样
  • 但每个task type的其它部分定义的format不一样

本示例,我们使用本地batch数据插入spec,从本地文件读取数据

{"type" : "index_parallel","spec" : {"dataSchema" : {"dataSource" : "ingestion-tutorial","timestampSpec" : {"column" : "ts","format" : "iso"},"granularitySpec" : {"rollup" : true,"type" : "uniform","segmentGranularity" : "HOUR","queryGranularity" : "MINUTE","intervals" : ["2018-01-01/2018-01-02"]},"dimensionsSpec" : {"dimensions" : ["srcIP",{"name" : "srcPort", "type" : "long"},{"name" : "dstIP", "type" : "string"},{"name" : "dstPort", "type" : "long"},{"name" : "protocol", "type" : "string"}]},"metricsSpec" : [{"type" : "count", "name" : "count"},{"type" : "longSum", "name" : "packets", "fieldName" : "packets"},{"type" : "longSum", "name" : "bytes", "fieldName" : "bytes"},{"type" : "doubleSum", "name" : "cost", "fieldName" : "cost"}]}}
}

4. 定义数据输入源

在ioConfig中定义:

  • ioConfig也需要指定task type
  • 同时定义inputSource指定数据的来源
  • 定义inputFormat指定文件的数据格式
[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
{"type" : "index_parallel","spec" : {"dataSchema" : {"dataSource" : "ingestion-tutorial","timestampSpec" : {"column" : "ts","format" : "iso"},"granularitySpec" : {"rollup" : true,"type" : "uniform","segmentGranularity" : "HOUR","queryGranularity" : "MINUTE","intervals" : ["2018-01-01/2018-01-02"]},"dimensionsSpec" : {"dimensions" : ["srcIP",{"name" : "srcPort", "type" : "long"},{"name" : "dstIP", "type" : "string"},{"name" : "dstPort", "type" : "long"},{"name" : "protocol", "type" : "string"}]},"metricsSpec" : [{"type" : "count", "name" : "count"},{"type" : "longSum", "name" : "packets", "fieldName" : "packets"},{"type" : "longSum", "name" : "bytes", "fieldName" : "bytes"},{"type" : "doubleSum", "name" : "cost", "fieldName" : "cost"}]},"ioConfig" : {"type" : "index_parallel","inputSource" : {"type" : "local","baseDir" : "quickstart/tutorial","filter" : "ingestion-tutorial-data.json"},"inputFormat" : {"type" : "json"}}}
}
[root@bigdata001 apache-druid-0.22.1]#

5. 其它优化参数

在tuningConfig中定义

  • 需要定义task type
  • 再指定针对该task type的优化参数

这里我们设置本地文件数据导入的优化参数,形成最终的spec

[root@bigdata001 apache-druid-0.22.1]# cat quickstart/tutorial/ingestion-tutorial-index.json
{"type" : "index_parallel","spec" : {"dataSchema" : {"dataSource" : "ingestion-tutorial","timestampSpec" : {"column" : "ts","format" : "iso"},"granularitySpec" : {"rollup" : true,"type" : "uniform","segmentGranularity" : "HOUR","queryGranularity" : "MINUTE","intervals" : ["2018-01-01/2018-01-02"]},"dimensionsSpec" : {"dimensions" : ["srcIP",{"name" : "srcPort", "type" : "long"},{"name" : "dstIP", "type" : "string"},{"name" : "dstPort", "type" : "long"},{"name" : "protocol", "type" : "string"}]},"metricsSpec" : [{"type" : "count", "name" : "count"},{"type" : "longSum", "name" : "packets", "fieldName" : "packets"},{"type" : "longSum", "name" : "bytes", "fieldName" : "bytes"},{"type" : "doubleSum", "name" : "cost", "fieldName" : "cost"}]},"ioConfig" : {"type" : "index_parallel","inputSource" : {"type" : "local","baseDir" : "quickstart/tutorial","filter" : "ingestion-tutorial-data.json"},"inputFormat" : {"type" : "json"}},"tuningConfig" : {"type" : "index_parallel","maxRowsPerSegment" : 5000000}    }
}
[root@bigdata001 apache-druid-0.22.1]#

6. 提交任务

将ingestion-tutorial-data.json和ingestion-tutorial-index.json文件,分发到Druid集群其它服务器的quickstart/tutorial目录下

[root@bigdata001 apache-druid-0.22.1]#
[root@bigdata001 apache-druid-0.22.1]# scp quickstart/tutorial/ingestion-tutorial-data.json root@bigdata002:/opt/apache-druid-0.22.1/quickstart/tutorial/
ingestion-tutorial-data.json                                                                                                             100% 1408   956.5KB/s   00:00
[root@bigdata001 apache-druid-0.22.1]# scp quickstart/tutorial/ingestion-tutorial-index.json root@bigdata002:/opt/apache-druid-0.22.1/quickstart/tutorial/
ingestion-tutorial-index.json                                                                                                            100% 1373     1.4MB/s   00:00
[root@bigdata001 apache-druid-0.22.1]#

在命令行执行task

[root@bigdata001 apache-druid-0.22.1]# bin/post-index-task --file quickstart/tutorial/ingestion-tutorial-index.json --url http://bigdata003:9081
Beginning indexing data for ingestion-tutorial
Task started: index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z
Task log:     http://bigdata003:9081/druid/indexer/v1/task/index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z/log
Task status:  http://bigdata003:9081/druid/indexer/v1/task/index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z/status
Task index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z still running...
Task index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z still running...
Task index_parallel_ingestion-tutorial_mclognkd_2022-04-01T02:55:13.038Z still running...
Task finished with status: SUCCESS
Completed indexing data for ingestion-tutorial. Now loading indexed data onto the cluster...
[root@bigdata001 apache-druid-0.22.1]#

7. 查询结果数据

dsql>
dsql> select * from "ingestion-tutorial";
┌──────────────────────────┬───────┬──────┬───────┬─────────┬─────────┬─────────┬──────────┬─────────┬─────────┐
│ __time                   │ bytes │ cost │ count │ dstIP   │ dstPort │ packets │ protocol │ srcIP   │ srcPort │
├──────────────────────────┼───────┼──────┼───────┼─────────┼─────────┼─────────┼──────────┼─────────┼─────────┤
│ 2018-01-01T01:01:00.000Z │  6000 │  4.9 │     3 │ 2.2.2.2 │    3000 │      60 │ 6        │ 1.1.1.1 │    2000 │
│ 2018-01-01T01:02:00.000Z │  9000 │ 18.1 │     2 │ 2.2.2.2 │    7000 │      90 │ 6        │ 1.1.1.1 │    5000 │
│ 2018-01-01T01:03:00.000Z │  6000 │  4.3 │     1 │ 2.2.2.2 │    7000 │      60 │ 6        │ 1.1.1.1 │    5000 │
│ 2018-01-01T02:33:00.000Z │ 30000 │ 56.9 │     2 │ 8.8.8.8 │    5000 │     300 │ 17       │ 7.7.7.7 │    4000 │
│ 2018-01-01T02:35:00.000Z │ 30000 │ 46.3 │     1 │ 8.8.8.8 │    5000 │     300 │ 17       │ 7.7.7.7 │    4000 │
└──────────────────────────┴───────┴──────┴───────┴─────────┴─────────┴─────────┴──────────┴─────────┴─────────┘
Retrieved 5 rows in 0.24s.dsql>

Apache druid的ingestion spec数据插入规范相关推荐

  1. apache druid 与kafka整合使用

    前言 在上一篇,我们了解了apache druid的搭建,以及如何快速导入外部数据源到apache druid中进行数据分析和使用 本篇,我们结合一个实际的简单的应用场景,来说说apache drui ...

  2. Apache Druid历险记

    1. Druid简介 1. 1 概述 Druid是一个快速的列式分布式的支持实时分析的数据存储系统.它在处理PB级数据.毫秒级查询.数据实时处理方面,比传统的OLAP系统有了显著的性能改进. OLAP ...

  3. Apache Druid远程代码执行漏洞(CVE-2021-25646)

    Apache Druid远程代码执行漏洞(CVE-2021-25646) 0x01 漏洞简介 Apache Druid 是用 Java 编写的面向列的开源分布式数据存储, 通常用于商业智能/ OLAP ...

  4. Apache Druid(二、架构设计)

    文章目录 回顾 架构 整体设计 进程服务作用 数据流 数据生产 数据查询 查询的优化 索引服务 存储设计 Datasrouces and segments segment设计 特殊数据结构 命名设计 ...

  5. Apache Druid Console 远程命令执行漏洞

    一.漏洞概述 Apache Druid 是用Java编写的面向列的开源分布式数据存储,旨在快速获取大量事件数据,并在数据之上提供低延迟查询. Apache Druid 默认情况下缺乏授权认证,攻击者可 ...

  6. 【Druid】(四)Apache Druid 部署和配置(单机版 / Docker 容器版 / Kubernetes 集群版)

    文章目录 一.Apache Druid 部署 1.1 单机版 1.1.1 Jar 包下载 1.1.2 Druid 的安装部署 1.2 Docker 容器版 1.2.1 下载 1.2.2 配置 Dock ...

  7. 实时统计分析系统-Apache Druid

    Druid.io(以下简称Druid)是2013年底开源出来的, 主要解决的是对实时数据以及较近时间的历史数据的多维查询提供高并发(多用户),低延时,高可靠性的问题. Druid简介: Druid是一 ...

  8. Apache Druid RCE(CVE-2021-25646)复现

    漏洞概述 Apache Druid 是用Java编写的面向列的开源分布式数据存储,旨在快速获取大量事件数据,并在数据之上提供低延迟查询. Apache Druid 默认情况下缺乏授权认证,攻击者可以发 ...

  9. POI:从Excel文件中读取数据,向Excel文件中写入数据,将Excel表格中的数据插入数据库,将数据库中的数据添加到Excel表

    POI 简介: POI是Apache软件基金会用Java编写的免费开源的跨平台的 Java API,Apache POI提供API给Java程序对Microsoft Office格式档案读和写的功能. ...

  10. Flume-0.9.4数据插入HBase-0.96

    来自:http://blog.csdn.net/iam333/article/details/18770977 最近由于业务需要,需要将flume的数据插入HBase-0.96,利用flume的实时日 ...

最新文章

  1. linux终端获取root,ubuntu18.04获取root权限并用root用户登录的实现
  2. orleans/Documentation
  3. 信用卡的3种分期模式全面比较
  4. C和指针之动态内存分配malloc、calloc、realloc简单使用和区别
  5. 博弈论 斯坦福game theory stanford week 3.2_
  6. 人工智障学习笔记——机器学习(16)降维小结
  7. html中js定义的方法无效,javascript中定义函数有几种常用方法?
  8. 这样去分析大盘才能稳赚不亏best
  9. DIV_ROUND_UP(x,y)实现x/y向上取整
  10. [转载]千古真人张三丰
  11. 亲情友情爱情:《悲惨世界》第四部《卜吕梅街的柔情和圣德尼街的史诗》/人性:《悲惨世界》第五部《冉阿让》摘录...
  12. 用PS把人物皮肤处理的质感又通透
  13. JSP中四种作用域的不同
  14. Java EE知识储备(二)
  15. 数据安全法等相关法律等的归纳小结
  16. Tableau工具提示:显示图表
  17. 如何选购一款性能较好的固态硬盘
  18. unity 加载ARCGIS 地球 实现天气系统
  19. 面试 | 推荐几个程序员刷题的网站!面试必备!!!
  20. 深度学习技术是我国制造光刻机弯道超车的机会吗?

热门文章

  1. 无线路由器破解之cdlinux
  2. 表达式之谜---半斤(复合赋值表达式)
  3. 每天记忆五个词根之五
  4. word排版快捷指令_在word文档中如何利用快捷键快速排版呢?
  5. Vivado IP核之浮点数加减法 Floating-point
  6. c# webbrowser html5,C#设置WebBrowser IE浏览器版本
  7. 格林尼治时间与本地时间的转换
  8. 阿里云因发现Log4j2漏洞未及时上报,被工信部处罚!
  9. nginx中的sub_filter
  10. 各平台电脑开启虚拟化的方法