OpenTSDB源码解析

  • OpenTSDB查询路由/api/query的解析
    • 1. TSDMain.java
      • main()
    • 2. RpcManager.java
      • instance()
      • initializeBuiltinRpcs()
    • 3. QueryRpc.java
      • execute()
      • handleQuery()
      • GET方式-parseQuery()
      • GET方式-parseMTypeSubQuery()
    • 4. HttpJsonSerializer.java
      • POST方式- parseQueryV1()
    • 5. TSQuery.java
      • validateAndSetQuery()
    • 6. TSSubQuery.java
      • validateAndSetQuery()

OpenTSDB查询路由/api/query的解析

1. TSDMain.java

main()

整个project的入口在net/opentsdb/tools/TSDMain.java文件的main()函数中。

/*** Main class of the TSD, the Time Series Daemon.*/
final class TSDMain {......private static TSDB tsdb = null;public static void main(String[] args) throws IOException {//可以在打印日志时,获取类的具体信息Logger log = LoggerFactory.getLogger(TSDMain.class);log.info("Starting.");log.info(BuildData.revisionString());log.info(BuildData.buildString());......final ServerSocketChannelFactory factory;int connections_limit = 0;try {connections_limit = config.getInt("tsd.core.connections.limit");} catch (NumberFormatException nfe) {usage(argp, "Invalid connections limit", 1);}if (config.getBoolean("tsd.network.async_io")) {int workers = Runtime.getRuntime().availableProcessors() * 2;if (config.hasProperty("tsd.network.worker_threads")) {try {workers = config.getInt("tsd.network.worker_threads");} catch (NumberFormatException nfe) {usage(argp, "Invalid worker thread count", 1);}}final Executor executor = Executors.newCachedThreadPool();final NioServerBossPool boss_pool = new NioServerBossPool(executor, 1, new Threads.BossThreadNamer());final NioWorkerPool worker_pool = new NioWorkerPool(executor, workers, new Threads.WorkerThreadNamer());**factory = new NioServerSocketChannelFactory(boss_pool, worker_pool);**} else {factory = new OioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), new Threads.PrependThreadNamer());}......final ServerBootstrap server = new ServerBootstrap(factory);final RpcManager manager = RpcManager.instance(tsdb);server.setPipelineFactory(new PipelineFactory(tsdb, manager, connections_limit));......final InetSocketAddress addr = new InetSocketAddress(bindAddress,config.getInt("tsd.network.port"));server.bind(addr);......

2. RpcManager.java

上面代码中比较重要的是final RpcManager manager = RpcManager.instance(tsdb);
server.setPipelineFactory(new PipelineFactory(tsdb, manager, connections_limit));
我们先分析final RpcManager manager = RpcManager.instance(tsdb);

instance()

RpcManager.java中的instance方法如下:

public static synchronized RpcManager instance(final TSDB tsdb) {final RpcManager existing = INSTANCE.get();if (existing != null) {return existing;}final RpcManager manager = new RpcManager(tsdb);final String mode = Strings.nullToEmpty(tsdb.getConfig().getString("tsd.mode"));// Load any plugins that are enabled via Config.  Fail if any plugin cannot be loaded.final ImmutableList.Builder<RpcPlugin> rpcBuilder = ImmutableList.builder();if (tsdb.getConfig().hasProperty("tsd.rpc.plugins")) {final String[] plugins = tsdb.getConfig().getString("tsd.rpc.plugins").split(",");manager.initializeRpcPlugins(plugins, rpcBuilder);}manager.rpc_plugins = rpcBuilder.build();final ImmutableMap.Builder<String, TelnetRpc> telnetBuilder = ImmutableMap.builder();final ImmutableMap.Builder<String, HttpRpc> httpBuilder = ImmutableMap.builder();manager.initializeBuiltinRpcs(mode, telnetBuilder, httpBuilder);manager.telnet_commands = telnetBuilder.build();manager.http_commands = httpBuilder.build();final ImmutableMap.Builder<String, HttpRpcPlugin> httpPluginsBuilder = ImmutableMap.builder();if (tsdb.getConfig().hasProperty("tsd.http.rpc.plugins")) {final String[] plugins = tsdb.getConfig().getString("tsd.http.rpc.plugins").split(",");manager.initializeHttpRpcPlugins(mode, plugins, httpPluginsBuilder);}manager.http_plugin_commands = httpPluginsBuilder.build();INSTANCE.set(manager);return manager;}

再仔细看manager.initializeBuiltinRpcs(mode, telnetBuilder, httpBuilder); 是如何初始化telnetBuilder和httpBuilder的。

initializeBuiltinRpcs()

private void initializeBuiltinRpcs(final String mode,final ImmutableMap.Builder<String, TelnetRpc> telnet,final ImmutableMap.Builder<String, HttpRpc> http) {final Boolean enableApi = tsdb.getConfig().getString("tsd.core.enable_api").equals("true");if (mode.equals("rw") || mode.equals("wo")) {final PutDataPointRpc put = new PutDataPointRpc();telnet.put("put", put);if (enableApi) {http.put("api/put", put);}if(mode.equals("rw") || mode.equals("ro")) {......if (enableApi) {http.put("api/aggregators", aggregators);http.put("api/annotation", annotation_rpc);http.put("api/annotations", annotation_rpc);http.put("api/config", new ShowConfig());http.put("api/dropcaches", dropcaches);http.put("api/query", new QueryRpc());http.put("api/search", new SearchRpc());http.put("api/serializers", new Serializers());http.put("api/stats", stats);http.put("api/suggest", suggest_rpc);http.put("api/tree", new TreeRpc());http.put("api/uid", new UniqueIdRpc());http.put("api/version", version);}}}}

在实例化函数instance()中,我们对RpcManager类的成员变量telnet_commands和http_commands进行了初始化,他们的类型分别是ImmutableMap<String, TelnetRpc>和ImmutableMap<String, HttpRpc>,可以看出是Map类型,且Map键值不可变。而初始化的主要方法是 initializeBuiltinRpcs(final String mode, final ImmutableMap.Builder<String, TelnetRpc> telnet, final ImmutableMap.Builder<String, HttpRpc> http), 由于java是引用传递,所以对Map类型的http 和telnet的改变在函数执行结束之后会保留,从上面的代码可以看出,如果http服务打开了的话,那么就对Map类型的http进行初始化。我们主要关注的是/api/query这一项。

3. QueryRpc.java

execute()

final class QueryRpc implements HttpRpc {private static final Logger LOG = LoggerFactory.getLogger(QueryRpc.class);....../*** Implements the /api/query endpoint to fetch data from OpenTSDB.* @param tsdb The TSDB to use for fetching data* @param query The HTTP query for parsing and responding*/@Overridepublic void execute(final TSDB tsdb, final HttpQuery query) throws IOException {// only accept GET/POST/DELETEif (query.method() != HttpMethod.GET && query.method() != HttpMethod.POST &&query.method() != HttpMethod.DELETE) {throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED, "Method not allowed", "The HTTP method [" + query.method().getName() +"] is not permitted for this endpoint");}if (query.method() == HttpMethod.DELETE && !tsdb.getConfig().getBoolean("tsd.http.query.allow_delete")) {throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,"Bad request","Deleting data is not enabled (tsd.http.query.allow_delete=false)");}final String[] uri = query.explodeAPIPath();final String endpoint = uri.length > 1 ? uri[1] : ""; /api/query/last这个端点if (endpoint.toLowerCase().equals("last")) {handleLastDataPointQuery(tsdb, query);/api/query/gexp这个端点} else if (endpoint.toLowerCase().equals("gexp")){handleQuery(tsdb, query, true);/api/query/exp这个端点} else if (endpoint.toLowerCase().equals("exp")) {handleExpressionQuery(tsdb, query);return;} else {/api/query 这个端点handleQuery(tsdb, query, false);}}

我们主要关注/api/query这个端点的处理方法。

handleQuery()

下面就是handleQuery()方法。

* Processing for a data point query* @param tsdb The TSDB to which we belong* @param query The HTTP query to parse/respond* @param allow_expressions Whether or not expressions should be parsed* (based on the endpoint)*/private void handleQuery(final TSDB tsdb, final HttpQuery query, final boolean allow_expressions) {......// POST方式if (query.method() == HttpMethod.POST) {switch (query.apiVersion()) {case 0:case 1:data_query = query.serializer().parseQueryV1();break;default:query_invalid.incrementAndGet();throw new BadRequestException(HttpResponseStatus.NOT_IMPLEMENTED, "Requested API version not implemented", "Version " + query.apiVersion() + " is not implemented");}expressions = null;}// GET方式else {expressions = new ArrayList<ExpressionTree>();data_query = parseQuery(tsdb, query, expressions);}//DELETE方式if (query.getAPIMethod() == HttpMethod.DELETE &&tsdb.getConfig().getBoolean("tsd.http.query.allow_delete")) {data_query.setDelete(true);}// validate and then compile the queriestry {LOG.debug(data_query.toString());//验证查询data_query.validateAndSetQuery();}catch (Exception e) {throw new BadRequestException(HttpResponseStatus.BAD_REQUEST, e.getMessage(), data_query.toString(), e);}......
}

我们关注GET方式时的处理方式,parseQuery(tsdb, query, expressions)。

GET方式-parseQuery()

以下是parseQuery(tsdb, query, expressions);方法。

public static TSQuery parseQuery(final TSDB tsdb, final HttpQuery query,final List<ExpressionTree> expressions) {final TSQuery data_query = new TSQuery();data_query.setStart(query.getRequiredQueryStringParam("start"));data_query.setEnd(query.getQueryStringParam("end"));if (query.hasQueryStringParam("padding")) {data_query.setPadding(true);}if (query.hasQueryStringParam("no_annotations")) {data_query.setNoAnnotations(true);}if (query.hasQueryStringParam("global_annotations")) {data_query.setGlobalAnnotations(true);}if (query.hasQueryStringParam("show_tsuids")) {data_query.setShowTSUIDs(true);}if (query.hasQueryStringParam("ms")) {data_query.setMsResolution(true);}if (query.hasQueryStringParam("show_query")) {data_query.setShowQuery(true);}  if (query.hasQueryStringParam("show_stats")) {data_query.setShowStats(true);}    if (query.hasQueryStringParam("show_summary")) {data_query.setShowSummary(true);}// handle tsuid queries firstif (query.hasQueryStringParam("tsuid")) {final List<String> tsuids = query.getQueryStringParams("tsuid");     for (String q : tsuids) {parseTsuidTypeSubQuery(q, data_query);}}//解析m参数后面的字符串if (query.hasQueryStringParam("m")) {final List<String> legacy_queries = query.getQueryStringParams("m");      for (String q : legacy_queries) {parseMTypeSubQuery(q, data_query);}......
}

可以从标黄部分看出,如果查询里面有m参数的话,那么就得到m参数的值。利用parseMTypeSubQuery(q, data_query)方法循环对每个参数m的值进行解析。我们再具体看看如何解析的。

GET方式-parseMTypeSubQuery()

以下是parseMTypeSubQuery(q, data_query);方法

private static void parseMTypeSubQuery(final String query_string, TSQuery data_query) {if (query_string == null || query_string.isEmpty()) {throw new BadRequestException("The query string was empty");}// m is of the following forms:// agg:[interval-agg:][rate:]metric[{tag=value,...}]// where the parts in square brackets `[' .. `]' are optional.final String[] parts = Tags.splitString(query_string, ':');int i = parts.length;if (i < 2 || i > 5) {throw new BadRequestException("Invalid parameter m=" + query_string + " ("+ (i < 2 ? "not enough" : "too many") + " :-separated parts)");}final TSSubQuery sub_query = new TSSubQuery();// the aggregator is firstsub_query.setAggregator(parts[0]);i--; // Move to the last part (the metric name).List<TagVFilter> filters = new ArrayList<TagVFilter>();sub_query.setMetric(Tags.parseWithMetricAndFilters(parts[i], filters));sub_query.setFilters(filters);// parse out the rate and downsampler for (int x = 1; x < parts.length - 1; x++) {if (parts[x].toLowerCase().startsWith("rate")) {sub_query.setRate(true);if (parts[x].indexOf("{") >= 0) {sub_query.setRateOptions(QueryRpc.parseRateOptions(true, parts[x]));}} else if (Character.isDigit(parts[x].charAt(0))) {sub_query.setDownsample(parts[x]);} else if (parts[x].toLowerCase().startsWith("explicit_tags")) {sub_query.setExplicitTags(true);}}if (data_query.getQueries() == null) {final ArrayList<TSSubQuery> subs = new ArrayList<TSSubQuery>(1);data_query.setQueries(subs);}data_query.getQueries().add(sub_query);}

以上标就是主要的解析步骤了。先按冒号:进行切分。先得到头和尾。再解析剩余的可选部分。

4. HttpJsonSerializer.java

我们继续分析POST请求方式的处理逻辑。

POST方式- parseQueryV1()

/*** Parses a timeseries data query* @return A TSQuery with data ready to validate* @throws JSONException if parsing failed* @throws BadRequestException if the content was missing or parsing failed*/public TSQuery parseQueryV1() {final String json = query.getContent();if (json == null || json.isEmpty()) {throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,"Missing message content","Supply valid JSON formatted data in the body of your request");}try {return JSON.parseToObject(json, TSQuery.class);} catch (IllegalArgumentException iae) {throw new BadRequestException("Unable to parse the given JSON", iae);}}

主要就是将json文档转成了TSQuery的对象的格式。

5. TSQuery.java

以下是验证查询参数的代码。

validateAndSetQuery()

public void validateAndSetQuery() {if (start == null || start.isEmpty()) {throw new IllegalArgumentException("Missing start time");}start_time = DateTime.parseDateTimeString(start, timezone);if (end != null && !end.isEmpty()) {end_time = DateTime.parseDateTimeString(end, timezone);} else {end_time = System.currentTimeMillis();}if (end_time <= start_time) {throw new IllegalArgumentException("End time [" + end_time + "] must be greater than the start time ["+ start_time +"]");}if (queries == null || queries.isEmpty()) {throw new IllegalArgumentException("Missing queries");}// validate queriesint i = 0;for (TSSubQuery sub : queries) {sub.validateAndSetQuery();final DownsamplingSpecification ds = sub.downsamplingSpecification();if (ds != null && timezone != null && !timezone.isEmpty() && ds != DownsamplingSpecification.NO_DOWNSAMPLER) {final TimeZone tz = DateTime.timezones.get(timezone);if (tz == null) {throw new IllegalArgumentException("The timezone specification could not be found");}ds.setTimezone(tz);}if (ds != null && use_calendar && ds != DownsamplingSpecification.NO_DOWNSAMPLER) {ds.setUseCalendar(true);}sub.setIndex(i++);}}

6. TSSubQuery.java

以下是验证子查询参数的方法。

validateAndSetQuery()

public void validateAndSetQuery() {if (aggregator == null || aggregator.isEmpty()) {throw new IllegalArgumentException("Missing the aggregation function");}try {agg = Aggregators.get(aggregator);} catch (NoSuchElementException nse) {throw new IllegalArgumentException("No such aggregation function: " + aggregator);}// we must have at least one TSUID OR a metricif ((tsuids == null || tsuids.isEmpty()) && (metric == null || metric.isEmpty())) {throw new IllegalArgumentException("Missing the metric or tsuids, provide at least one");}// Make sure we have a filter listif (filters == null) {filters = new ArrayList<TagVFilter>();}// parse the downsampler if we have oneif (downsample != null && !downsample.isEmpty()) {// downsampler given, so parse itdownsample_specifier = new DownsamplingSpecification(downsample);} else {// no downsamplerdownsample_specifier = DownsamplingSpecification.NO_DOWNSAMPLER;}}

到此,/api/query端口的查询解析代码已经分析完毕。后面将研究OpenTSDB是如何执行查询操作的。

OpenTSDB查询代码解析相关推荐

  1. 视觉SLAM开源算法ORB-SLAM3 原理与代码解析

    来源:深蓝学院,文稿整理者:何常鑫,审核&修改:刘国庆 本文总结于上交感知与导航研究所科研助理--刘国庆关于[视觉SLAM开源算法ORB-SLAM3 原理与代码解析]的公开课. ORB-SLA ...

  2. java操作es聚合操作并显示其他字段_java使用elasticsearch分组进行聚合查询过程解析...

    这篇文章主要介绍了java使用elasticsearch分组进行聚合查询过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 java连接elas ...

  3. python手机版代码-Python手机号码归属地查询代码

    简单的一个例子,是以前用Dephi写的,前不久刚实现了一个在Python中使用Delphi控件来编写界面程序,于是趁热写一个类似的的查询方案. 本实例是通过www.ip138.com这个网站来查询的, ...

  4. java 树状数据算法_使用递归算法结合数据库解析成Java树形结构的代码解析

    这篇文章主要介绍了使用递归算法结合数据库解析成Java树形结构的代码解析的相关资料,需要的朋友可以参考下 1.准备表结构及对应的表数据 a.表结构:create table TB_TREE ( CID ...

  5. MATLAB程序详细解析,遗传算法——matlab代码解析

    遗传算法--matlab代码解析 本文为学习B站老哥数学建模课程之后的一点笔记,图片源自web,代码源自老哥程序包,侵权删. 详细的遗传算法原理不再赘述,百度即可找到. 算法定义 遗传算法(GA)是模 ...

  6. cmd显示服务器对区域没有权威,查询dns解析服务器地址cmd命令

    查询dns解析服务器地址cmd命令 内容精选 换一换 一次完整的HTTP请求包括域名解析.建立TCP连接.发起请求.服务器接收到请求进行处理并返回处理结果.浏览器对HTML代码进行解析并请求其他资源, ...

  7. Selenium学习 - 库代码解析

    Selenium学习 - 库代码解析 一.selenium/common exceptions.py 定义了一个继承自Exception类的WebDriverException基础异常类,然后通过它扩 ...

  8. [从零手写VIO|第五节]——后端优化实践——单目BA求解代码解析

    长篇警告⚠⚠⚠ 目录 solver 全流程回顾 Solver三要素 Solver求解中的疑问 核心问题 代码解析 1. TestMonoBA.cpp 2. 后端部分: 2.1 顶点 2.2 边(残差) ...

  9. Learning Memory-guided Normality for Anomaly Detection 代码解析

    Learning Memory-guided Normality for Anomaly Detection 代码解析 目录 : 整体结构 训练(train)部分分析 评估(evaluate)部分分析 ...

最新文章

  1. Android下创建一个输入法
  2. Spring源码解析——如何阅读源码
  3. 这就是飞秋下载早期的学习生涯
  4. 快抢!猪年之前最后一波送书福利,错过只能等“明年”
  5. 解读鸿蒙轻内核的监控器:异常钩子函数
  6. fastjson php,Fastjson JSONPath
  7. JVM中的垃圾收集算法
  8. 新手如何快速入门软件测试?你还缺这几样...
  9. 安卓机型app的编译与反编译 apk文件的简单说明与解析
  10. linux ubuntu软件中心,Ubuntu 20.04 将Ubuntu软件中心切换到Snap商店
  11. 如何成为城建档案管理员(资料员)
  12. php $stmt,PHP在stmt准备和执行语句后获取结果
  13. 闲鱼互动玩法标准化建设
  14. MOS管选型参数:VGS(th)
  15. 用智能音箱+AI 联网玩狼人杀将是怎样的体验?
  16. Android APP应用完整性检查
  17. Building Crosswalk For Windows
  18. 2021年绵阳东辰中学高考成绩查询,绵阳市所有高中学校排名,2021年绵阳市重点高中分数线排名榜...
  19. 核心价值观编码器【matlab版】
  20. python期末考试试卷及解析

热门文章

  1. Android动态换肤实现原理解析,灵魂拷问
  2. LSTM如何解决梯度消失
  3. 电子邮件有哪些安全隐患_您的公司电子邮件并不像您想象的那样安全
  4. python协程三种实现方法
  5. ffmpeg+jsmpeg+nginx实现多道h5视频直播
  6. 手把手教你如何调配监控镜头
  7. 探索思维导图:提升思维能力与效率的利器
  8. 我们结婚吧,在区块链上!
  9. session使用实例
  10. 【Java学习】新知识点补充三:程序流程控制