The previous article mentioned that the server-side *Handler processes the SQL it receives from the client. This post walks through that processing and gives a rough picture of the overall flow, using TCPHandler as the example.

The file dbms/programs/server/TCPHandler.cpp contains the run() method, which calls runImpl(). The annotated runImpl() is shown below:

    void TCPHandler::runImpl()
    {
        setThreadName("TCPHandler");
        ......
        // data read from the client
        in = std::make_shared<ReadBufferFromPocoSocket>(socket());
        // data to be written back to the client
        out = std::make_shared<WriteBufferFromPocoSocket>(socket());
        ......
        try {
            receiveHello();  // receive the Hello packet the client sends when opening the TCP connection; the "first handshake"
        }
        catch (const Exception & e) /// Typical for an incorrect username, password, or address.
        {
            ......
        }

        /// When connecting, the default database can be specified.
        // set the default database
        if (!default_database.empty()) {
            ....
            connection_context.setCurrentDatabase(default_database);
        }

        sendHello();  // reply to the client's connection, completing the "second handshake" (?? if this is a three-way handshake, where is the third one ??)

        connection_context.setProgressCallback([this](const Progress & value) { return this->updateProgress(value); });
        // handshake done; context initialization done

        while (1) {
            /// Set context of request.
            query_context = connection_context;

            // wait for the next packet (the "third handshake"). Every 10 s, check whether the socket has new input && whether the server is shutting down
            // while the condition is true, we stay inside the inner loop and execution does not move on;
            // once it becomes false, we leave the loop and continue;
            /// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
            while (!static_cast<ReadBufferFromPocoSocket &>(*in).poll(global_settings.poll_interval * 1000000)
                && !server.isCancelled());

            /// If we need to shut down, or client disconnects.
            if (server.isCancelled() || in->eof())
                break;

            Stopwatch watch;
            state.reset();

            /// Initialized later.
            std::optional<CurrentThread::QueryScope> query_scope;

            /** An exception during the execution of request (it must be sent over the network to the client).
             *  The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
             */
            // exception handling
            std::unique_ptr<Exception> exception;
            bool network_error = false;

            bool send_exception_with_stack_trace = connection_context.getSettingsRef().calculate_text_stack_trace;

            try {
                /// If a user passed query-local timeouts, reset socket to initial state at the end of the query
                SCOPE_EXIT({ state.timeout_setter.reset(); });

                /** If Query - process it. If Ping or Cancel - go back to the beginning.
                 *  There may come settings for a separate query that modify `query_context`.
                 *  The SQL sent by the client may carry settings; the query context has to be adjusted accordingly.
                 */
                // receivePacket() dispatches on the packet_type of the incoming packet
                // and stores what it received in QueryState
                if (!receivePacket())
                    continue;

                query_scope.emplace(*query_context);

                send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;

                /// Should we send internal logs to client?
                // by changing send_logs_level, server logs can be echoed on the client side
                if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
                    && query_context->getSettingsRef().send_logs_level.value != LogsLevel::none) {
                    ...
                }

                /// We have to copy external tables inside executeQuery() to track limits. Therefore, set callback for it. Must set once.
                // if external tables are used, executeQuery() invokes this callback to read data from them
                query_context->setExternalTablesInitializer([&global_settings, this](Context & context) {
                    ......
                });

                customizeContext(*query_context);  // customize the context (its exact purpose is not entirely clear to me)

                bool may_have_embedded_data = client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA;

                /// Processing Query
                // the key call: worth a closer look
                // processes the SQL and returns streams of blocks
                /// It covers: 1) parsing the SQL into an AST; 2) getting an interpreter from the AST; 3) building the execution pipeline
                state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);

                // "out" here is from the point of view of memory: for an INSERT, data is serialized from memory to disk, which is "out" for memory
                if (state.io.out)
                    // a non-null BlockOutputStreamPtr means data has to be received from the client
                    state.need_receive_data_for_insert = true;  // data from the client is needed to execute the INSERT (note: this does not include INSERT SELECT)

                after_check_cancelled.restart();
                after_send_progress.restart();

                /// Does the request require receive data from client?
                if (state.need_receive_data_for_insert)
                    processInsertQuery(global_settings);
                else
                    processOrdinaryQuery();

                /// Do it before sending end of stream, to have a chance to show log message in client.
                query_scope->logPeakMemoryUsage();  // record peak memory usage and similar statistics

                sendLogs();
                sendEndOfStream();  // all data has been sent

                query_scope.reset();
                state.reset();
            }
            catch (const Exception & e) {
                ...
            }
            ...
            catch (...) {
                ...
            }
            ...
            watch.stop();

            LOG_INFO(log, std::fixed << std::setprecision(3)
                << "Processed in " << watch.elapsedSeconds() << " sec.");
            ...
        }
    }
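A note on the question left in the comments above: receiveHello()/sendHello() are ClickHouse's own application-level Hello exchange, carried out over a TCP connection that the operating system has already accepted, so the TCP three-way handshake is finished before TCPHandler ever sees the socket; there is no third "handshake" packet to look for at this level. With that out of the way, the lifecycle is easier to see in miniature. The toy program below is a self-contained sketch of the same shape (Hello exchange once, then a loop of receive packet / process / send end-of-stream); every type and helper in it is a hypothetical stand-in invented for illustration, not ClickHouse code:

    #include <iostream>
    #include <optional>
    #include <queue>
    #include <string>

    // Hypothetical stand-in for the client connection: packets arrive as plain strings.
    struct FakeConnection {
        std::queue<std::string> incoming;

        std::optional<std::string> receivePacket() {        // roughly plays the role of receivePacket()
            if (incoming.empty())
                return std::nullopt;                         // nothing left: treat as client disconnect
            std::string packet = incoming.front();
            incoming.pop();
            return packet;
        }

        void send(const std::string & what) { std::cout << "-> client: " << what << "\n"; }
    };

    int main() {
        FakeConnection conn;
        conn.incoming.push("Hello");                         // connection-level greeting, sent once
        conn.incoming.push("Query: SELECT 1");
        conn.incoming.push("Query: INSERT INTO t VALUES (1)");

        // receiveHello() / sendHello(): happens once per connection.
        if (auto hello = conn.receivePacket())
            conn.send("Hello from server (in reply to '" + *hello + "')");

        // The per-query loop: wait for a packet, run the query, stream the result back.
        while (auto packet = conn.receivePacket()) {
            conn.send("processing " + *packet);              // stands in for executeQuery + processInsert/OrdinaryQuery
            conn.send("EndOfStream");                        // stands in for sendEndOfStream()
        }
    }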

Setting aside the handshake, context initialization, exception handling and statistics, the most important logic is this part:

    /// Processing Query
    // the key call: worth a closer look
    // processes the SQL and returns streams of blocks
    /// It covers: 1) parsing the SQL into an AST; 2) getting an interpreter from the AST; 3) building the execution pipeline
    state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);

    // "out" here is from the point of view of memory: for an INSERT, data is serialized from memory to disk, which is "out" for memory
    if (state.io.out)
        // a non-null BlockOutputStreamPtr means data has to be received from the client
        state.need_receive_data_for_insert = true;  // data from the client is needed to execute the INSERT (note: this does not include INSERT SELECT)

    after_check_cancelled.restart();
    after_send_progress.restart();

    /// Does the request require receive data from client?
    if (state.need_receive_data_for_insert)
        processInsertQuery(global_settings);
    else
        processOrdinaryQuery();

Now a closer look at the executeQuery() method:

    // this overload is the one used for TCP connections
    BlockIO executeQuery(
        const String & query,
        Context & context,
        bool internal,
        QueryProcessingStage::Enum stage,
        bool may_have_embedded_data)
    {
        BlockIO streams;
        std::tie(std::ignore, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context, internal,
            stage, !may_have_embedded_data);
        return streams;
    }
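One small detail worth calling out: executeQueryImpl() returns an (ASTPtr, BlockIO) tuple, and this wrapper keeps only the streams, discarding the AST via std::ignore. A minimal standalone illustration of that unpacking pattern in plain standard C++ (the names here are made up for the example, nothing ClickHouse-specific):

    #include <iostream>
    #include <string>
    #include <tuple>

    // Stand-ins for executeQueryImpl()'s (ASTPtr, BlockIO) result, just to show the unpacking.
    std::tuple<std::string, int> makeAstAndStreams() {
        return {"fake AST", 42};
    }

    int main() {
        int streams = 0;
        // Keep only the second tuple element; std::ignore swallows the first.
        std::tie(std::ignore, streams) = makeAstAndStreams();
        std::cout << streams << "\n";   // prints 42
    }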

A simplified executeQueryImpl():

    static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
        const char * begin,
        const char * end,
        Context & context,
        bool internal,
        QueryProcessingStage::Enum stage,
        bool has_query_tail)
    {
        ...
        ParserQuery parser(end, settings.enable_debug_queries);
        ASTPtr ast;
        const char * query_end;

        /// Don't limit the size of internal queries (no byte-size limit on the SQL for internal queries).
        size_t max_query_size = 0;
        if (!internal)
            max_query_size = settings.max_query_size;

        try {
            /// TODO Parser should fail early when max_query_size limit is reached.
            // parse the SQL into an AST
            ast = parseQuery(parser, begin, end, "", max_query_size);
            ast->dumpTree();  // print the AST

            auto * insert_query = ast->as<ASTInsertQuery>();

            // if this is an INSERT query with extra settings, apply those settings
            if (insert_query && insert_query->settings_ast)
                InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();

            // if this is an INSERT query that carries inline data
            if (insert_query && insert_query->data) {
                query_end = insert_query->data;
                insert_query->has_tail = has_query_tail;
            } else
                query_end = end;
        }
        catch (...) {
            ...
        }

        /// Copy query into string. It will be written to log and presented in processlist.
        /// If an INSERT query, string will not include data to insertion.
        // The SQL text is copied into the string `query`, which is written to the log and shown in the processlist.
        // For an INSERT, `query` does not contain the data to be inserted.
        String query(begin, query_end);  // the SQL now lives in this string (before this it presumably stayed in the buffer)

        BlockIO res;

        try {
            // write the SQL to the server log, e.g. a line starting with: executeQuery: (from [::1]:62731)
            logQuery(query.substr(0, settings.log_queries_cut_to_length), context, internal);
            ...

            // from the AST, obtain an interpreter via InterpreterFactory::get()
            auto interpreter = InterpreterFactory::get(ast, context, stage);

            /** Prepare a request for execution. Return block streams
              * - the stream into which you can write data to execute the query, if INSERT;
              * - the stream from which you can read the result of the query, if SELECT and similar;
              * Or nothing if the request INSERT SELECT (self-sufficient query - does not accept the input data, does not return the result).
              */
            // the interpreter's execute() runs the query and returns BlockIO (block streams)
            res = interpreter->execute();

            if (auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
            {
                context.setInsertionTable(insert_interpreter->getDatabaseTable());
            }

            if (process_list_entry) {
                /// Query was killed before execution. The query was cancelled before it got to run.
                if ((*process_list_entry)->isKilled())
                    throw Exception("Query '" + (*process_list_entry)->getInfo().client_info.current_query_id +
                                    "' is killed in pending state",
                                    ErrorCodes::QUERY_WAS_CANCELLED);
                else
                    (*process_list_entry)->setQueryStreams(res);
            }

            /// Hold element of process list till end of query execution.
            res.process_list_entry = process_list_entry;

            // "in" here is from the point of view of memory:
            // for a non-INSERT query, data is deserialized from disk into memory, which is "in" for memory (data flow: disk -> memory)
            // for an INSERT, data arrives from the client into memory, which is also "in" for memory (data flow: client -> memory)
            if (res.in) {
                res.in->setProgressCallback(context.getProgressCallback());
                res.in->setProcessListElement(context.getProcessListElement());

                /// Limits on the result, the quota on the result, and also callback for progress.
                /// Limits apply only to the final result.
                if (stage == QueryProcessingStage::Complete) {
                    IBlockInputStream::LocalLimits limits;
                    limits.mode = IBlockInputStream::LIMITS_CURRENT;
                    limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes,
                                                    settings.result_overflow_mode);
                    res.in->setLimits(limits);
                    res.in->setQuota(quota);
                }
            }

            // "out" here is from the point of view of memory:
            // for a non-INSERT query, data is serialized from memory back to the client, which is "out" for memory (data flow: memory -> client)
            // for an INSERT, data is serialized from memory to disk, which is "out" for memory (data flow: memory -> disk)
            if (res.out) {
                if (auto stream = dynamic_cast<CountingBlockOutputStream *>(res.out.get())) {
                    stream->setProcessListElement(context.getProcessListElement());
                }
            }

            /// Everything related to query log.
            {
                ......
                // log the Query pipeline (the execution plan)
                if (!internal && res.in) {
                    std::stringstream log_str;
                    log_str << "Query pipeline:\n";
                    res.in->dumpTree(log_str);
                    LOG_DEBUG(&Logger::get("executeQuery"), log_str.str());
                }
            }
        }
        catch (...) {
            ......
        }

        return std::make_tuple(ast, res);
    }

The key part here is:

    ......
    ParserQuery parser(end, settings.enable_debug_queries);
    ASTPtr ast;
    ......
    // parse the SQL into an AST
    ast = parseQuery(parser, begin, end, "", max_query_size);
    ......
    // from the AST, obtain an interpreter via InterpreterFactory::get()
    auto interpreter = InterpreterFactory::get(ast, context, stage);

    /** Prepare a request for execution. Return block streams
      * - the stream into which you can write data to execute the query, if INSERT;
      * - the stream from which you can read the result of the query, if SELECT and similar;
      * Or nothing if the request INSERT SELECT (self-sufficient query - does not accept the input data, does not return the result).
      */
    // the interpreter's execute() runs the query and returns BlockIO (block streams)
    res = interpreter->execute();
    ......

The interpreter's execute() method runs the query and returns BlockIO (block streams); this is the step where block streams are built according to the type of SQL. That part will be analyzed in detail in a later post.
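Which of the two streams gets filled is exactly what distinguishes the interpreters: a SELECT-like interpreter hands back a readable in stream, an INSERT interpreter hands back a writable out stream, and an INSERT SELECT hands back neither (the quoted comment calls it self-sufficient). The toy program below models that contract end to end; all classes and the string-prefix "factory" are hypothetical stand-ins written for illustration only, not ClickHouse types, but the final if (io.out) / else branch mirrors what TCPHandler does with state.io:

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <optional>
    #include <string>

    // All names below are hypothetical stand-ins, not ClickHouse classes.
    struct Block { std::string rows; };                       // stands in for a block of column data

    struct BlockIO {                                          // mimics the in/out idea of DB::BlockIO
        std::function<std::optional<Block>()> in;             // pull result blocks (SELECT and similar)
        std::function<void(const Block &)> out;               // push data blocks (INSERT)
    };

    struct IInterpreter {
        virtual BlockIO execute() = 0;
        virtual ~IInterpreter() = default;
    };

    struct SelectInterpreter : IInterpreter {                 // fills BlockIO::in
        BlockIO execute() override {
            BlockIO io;
            bool produced = false;
            io.in = [produced]() mutable -> std::optional<Block> {
                if (produced)
                    return std::nullopt;                      // one block, then end of stream
                produced = true;
                return Block{"result rows"};
            };
            return io;
        }
    };

    struct InsertInterpreter : IInterpreter {                 // fills BlockIO::out
        BlockIO execute() override {
            BlockIO io;
            io.out = [](const Block & b) { std::cout << "writing to storage: " << b.rows << "\n"; };
            return io;
        }
    };

    // Toy "factory": in ClickHouse the choice is made from the AST node type, not a string prefix.
    std::unique_ptr<IInterpreter> makeInterpreter(const std::string & query) {
        if (query.rfind("INSERT", 0) == 0)
            return std::make_unique<InsertInterpreter>();
        return std::make_unique<SelectInterpreter>();
    }

    int main() {
        const std::string queries[] = {"SELECT 1", "INSERT INTO t VALUES (1)"};
        for (const std::string & query : queries) {
            BlockIO io = makeInterpreter(query)->execute();
            if (io.out)                                       // need_receive_data_for_insert in TCPHandler
                io.out(Block{"data received from the client"});
            else
                while (auto block = io.in())                  // processOrdinaryQuery-style draining
                    std::cout << "sending to client: " << block->rows << "\n";
        }
    }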
