Spark streaming  在各种流程处理框架生态中占着举足轻重的位置, 但是不可避免地也会面对网络波动带来的数据延迟的问题,所以必须要进行增量数据的累加。 在更新Spark 应用的时候或者其他不可避免的异常宕机的时候,增量累加会带来重复消费的问题,在一些需要严格保证 exact once 的场景下, 这个时候我们就需要进行离线修复,从而保证exact once 语义, 本文将针对这个问题,提供一些常见的解决方案和处理方式。

\\

下图中展示了数据延迟的一个场景:

\\

(点击放大图像)

\\\\

在讨论解决消息乱序问题之前,需先定义时间和顺序。在流处理中,时间的概念有两个:

\\

  • Event time :Event time是事件发生的时间,经常以时间戳表示,并和数据一起发送。带时间戳的数据流有,Web服务日志、监控agent的日志、移动端日志等;\\t
  • Processing time  :Processing time是处理事件数据的服务器时间,一般是运行流处理应用的服务器时钟。\

上图中 time1,time2, time3等是我们Spark straming 拿到消息将要处理的时间, 图中方块中的数字代表这个event 产生的时间, 有可能因为网络抖动导致部分机器上的日志收集产生了延迟, 在time3的batch中包含event time 为2的日志, 特别说明一下, kafka 中的不同分区的消息也是没有顺序的。

\\

在实时处理过程中也就产生了两个问题:

\\

  • Spark streaming 从Kafka 中拉取到的一批数据,我们可能认为里面包含多个时间区间的数据\\t
  • 同一个时间的数据可能出现在多个 batch 中\

针对第一个问题, 一个 batch 中包含多个时间区间的数据, 加入我们的区间粒度是5分钟, 那么一个batch钟有可能包含 0~5 时间区间中的部分数据,  也有可能包含 5~10 时间区间中的部分数据, 这个很好处理,我们先对时间进行向下5分钟取整,然后使用取整后的时间分为多组, 然后计算出来指标,` select time, count(*) group by  取整(time),就算出来了这个batch中每个时间区间中的数据。

\\

但是对于第二个问题,就很麻烦, 图中举例, 时间区间中 2 出现在了 time2 和time3,  我们需要在两个batch中计算出2 的指标, 然后进行累计,  这个累计的过程, 你可以在内存中保存状态, 使用Spark streaming 中的 UpdateStateByKey等算子, 但是不推荐这样使用, 这样就在你的应用中引入了状态和Checkpoint机制, 还有一个方法, 就是把这个状态放在持久化存储中, 比如每次都在 Redis, 或者Hbase 中进行累计,Spark 从 Kafka 拉取日志是可以做到 至少消费一次,但是这种模式 很难保证 exact once 。

\\

假如有下面一种情形,

\\

(点击放大图像)

\\\\

就会存在这种情况, 我们对 job1 执行 Checkpoint 操作, 然后 job1 被调度执行, 从Kafka 拉取数据处理, 然后结果保存在HBase 中, 保存了一半, 机器挂了, 如果重启,recover, 这时候 job1 就会被重复执行, Kafka 中的数据就会被重复消费, HBase中的部分指标也就多加了一份,虽然我们可以使用 Spark 或者 Flink 中提供的 Watermark 功能。

\\

(点击放大图像)

\\\\

也就是维护一个窗口, 然后设置一个最大等待时间, T1 ~T4 中的数据到了最大等待时间后就会触发计算,但是这样也会有问题, 如果部分数据的延迟超过了最大等待时间,  这部分数据也就永远的丢失了。

\\

当然如果业务可以容忍, 那么使用这个功能也是可以的,每次都使用 全量覆盖操作。

\\

解决方案

\\

以上我们面临的问题是 Spark streaming + Kafka 组合可以保证at lease once ,但是很难保证 exact once, 也就是会重复消费, 我们得想办法做到去重, 计算结果 落地存储会有两种模式:

\\

  • append 增量的模式, 也就是每次都做累加\\t
  • complete的模式, 也即是保证幂等性, 每次都是覆盖, 保证没有副作用\

因为同一个时间的数据可能出现在多个 batch 中,所以我们在准实时计算中, 只能是append 模式, 上文我们已经论证过了,这种模式会出现重复消费的问题。

\\

由于机器挂了的现象是偶发的, 所以我们可以在挂掉后,  对数据进行离线修复, 也就是我们要保证有一份全量的离线数据。

\\

这份数据我们要保证是不漏不多, 而且是按照event time 时间区间分开的, 这样我们就可以针对出问题的时间区间, 加载这个时间区间的离线数据, 算出结果, 然后进行覆盖。这样就保证了数据的准确性。

\\

我们落地的数据的特点是:

\\

  • 全量的,不漏不多\\t
  • 按照定义的时间区间分片\

因为从Kafka 中拉取存储能保证不丢, 这里我们考虑如何去重, 首先我们要对消息能有一个唯一 ID, 我们使用Kafka的partition加offset作为这个消息的唯一ID, 如果存储到HBase,  这样的话在生成一个消息的时候,我们的ID就不会重复,即使你重跑很多次,HBase会自动把它去重。

\\

如果存储到 hdfs,  我们可以每行数据前面都用 ID 作为头字段, 离线处理的时候根据这个字段先进行去重处理,这样也能保证了 exact once 语义。

\\

输出流程

\\

我们看下 Spark streaming 存储到HDFS或者HBase 都会调用 saveAsHadoopDataset。

\\

\val writer = new SparkHadoopWriter(hadoopConf)\writer.open()\ Utils.tryWithSafeFinallyAndFailureCallbacks {\        while (iter.hasNext) {\          val record = iter.next()\          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])\        }\      }(finallyBlock = writer.close())\writer.commit()

\\

这里根据你传入的 OutFormat 调用 getwriter。

\\

(点击放大图像)

\\\\

然后再 writer上调用 open write close commit 方法。

\\

这里如果是 HBase  就是调用 HBase client 的写入方法:

\\

  • 用户提交put请求后,HBase客户端会将put请求添加到本地buffer中,符合一定条件就会通过AsyncProcess异步批量提交。HBase默认设置autoflush=true,表示put请求直接会提交给服务器进行处理;用户可以设置autoflush=false,这样的话put请求会首先放到本地buffer,等到本地buffer大小超过一定阈值(默认为2M,可以通过配置文件配置)之后才会提交。很显然,后者采用group commit机制提交请求,可以极大地提升写入性能,但是因为没有保护机制,如果客户端崩溃的话会导致提交的请求丢失。\\t
  • 在提交之前,HBase会在元数据表.meta.中根据rowkey找到它们归属的region server,这个定位的过程是通过HConnection的locateRegion方法获得的。如果是批量请求的话还会把这些rowkey按照HRegionLocation分组,每个分组可以对应一次RPC请求。\\t
  • HBase会为每个HRegionLocation构造一个远程RPC请求MultiServerCallable,然后通过rpcCallerFactory.newCaller()执行调用,忽略掉失败重新提交和错误处理,客户端的提交操作到此结束。\

这里如果是 HDFS 文件写入:

\\

  • 首先根据 TaskAttemptID构造出来一个临时写入路径,构造一个文件流\\t
  • 写入临时写入路径\\t
  • commit 的时候调用 commitTask 根据目标路径是否存在, 如果已经存在就删除临时文件,报错, 如果不存在就直接 rename, 把临时文件名, 改为目标文件名, 这里主要是防止多个分区写入同一个目标文件,导致的冲突。\\t
  • 多文件分组输出\

如果有一个需求,需要把数据根据不同的key输出到不同的文件中, 上文中,我们先根据 batch 进行分组, 然后不同分组的文件输出到不同的文件,这时候就需要用到MultipleOutputFormat

\\

\ TreeMap\u0026lt;String, RecordWriter\u0026lt;K, V\u0026gt;\u0026gt; recordWriters = new TreeMap\u0026lt;String, RecordWriter\u0026lt;K, V\u0026gt;\u0026gt;();\ K actualKey = generateActualKey(key, value);\ V actualValue = generateActualValue(key, value);\ RecordWriter\u0026lt;K, V\u0026gt; rw = this.recordWriters.get(finalPath);\ if (rw == null) {\     rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);\     this.recordWriters.put(finalPath, rw);\ }\ rw.write(actualKey, actualValue);

\\

这里就是维护了一个TreeMap, 里面每个不同的key, 构造一个 writer,  这个writer  是getBaseRecordWriter -\u0026gt; theTextOutputFormat.getRecordWriter根据临时路径构造出一个输出流, 包装为一个  LineRecordWriter 最终的 writer就是在这个 DataOutputStream 上进行输出,

\\

上层多文件输出根据不同的key, 从treeMap上获取到不同的文件输出流, 然后进行多文件输出。

\\

这里会存在一个问题,  同一个时间的数据可能出现在多个 batch 中, 就是会产生很多小文件,HDFS 对小文件支持很差,我们需要合并小文件,但是我们也可以直接在输出的时候进行 append 操作,就直接避免了产生小文件。

\\

这里就需要改源码了。

\\

(点击放大图像)

\\\\

上面的类图可以清楚的显示类图的关系,  MultipleOutputFormat 的writer 会调用子类的 getBaseRecordWriter, 我们可以在这里改写一下,  使用我们自己的 TextOutputFormatNew 的 getRecordWriterNew 方法, 在方法里面构造输出流的时候, 如果文件已经存在,就进行 append 操作。

\\

\val fileOut: FSDataOutputStream = if (HDFSFileService.existsPath(file)) {\        println(\"appendfile\")\        fs.append(file)\      } else {\        println(\"createfile\")\        fs.create(file, progress)\      }

\\

\def getTaskOutputPath(job: JobConf, iname: String): Path = {\    val name: String = job.get(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR)\    val completePath = name + \"/\" + iname\    val path = new Path(completePath)\    path\  }

\\

把构造临时路径的方法也修改了, 强制不产生临时路径, 每次都往同一个文件中进行 append, 这样就达到了目的。

\\

小结

\\

本文提供的解决方案, 在不修改Spark 源码本身的前提下, 进行了一些必要的扩展,  其实本质上来讲, 就是我们假定异常状况是经常发生的, 我们就要面对它,就是要对输入流 kafka 中的原始数据进行唯一标识,保证可以去重,然后持久化。 对发生异常的时间区间, 进行数据重放,就像数据中用 redo 日志进行重放一样。

\\

作者介绍

\\

孙彪彪,目前在七牛云存储从事 Spark 相关工作,对 Spark 有深入的研究和实践,微信公众号:Spark技术分享。email: 1319027852@qq.com。

Spark Streaming中流式计算的困境与解决之道相关推荐

  1. 大数据学习系列----基于Spark Streaming流式计算

    2019独角兽企业重金招聘Python工程师标准>>> 个性化的需求 随着互联网知识信息指数级膨胀,个性化的需求对于用户来说越来越重要,通过推荐算法和用户点击行为的流式计算可以很简单 ...

  2. Spark Streaming 流式计算实战

    这篇文章由一次平安夜的微信分享整理而来.在Stuq 做的分享,原文内容. 业务场景 这次分享会比较实战些.具体业务场景描述: 我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名 ...

  3. grafana计算不同时间的差值_大数据时代!如何基于Spark Streaming构建实时计算平台...

    随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台 ...

  4. 021 Spark Streaming

    1.简介 Spark Streaming抽象.架构与原理 StreamingContext 是 Spark Streaming 程序的入口,其指定sparkConf.确定DStream生成的间隔.设定 ...

  5. Spark Streaming高级特性在NDCG计算实践

    从storm到spark streaming,再到flink,流式计算得到长足发展, 依托于spark平台的spark streaming走出了一条自己的路,其借鉴了spark批处理架构,通过批处理方 ...

  6. 图解大数据 | 流式数据处理-Spark Streaming

    作者:韩信子@ShowMeAI 教程地址:http://www.showmeai.tech/tutorials/84 本文地址:http://www.showmeai.tech/article-det ...

  7. Spark流式计算概念

    流式计算 特点: 数据是⽆界的(unbounded) 数据是动态的 计算速度是⾮常快的 计算不⽌⼀次 计算不能终⽌ 相对应,离线计算特点: 数据是有界的(Bounded) 数据静态的 计算速度通常较慢 ...

  8. Spark Streaming 实时计算在甜橙金融监控系统中的应用、性能优化、任务监控

    1 写在前面 目前公司对实时性计算的需要及应用越来越多,本文选取了其中之一的 Spark Streaming 来介绍如何实现高吞吐量并具备容错机制的实时流应用.在甜橙金融监控系统项目中,需要对每天亿万 ...

  9. 《大数据》2015年第3期“研究”——大数据流式计算:应用特征和技术挑战

    大数据流式计算:应用特征和技术挑战 孙大为 (中国地质大学信息工程学院 北京 100083) 摘要:在大数据时代,数据的时效性日益突出,数据的流式特征更加明显,越来越多的应用场景需要部署在流式计算平台 ...

最新文章

  1. linux安装qq_Windows和Linux的比较
  2. Ubuntu中软件安装与卸载
  3. 1400协议是什么和28181区别_gb28181协议常见流程简析
  4. 基于内容的视频搜索引擎
  5. 预充电电路工作原理_变频器整流回路 为什么要预充电电路-工业支持中心-西门子中国...
  6. Unity Easy AR
  7. 笔记本Win10系统关于启动禁用触控板设置
  8. vue + echarts 实现以中国为中心的世界3D地图
  9. react 移动端 实现video的自动播放
  10. unity3d 建立3D中文立体字
  11. DPDK内存管理二:初始化
  12. 概率图模型--最大后验概率状态推理MAP
  13. 基于HTML5+CSS3小说阅读网站设计
  14. fftshift小技巧
  15. Solaris IPMP Link模式
  16. 微创新时代个人崛起的方法
  17. JDK的下载、安装与部署图文教程
  18. ubuntu无法添加PPA的解决办法
  19. java 百度鹰眼sdk,百度地图API基础操作--百度鹰眼
  20. 我的世界服务器修改物品id,【图片】求助:如何修改物品ID【minecraft吧】_百度贴吧...

热门文章

  1. PHP7 网络编程(四)signal信号【待】
  2. 10.2829(NOIP模拟修正总结)
  3. 图的存储结构(邻接矩阵)
  4. Windows PowerShell 语言快速参考
  5. 诗与远方:无题(五十四)- 曾经写给妹子的一首诗
  6. 今年四月份,发现我的文章被人全部复制了,抄到博客园了,连原文出处都没有,就算你写个参考文章也行呀
  7. Android之通过VideoView控件播放一个视频出现的问题以及我的解决办法
  8. idea 批量导入包
  9. spring基础——外部引入属性文件创建bean
  10. js并发上传文件到不同服务器,simple-uploader.js 功能强大的上传组件 - 文章教程