大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

在Kafka、Storm、Flink、Spark Streaming等分布式流处理系统中(没错,Kafka本质上是流处理系统,不是单纯的“消息队列”),存在三种消息传递语义(message delivery semantics),分别是:

  • at least once:每条消息会被收到1次或多次。例如发送方S在超时时间内没有收到接收方R的通知(如ack),或者收到了R的报错,就会不断重发消息直至R传回ack。

  • at most once:每条消息会被收到0次或1次。也就是说S只负责向R发送消息,R也没有任何通知机制。无论R最终是否收到,S都不会重发。

  • exactly once:是上面两个的综合,保证S发送的每一条消息,R都会“不重不漏”地恰好收到1次。它是最强最精确的语义,也最难实现。

在我们的日常工作中,90%的流处理业务都是通过Kafka+Spark Streaming+HDFS来实现的(这里Kafka的作用是消息队列了)。本篇探讨保证exactly once语义的方法。

如上面的图所示,一个Spark Streaming程序由三步组成:输入、处理逻辑、输出。要达到exactly once的理想状态,需要三步协同进行,而不是只与处理逻辑有关。Kafka与Spark Streaming集成时有两种方法:旧的基于receiver的方法,新的基于direct stream的方法。下面两张图可以清楚地说明。

  • 基于receiver的方法

基于receiver的方法采用Kafka的高级消费者API,每个executor进程都不断拉取消息,并同时保存在executor内存与HDFS上的预写日志(write-ahead log/WAL)。当消息写入WAL后,自动更新ZooKeeper中的offset。

它可以保证at least once语义,但无法保证exactly once语义。虽然引入了WAL来确保消息不会丢失,但还有可能会出现消息已经写入WAL,但offset更新失败的情况,Kafka就会按上一次的offset重新发送消息。这种方式还会造成数据冗余(Kafka broker中一份,Spark executor中一份),使吞吐量和内存利用率降低。现在基本都使用下面基于direct stream的方法了。

  • 基于direct stream的方法

基于direct stream的方法采用Kafka的简单消费者API,它的流程大大简化了。executor不再从Kafka中连续读取消息,也消除了receiver和WAL。还有一个改进就是Kafka分区与RDD分区是一一对应的,更可控。

driver进程只需要每次从Kafka获得批次消息的offset range,然后executor进程根据offset range去读取该批次对应的消息即可。由于offset在Kafka中能唯一确定一条消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,达到了exactly once。

不过,由于它采用了简单消费者API,我们就需要自己来管理offset。否则一旦程序崩溃,整个流只能从earliest或者latest点恢复,这肯定是不稳妥的。offset管理在之前的文章中提到过,这里不再赘述。

Kafka作为输入源可以保证exactly once,那么处理逻辑呢?答案是显然的,Spark Streaming的处理逻辑天生具备exactly once语义。

Spark RDD之所以被称为“弹性分布式数据集”,是因为它具有不可变、可分区、可并行计算、容错的特征。一个RDD只能由稳定的数据集生成,或者从其他RDD转换(transform)得来。如果在执行RDD lineage的过程中失败,那么只要源数据不发生变化,无论重新执行多少次lineage,都一定会得到同样的、确定的结果。

最后,我们还需要保证输出过程也符合exactly once语义。Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write)。

  • 幂等性写入

幂等原来是数学里的概念,即f(f(x))=f(x)。幂等写入就是写入多次与写入一次的结果完全相同,可以自动将at least once转化为exactly once。这对于自带主键或主键组的业务比较合适(比如各类日志、MySQL binlog等),并且实现起来比较简单。

但是它要求处理逻辑是map-only的,也就是只能包含转换、过滤等操作,不能包含shuffle、聚合等操作。如果条件更严格,就只能采用事务性写入方法。

 stream.foreachRDD { rdd =>rdd.foreachPartition { iter =>// make sure connection pool is set up on the executor before writingSetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)iter.foreach { case (key, msg) =>DB.autoCommit { implicit session =>// the unique key for idempotency is just the text of the message itself, for example purposessql"insert into idem_data(msg) values (${msg})".update.apply}}}}
  • 事务性写入

这里的事务与DBMS中的事务含义基本相同,就是对数据进行一系列访问与更新操作所组成的逻辑块。为了符合事务的ACID特性(https://en.wikipedia.org/wiki/ACID_(computer_science)),必须引入一个唯一ID标识当前的处理逻辑,并且将计算结果与该ID一起落盘。ID可以由主题、分区、时间、offset等共同组成。

事务操作可以在foreachRDD()时进行。如果数据写入失败,或者offset写入与当前offset range不匹配,那么这一批次数据都将失败并且回滚。

// localTx is transactional, if metric update or offset update fails, neither will be committedDB.localTx { implicit session =>// store metric dataval metricRows = sql"""update txn_data set metric = metric + ${metric}where topic = ${osr.topic}""".update.apply()if (metricRows != 1) {throw new Exception("...")}// store offsetsval offsetRows = sql"""update txn_offsets set off = ${osr.untilOffset}where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}""".update.apply()if (offsetRows != 1) {throw new Exception("...")}}

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

Kafka+Spark Streaming如何保证exactly once语义相关推荐

  1. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

  2. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

    一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...

  3. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...

    Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...

  4. Flume+Kafka+Spark Streaming+MySQL实时日志分析

    文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...

  5. 大数据Spark “蘑菇云”行动第76课: Kafka+Spark Streaming+Redis项目实战

    大数据Spark "蘑菇云"行动第76课:   Kafka+Spark Streaming+Redis项目实战 jedis插件 redis <dependency>   ...

  6. 【python+flume+kafka+spark streaming】编写word_count入门示例

    一. 整体架构的一些理解 1.整体架构的理解: 架构中的角色分为了数据采集,数据缓冲,还有数据处理. flume由于输入和输出的接口众多,于是利用这特点来实现无编程的数据采集. 无编程的数据采集,我是 ...

  7. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十九)ES6.2.2 安装Ik中文分词器

    注: elasticsearch 版本6.2.2 1)集群模式,则每个节点都需要安装ik分词,安装插件完毕后需要重启服务,创建mapping前如果有机器未安装分词,则可能该索引可能为RED,需要删除后 ...

  8. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十八):kafka0.10.1 内置性能测试API用法示例...

    消费者测试: ./kafka-consumer-perf-test.sh --zookeeper vm10.60.0.11.com.cn:2181,vm10.60.0.7.com.cn:2181,vm ...

  9. Flume+Kafka+Spark Streaming实现大数据实时流式数据采集

    近年来,随着企业信息化建设的飞速发展,大数据应用的问题越来越备受关注.很多企业投入大量的人力.物力和财力建设企业大数据平台,平台建设工作涵盖数据采集.数据处理.数据存储.数据服务.数据展示以及数据质量 ...

最新文章

  1. react控制元素的显示或隐藏
  2. ABAP Netweaver, Hybris Commerce和SAP 云平台的登录认证
  3. 无服务器-仅仅是构建现代应用程序的一种方法?
  4. 20155207王雪纯 《Java程序设计》实验一报告
  5. Hadoop1.0.4单节点StandAlone及Pseudo-Distributed部署方法
  6. 没有为此文件配置应用程序上下文_如何在macOS中管理文件关联?
  7. js 获取html文字颜色,js获得网页背景色和字体色的方法
  8. 关于matlab的问题,关于MATLAB的一些基础问题
  9. 7.监控应用和数据可视化 7.1通用健康状态指引器
  10. 初识 InnoDB存储引擎
  11. 微信小程序云开发-树洞小程序Treehole(畅所欲言/交易广场实现)
  12. 世界上首位程序员Ada
  13. 中国石油大学计算机应用基础在线考试答案,中国石油大学2020年春季《计算机应用基础》在线考试答卷附标准答案...
  14. 俄勒冈健康与科学大学计算机,俄勒冈健康与科学大学排名
  15. gcc编译工具常用命令以及汇编语言
  16. Mybatis generator自动生成mybatis配置和类信息
  17. pytorch之torch.zeros_like
  18. python英文文本分析和提取_python如何提取英语pdf内容并翻译
  19. MMC, SDIO, SD
  20. ffmpeg 查询设备_ffmpeg 命令

热门文章

  1. EndNote使用总结+Mendeley
  2. itest监考机制_干货 | iTEST管理员组织大型无纸化考试的经验分享
  3. 算法 2.3.15 螺丝和螺帽
  4. Oracle 11g 64位 下载
  5. pmp考试是智商税吗,是一场持久的割韭菜战吗?
  6. python 获取excel文本框_如何基于python操作excel并获取内容
  7. 产品读书《魔鬼经济学4:用“有色眼镜”看清世界》
  8. 胖虎谈ImageLoader框架(四)
  9. Linux环境下Elasticsearch6.4.3安装
  10. 谷歌战神Jeff Dean