
一. Components and Versions

The components used in this article are listed below, following the official example; the exact versions can be checked on GitHub and in the pom file there. This article assumes all of the following are already installed.

  • JDK (1.8)

  • MySQL(version 5.6)

  • Hadoop (2.7.2)

  • Hive (version 2.4)

  • Spark (version 2.4.1)

  • Kafka (version 0.11)

  • Griffin (version 0.6.0)

  • Zookeeper (version 3.4.1)

The detailed configuration steps and the bugs you may run into are documented separately; a quick environment sanity check is sketched below.
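Before moving on, it can be worth confirming that the core components respond with the expected versions. A minimal sketch, assuming all of the tools are already on the PATH and that ZooKeeper listens on its default client port (nc must be installed for the last check):

# Print the versions of the core components
java -version
hadoop version
hive --version
spark-submit --version
# Confirm ZooKeeper answers on its client port
echo ruok | nc hadoop101 2181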

二. Kafka Data Generation Scripts

Since this is a test case, we simply write a script that generates data and pushes it into the Kafka source topic; in a real scenario data would flow into Kafka continuously (from Flume or some other tool). The script and template below follow the official demo data; a quick way to verify the generated records is sketched after the template.

gen-data.sh

#!/bin/bash
# current time
cur_time=`date +%Y-%m-%d_%H:%M:%S`
sed s/TIME/$cur_time/ /opt/module/data/source.temp > /opt/module/data/source.tp

# create data: pick a random color for each of the ten template rows
for row in 1 2 3 4 5 6 7 8 9 10
do
  sed -n "${row}p" < /opt/module/data/source.tp > sline
  cnt=`shuf -i1-2 -n1`
  clr="red"
  if [ $cnt == 2 ]; then clr="yellow"; fi
  sed s/COLOR/$clr/ sline >> /opt/module/data/source.data
done
rm sline
rm /opt/module/data/source.tp

# import data into the source topic
kafka-console-producer.sh --broker-list hadoop101:9092 --topic source < /opt/module/data/source.data
rm /opt/module/data/source.data
echo "insert data at ${cur_time}"

streaming-data.sh

#!/bin/bash
# create the source and target topics
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic target

# generate a batch of data every 90 seconds
set +e
while true
do
  /opt/module/data/gen-data.sh
  sleep 90
done
set -e

source.temp

{"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
{"id": 2, "name": "Banana", "color": "COLOR", "time": "TIME"}
{"id": 3, "name": "Cherry", "color": "COLOR", "time": "TIME"}
{"id": 4, "name": "Durian", "color": "COLOR", "time": "TIME"}
{"id": 5, "name": "Lichee", "color": "COLOR", "time": "TIME"}
{"id": 6, "name": "Peach", "color": "COLOR", "time": "TIME"}
{"id": 7, "name": "Papaya", "color": "COLOR", "time": "TIME"}
{"id": 8, "name": "Lemon", "color": "COLOR", "time": "TIME"}
{"id": 9, "name": "Mango", "color": "COLOR", "time": "TIME"}
{"id": 10, "name": "Pitaya", "color": "COLOR", "time": "TIME"}

三. Flink Streaming Processing

The Flink streaming job breaks down into three parts: reading from Kafka, business processing, and writing back to Kafka.

  1. First, the dependencies declared in my pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xxxx</groupId>
    <artifactId>kafka_Flink_kafka_Test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ink.FlinkLambdaTest.FlinkToLambda</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <relocations>
                                <relocation>
                                    <pattern>org.codehaus.plexus.util</pattern>
                                    <shadedPattern>org.shaded.plexus.util</shadedPattern>
                                    <excludes>
                                        <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                                        <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                                    </excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20090211</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
</project>
  2. Next, a bean class used to receive the JSON records

import java.util.Date;

public class Student {
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student() {}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getColor() { return color; }
    public void setColor(String color) { this.color = color; }

    public Date getTime() { return time; }
    public void setTime(Date time) { this.time = time; }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", color='" + color + '\'' +
                ", time='" + time + '\'' +
                '}';
    }
}
  3. Reading from Kafka. The configuration for reading from and writing to Kafka could be factored out into a KafkaUtil helper class; since this is just a test, I embed it directly in the code for convenience.

// Create the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Be sure to enable checkpointing for anything beyond a quick test!
//env.enableCheckpointing(5000);
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

// Kafka parameters
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop101:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");

String inputTopic = "source";
String outputTopic = "target";

// Source
FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);
  4. Flink business processing. Since the actual business logic varies from case to case, I only give a simple demo here: with roughly a 20% probability a record is modified so that it becomes anomalous data for the check, simulating a data quality problem caused by faulty processing. One thing worth calling out: this case assumes the Flink processing latency is negligible. In a real scenario the processing delay may cause the target side to be mistakenly treated as having lost data. I am still studying the Griffin source code on this point and will update later; if you understand it well, please share your insight.

// Simple transformation with a Flink map operator
// Input records look like: {"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return handleData(value);
    }
});

public static String handleData(String line) {
    try {
        if (line != null && !line.equals("")) {
            Gson gson = new GsonBuilder()
                    .setLenient()
                    .setDateFormat("yyyy-MM-dd_HH:mm:ss")
                    .create();
            JsonReader reader = new JsonReader(new StringReader(line));
            Student student = gson.fromJson(reader, Student.class);
            // With roughly 20% probability, alter the name to simulate a quality issue
            int rand = ra.nextInt(10) + 1;
            if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
            return gson.toJson(student);
        } else {
            return "";
        }
    } catch (Exception e) {
        return "";
    }
}

Because I ran into a few bugs, the Gson instance is created this way: lenient parsing plus an explicit date format that matches the generator's timestamps.

  5. Writing to Kafka. For FlinkKafkaProducer010 we use the (brokerList, topicId, serializationSchema) constructor; a quick check of the output topic is sketched after the code.

// Sink
outMap.addSink(new FlinkKafkaProducer010<String>("hadoop101:9092", "target", new SimpleStringSchema()));
outMap.print();

env.execute();
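To confirm the job is actually writing processed records (including the occasionally mutated names) to the target topic, a quick command-line check, assuming the Kafka CLI tools are on the PATH:

# Watch what the Flink job writes to the target topic
kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic target --from-beginning
# Roughly 20% of the records should carry a mutated name such as "Apple_3"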

四. Apache Griffin Configuration and Startup

Configuring Griffin's streaming mode comes down to two files: dq.json and env.json.

dq.json

{"name": "streaming_accu","process.type": "streaming","data.sources": [{"name": "src","baseline": true,"connector": {"type": "kafka","version": "0.10","config": {"kafka.config": {"bootstrap.servers": "hadoop101:9092","group.id": "griffin","auto.offset.reset": "largest","auto.commit.enable": "false"},"topics": "source_1","key.type": "java.lang.String","value.type": "java.lang.String"},"pre.proc": [{"dsl.type": "df-opr","rule": "from_json"}]},"checkpoint": {"type": "json","file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/source","info.path": "source_1","ready.time.interval": "10s","ready.time.delay": "0","time.range": ["-5m", "0"],"updatable": true}}, {"name": "tgt","connector": {"type": "kafka","version": "0.10","config": {"kafka.config": {"bootstrap.servers": "hadoop101:9092","group.id": "griffin","auto.offset.reset": "largest","auto.commit.enable": "false"},"topics": "target_1","key.type": "java.lang.String","value.type": "java.lang.String"},"pre.proc": [{"dsl.type": "df-opr","rule": "from_json"}]},"checkpoint": {"type": "json","file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/target","info.path": "target_1","ready.time.interval": "10s","ready.time.delay": "0","time.range": ["-1m", "0"]}}],"evaluate.rule": {"rules": [{"dsl.type": "griffin-dsl","dq.type": "accuracy","out.dataframe.name": "accu","rule": "src.login_id = tgt.login_id AND src.bussiness_id = tgt.bussiness_id AND src.event_id = tgt.event_id","details": {"source": "src","target": "tgt","miss": "miss_count","total": "total_count","matched": "matched_count"},"out":[{"type":"metric","name": "accu"},{"type":"record","name": "missRecords"}]}]},"sinks": ["HdfsSink"]
}

env.json

{"spark": {"log.level": "WARN","checkpoint.dir": "hdfs://hadoop101:9000/griffin/checkpoint","batch.interval": "20s","process.interval": "1m","init.clear": true,"config": {"spark.default.parallelism": 4,"spark.task.maxFailures": 5,"spark.streaming.kafkaMaxRatePerPartition": 1000,"spark.streaming.concurrentJobs": 4,"spark.yarn.maxAppAttempts": 5,"spark.yarn.am.attemptFailuresValidityInterval": "1h","spark.yarn.max.executor.failures": 120,"spark.yarn.executor.failuresValidityInterval": "1h","spark.hadoop.fs.hdfs.impl.disable.cache": true}},"sinks": [{"name":"ConsoleSink","type": "console"},{"name":"HdfsSink","type": "hdfs","config": {"path": "hdfs://hadoop101:9000/griffin/persist"}},{"name":"ElasticsearchSink","type": "elasticsearch","config": {"method": "post","api": "http://hadoop101:9200/griffin/accuracy"}}],"griffin.checkpoint": [{"type": "zk","config": {"hosts": "hadoop101:2181","namespace": "griffin/infocache","lock.path": "lock","mode": "persist","init.clear": true,"close.clear": false}}]
}

Finally, submit the measure job to Spark and start checking the data; once it is running, the persisted metrics can be inspected as sketched after the command.

spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json
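Once the measure job is running, the accuracy metrics should show up under the HdfsSink path configured in env.json. A hedged way to inspect them (the exact directory and file layout depends on the Griffin version):

# List whatever the HdfsSink has persisted so far
hdfs dfs -ls -R /griffin/persist | head -n 20
# Print the most recent metric output (file names such as _METRICS are illustrative)
hdfs dfs -cat /griffin/persist/streaming_accu/*/_METRICS 2>/dev/null | tail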

五. Complete Code

Create a Maven project locally; since this is a simple test project, just build it yourself. I only wrote two classes for the test; a build-and-submit sketch follows the code.

Student.class

import java.util.Date;

public class Student {
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student() {}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getColor() { return color; }
    public void setColor(String color) { this.color = color; }

    public Date getTime() { return time; }
    public void setTime(Date time) { this.time = time; }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", color='" + color + '\'' +
                ", time='" + time + '\'' +
                '}';
    }
}

flinkProcess.class

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.io.StringReader;
import java.util.Properties;
import java.util.Random;

public class flinkProcess {

    public static Random ra = new Random();

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Be sure to enable checkpointing for anything beyond a quick test!
        //env.enableCheckpointing(5000);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop101:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        String inputTopic = "source";
        String outputTopic = "target";

        // Source
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);

        // Transformation: input records look like
        // {"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
        DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return handleData(value);
            }
        });

        // Sink
        outMap.addSink(new FlinkKafkaProducer010<String>("hadoop101:9092", outputTopic, new SimpleStringSchema()));
        outMap.print();

        env.execute();
    }

    public static String handleData(String line) {
        try {
            if (line != null && !line.equals("")) {
                Gson gson = new GsonBuilder()
                        .setLenient()
                        .setDateFormat("yyyy-MM-dd_HH:mm:ss")
                        .create();
                JsonReader reader = new JsonReader(new StringReader(line));
                Student student = gson.fromJson(reader, Student.class);
                // With roughly 20% probability, alter the name to simulate a quality issue
                int rand = ra.nextInt(10) + 1;
                if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
                return gson.toJson(student);
            } else {
                return "";
            }
        } catch (Exception e) {
            return "";
        }
    }
}
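To run the job on a cluster instead of from the IDE, the project can be packaged with the shade plugin from the pom above and submitted with the Flink CLI. A sketch, assuming the shade plugin's mainClass has been pointed at your own flinkProcess class (the package name below is only a placeholder):

# Build the fat jar defined by the pom above
mvn clean package
# Submit to a running Flink cluster; replace the class name with your actual package
flink run -c com.example.flinkProcess target/kafka_Flink_kafka_Test-1.0-SNAPSHOT.jar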

Tip: if some malformed records end up in Kafka, the job will keep throwing errors. In that case you can delete the corresponding Kafka data directory and the ZooKeeper znode data, regenerate the data, and re-run the code; a brief cleanup sketch follows.
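For reference, a hedged cleanup sketch for dropping a polluted topic and starting over (topic deletion requires delete.topic.enable=true on the broker; the log directory and znode paths depend on your installation):

# Drop and recreate the polluted source topic
kafka-topics.sh --delete --zookeeper hadoop101:2181 --topic source
# If the topic only ends up "marked for deletion", stop Kafka and clean up manually:
# rm -rf /opt/module/kafka/logs/source-*                      # adjust to your log.dirs
# zkCli.sh -server hadoop101:2181 rmr /brokers/topics/source
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic source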
