2019独角兽企业重金招聘Python工程师标准>>>

写4个类,比如我的是下面4个类

备注:因为我司用的是内部的 zebra 框架,所以请自行将代码中的连接池替换为 druid 等其他连接池。

相关参数(例如 jdbcRef)也请根据自己的环境自行替换。

1)MyJDBCAppendTableSink.java

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.api.java.io.jdbc;import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.BatchTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.util.TableConnectorUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;import java.io.IOException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;/*** An at-least-once Table sink for JDBC.** <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if* checkpointing is enabled). However, one common use case is to run idempotent queries* (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and* achieve exactly-once semantic.</p>*/
@SuppressWarnings("rawtypes")
public class MyJDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {private final MyJDBCOutputFormat outputFormat;private String[] fieldNames;private TypeInformation[] fieldTypes;MyJDBCAppendTableSink(MyJDBCOutputFormat outputFormat) {this.outputFormat = outputFormat;}public static MyJDBCAppendTableSinkBuilder builder() {return new MyJDBCAppendTableSinkBuilder();}@Overridepublic void emitDataStream(DataStream<Row> dataStream) {dataStream.addSink(new MyJDBCSinkFunction(outputFormat)).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));}@Overridepublic void emitDataSet(DataSet<Row> dataSet) {dataSet.output(outputFormat);}@Overridepublic TypeInformation<Row> getOutputType() {return new RowTypeInfo(fieldTypes, fieldNames);}@Overridepublic String[] getFieldNames() {return fieldNames;}@Overridepublic TypeInformation<?>[] getFieldTypes() {return fieldTypes;}@Overridepublic TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {int[] types = outputFormat.getTypesArray();String sinkSchema =String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));String tableSchema =String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. 
" +"Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);Preconditions.checkArgument(fieldTypes.length == types.length, msg);for (int i = 0; i < types.length; ++i) {Preconditions.checkArgument(JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],msg);}MyJDBCAppendTableSink copy;try {copy = new MyJDBCAppendTableSink(InstantiationUtil.clone(outputFormat));} catch (IOException | ClassNotFoundException e) {throw new RuntimeException(e);}copy.fieldNames = fieldNames;copy.fieldTypes = fieldTypes;return copy;}@VisibleForTestingMyJDBCOutputFormat getOutputFormat() {return outputFormat;}
}

2)MyJDBCAppendTableSinkBuilder.java

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.api.java.io.jdbc;import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;/*** A builder to configure and build the JDBCAppendTableSink.*/
public class MyJDBCAppendTableSinkBuilder {

    /** SQL statement the sink executes for every row. */
    private String query;

    /** SQL type codes of the statement parameters, one per row field. */
    private int[] parameterTypes;

    /** Reference handed to the output format's data source. */
    private String jdbcRef;

    /**
     * Sets the reference that is passed on to
     * {@link MyJDBCOutputFormat.MyJDBCOutputFormatBuilder#setJdbcRef(String)}.
     *
     * @param ref the data-source reference (presumably the zebra jdbcRef name; confirm
     *            against your connection-pool setup)
     * @return this builder
     */
    public MyJDBCAppendTableSinkBuilder setJdbcRef(String ref) {
        this.jdbcRef = ref;
        return this;
    }

    /**
     * Specify the query that the sink will execute. Usually user can specify
     * INSERT, REPLACE or UPDATE to push the data to the database.
     *
     * @param query The query to be executed by the sink.
     * @return this builder
     * @see MyJDBCOutputFormat.MyJDBCOutputFormatBuilder#setQuery(String)
     */
    public MyJDBCAppendTableSinkBuilder setQuery(String query) {
        this.query = query;
        return this;
    }

    /**
     * Specify the type of the rows that the sink will be accepting.
     *
     * @param types the type of each field
     * @return this builder
     */
    public MyJDBCAppendTableSinkBuilder setParameterTypes(TypeInformation<?>... types) {
        int[] sqlTypes = new int[types.length];
        for (int i = 0; i < types.length; ++i) {
            sqlTypes[i] = JDBCTypeUtil.typeInformationToSqlType(types[i]);
        }
        this.parameterTypes = sqlTypes;
        return this;
    }

    /**
     * Specify the type of the rows that the sink will be accepting.
     *
     * @param types the type of each field defined by {@link java.sql.Types}.
     * @return this builder
     */
    public MyJDBCAppendTableSinkBuilder setParameterTypes(int... types) {
        // Defensive copy: the caller keeps ownership of its array, so a later change to
        // it must not silently alter the schema of the sink being built. A null argument
        // is still stored as null so that build() reports the missing types.
        this.parameterTypes = (types == null) ? null : types.clone();
        return this;
    }

    /**
     * Finalizes the configuration and checks validity.
     *
     * @return a {@link MyJDBCAppendTableSink} backed by a configured {@link MyJDBCOutputFormat}
     * @throws NullPointerException presumably raised by {@code Preconditions.checkNotNull}
     *         when no parameter types were specified
     * @throws IllegalArgumentException if no query was specified (raised by
     *         {@link MyJDBCOutputFormat.MyJDBCOutputFormatBuilder#finish()})
     */
    public MyJDBCAppendTableSink build() {
        Preconditions.checkNotNull(parameterTypes,
            "Types of the query parameters are not specified."
                + " Please specify types using the setParameterTypes() method.");

        MyJDBCOutputFormat format = MyJDBCOutputFormat.buildJDBCOutputFormat()
            .setQuery(query)
            .setSqlTypes(parameterTypes)
            .setJdbcRef(jdbcRef)
            .finish();

        return new MyJDBCAppendTableSink(format);
    }
}

3)MyJDBCOutputFormat.java

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.api.java.io.jdbc;import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.dianping.zebra.group.jdbc.GroupDataSource;import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;/*** OutputFormat to write Rows into a JDBC database.* The OutputFormat has to be configured using the supplied OutputFormatBuilder.** @see Row* @see DriverManager*/
public class MyJDBCOutputFormat extends RichOutputFormat<Row> {/**  */private static final long   serialVersionUID = -6949239510340172802L;private static final Logger LOG              = LoggerFactory.getLogger(JDBCOutputFormat.class);private int[]               typesArray;private String              sql;private String              jdbcRef;private GroupDataSource     groupDataSource;public MyJDBCOutputFormat() {}@Overridepublic void configure(Configuration parameters) {LOG.info("configure function invoked , thread -[{}]", Thread.currentThread().getName());}/*** Connects to the target database and initializes the prepared statement.** @param taskNumber The number of the parallel instance.* @throws IOException Thrown, if the output could not be opened due to an* I/O problem.*/@Overridepublic void open(int taskNumber, int numTasks) throws IOException {//老代码//        try {//            establishConnection();//            upload = dbConn.prepareStatement(query);//        } catch (SQLException sqe) {//            throw new IllegalArgumentException("open() failed.", sqe);//        } catch (ClassNotFoundException cnfe) {//            throw new IllegalArgumentException("JDBC driver class not found.", cnfe);//        }GroupDataSource dataSource = new GroupDataSource();dataSource.setJdbcRef(jdbcRef);dataSource.init();this.groupDataSource = dataSource;LOG.info("open function invoked , taskNumber-[{}], numTasks-[{}], jdbcRef-[{}] ,thread -[{}]",taskNumber, numTasks, jdbcRef, Thread.currentThread().getName());}//private void establishConnection() throws SQLException, ClassNotFoundException {//废弃//        Class.forName(drivername);//        if (username == null) {//            dbConn = DriverManager.getConnection(dbURL);//        } else {//            dbConn = DriverManager.getConnection(dbURL, username, password);//        }//}/*** Adds a record to the prepared statement.** <p>When this method is called, the output format is guaranteed to be opened.** <p>WARNING: this may fail 
when no column types specified (because a best effort approach is attempted in order to* insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))** @param row The records to add to the output.* @see PreparedStatement* @throws IOException Thrown, if the records could not be added due to an I/O problem.*/@Overridepublic void writeRecord(Row row) throws IOException {//Connection dbConn = null;PreparedStatement preparedStatement = null;//try {if (typesArray != null && typesArray.length > 0&& typesArray.length != row.getArity()) {LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");}dbConn = groupDataSource.getConnection();preparedStatement = dbConn.prepareStatement(sql);if (typesArray == null) {// no types providedfor (int index = 0; index < row.getArity(); index++) {LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.",index + 1, row.getField(index));preparedStatement.setObject(index + 1, row.getField(index));}} else {// types providedfor (int index = 0; index < row.getArity(); index++) {if (row.getField(index) == null) {preparedStatement.setNull(index + 1, typesArray[index]);} else {// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.htmlswitch (typesArray[index]) {case java.sql.Types.NULL:preparedStatement.setNull(index + 1, typesArray[index]);break;case java.sql.Types.BOOLEAN:case java.sql.Types.BIT:preparedStatement.setBoolean(index + 1,(boolean) row.getField(index));break;case java.sql.Types.CHAR:case java.sql.Types.NCHAR:case java.sql.Types.VARCHAR:case java.sql.Types.LONGVARCHAR:case java.sql.Types.LONGNVARCHAR:preparedStatement.setString(index + 1,(String) row.getField(index));break;case java.sql.Types.TINYINT:preparedStatement.setByte(index + 1, (byte) row.getField(index));break;case java.sql.Types.SMALLINT:preparedStatement.setShort(index + 1, (short) 
row.getField(index));break;case java.sql.Types.INTEGER:preparedStatement.setInt(index + 1, (int) row.getField(index));break;case java.sql.Types.BIGINT:preparedStatement.setLong(index + 1, (long) row.getField(index));break;case java.sql.Types.REAL:preparedStatement.setFloat(index + 1, (float) row.getField(index));break;case java.sql.Types.FLOAT:case java.sql.Types.DOUBLE:preparedStatement.setDouble(index + 1,(double) row.getField(index));break;case java.sql.Types.DECIMAL:case java.sql.Types.NUMERIC:preparedStatement.setBigDecimal(index + 1,(java.math.BigDecimal) row.getField(index));break;case java.sql.Types.DATE:preparedStatement.setDate(index + 1,(java.sql.Date) row.getField(index));break;case java.sql.Types.TIME:preparedStatement.setTime(index + 1,(java.sql.Time) row.getField(index));break;case java.sql.Types.TIMESTAMP:preparedStatement.setTimestamp(index + 1,(java.sql.Timestamp) row.getField(index));break;case java.sql.Types.BINARY:case java.sql.Types.VARBINARY:case java.sql.Types.LONGVARBINARY:preparedStatement.setBytes(index + 1, (byte[]) row.getField(index));break;default:preparedStatement.setObject(index + 1, row.getField(index));LOG.warn("Unmanaged sql type ({}) for column {}. 
Best effort approach to set its value: {}.",typesArray[index], index + 1, row.getField(index));// case java.sql.Types.SQLXML// case java.sql.Types.ARRAY:// case java.sql.Types.JAVA_OBJECT:// case java.sql.Types.BLOB:// case java.sql.Types.CLOB:// case java.sql.Types.NCLOB:// case java.sql.Types.DATALINK:// case java.sql.Types.DISTINCT:// case java.sql.Types.OTHER:// case java.sql.Types.REF:// case java.sql.Types.ROWID:// case java.sql.Types.STRUC}}}}if (false == preparedStatement.execute()) {LOG.error("execute preparedStatement fail !!!");}} catch (SQLException e) {throw new RuntimeException("Preparation/Execution of JDBC statement failed.", e);} finally {//释放preparedStatementif (null != preparedStatement) {try {preparedStatement.close();preparedStatement = null;} catch (SQLException e) {}}//释放dbConnif (null != dbConn) {try {dbConn.close();//释放到连接池,此逻辑由zebra保证dbConn = null;} catch (SQLException e) {}}}}void flush() {}public int[] getTypesArray() {return typesArray;}/*** Executes prepared statement and closes all resources of this instance.** @throws IOException Thrown, if the input could not be closed properly.*/@Overridepublic void close() throws IOException {if (null != groupDataSource) {try {groupDataSource.close();} catch (SQLException e) {LOG.error("", e);} finally {groupDataSource = null;//help GC}}}public static MyJDBCOutputFormatBuilder buildJDBCOutputFormat() {return new MyJDBCOutputFormatBuilder();}/*** Builder for a {@link JDBCOutputFormat}.*/public static class MyJDBCOutputFormatBuilder {private final MyJDBCOutputFormat format;protected MyJDBCOutputFormatBuilder() {this.format = new MyJDBCOutputFormat();}public MyJDBCOutputFormatBuilder setQuery(String query) {format.sql = query;return this;}public MyJDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {format.typesArray = typesArray;return this;}public MyJDBCOutputFormatBuilder setJdbcRef(String ref) {format.jdbcRef = ref;return this;}/*** Finalizes the configuration and checks validity.** @return 
Configured MyJDBCOutputFormat*/public MyJDBCOutputFormat finish() {if (format.sql == null) {throw new IllegalArgumentException("No sql statement supplied.");}return format;}}}

4)MyJDBCSinkFunction.java

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.api.java.io.jdbc;import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;class MyJDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {/**  */private static final long serialVersionUID = 2120156461628723467L;final MyJDBCOutputFormat outputFormat;MyJDBCSinkFunction(MyJDBCOutputFormat outputFormat) {this.outputFormat = outputFormat;}@Overridepublic void invoke(Row value) throws Exception {outputFormat.writeRecord(value);}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {outputFormat.flush();}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);RuntimeContext ctx = getRuntimeContext();outputFormat.setRuntimeContext(ctx);outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());}@Overridepublic void close() throws Exception {outputFormat.close();super.close();}
}

转载于:https://my.oschina.net/qiangzigege/blog/2872867

参考官方mysql自定义一个mysql sink connector相关推荐

  1. Mtop,Mysql Top,一个Mysql的监控工具

    mtop 实时监控 MySQL Requires the following perl modules: Module             Available At     ----------- ...

  2. python将数组传入mysql_通过python将文件中的数据传输到MySQL,传到,mysql

    我们需要几个包来实现这一功能,下面我和详细讲一下这几个包 一.pymysql 要传数据到数据库中肯定先要连接数据库 ①连接数据库 db = pymysql.connect('localhost', ' ...

  3. mysql 定一个函数_mysql自定义函数

    mysql中的UDF(自定义函数),其实是个好东西,比如可以写好一些方法或 函数,然后进行调用,而且是在SQL语句中可以进行调用. DROP FUNCTION CalculateAmount CREA ...

  4. 如何添加MySQL插件_如何开发一个自定义的MySQL插件

    MySQL自带了很多插件,比如半同步插件.审计插件.密码验证插件等等,甚至MySQL存储引擎也是以插件方式实现的.MySQL开放的插件接口,为开发者开发自定义插件提供了便利.本文将介绍如何快速开发一个 ...

  5. ssh mysql环境搭建_搭建一个MySQL高可用架构集群环境

    架构 使用一台MHA manager.一台MySQL master节点.两台MySQL slave节点 软件版本 虚拟机:Ubuntu 18 MySQL:5.7.32 MHA:0.54 环境检查 安装 ...

  6. golang mysql proxy_mixer: 一个用go实现的mysql proxy

    介绍 mixer是一个用go实现的mysql proxy,支持基本的mysql代理功能. mysql的中间件很多,对于市面上面现有的功能强大的proxy,我主要考察了如下几个: mysql-proxy ...

  7. 自定义安装mysql linux_linux下 安装mysql 问题

    展开全部 因为32313133353236313431303231363533e4b893e5b19e31333365633934mysql程序在启动的时候  非常依赖my.cnf里面的配置,而my. ...

  8. mysql自定义变量比较大小_MySQL 自定义变量@ 常用案例

    以下文章来源于SQL开发与优化 大家好,我是知数堂SQL 优化班老师 网名:骑龟的兔子 很久没有写文章,最近碰到了一个非常有意思的Oracle SQL 案例, 这个案例,我用了一些窗口函数来解决的,后 ...

  9. mysql自定义变量赋值顺序_MySQL 自定义变量@ 常用案例

    大家好,我是知数堂SQL 优化班老师 网名:骑龟的兔子 很久没有写文章,最近碰到了一个非常有意思的Oracle SQL 案例, 这个案例,我用了一些窗口函数来解决的,后来想想,能否跟MySQL 有所关 ...

  10. mysql 自定义数据_MySQL数据库自定义变量@的用法与常用案例

    很久没有写文章,最近碰到了一个非常有意思的Oracle SQL 案例,这个案例,我用了一些窗口函数来解决的,后来想想,能否跟MySQL有所关联,就用一个SQL,总结了@的常用用法. 首先我们看下,如下 ...

最新文章

  1. 使用matlab进行mex编译时的路径问题mexopts
  2. C/C++程序员必读的十本书(上)
  3. Bootstrap按钮支持的元素
  4. spring数据字典_Redis为什么默认16个数据库?
  5. 最新型号设备信息对照表_所有iPhone设备都可能被解锁!黑客发布新款越狱软件“Unc0ver”...
  6. WPF在DLL中读取Resource的方法
  7. struts2标签库使用小结
  8. HP电脑的增霸卡功能操作详解
  9. 21世纪十大营销法则
  10. java7723魂斗罗2_魂斗罗3代-完全版
  11. 关于音频EQ、DRC、等响度、3D环绕音、虚拟低音、变音、AEC、AGC、ANS等解释
  12. 批准此iphone 前往已登录iCloud的其他设备来批准这台iPhone
  13. 执念斩长河21年Q2生活心得
  14. 数字和ASII码之间的转换
  15. 字体设计:五种常用的字体修改方法,让你的文字更好看
  16. 全网最全的浏览器本地存储,赶紧收藏
  17. SPRINGBOOT中如何进行开发环境和生产环境的配置?
  18. SNMP 协议RFC
  19. 云计算笔记一 云计算系统的产生
  20. ESXI 6.0正式版官网下载地址

热门文章

  1. 人生最要紧的是充实自己
  2. pandas读取csv文件数据并对数据分类使用matplotlib画出折线图
  3. matlab数字信号处理 王彬 pdf,MATLAB数字信号处理
  4. 设置 jetbrains 的背景颜色和字体。
  5. YAF根据条件拼接SQL语句
  6. C++文件操作的HelloWorld
  7. 一组这几年,美国人均寿命变化的数据
  8. OFFICE技术讲座:边框底纹对布局的影响
  9. VS2015编译的OpenJDK8,会崩溃
  10. 编译OpenJDK8-u302出错:error C3861: “INT64_C”: 找不到标识符