1. Scenario

Log data is consumed from Kafka; the job then:

1. Joins against a dictionary table to enrich the log records.

2. Checks the log level and fans the output out to multiple paths (a minimal routing sketch follows this list):

Low level: written to ClickHouse only.

High level: written to ClickHouse and, at the same time, pushed to Kafka for a second round of stream processing.
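The split itself maps naturally onto Flink side outputs. The class below is a minimal, illustrative sketch only: it assumes a hypothetical "level" field in the log JSON, whereas the full implementation in section 2 decides "high level" by whether a dictionary (company) record matches the event.

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Illustrative routing function (the "level" field is an assumption, not part of the real job).
public class LevelRouter extends ProcessFunction<String, String> {

    // Side output for high-level events that should also be sent to Kafka.
    public static final OutputTag<String> TO_KAFKA = new OutputTag<String>("tokafka") {};

    @Override
    public void processElement(String json, Context ctx, Collector<String> out) {
        JSONObject event = JSONUtil.parseObj(json);
        if ("high".equalsIgnoreCase(event.getStr("level"))) {   // hypothetical level check
            ctx.output(TO_KAFKA, json);                          // extra copy -> Kafka sink
        }
        out.collect(json);                                       // every event -> ClickHouse sink
    }
}

It would be wired up as logs.process(new LevelRouter()) for the main (ClickHouse) path and .getSideOutput(LevelRouter.TO_KAFKA) for the Kafka path, which is the same pattern the full job below uses inside its BroadcastProcessFunction.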

2. Implementation

package com.ws.kafka2clickhouse;

import cn.hutool.json.JSONUtil;
import com.ws.kafka2clickhouse.bean.CompanyInfo;
import com.ws.kafka2clickhouse.bean.LogEvent;
import com.ws.kafka2clickhouse.sink.MyClickHouseSink;
import org.apache.avro.data.Json;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class Kafka2ClickHouse {

    public static void main(String[] args) throws Exception {
        System.setProperty("java.net.preferIPv4Stack", "true");
        System.setProperty("HADOOP_USER_NAME", "hdfs");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
//        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
//        env.getCheckpointConfig().setCheckpointStorage("file:///D:/out_test/ck");
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hdp01:8020/tmp/kafka2hdfs/");

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 1. Read the main log stream from Kafka
        KafkaSource<String> build = KafkaSource.<String>builder()
                .setTopics("dataSource")
                .setGroupId("group1")
                .setBootstrapServers("hdp01:6667")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafka = env.fromSource(build, WatermarkStrategy.noWatermarks(), "kafka");

        // 2. Convert the main-stream JSON into POJOs
        SingleOutputStreamOperator<LogEvent> beans =
                kafka.map((MapFunction<String, LogEvent>) s -> JSONUtil.toBean(s, LogEvent.class));

        // 3. Load the dictionary table as a MySQL CDC changelog stream
        tenv.executeSql("CREATE TABLE dmpt_base_oper_log (\n" +
                "  id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "  title STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'database-name' = 'test',\n" +
                "  'table-name' = 'test_recursive'\n" +
                ")");
        Table result = tenv.sqlQuery("select * from dmpt_base_oper_log");
        DataStream<Row> dict = tenv.toChangelogStream(result);
        dict.print();

        // 4. Transform the dictionary rows and attach the changelog row kind
        SingleOutputStreamOperator<CompanyInfo> companyDict = dict.map(new RichMapFunction<Row, CompanyInfo>() {
            @Override
            public CompanyInfo map(Row row) throws Exception {
                Long id = (Long) row.getField("id");
                String title = (String) row.getField("title");
                // Carry the CDC change type (insert / delete / update)
                RowKind kind = row.getKind();
                return new CompanyInfo(id, title, kind);
            }
        });

        // 5. Broadcast the dictionary stream
        MapStateDescriptor<Long, CompanyInfo> company_info_desc =
                new MapStateDescriptor<>("company_info_dict", Long.class, CompanyInfo.class);
        BroadcastStream<CompanyInfo> broadcastStream = companyDict.broadcast(company_info_desc);

        // 6. Create the side-output tag and connect the main stream with the broadcast stream
        OutputTag<String> tokafka = new OutputTag<String>("tokafka") {};
        SingleOutputStreamOperator<LogEvent> beans_company = beans.connect(broadcastStream)
                .process(new BroadcastProcessFunction<LogEvent, CompanyInfo, LogEvent>() {
                    @Override
                    public void processElement(LogEvent logEvent, ReadOnlyContext readOnlyContext, Collector<LogEvent> collector) throws Exception {
                        // Invoked for every record on the main log stream
                        ReadOnlyBroadcastState<Long, CompanyInfo> broadcastState = readOnlyContext.getBroadcastState(company_info_desc);
                        CompanyInfo companyInfo = broadcastState.get(logEvent.getMessageId());
                        // 7. If company info is present, this is high-level data: besides the main output,
                        //    also emit a copy to the side output so it can be pushed to Kafka
                        if (companyInfo != null) {
                            logEvent.setCompanyInfo(companyInfo);
                            readOnlyContext.output(tokafka, JSONUtil.toJsonStr(logEvent));
                        }
                        collector.collect(logEvent);
                    }

                    @Override
                    public void processBroadcastElement(CompanyInfo companyInfo, Context context, Collector<LogEvent> collector) throws Exception {
                        // Invoked for every record on the broadcast (dictionary) stream
                        BroadcastState<Long, CompanyInfo> broadcastState = context.getBroadcastState(company_info_desc);
                        if (companyInfo.getRowKind().name().equalsIgnoreCase(RowKind.INSERT.name())) {
                            // insert
                            broadcastState.put(companyInfo.getId(), companyInfo);
                        } else if (companyInfo.getRowKind().name().equalsIgnoreCase(RowKind.DELETE.name())) {
                            // delete
                            broadcastState.remove(companyInfo.getId());
                        } else {
                            // update
                            broadcastState.remove(companyInfo.getId());
                            broadcastState.put(companyInfo.getId(), companyInfo);
                        }
                    }
                });

        // SQL for inserting data into ClickHouse
        String insertIntoCkSql = "insert into default.dns_logs values(?,?,?,?,?,?,?,?,?,?,?,?,?)";

        // Build the ClickHouse JDBC sink
        SinkFunction<LogEvent> sink = JdbcSink.sink(
                // insert SQL
                insertIntoCkSql,
                // bind the LogEvent fields to the statement parameters
                new JdbcStatementBuilder<LogEvent>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, LogEvent logEvent) throws SQLException {
                        try {
                            preparedStatement.setString(1, logEvent.getMessageType());
                            preparedStatement.setLong(2, logEvent.getMessageId());
                            preparedStatement.setString(3, logEvent.getDeviceId());
                            preparedStatement.setString(4, logEvent.getCol1());
                            preparedStatement.setString(5, logEvent.getCol2());
                            preparedStatement.setString(6, logEvent.getCol3());
                            preparedStatement.setString(7, logEvent.getCol4());
                            preparedStatement.setString(8, logEvent.getHeaders().getDeviceTime());
                            preparedStatement.setLong(9, logEvent.getHeaders().get_uid());
                            preparedStatement.setString(10, logEvent.getHeaders().getProductId());
                            preparedStatement.setString(11, logEvent.getHeaders().getOrgId());
                            if (logEvent.getCompanyInfo() != null) {
                                preparedStatement.setString(12, logEvent.getCompanyInfo().getTitle());
                            } else {
                                preparedStatement.setString(12, null);
                            }
                            preparedStatement.setString(13, logEvent.getRegion());
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }
                },
                // batch execution options
                new JdbcExecutionOptions.Builder()
                        // batch size (default 5000)
                        .withBatchSize(10000)
                        // flush interval between batches
                        .withBatchIntervalMs(5000)
                        .withMaxRetries(3)
                        .build(),
                // ClickHouse connection options
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUrl("jdbc:clickhouse://192.16.10.118:1111")
                        .withUsername("default")
                        .withPassword("xxxx")
                        .build());

        // 8. All events go into the ClickHouse base table
        beans_company.addSink(sink);
        beans_company.print("ClickHouse base table");

        // 9. High-level events are also pushed to the analysis Kafka topic
        DataStream<String> sideOutput = beans_company.getSideOutput(tokafka);
        sideOutput.print("enhanced-analysis Kafka");

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hdp01:6667");

        // 10. Build the Kafka sink
        KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                return new ProducerRecord<>(
                        "dataZengQiang",                           // target topic
                        element.getBytes(StandardCharsets.UTF_8)); // record contents
            }
        };
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
                "dataZengQiang",                           // target topic
                serializationSchema,                       // serialization schema
                properties,                                // producer config
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault tolerance

        // 11. Write the side output to Kafka
        sideOutput.addSink(myProducer);

        env.execute();
    }
}
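The LogEvent and CompanyInfo beans from com.ws.kafka2clickhouse.bean are not included in the original post. The following is a minimal sketch, with fields inferred from the getters and setters used above; the nested Headers type, the field types, and the omitted accessors are assumptions and may differ from the real classes.

// LogEvent.java (hypothetical reconstruction; getters/setters for every field,
// required by hutool's JSONUtil.toBean and by the job, are omitted for brevity)
package com.ws.kafka2clickhouse.bean;

public class LogEvent {
    private String messageType;
    private Long messageId;
    private String deviceId;
    private String col1;
    private String col2;
    private String col3;
    private String col4;
    private Headers headers;
    private String region;
    private CompanyInfo companyInfo; // set during the broadcast join, null for low-level events

    public static class Headers {
        private String deviceTime;
        private Long _uid;
        private String productId;
        private String orgId;
    }
}

// CompanyInfo.java (hypothetical reconstruction of the dictionary-side bean)
package com.ws.kafka2clickhouse.bean;

import org.apache.flink.types.RowKind;

public class CompanyInfo {
    private Long id;
    private String title;
    private RowKind rowKind; // CDC change type carried from the changelog stream

    public CompanyInfo(Long id, String title, RowKind rowKind) {
        this.id = id;
        this.title = title;
        this.rowKind = rowKind;
    }

    public Long getId() { return id; }
    public String getTitle() { return title; }
    public RowKind getRowKind() { return rowKind; }
}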
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>test</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>flink</artifactId>

    <properties>
        <flink.version>1.13.2</flink.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Dependencies required by Flink SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>
        <!-- ClickHouse JDBC driver -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
        </dependency>
        <!-- flink-cdc-mysql connector (provided as a local jar under lib/) -->
        <dependency>
            <groupId>com.ws</groupId>
            <artifactId>mysql-cdc</artifactId>
            <version>2.2.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/flink-connector-mysql-cdc-2.3-SNAPSHOT.jar</systemPath>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <!-- Bundle dependencies into the jar -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
