Table of Contents

  • 1. Overview
  • 2. Distributed table query flow

1. Overview

Repost: [ClickHouse source code] The SELECT flow of the Distributed table engine
Reposted here only so it is handy if I need it later.

2. Distributed table query flow

The Distributed table engine does not store any data itself; it is the distributed query engine provided by ClickHouse. Roughly summarized, its query principle is: the server that receives the query rewrites it, sends the rewritten query to the specified set of servers for execution, and the server that received the original request then merges the partial results and returns them to the client. The source code makes this process much easier to follow.
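To make the idea concrete before diving into the source, here is a minimal, self-contained sketch of the rewrite / fan-out / merge pattern. It is not ClickHouse code: the real engine rewrites the AST rather than the query string, talks to shards over its native protocol instead of calling a made-up run_on_shard(), and merges blocks through its stream pipeline. The table and shard names are invented for illustration.

#include <future>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for sending a query to one shard and getting rows back.
// In ClickHouse this role is played by a remote block input stream over the native protocol.
std::vector<std::string> run_on_shard(const std::string & shard, const std::string & query)
{
    return {shard + " -> executed: " + query};
}

int main()
{
    // Suppose the Distributed table "db.dist_hits" points at the local table "db.hits" on every shard.
    std::string received = "SELECT count() FROM db.dist_hits";

    // Step 1: rewrite. The real code rewrites the AST (rewriteSelectQuery); a string replace is
    // enough here to show the intent: the query sent to shards targets the underlying local table.
    std::string rewritten = received;
    rewritten.replace(rewritten.find("db.dist_hits"), std::string("db.dist_hits").size(), "db.hits");

    // Step 2: fan the rewritten query out to every shard concurrently.
    std::vector<std::string> shards = {"shard-1", "shard-2", "shard-3"};
    std::vector<std::future<std::vector<std::string>>> futures;
    for (const auto & shard : shards)
        futures.emplace_back(std::async(std::launch::async, run_on_shard, shard, rewritten));

    // Step 3: merge the partial results and return them to the client.
    for (auto & f : futures)
        for (const auto & row : f.get())
            std::cout << row << '\n';
}

The rest of this article walks through how the real implementation performs each of these three steps.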

Let's start with the BlockInputStreams StorageDistributed::read method, because queries that go through InterpreterSelectQuery and related interpreters all end up calling a read method that returns BlockInputStreams:

BlockInputStreams StorageDistributed::read(
    const Names & /*column_names*/,
    const SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum processed_stage,
    const size_t /*max_block_size*/,
    const unsigned /*num_streams*/)
{
    auto cluster = getCluster();

    // Get the settings, e.g. memory usage limits and similar configuration
    const Settings & settings = context.getSettingsRef();

    // This is the query rewrite mentioned above
    const auto & modified_query_ast = rewriteSelectQuery(
        query_info.query, remote_database, remote_table, remote_table_function_ptr);

    // Build a header Block that carries the structure but no data
    Block header = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock();

    // Different construction depending on whether a table function or a database/table pair is used
    ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr
        ? ClusterProxy::SelectStreamFactory(
            header, processed_stage, remote_table_function_ptr, context.getExternalTables())
        : ClusterProxy::SelectStreamFactory(
            header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());

    // Optionally skip unused shards: if a sharding_key is configured, the set of shards to query can be narrowed
    if (settings.optimize_skip_unused_shards)
    {
        if (has_sharding_key)
        {
            auto smaller_cluster = skipUnusedShards(cluster, query_info);

            if (smaller_cluster)
            {
                cluster = smaller_cluster;
                LOG_DEBUG(log, "Reading from " << database_name << "." << table_name << ": "
                    "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
                    " " << makeFormattedListOfShards(cluster));
            }
            else
            {
                LOG_DEBUG(log, "Reading from " << database_name << "." << table_name << ": "
                    "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster");
            }
        }
    }

    // Execute the query using the rewritten AST
    return ClusterProxy::executeQuery(select_stream_factory, cluster, modified_query_ast, context, settings);
}

The read method mainly rewrites the SQL and initializes the SelectStreamFactory along one of two paths, depending on whether a table function or a database/table pair is used. The executeQuery method is the entry point for the actual query:

BlockInputStreams executeQuery(
    IStreamFactory & stream_factory, const ClusterPtr & cluster,
    const ASTPtr & query_ast, const Context & context, const Settings & settings)
{
    BlockInputStreams res;

    // Serialize the rewritten AST back to a string so it can be sent to the other servers
    const std::string query = queryToString(query_ast);

    // Remove some per-user restrictions from the context: the user that triggered this query is,
    // from the point of view of the other servers, a new user, so accumulated limits should not carry over
    Context new_context = removeUserRestrictionsFromSettings(context, settings);

    // User-level network throttling
    ThrottlerPtr user_level_throttler;
    if (auto process_list_element = context.getProcessListElement())
        user_level_throttler = process_list_element->getUserNetworkThrottler();

    // If a bandwidth or byte limit is configured, wrap it in a new throttler chained to the
    // user-level one; otherwise use the user-level throttler directly
    ThrottlerPtr throttler;
    if (settings.max_network_bandwidth || settings.max_network_bytes)
    {
        throttler = std::make_shared<Throttler>(
            settings.max_network_bandwidth,
            settings.max_network_bytes,
            "Limit for bytes to send or receive over network exceeded.",
            user_level_throttler);
    }
    else
        throttler = user_level_throttler;

    // Create a stream for every shard of the cluster and execute the query
    for (const auto & shard_info : cluster->getShardsInfo())
        stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, res);

    return res;
}

The executeQuery method mainly adjusts a few settings and wires up the network throttler; a small sketch of that throttler selection is given below. The next step is creating the per-shard streams: createForShard is a virtual function, and its concrete implementation follows the sketch.
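As an aside, the throttler selection above is easy to miss in the flattened code, so here is a hedged sketch of just that piece. SimpleThrottler and chooseThrottler are invented names: ClickHouse's real Throttler also enforces a bandwidth limit over time and is built differently; the sketch keeps only the byte counting and the "a query-level limit wraps the user-level throttler, otherwise reuse the user-level one" decision.

#include <atomic>
#include <memory>
#include <stdexcept>

// Hypothetical, simplified throttler: only counts bytes and throws past a hard limit.
class SimpleThrottler
{
public:
    explicit SimpleThrottler(size_t max_bytes_, std::shared_ptr<SimpleThrottler> parent_ = nullptr)
        : max_bytes(max_bytes_), parent(std::move(parent_)) {}

    void add(size_t bytes)
    {
        if (max_bytes && (count += bytes) > max_bytes)
            throw std::runtime_error("Limit for bytes to send or receive over network exceeded.");
        if (parent)
            parent->add(bytes);   // limits accumulate up the chain, like the user-level throttler
    }

private:
    const size_t max_bytes;                    // 0 means "no limit at this level"
    std::atomic<size_t> count{0};
    std::shared_ptr<SimpleThrottler> parent;
};

// Mirrors the selection in executeQuery: a query-level limit wraps the user-level throttler;
// with no query-level limit, the user-level throttler is used directly.
std::shared_ptr<SimpleThrottler> chooseThrottler(size_t max_network_bytes,
                                                 std::shared_ptr<SimpleThrottler> user_level)
{
    if (max_network_bytes)
        return std::make_shared<SimpleThrottler>(max_network_bytes, user_level);
    return user_level;
}

In the real code the resulting throttler is handed to createForShard for every shard, so all per-shard streams share the same limits.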

void SelectStreamFactory::createForShard(
    const Cluster::ShardInfo & shard_info,
    const String & query, const ASTPtr & query_ast,
    const Context & context, const ThrottlerPtr & throttler,
    BlockInputStreams & res)
{
    // Helper that builds a stream reading from the local replica
    auto emplace_local_stream = [&]()
    {
        res.emplace_back(createLocalStream(query_ast, context, processed_stage));
    };

    // Helper that builds a stream reading from remote replicas
    auto emplace_remote_stream = [&]()
    {
        auto stream = std::make_shared<RemoteBlockInputStream>(
            shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage);
        stream->setPoolMode(PoolMode::GET_MANY);
        if (!table_func_ptr)
            stream->setMainTable(main_table);
        res.emplace_back(std::move(stream));
    };

    // Get the settings
    const auto & settings = context.getSettingsRef();

    // prefer_localhost_replica defaults to true; if this shard has a local replica, take the branch below
    if (settings.prefer_localhost_replica && shard_info.isLocal())
    {
        StoragePtr main_table_storage;

        // Obtain main_table_storage (an IStorage), either through a table function or through database.table
        if (table_func_ptr)
        {
            const auto * table_function = table_func_ptr->as<ASTFunction>();
            TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
            main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
        }
        else
            main_table_storage = context.tryGetTable(main_table.database, main_table.table);

        // If the table does not exist locally, try to read it from the other servers instead
        if (!main_table_storage)
        {
            ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
            if (shard_info.hasRemoteConnections())
            {
                LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),
                    "There is no table " << main_table.database << "." << main_table.table
                    << " on local replica of shard " << shard_info.shard_num << ", will try remote replicas.");
                emplace_remote_stream();
            }
            else
                emplace_local_stream();

            return;
        }

        const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());

        // If the table is not a ReplicatedMergeTree, read from the local server. If it is, replica lag
        // has to be taken into account: when the local delay is too large, another replica is tried.
        if (!replicated_storage)
        {
            emplace_local_stream();
            return;
        }

        UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;

        // If no maximum delay is configured, still read from the local replica
        if (!max_allowed_delay)
        {
            emplace_local_stream();
            return;
        }

        UInt32 local_delay = replicated_storage->getAbsoluteDelay();

        // If a maximum delay is configured and the local delay is below it, the local replica is still
        // considered fresh, so read from it
        if (local_delay < max_allowed_delay)
        {
            emplace_local_stream();
            return;
        }

        // Reaching this point means the local replica no longer satisfies the delay requirement
        ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
        LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),
            "Local replica of shard " << shard_info.shard_num << " is stale (delay: " << local_delay << "s.)");

        // If falling back to stale replicas is not allowed, the local replica cannot be used,
        // so try the remote replicas
        if (!settings.fallback_to_stale_replicas_for_distributed_queries)
        {
            if (shard_info.hasRemoteConnections())
            {
                emplace_remote_stream();
                return;
            }
            else
                throw Exception(
                    "Local replica of shard " + toString(shard_info.shard_num)
                    + " is stale (delay: " + toString(local_delay) + "s.), but no other replica configured",
                    ErrorCodes::ALL_REPLICAS_ARE_STALE);
        }

        // If there are no remote replicas and fallback is allowed, use the (stale) local replica
        if (!shard_info.hasRemoteConnections())
        {
            emplace_local_stream();
            return;
        }

        // Build the lazily_create_stream closure so that connections are not established in the main thread
        auto lazily_create_stream = [
            pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,
            main_table = main_table, table_func_ptr = table_func_ptr, external_tables = external_tables, stage = processed_stage,
            local_delay]()
            -> BlockInputStreamPtr
        {
            auto current_settings = context.getSettingsRef();
            auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
                current_settings).getSaturated(current_settings.max_execution_time);
            std::vector<ConnectionPoolWithFailover::TryResult> try_results;
            try
            {
                // Fetch connection entries from the remote servers. Both getManyForTableFunction and
                // getManyChecked eventually call getManyImpl; they only differ in the TryGetEntryFunc they pass.
                if (table_func_ptr)
                    try_results = pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
                else
                    try_results = pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, main_table);
            }
            catch (const Exception & ex)
            {
                if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
                    LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),
                        "Connections to remote replicas of local shard " << shard_num << " failed, will use stale local replica");
                else
                    throw;
            }

            double max_remote_delay = 0.0;
            for (const auto & try_result : try_results)
            {
                if (!try_result.is_up_to_date)
                    max_remote_delay = std::max(try_result.staleness, max_remote_delay);
            }

            // Based on the obtained results, either read from the (stale) local replica
            // or build a remote stream over the fetched connections
            if (try_results.empty() || local_delay < max_remote_delay)
                return createLocalStream(query_ast, context, stage);
            else
            {
                std::vector<IConnectionPool::Entry> connections;
                connections.reserve(try_results.size());
                for (auto & try_result : try_results)
                    connections.emplace_back(std::move(try_result.entry));

                return std::make_shared<RemoteBlockInputStream>(
                    std::move(connections), query, header, context, nullptr, throttler, external_tables, stage);
            }
        };

        res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream));
    }
    else
        emplace_remote_stream();
}

The createForShard method is essentially about choosing between the local replica and the remote replicas; a condensed sketch of that decision is given below. After that, let's continue with the getManyImpl method.
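Because the branching above is fairly long, here is a condensed restatement of the decision as a standalone function. The function and enum names are invented for this sketch; the real code additionally handles the missing-local-table case and table functions, and defers the actual remote connection attempts to the lazy stream shown above.

#include <cstdint>

enum class ReplicaChoice { Local, Remote, LazyLocalOrRemote, ErrorAllStale };

// Condensed version of the choice made in SelectStreamFactory::createForShard.
ReplicaChoice chooseReplica(
    bool prefer_localhost_replica,   // settings.prefer_localhost_replica
    bool shard_is_local,             // shard_info.isLocal()
    bool is_replicated_merge_tree,   // local table is a ReplicatedMergeTree
    uint64_t max_allowed_delay,      // settings.max_replica_delay_for_distributed_queries
    uint64_t local_delay,            // replicated_storage->getAbsoluteDelay()
    bool fallback_to_stale,          // settings.fallback_to_stale_replicas_for_distributed_queries
    bool has_remote_replicas)        // shard_info.hasRemoteConnections()
{
    if (!prefer_localhost_replica || !shard_is_local)
        return ReplicaChoice::Remote;

    // Non-replicated tables have no notion of replica lag: always read locally.
    if (!is_replicated_merge_tree)
        return ReplicaChoice::Local;

    // No delay limit configured, or the local replica is fresh enough: read locally.
    if (!max_allowed_delay || local_delay < max_allowed_delay)
        return ReplicaChoice::Local;

    // The local replica is stale. Without fallback it is either a remote replica or an error.
    if (!fallback_to_stale)
        return has_remote_replicas ? ReplicaChoice::Remote : ReplicaChoice::ErrorAllStale;

    // Fallback allowed: with no remote replicas the stale local one is used anyway; otherwise the
    // lazy stream later compares local_delay against the staleness of the reachable remote replicas.
    return has_remote_replicas ? ReplicaChoice::LazyLocalOrRemote : ReplicaChoice::Local;
}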

std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl(
    const Settings * settings,
    PoolMode pool_mode,
    const TryGetEntryFunc & try_get_entry)
{
    // Decide how many entries to fetch
    size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1;
    size_t max_tries = (settings ?
        size_t{settings->connections_with_failover_max_tries} :
        size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});
    size_t max_entries;
    if (pool_mode == PoolMode::GET_ALL)
    {
        min_entries = nested_pools.size();
        max_entries = nested_pools.size();
    }
    else if (pool_mode == PoolMode::GET_ONE)
        max_entries = 1;
    else if (pool_mode == PoolMode::GET_MANY)
        max_entries = settings ? size_t(settings->max_parallel_replicas) : 1;
    else
        throw DB::Exception("Unknown pool allocation mode", DB::ErrorCodes::LOGICAL_ERROR);

    // Pick the load-balancing strategy: NEAREST_HOSTNAME, IN_ORDER, RANDOM or FIRST_OR_RANDOM
    GetPriorityFunc get_priority;
    switch (settings ? LoadBalancing(settings->load_balancing) : default_load_balancing)
    {
    case LoadBalancing::NEAREST_HOSTNAME:
        get_priority = [&](size_t i) { return hostname_differences[i]; };
        break;
    case LoadBalancing::IN_ORDER:
        get_priority = [](size_t i) { return i; };
        break;
    case LoadBalancing::RANDOM:
        break;
    case LoadBalancing::FIRST_OR_RANDOM:
        get_priority = [](size_t i) -> size_t { return i >= 1; };
        break;
    }

    bool fallback_to_stale_replicas = settings ? bool(settings->fallback_to_stale_replicas_for_distributed_queries) : true;

    return Base::getMany(min_entries, max_entries, max_tries, try_get_entry, get_priority, fallback_to_stale_replicas);
}

The getManyImpl method mainly decides how many entries to fetch and which load-balancing strategy to use for picking remote replicas; a small sketch of the per-replica priority functions is given below, followed by the getMany method.
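The four load_balancing strategies listed above all reduce to a priority number per replica (lower is preferred), with randomized shuffling used to break ties. Below is a hedged sketch of those priority functions, including a rough equivalent of the hostname-difference metric behind NEAREST_HOSTNAME (in ClickHouse it is approximately the number of character positions at which a replica's hostname differs from the local hostname; the exact implementation lives in the connection pool setup and may differ in detail).

#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Rough equivalent of the metric behind LoadBalancing::NEAREST_HOSTNAME:
// count positions where the two hostnames differ, plus the length difference.
size_t hostnameDifference(const std::string & local, const std::string & remote)
{
    size_t common = std::min(local.size(), remote.size());
    size_t diff = std::max(local.size(), remote.size()) - common;
    for (size_t i = 0; i < common; ++i)
        diff += (local[i] != remote[i]);
    return diff;
}

enum class LoadBalancing { NEAREST_HOSTNAME, IN_ORDER, RANDOM, FIRST_OR_RANDOM };

// Priority of replica index i (lower value = preferred), mirroring the switch in getManyImpl.
size_t replicaPriority(LoadBalancing mode, size_t i, const std::vector<size_t> & hostname_differences)
{
    switch (mode)
    {
        case LoadBalancing::NEAREST_HOSTNAME:
            return hostname_differences[i];     // closest hostname wins
        case LoadBalancing::IN_ORDER:
            return i;                           // follow the order in the cluster config
        case LoadBalancing::FIRST_OR_RANDOM:
            return i >= 1;                      // 0 for the first replica, 1 for all others
        case LoadBalancing::RANDOM:
        default:
            return 0;                           // equal priority; the pool shuffles to break ties
    }
}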

PoolWithFailoverBase<TNestedPool>::getMany(
    size_t min_entries, size_t max_entries, size_t max_tries,
    const TryGetEntryFunc & try_get_entry,
    const GetPriorityFunc & get_priority,
    bool fallback_to_stale_replicas)
{
    ......

    std::string fail_messages;
    bool finished = false;
    while (!finished)
    {
        for (size_t i = 0; i < shuffled_pools.size(); ++i)
        {
            if (up_to_date_count >= max_entries || entries_count + failed_pools_count >= nested_pools.size())
            {
                finished = true;
                break;
            }

            ShuffledPool & shuffled_pool = shuffled_pools[i];
            TryResult & result = try_results[i];
            if (shuffled_pool.error_count >= max_tries || !result.entry.isNull())
                continue;

            std::string fail_message;
            // This calls the TryGetEntryFunc mentioned above to actually obtain the entry
            result = try_get_entry(*shuffled_pool.pool, fail_message);

            if (!fail_message.empty())
                fail_messages += fail_message + '\n';

            if (!result.entry.isNull())
            {
                ++entries_count;
                if (result.is_usable)
                {
                    ++usable_count;
                    if (result.is_up_to_date)
                        ++up_to_date_count;
                }
            }
            else
            {
                LOG_WARNING(log, "Connection failed at try №"
                    << (shuffled_pool.error_count + 1) << ", reason: " << fail_message);
                ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);

                shuffled_pool.error_count = std::min(max_error_cap, shuffled_pool.error_count + 1);

                if (shuffled_pool.error_count >= max_tries)
                {
                    ++failed_pools_count;
                    ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll);
                }
            }
        }
    }

    if (usable_count < min_entries)
        throw DB::NetException(
            "All connection tries failed. Log: \n\n" + fail_messages + "\n",
            DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);

    try_results.erase(
        std::remove_if(
            try_results.begin(), try_results.end(),
            [](const TryResult & r) { return r.entry.isNull() || !r.is_usable; }),
        try_results.end());

    // Sort the results: up-to-date replicas first, then by staleness
    std::stable_sort(
        try_results.begin(), try_results.end(),
        [](const TryResult & left, const TryResult & right)
        {
            return std::forward_as_tuple(!left.is_up_to_date, left.staleness)
                < std::forward_as_tuple(!right.is_up_to_date, right.staleness);
        });

    ......

    return try_results;
}

The getMany method is where the connection entries are actually obtained and sorted. With that, the overall flow of a SELECT against a Distributed table is complete.
