快速生成sparksql创建carbondata表结构(同步mysql或sqlserver数据)脚本
前言
将 MySQL 或 SQL Server 中的大量表实时同步到 CarbonData 时,往往需要为每张表手动整理字段、类型并编写对应的建表语句,机械的比对和复制粘贴会耗费大量时间和精力。下面介绍的脚本会自动读取源表结构并生成建表语句,只需少量复制和修改即可使用(快速、准确)。
需同步的mysql或sqlserver原表结构
-- SQL Server source table
CREATE TABLE [dbo].[TOTicket] (
    [Id] char(16) COLLATE Chinese_PRC_CI_AS NOT NULL,
    [ParkId] int NOT NULL,
    [Qty] int NOT NULL,
    [AgencySaleTicketClassId] int NULL,
    [ValidStartDate] datetime NOT NULL,
    [ValidDays] int NOT NULL,
    [Price] decimal(18,2) NOT NULL,
    [SalePrice] decimal(18,2) NOT NULL,
    [SettlementPrice] decimal(18,2) NULL,
    [Amount] decimal(18,2) NOT NULL,
    [InparkCounts] int NOT NULL,
    [InvoiceId] bigint NULL,
    [TerminalId] int NULL,
    [TOVoucherId] char(18) COLLATE Chinese_PRC_CI_AS NULL,
    [Remark] varchar(512) COLLATE Chinese_PRC_CI_AS NULL,
    [LastModificationTime] datetime NULL,
    [LastModifierUserId] bigint NULL,
    [CreationTime] datetime NOT NULL,
    [CreatorUserId] bigint NULL,
    [ParkSettlementPrice] decimal(18,2) NULL,
    [TicketSaleStatus] int DEFAULT ((0)) NOT NULL,
    [TicketFormEnum] int DEFAULT ((0)) NOT NULL,
    [SyncTicketType] int NULL,
    [FirstInparkTime] datetime NULL,
    [PrintTicketTime] datetime NULL,
    [TOBodyId] char(21) COLLATE Chinese_PRC_CI_AS NOT NULL,
    -- In SQL Server, timestamp is a synonym for rowversion (binary), not a date/time.
    [RV] timestamp NOT NULL,
    [IsFreeze] bit DEFAULT ((0)) NOT NULL,
    [TOHeaderId] char(18) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
    [PId] nvarchar(30) COLLATE Chinese_PRC_CI_AS NULL,
    [AgencyDeriveTicketClassId] int DEFAULT ((0)) NOT NULL,
    CONSTRAINT [PK_dbo.TOTicket] PRIMARY KEY NONCLUSTERED ([Id])
        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
        ON [PRIMARY]
)

-- MySQL source table
-- parkderiveticketclassrelation: primary key `Id`; the nullable datetime/varchar columns default to NULL.
CREATE TABLE `parkderiveticketclassrelation` (
    `Id` bigint(20) NOT NULL,
    `CreationTime` datetime(6) NOT NULL,
    `CreatorUserId` bigint(20) NOT NULL,
    `LastModificationTime` datetime(6) DEFAULT NULL,
    `LastModifierUserId` bigint(20) NOT NULL,
    `IsDeleted` bit(1) NOT NULL,
    `DeletionTime` datetime(6) DEFAULT NULL,
    `DeleterUserId` bigint(20) NOT NULL,
    `ParkDeriveTicketClassId` int(11) NOT NULL DEFAULT '0',
    `ParkId` int(11) NOT NULL,
    `InParkBeginTime` varchar(10) DEFAULT NULL,
    `InParkEndTime` varchar(10) DEFAULT NULL,
    PRIMARY KEY (`Id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC;
目标结构(脚本自动生成)
脚本代码
★★★此脚本为初版临时用,需进一步优化,仅供参考
import org.apache.commons.lang3.StringUtils;import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;/*** @ClassName CarbonSql* @Description //TODO* @Author * @Date 2020/8/18 10:46* @Version 1.0**/
public class CarbonSql {
    /*
     * Generates Spark SQL "CREATE TABLE ... USING carbondata" scripts from the column metadata of
     * SQL Server or MySQL source tables, prints each script and appends it to OUTPUT_PATH.
     */

    /** File that every generated script is appended to. */
    private static final String OUTPUT_PATH = "E:\\tmp\\carbon.txt";

    /**
     * SQL Server column metadata for the user table whose name is bound to the single '?'.
     * Result columns: columnName; priKey ('PRI' for primary-key columns, otherwise '');
     * columnType (type name, with "(precision,scale)" appended for decimal/numeric);
     * columnComment (the column's MS_Description, or '').
     * Rows are ordered by table id, then column position. Same-named tables in different schemas
     * are not distinguished.
     */
    private static final String SQLSERVER_COLUMNS_SQL =
            "SELECT\n"
            // CAST to varchar without a length defaults to 30 characters; use explicit lengths.
            + "    CAST(a.name AS nvarchar(128)) AS columnName,\n"
            + "    CASE WHEN EXISTS (\n"
            + "        SELECT 1 FROM sysobjects o\n"
            + "        WHERE o.xtype = 'PK' AND o.parent_obj = a.id\n"
            + "          AND o.name IN (\n"
            + "              SELECT i.name FROM sysindexes i\n"
            // Index ids repeat across tables, so keep the lookup on this table's own indexes.
            + "              WHERE i.id = a.id\n"
            + "                AND i.indid IN (SELECT k.indid FROM sysindexkeys k\n"
            + "                                WHERE k.id = a.id AND k.colid = a.colid))\n"
            + "    ) THEN 'PRI' ELSE '' END AS priKey,\n"
            + "    CASE WHEN b.name IN ('decimal', 'numeric')\n"
            + "         THEN CAST(b.name AS nvarchar(128)) + '(' + CAST(a.xprec AS varchar(3)) + ','\n"
            + "              + CAST(a.xscale AS varchar(3)) + ')'\n"
            + "         ELSE CAST(b.name AS nvarchar(128)) END AS columnType,\n"
            + "    CAST(ISNULL(g.[value], '') AS nvarchar(4000)) AS columnComment\n"
            + "FROM syscolumns a\n"
            + "LEFT JOIN systypes b\n"
            + "    ON a.xusertype = b.xusertype\n"
            + "INNER JOIN sysobjects d\n"
            + "    ON a.id = d.id AND d.xtype = 'U' AND d.name <> 'dtproperties'\n"
            // Only the column-level MS_Description; any other extended property would duplicate rows.
            + "LEFT JOIN sys.extended_properties g\n"
            + "    ON g.class = 1 AND g.major_id = a.id AND g.minor_id = a.colid AND g.name = 'MS_Description'\n"
            + "WHERE d.name = ?\n"
            + "ORDER BY a.id, a.colorder";

    /**
     * MySQL column metadata for the table bound to the first '?' in the schema bound to the second '?'.
     * Result columns are the same as in SQLSERVER_COLUMNS_SQL, in ordinal column order.
     */
    private static final String MYSQL_COLUMNS_SQL =
            "SELECT COLUMN_NAME AS columnName,\n"
            + "       COLUMN_KEY AS priKey,\n"
            + "       CASE WHEN DATA_TYPE IN ('decimal', 'numeric')\n"
            + "            THEN CONCAT(DATA_TYPE, '(', NUMERIC_PRECISION, ',', NUMERIC_SCALE, ')')\n"
            + "            ELSE DATA_TYPE END AS columnType,\n"
            + "       COLUMN_COMMENT AS columnComment\n"
            + "FROM INFORMATION_SCHEMA.COLUMNS\n"
            + "WHERE TABLE_NAME = ?\n"
            + "  AND TABLE_SCHEMA = ?\n"
            // Without ORDER BY, INFORMATION_SCHEMA gives no guaranteed column order.
            + "ORDER BY ORDINAL_POSITION";

    /**
     * Returns the SQL Server column-metadata query (column name, primary-key flag, type, comment)
     * with {@code tableName} inlined as a quoted literal.
     * Kept for existing callers; createSql binds the name as a parameter instead.
     *
     * @param tableName source table name
     * @return query text
     */
    public static String sqlServerSql(String tableName) {
        return inlineParameters(SQLSERVER_COLUMNS_SQL, tableName);
    }

    /**
     * Returns the MySQL column-metadata query (column name, primary-key flag, type, comment)
     * with the table and schema inlined as quoted literals.
     * Kept for existing callers; createSql binds both values as parameters instead.
     *
     * @param tableName source table name
     * @param dataBase  source schema (database) name
     * @return query text
     */
    public static String mysqlSql(String tableName, String dataBase) {
        return inlineParameters(MYSQL_COLUMNS_SQL, tableName, dataBase);
    }

    /**
     * Replaces each '?' in {@code template}, in order, with the next value as a single-quoted
     * SQL literal (embedded quotes doubled). Backslashes are not escaped, and MySQL treats them as
     * escapes by default, so the bound form used by createSql is preferred.
     */
    private static String inlineParameters(String template, String... values) {
        StringBuilder sql = new StringBuilder(template.length() + 64);
        int next = 0;
        for (int i = 0; i < template.length(); i++) {
            char c = template.charAt(i);
            if (c == '?' && next < values.length) {
                sql.append('\'').append(String.valueOf(values[next++]).replace("'", "''")).append('\'');
            } else {
                sql.append(c);
            }
        }
        return sql.toString();
    }

    /**
     * Maps a SQL Server / MySQL column type name to the CarbonData type used in the generated DDL.
     * Adjust the mapping here if it does not fit your needs.
     *
     * @param str source type name, e.g. "int", "datetime", "decimal(18,2)"; matched case-insensitively
     * @return INT, LONG, DOUBLE, DECIMAL(p,s), BOOLEAN, DATE, TIMESTAMP, or STRING
     *         (STRING is the fallback and is also returned for null)
     */
    public static String findColumnType2(String str) {
        if (str == null) {
            return "STRING";
        }
        String type = str.trim().toLowerCase(java.util.Locale.ROOT);
        if (type.startsWith("decimal") || type.startsWith("numeric")) {
            return decimalType(type);
        }
        if (type.startsWith("bigint")) {
            return "LONG";
        }
        // NOTE(review): unsigned MySQL INT values above 2^31-1 do not fit INT; the type name alone
        // cannot show "unsigned".
        if (type.startsWith("int") || type.startsWith("smallint")
                || type.startsWith("tinyint") || type.startsWith("mediumint")) {
            return "INT";
        }
        if (type.startsWith("float") || type.startsWith("double") || type.equals("real")) {
            return "DOUBLE";
        }
        if (type.startsWith("bit")) {
            return "BOOLEAN";
        }
        if (type.startsWith("datetime") || type.equals("smalldatetime")) {
            return "TIMESTAMP";
        }
        if (type.equals("date")) {
            return "DATE";
        }
        // Includes SQL Server "timestamp" (rowversion, binary), which must not become TIMESTAMP.
        return "STRING";
    }

    /**
     * Converts "decimal(p,s)" / "numeric(p,s)" to an exact DECIMAL(p,s).
     * Falls back to STRING (still exact) when precision/scale is missing, malformed, or beyond
     * CarbonData's 38-digit limit.
     */
    private static String decimalType(String type) {
        int open = type.indexOf('(');
        int comma = type.indexOf(',', open + 1);
        int close = type.indexOf(')', comma + 1);
        if (open < 0 || comma < 0 || close < 0) {
            return "STRING";
        }
        try {
            int precision = Integer.parseInt(type.substring(open + 1, comma).trim());
            int scale = Integer.parseInt(type.substring(comma + 1, close).trim());
            if (precision >= 1 && precision <= 38 && scale >= 0 && scale <= precision) {
                return "DECIMAL(" + precision + "," + scale + ")";
            }
        } catch (NumberFormatException e) {
            // Malformed precision/scale: fall through to STRING.
        }
        return "STRING";
    }

    /**
     * Appends {@code ss} and a line separator to OUTPUT_PATH, then prints a success message.
     *
     * @param ss text to append
     * @throws IOException if the file cannot be opened or written
     */
    public static void write(String ss) throws IOException {
        write(ss, OUTPUT_PATH);
    }

    /**
     * Appends {@code ss} and a line separator to {@code path}, then prints a success message.
     * FileWriter uses the platform default charset.
     *
     * @param ss   text to append
     * @param path target file
     * @throws IOException if the file cannot be opened or written
     */
    public static void write(String ss, String path) throws IOException {
        // try-with-resources releases the file handle even when write() throws.
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(path, true))) {
            bw.write(ss);
            bw.newLine();
        }
        System.out.println("写入成功");
    }

    /**
     * Resolves where {@code tableName} lives, from the hard-coded lists below.
     * Classifies it as an incremental (增量表) or dimension (维度表) table, prints that
     * classification, and generates its DDL via createSql.
     * Names are trimmed and matched case-insensitively. Unknown or empty names are reported and skipped.
     *
     * @param tableName source table name
     */
    public static void findDataBase(String tableName) {
        if (tableName == null || tableName.trim().isEmpty()) {
            System.err.println("Empty table name; skipped");
            return;
        }
        String name = tableName.trim();
        // Every list is a ",a,b,c," string probed with ",name,", so a name only matches a whole entry.
        // Incremental tables. Every other table is treated as a dimension table (维度表), e.g.
        // taickettable1, mysqltabl1, parkderiveticketclass, usertable1, tradetable2, taickettable2, table2.
        String incre_table = ",toticket,taickettable3,parkderiveticketclassrelation,mysqltable2,table1,usertable2,tradetable1,";
        // Actual source system of each table.
        String ticket_sqlserver = ",taickettable1,taickettable2,toticket,taickettable3,";
        String ticket_mysql = ",mysqltabl1,parkderiveticketclass,parkderiveticketclassrelation,mysqltable2,";
        // ticket_mysql tables stored in data_one; the other ticket_mysql tables are in data_two.
        String ticket_mysql_data_one = ",parkderiveticketclass,parkderiveticketclassrelation,mysqltable5,mysqltable9,mysqltable222,";
        String shop = ",table1,table2,";
        String user = ",usertable1,usertable2,";
        String trade = ",tradetable1,tradetable2,";

        String key = "," + name.toLowerCase(java.util.Locale.ROOT) + ",";
        String sourceType = incre_table.contains(key) ? "增量表" : "维度表";
        System.out.println(sourceType);

        // Connection settings. An empty dataBase tells createSql that the source is SQL Server.
        // NOTE(review): credentials are hard-coded; consider loading them from configuration.
        String source = "";
        String className = "";
        String url = "";
        String userName = "";
        String password = "";
        String dataBase = "";
        if (ticket_sqlserver.contains(key)) {
            source = "ticket sqlserver";
            className = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
            url = "jdbc:sqlserver://1.xx.xx.xx:9527;DatabaseName=data2";
            userName = "**";
            password = "*************";
        } else if (ticket_mysql.contains(key)) {
            source = "ticket mysql";
            className = "com.mysql.cj.jdbc.Driver";
            if (ticket_mysql_data_one.contains(key)) {
                url = "jdbc:mysql://1x.xx.xx.xx:3306/data_one";
                dataBase = "data_one";
            } else {
                url = "jdbc:mysql://1x.xx.xx.xx:3306/data_two";
                dataBase = "data_two";
            }
            userName = "***";
            password = "*****************";
        } else if (shop.contains(key)) {
            source = "shop";
            className = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
            url = "jdbc:sqlserver://1x.xx.xx.xx:666666;DatabaseName=data999";
            userName = "***";
            password = "*******************";
        } else if (user.contains(key)) {
            source = "user数据";
            className = "com.mysql.cj.jdbc.Driver";
            url = "jdbc:mysql://xxxxxxxxx:9527/user";
            userName = "****";
            password = "*******";
            dataBase = "user";
        } else if (trade.contains(key)) {
            source = "xxx数据";
            className = "com.mysql.cj.jdbc.Driver";
            url = "jdbc:mysql://aaa-sss-xxxx.sql.xxxcdb.com:35845/case2222";
            userName = "****";
            password = "*******";
            dataBase = "case2222";
        }
        if (className.isEmpty()) {
            System.err.println("Table " + name + " is not listed under any source; skipped");
            return;
        }
        createSql(className, url, userName, password, dataBase, name, sourceType, source);
    }

    /**
     * Reads the column metadata of {@code tableName} and builds its CarbonData CREATE TABLE script.
     * Prints the script and appends it to OUTPUT_PATH, preceded by a source header line and a
     * primary-key line. Driver, SQL and I/O errors are printed, and the method returns normally,
     * so the caller can continue with the next table.
     *
     * @param className  JDBC driver class
     * @param url        JDBC URL
     * @param userName   database user
     * @param password   database password
     * @param dataBase   MySQL schema; empty means the source is SQL Server
     * @param tableName  source table name
     * @param sourceType 增量表 / 维度表 label for the header comment
     * @param source     source description for the header comment
     */
    public static void createSql(String className, String url, String userName, String password,
                                 String dataBase, String tableName, String sourceType, String source) {
        boolean sqlServer = dataBase.isEmpty();
        List<String> columnLines = new ArrayList<>();
        List<String> priKeyList = new ArrayList<>();
        try {
            Class.forName(className);
            // Resources close in reverse order (ResultSet, statement, connection), even on failure.
            try (Connection con = DriverManager.getConnection(url, userName, password);
                 PreparedStatement statement =
                         con.prepareStatement(sqlServer ? SQLSERVER_COLUMNS_SQL : MYSQL_COLUMNS_SQL)) {
                statement.setString(1, tableName);
                if (!sqlServer) {
                    statement.setString(2, dataBase);
                }
                try (ResultSet re = statement.executeQuery()) {
                    while (re.next()) {
                        String columnName = re.getString("columnName");
                        // equalsIgnoreCase(null) is false, so a NULL priKey means "not a key column".
                        if ("PRI".equalsIgnoreCase(re.getString("priKey"))) {
                            priKeyList.add(columnName);
                        }
                        columnLines.add(" | " + columnName + " " + findColumnType2(re.getString("columnType")));
                    }
                }
            }
            if (columnLines.isEmpty()) {
                // Wrong name or schema: writing "CREATE TABLE t ( )" would only produce broken DDL.
                System.err.println("No columns found for table " + tableName + "; nothing written");
                return;
            }
            String ddl = buildCarbonDdl(tableName, columnLines, priKeyList);
            System.out.println(ddl);
            String header = "// " + tableName + " " + sourceType + " " + " 取自 " + source + "\n"
                    + "// 主键 : " + String.join(",", priKeyList) + "\n";
            write(header + ddl);
        } catch (ClassNotFoundException | SQLException | IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Renders the spark.sql(s"""...""".stripMargin) CREATE TABLE statement for one table.
     * Each entry in {@code columnLines} is a " | name TYPE" margin line.
     */
    private static String buildCarbonDdl(String tableName, List<String> columnLines, List<String> priKeyList) {
        // Sort on the discovered primary key, falling back to the previously hard-coded 'Id'.
        // NOTE(review): CarbonData restricts which data types may be used as sort columns.
        String sortColumns = priKeyList.isEmpty() ? "Id" : String.join(",", priKeyList);
        return "spark.sql( \n"
                + "s\"\"\" \n"
                + " |CREATE TABLE IF NOT EXISTS " + tableName + " ( \n"
                // Comma after every column except the last.
                + String.join(", \n", columnLines) + " \n"
                + " |)\n"
                + " |USING carbondata\n"
                + " |OPTIONS (\n"
                + " |`compaction_level_threshold` '4,3',\n"
                + " |`table_blocksize` '1024',\n"
                + " |`sort_scope` 'local_sort',\n"
                + " |`cache_level` 'block',\n"
                + " |`sort_columns` '" + sortColumns + "',\n"
                + " |`local_dictionary_enable` 'false',\n"
                + " |`auto_load_merge` 'false',\n"
                + " |`column_meta_cache` '',\n"
                + " |`major_compaction_size` '512',\n"
                + " |`compaction_preserve_segments` '13')\n"
                + "\"\"\".stripMargin)\n\n";
    }

    /**
     * Test driver: generates DDL for each table in a fixed comma-separated list.
     *
     * @throws IOException declared for compatibility; errors are reported per table
     */
    public static void second() throws IOException {
        String tableStr = "toticket,parkderiveticketclassrelation";
        for (String tableName : tableStr.split(",")) {
            findDataBase(tableName);
        }
    }

    public static void main(String[] args) throws IOException {
        second();
    }
}
结果
// toticket 增量表 取自 ticket sqlserver
// 主键 : Id
spark.sql(
s"""
 |CREATE TABLE IF NOT EXISTS toticket (
 | Id STRING,
 | ParkId INT,
 | Qty INT,
 | AgencySaleTicketClassId INT,
 | ValidStartDate TIMESTAMP,
 | ValidDays INT,
 | Price STRING,
 | SalePrice STRING,
 | SettlementPrice STRING,
 | Amount STRING,
 | InparkCounts INT,
 | InvoiceId LONG,
 | TerminalId INT,
 | TOVoucherId STRING,
 | Remark STRING,
 | LastModificationTime TIMESTAMP,
 | LastModifierUserId LONG,
 | CreationTime TIMESTAMP,
 | CreatorUserId LONG,
 | ParkSettlementPrice STRING,
 | TicketSaleStatus INT,
 | TicketFormEnum INT,
 | SyncTicketType INT,
 | FirstInparkTime TIMESTAMP,
 | PrintTicketTime TIMESTAMP,
 | TOBodyId STRING,
 | RV STRING,
 | IsFreeze BOOLEAN,
 | TOHeaderId STRING,
 | PId STRING,
 | AgencyDeriveTicketClassId INT
 |)
 |USING carbondata
 |OPTIONS (
 |`compaction_level_threshold` '4,3',
 |`table_blocksize` '1024',
 |`sort_scope` 'local_sort',
 |`cache_level` 'block',
 |`sort_columns` 'Id',
 |`local_dictionary_enable` 'false',
 |`auto_load_merge` 'false',
 |`column_meta_cache` '',
 |`major_compaction_size` '512',
 |`compaction_preserve_segments` '13')
""".stripMargin)

// parkderiveticketclassrelation 增量表 取自 ticket mysql
// 主键 : Id
spark.sql(
s"""
 |CREATE TABLE IF NOT EXISTS parkderiveticketclassrelation (
 | Id LONG,
 | CreationTime TIMESTAMP,
 | CreatorUserId LONG,
 | LastModificationTime TIMESTAMP,
 | LastModifierUserId LONG,
 | IsDeleted BOOLEAN,
 | DeletionTime TIMESTAMP,
 | DeleterUserId LONG,
 | ParkDeriveTicketClassId INT,
 | ParkId INT,
 | InParkBeginTime STRING,
 | InParkEndTime STRING
 |)
 |USING carbondata
 |OPTIONS (
 |`compaction_level_threshold` '4,3',
 |`table_blocksize` '1024',
 |`sort_scope` 'local_sort',
 |`cache_level` 'block',
 |`sort_columns` 'Id',
 |`local_dictionary_enable` 'false',
 |`auto_load_merge` 'false',
 |`column_meta_cache` '',
 |`major_compaction_size` '512',
 |`compaction_preserve_segments` '13')
""".stripMargin)
生成的建表语句可以直接复制粘贴到 spark-shell 中执行(前提是 spark-shell 中的 `spark` 会话已加载 CarbonData)。
快速生成sparksql创建carbondata表结构(同步mysql或sqlserver数据)脚本相关推荐
- 推荐一个数据库同步软件,可同步Mysql,sqlserver数据,支持实时同步
SyncNavigator v8.6.2 SyncNavigator是一款功能强大的数据库同步软件,适用于SQL SERVER, MySQL,具有自动/定时同步数据.无人值守.故障自动恢复.同构/异构 ...
- Mysql数据表结构同步Python实现
Mysql数据表结构同步Python实现 Python源码 #!/usr/bin/pythonimport MySQLdb import configparserclass SchemaMysql:# ...
- MySQL在创建相同表结构时as和like 使用的区别
1.MySQL的复制相同表结构方法: 1)create table table_name as select * from table1 where 1=2 (或者limit 0): 2) crea ...
- python同步两张数据表_Python 如何实现数据库表结构同步
近日,某个QQ 群里的一个朋友提出一个问题,如何将一个DB 的表结构同步给另一个DB. 针对这个问题,我进行了思考与实践,具体的实现代码如下所示: # coding:utf-8 import pymy ...
- php表单生成器实验报告,PHP表单生成器,快速生成现代化的form表单,快速上手
form-builder PHP表单生成器,快速生成现代化的form表单.包含复选框.单选框.输入框.下拉选择框等元素以及省市区三级联动.时间选择.日期选择.颜色选择.树型.文件/图片上传等功能. 本 ...
- mysql修改工资字段_基于Linux的MySQL操作实例(修改表结构,MySQL索引,MySQL数据引擎)...
基于Linux的MySQL操作实例(修改表结构,MySQL索引,MySQL数据引擎) 前言 本篇是基于Linux下针对MySQL表结构的修改,MySQL索引的操作以及MySQL数据引擎的配置和说明. ...
- word版表结构转为mysql DDL
昨天接到个任务,领导给了一份word文档,里面都是mysql表结构.让我把表结构转为mysql建表语句.此刻内心真是一万个... 一.解决思路: 1.解析word表结构 2.解析的数据转为建表语句 二 ...
- 【openai】请帮我设计一个通用的ERP管理系统,涉及到的表结构用mysql语言表达出来,全部写出来
背景 这周末把openAi集成到自己的web系统里面了 尝试提问了几个技术和日常问题,感觉回答的还不错 问题1:[请帮我设计一个通用的ERP管理系统,涉及到的表结构用mysql语言表达出来,全部写出来 ...
- Sql Server 生成 Word 文档 表结构
打开数据库编辑器,输入以下代码并执行(F5) SELECT--表名 = case when a.colorder=1 then d.name else '' end,--表说明 = case when ...
最新文章
- Linux上隐藏进程名(初级版)
- rsyslog的学习
- Python 基础 函数
- 【工具收藏】golang 开发工具包,json、sql 转 struct
- mysql引擎测试_MySQL MyISAM引擎和InnoDB引擎的性能测试
- 小米笔记本服务器系统,小米笔记本Pro GTX版
- c#枚举数字转枚举_C#枚举能力问题和解答 套装4
- 儿童吹泡泡水简单配方_儿童吹泡泡玩具水怎么制作
- laravel的启动过程解析(转)
- 硬件创新需要去理解的点(精炼总结)
- js与flash结合使用
- html5调用 扫码枪,js读取usb扫码枪数据功能代码实现
- linux挂载iso5后如何安装,CentOS系统怎样挂载光盘镜像ISO文件
- 02_RampTexture(渐变纹理)
- C#调用Qt写的dll,并处理异常来自 HRESULT:0x8007000B
- 行业分析报告-全球与中国客户满意度(CSAT)调查软件市场现状及未来发展趋势
- 2022.1.12C语言小练
- 金融行业IT运维现状问题和发展方向
- 5.PCIe协议分析3-PCIe TLP包详解1
- 连线:iPhone研发不为人知的故事 原型机纰漏百出-译文~iPhone秘史
热门文章
- 接口传值、回传、修改Fragment 老王版本
- 错误现象:(com.logicaldoc.core.security.dao.HibernateUserDAO 102) -- Packet for query
- 模糊聚类分析和模式识别
- 生成式摘要的四篇经典论文
- 密码学之数字信封 Digital_Envelope
- Service-Oriented Architecture:面向服务的架构(SOA)是一个组件模型
- shell脚本--重启服务
- Linux进阶-FTP服务器源码搭建(pureftpd)
- java队列实现入队push和出队pop
- 利用matlab数米粒数量,数米粒个数和每个米粒面积的matlab算法实现(递归)。