在公司中,遇到这样一个业务,需要将数据库A从oracle迁移到pg数据库,原本让实习生去实现了这样的一个工具,但是最后他写出来的工具存在较大问题。

正好最近在学习spark、flink等流式处理框架,那么我们就用flink来处理这样一个需求吧:

1、主类:

package com.ogj.flink;import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;import java.util.concurrent.TimeUnit;/*** @author ouguangji*/
public class DbMove {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<Row> dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat().setDBUrl(DBContent.SourceDB.url).setDrivername(DBContent.PGDRIVER).setUsername(DBContent.SourceDB.username).setPassword(DBContent.SourceDB.password).setQuery("select task_name,file_path from cloud_task").setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO)).finish());dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat().setDBUrl(DBContent.DstDB.url).setDrivername(DBContent.PGDRIVER).setUsername(DBContent.DstDB.username).setPassword(DBContent.DstDB.password).setQuery("insert into test(task_name,file_path) values(?,?)").finish());env.execute("db move");System.out.println("写入数据中");TimeUnit.SECONDS.sleep(5);//查询出来DataSource<Row> read = env.createInput(JDBCInputFormat.buildJDBCInputFormat().setDBUrl(DBContent.DstDB.url).setDrivername(DBContent.PGDRIVER).setUsername(DBContent.DstDB.username).setPassword(DBContent.DstDB.password).setQuery("select task_name,file_path from test").setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO)).finish());System.out.println("read dst dataSource");read.print();System.out.println("=========end==========");}
}

2、数据库配置类

package com.ogj.flink;public class DBContent {public static final String MYSQLDRIVER = "com.mysql.jdbc.Driver";public static final String PGDRIVER = "org.postgresql.Driver";public static class SourceDB {public static String url = "jdbc:postgresql://127.0.0.1:5432/dmt_url?currentSchema=schema_name";public static String username = "postgres";public static String password = "123456";}public static class DstDB {public static String url = "jdbc:postgresql://127.0.0.1:5432/RequestMonitor";public static String username = "postgres";public static String password = "123456";}
}

3、maven打包配置:

<build>
<!--        &lt;!&ndash;scala待编译的文件目录&ndash;&gt;-->
<!--        <sourceDirectory>src/main/scala</sourceDirectory>-->
<!--        <testSourceDirectory>src/test/scala</testSourceDirectory>-->
<!--        &lt;!&ndash;scala插件&ndash;&gt;--><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><!--            <plugin>-->
<!--                <groupId>net.alchim31.maven</groupId>-->
<!--                <artifactId>scala-maven-plugin</artifactId>-->
<!--                <version>3.2.2</version>-->
<!--                <executions>-->
<!--                    <execution>-->
<!--                        <goals>-->
<!--                            <goal>compile</goal>-->
<!--                            <goal>testCompile</goal>-->
<!--                        </goals>-->
<!--                        <configuration>-->
<!--                            <args>-->
<!--                                &lt;!&ndash;<arg>-make:transitive</arg>&ndash;&gt;&lt;!&ndash;scala2.11 netbean不支持这个参数&ndash;&gt;-->
<!--                                <arg>-dependencyfile</arg>-->
<!--                                <arg>${project.build.directory}/.scala_dependencies</arg>-->
<!--                            </args>-->
<!--                        </configuration>-->
<!--                    </execution>-->
<!--                </executions>-->
<!--            </plugin>--><!-- 这是个编译java代码的 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.0</version><configuration><source>8</source><target>8</target><encoding>UTF-8</encoding></configuration><executions><execution><phase>compile</phase><goals><goal>compile</goal></goals></execution></executions></plugin><!--            &lt;!&ndash; 这是个编译scala代码的 &ndash;&gt;-->
<!--            <plugin>-->
<!--                <groupId>net.alchim31.maven</groupId>-->
<!--                <artifactId>scala-maven-plugin</artifactId>-->
<!--                <version>3.2.1</version>-->
<!--                <executions>-->
<!--                    <execution>-->
<!--                        <id>scala-compile-first</id>-->
<!--                        <phase>process-resources</phase>-->
<!--                        <goals>-->
<!--                            <goal>add-source</goal>-->
<!--                            <goal>compile</goal>-->
<!--                        </goals>-->
<!--                    </execution>-->
<!--                </executions>-->
<!--            </plugin>--><!--manven打包插件--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>cn.itcast.rpc.Master</mainClass> <!--main方法--></transformer></transformers></configuration></execution></executions></plugin></plugins></build>

运行完毕就是这样的啦:

使用flink迁移数据相关推荐

  1. 分库分表:如何做到永不迁移数据和避免热点?

    点击关注公众号,Java干货及时送达 中大型项目中,一旦遇到数据量比较大,小伙伴应该都知道就应该对数据进行拆分了.有垂直和水平两种. 垂直拆分比较简单,也就是本来一个数据库,数据量大之后,从业务角度进 ...

  2. 你知道怎么分库分表吗?如何做到永不迁移数据和避免热点吗?

    点击关注公众号,Java干货及时送达 前言 中大型项目中,一旦遇到数据量比较大,小伙伴应该都知道就应该对数据进行拆分了.有垂直和水平两种. 垂直拆分比较简单,也就是本来一个数据库,数据量大之后,从业务 ...

  3. 迁移数据中心的三大风险与应对策略

    数据中心迁移是项复杂工程,可能很难向为迁移活动拨款的企业高管解释清楚,需要了解并管理相关的业务运营风险.下面我们分析一下与管理数据中心有关的几个难题和风险. 服务可用性 数据中心的主要目的就是,托管运 ...

  4. 迁移数据文件到ASM【转】

    1.迁移数据文件到ASM  1)数据库一致性情况下迁移:  将数据库启动到mount状态,生成rman copy 语句,然后在rman中执行:  SQL> startup mount  SQL& ...

  5. 分库分表这样玩,可以永不迁移数据、避免热点

    作者 | 老顾聊技术 来源 | http://www.toutiao.com/i6677459303055491597 中大型项目中,一旦遇到数据量比较大,小伙伴应该都知道就应该对数据进行拆分了.有垂 ...

  6. 分库分表?如何做到永不迁移数据和避免热点?

    一.前言 中大型项目中,一旦遇到数据量比较大,小伙伴应该都知道就应该对数据进行拆分了.有垂直和水平两种. 垂直拆分比较简单,也就是本来一个数据库,数据量大之后,从业务角度进行拆分多个库.如下图,独立的 ...

  7. rediscluster全局数据_redis迁移,从redis cluster集群迁移数据到另外一个redis cluster集群...

    redis迁移,redis集群迁移$ yum install automake libtool autoconf bzip2 -y $ git clone https://github.com/vip ...

  8. flink 写入到es_《从0到1学习Flink》—— Flink 写入数据到 Kafka

    前言 之前文章 <从0到1学习Flink>-- Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用 ...

  9. ASP.NET Core Identity 迁移数据 - ASP.NET Core 基础教程 - 简单教程,简单编程

    ASP.NET Core Identity 迁移数据 - ASP.NET Core 基础教程 - 简单教程,简单编程 原文:ASP.NET Core Identity 迁移数据 - ASP.NET C ...

最新文章

  1. python生成pdf报表_用python的reportlab库生成PDF报表
  2. asp格式化日期函数
  3. NYOJ 150 Train Problem I STL栈
  4. 毕业论文排版之Word 中公式居中,编号靠右该怎么设置(针对左右不对称页边距)
  5. bpython_Python机器学习(入门)
  6. SVN ---文件加锁,执行clean up命令
  7. Document/View 模式下的窗口重绘
  8. golang之web编程入门
  9. java 私有云_[原创]私有云中实现以应用程序为中心的PaaS
  10. Contest2973 - 2021秋组队训练赛第十二场 问题 M: Cook Pancakes!
  11. 盘古开源在芯片领域崛起,专注于芯片研发
  12. BIOS,U-BOOT,BootLoader三者的对比
  13. C#中ref和out关键字的应用以及区别。
  14. 动态规化 - 最小编辑距离
  15. 用户协议html代码,微信小程序同意用户协议确认投稿页面设计制作开发教程
  16. Chromium网页GPU光栅化原理分析
  17. 高通平台wifi模块SMMU配置
  18. 曲面细分(loop曲面细分,catmull曲面细分)(计算机图形学学习笔记)
  19. ipad横竖屏转屏的坑要注意
  20. 动态调试OllyDbg工具

热门文章

  1. 表格的属性、表格的合并及表单
  2. 2023 简单在线画板HTML源码
  3. 手机厂商“卷”到了手腕上
  4. leetcode【121】Best Time to Buy and Sell Stock【c++,O(n)复杂度,时间97%,空间100%】
  5. python 调用scp命令 实践
  6. Knockout.js学习笔记----绑定
  7. 星淘惠告诉你跨境平台那么多,凭什么要选亚马逊?
  8. 小红书最新的内容趋势是什么?
  9. transact sql
  10. 50本关于软件测试的书籍