开始学习《ClickHouse原理解析与应用实践》,写博客作读书笔记。

本文全部内容都来自于书中内容,个人提炼。

第9章:

《ClickHouse原理解析与应用实践》读书笔记(6)_Aiky哇的博客-CSDN博客各种查询子句的用法。(本地查询)https://aikysay.blog.csdn.net/article/details/125595848

第10章 副本与分片

10.1 概述

ClickHouse的集群配置非常灵活,用户既可以将所有节点组成一个单一集群,也可以按照业务的诉求,把节点划分为多个小的集群。

在每个小的集群区域之间,它们的节点、分区和副本数量可以各不相同 。

分片之间的数据是不同的,而副本之间的数据是完全相同的。

使用副本的主要目的是防止数据丢失,增加数据存储的冗余;而使用分片的主要目的是实现数据的水平切分。

10.2 数据副本

在MergeTree中,一个数据分区由开始创建到全部完成,会历经两类存储区域。

  1. 内存:数据首先会被写入内存缓冲区。
  2. 本地磁盘:数据接着会被写入tmp临时目录分区,待全部完成后再将临时目录重命名为正式分区。

ReplicatedMergeTree在上述基础之上增加了ZooKeeper的部分, 它会进一步在ZooKeeper内创建一系列的监听节点,并以此实现多个实例之间的通信。

在整个通信过程中,ZooKeeper并不会涉及表数据的传输。

10.2.1 副本的特点

  1. 依赖ZooKeeper:在执行INSERT和ALTER查询的时候, ReplicatedMergeTree需要借助ZooKeeper的分布式协同能力,以实现多个副本之间的同步。但是在查询副本的时候,并不需要使用 ZooKeeper。
  2. 多主架构(Multi Master):可以在任意一个副本上执行 INSERT和ALTER查询,它们的效果是相同的。这些操作会借助 ZooKeeper的协同能力被分发至每个副本以本地形式执行。
  3. Block数据块:在执行INSERT命令写入数据时,会依据 max_insert_block_size的大小(默认1048576行)将数据切分成若干个Block数据块。所以Block数据块是数据写入的基本单元,并且具有写入的原子性和唯一性。
    1. 原子性即:在数据写入时,一个Block块内的数据要么全部写入成功,要么全部失败。
    2. 唯一性即:在写入一个Block数据块的时候,会按照当前Block数据块的数据顺序、数据行和数据大小等指标,计算Hash信息摘要并记录在案。之后,如果某个待写入的Block数据块与先前已被写入的 Block数据块拥有相同的Hash摘要(Block数据块内数据顺序、数据大小和数据行均相同),则该Block数据块会被忽略。这项设计可以预防 由异常原因引起的Block数据块重复写入的问题。

10.2.2 ZooKeeper的配置方式

各个副本所使用的Zookeeper配置通常是相同的,为了便于在多个节点之间复制配置文件,更常见的做法是将这一 部分配置抽离出来,独立使用一个文件保存。

在服务器的/etc/clickhouse-server/config.d目录下创建一个 名为metrika.xml的配置文件:

<?xml version="1.0"?>
<yandex><zookeeper-servers> <!—ZooKeeper配置,名称自定义 --><node index="1"> <!—节点配置,可以配置多个地址--><host>hdp1.nauu.com</host><port>2181</port></node></zookeeper-servers>
</yandex>在全局配置config.xml中使用<include_from>标签导入刚才定义的配置:
<include_from>/etc/clickhouse-server/config.d/metrika.xml</include_from>并引用ZooKeeper配置的定义:
<zookeeper incl="zookeeper-servers" optional="false" />

ClickHouse提供了一张名为zookeeper的 代理表。通过这张表,可以使用SQL查询的方式读取远端ZooKeeper内的数据。

查询的SQL语句中,必须指定path条件:

SELECT * FROM system.zookeeper where path = '/'
┌─name───────┬─value─┬──────czxid─┬──────mzxid─┬───────────────ctime─┬───────────────mtime─┬─version─┬─cversion─┬─aversion─┬─ephemeralOwner─┬─dataLength─┬─numChildren─┬──────pzxid─┬─path─┐
│ zookeeper  │       │          0 │          0 │ 1970-01-01 08:00:00 │ 1970-01-01 08:00:00 │       0 │       -2 │        0 │              0 │          0 │           2 │          0 │ /    │
│ clickhouse │       │ 4294967299 │ 4294967299 │ 2022-06-20 10:52:21 │ 2022-06-20 10:52:21 │       0 │        1 │        0 │              0 │          0 │           1 │ 4294967301 │ /    │
└────────────┴───────┴────────────┴────────────┴─────────────────────┴─────────────────────┴─────────┴──────────┴──────────┴────────────────┴────────────┴─────────────┴────────────┴──────┘SELECT * FROM system.zookeeper where path = '/clickhouse';
┌─name───────┬─value─┬──────czxid─┬──────mzxid─┬───────────────ctime─┬───────────────mtime─┬─version─┬─cversion─┬─aversion─┬─ephemeralOwner─┬─dataLength─┬─numChildren─┬──────pzxid─┬─path────────┐
│ task_queue │       │ 4294967301 │ 4294967301 │ 2022-06-20 10:52:21 │ 2022-06-20 10:52:21 │       0 │        1 │        0 │              0 │          0 │           1 │ 4294967303 │ /clickhouse │
└────────────┴───────┴────────────┴────────────┴─────────────────────┴─────────────────────┴─────────┴──────────┴──────────┴────────────────┴────────────┴─────────────┴────────────┴─────────────┘

10.2.3 副本的定义形式

使用副本增加了数据的冗余存储,所以降低了数据丢失的风险。

每个副本实例都可以作为数据读、写的入口,这无疑分摊了节点的负载。

ReplicatedMergeTree的定义方式如下:

ENGINE = ReplicatedMergeTree('zk_path', 'replica_name')

zk_path用于指定在ZooKeeper中创建的数据表的路径,路径名称是自定义的,并没有固定规则,用户可以设置成自己希望的任何路径。

ck提供的配置模板:/clickhouse/tables/{shard}/table_name

  • /clickhouse/tables/是约定俗成的路径固定前缀,表示存放数据表的根路径。
  • {shard}表示分片编号,通常用数值替代,例如01、02、03。一 张数据表可以有多个分片,而每个分片都拥有自己的副本。
  • table_name表示数据表的名称,为了方便维护,通常与物理表的名字相同(虽然ClickHouse并不强制要求路径中的表名称和物理表名相同);而replica_name的作用是定义在ZooKeeper中创建的副本名称,该名称是区分不同副本实例的唯一标识。一种约定成俗的命名方式是使用所在服务器的域名称。
//1分片,1副本的情形:
// zk_path相同,replica_name不同
ReplicatedMergeTree('/clickhouse/tables/01/test_1, 'ch5.nauu.com')
ReplicatedMergeTree('/clickhouse/tables/01/test_1, 'ch6.nauu.com')// 多个分片、1个副本的情形:
// 分片1
// 2分片,1副本. zk_path相同,其中{shard}=01, replica_name不同
ReplicatedMergeTree('/clickhouse/tables/01/test_1, 'ch5.nauu.com')
ReplicatedMergeTree('/clickhouse/tables/01/test_1, 'ch6.nauu.com')
// 分片2
// 2分片,1副本. zk_path相同,其中{shard}=02, replica_name不同
ReplicatedMergeTree('/clickhouse/tables/02/test_1, 'ch7.nauu.com')
ReplicatedMergeTree('/clickhouse/tables/02/test_1, 'ch8.nauu.com')

10.3 ReplicatedMergeTree原理解析

10.3.1 数据结构

核心逻辑中,大量运用了ZooKeeper的能力,以实现多个ReplicatedMergeTree副本实例之间的协同,包括主副本选举、副本状态感知、操作日志分发、任务队列和BlockID去重判断等。

执行INSERT数据写入、MERGE分区和MUTATION操作的时候,都会涉及与ZooKeeper的通信。

通信的过程中,并不会涉及任何表数据的传输,在查询数据的时候也不会访问ZooKeeper。

从zookeeper数据结构开始介绍。

1.ZooKeeper内的节点结构

在每张ReplicatedMergeTree表的创建过程中,它会以zk_path为根路径,在Zoo-Keeper中为这张表创建一组监听节点。

监听节点可以大致分成如下几类:

1)元数据:

  • /metadata:保存元数据信息,包括主键、分区键、采样表达式等。
  • /columns:保存列字段信息,包括列名称和数据类型。
  • /replicas:保存副本名称,对应设置参数中的replica_name。

2)判断标识: 

  • /leader_election:用于主副本的选举工作,主副本会主导 MERGE和MUTATION操作(ALTER DELETE和ALTER UPDATE)。这些任务在主副本完成之后再借助ZooKeeper将消息事件分发至其他副本。
  • /blocks:记录Block数据块的Hash信息摘要,以及对应的 partition_id。通过Hash摘要能够判断Block数据块是否重复;通过 partition_id,则能够找到需要同步的数据分区。
  • /block_numbers:按照分区的写入顺序,以相同的顺序记录 partition_id。各个副本在本地进行MERGE时,都会依照相同的 block_numbers顺序进行。
  • /quorum:记录quorum的数量,当至少有quorum数量的副本写入成功后,整个写操作才算成功。quorum的数量由insert_quorum参数控制,默认值为0。

3)操作日志:

  • /log:常规操作日志节点(INSERT、MERGE和DROP PARTITION),它是整个工作机制中最为重要的一环,保存了副本需要执行的任务指令。log使用了ZooKeeper的持久顺序型节点,每条指令的名称以log-为前缀递增,例如log-0000000000、log-0000000001 等。每一个副本实例都会监听/log节点,当有新的指令加入时,它们会把指令加入副本各自的任务队列,并执行任务。关于这方面的执行逻辑,稍后会进一步展开。
  • /mutations:MUTATION操作日志节点,作用与log日志类似,当执行ALERT DELETE和ALERT UPDATE查询时,操作指令会被添加到这个节点。mutations同样使用了ZooKeeper的持久顺序型节点,但是它的命名没有前缀,每条指令直接以递增数字的形式保存,例如 0000000000、0000000001等。关于这方面的执行逻辑,同样稍后展开。
  • /replicas/{replica_name}/*:每个副本各自的节点下的一组监听节点,用于指导副本在本地执行具体的任务指令,其中较为重要的节点有如下几个:
    • /queue:任务队列节点,用于执行具体的操作任务。当副本从/log或/mutations节点监听到操作指令时,会将执行任务添加至该节点下,并基于队列执行。
    • /log_pointer:log日志指针节点,记录了最后一次执行的log 日志下标信息,例如log_pointer:4对应了/log/log-0000000003(从 0开始计数)。
    • /mutation_pointer:mutations日志指针节点,记录了最后一次执行的mutations日志名称,例如mutation_pointer:0000000000对应了/mutations/000000000。

2.Entry日志对象的数据结构

两组父节点 /log和/mutations是分发操作指令的信息通道,而发送指令的方式,则是为这些父节点添加子节点。

所有的副本实例,都会监听父节点的变化,当有子节点被添加时,它们能实时感知。

这些被添加的子节点在ClickHouse中被统一抽象为Entry对象,而具体实现则由Log-Entry和MutationEntry对象承载,分别对应/log 和/mutations节点。

1)LogEntry

LogEntry用于封装/log的子节点信息。

  • source replica:发送这条Log指令的副本来源,对应 replica_name。
  • type:操作指令类型,主要有get、merge和mutate三种,分别对应从远程副本下载分区、合并分区和MUTATION操作。
  • block_id:当前分区的BlockID,对应/blocks路径下子节点的名称。
  • partition_name:当前分区目录的名称。

2)MutationEntry

MutationEntry用于封装/mutations的子节点信息。

  • source replica:发送这条MUTATION指令的副本来源,对应 replica_name。
  • commands:操作指令,主要有ALTER DELETE和ALTER UPDATE。
  • mutation_id:MUTATION操作的版本号。
  • partition_id:当前分区目录的ID。

10.3.2 副本协同的核心流程

副本协同的核心流程主要有INSERT、MERGE、MUTATION(数据修改)和ALTER(元数据修改)四种。

其他查询并不支持分布式执行,包括SELECT、CREATE、DROP、RENAME和ATTACH。

1.INSERT的核心执行流程

1)创建第一个副本实例

CREATE TABLE replicated_sales_1(id String,price Float64,create_time DateTime
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1','ch5.nauu.com')
PARTITION BY toYYYYMM(create_time)
ORDER BY id

创建的过程中,ReplicatedMergeTree会进行一些初始化操作。

  • 根据zk_path初始化所有的ZooKeeper节点。
  • 在/replicas/节点下注册自己的副本实例ch5.nauu.com。
  • 启动监听任务,监听/log日志节点。
  • 参与副本选举,选举出主副本,选举的方式是向/leader_election/插入子节点,第一个插 入成功的副本就是主副本。

2)创建第二个副本实例

创建第二个副本实例。表结构和zk_path需要与第一个副本相同,而replica_name则需要设置成CH6的域名:

CREATE TABLE replicated_sales_1(
//相同结构
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1','ch6.nauu.com')
//相同结构

第二个ReplicatedMergeTree同样会进行一些初始化操作。

  • 在/replicas/节点下注册自己的副本实例ch6.nauu.com。
  • 启动监听任务,监听/log日志节点。
  • 参与副本选举,选举出主副本。在这个例子中,CH5副本成为主副本。

3)向第一个副本实例写入数据

-- 向第一个副本实例写入数据
INSERT INTO TABLE replicated_sales_1 VALUES('A001',100,'2019-05-10 00:00:00')

上述命令执行之后,首先会在本地完成分区目录的写入:
Renaming temporary part tmp_insert_201905_1_1_0 to 201905_0_0_0

接着向/blocks节点写入该数据分区的block_id:
Wrote block with ID '201905_2955817577822961065_12656761735954722499'

该block_id将作为后续去重操作的判断依据。如果此时再次执行刚才的INSERT语句,试图写 入重复数据,则会出现如下提示:

Block with ID 201905_2955817577822961065_12656761735954722499 already exists; ignoring it.

即副本会自动忽略block_id重复的待写入数据。 此外,如果设置了insert_quorum参数(默认为0),并且insert_quorum>=2,则CH5会进一步监控已完成写入操作的副本个数,只有当写入副本个数大于或等于insert_quorum时,整个写入操 作才算成功。

4)由第一个副本实例推送Log日志

在3步骤完成之后,会继续由执行了INSERT的副本向/log节点推送操作日志。

上面例子中第一个副本CH5担此重任。

日志的编号是/log/log-0000000000,而LogEntry的核心属性如下:

/log/log-0000000000
source replica: ch5.nauu.com
block_id: 201905_...
type : get
partition_name :201905_0_0_0

从日志内容中可以看出,操作类型为get下载,而需要下载的分区是201905_0_0_0。

其余所有副本都会基于Log日志以相同的顺序执行命令。

5)第二个副本实例拉取Log日志

CH6副本会一直监听/log节点变化,当CH5推送了/log/log-0000000000之后,CH6便会触发日志的拉取任务并更新log_pointer,将其指向最新日志下标:

/replicas/ch6.nauu.com/log_pointer : 0

拉取了LogEntry之后,它并不会直接执行,而是将其转为任务对象放至队列:

/replicas/ch6.nauu.com/queue/ Pulling 1 entries to queue: log-0000000000 - log-0000000000

因为同一时段可能会获取多个LogEntry,所以使用队列的形式消化任务是一种更为合理的设计。

注意,拉取的LogEntry是一个区间,这同样也是因为可能会连续收到多个LogEntry。

6)第二个副本实例向其他副本发起下载请求

CH6基于/queue队列开始执行任务。

type类型为get,ReplicatedMerge-Tree知道现在需要同步其他副本中的数据分区。

CH6上的第二个副本实例会开始选择一个远端的其他副本作为数据的下载来源。

选择算法大致为:

  1. 从/replicas节点拿到所有的副本节点。
  2. 遍历这些副本,选取其中一个。选取的副本需要拥有最大的log_pointer下标,并且/queue子节点数量最少。log_pointer下标最大,意味着该副本执行的日志最多,数据应该更加完整;而/queue最小,则意味着该副本目前的任务执行负担较小。

如果第一次下载请求失败,在默认情况下,CH6再尝试请求4次,一共会尝试5次(由 max_fetch_partition_retries_count参数控制,默认为5)。

7)第一个副本实例响应数据下载

CH5根据参数做出响应 ,发送本地分区数据。

8)第二个副本实例下载数据并完成本地写入

CH6副本在收到CH5的分区数据后,首先将其写至临时目录。

待全部数据接收完成之后,重命名该目录

至此,整个写入流程结束。

在INSERT的写入过程中,ZooKeeper不会进行任何实质性的数据传输。

本着谁执行谁负责的原则,在这个案例中由CH5首先在本地写入了分区数据。之后,也由这个副本负责发送 Log日志,通知其他副本下载数据。如果设置了insert_quorum并且insert_quorum>=2,则还会由该副本监控完成写入的副本数量。其他副本在接收到Log日志之后,会选择一个最合适的远端副本,点对点地下载分区数据。

2.MERGE的核心执行流程

无论MERGE操作从哪个副本发起,其合并计划都会交由主副本来制定。

1)创建远程连接,尝试与主副本通信

在CH6节点执行OPTIMIZE,强制触发MERGE合并。

CH6通过/replicas找到主副 本CH5,并尝试建立与它的远程连接。

2)主副本接收通信 

主副本CH5接收并建立来自远端副本CH6的连接。

3)由主副本制定MERGE计划并推送Log日志 

由主副本CH5制定MERGE计划,并判断哪些分区需要被合并。在选定之后,CH5将合并计划转换 为Log日志对象并推送Log日志,以通知所有副本开始合并。

日志信息:

/log/log-0000000002
source replica: ch5.nauu.com
block_id:
type : merge
201905_0_0_0
201905_1_1_0
into
201905_0_1_1

操作类型为Merge合并,而这次需要合并的分区目录是201905_0_0_0 和201905_1_1_0。

与此同时,主副本还会锁住执行线程,对日志的接收情况进行监听。

监听行为由replication_alter_partitions_sync参数控制,默认值为1。当此参数为0时, 不做任何等待;为1时,只等待主副本自身完成;为2时,会等待所有副本拉取完成。

4)各个副本分别拉取Log日志 

CH5和CH6两个副本实例将分别监听/log/log-0000000002日志的推送,它们也会分别拉取日志到本地,并推送到各自的/queue任务队列

5)各个副本分别在本地执行MERGE

CH5和CH6基于各自的/queue队列开始执行任务,开始在本地执行MERGE。

至此,整个合并流程结束。

在MERGE的合并过程中,ZooKeeper也不会进行任何实质性的数据传输,所有的合并操作,最终都是由各个副本在本地完成的。

无论合并动作在哪个副本被触发,都会首先被转交至主副本,再由主副本负责合并计划的制定、消息日志的推送以及对日志接收情况的监控。

3.MUTATION的核心执行流程

执行ALTER DELETE或者ALTER UPDATE操作的时候,即会进入 MUTATION部分的逻辑。

与MERGE类似,由主副本进行响应 。

1)推送MUTATION日志

在CH6节点尝试通过DELETE来删除数据。

执行之后,该副本会接着进行两个重要事项:

  1. 创建MUTATION ID
  2. 将MUTATION操作转换为MutationEntry日志,并推送到/mutations/0000000000

MutationEntry的核心属性:

/mutations/0000000000
source replica: ch6.nauu.com
mutation_id: 2
partition_id: 201905
commands: DELETE WHERE id = \'1\'

MUTATION的操作日志是经由/mutations节点分发至各个副本的。

2)所有副本实例各自监听MUTATION日志

CH5和CH6都会监听/mutations节点,当监听到有新的MUTATION日志加入时,并不是所有副本都会直接做出响应,它们首先会判断自己是否为主副本。

3)由主副本实例响应MUTATION日志并推送Log日志

只有主副本才会响应MUTATION日志。

CH5将MUTATION日志转 换为LogEntry日志并推送至/log节点 。

日志的核心信息:

/log/log-0000000003
source replica: ch5.nauu.com
block_id:
type : mutate
201905_0_1_1
to
201905_0_1_1_2

类型为mutate,需要将201905_0_1_1分区修改为 201905_0_1_1_2(201905_0_1_1 +"_" + mutation_id)。

4)各个副本实例分别拉取Log日志

CH5和CH6两个副本分别监听/log/log-0000000003日志的推送,分别拉取日志到本 地,并推送到各自的/queue任务队列。

5)各个副本实例分别在本地执行MUTATION 

CH5和CH6基于各自的/queue队列开始执行任务,在本地执行MUTATION。

至此,整个MUTATION流程结束。

所有的MUTATION操作,最终都是由各个副本在本地完成的。

无论 MUTATION动作从哪个副本被触发,之后都会被转交至主副本,再由主副本负责推送Log日志,以通知各个副本执行最终的MUTATION逻辑。同时也由主副本对日志接收的情况实行监控。

4.ALTER的核心执行流程 

执行增加、删除表字段等操作的时候,进入ALTER部分的逻辑。

其执行过程中并不会涉及/log日志的分发。整个流程从上至下按照时间顺序进行。

1)修改共享元数据

在CH6节点尝试增加一个列字段,执行之后,CH6会修改ZooKeeper内的共享元数据节点。

数据修改后,节点的版本号也会同时提升。

与此同时,CH6还会负责监听所有副本的修改完成情况

2)监听共享元数据变更并各自执行本地修改 

CH5和CH6两个副本分别监听共享元数据的变更。

之后,它们会分别对本地的元数据版本号与 共享版本号进行对比。

发现本地版本号低于共享版本号,开始在各自的本地执行更新操作 。

3)确认所有副本完成修改

CH6确认所有副本均已完成修改。

至此,整个ALTER流程结束。

本着谁执行谁负责的原则,在这个案例中由CH6 负责对共享元数据的修改以及对各个副本修改进度的监控。

10.4 数据分片

数据副本无法解决数据表的容量问题。

ClickHouse 的数据分片需要结合Distributed表引擎一同使用。

Distributed表引擎自身不存储任何数据,它能够作为分布式表的 一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。

10.4.1 集群的配置方式

1分片、0副本语义的配置如下
<shard> <!-- 分片 --><replica><!—副本 --></replica>
</shard>示1分片、1副本语义的配置
<shard> <!-- 分片 --><replica><!—副本 --></replica><replica></replica>
</shard>

集群有两种配置形式:

1.不包含副本的分片

【这里使用node的配置实在不常见,我看到的所有厂商的都是使用shard来定义的,这块略过】

2.自定义分片与副本

<!-- 2个分片、0个副本 -->
<sharding_simple> <!-- 自定义集群名称 --><shard> <!-- 分片 --><replica> <!-- 副本 --><host>ch5.nauu.com</host><port>9000</port></replica></shard><shard><replica><host>ch6.nauu.com</host><port>9000</port></replica></shard>
</sharding_simple><!-- 1个分片 1个副本-->
<sharding_simple_1><shard><replica><host>ch5.nauu.com</host><port>9000</port></replica><replica><host>ch6.nauu.com</host><port>9000</port></replica></shard>
</sharding_simple_1><!-- 2个分片 1个副本-->
<sharding_ha><shard><replica><host>ch5.nauu.com</host><port>9000</port></replica><replica><host>ch6.nauu.com</host><port>9000</port></replica></shard><shard><replica><host>ch7.nauu.com</host><port>9000</port></replica><replica><host>ch8.nauu.com</host><port>9000</port></replica></shard>
</sharding_ha>

10.4.2 基于集群实现分布式DDL

默认的情况下,CREATE、DROP、RENAME和ALTER等 DDL语句并不支持分布式执行。

而在加入集群配置后,就可以使用新的语法 实现分布式DDL执行了:

CREATE/DROP/RENAME/ALTER TABLE ON CLUSTER cluster_name

cluster_name对应了配置文件中的集群名称,可以通过系统表来查询:

SELECT cluster, host_name FROM system.clusters
-- 创建表来举例:
-- ClickHouse会根据集群shard_2的配置信息, 分别在CH5和CH6节点本地创建test_1_local。
CREATE TABLE test_1_local ON CLUSTER shard_2(id UInt64--这里可以使用任意其他表引擎,--用{shard}和{replica}两个动态宏变量代替了硬编码方式。
)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test_1', '{replica}')
ORDER BY id-- 删除:
DROP TABLE test_1_local ON CLUSTER shard_2-- 执行下面的语句查询系统表,能够看到当前ClickHouse节点中已存在的宏变量:
--ch5节点
SELECT * FROM system.macros
┌─macro───┬─substitution─┐
│ replica │ ch5.nauu.com │
│ shard │ 01 │
└───────┴─────────┘
--ch6节点
SELECT * FROM remote('ch6.nauu.com:9000', 'system', 'macros', 'default')
┌─macro───┬─substitution─┐
│ replica │ ch6.nauu.com │
│ shard │ 02 │
└───────┴─────────┘

宏变量是通过配置文件的形式预先定义在各个节点的配置文件中的:

<macros><shard>01</shard><replica>ch5.nauu.com</replica>
</macros><macros><shard>02</shard><replica>ch6.nauu.com</replica>
</macros>

1.数据结构

1)ZooKeeper内的节点结构

默认情况下,分布式DDL在ZooKeeper内使用的根路径为:/clickhouse/task_queue/ddl

该路径由config.xml内的distributed_ddl配置指定:

<distributed_ddl><!-- Path in ZooKeeper to queue with DDL queries --><path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>

在此根路径之下,还有一些其他的监听节点,其中包括/query- [seq],其是DDL操作日志,每执行一次分布式DDL查询,在该节点下就会新增一条操作日志,以记录相应的操作指令。当各个节点监听到有新日志加入的时候,便会响应执行。DDL操作日志使用ZooKeeper的持久顺序型节点,每条指令的名称以query-为前缀,后面的序号递增,例如query0000000000、query-0000000001等。

每条query-[seq]操作日志之下,还 有两个状态节点:

  1. /query-[seq]/active:用于状态监控等用途,在任务的执行过程中,在该节点下会临时保存当前集群内状态为active的节点。
  2. /query-[seq]/finished:用于检查任务完成情况,在任务的执行过程中,每当集群内的某个host节点执行完毕之后,便会在该节点下写入记录。例如下面的语句。
    /query-000000001/finished
    ch5.nauu.com:9000 : 0
    ch6.nauu.com:9000 : 0

    表示集群内的CH5和CH6两个节点已完成任务。

2)DDLLogEntry日志对象的数据结构

在/query-[seq]下记录的日志信息由DDLLogEntry承载,核心内容:

  • query记录了DDL查询的执行语句

    query: DROP TABLE default.test_1_local ON CLUSTER shard_2
  • hosts记录了指定集群的hosts主机列表,集群由分布式DDL语句中的ON CLUSTER指定
    hosts: ['ch5.nauu.com:9000','ch6.nauu.com:9000']
  • initiator记录初始化host主机的名称
    initiator: ch5.nauu.com:9000hosts主机列表的取值来源等同于:
    --从initiator节点查询cluster信息
    SELECT host_name FROM
    remote('ch5.nauu.com:9000', 'system', 'clusters', 'default')
    WHERE cluster = 'shard_2'
    

2.分布式DDL的核心执行流程

解释分布式DDL的核心执行流程。

(1)推送DDL日志

首先在CH5节点执行CREATE TABLE ON CLUSTER, 本着谁执行谁负责的原则,在这个案例中将会由CH5节点负责创建 DDLLogEntry日志并将日志推送到ZooKeeper,同时也会由这个节点负责监控任务的执行进度。

(2)拉取日志并执行

CH5和CH6两个节点分别监听/ddl/query0000000064日志的推送,于是它们分别拉取日志到本地。首先,它们会判断各自的host是否被包含在DDLLog-Entry的hosts列表中。

如果包含在内, 则进入执行流程,执行完毕后将状态写入finished节点;如果不包含,则忽略这次日志的推送。

(3)确认执行进度

在步骤1执行DDL语句之后,客户端会阻塞等待 180秒,以期望所有host执行完毕。如果等待时间大于180秒,则会转入后台线程继续等待(等待时间由distributed_ddl_task_timeout参数指定, 默认为180秒)。

10.5 Distributed原理解析

自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各个节点,所以Distributed表引擎需要和其他数据表引擎一起协同工作。

从实体表层面来看,一张分片表由两部分组成:

  • 本地表:通常以_local为后缀进行命名。本地表是承接数据的载体,可以使用非Distributed的任意表引擎,一张本地表对应了一个数据分片。
  • 分布式表:通常以_all为后缀进行命名。分布式表只能使用 Distributed表引擎,它与本地表形成一对多的映射关系,日后将通过分布式表代理操作多张本地表。

在创建表时并不会进行检查一致性,Distributed表引擎采用了读时检查的机制,这意味着如果它们的表结构不兼容,只 有在查询时才会抛出错误。

10.5.1 定义形式

Distributed表引擎的定义形式如下所示:

ENGINE = Distributed(cluster, database, table [,sharding_key])
  • cluster:集群名称,与集群配置中的自定义名称相对应。在对分布式表执行写入和查询的过程中,它会使用集群的配置信息来找到相应的host节点。
  • database和table:分别对应数据库和表的名称,分布式表使用这组配置映射到本地表。
  • sharding_key:分片键,选填参数。在数据写入的过程中,分布式表会依据分片键的规则,将数据分布到各个host节点的本地表。
-- 在集群sharding_simple创建分布式表
-- 根据rand()随机函数的取值决定数据写入哪个分片。
CREATE TABLE test_shard_2_all ON CLUSTER sharding_simple (id UInt64
)ENGINE = Distributed(sharding_simple, default, test_shard_2_local,rand())-- 在集群sharding_simple创建本地表
CREATE TABLE test_shard_2_local ON CLUSTER sharding_simple (id UInt64
)ENGINE = MergeTree()
ORDER BY id
PARTITION BY id

10.5.2 查询的分类

Distributed表的查询操作可以分为:

  • 会作用于本地表的查询:对于INSERT和SELECT查询, Distributed将会以分布式的方式作用于local本地表。而对于这些查询的具体执行逻辑,将会在后续小节介绍。
  • 只会影响Distributed自身,不会作用于本地表的查询: Distributed支持部分元数据操作,包括CREATE、DROP、RENAME和 ALTER,其中ALTER并不包括分区的操作(ATTACH PARTITION、REPLACE PARTITION等)。这些查询只会修改Distributed表自身,并不会修改 local本地表。(要彻底删除一张分布式表,则需要分别删除分布式 表和本地表)
    --删除分布式表
    DROP TABLE test_shard_2_all ON CLUSTER sharding_simple
    --删除本地表
    DROP TABLE test_shard_2_local ON CLUSTER sharding_simple
  • 不支持的查询:Distributed表不支持任何MUTATION类型的操作,包括ALTER DELETE和ALTER UPDATE。

10.5.3 分片规则

分片键要求返回一个整型类型的取值,包括Int 系列和UInt系列。

--按照用户id的余数划分
Distributed(cluster, database, table ,userid)--按照随机数划分
Distributed(cluster, database, table ,rand())--按照用户id的散列值划分
Distributed(cluster, database, table , intHash64(userid))--如果不声明分片键,那么分布式表只能包含一个分片,这意味着只能映射一张本地表,否则,
在写入数据时将会得到如下异常:
Method write is not supported by storage Distributed with more than one shard and no sharding key provided

数据具体是如何被划分?

1.分片权重(weight)

集群的配置中,有一项weight(分片权重)的设置:

<sharding_simple><!-- 自定义集群名称 --><shard><!-- 分片 --><weight>10</weight><!-- 分片权重 -->……</shard><shard><weight>20</weight>……</shard>
…

weight默认为1,虽然可以将它设置成任意整数,但官方建议应该尽可能设置成较小的值。分片权重会影响数据在分片中的倾斜程度,一个分片权重值越大,那么它被写入的数据就会越多。

 2.slot(槽)

slot的数量等于所有分片的权重之和。

假设集群sharding_simple有两个Shard分 片,第一个分片的weight为10,第二个分片的weight为20,那么slot的数量则等于30。

slot按照权重元素的取值区间,与对应的分片形成映射关系。在这个示例中,如果slot值落在[0,10)区间,则对应第一个分片;如果slot值落在[10,20]区间,则对应第二个分片。

3.选择函数

数用于判断一行待写入的数据应该被写入哪个分片。

步骤为:

  1. 找出slot的取值,slot = shard_value % sum_weight。shard_value是分片键的取值;sum_weight是所有分片的权重之和;slot等于 shard_value和sum_weight的余数。假设某一行数据的shard_value是10,sum_weight是30(两个分片,第一个分片权重为10,第二个分片权重为20),那么slot值等于10(10%30=10)。
  2. 基于slot值找到对应的数据分片。当slot值等于10的时候,它属于[10,20)区间,所以这行数据会对应到第二个Shard分片。

10.5.4 分布式写入的核心流程

两种思路:

  1. 直接将数据写入ClickHouse集群的各个本地表 ,拥有更好的写入性能【也是ck官方推荐的方法】
  2. 通过Distributed表引擎代理写入分片数据。(重点介绍)

1.将数据写入分片的核心流程

1)在第一个分片节点写入本地分片数据

在CH5节点,对分布式表test_shard_2_all执行INSERT查询,写入 10、30、200和55四行数据。

分布式表根据分片规则划分数据,30到分片1;10,200,55到分片2。

属于当前分片的数据直接写入本地表test_shard_2_local。

2)第一个分片建立远端连接,准备发送远端分片数据

将归至远端分片的数据以分区为单位,分别写入test_shard_2_all存储目录下的临时bin文件 。

数据文件的命名规则如下:

/database@host:port/[increase_num].bin

例子中为:/test_shard_2_all/default@ch6.nauu.com:9000/1.bin

10、200和55三行数据会被写入上述这个临时数据文件。

之后开始尝试与远端CH6分片建立连接。

3)第一个分片向远端分片发送数据

监听到/test_shard_2_all目录下的文件变化 ,开始发送数据。

每份目录将会由独立的线程负责发送,数据在传输之前会被压缩。

4)第二个分片接收数据并写入本地

CH6分片节点确认建立与CH5的连接,接收到来自CH5发送的数据后,将它们写入本地表

5)由第一个分片确认完成写入

由CH5分片确认所有的数据发送完毕。

至此,整个流程结束。

Distributed表负责所有分片的写入工作。本着谁执行谁负责的原则,在这个示例中,由CH5节点的分布式表负责切分数据,并向所有其他分片节点发送数据。

Distributed表负责向远端分片发送数据时,有异步写和同步写两种模式:

  • 异步写,则在Distributed表写完本地分片之后,INSERT查询就会返回成功写入的信息;
  • 同步写,则在执行INSERT查询之后,会等待所有分 片完成写入。

insert_distributed_sync参数控制,默认为 false,即异步写。

2.副本复制数据的核心流程

数据在多个副本之间,有两种复制实现方式:

  • 一种是继续借助Distributed表引擎,由它将数据写入副本;
  • 另一种则是借助 ReplicatedMergeTree表引擎实现副本数据的分发。

1)通过Distributed复制数据

Distributed会同时负责分片和副本的数据写入工作,副本数据的写入流程与分片逻辑相同。

在这种实现方案下,Distributed节点需要同时负责分片和副本的数据写入工作,它很有可能会成为写入的单点瓶颈 。

2)通过ReplicatedMergeTree复制数据

在集群的shard配置中增加internal_replication参数并将其设置为 true(默认为false),那么Distributed表在该shard中只会选择一个合适的 replica并对其写入数据。

此时多个replica副本之间的数据复制会交由 ReplicatedMergeTree自己处理。

在shard中选择replica的算法大致如下:

  1. 在ClickHouse的服务节点中,拥有一个全局计数器errors_count,当服务出现任何异常时,该计数累积加1;
  2. 接着,当一个shard内拥有多个replica时,选择errors_count错误最少的那个。

10.5.5 分布式查询的核心流程

集群查询数据只能通过Distributed表引擎实现。

Distributed表会依次查询每个分片的数据,再合并汇总返回。

1.多副本的路由规则

如果集群中的一个shard,拥有多个replica,Distributed表会使用负载均衡算法从众多replica中选择一个。具体使用何种负载均衡算法,则 由load_balancing参数控制:

load_balancing = random/nearest_hostname/in_order/first_or_random

1)random

默认的负载均衡算法,random算法会选择errors_count错误数量最 少的replica,如果多个replica的errors_count计数相同,则在它们之中随机选择一个。

2)nearest_hostname 

random算法的变种,会选择errors_count错误数量最少的 replica,如果多个replica的errors_count计数相同,则选择集群配置中host名称与当前host最相似的一个。

规则是以当前host名称为基准按字节逐位比较,找出不同字节数最少的一个。

3)in_order

random算法的变种,会选择errors_count错误数量最少的replica,如果多个replica的errors_count计数相同,则按照集群配置中replica的定义顺序逐个选择。

4)first_or_random

作in_order算法的变种,首先它会选择errors_count错误数量最少的 replica,如果多个replica的errors_count计数相同,它首先会选择集群配置中第一个定义的replica, 如果该replica不可用,则进一步随机选择一个其他的replica。

2.多分片查询的核心流程 

本着谁执行谁负责的原则,它会由接收SELECT查询的 Distributed表,并负责串联起整个过程。

-- 查询分布式表:
SELECT * FROM distributed_table--sql进行转换后发送到远端分片节点:
SELECT * FROM local_table

Distributed表引擎会将查询计划转换为多个分片的UNION联合查询。

1)查询各个分片数据

在图10-18所示执行计划中,One和Remote步骤是并行执行的。分别负责了本地和远端分片的查询 动作。

2)合并返回结果 

多个分片数据均查询返回后,进行数据合并。

3.使用Global优化分布式子查询 

1)使用本地表的问题

-- 考虑in中使用本地表:
SELECT uniq(id) FROM test_query_all WHERE repo = 100
AND id IN (SELECT id FROM test_query_local WHERE repo = 200)-- ck会将SQL替换成本地表的形式,再发送到每个分片进行执行。
-- 注意IN查询的子句使用的是本地表:
SELECT uniq(id) FROM test_query_local WHERE repo = 100
AND id IN (SELECT id FROM test_query_local WHERE repo = 200)

由于在单个分片上只保存了部分的数据,所以该SQL语句结果是有问题的。

2)使用分布式表的问题

-- 尝试在IN查询子句中使用分布式表:
SELECT uniq(id) FROM test_query_all WHERE repo = 100
AND id IN (SELECT id FROM test_query_all WHERE repo = 200)

虽然结果正确,但是由于in中也使用了分布式表,再次向其他分片发起远程查询,此时会造成查询放大。

3)使用GLOBAL优化查询

使用GLOBAL IN或JOIN进行优化。

SELECT uniq(id) FROM test_query_all WHERE repo = 100
AND id GLOBAL IN (SELECT id FROM test_query_all WHERE repo = 200)

整个过程由上至下大致分成5个步骤:

  1. 将IN子句单独提出,发起了一次分布式查询。
  2. 将分布式表转local本地表后,分别在本地和远端分片执行查询。
  3. 将IN子句查询的结果进行汇总,并放入一张临时的内存表进行保存。
  4. 将内存表发送到远端分片节点。
  5. 将分布式表转为本地表后,开始执行完整的SQL语句,IN子句直接使用临时内存表的数据。

避免了查询放大的问题。

【但是内存表数据会造成网络开销,IN或者JOIN子句返回的数据不宜过大。】

10.6 本章小结

介绍了副本、分片和集群的使用方法以及核心工作流程。

ReplicatedMergeTree表引擎和Distributed表引擎的核心功能与工作流程。

下一章将介绍与ClickHouse管理与运维相关的内容。

《ClickHouse原理解析与应用实践》读书笔记(7)相关推荐

  1. 读书笔记:《德鲁克管理思想精要》- 2

    <德鲁克管理思想精要>  美 . 彼复 . 德鲁克 著     李维安 王世权 刘金岩 译     <The Essential Drucker>The Best of Six ...

  2. 读书笔记:《德鲁克管理思想精要》- 4

    <德鲁克管理思想精要>  美 . 彼复 . 德鲁克 著     李维安 王世权 刘金岩 译     <The Essential Drucker>The Best of Six ...

  3. 读书笔记:《德鲁克管理思想精要》- 6

    <德鲁克管理思想精要>  美 . 彼复 . 德鲁克 著     李维安 王世权 刘金岩 译     <The Essential Drucker>The Best of Six ...

  4. 读书笔记:《德鲁克管理思想精要》- 7

    <德鲁克管理思想精要>  美 . 彼复 . 德鲁克 著     李维安 王世权 刘金岩 译     <The Essential Drucker>The Best of Six ...

  5. 读书笔记:《德鲁克管理思想精要》- 5

    <德鲁克管理思想精要>  美 . 彼复 . 德鲁克 著     李维安 王世权 刘金岩 译     <The Essential Drucker>The Best of Six ...

  6. 读书笔记:《德鲁克管理思想精要》- 1

    <德鲁克管理思想精要>  美 . 彼复 . 德鲁克 著     李维安 王世权 刘金岩 译     <The Essential Drucker>The Best of Six ...

  7. 读书笔记:《德鲁克管理思想精要》- 3

    <德鲁克管理思想精要>  美 . 彼复 . 德鲁克 著     李维安 王世权 刘金岩 译     <The Essential Drucker>The Best of Six ...

  8. 读书笔记:《德鲁克管理思想精要》- 8 汇总

    <德鲁克管理思想精要>  美 . 彼复 . 德鲁克 著     李维安 王世权 刘金岩 译     <The Essential Drucker>The Best of Six ...

  9. 图书推荐:德鲁克管理思想精要(珍藏版)

    本书是德鲁克历年作品的精简摘编版本,目的是为了解决"德鲁克的书这么多",到底应该从哪里看起的问题.可以作为中等快餐作品来阅读. 本书分为三个部分:管理篇,个人篇,社会篇.本人是从& ...

  10. 彼得-德鲁克管理思想分享与理解

    l l <彼得认为:>企业的目的不在自身,而必须存在于企业本身之外,必须存在于社会之中,这就是造就客户 <朝晖理解:>企业是社会经济活动的一个点,一个环节.作为企业的领导者必须 ...

最新文章

  1. 为什么应该学习Kotlin
  2. ASP.NET分页存储过程自定义用户控件
  3. Sublime Text 2.0.1 版本 Build 2217 汉化包
  4. sap 供应商表_财务人员学习SAP的路线图
  5. 计算机在超声的应用,计算机在医学超声成像中应用.pdf
  6. 投资大佬都在看的一张报表
  7. 宇宙是否可以了解,宇宙和计算机科学有关系吗?
  8. 泛微oa主表赋值明细表_OA系统学习--三
  9. java html截图_Java实现网页截图/登录截图
  10. tumblr_如何将Google AdSense添加到您的Tumblr博客
  11. CI/CD到底是什么?看完就能很快理解
  12. 推动区块链技术应用创新河南开展区块链应用场景需求和典型应用案例征集工作
  13. grafana是什么?
  14. VS2017无法打开graphics.h解决方法
  15. python关于列表去重和删除的方法
  16. JVM2-性能监控故障处理工具
  17. 一喝到威士忌真是什么烦恼都忘了
  18. Staubli HCB08.7202/IC/JS3
  19. 跨考考研:从第二学士学位到西北工业大学电子信息专硕拟录取
  20. 《网络游戏核心技术与实战》学习笔记——专业术语

热门文章

  1. 如何高效的使用mac
  2. 关于线性空间和线性映射
  3. python爬取笔趣阁
  4. Java中Date日期时间的工具类
  5. Nagios监控服务器与客户端的安装
  6. Cesium Primitives加载大量图标点
  7. 论文阅读学习 - 深度学习网络模型分析对比
  8. 如果你不想像中兴跳楼程序员那样,2018年就应该做这8件事,别再等到中年危机了
  9. Sci-Hub创始人收到苹果的通知:2年前就把她的账户数据给了FBI
  10. html为什么链接无效,如何揪出网页中的无效链接