前言

当需要把 MySQL 或 SQL Server 中的大量表实时同步到 CarbonData 时,经常要手动整理每张表的字段、类型以及对应的建表语句,在机械的比对和复制粘贴上耗费大量时间和精力。下面介绍的脚本可以根据源表的元数据自动生成这些建表语句,只需复制后稍作修改即可(快速、准确)。

需要同步的 MySQL / SQL Server 源表结构(示例)

-- Source table samples that the generator script reads metadata from.
-- NOTE: the two statements are for different engines (SQL Server, then MySQL);
-- run each one against its own server.

-- sqlserver
CREATE TABLE [dbo].[TOTicket] (
    [Id] char(16) COLLATE Chinese_PRC_CI_AS NOT NULL,
    [ParkId] int NOT NULL,
    [Qty] int NOT NULL,
    [AgencySaleTicketClassId] int NULL,
    [ValidStartDate] datetime NOT NULL,
    [ValidDays] int NOT NULL,
    [Price] decimal(18,2) NOT NULL,
    [SalePrice] decimal(18,2) NOT NULL,
    [SettlementPrice] decimal(18,2) NULL,
    [Amount] decimal(18,2) NOT NULL,
    [InparkCounts] int NOT NULL,
    [InvoiceId] bigint NULL,
    [TerminalId] int NULL,
    [TOVoucherId] char(18) COLLATE Chinese_PRC_CI_AS NULL,
    [Remark] varchar(512) COLLATE Chinese_PRC_CI_AS NULL,
    [LastModificationTime] datetime NULL,
    [LastModifierUserId] bigint NULL,
    [CreationTime] datetime NOT NULL,
    [CreatorUserId] bigint NULL,
    [ParkSettlementPrice] decimal(18,2) NULL,
    [TicketSaleStatus] int DEFAULT ((0)) NOT NULL,
    [TicketFormEnum] int DEFAULT ((0)) NOT NULL,
    [SyncTicketType] int NULL,
    [FirstInparkTime] datetime NULL,
    [PrintTicketTime] datetime NULL,
    [TOBodyId] char(21) COLLATE Chinese_PRC_CI_AS NOT NULL,
    -- SQL Server `timestamp` is a rowversion (binary), not a date/time value
    [RV] timestamp NOT NULL,
    [IsFreeze] bit DEFAULT ((0)) NOT NULL,
    [TOHeaderId] char(18) COLLATE Chinese_PRC_CI_AS DEFAULT '' NOT NULL,
    [PId] nvarchar(30) COLLATE Chinese_PRC_CI_AS NULL,
    [AgencyDeriveTicketClassId] int DEFAULT ((0)) NOT NULL,
    CONSTRAINT [PK_dbo.TOTicket] PRIMARY KEY NONCLUSTERED ([Id])
        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
        ON [PRIMARY]
);

-- mysql
CREATE TABLE `parkderiveticketclassrelation` (
    `Id` bigint(20) NOT NULL,
    `CreationTime` datetime(6) NOT NULL,
    `CreatorUserId` bigint(20) NOT NULL,
    `LastModificationTime` datetime(6) DEFAULT NULL,
    `LastModifierUserId` bigint(20) NOT NULL,
    `IsDeleted` bit(1) NOT NULL,
    `DeletionTime` datetime(6) DEFAULT NULL,
    `DeleterUserId` bigint(20) NOT NULL,
    `ParkDeriveTicketClassId` int(11) NOT NULL DEFAULT '0',
    `ParkId` int(11) NOT NULL,
    `InParkBeginTime` varchar(10) DEFAULT NULL,
    `InParkEndTime` varchar(10) DEFAULT NULL,
    PRIMARY KEY (`Id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC;

目标结构(脚本自动生成)

脚本代码

★★★此脚本为初版临时用,需进一步优化,仅供参考


import org.apache.commons.lang3.StringUtils;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/**
 * @ClassName CarbonSql
 * @Description //TODO
 * @Author
 * @Date 2020/8/18 10:46
 * @Version 1.0
 **/
/**
 * Generates CarbonData {@code CREATE TABLE} statements, wrapped in
 * {@code spark.sql(s"""...""".stripMargin)}, from the column metadata of SQL Server or MySQL
 * source tables. Each statement is printed to stdout and appended to a text file.
 */
public class CarbonSql {

    /** File that {@link #write(String)} appends the generated statements to. */
    private static final String DEFAULT_OUTPUT_PATH = "E:\\tmp\\carbon.txt";

    /** Largest DECIMAL precision accepted by Spark SQL / CarbonData. */
    private static final int MAX_DECIMAL_PRECISION = 38;

    /**
     * CarbonData table properties emitted after the column list, followed by the closing of the
     * Scala string and {@code spark.sql} call.
     * NOTE(review): {@code sort_columns} is hard-coded to {@code Id}, so every generated table
     * assumes the source has an {@code Id} column.
     */
    private static final String CARBON_TABLE_OPTIONS = "   |)\n"
            + "   |USING carbondata\n"
            + "   |OPTIONS (\n"
            + "   |`compaction_level_threshold` '4,3',\n"
            + "   |`table_blocksize` '1024',\n"
            + "   |`sort_scope` 'local_sort',\n"
            + "   |`cache_level` 'block',\n"
            + "   |`sort_columns` 'Id',\n"
            + "   |`local_dictionary_enable` 'false',\n"
            + "   |`auto_load_merge` 'false',\n"
            + "   |`column_meta_cache` '',\n"
            + "   |`major_compaction_size` '512',\n"
            + "   |`compaction_preserve_segments` '13')\n"
            + "\"\"\".stripMargin)\n\n";

    /**
     * Returns the SQL Server query listing a table's columns.
     *
     * Result columns: columnName, priKey ('PRI' or ''), columnType, numericPrecision,
     * numericScale, columnComment.
     *
     * @param tableName table name; embedded as a quote-escaped literal
     * @return query text (for display / ad-hoc use; {@link #createSql} binds the name instead)
     */
    public static String sqlServerSql(String tableName) {
        return sqlServerColumnsSql(quoteLiteral(tableName));
    }

    /**
     * Returns the MySQL query listing a table's columns in definition order.
     *
     * Result columns: columnName, priKey, columnType, numericPrecision, numericScale,
     * columnComment.
     *
     * @param tableName table name; embedded as a quote-escaped literal
     * @param dataBase  schema (database) name; embedded as a quote-escaped literal
     * @return query text (for display / ad-hoc use; {@link #createSql} binds the values instead)
     */
    public static String mysqlSql(String tableName, String dataBase) {
        return mysqlColumnsSql(quoteLiteral(tableName), quoteLiteral(dataBase));
    }

    // Builds the SQL Server metadata query; tableOperand is either a quoted literal or "?".
    private static String sqlServerColumnsSql(String tableOperand) {
        return "SELECT\n"
                // an unsized varchar cast defaults to 30 chars and would truncate long names
                + "    cast(a.name as nvarchar(128)) as columnName,\n"
                // 'PRI' when the column is a key column of this table's PK index
                + "    cast(case when exists(SELECT 1 FROM sysobjects where xtype = 'PK' and parent_obj = a.id and name in (\n"
                + "               SELECT name FROM sysindexes WHERE id = a.id AND indid in (SELECT indid FROM sysindexkeys WHERE id = a.id AND colid = a.colid))) then 'PRI' else '' end as varchar(3)) as priKey,\n"
                + "    cast(b.name as nvarchar(128)) as columnType,\n"
                + "    cast(a.xprec as int) as numericPrecision,\n"
                + "    cast(a.xscale as int) as numericScale,\n"
                + "    cast(isnull(g.[value], '') as nvarchar(4000)) as columnComment\n"
                + "FROM\n"
                + "    syscolumns a\n"
                + "left join\n"
                + "    systypes b\n"
                + "on\n"
                + "    a.xusertype = b.xusertype\n"
                + "inner join\n"
                + "    sysobjects d\n"
                + "on\n"
                + "    a.id = d.id and d.xtype = 'U' and d.name <> 'dtproperties'\n"
                // restrict to the single column-description property so rows are not multiplied
                + "left join\n"
                + "    sys.extended_properties g\n"
                + "on\n"
                + "    g.class = 1 and a.id = g.major_id and a.colid = g.minor_id and g.name = 'MS_Description'\n"
                + "where\n"
                + "    d.name = " + tableOperand + "\n"
                + "order by\n"
                + "    a.id, a.colorder";
    }

    // Builds the MySQL metadata query; each operand is either a quoted literal or "?".
    private static String mysqlColumnsSql(String tableOperand, String schemaOperand) {
        return "Select COLUMN_NAME as columnName, COLUMN_KEY as priKey, DATA_TYPE as columnType, "
                + "NUMERIC_PRECISION as numericPrecision, NUMERIC_SCALE as numericScale, COLUMN_COMMENT as columnComment \n"
                + "from INFORMATION_SCHEMA.COLUMNS \n"
                + "Where table_name = " + tableOperand + "  \n"
                + "AND table_schema = " + schemaOperand + " \n"
                // without an explicit order the column order of the generated DDL is not guaranteed
                + "order by ORDINAL_POSITION";
    }

    // Wraps a value in single quotes, doubling embedded quotes.
    private static String quoteLiteral(String value) {
        return "'" + String.valueOf(value).replace("'", "''") + "'";
    }

    /**
     * Maps a SQL Server / MySQL type name to a CarbonData column type, without precision
     * information. Exact decimals map to DOUBLE here; use
     * {@link #findColumnType2(String, int, int)} to keep their precision.
     *
     * @param str source type name (e.g. "int", "decimal", "datetime"); may be null
     * @return CarbonData type name; "STRING" for anything unrecognised
     */
    public static String findColumnType2(String str) {
        return findColumnType2(str, 0, 0);
    }

    /**
     * Maps a SQL Server / MySQL type name to a CarbonData column type (edit here if needed).
     *
     * @param str       source type name as reported by systypes / INFORMATION_SCHEMA.DATA_TYPE;
     *                  may be null
     * @param precision numeric precision of the column (used only for decimal/numeric)
     * @param scale     numeric scale of the column (used only for decimal/numeric)
     * @return CarbonData type name. Decimal/numeric columns become DECIMAL(p,s) when
     *         1 <= p <= 38 and 0 <= s <= p, otherwise DOUBLE. Anything unrecognised becomes
     *         "STRING", including SQL Server {@code timestamp}, which is a rowversion.
     */
    public static String findColumnType2(String str, int precision, int scale) {
        if (str == null) {
            return "STRING";
        }
        // Locale.ROOT: e.g. the Turkish locale would lowercase "INT" to a dotless-i string
        String type = str.trim().toLowerCase(Locale.ROOT);
        if (type.startsWith("int") || type.startsWith("smallint")
                || type.startsWith("tinyint") || type.startsWith("mediumint")) {
            return "INT";
        }
        if (type.startsWith("bigint")) {
            return "LONG";
        }
        // DATA_TYPE / systypes report a bare "decimal", so match on the prefix without "("
        if (type.startsWith("decimal") || type.startsWith("numeric")) {
            if (precision > 0 && precision <= MAX_DECIMAL_PRECISION && scale >= 0 && scale <= precision) {
                return "DECIMAL(" + precision + "," + scale + ")";
            }
            return "DOUBLE";
        }
        if (type.startsWith("float") || type.startsWith("double") || type.equals("real")) {
            return "DOUBLE";
        }
        if (type.startsWith("bit")) {
            return "BOOLEAN";
        }
        if (type.startsWith("datetime") || type.equals("smalldatetime")) {
            return "TIMESTAMP";
        }
        if (type.equals("date")) {
            return "DATE";
        }
        return "STRING";
    }

    /**
     * Appends {@code ss} plus a line separator to {@link #DEFAULT_OUTPUT_PATH}.
     *
     * @param ss text to append
     * @throws IOException if the file cannot be opened or written
     */
    public static void write(String ss) throws IOException {
        write(ss, DEFAULT_OUTPUT_PATH);
    }

    /**
     * Appends {@code ss} plus a line separator to {@code path}, using the platform default
     * charset (as the original FileWriter-based code did).
     *
     * @param ss   text to append
     * @param path file to append to; created if missing
     * @throws IOException if the file cannot be opened or written
     */
    public static void write(String ss, String path) throws IOException {
        // try-with-resources: the writer is closed even if write() throws
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(path, true))) {
            bw.write(ss);
            bw.newLine();
        }
        System.out.println("写入成功");
    }

    /**
     * Resolves which source system and database {@code tableName} comes from. It then generates
     * the CarbonData DDL for it via {@link #createSql}. Unknown tables are reported on stderr
     * and skipped.
     *
     * @param tableName lower-case table name as listed in the lists below
     */
    public static void findDataBase(String tableName) {
        // Incrementally loaded tables (增量表); every other table is labelled a dimension table (维度表).
        String incre_table = ",toticket,taickettable3,parkderiveticketclassrelation,mysqltable2,table1,usertable2,tradetable1,";
        // Real source of each table.
        String ticket_sqlserver = ",taickettable1,taickettable2,toticket,taickettable3,";
        String ticket_mysql = ",mysqltabl1,parkderiveticketclass,parkderiveticketclassrelation,mysqltable2,";
        // ticket mysql tables that live in data_one; the remaining ticket mysql tables live in data_two.
        String ticket_mysql_data_one = ",parkderiveticketclass,parkderiveticketclassrelation,mysqltable5,mysqltable9,mysqltable222,";
        String shop = ",table1,table2,";
        String user = ",usertable1,usertable2,";
        String trade = ",tradetable1,tradetable2,";

        // Wrapping in commas turns contains() into an exact membership test on the lists above.
        String str = "," + tableName + ",";
        String sourceType = incre_table.contains(str) ? "增量表" : "维度表";
        System.out.println(sourceType);

        String source = "";
        String className = "";
        String url = "";
        String userName = "";
        String password = "";
        // An empty dataBase tells createSql to use the SQL Server metadata query.
        String dataBase = "";
        if (ticket_sqlserver.contains(str)) {
            source = "ticket sqlserver";
            className = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
            url = "jdbc:sqlserver://1.xx.xx.xx:9527;DatabaseName=data2";
            userName = "**";
            password = "*************";
        } else if (ticket_mysql.contains(str)) {
            source = "ticket mysql";
            className = "com.mysql.cj.jdbc.Driver";
            if (ticket_mysql_data_one.contains(str)) {
                url = "jdbc:mysql://1x.xx.xx.xx:3306/data_one";
                dataBase = "data_one";
            } else {
                url = "jdbc:mysql://1x.xx.xx.xx:3306/data_two";
                dataBase = "data_two";
            }
            userName = "***";
            password = "*****************";
        } else if (shop.contains(str)) {
            source = "shop";
            className = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
            url = "jdbc:sqlserver://1x.xx.xx.xx:666666;DatabaseName=data999";
            userName = "***";
            password = "*******************";
        } else if (user.contains(str)) {
            source = "user数据";
            className = "com.mysql.cj.jdbc.Driver";
            url = "jdbc:mysql://xxxxxxxxx:9527/user";
            userName = "****";
            password = "*******";
            dataBase = "user";
        } else if (trade.contains(str)) {
            source = "xxx数据";
            className = "com.mysql.cj.jdbc.Driver";
            url = "jdbc:mysql://aaa-sss-xxxx.sql.xxxcdb.com:35845/case2222";
            userName = "****";
            password = "*******";
            dataBase = "case2222";
        }

        if (className.isEmpty()) {
            System.err.println("No data source configured for table: " + tableName);
            return;
        }
        createSql(className, url, userName, password, dataBase, tableName, sourceType, source);
    }

    /**
     * Reads the column metadata of {@code tableName} and builds the CarbonData DDL. The DDL is
     * printed and then appended to the output file, preceded by two comment lines (origin and
     * primary keys). JDBC/IO errors are printed and swallowed, as before.
     *
     * @param className  JDBC driver class to load
     * @param url        JDBC URL
     * @param userName   database user
     * @param password   database password
     * @param dataBase   MySQL schema name; null/empty selects the SQL Server metadata query
     * @param tableName  source (and target) table name
     * @param sourceType label written into the header comment (e.g. 增量表 / 维度表)
     * @param source     source-system label written into the header comment
     */
    public static void createSql(String className, String url, String userName, String password, String dataBase,
                                 String tableName, String sourceType, String source) {
        boolean sqlServer = dataBase == null || dataBase.isEmpty();
        // Values are bound as parameters rather than concatenated into the SQL text.
        String sql = sqlServer ? sqlServerColumnsSql("?") : mysqlColumnsSql("?", "?");
        try {
            Class.forName(className);
            List<String> priKeyList = new ArrayList<>();
            List<String> columnDefinitions = new ArrayList<>();
            // try-with-resources closes ResultSet, statement and connection in reverse order
            try (Connection con = DriverManager.getConnection(url, userName, password);
                 PreparedStatement statement = con.prepareStatement(sql)) {
                statement.setString(1, tableName);
                if (!sqlServer) {
                    statement.setString(2, dataBase);
                }
                try (ResultSet re = statement.executeQuery()) {
                    while (re.next()) {
                        String columnName = re.getString("columnName");
                        if ("PRI".equalsIgnoreCase(re.getString("priKey"))) {
                            priKeyList.add(columnName);
                        }
                        // getInt returns 0 for SQL NULL, which findColumnType2 treats as "unknown"
                        String columnType = findColumnType2(re.getString("columnType"),
                                re.getInt("numericPrecision"), re.getInt("numericScale"));
                        columnDefinitions.add("   |   " + columnName + " " + columnType);
                    }
                }
            }
            if (columnDefinitions.isEmpty()) {
                // Previously this produced a CREATE TABLE with its "(" deleted.
                System.err.println("No columns found for table " + tableName + " (" + source + "); nothing written");
                return;
            }
            String ddl = buildCarbonDdl(tableName, columnDefinitions);
            System.out.println(ddl);
            write("// " + tableName + " " + sourceType + " " + " 取自 " + source + "\n"
                    + "// 主键 : " + String.join(",", priKeyList) + "\n"
                    + ddl);
        } catch (ClassNotFoundException | SQLException | IOException e) {
            e.printStackTrace();
        }
    }

    // Assembles the spark.sql(s"""...""".stripMargin) text from formatted column lines (non-empty list).
    private static String buildCarbonDdl(String tableName, List<String> columnDefinitions) {
        // Joining with ", \n" puts a comma after every column except the last one.
        return "spark.sql( \n"
                + "s\"\"\" \n"
                + "   |CREATE TABLE IF NOT EXISTS " + tableName + " ( \n"
                + String.join(", \n", columnDefinitions) + " \n"
                + CARBON_TABLE_OPTIONS;
    }

    /**
     * Generates DDL for a fixed, comma-separated list of tables.
     *
     * @throws IOException declared for compatibility; errors are handled inside createSql
     */
    public static void second() throws IOException {
        // test
        String tableStr = "toticket,parkderiveticketclassrelation";
        for (String tableName : tableStr.split(",")) {
            findDataBase(tableName);
        }
    }

    public static void main(String[] args) throws IOException {
        second();
    }
}

结果

// toticket 增量表  取自 ticket sqlserver
// 主键 : Id
spark.sql(
s"""
   |CREATE TABLE IF NOT EXISTS toticket (
   |   Id STRING,
   |   ParkId INT,
   |   Qty INT,
   |   AgencySaleTicketClassId INT,
   |   ValidStartDate TIMESTAMP,
   |   ValidDays INT,
   |   Price STRING,
   |   SalePrice STRING,
   |   SettlementPrice STRING,
   |   Amount STRING,
   |   InparkCounts INT,
   |   InvoiceId LONG,
   |   TerminalId INT,
   |   TOVoucherId STRING,
   |   Remark STRING,
   |   LastModificationTime TIMESTAMP,
   |   LastModifierUserId LONG,
   |   CreationTime TIMESTAMP,
   |   CreatorUserId LONG,
   |   ParkSettlementPrice STRING,
   |   TicketSaleStatus INT,
   |   TicketFormEnum INT,
   |   SyncTicketType INT,
   |   FirstInparkTime TIMESTAMP,
   |   PrintTicketTime TIMESTAMP,
   |   TOBodyId STRING,
   |   RV STRING,
   |   IsFreeze BOOLEAN,
   |   TOHeaderId STRING,
   |   PId STRING,
   |   AgencyDeriveTicketClassId INT
   |)
   |USING carbondata
   |OPTIONS (
   |`compaction_level_threshold` '4,3',
   |`table_blocksize` '1024',
   |`sort_scope` 'local_sort',
   |`cache_level` 'block',
   |`sort_columns` 'Id',
   |`local_dictionary_enable` 'false',
   |`auto_load_merge` 'false',
   |`column_meta_cache` '',
   |`major_compaction_size` '512',
   |`compaction_preserve_segments` '13')
""".stripMargin)

// parkderiveticketclassrelation 增量表  取自 ticket mysql
// 主键 : Id
spark.sql(
s"""
   |CREATE TABLE IF NOT EXISTS parkderiveticketclassrelation (
   |   Id LONG,
   |   CreationTime TIMESTAMP,
   |   CreatorUserId LONG,
   |   LastModificationTime TIMESTAMP,
   |   LastModifierUserId LONG,
   |   IsDeleted BOOLEAN,
   |   DeletionTime TIMESTAMP,
   |   DeleterUserId LONG,
   |   ParkDeriveTicketClassId INT,
   |   ParkId INT,
   |   InParkBeginTime STRING,
   |   InParkEndTime STRING
   |)
   |USING carbondata
   |OPTIONS (
   |`compaction_level_threshold` '4,3',
   |`table_blocksize` '1024',
   |`sort_scope` 'local_sort',
   |`cache_level` 'block',
   |`sort_columns` 'Id',
   |`local_dictionary_enable` 'false',
   |`auto_load_merge` 'false',
   |`column_meta_cache` '',
   |`major_compaction_size` '512',
   |`compaction_preserve_segments` '13')
""".stripMargin)

生成的语句可以直接复制粘贴到 spark-shell 中执行建表。注意 stripMargin 依赖每行开头的 `|`,粘贴时要保留原有换行,不能把多行合并成一行。

快速生成sparksql创建carbondata表结构(同步mysql或sqlserver数据)脚本相关推荐

  1. 推荐一个数据库同步软件,可同步Mysql,sqlserver数据,支持实时同步

    SyncNavigator v8.6.2 SyncNavigator是一款功能强大的数据库同步软件,适用于SQL SERVER, MySQL,具有自动/定时同步数据.无人值守.故障自动恢复.同构/异构 ...

  2. Mysql数据表结构同步Python实现

    Mysql数据表结构同步Python实现 Python源码 #!/usr/bin/pythonimport MySQLdb import configparserclass SchemaMysql:# ...

  3. MySQL在创建相同表结构时as和like 使用的区别

    1.MySQL的复制相同表结构方法: 1)create table table_name as select * from table1 where 1=2 (或者limit  0): 2) crea ...

  4. python同步两张数据表_Python 如何实现数据库表结构同步

    近日,某个QQ 群里的一个朋友提出一个问题,如何将一个DB 的表结构同步给另一个DB. 针对这个问题,我进行了思考与实践,具体的实现代码如下所示: # coding:utf-8 import pymy ...

  5. php表单生成器实验报告,PHP表单生成器,快速生成现代化的form表单,快速上手

    form-builder PHP表单生成器,快速生成现代化的form表单.包含复选框.单选框.输入框.下拉选择框等元素以及省市区三级联动.时间选择.日期选择.颜色选择.树型.文件/图片上传等功能. 本 ...

  6. mysql修改工资字段_基于Linux的MySQL操作实例(修改表结构,MySQL索引,MySQL数据引擎)...

    基于Linux的MySQL操作实例(修改表结构,MySQL索引,MySQL数据引擎) 前言 本篇是基于Linux下针对MySQL表结构的修改,MySQL索引的操作以及MySQL数据引擎的配置和说明. ...

  7. word版表结构转为mysql DDL

    昨天接到个任务,领导给了一份word文档,里面都是mysql表结构.让我把表结构转为mysql建表语句.此刻内心真是一万个... 一.解决思路: 1.解析word表结构 2.解析的数据转为建表语句 二 ...

  8. 【openai】请帮我设计一个通用的ERP管理系统,涉及到的表结构用mysql语言表达出来,全部写出来

    背景 这周末把openAi集成到自己的web系统里面了 尝试提问了几个技术和日常问题,感觉回答的还不错 问题1:[请帮我设计一个通用的ERP管理系统,涉及到的表结构用mysql语言表达出来,全部写出来 ...

  9. Sql Server 生成 Word 文档 表结构

    打开数据库编辑器,输入以下代码并执行(F5) SELECT--表名 = case when a.colorder=1 then d.name else '' end,--表说明 = case when ...

最新文章

  1. Linux上隐藏进程名(初级版)
  2. rsyslog的学习
  3. Python 基础 函数
  4. 【工具收藏】golang 开发工具包,json、sql 转 struct
  5. mysql引擎测试_MySQL MyISAM引擎和InnoDB引擎的性能测试
  6. 小米笔记本服务器系统,小米笔记本Pro GTX版
  7. c#枚举数字转枚举_C#枚举能力问题和解答 套装4
  8. 儿童吹泡泡水简单配方_儿童吹泡泡玩具水怎么制作
  9. laravel的启动过程解析(转)
  10. 硬件创新需要去理解的点(精炼总结)
  11. js与flash结合使用
  12. html5调用 扫码枪,js读取usb扫码枪数据功能代码实现
  13. linux挂载iso5后如何安装,CentOS系统怎样挂载光盘镜像ISO文件
  14. 02_RampTexture(渐变纹理)
  15. C#调用Qt写的dll,并处理异常来自 HRESULT:0x8007000B
  16. 行业分析报告-全球与中国客户满意度(CSAT)调查软件市场现状及未来发展趋势
  17. 2022.1.12C语言小练
  18. 金融行业IT运维现状问题和发展方向
  19. 5.PCIe协议分析3-PCIe TLP包详解1
  20. 连线:iPhone研发不为人知的故事 原型机纰漏百出-译文~iPhone秘史

热门文章

  1. 接口传值、回传、修改Fragment 老王版本
  2. 错误现象:(com.logicaldoc.core.security.dao.HibernateUserDAO 102) -- Packet for query
  3. 模糊聚类分析和模式识别
  4. 生成式摘要的四篇经典论文
  5. 密码学之数字信封 Digital_Envelope
  6. Service-Oriented Architecture:面向服务的架构(SOA)是一个组件模型
  7. shell脚本--重启服务
  8. Linux进阶-FTP服务器源码搭建(pureftpd)
  9. java队列实现入队push和出队pop
  10. 利用matlab数米粒数量,数米粒个数和每个米粒面积的matlab算法实现(递归)。