实时处理(流处理)

结论

Spark和Flink的数据源最好都是Kafka等消息队列,这样才能更好的保证Exactly-Once(精准一次);

作为流处理框架,Flink是当前最优秀的实时处理框架,并处于飞速发展的状态中;

Spark社区活跃度高,生态圈庞大,Spark-Streaming技术成熟稳定,且Spark是批处理框架中使用最为广泛的框架,如果需要批处理的情况下,批处理和流处理都是用Spark,可以大大减少框架的学习成本,并且不需要不同的框架,因此在没有特殊情况时,推荐使用Spark-Streaming作为流处理的框架。

Spark框架可以作用于几乎所有大数据的任务。

如果需要完美的话,以下一些情况推荐使用Flink:

1、对数据要求是毫秒级的实时处理场景;

2、必须按照事件时间语义的流处理场景;

数据源(实时数据)

Spark:自定义数据输入,最好是Kafka等消息队列;

Flink:自定义数据输入,最好是Kafka等消息队列;

计算引擎

Spark:

Spark Streaming,通过Java或者scala编写计算代码,属于微批次处理,运行的时候需要指定批处理的时间,每次运行 job 时处理一个批次的数据,实时性还不够;

实际上每个job都可以理解为一个Spark-core任务,每次处理一个job都是先缓存数据,然后进行批处理的;

Spark Streaming 只支持处理时间,Structured streaming(Spark的结构化微批次处理框架,还停留在beta阶段,因此官方声明,仅供用户学习、实验和测试) 支持处理时间和事件时间,同时支持 watermark 机制处理滞后数据;因此Streaming本身没办法处理事件时间语义的数据,且对无法针对乱序数据和滞后进行处理;

SparkStreming通过checkPoint机制与Kafka保证数据输入的Exactly-Once,数据写出的Exactly-Once处理需要自定义;

SparkStreaming只支持基于时间的窗口操作(处理时间或者事件时间);

SparkStreaming社区活跃度高,技术成熟稳定。

Flink

Flink ,通过Java或者scala编写计算代码,是基于事件驱动的,事件可以理解为消息。事件驱动的应用程序是一种状态应用程序,它会从一个或者多个流中注入事件,通过触发计算更新状态,或外部动作对注入的事件作出反应;

事件数据会在整个Flink任务中流动处理,一个处理完成之后立马处理下一个;

Flink 支持三种时间机制:事件时间,注入时间,处理时间,同时支持 watermark 机制处理滞后数据;支持对乱序数据和滞后数据进行处理;

Flink通过checkPoint机制与Kafka保证数据输入的Exactly-Once,也可以Kafka作为输出就可以保证输出的Exactly-Once;

Flink支持的窗口操作非常灵活,不仅支持时间窗口,还支持基于数据本身的窗口(另外还支持基于time、count、session,以及data-driven的窗口操作),可以自由定义想要的窗口操作;

Flink的每个任务都会有对应的webUI查看,并且可以通过webUI对应的api接口,在我们自己的程序中获取到对应任务的一些信息,非常方便对任务的管理。

Flink处于高速发展阶段,社区活跃度不如SparkStreaming。

数据输出

Spark:Spark-Streaming可以选择输出到文件或者自定义数据输出,通过转换成DataFrame数据结构可以很方便的将数据输出到数据库或是其他结构化文件中

Flink:支持输出到文件、Redis、Netty、Kafka等,也可以自定义数据输出。

执行效率

Spark:基于内存的计算,属于微批次低延迟的模拟流处理;

Flink:基于内存的计算,真正的流处理。

任务提交

Spark

Spark的提交方法jar包+Shell命令、jar包+Spark-launcher、ssh命令

jar包+Shell命令、jar包+Spark-launcher方法均需要依赖小程序,后者通过launcher能够更好的对Spark任务进行管理;

ssh命令直接提交方法不依赖小程序,但是对于任务的管理更加不方便;

Spark 提供了以Spark-launcher 作为JAVA API编程的方式提交,这种方式不需要使用命令行,能够实现与Spring整合,让应用在tomcat中运行;Spark-launcher值支持将任务运行,日志分开输出,便于问题的回溯,可以自定义监听器,当信息或者状态变更时,进行相关操作,支持暂停、停止、断连、获得AppId、获得任务State等多种功能。

可以通过第三方组件对Spark-Streaming任务进行优雅退出。

Flink

Flink的提交方法包括jar包+Shell命令、jar包+Flink-Yarn、ssh命令

ar包+Shell命令、jar包+Spark-launcher方法均需要依赖小程序,后者通过launcher能够更好的对Spark任务进行管理;

ssh命令直接提交方法不依赖小程序,但是对于任务的管理更加不方便;

Flink-Yarn方式可以通过JAVA API远程进行任务的提交,不需要依赖小程序和命令行,能够实现与Spring整合,让应用在tomcat中运行;相比通过命令行方式,能够直接通过api获取到webUI地址以及applicationId,可以更加方便的进行任务的管理。

可以通过yarn方式,或者webUI的api接口来退出Flink任务。

实时处理

测试环境

Kafka作为数据源,数据结果均输出到数据库中。

Hadoop的Yarn作为任务运行的资源调度管理器(spark on yarn 和 flink on yarn)。

数据获取与采集

实时数据可以通过以下方式进行采集:

1、可以通过Flume监控日志文件的方式,然后数据既传给HDFS进行保存,又将数据发送给Kafka;

2、通过Kafka消息队列进行数据的获取,Flume作为Kafka的其中一个消费者将数据保存到HDFS保存;

本次实验只涉及Kafka的部分,Kafka的topic为streamTest,然后以控制台输入作为Kafka的数据生产者

## 创建topic
kafka-topics.sh --zookeeper hadoop113:2181 --create --topic streamTest --replication-factor 2 --partitions 2## 控制台生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic streamTest

job提交与运行

Spark-Streaming

object WordCountByStream {//初始化连接池var dataSource: DataSource = init()def main(args: Array[String]): Unit = {// 创建 SparkConfval sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("yarn")//创建 StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(3))// 定义 Kafka 参数val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop113:9092,hadoop114:9092,hadoop115:9092",ConsumerConfig.GROUP_ID_CONFIG -> "sparkStreamTestGroup",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest")// 读取 Kafka 数据创建 DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("streamTest"), kafkaPara))val wordCountDS: DStream[(String, Long)] = kafkaDStream.flatMap(kafkaData => {val data = kafkaData.value()val datas: Array[String] = data.split(" ")datas.map(str => str -> 1L)}).reduceByKey(_ + _)// 输出wordCountDS.foreachRDD(rdd => {rdd.foreachPartition(iter => {// 此处是放入数据库中,可以在这里设置放到其他地方,如redis等val conn = getConnectionval pstat = conn.prepareStatement("""| insert into spark_streaming_test| (word, cnt)| values| (?, ?)| on duplicate key| update cnt = cnt + ?|""".stripMargin)iter.foreach {case (word, count) => {println(s"$word, $count")pstat.setString(1, word)pstat.setLong(2, count)pstat.setLong(3, count)pstat.executeUpdate()}}pstat.close()conn.close()})})// 开启任务ssc.start()// 优雅退出stopHandle(ssc)// 这个是批量处理的退出,用Ctrl+C来退出//  ssc.awaitTermination()}def stopHandle(ssc: StreamingContext): Unit = {// 优雅的关闭// 计算节点不再接受新的数据,而是将现有的数据处理完毕,然后关闭// mysql、redis、zk、hdfs等var needStop = false;while (true) {// 判断是否需要关闭if (needStop) {if (ssc.getState() == StreamingContextState.ACTIVE) {ssc.stop(true, true)System.exit(0)}}Thread.sleep(10000)needStop = true}}//初始化连接池方法def init(): DataSource = {val properties = new Properties()properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")properties.setProperty("url", "jdbc:mysql://10.10.10.38:13306/stream_test?useUnicode=true&characterEncoding=UTF-8")properties.setProperty("username", "root")properties.setProperty("password", "123456")properties.setProperty("maxActive", "50")DruidDataSourceFactory.createDataSource(properties)}//获取 MySQL 连接def getConnection: Connection = {dataSource.getConnection}
}

那么可以和Spark的方法完全一样,将以上代码达成jar包,然后提交运行即可。

/opt/module/spark-yarn/bin/spark-submit --class com.starnet.server.bigdata.spark.stream.wordcount.WordCountByStream  --master yarn  --deploy-mode cluster /home/bd/SPARK/spark-test-1.0.0.jar

或者是通过Spark-launcher的方式进行提交。

在Yarn上运行时,可以提前将所使用的jar包全部上传到HDFS然后进行jar路径的配置,可以使每次任务提交时不用重新上次依赖文件,Flink也是如此。

Flink

public class WordCount {public static void main(String[] args) throws Exception {// 创建流处理执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建kafka配置对象Properties properties = new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop113:9092,hadoop114:9092,hadoop115:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flinkTestGroup");properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 从Kafka读取数据DataStreamSource<String> inputDataStream = env.addSource(new FlinkKafkaConsumer010<String>("streamTest", new SimpleStringSchema(), properties));// 基于数据流记性转换计算DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] fields = s.split(" ");for (String  field: fields) {collector.collect(new Tuple2<String, Integer>(field, 1));}}}).keyBy(0).sum(1);resultStream.addSink(new MyJdbcSink());// 执行任务env.execute();}public static class MyJdbcSink extends RichSinkFunction<Tuple2<String, Integer>> {Connection connection = null;PreparedStatement updateStmt = null;@Overridepublic void open(Configuration parameters) throws Exception {connection = DriverManager.getConnection("jdbc:mysql://10.10.10.38:13306/stream_test","root","123456");updateStmt = connection.prepareStatement("insert into flink_test (word, cnt) " +"values " +"(?, ?) " +"on duplicate key " +"update cnt = cnt + ?");}// 每次更新数据时,调用连接执行sql@Overridepublic void invoke(Tuple2<String, Integer> value, Context context) throws Exception {System.out.println(value.toString());updateStmt.setString(1, value.f0);updateStmt.setLong(2, value.f1);updateStmt.setLong(3, value.f1);updateStmt.execute();}@Overridepublic void close() throws Exception {connection.close();updateStmt.close();}}
}

Flink Per Job

那么可以将以上代码打成jar包,然后提交运行的方式进行处理。

bin/flink run –m yarn-cluster -c com.starnet.server.bigdata.flink.WordCount /home/bd/FLINK/FlinkTest-1.0-SNAPSHOT-jar-with-dependencies.jar

通过jar提交成功之后,控制台输出如下:

2021-09-26 09:51:58,784 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - YARN application has been deployed successfully.
2021-09-26 09:51:58,785 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1631184398602_0052
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1631184398602_0052
Note that killing Flink might not clean up all job artifacts and temporary files.
2021-09-26 09:51:58,785 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Found Web Interface hadoop114:39315 of application 'application_1631184398602_0052'.
Job has been submitted with JobID 83ea15f02379ee196c730d6919f93243

其中可以通过以下两个命令进行任务的关闭

echo "stop" | /.../flink/.../bin/yarn-session.sh -id application_id/.../hadoop/.../yarn application -kill application_id

并且其中有hadoop114:39315为提交flink任务之后,在yarn上开启的flink集群的web地址,可以在web上进行任务的关闭或者任务信息的获取等,也可以通过Flink的客户端api进行信息的获取和任务的停止https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/

以上信息均是在提交jar进程完成之后去进行信息的过滤和提取,提取出我们需要的信息,然后进行任务的退出。

目前Flink-on-yarn的per job模式只找到了通过jar包提交的方法。

Flink Application

对于per job模式,jar包的解析、生成JobGraph是在客户端上执行的,然后将生成的jobgraph提交到集群。如果任务特别多的话,那么这些生成JobGraph、提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。

此外这种模式提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应 的临时目录,这个也会带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。

所以针对flink per job模式的一些问题,flink 引入了一个新的部署模式–Application模式。 目前 Application 模式支持 Yarn 和 K8s 的部署方式,Yarn Application 模式会在客户端将运行任务需要的依赖都上传到 Flink Master,然后在 Master 端进行任务的提交。

此外,还支持远程的用户jar包来提交任务,比如可以将jar放到hdfs上,进一步减少上传jar所需的时间,从而减少部署作业的时间。

提交方法如下:

jar包提交

那么可以将以上代码打成jar包,然后提交运行的方式进行处理。

bin/flink run -yd -m yarn-cluster -c com.starnet.server.bigdata.flink.WordCount /home/bd/FLINK/FlinkTest-1.0-SNAPSHOT-jar-with-dependencies.jar/opt/module/flink-1.11.4/bin/flink run -yd -m yarn-cluster -c com.starnet.server.bigdata.flink.WordCount -yD yarn.provided.lib.dirs="hdfs://hadoop113:8020/jar/flink11/libs" /home/bd/FLINK/Flink1.11-1.0-SNAPSHOT-jar-with-dependencies.jar

通过jar提交成功之后,控制台输出如下:

2021-09-26 18:00:54,129 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-09-26 18:00:54,529 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface hadoop115:35553 of application 'application_1631184398602_0055'.

优雅退出和web访问均与以上相同。

java-api方式提交

提交相关代码如下:

public void crateStreamTaskByFlinkClient() {//flink的本地配置目录,为了得到flink的配置// 如果出现org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.错误// 则在flink-config.yaml加入// classloader.resolve-order: parent-firstString configurationDirectory = "/opt/module/flink-1.11.4/conf";// String configurationDirectory = "/home/lxj/workspace/Olt-Test/bigdata/bigdataserver/src/main/resources/flink/conf";//存放flink集群相关的jar包目录String flinkLibs = "hdfs://hadoop113:8020/jar/flink11/libs";//用户jarString userJarPath = "hdfs://hadoop113:8020/jar/userTask/Flink1.11-1.0-SNAPSHOT-jar-with-dependencies.jar";String flinkDistJar = "hdfs://hadoop113:8020/jar/flink11/libs/flink-dist_2.12-1.11.4.jar";YarnClient yarnClient = YarnClient.createYarnClient();YarnConfiguration yarnConfiguration = new YarnConfiguration();yarnClient.init(yarnConfiguration);yarnClient.start();// 设置日志的,没有的话看不到日志YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);//获取flink的配置Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);flinkConfiguration.set(PipelineOptions.JARS,Collections.singletonList(userJarPath));Path remoteLib = new Path(flinkLibs);flinkConfiguration.set(YarnConfigOptions.PROVIDED_LIB_DIRS,Collections.singletonList(remoteLib.toString()));flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR,flinkDistJar);// 设置为application模式flinkConfiguration.set(DeploymentOptions.TARGET,YarnDeploymentTarget.APPLICATION.getName());// yarn application nameflinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();// 设置用户jar的参数和主类ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[] {"test"}, "com.starnet.server.bigdata.flink.WordCount");YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration,yarnConfiguration,yarnClient,clusterInformationRetriever,true);try {ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(clusterSpecification,appConfig);ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();ApplicationId applicationId = clusterClient.getClusterId();String webInterfaceURL = clusterClient.getWebInterfaceURL();log.error("applicationId is {}", applicationId);log.error("webInterfaceURL is {}", webInterfaceURL);// 退出// yarnClusterDescriptor.killCluster(applicationId);} catch (Exception e){log.error(e.getMessage(), e);}
}

此方法可以远程提交Flink任务到yarn上运行,并且可以通过javaApi获取到提交之后的applicationId和web地址,以及任务的退出,整体可以不依赖小程序,非常的方便。

结果获取

以上结果最终都是保存如数据库了,因此本次实验的流式处理结果均以查询数据库的方式获取结果。

大数据框架调研-流处理-Spark与Flink相关推荐

  1. 大数据框架复习-flink

    大数据框架复习-flink flink的简单介绍 Flink 是一个面向流处理和批处理的分布式数据计算引擎,能够基于同一个 Flink 运行,可以提供流处理和批处理两种类型的功能. 在 Flink 的 ...

  2. 大数据框架对比:Hadoop、Storm、Samza、Spark和Flink——flink支持SQL,待看

    简介 大数据是收集.整理.处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称.虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性.规模,以及价值在最近几年才 ...

  3. 大数据准实时流式系统设计(一)——基于大数据框架设计

    前段时间负责了公司一个新的项目,项目不属于直接面向用户的线上实时响应系统,要求做到尽快毫秒级或者秒级响应的准实时系统.结合以前学习的一些大数据理论方面和参与的准实时系统方面的经验,对准实时系统架构设计 ...

  4. 排名前6位的最流行的大数据框架,你在用哪一款?

    介绍大数据框架 市场上有许多可用的框架.其中一些更受欢迎,例如Spark,Hadoop,Hive和Storm.Presto在效用指数上得分很高,而Flink具有巨大的潜力.另外还有一些需要提及的其他内 ...

  5. 玩转人工智能(3)常用的大数据框架简单介绍

    时光不老,我们不散. 讲大数据框架前,简单的介绍下大数据的文化.信息时代人类社会的进步得益于分享和开源.大数据时代属于信息时代的第三代发展阶段(2001年到2011年可以认为是CT行业的黄金期,200 ...

  6. 大数据各组件理论性总结---spark和hadoop(将持续更新)

    Hadoop和spark的起源 Hadoop起源 1998年9月4日,Google公司在美国硅谷成立.正如大家所知,它是一家做搜索引擎起家的公司 无独有偶,一位名叫Doug Cutting的美国工程师 ...

  7. 什么是大数据?常用的大数据框架

    1.什么是大数据? 在互联网技术发展到现今阶段,大量日常.工作等事务产生的数据都已经信息化,人类产生的数据量相比以前有了爆炸式的增长,以前的传统的数据处理技术已经无法胜任,需求催生技术,一套用来处理海 ...

  8. 五种大数据框架你必须要知道

    学习大数据不可不知的五种大数据框架,码笔记分享大数据框架Hadoop.Storm.Samza.Spark和Flink五种大数据框架详解: 一:Hadoop大数据框架 Hadoop 大数据框架?第一映入 ...

  9. 2018年新春报喜!热烈祝贺王家林大咖大数据经典传奇著作《SPARK大数据商业实战三部曲》 畅销书籍 清华大学出版社发行上市!

    2018年新春报喜!热烈祝贺王家林大咖大数据经典传奇著作<SPARK大数据商业实战三部曲>畅销书籍 清华大学出版社发行上市! 本书基于Spark 2.2.0新版本,以Spark商业案例实战 ...

最新文章

  1. keil5建立多文件的时候为什么总是出错
  2. ASP.Net中怎样获得存储过程传出的参数。
  3. bzoj 1124 [POI2008]枪战Maf 贪心
  4. Flex DES加密
  5. 使用AspectJ注解技术实现AOP功能
  6. 浅析MSIL中间语言——基础篇
  7. Android/Linux性能分析工具推荐
  8. vue 封装dialog_GitHub - 1014156094/vue-mobile-dialog: Vue移动端基础组件 - 对话框
  9. LINQ TO SQL 动态查询
  10. Mac OS 加入域
  11. 编程机器人考级证书有用吗_机器人编程有等级考试吗?
  12. linux 下载ftp 命令,在Linux命令行中安装及使用FTP客户端的方法
  13. 【CSDN软件工程师能力认证学习精选】Web前端经典面试试题及答案-史上最全前端面试题(含答案)
  14. 25岁,想转行到 IT,应怎样入门?
  15. LiveNVR监控流媒体Onvif/RTSP功能支持海康摄像头通过海康SDK的方式接入直播观看录像回看预置位操作
  16. 估计标准误差syx_相关系数与估计标准误差的关系
  17. FileInputStream和BufferedInputStream的比较
  18. 风景照片的PS后期处理(3)
  19. tomcat上面出现红色的×
  20. ECSHOP通过改变模板路径制作手机站

热门文章

  1. 计算机病毒有哪些名称和时间,计算机病毒名称,请举出所有您知道的生物病毒的名称,越多越好。...
  2. python基础训练 day1
  3. Java数据结构之234树
  4. 关于机器学习工程的最佳实践
  5. ValueError: cannot set a row with mismatched columns Pandas报错,超级折磨人
  6. 3.BI可视化编辑器之右击菜单的“置顶、置底“实现
  7. java c 网络_Socket网络通讯开发总结之:Java 与 C进行Socket通讯(转)
  8. 高可用 Prometheus:问题集锦
  9. 人工智能洗稿-免费自媒体洗稿工具
  10. 阿里云点播Hls标准加密简单搭建过程