ApacheCalcite官网介绍

  • Apache Calcite is a dynamic data management framework.
  • It contains many of the pieces that comprise a typical database management system, but omits some key functions: storage of data, algorithms to process data, and a repository for storing metadata.
  • Calcite intentionally stays out of the business of storing and processing data. As we shall see, this makes it an excellent choice for mediating between applications and one or more data storage locations and data processing engines. It is also a perfect foundation for building a database: just add data.

简单认识

  • Calcite是一个动态的数据管理框架, 目前的理解它主要有sql的解析,校验,查询优化等功能, 它不是一个数据库, 不包含存储的处理数据的功能, 数据存储和处理是用户自定义实现的, 可以结合应用程序和一个或多个数据源.
  • 另外Flink流计算中也是基于Calcite, Flink的source到sink的数据投递需要使用到的sql connector, 也是跟Calcite的Adapters适配器类似的思想, flink的学习使用是在另一个专栏. 学习Calcite也是对学习Flinksql很有帮助

入门使用1(Reflective java)

首先要知道的几个概念, 比如mysql

connection 连接 与数据库的连接
Schema 模式,是数据库对象集合 或叫database? 包含了表,视图等多种对象
Table 包含了字段
Field 字段  

Calcite也是有并不限于connection, Schema, Table, Field这几种概念

calcite核心依赖

<dependency><groupId>org.apache.calcite</groupId><artifactId>calcite-core</artifactId><version>1.26.0</version>
</dependency>
package com.cxydevelop.calcite;import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;public class ReflectiveSchemaTest {// tablepublic static class People {// fieldpublic String id;public String name;public People(String id, String name) {this.id = id;this.name = name;}}// tablepublic static class Detail {// fieldpublic String id;public int age;public Detail(String id, int age) {this.id = id;this.age = age;}}// schemapublic static class JavaSchema {public final People[] people = new People[]{new People("1","namea"),new People("2","nameb"),new People("3","namec")};public final Detail[] detail = new Detail[]{new Detail("1",1),new Detail("2",22),new Detail("3",333)};}public static void main(String[] args) throws Exception {Class.forName("org.apache.calcite.jdbc.Driver");Properties info = new Properties();info.setProperty("lex", "JAVA");// 创建连接Connection connection = DriverManager.getConnection("jdbc:calcite:", info);CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);// SchemaPlus相当databasesSchemaPlus rootSchema = calciteConnection.getRootSchema();// 创建schema, ReflectiveSchema内部通过放射会根据提供的schema创建table,fieldSchema schema = new ReflectiveSchema(new JavaSchema());// databases添加schemarootSchema.add("sc", schema);// 创建 StatementStatement statement = calciteConnection.createStatement();// 执行sqlResultSet resultSet = statement.executeQuery("" +"select p.id, p.name, d.age " +"from sc.people p left join sc.detail d  on p.id = d.id " +"where p.id >= 2" +"");printResultSet(resultSet);resultSet.close();statement.close();connection.close();}private static void printResultSet(ResultSet resultSet) throws SQLException {ResultSetMetaData metaData = resultSet.getMetaData();int columnCount = metaData.getColumnCount();while(resultSet.next()){List<Object> row = new ArrayList<>();for (int i = 1; i < columnCount+1; i++) {row.add(resultSet.getObject(i));}System.out.println(row);}}
}

最终的输出是

[2, nameb, 22]
[3, namec, 333]

入门使用2(csv)

这里入门使用的是calcite自带的一个csv设配器

calcite-example-csv依赖

<dependency><groupId>org.apache.calcite</groupId><artifactId>calcite-example-csv</artifactId><version>1.21.0</version>
</dependency>

项目代码结构

1 csv_default.json

json是用于定义schema,table等, 下面指定了并且指定了工场类是org.apache.calcite.adapter.csv.CsvSchemaFactory, CsvSchemaFactory会根据自己的规则解析并在Calcite中创建对应的schema,table等, 详情后面会说,现在只是简单使用

{"version": "1.0","defaultSchema": "csv","schemas": [{"name": "csv","type": "custom","factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory","operand": {"directory": "D:\\cxy\\idea_workspace\\myproject\\calcite\\src\\main\\resources\\csv_dir"}}]
}

2 resources/csv_dir下面的csv_user.csv和csv_detail.csv分别模拟存储两个表信息

csv_user.csv

ID,NAME
1,name1
2,name2
3,name3

csv_detail.csv

ID,age,DESC
1,11,aaa
2,22,bbb
3,33,cccc

3 DefaultCsvTest

测试代码

package com.cxydevelop.calcite;import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Before;
import org.junit.Test;import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;public class DefaultCsvTest {private Connection conn;@Beforepublic void setup() throws SQLException {Properties config = new Properties();config.put("model", DefaultCsvTest.class.getClassLoader().getResource("csv_default.json").getPath());config.put("caseSensitive", "false");conn = DriverManager.getConnection("jdbc:calcite:",config);}@Testpublic void query() throws Exception {List<String> sqlList = new ArrayList<>();sqlList.add("select * from csv.csv_user");sqlList.add("select id, name || '_after_append' from csv_user");sqlList.add("select t.id,t.name,t2.age from csv_user t left join csv_detail t2 on t.id = t2.id");for (String sql : sqlList) {System.out.println("-----------------");System.out.println(sql);printResultSet(conn.createStatement().executeQuery(sql));}}private void printResultSet(ResultSet resultSet) throws SQLException {ResultSetMetaData metaData = resultSet.getMetaData();int columnCount = metaData.getColumnCount();while(resultSet.next()){List<Object> row = new ArrayList<>();for (int i = 1; i < columnCount+1; i++) {row.add(resultSet.getObject(i));}System.out.println(row);}}
}

4 运行结果,可以看到我们可以用sql来查询csv文件内容

-----------------
select * from csv.csv_user
[1, name1]
[2, name2]
[3, name3]
-----------------
select id, name || '_after_append' from csv_user
[1, name1_after_append]
[2, name2_after_append]
[3, name3_after_append]
-----------------
select t.id,t.name,t2.age from csv_user t left join csv_detail t2 on t.id = t2.id
[1, name1, 11]
[2, name2, 22]
[3, name3, 33]

探究calcite如何读取查询csv

首先测试程序中基本没有什么代码,calcite就能读取我们提供的csv文件, 主要是因为csv_default.json里面定义了两个内容

  1. 一个是factory, 对应calcite提供的csv schema工厂
  2. 一个是directory, 对应csv文件目录

所以关键是CsvSchemaFactory做了啥

进入CsvSchemaFactory, 可以发现CsvSchemaFactory实现了SchemaFactory, 重写了create方法返回Schema, create方法里面可以读取到我们定义的配置( 比如存csv的文件夹路径 ), 并且用于创建schema

进入CsvSchema,可以发现CsvSchema继承了AbstractSchema, 成员属性有tableMap, 所以跟数据库类似, schema下面有对应的table

在getTableMap中调用的createTableMap方法 , 可以看到是解析了文件夹下面的文件,调用createTable, 放在map中返回

createTable中是创建对应的表,可以看到还有几种类型的table, 默认是会创建CsvScannableTable

CsvScannableTable主要是实现了ScannableTable, 实现了scan方法, 返回了一个 CsvEnumerator

CsvEnumerator中主要是moveNext和current两个方法获取和返回csv的数据

上面截图的代码基本都是calcite提供的csv设配器源码,主要是了解其基本的规则,后面可以自定义实现自己的设配器

csv细节

上面说到createTable中是创建对应的表,有几种类型的table, 默认是会创建CsvScannableTable, 通过源码可以看到可以在之前提到的json文件中配置flavor

分别是以下三种

  1. SCANNABLE
  2. FILTERABLE
  3. TRANSLATABLE

CsvScannableTable实现ScannableTable的scan方法, " 字面意思 "是直接扫描全表?

public class CsvScannableTable extends CsvTable implements ScannableTable {CsvScannableTable(Source source, RelProtoDataType protoRowType) {super(source, protoRowType);}public String toString() {return "CsvScannableTable";}public Enumerable<Object[]> scan(DataContext root) {final int[] fields = CsvEnumerator.identityList(this.fieldTypes.size());final AtomicBoolean cancelFlag = (AtomicBoolean)Variable.CANCEL_FLAG.get(root);return new AbstractEnumerable<Object[]>() {public Enumerator<Object[]> enumerator() {return new CsvEnumerator(CsvScannableTable.this.source, cancelFlag, false, (String[])null, new ArrayRowConverter(CsvScannableTable.this.fieldTypes, fields));}};}
}

CsvFilterableTable实现FilterableTable的scan方法, 相比ScannableTable多了一个参数filters, " 字面意思 "是可以提前通过filters的规则过滤掉一些数据?

public class CsvFilterableTable extends CsvTable implements FilterableTable {public CsvFilterableTable(Source source, RelProtoDataType protoRowType) {super(source, protoRowType);}public String toString() {return "CsvFilterableTable";}public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters) {final String[] filterValues = new String[this.fieldTypes.size()];filters.removeIf((filter) -> {return this.addFilter(filter, filterValues);});final int[] fields = CsvEnumerator.identityList(this.fieldTypes.size());final AtomicBoolean cancelFlag = (AtomicBoolean)Variable.CANCEL_FLAG.get(root);return new AbstractEnumerable<Object[]>() {public Enumerator<Object[]> enumerator() {return new CsvEnumerator(CsvFilterableTable.this.source, cancelFlag, false, filterValues, new ArrayRowConverter(CsvFilterableTable.this.fieldTypes, fields));}};}private boolean addFilter(RexNode filter, Object[] filterValues) {if (filter.isA(SqlKind.AND)) {((RexCall)filter).getOperands().forEach((subFilter) -> {this.addFilter(subFilter, filterValues);});} else if (filter.isA(SqlKind.EQUALS)) {RexCall call = (RexCall)filter;RexNode left = (RexNode)call.getOperands().get(0);if (left.isA(SqlKind.CAST)) {left = (RexNode)((RexCall)left).operands.get(0);}RexNode right = (RexNode)call.getOperands().get(1);if (left instanceof RexInputRef && right instanceof RexLiteral) {int index = ((RexInputRef)left).getIndex();if (filterValues[index] == null) {filterValues[index] = ((RexLiteral)right).getValue2().toString();return true;}}}return false;}
}
CsvTranslatableTable实现TranslatableTable的toRel方法, 应该是最高级的用法, 自己写规则, 具体用法后面边学习边用
public class CsvTranslatableTable extends CsvTable implements QueryableTable, TranslatableTable {CsvTranslatableTable(Source source, RelProtoDataType protoRowType) {super(source, protoRowType);}public String toString() {return "CsvTranslatableTable";}public Enumerable<Object> project(DataContext root, final int[] fields) {final AtomicBoolean cancelFlag = (AtomicBoolean)Variable.CANCEL_FLAG.get(root);return new AbstractEnumerable<Object>() {public Enumerator<Object> enumerator() {return new CsvEnumerator(CsvTranslatableTable.this.source, cancelFlag, CsvTranslatableTable.this.fieldTypes, fields);}};}public Expression getExpression(SchemaPlus schema, String tableName, Class clazz) {return Schemas.tableExpression(schema, this.getElementType(), tableName, clazz);}public Type getElementType() {return Object[].class;}public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema, String tableName) {throw new UnsupportedOperationException();}public RelNode toRel(ToRelContext context, RelOptTable relOptTable) {int fieldCount = relOptTable.getRowType().getFieldCount();int[] fields = CsvEnumerator.identityList(fieldCount);return new CsvTableScan(context.getCluster(), relOptTable, this, fields);}
}

Apache Calcite初探和csv简单例子相关推荐

  1. 【Calcite】Apache Calcite 框架初探及概念详解

    1. 简介 Calcite 是什么?如果用一句话形容 Calcite,Calcite 是一个用于优化异构数据源的查询处理的基础框架. 最近十几年来,出现了很多专门的数据处理引擎.例如列式存储 (HBa ...

  2. Apache Calcite官方文档中文版-概览-1.背景

    第一部分 概览 1. 背景   Apache Calcite是一个动态数据管理框架.它包含了许多组成典型数据管理系统的经典模块,但省略了一些关键性的功能: 数据存储,数据处理算法和元数据存储库.    ...

  3. Apache Calcite官方文档中文版- 进阶-1. 适配器

    第二部分 进阶(Advanced) 1. 适配器(Adapters) 1.1 Schema adapters   一个schema adapter允许Calcite去读取特定类型的数据,将这些数据以一 ...

  4. Apache Calcite 处理流程详解(一)

    关于 Apache Calcite 的简单介绍可以参考 Apache Calcite:Hadoop 中新型大数据查询引擎 这篇文章,Calcite 一开始设计的目标就是 one size fits a ...

  5. Apache Calcite官方文档中文版- 概览-2. 教程

    第一部分 概览 2. 教程   本章针对Calcite的连接建立提供了循序渐进的教程,使用一个简单的适配器来将一个CSV文件目录以包含Schema信息的tables形式呈现,并提供了一个完全SQL接口 ...

  6. 【手把手教你全文检索】Apache Lucene初探

    讲解之前,先来分享一些资料 首先呢,学习任何一门新的亦或是旧的开源技术,百度其中一二是最简单的办法,先了解其中的大概,思想等等.这里就贡献一个讲解很到位的ppt.已经被我转成了PDF,便于搜藏. 其次 ...

  7. Apache Calcite论文概要

    ABSTRACT calcite的特点: 模块化优化规则和可扩展查询优化器 支持各种查询语言的查询处理器 可扩展适配器架构 异构数据模型和存储 1.INTRODUCTION 面临问题: 多种异构数据源 ...

  8. Apache Calcite教程-SQL解析-Calcite SQL解析

    Calcite SQL解析 一.代码结构 其中,在codegen文件夹下,  config.fmpp (主要制定实现类路径)表示calcite 模板配置,Parser.jj是JavaCC解析器所需解析 ...

  9. Apache calcite Quickstart

    [Calcite]Apache Calcite 框架初探及概念详解_董嘻嘻的博客-CSDN博客_apache calcite Apache Calcite介绍_pucheung的博客-CSDN博客 A ...

最新文章

  1. UA MATH ECE636 信息论10 Non-adaptive Group Testing
  2. 串口ic读卡器源码-c#代码(2)续上
  3. 今日arXiv精选 | 18篇近期值得关注的Transformer工作
  4. 浅谈 举家搬迁静态文件到CDN
  5. OpenStack 已死?
  6. 20200710:动态规划复习day03
  7. found linux系统wget出现not_Java 9 AOT 试用:仅支持 64 位 Linux和java.base 模块编译
  8. 从华为“流程与IT管理部”看IT部门定位
  9. 推荐 | 掌握这些套路,你也能解决 90% 的 NLP 问题
  10. shame on u
  11. matlab 三维画图总结
  12. python数据分析与可视化从入门到精通_零基础学Python爬虫、数据分析与可视化从入门到精通...
  13. 广州大学 计算机网络实验3 使用网络协议分析器捕捉和分析协议数据包 2020版
  14. springCloud框架搭建,添加feign和Zuul
  15. 游戏开发中为什么要控制模型的面数
  16. TIPOP 出货单单头
  17. 翌加:抖音账号被限流了如何解决
  18. 美国麦当劳“四川辣酱”将再次回归;“澳大利亚制造”巧克力要崛起;中国食品行业首个“零碳工厂”诞生 | 食品饮料新品...
  19. c#期末考试知识点_c#期末考试复习题
  20. 【C++要笑着学】关键字 | 命名空间 | 输入和输出

热门文章

  1. 我校全力开展抗洪救灾工作
  2. 使用obi fluid进行洪水模拟,持续更新~
  3. 别不信,小宝宝爱盯着妈妈看原来跟大脑发育有关
  4. 给曾经爱过、正在爱着、将要爱的人们!
  5. 2007经典搞笑警句
  6. j3455安装linux 4k驱动,nas-j3455kvm安装win10及集成显卡直通
  7. Java微信公众号开发之微信公众平台接入开发者
  8. 【MATLAB-app】如何使用键盘回调以及在app设计中应用
  9. WAF应用防火墙的功能
  10. win10磁贴中的图标变成白色的解决方法