Apache Griffin + Flink + Kafka: A Hands-On Guide to Streaming Data Quality Monitoring
1. Components and Versions
This article uses the components listed below, following the official Griffin example; the exact versions can be checked against the GitHub repository and the pom file inside it. The article assumes all of the following are already installed.
JDK (1.8)
MySQL (5.6)
Hadoop (2.7.2)
Hive (2.4)
Spark (2.4.1)
Kafka (0.11)
Griffin (0.6.0)
Zookeeper (3.4.1)
Detailed configuration steps and the bugs you may run into are documented here.
2. Kafka Data Generation Scripts
Since this is a test case, we simply write a script that generates data and pushes it to the Kafka source topic; in a real scenario the data would flow into Kafka continuously (for example via Flume or another tool). The scripts and the data template below follow the official demo data (a Java alternative to the shell script is also sketched after the template).
gen-data.sh
#!/bin/bash

# current time
cur_time=`date +%Y-%m-%d_%H:%M:%S`
sed s/TIME/$cur_time/ /opt/module/data/source.temp > /opt/module/data/source.tp

# create data
for row in 1 2 3 4 5 6 7 8 9 10
do
  sed -n "${row}p" < /opt/module/data/source.tp > sline
  cnt=`shuf -i1-2 -n1`
  clr="red"
  if [ $cnt == 2 ]; then clr="yellow"; fi
  sed s/COLOR/$clr/ sline >> /opt/module/data/source.data
done
rm sline
rm source.tp

# import data
kafka-console-producer.sh --broker-list hadoop101:9092 --topic source < /opt/module/data/source.data
rm source.data
echo "insert data at ${cur_time}"
streaming-data.sh
#!/bin/bash

# create topics
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic target

# generate a batch of data every 90 seconds
set +e
while true
do
  /opt/module/data/gen-data.sh
  sleep 90
done
set -e
source.temp
{"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
{"id": 2, "name": "Banana", "color": "COLOR", "time": "TIME"}
{"id": 3, "name": "Cherry", "color": "COLOR", "time": "TIME"}
{"id": 4, "name": "Durian", "color": "COLOR", "time": "TIME"}
{"id": 5, "name": "Lichee", "color": "COLOR", "time": "TIME"}
{"id": 6, "name": "Peach", "color": "COLOR", "time": "TIME"}
{"id": 7, "name": "Papaya", "color": "COLOR", "time": "TIME"}
{"id": 8, "name": "Lemon", "color": "COLOR", "time": "TIME"}
{"id": 9, "name": "Mango", "color": "COLOR", "time": "TIME"}
{"id": 10, "name": "Pitaya", "color": "COLOR", "time": "TIME"}
3. Stream Processing with Flink
The Flink streaming job splits into three parts: reading from Kafka, business processing, and writing back to Kafka.
First, the dependencies declared in my pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xxxx</groupId>
    <artifactId>kafka_Flink_kafka_Test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ink.FlinkLambdaTest.FlinkToLambda</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <relocations>
                                <relocation>
                                    <pattern>org.codehaus.plexus.util</pattern>
                                    <shadedPattern>org.shaded.plexus.util</shadedPattern>
                                    <excludes>
                                        <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                                        <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                                    </excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20090211</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
</project>
First, a bean class used to receive the JSON records:
import java.util.Date;

public class Student {
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student() {}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getColor() { return color; }
    public void setColor(String color) { this.color = color; }
    public Date getTime() { return time; }
    public void setTime(Date time) { this.time = time; }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", color='" + color + '\'' +
                ", time='" + time + '\'' +
                '}';
    }
}
Reading from Kafka: the Kafka read/write configuration could be factored out into a KafkaUtil utility class, but for convenience in this test I simply embed it in the job code (a possible shape for such a utility class is sketched after the snippet below).
// Create the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Be sure to enable checkpointing for anything beyond a quick test!
// env.enableCheckpointing(5000);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

// Kafka parameters
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop101:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");

String inputTopic = "source";
String outputTopic = "target";

// Source
FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);
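As mentioned above, the connection settings could be pulled into a small utility class. A minimal sketch of what that might look like (KafkaUtil is a hypothetical name, not part of the article's project; it reuses the same brokers, group id, and string schemas as the inline code):

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

// Hypothetical helper that centralizes the Kafka connection settings for this demo
public class KafkaUtil {

    private static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop101:9092");
        props.setProperty("group.id", "consumer-group");
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static FlinkKafkaConsumer010<String> stringConsumer(String topic) {
        return new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), consumerProps());
    }

    public static FlinkKafkaProducer010<String> stringProducer(String brokerList, String topic) {
        return new FlinkKafkaProducer010<>(brokerList, topic, new SimpleStringSchema());
    }
}

The job would then read with env.addSource(KafkaUtil.stringConsumer("source")) and write with outMap.addSink(KafkaUtil.stringProducer("hadoop101:9092", "target")).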
Next comes the Flink business logic. Since every business is different, I only demo something simple: with a 20% probability a record is modified into "bad" data so that the quality check has something to detect, simulating quality problems caused by faulty processing. One point worth stressing: this case assumes the Flink processing latency is negligible. In a real scenario, processing delay in Flink could make records look missing on the target side; the source checkpoint's time.range of ["-5m", "0"] in dq.json below appears to be the setting that gives Griffin a window to tolerate a modest delay, but I am still reading the Griffin source code on this point and will update later — if you understand this part well, please chime in.
// Use a simple Flink map operator to process the data
// Input records look like: {"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return handleData(value);
    }
});
public static String handleData(String line) {
    try {
        if (line != null && !line.equals("")) {
            Gson gson = new GsonBuilder()
                    .setLenient()
                    .setDateFormat("yyyy-MM-dd_HH:mm:ss")
                    .create();
            JsonReader reader = new JsonReader(new StringReader(line));
            Student student = gson.fromJson(reader, Student.class);
            // with probability 2/10 corrupt the name field to simulate bad data
            int rand = ra.nextInt(10) + 1;
            if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
            return gson.toJson(student);
        } else {
            return "";
        }
    } catch (Exception e) {
        return "";
    }
}
The Gson instance is built this way (setLenient() plus an explicit date format) because of a couple of parsing bugs I ran into.
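To see why the explicit date format matters, here is a small scratch class of my own (not part of the article's project) that parses one template-style record with and without it; the Student class is the bean defined above:

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;

// Hypothetical scratch class illustrating the date-format setting used above
public class GsonDateCheck {
    public static void main(String[] args) {
        String json = "{\"id\": 1, \"name\": \"Apple\", \"color\": \"red\", \"time\": \"2022-05-01_12:30:00\"}";

        Gson withFormat = new GsonBuilder()
                .setLenient()
                .setDateFormat("yyyy-MM-dd_HH:mm:ss")
                .create();
        // Parses fine: the configured pattern matches the generated data
        System.out.println(withFormat.fromJson(json, Student.class));

        try {
            // Default Gson date parsing does not understand "yyyy-MM-dd_HH:mm:ss"
            new Gson().fromJson(json, Student.class);
        } catch (JsonSyntaxException e) {
            System.out.println("default Gson fails on the time field: " + e.getMessage());
        }
    }
}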
Write back to Kafka; the FlinkKafkaProducer010 constructor used here is (brokerList, topicId, serializationSchema). Note that the target topic is hard-coded rather than taken from the outputTopic variable defined earlier.
// Sink
outMap.addSink(new FlinkKafkaProducer010<String>("hadoop101:9092", "target", new SimpleStringSchema()));
outMap.print();
env.execute();
4. Apache Griffin Configuration and Launch
Configuring Griffin's streaming mode comes down to two files: dq.json and env.json. Note that the topic names (source_1 / target_1) and the fields in the accuracy rule (login_id, bussiness_id, event_id) below are carried over from the official example; if you follow this demo end to end, point them at the source / target topics and rewrite the rule against the Student fields, e.g. src.id = tgt.id AND src.name = tgt.name AND src.color = tgt.color AND src.time = tgt.time.
dq.json
{"name": "streaming_accu","process.type": "streaming","data.sources": [{"name": "src","baseline": true,"connector": {"type": "kafka","version": "0.10","config": {"kafka.config": {"bootstrap.servers": "hadoop101:9092","group.id": "griffin","auto.offset.reset": "largest","auto.commit.enable": "false"},"topics": "source_1","key.type": "java.lang.String","value.type": "java.lang.String"},"pre.proc": [{"dsl.type": "df-opr","rule": "from_json"}]},"checkpoint": {"type": "json","file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/source","info.path": "source_1","ready.time.interval": "10s","ready.time.delay": "0","time.range": ["-5m", "0"],"updatable": true}}, {"name": "tgt","connector": {"type": "kafka","version": "0.10","config": {"kafka.config": {"bootstrap.servers": "hadoop101:9092","group.id": "griffin","auto.offset.reset": "largest","auto.commit.enable": "false"},"topics": "target_1","key.type": "java.lang.String","value.type": "java.lang.String"},"pre.proc": [{"dsl.type": "df-opr","rule": "from_json"}]},"checkpoint": {"type": "json","file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/target","info.path": "target_1","ready.time.interval": "10s","ready.time.delay": "0","time.range": ["-1m", "0"]}}],"evaluate.rule": {"rules": [{"dsl.type": "griffin-dsl","dq.type": "accuracy","out.dataframe.name": "accu","rule": "src.login_id = tgt.login_id AND src.bussiness_id = tgt.bussiness_id AND src.event_id = tgt.event_id","details": {"source": "src","target": "tgt","miss": "miss_count","total": "total_count","matched": "matched_count"},"out":[{"type":"metric","name": "accu"},{"type":"record","name": "missRecords"}]}]},"sinks": ["HdfsSink"]
}
env.json
{"spark": {"log.level": "WARN","checkpoint.dir": "hdfs://hadoop101:9000/griffin/checkpoint","batch.interval": "20s","process.interval": "1m","init.clear": true,"config": {"spark.default.parallelism": 4,"spark.task.maxFailures": 5,"spark.streaming.kafkaMaxRatePerPartition": 1000,"spark.streaming.concurrentJobs": 4,"spark.yarn.maxAppAttempts": 5,"spark.yarn.am.attemptFailuresValidityInterval": "1h","spark.yarn.max.executor.failures": 120,"spark.yarn.executor.failuresValidityInterval": "1h","spark.hadoop.fs.hdfs.impl.disable.cache": true}},"sinks": [{"name":"ConsoleSink","type": "console"},{"name":"HdfsSink","type": "hdfs","config": {"path": "hdfs://hadoop101:9000/griffin/persist"}},{"name":"ElasticsearchSink","type": "elasticsearch","config": {"method": "post","api": "http://hadoop101:9200/griffin/accuracy"}}],"griffin.checkpoint": [{"type": "zk","config": {"hosts": "hadoop101:2181","namespace": "griffin/infocache","lock.path": "lock","mode": "persist","init.clear": true,"close.clear": false}}]
}
Finally, submit the measure job to Spark and let it check the data. If everything is wired up correctly, accuracy metrics (total / miss / matched counts, named per the details block in dq.json) end up under the HdfsSink path configured in env.json.
spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json
5. Complete Code
Create a Maven project locally; since this is a simple test project, building it yourself is fine. I only wrote two classes for the test.
Student.class
import java.util.Date;

public class Student {
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student() {}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getColor() { return color; }
    public void setColor(String color) { this.color = color; }
    public Date getTime() { return time; }
    public void setTime(Date time) { this.time = time; }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", color='" + color + '\'' +
                ", time='" + time + '\'' +
                '}';
    }
}
flinkProcess.class
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.io.StringReader;
import java.util.Properties;
import java.util.Random;

public class flinkProcess {

    public static Random ra = new Random();

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Be sure to enable checkpointing for anything beyond a quick test!
        // env.enableCheckpointing(5000);
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop101:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        String inputTopic = "source";
        String outputTopic = "target";

        // Source
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);

        // Transformation: run every record through handleData
        // Input records look like: {"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
        DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return handleData(value);
            }
        });

        // Sink
        outMap.addSink(new FlinkKafkaProducer010<String>("hadoop101:9092", "target", new SimpleStringSchema()));
        outMap.print();
        env.execute();
    }

    public static String handleData(String line) {
        try {
            if (line != null && !line.equals("")) {
                Gson gson = new GsonBuilder()
                        .setLenient()
                        .setDateFormat("yyyy-MM-dd_HH:mm:ss")
                        .create();
                JsonReader reader = new JsonReader(new StringReader(line));
                Student student = gson.fromJson(reader, Student.class);
                // with probability 2/10 corrupt the name field to simulate bad data
                int rand = ra.nextInt(10) + 1;
                if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
                return gson.toJson(student);
            } else {
                return "";
            }
        } catch (Exception e) {
            return "";
        }
    }
}
Tip: if some malformed records end up in Kafka, the job will keep throwing errors. In that case, delete the corresponding Kafka data directory and the topics' znodes in ZooKeeper, then regenerate the data and run the code again.