1. Requirements

Requirement: store some detail (event-level) data in ClickHouse so that events can be searched online. Develop a Flink ClickHouse sink.

Two Flink versions are targeted: 1.10 and 1.12.

ClickHouse's Java API is itself built on the JDBC interface, so the sink follows the JDBCSinkFunction pattern and extends RichSinkFunction. In addition, a JDBCOutputFormat is needed to do the actual writing of data to ClickHouse.
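As a quick illustration that the ClickHouse client really is plain JDBC, here is a minimal standalone sketch that inserts one row using the same driver class, JDBC URL, and INSERT statement that appear in the demo below; the bound values are placeholder examples, not real data.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class ClickHouseJdbcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Same driver class and JDBC URL that the Flink demo uses later on.
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://192.168.7.123:8123/default");
             PreparedStatement ps = conn.prepareStatement("insert into t_rsd_incident values(?,?,?,?,?,?,?)")) {
            // Bind the seven columns of t_rsd_incident in table order (example values).
            ps.setObject(1, "id-1");
            ps.setObject(2, "192.168.1.1");
            ps.setObject(3, "192.168.1.1");
            ps.setObject(4, "bsdf");
            ps.setObject(5, "sfse");
            ps.setObject(6, "2021-01-01 00:00:00");
            ps.setObject(7, "2021-01-01");
            ps.addBatch();
            ps.executeBatch();
        }
    }
}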

2. Code

2.1 JDBCSinkFunction code

package com.rongan.realtime.sink;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class JDBCSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {

    final RAJDBCOutputFormat<T> outputFormat;

    public JDBCSinkFunction(RAJDBCOutputFormat<T> outputFormat) {
        this.outputFormat = outputFormat;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Flush the pending batch on every checkpoint so buffered rows are not lost.
        outputFormat.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        outputFormat.setRuntimeContext(ctx);
        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    }

    @Override
    public void close() throws Exception {
        outputFormat.close();
        super.close();
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        outputFormat.writeRecord(value);
    }
}
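Note that snapshotState() only runs when checkpointing is enabled on the job; otherwise the last partial batch is only flushed on close(). A minimal sketch of enabling it (the 60-second interval is just an example value):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With checkpointing on, Flink calls snapshotState() on every checkpoint,
// so rows still buffered in the JDBC batch are pushed to ClickHouse.
env.enableCheckpointing(60000); // example interval: one checkpoint per minute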

2.2 JDBCOutputFormat code

package com.rongan.realtime.sink;

import org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class RAJDBCOutputFormat<T> extends AbstractJDBCOutputFormat<T> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RAJDBCOutputFormat.class);

    private final String query;
    private final int batchInterval;
    private PreparedStatement upload;
    private int batchCount = 0;

    public RAJDBCOutputFormat(String username, String password, String drivername,
                              String dbURL, String query, int batchInterval) {
        super(username, password, drivername, dbURL);
        this.query = query;
        this.batchInterval = batchInterval;
    }

    /**
     * Connects to the target database and initializes the prepared statement.
     *
     * @param taskNumber The number of the parallel instance.
     * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        try {
            establishConnection();
            upload = connection.prepareStatement(query);
        } catch (SQLException sqe) {
            throw new IllegalArgumentException("open() failed.", sqe);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
        }
    }

    @Override
    public void writeRecord(T row) throws IOException {
        try {
            setRecordToStatement(upload, row);
            upload.addBatch();
        } catch (SQLException e) {
            throw new RuntimeException("Preparation of JDBC statement failed.", e);
        }
        batchCount++;
        if (batchCount >= batchInterval) {
            // execute the batch once enough records have been buffered
            flush();
        }
    }

    private void setRecordToStatement(PreparedStatement ps, T obj) {
        // Bind every declared field of the bean, in declaration order, to placeholders 1..n.
        Field[] fields = obj.getClass().getDeclaredFields();
        for (int i = 0; i < fields.length; i++) {
            Field field = fields[i];
            // make private fields accessible
            field.setAccessible(true);
            try {
                // read the field value and bind it to the corresponding placeholder
                Object o = field.get(obj);
                ps.setObject(i + 1, o);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    void flush() {
        try {
            upload.executeBatch();
            batchCount = 0;
        } catch (SQLException e) {
            throw new RuntimeException("Execution of JDBC statement failed.", e);
        }
    }

    int[] getTypesArray() {
        return null;
    }

    /**
     * Executes the prepared statement and closes all resources of this instance.
     *
     * @throws IOException Thrown, if the input could not be closed properly.
     */
    @Override
    public void close() throws IOException {
        if (upload != null) {
            flush();
            try {
                upload.close();
            } catch (SQLException e) {
                LOG.info("JDBC statement could not be closed: " + e.getMessage());
            } finally {
                upload = null;
            }
        }
        closeDbConnection();
    }
}
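Because setRecordToStatement() binds getDeclaredFields() to placeholders 1..n by index, the bean's field declaration order has to match the column order of the INSERT statement (note that getDeclaredFields() does not formally guarantee declaration order, although common JVMs return it). The project's com.rongan.realtime.bean.Incident class is not shown in this post; a hypothetical Java version matching t_rsd_incident could look like this:

import java.io.Serializable;

// Hypothetical bean: field declaration order must match
// "insert into t_rsd_incident values(?,?,?,?,?,?,?)".
public class Incident implements Serializable {
    private String id;
    private String srcIp;
    private String destIp;
    private String srcNetworkArea;
    private String destNetworkArea;
    private String incidentTime;
    private String date;

    public Incident(String id, String srcIp, String destIp, String srcNetworkArea,
                    String destNetworkArea, String incidentTime, String date) {
        this.id = id;
        this.srcIp = srcIp;
        this.destIp = destIp;
        this.srcNetworkArea = srcNetworkArea;
        this.destNetworkArea = destNetworkArea;
        this.incidentTime = incidentTime;
        this.date = date;
    }
}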

2.3 Demo: read data from Kafka and insert it into ClickHouse

ClickHouse create-table statement

create table t_rsd_incident
(
    id String,
    src_ip String,
    dest_ip String,
    src_network_area String,
    dest_network_area String,
    incident_time String,
    date String
) engine = MergeTree
partition by date
primary key (id)
order by (id)

Demo

package com.rongan.realtime.demo

import java.util.{Date, Properties}

import com.rongan.realtime.bean.Incident
import com.rongan.realtime.sink.{JDBCSinkFunction, RAJDBCOutputFormat}
import com.rongan.realtime.util.{DateUtils, FlinkUtil}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.table.api.{Table, TableSchema}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.apache.kafka.clients.consumer.ConsumerConfig

object WordCount2 {
  def main(args: Array[String]): Unit = {
    // 1. Create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    val tableEnv: StreamTableEnvironment = FlinkUtil.initTableEnv(env)

    // 2. Create the Kafka source
    val props = new Properties()
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.7.128:9092")
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "TEST")
    val consumer = new FlinkKafkaConsumer011[String]("demo1", new SimpleStringSchema(), props)
    val kafkaDataStream: DataStream[String] = env.addSource(consumer).setParallelism(3) // Kafka source stream with parallelism 3

    // 3. Transform
    //    kafkaDataStream.print()
    tableEnv.createTemporaryView("kafkaStream", kafkaDataStream, 'message)

    // 4. Run the SQL query
    val table: Table = tableEnv.sqlQuery(
      "select message as id,'192.168.1.1' as srcIp,'192.168.1.1' as destIp,'bsdf' as srcNetworkArea, 'sfse' as destNetworkArea," +
        "DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss') as incidentTime from kafkaStream")

    // 5. Map each row to the bean that matches the ClickHouse table
    val incidentStream: DataStream[Incident] = tableEnv.toAppendStream[Row](table).map(row => {
      val id: String = row.getField(0).asInstanceOf[String]
      val srcIp: String = row.getField(1).asInstanceOf[String]
      val destIp: String = row.getField(2).asInstanceOf[String]
      val srcNetworkArea: String = row.getField(3).asInstanceOf[String]
      val destNetworkArea: String = row.getField(4).asInstanceOf[String]
      val incidentTime: String = row.getField(5).asInstanceOf[String]
      new Incident(id, srcIp, destIp, srcNetworkArea, destNetworkArea, incidentTime, DateUtils.targetFormat(new Date(), DateUtils.YYYY_MM_DD))
    })

    // 6. Create the sink function and attach it to the stream
    val format = new RAJDBCOutputFormat[Incident]("", "", "ru.yandex.clickhouse.ClickHouseDriver", "jdbc:clickhouse://192.168.7.123:8123/default", "insert into t_rsd_incident values(?,?,?,?,?,?,?)", 1)
    val functionSinkFunction = new JDBCSinkFunction[Incident](format)
    incidentStream.addSink(functionSinkFunction)

    // 7. Submit the job
    env.execute("job")
  }
}

case class Incident(id: String, srcIp: String, destIp: String, srcNetworkArea: String, destNetworkArea: String, incidentTime: String, Date: String)
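One thing to watch in this demo: the output format is constructed with batchInterval = 1, so every single record triggers a flush. ClickHouse generally performs better with fewer, larger inserts, so in practice you would pass a bigger batch size. A minimal sketch of the same constructor call, written here in Java (5000 is just an example value):

RAJDBCOutputFormat<Incident> format = new RAJDBCOutputFormat<>(
        "", "", "ru.yandex.clickhouse.ClickHouseDriver",
        "jdbc:clickhouse://192.168.7.123:8123/default",
        "insert into t_rsd_incident values(?,?,?,?,?,?,?)",
        5000); // example batch size; pending rows are still flushed on every checkpoint via snapshotState()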
