点击箭头处“蓝色字”,关注我们哦!!

维表关联是离线计算或者实时计算里面常见的一种处理逻辑,常常用于字段补齐、规则过滤等,一般情况下维表数据放在MySql等数据库里面,对于离线计算直接通过ETL方式加载到Hive表中,然后通过sql方式关联查询即可,但是对于实时计算中Flink、SparkStreaming的表都是抽象的、虚拟的表,那么就没法使用加载方式完成。透过维表服务系列里面讲到的维表关联都是使用编码方式完成,使用Map或者AsyncIO方式完成,但是这种硬编码方式开发效率很低,特别是在实时数仓里面,我们希望能够使用跟离线一样sql方式完成维表关联操作。

在Flink1.9中提供了使用sql化方式完成维表关联,只需要实现LookupableTableSource接口即可,可以实现同步或者异步关联。在1.9之前就需要自己实现sql语法解析,然后在转换为API方式,对上层提供sql语法。看一个sql语句:

select * from orders o join gdsInfo g on o.gdsId=g.gdsId

orders表示流表,gdsInfo 表示维表。根据sql解析顺序先 from 部分、然后where 部分、最后select,那么对于join 方式,相当于join生成了一张临时表,然后去select 这张临时表,因此可以确认sql解析流程:1. 识别出流表与维表
2. 解析join部分,生成临时表3. select 临时表现在使用calcite解析这条语句

public class ParseDemo {    public static void main(String[] args) {        //假设gdsInfo就是维表        String sql = "select * from orders o join gdsInfo g on o.gdsId=g.gdsId";        SqlParser.Config config = SqlParser.configBuilder().setLex(Lex.MYSQL).build();        SqlParser sqlParser = SqlParser.create(sql, config);        SqlSelect sqlSelect = null;        try {            sqlSelect = (SqlSelect) sqlParser.parseStmt();        } catch (Exception e) {            e.printStackTrace();        }        SqlNode sqlFrom = sqlSelect.getFrom();        boolean isSideJoin = false;        String leftTable = "";        String rightTable = "";        String newName = ""; //临时表        SqlJoin sqlJoin = null;                //解析join        if (sqlFrom.getKind() == SqlKind.JOIN) {            sqlJoin = (SqlJoin) sqlFrom;            SqlNode left = sqlJoin.getLeft();            SqlNode right = sqlJoin.getRight();            isSideJoin = true;            leftTable = paserTableName(left);            rightTable = paserTableName(right);        }                //生成新的select        if (isSideJoin) {            newName = leftTable + "_" + rightTable;            SqlParserPos pos = new SqlParserPos(0, 0);            SqlIdentifier sqlIdentifier = new SqlIdentifier(newName, pos);            sqlSelect.setFrom(sqlIdentifier);        }    }      //解析表    private static String paserTableName(SqlNode tbl) {        if (tbl.getKind() == SqlKind.AS) {            SqlBasicCall sqlBasicCall = (SqlBasicCall) tbl;            return sqlBasicCall.operands[1].toString();        }        return ((SqlIdentifier) tbl).toString();    }}

那么我们需要的就是生成新的select节点与SqlJoin节点,执行逻辑就是根据SqlJoin节点做维表关联之后生成新的表,然后去select这样新的表。sql解析部分已经完成,既然使用sql化方式,因此也需要定义源表与维表,数据源一般是kafka, 定义源表需要:表名称、字段名称、字段类型、数据格式、topic;维表假设为mysql,需要定义:表名称、字段类型、字段名称、关联方式(同步/异步)、缓存方式(LRU/全部缓存、无缓存)。源表定义:

CREATE TABLE orders(    orderId varchar,    gdsId varchar,    orderTime varchar )WITH(    type = 'kafka',    kafka.bootstrap.servers = 'localhost:9092',    kafka.topic = 'topic1',    kafka.group.id = 'gId1',    sourcedatatype ='json' );

维表定义:

CREATE TABLE gdsInfo(    gdsId varchar,    gdsName varchar,    price double )WITH(    type='mysql',    url='jdbc:mysql://localhost:3306/paul',    userName='root',    password='123456',    tableName='gdsInfo',    cache = 'LRU',    isSideTable='true'    );

现在就是要如何解析这些语句,正则表达式是首选,需要解析出表名称、字段、属性三个部分:creat table xxx (xxx) with(xxx);正则表达式可为:

(?i)create\s+table\s+(\S+)\s*\((.+)\)\s*with\s*\((.+)\)

?i表示后面的匹配忽略大小写,\s+ 表示匹配多个空格,\S+表示匹配多个字符,.+ 表示匹配任意字符。定义一个table类:

class TableInfo{    private String tableName; // 表名称    private Map<String,String> fieldsInfo; //字段名称->类型    private Properties props; //表属性    private boolean isSideTable; //是否为维表    }

解析:

public class ParseCreate {    public static final String REG_CREATE="(?i)create\\s+table\\s+(\\S+)\\s*\\((.+)\\)\\s*with\\s*\\((.+)\\)";    public static void main(String[] args) {        String createSql="CREATE TABLE orders(" + "    orderId varchar," + "    gdsId varchar,"                + "    orderTime varchar" + " )WITH(" + "    type = 'kafka',"                + "    kafka.bootstrap.servers = 'localhost:9092'," + "    kafka.topic = 'topic1',"                + "    kafka.group.id = 'gId1'," + "    sourcedatatype ='json'" + " );";        Pattern pattern=Pattern.compile(REG_CREATE);        TableInfo tableInfo=new TableInfo();        Matcher matcher=pattern.matcher(createSql);        if(matcher.find()){            tableInfo.setTableName(matcher.group(1));            String fieldsStr=matcher.group(2);            String propsStr=matcher.group(3);            tableInfo.setFieldsInfo(parseFiles(fieldsStr));            tableInfo.setProps(parseProps(propsStr));            if(Boolean.valueOf(tableInfo.getProps().getProperty("isSideTable","false"))){                tableInfo.setSideTable(true);            }        }    }    public static Map<String,String> parseFiles(String fieldsStr){        Map<String,String> fieldsInfo=new HashMap<>();        String[] fieldsArray=fieldsStr.split(",");        for(String field: fieldsArray){           String[] fieldInfo=field.trim().split(" ");           fieldsInfo.put(fieldInfo[0],fieldInfo[1]);        }        return fieldsInfo;    }    public static Properties parseProps(String propsStr){        Properties props=new Properties();        String[] propsArray=propsStr.split(",");        for(String prop: propsArray){            String[] propInfo=prop.trim().split("=");            props.setProperty(propInfo[0],propInfo[1]);        }        return props;    }}

至此完成了简易的create语句解析,下一篇将介绍如何将解析后的create与维表关联转换为可执行代码。


原创不易,好看,就点个"在看"

sql 忽略大小写_Flink使用Calcite解析Sql做维表关联(一)相关推荐

  1. 淘宝数据库OceanBase SQL编译器部分 源码阅读--解析SQL语法树

    http://blog.csdn.net/qq910894904/article/details/28658421 OceanBase是阿里巴巴集团自主研发的可扩展的关系型数据库,实现了跨行跨表的事务 ...

  2. flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码

    看了那么多的技术文,你能明白作者想让你在读完文章后学到什么吗? 大数据羊说__的文章会让你明白 大数据羊说 用数据提升美好事物发生的概率~ 43篇原创内容 公众号 博主会阐明博主期望本文能给小伙伴们带 ...

  3. SQL 忽略大小写模糊查询

    where upper(colName) like '%value%' where lower(keyword ) like '%shoes%'

  4. mysql表关联字段长度不一样_mysql优化sql案例,5.6版本的致命点之两表关联的字段类型相同的重要性...

    时间大大的缩短: 但是这个还不是最优化的:后面又发现关联表那边有这样的一个语句: FROM np_order n left join web114_order_ledger wol on n.orde ...

  5. flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)

    感谢您的关注  +  点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! 1.序篇-本文结构 背景篇-为啥需要 redis 维表 目标篇-做 redis 维表的预期效果是什么 ...

  6. sql怎么撤回update_【干货】SQL基础快速入门

    我们知道,关系数据库通过表来管理数据,数据库中可以同时存储多张表,数据库管理的是表,那么谁来管理数据库呢? 数据库由数据库管理系统(DBMS)来操纵和管理,终端用户通过dbms访问数据库中的数据,数据 ...

  7. sql怎么两个表关联查询

    在 SQL 中,我们可以使用 JOIN 关键字来两个表关联查询.例如,假设有两个表:table1 和 table2,并且它们之间有一个关联字段 field1.你可以使用如下的语句来两个表关联查询: S ...

  8. Flink SQL 功能解密系列 —— 维表 JOIN 与异步优化

    2019独角兽企业重金招聘Python工程师标准>>> 引子 流计算中一个常见的需求就是为数据流补齐字段.因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度 ...

  9. calcite查询mysql_Apache Calcite教程-SQL解析-Calcite SQL解析

    Calcite SQL解析 代码目录 如图: java config.fmpp calcite 模板配置mysql Parser.jj JavaCC解析器git parserImpls.ftl/com ...

最新文章

  1. mysql oracle sqlit_【Go语言】连接数据库SQLite、MySQL、Oracle
  2. CouchDB 简单HTTP接口使用说明
  3. docker安装在服务器的那个位置,docker容器卷通常会放在什么位置
  4. 命令注入工具Commix
  5. 正余弦函数的Talor近似
  6. mysqlcsv导入中文乱码_Mysql 导入导出csv 中文乱码问题的解决方法
  7. 无名管道pipe使用方法
  8. Android Studio中导入第三方库
  9. hdu 3925 一道复杂的简单题
  10. attachEvent传递给其handler的一个默认参数
  11. betterscroll的使用
  12. Okra框架(二) 搭建Socket服务器
  13. ArcGIS教程 - 7 空间数据编辑
  14. 谈谈像素以及微信小程序的 rpx
  15. 如何让iPad变成Mac的扩展屏幕
  16. 【python】Python性能鸡汤
  17. IIC通信协议(STM32学习笔记 一)
  18. 推荐!最新机器学习、深度学习绘图模板.ppt
  19. 凌动上网本改装linux,Ubuntu Netbook Remix 专为上网本打造的linux系统--梦飞翔的地方(梦翔天空)...
  20. 假装认真的LaTeX学习笔记(1)—— Sublime中自动补全LaTeX命令(LaTeX-cwl安装教程)

热门文章

  1. Nginx的继续深入(日志轮询切割,重写,负载均衡等)
  2. Matlab图像处理基本函数(1)
  3. extmail电子邮件系统
  4. Centos下通过SMTP发送邮件失败解决
  5. 基于JSP实现网上商城系统
  6. Python测试框架doctest
  7. Postgresql的使用
  8. mysql主从复制错误sql-running no
  9. YII2 载入默认值 loadDefaultValues
  10. JavaScript stringObject.replace() 方法