文章目录

  • 简介
    • ClickHouse简介
    • RoaringBitmap(RBM)原理
    • ClickHouse中使用RBM存在的问题
  • RoaringBitmap(RBM)定制序列化实现
    • ClickHouse中RoaringBitmap的结构解析
    • Spark中RoaringBitmap的实现
    • 定制RBM序列化方式以兼容ClickHouse
      • Byte(1) - 类型标识生成
      • VarInt(SerializedSizeInBytes) - 序列化后的字节长度
      • ByteArray(RoaringBitmap) - RBM序列化
      • 定制序列化的整体实现
      • Spark和ClickHouse生成的RBM序列化数据比对
        • ClickHouse中生成的RBM数据:
        • Spark中定制生成的RBM数据:
  • Spark生成RBM导入ClickHouse实现
    • 创建ClickHouse表
    • 使用Spark JDBC方式导入
    • 查询CK表中数据验证
  • 64位RoaringBitmap(RBM)定制序列化实现

简介

ClickHouse简介

ClickHouse是由号称“俄罗斯Google”的Yandex公司开发并在2016年开源。ClickHouse是一个列存储数据库,是原生的向量化执行引擎。目前ClickHouse在OLAP领域得到了广泛的使用,其首要原因是查询速度快。

在大数据处理中,海量数据的判重和基数统计是两个绕不开的基础问题。ClickHouse的解决方案是使用RoaringBitmap,其已有丰富的bitmap操作函数支持,可以实现非常灵活方便的判重和基数统计操作。

RoaringBitmap(RBM)原理

Bitmap用位图的方式来存储id数值信息,可以实现精确地基数统计。但因为Bitmap在数值稀疏时会造成很大空间浪费,因此提出了用RoaringBitmap(RBM)对稀疏位图进行压缩,减少内存占用并提高效率。RBM的主要思路是:将32位无符号整数按照高16位分桶,即最多可能有2^16=65536个桶,又称为container。存储数据时,按照数据的高16位找到container(找不到就会新建一个),再将低16位放入container中。也就是说,一个RBM就是很多container的集合。其详细的原理可参考文章。

ClickHouse中使用RBM存在的问题

在查看了ClickHouse的文档及搜索了各公司的实践方案(如腾讯和头条)后,发现目前只能将原始明细的id数据导入到ClickHouse后,再通过创建物化视图的方式构建RBM结构进行使用。但是原始明细数据量往往非常大,这不仅给数据ETL处理造成了很大的负担,也对计算资源以及ClickHouse的集群资源要求非常高。

我们的数据ETL处理通常是基于hadoop平台的Spark计算框架进行处理,那能不能在Spark进行数据处理时就将Bitmap数据预计算存储好,这样不仅数据量会大大地减少,同时也能大大地减少对ClickHouse集群资源的要求!

RoaringBitmap(RBM)定制序列化实现

ClickHouse中RoaringBitmap的结构解析

为了在Spark数据预处理时提前计算存储好RBM,首先就需要了解ClickHouse中RBM结构的实现原理。通过在ClickHouse的源码社区进行咨询,了解到ClickHouse是利用CRoaring实现的RBM,并且其存储结构的格式是Byte(1), VarInt(SerializedSizeInBytes), ByteArray(RoaringBitmap)

Spark中RoaringBitmap的实现

目前CRoaring没有对应的Java语言实现库,而在Java中RBM的常用实现库是RoaringBitmap,并且其已经在Spark、Kylin和Druid等系统中得到了应用。RoaringBitmap库提供了完善的bitmap操作,对其进行封装后就可以集成到SparkSQL中进行使用。

这是我在Spark中自定义实现的bitmap相关udf函数。因此可以在Hive中定义Binary列来存储RBM计算的中间结果,这样既可以使用SparkSQL对中间结果进行再聚合统计,同时也可以将中间结果直接导入到ClickHouse中进行查询,极大地减少了导入ClickHouse的数据量。

定制RBM序列化方式以兼容ClickHouse

查看了ClickHouse中RBM实现源码和Java版RoaringBitmap的序列化方式后,发现两者序列化格式不兼容。为了可以将中间结果数据导入ClickHouse中,需要在导入前定制序列化方式使其与ClickHouse中的RBM兼容。前面已经了解到ClickHouse中RBM的存储结构是Byte(1), VarInt(SerializedSizeInBytes), ByteArray(RoaringBitmap),因此可对各结构进行实现组装。

Byte(1) - 类型标识生成

第一部分是用一个字节来标识该RBM的类型。原来ClickHouse中对于小集合(基数小于32)的实现进行了性能优化,对其采用SmallSet集合的方式存储,其大小最大限定写死为32。也就是说当位图的基数少于32时,仅使用SmallSet存储;一旦超过此阈值,就调用toLarge()方法转化为RBM,此后都在RBM上操作。第一个字节就是用来标识底层存储是采用SmallSet还是RBM实现的,如果是SmallSet则标识为0,否则标识为1,因此实现如下。

// rb: RoaringBitmap
if (rb.getCardinality <= 32) {bos.put(new Integer(0).toByte) // bos:ByteBuffer} else {bos.put(new Integer(1).toByte)
}

VarInt(SerializedSizeInBytes) - 序列化后的字节长度

第二部分是用VarInt类型来存储第三部分ByteArray(RoaringBitmap)的所占用字节长度。参考代码可实现如下:

// rb: RoaringBitmap、bos:ByteBuffer
VarInt.putVarInt(rb.serializedSizeInBytes(), bos)

ByteArray(RoaringBitmap) - RBM序列化

第三部分就是RBM序列化后的数据,可直接用例Java中RoaringBitmap的序列化进行实现(经验证与CRoaring序列化方式是一致的):

// rb: RoaringBitmap、bos:ByteBuffer
rb.serialize(bos)

定制序列化的整体实现

由于ClickHouse的RBM在基数小于32时用SmallSet集合进行了优化,因此在查看了ClickHouse中SmallSet的实现后,定制序列化方法的整体实现如下:

def serialize(rb: RoaringBitmap): ByteBuffer = {if (rb.getCardinality <= 32) {val bos1 = ByteBuffer.allocate(2 + 4*rb.getCardinality)val bos = if (bos1.order eq LITTLE_ENDIAN) bos1 else bos1.slice.order(LITTLE_ENDIAN)bos.put(new Integer(0).toByte)bos.put(rb.getCardinality.toByte)rb.toArray.foreach(i => bos.putInt(i))bos} else {val varIntLen = VarInt.varIntSize(rb.serializedSizeInBytes())val bos1 = ByteBuffer.allocate(1 + varIntLen + rb.serializedSizeInBytes()) // 1表示标识位,即是否小于32个值;varIntLen表示后面数据的字节长度;val bos = if (bos1.order eq LITTLE_ENDIAN) bos1 else bos1.slice.order(LITTLE_ENDIAN)bos.put(new Integer(1).toByte)VarInt.putVarInt(rb.serializedSizeInBytes(), bos)rb.serialize(bos)bos}
}

Spark和ClickHouse生成的RBM序列化数据比对

已完成RBM整体的序列化后,为了能和ClickHouse生成的RBM数据进行比对,因此在ClickHouse中使用base64Encode函数将字节数据转换成字符串数据后与Spark中生成的数据进行比对以验证我们定制序列化方式的正确性:

ClickHouse中生成的RBM数据:

:) SELECT base64Encode(toString(bitmapBuild([toUInt32(32), toUInt32(65), toUInt32(127), toUInt32(1026)])));SELECT base64Encode(toString(bitmapBuild([toUInt32(32), toUInt32(65), toUInt32(127), toUInt32(1026)])))┌─base64Encode(toString(bitmapBuild(array(toUInt32(32), toUInt32(65), toUInt32(127), toUInt32(1026)))))─┐
│ AAQgAAAAQQAAAH8AAAACBAAA                                                                              │
└───────────────────────────────────────────────────────────────────────────────────────────────────────┘

Spark中定制生成的RBM数据:

val rb = RoaringBitmap.bitmapOf(32, 65, 127, 1026)
// serialize即上一小节定义的定制序列化函数
val encode = new String(Base64.getEncoder.encode(serialize(rb).array()))
println(encode.equals("AAQgAAAAQQAAAH8AAAACBAAA"))

Spark生成RBM导入ClickHouse实现

现在我们已经在Spark中可以直接生成和ClickHouse中RBM兼容的数据了,那最直接的方法是直接将序列化后的字节数据插入roaring_bitmap AggregateFunction(groupBitmap, UInt32)字段中,在测试后发现插入数据一直无法成功,报如下错误:

DB::Exception: Cannot read all data. Bytes read: 292. Bytes expected: 5201903.

在官方文档中看到AggregateFunction的列只能使用NSERT SELECT结合aggregate -State-函数插入数据,可能是这个原因导致的。

既然无法直接插入AggregateFunction列的数据,通过查看ClickHouse文档后发现其支持物化表达式的能力,可定义某一列是另一列通过某个表达式计算生成出来的。那我们可以把roaring_bitmap AggregateFunction(groupBitmap, UInt32)定义成物化表达式列,而将RBM序列化数据插入普通字符串列中,这样就可以实现优美的数据打通能力,具体操作步骤如下。

创建ClickHouse表

定义bitmap类型的物化表达式列,其是通过原始RBM序列化数据生成得到的。但如果直接将序列化数据插入String列,会发现写入后的数据和写入前不一致(可能是因为有特殊字符的原因)。因此想到的一种方式是将RBM序列化数据先用Base64编码成普通字符串插入,然后通过物化表达式解码成RBM序列化的数据。

// 创建分布式表
CREATE TABLE stone.bitmap_test ON CLUSTER cluster_name (ds Int64,user String,roaring_bitmap AggregateFunction(groupBitmap, UInt32)  MATERIALIZED base64Decode(user)
)
ENGINE = ReplicatedMergeTree('xxx', '{replica}')
PARTITION BY ds
ORDER BY (ds)
SETTINGS index_granularity = 8192;// 创建view视图用于查询分布式表数据
CREATE TABLE stone.bitmap_test_view on cluster cluster_name AS stone.bitmap_test ENGINE=Distributed(cluster_name, stone, bitmap_test, rand());

使用Spark JDBC方式导入

构造一个简单的DataFrame数据写入ClickHouse表(写入函数saveToClickHouse可自定义实现),注意物化表达化列是不需要插入数据的,它会自动计算生成。

// 构造待写入数据
val rows = new util.ArrayList[Row]()
rows.add(Row(1234561L, new String(Base64.getEncoder.encode(serialize(RoaringBitmap.bitmapOf(1 until 13: _*)).array()))))
rows.add(Row(1234562L, new String(Base64.getEncoder.encode(serialize(RoaringBitmap.bitmapOf(1 until 133: _*)).array()))))
rows.add(Row(1234563L, new String(Base64.getEncoder.encode(serialize(RoaringBitmap.bitmapOf(1 until 1334: _*)).array()))))
rows.add(Row(1234564L, new String(Base64.getEncoder.encode(serialize(RoaringBitmap.bitmapOf(1 until 133334: _*)).array()))))
val schema = StructType(List(StructField("ds", LongType),StructField("user", StringType)))
val df = jobManager.spark.createDataFrame(rows, schema)// 自定义的向ck表写入数据的方法
saveToClickHouse(df,"stone","bitmap_test","cluster_name",Some(1),  // 写入并发partition数Some(5000)) // 每次写入数据的batch size

查询CK表中数据验证

查询ClickHouse中表的数据可以发现数据正确被插入了,并且物化表达式生成的RBM列数据也是正常的。至此,SparkSQL & ClickHouse打通RBM功能的流程就全部完成了!

:) select ds, bitmapCardinality(roaring_bitmap) as uv from stone.bitmap_test_view;SELECT ds, bitmapCardinality(roaring_bitmap) AS uv
FROM stone.bitmap_test_view┌──────ds─┬───uv─┐
│ 1234563 │ 1333 │
└─────────┴──────┘
┌──────ds─┬──uv─┐
│ 1234562 │ 132 │
└─────────┴─────┘
┌──────ds─┬─uv─┐
│ 1234561 │ 12 │
└─────────┴────┘
┌──────ds─┬─────uv─┐
│ 1234564 │ 133333 │
└─────────┴────────┘4 rows in set. Elapsed: 0.003 sec.

64位RoaringBitmap(RBM)定制序列化实现

前面介绍了RBM都是基于32位实现的,但在很多实际应用中往往都需要用到64位RBM的实现。调研后发现java的RoaringBitmap包是支持64位实现的Roaring64NavigableMap,而且ClickHouse中也支持64位的RBM实现。查看两者的实现源码后对前面小节的定制序列化实现进行调整即可支持64位RBM的定制序列化,具体实现代码如下,已经结合ClickHouse测试通过。

def serialize(rb: Roaring64NavigableMap): ByteBuffer = {// ck中rbm对小于32的基数进行了优化,使用smallset进行存放if (rb.getLongCardinality <= 32) {// the serialization structure of roaringbitmap in clickhouse: Byte(1), VarInt(SerializedSizeInBytes), ByteArray(RoaringBitmap)// and long occupies 8 bytesval bos1 = ByteBuffer.allocate(1 + 1 + 8*rb.getIntCardinality)val bos = if (bos1.order eq LITTLE_ENDIAN) bos1 else bos1.slice.order(LITTLE_ENDIAN)bos.put(new Integer(0).toByte)bos.put(rb.getIntCardinality.toByte)rb.toArray.foreach(i => bos.putLong(i))bos} else {// Roaring64NavigableMap serialize with prefix of "signedLongs" and "highToBitmap.size()"// Refer to the implementation of the serialize method of Roaring64NavigableMap, remove the prefix bytesval rbmPrefixBytes = 1 + 8val serializedSizeInBytes = rb.serializedSizeInBytes().toInt - rbmPrefixBytesval varIntLen = VarInt.varLongSize(serializedSizeInBytes)// the serialization structure of roaringbitmap in clickhouse: Byte(1), VarInt(SerializedSizeInBytes), ByteArray(RoaringBitmap)val bos1 = ByteBuffer.allocate(1 + varIntLen + serializedSizeInBytes)val bos = if (bos1.order eq LITTLE_ENDIAN) bos1 else bos1.slice.order(LITTLE_ENDIAN)bos.put(new Integer(1).toByte)VarInt.putVarInt(serializedSizeInBytes, bos)val baos = new ByteArrayOutputStream()rb.serialize(new DataOutputStream(baos))bos.put(baos.toByteArray.slice(rbmPrefixBytes, rb.serializedSizeInBytes().toInt))bos}
}

SparkSQL ClickHouse RoaringBitmap使用实践相关推荐

  1. 【clickhouse】ClickHouse基础、实践、调优全视角解析

    1.概述 ClickHouse基础.实践.调优全视角解析(part1-配置篇) ClickHouse基础.实践.调优全视角解析(part2-表引擎介绍) ClickHouse基础.实践.调优全视角解析 ...

  2. 1w字详解 ClickHouse漏斗模型实践方案(收藏)

    作者:互联网大数据团队- Wu Yonggang 日常工作中做为数仓开发工程师.数据分析师经常碰到漏斗分析模型,本文详细介绍漏斗模型的概念及基本原理,并阐述了其在平台内部的具体实现.针对实际使用过程的 ...

  3. 史上最简单的spark教程第十三章-SparkSQL编程Java案例实践(终章)

    Spark-SQL的Java实践案例(五) 本章核心:JDBC 连接外部数据库,sparkSQL优化,故障监测 史上最简单的spark教程 所有代码示例地址:https://github.com/My ...

  4. 【用户画像】Clickhouse位图函数实践总结

    文章目录 1 位图概念 2 位图函数 2.1 位图函数作用 2.2 位图函数构造方法 2.3 位图函数的基本分类 2.4 位图函数基本使用 2.4.1 数据准备 2.4.2 构造位图 2.4.2.1 ...

  5. 数仓ClickHouse多维分析应用实践

    点击上方 "肉眼品世界"关注, 星标或置顶一起成长 你点的每个赞,我都当成了喜欢

  6. 【clickhouse】clickhouse 最佳实践

    文章目录 1.概述 1.概述 转载:ClickHouse Better Practices clickhouse优化最佳实践(持续更新-)

  7. ClickHouse 冷热分离存储在得物的实践

    1. 业务背景 得物上一代日志平台的存储主要依赖于 ES.随着公司业务的高速发展,日志场景逐步产生了一些新需求,主要表现在:应用数量逐步增多,研发需要打印更多的日志定位业务问题,安全合规需要保留更长时 ...

  8. ClickHouse在工业互联网场景的OLAP平台建设实践

    作者:许亮 背景介绍 京东工业是2021独立出来成立的新事业群-京东工业事业群,包括工业品.工业服务.工业互联等四大板块业务.工业互联业务主要是搭建工业互联网平台,用于将实时现场工业数据汇入平台进行分 ...

  9. Hbase、Kudu和ClickHouse横向对比

    好记忆不如烂笔头,能记下点东西,就记下点,有时间拿出来看看,也会发觉不一样的感受. 目录 1 前言 2 安装部署方式对比 3 组成架构对比 4 基本操作对比 4.1 数据读写操作 4.2 数据查询操作 ...

  10. 用户行为分析模型实践--漏斗分析模型

    在<用户行为分析模型实践--路径分析模型 >中,讲述了基于平台化查询中查询时间短.需要可视化的要求,并结合现有的存储计算资源以及具体需求,我们在实现中将路径数据进行枚举后分为两次进行合并. ...

最新文章

  1. Linux之软件卸载 apt-get
  2. 【Android 插件化】VirtualAppEx 编译运行 ( VirtualAppEx 简介 | 配置 VirtualAppEx 编译环境 | 编译运行 VirtualAppEx 代码 )
  3. 开运算和闭运算_OpenCV计算机视觉学习(5)——形态学处理(腐蚀膨胀,开闭运算,礼帽黑帽,边缘检测)...
  4. react学习笔记10:显示隐藏效果和tab切换效果
  5. [2018.10.18 T3] 小 G 的线段树
  6. python跨平台是什么意思_跨平台是什么意思?通俗深刻的解释
  7. SNMP 枚举工具 Snmpwalk
  8. 微信三方平台调试过程中遇到的问题
  9. Python学习之路:函数传递可变参数与不可变参数
  10. php 提取视频中的声音,怎么提取视频的声音 提取视频中的声音
  11. linux实验手册汇总,Linux实验手册汇总(226页)-原创力文档
  12. GoLang之go test测试
  13. vim编辑器退不出来的问题
  14. HCIA:动态路由路由协议RIP及DHCP
  15. WIN10恢复文件默认打开方式
  16. 新组装的电脑,主板灯亮,开机没反应
  17. 脚本一键部署(dhcp、dns、pxe、raid、nfs+apache+expect、lvm、磁盘分区、监控资源)
  18. 清除路由器密码 路由器密码破解
  19. 如何批量生成含有产品信息的二维码
  20. [原创].NET 业务框架开发实战之十 第一阶段总结,深入浅出,水到渠成(后篇)...

热门文章

  1. Python爬虫是什么?
  2. 3个酷到没同学的冷门专业,开始逆袭了?
  3. VSCode Remote 报错,无法连接??别慌,小二来了!
  4. 一次人大金仓剔除锁经历
  5. mysql的append用法_insert 中append 用法详解
  6. js 前端实现打印功能
  7. rust发射台主楼_各专业分类词库(完全).doc
  8. 牛客 | C 选择颜色
  9. git 分支关系图谱讲解
  10. 提示网站服务器403,浏览器打开网页时出现http 403 禁止访问错误是什么原因?