在 2016 年中旬,车好多集团(瓜子二手车&毛豆新车)开始引入 Kafka 技术栈,起初的目标是作为大数据系统的数据输入,随着使用范围的推广,逐步成为集团数据链路的核心组件.

Kafka 在车好多的发展时间线

消息队列

Kafka 简介

Kafka 的官方的介绍是: Apache Kafka® 是一个开源的分布式的事件流平台,有成千上万的公司将 Kafka 作为高性能数据流通道、流式分析和数据整合工具的至关重要的应用来使用。

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines,streaming analytics,data integration,and mission-critical applications.

消息队列的选型

现在在广泛使用的消息队列中,每一种都有自己的优势场景和特点。我们选择 Kafka 的主要原因是看好 Kafka 的生态。相比其他 MQ 系统,Kafka 除了消息队列的核心功能外,官方还提供了输入输出的能力(Kafka connect); 流式计算的能力(KStream / KSQL); 此外社区也十分活跃,提供了多语言的 SDK,以及一些运维部署工具和监控插件; 同时 Kafka 与 Hadoop 等大数据组件结合更紧密,有一些比较成熟的方案将 Kafka 中的数据导入大数据集群和 Spark streaming / Flink 等流式计算框架等。最终我们在选型的时候选择了 Kafka。

集群管理工具

在公司早期,数据量比较小,仅用两台机器通过手工部署组成一个最小集群。随着公司发展和 Kafka 在公司推广,Kafka 逐渐成为瓶颈,需要做一些扩容等运维操作,手工运维集群的复杂程度上升明显,我们团队开始调研使用一些工具来处理这类操作。我们首先引入了 Cloudera Manager (简称CM),它是 Cloudera 公司开发的一款大数据相关集群安装部署工具,这款工具包括自动化安装部署、集中管理、集群监控报警等功能,可以在短时间内快速安装集群,提高集群管理的效率。但是有一些自定义开发的插件无法通过 CM 直接部署,我们引入了 AWX。它是 Red Hat® 公司提供的一套开源工具,可以通过图形化界面来操作 Ansible,还提供了一些运行记录和定时运行任务等功能。

Kafka 第一次大升级

使用 Kafka 的一段比较短的时间内,我们只有几台低配置的机器,接入的服务比较少,负载也很低。随着业务逐步增多加上业务旺季数据量爆发式地上涨,网卡流量每天的大部分时段在 70% 以上,磁盘 IO 也处于一个比较高的区间。

我们第一次遇比较严重的问题,是由于某一个任务启动的时候,从数据库加载了一些信息到 Kafka 中,接受这些数据的 topic 的类型为 compact + delete,数据频繁更新的时候,会不断地刷写文件和删除原文件,导致了这个 topic 的 IO 持续处于高位,影响同磁盘下其他任务的同步,继而导致大量同步延迟,ISR 降低。

在这个时刻遇到的另一个问题是,公司一些高优先级的线上系统也依赖了同一套 Kafka 服务来传递消息,Kafka 服务出现了问题,公司所有使用 Kafka 的应用都将受到影响。因此我们规划对 Kafka 做一次大升级。

原因分析

一段时间内,Kafka 频繁发生事故,我们分析了导致事故的原因有以下几点:

  1. 设备老旧,一台机器只有一块磁盘来抗住 IO 的压力,网卡也仅是一块千兆网卡,在大流量的任务运行时都可能被打满,导致同步延迟和读写数据速度慢;
  2. 初期集群只有两台机器,因此默认配置为 2 副本,最小可用副本数 1 副本,如果有某个机器宕机,可能出现数据丢失的情况;
  3. 数据量小但是重要的 topic 和数据量大相对不重要的 topic 在同一个集群上,流量激增的时候,可能导致前者的可用性下降;
  4. 存在部分历史遗留问题,如: 早期使用 Kafka 的时候,没有对 PHP 和 GO 语言的客户端进行验证,partition 的 leader 切换部分客户端无法自动处理; 一部分用户使用通过 Zookeeper 连接的老版本的 Kafka 客户端,offset 仍然记录在 Zookeeper,consumer 实例可能出现脑裂且不方便统一做监控等等。

我们开始准备升级工作,这时涉及到两种方案,A 方案是重新搭建服务,由业务方进行迁移; B 方案是在原先的集群上先扩容再缩容,逐步替换掉老的机器,再滚动升级。

方案的对比 & 选择

A 方案优势

可能风险: A 方案不存在数据同步导致网卡流量和磁盘 IO 激增的情况; B 方案因机器老旧,可能因为数据迁移导致网卡流量和磁盘 IO激增的情况;
影响范围: A 方案由业务方逐个控制,遇到问题随时回滚,影响范围小; B 方案的操作同时影响所有的使用方,假如迁移后仅有个别使用方遇到问题需要全部回滚;
集群拆分: A 方案可以在迁移时候主动选择使用在线/离线集群; B 方案需要全部指向离线集群,有高优先级需求的业务还需要单独再做迁移;
集群管理: A 方案可以通过更成熟的工具,界面化地维护 Kafka 集群; B 方案很难直接接入管理工具.

B 方案优势

跨部门沟通: A 方案沟通复杂,需要建立各个业务方的信任; B 方案只需要通知在某个时间段操作即可;
具体业务: A 方案需要协调各个业务方的时间,跨部门使用的 topic 迁移存在先后关系; B 方案可以忽略此问题;
旧版客户端: A 方案需要逐个迁移,不再提供通过 Zookeeper 连接的方式,需要业务方在迁移前清理历史遗留问题; B 方案不需要推动业务方清理历史遗留问题;

基于以上的方案对比,我们在迁移的前置工作中,除了提供稳定的集群外,还做了以下几件事情:

  1. 开发一个小工具,可以将 Zookeeper 上的 offset 信息同步到 broker 中,当上线新的连接方式的消费的时候,可以直接从对应 offset 开始消费,无需业务方开发额外代码,并维护了一份迁移的文档以帮助业务方理解和使用该工具。 以此来解决老旧客户端的问题。
  2. 设计先在新集群上线消费者,然后迁移生产者的方案并形成文档,在跨部门跨项目的场景中,建议使用此方案进行迁移,降低沟通成本。
  3. 快速上线 topic 的元信息维护系统,记录 topic 的使用方信息,便于沟通.
  4. 在业务方迁移时候,Kafka-admin 团队驻场,有任何问题可以第一时间解决.

通过提供以上的技术支持,业务方愿意与我们配合,使用 A 方案进行迁移。在迁移的同时我们也做了多集群的部署的方案。

多集群部署

单机的服务的可用性,受到众多硬件环境及软件本身的因素影响,可用性远小于集群服务的可用性,但是将使两个可用性只有 99% 的集群并联起来,整体服务的可用性也可以达到 99.99%。因此我们规划了多个 Kafka 集群的方案,来提升整体 Kafka 服务的可用性,避免单个 Kafka 集群的可用性降低带来对整体的影响。

对于不同的业务,集群的稳定性需求是不同的,除了常规的集群外,一些比较重要的业务场景如: 订单的消息流转,需要更高的 SLA,我们为这类需求单独建立了一套在线集群,牺牲一些资源使用率,换取更高的稳定性; 还有一些实时流计算任务(flink / KSQL 等),会在任务启动时候,从 Kafka 中读取最近一段时间的数据,来维护对应的数据到流计算工具内的缓存,这样处理会在短时间大量地使用网络带宽和磁盘 IO,并且打乱操作系统的页缓存,对整体的性能产生影响,我们为这类需求单独创建了一套分析用集群,提升短时间高吞吐的能力。

拆分集群

业务专用集群一般为极重要(可以特批机器)的业务,只为某一个业务服务,用极低的资源冗余,换取高 SLA,所有数据会同步到 offline 集群;
online 集群作为"重要"(目前定义重要是: 直接影响用户)业务使用的集群,用一部分资源冗余,换取相对高的 SLA,所有数据会同步到 offline 集群;
offline 集群作为数据的全集,会从 online 集群和重要业务单独集群同步数据过来,其余与线上业务无关的数据同步任务,会使用这套集群,另外如果其他集群有不可短时间立即恢复的问题出现,可以立即切换域名指向 offline 集群,把对下游应用影响的时间压缩到最短;
分析集群,从 offline 集群同步指定的 topic,为分析类任务的批量读取提供服务。

AVRO

对 Kafka 的集群进行拆分之后,服务的稳定性得到了保障. 我们团队的主要工作方向慢慢向帮助技术人员提效和降低大数据技术使用门槛发展。一方面,跨业务消息传递无强制格式要求,业务方通常使用 Json 类型字符串,虽然 Json 使用灵活,但是如果业务产生变更,业务上下游的通知不及时,很容易解析异常; 偶尔也会有一些误发送其他 topic 的数据或不完整格式的脏数据被发到集群中,也会导致消息解析失败。另一方面,我们也开始规划以 Kafka 为核心链路,向其他存储如: ES / Hadoop(Hive,HBase) / KUDU / ClickHouse / NEO4J 等组件同步数据的方案。业务上只需要一次把数据写入 Kafka 中,即可在各个其他需要的组件中使用该数据,降低了其他组件的接入门槛,从传统的安装大数据环境 → 数据导入→ 清洗为列式存储再进行查询,到在业务代码发送数据到 Kafka 后直接通过查询引擎(如 Presto)查询,便利地享受大数据带来的红利。结合了跨业务方对数据格式的需求和下游系统大部分需要依赖数据的结构同步,我们引入了 AVRO 的序列化方式。

Schema registry 和兼容性

Schema registry 是一个提供 schema 注册和访问的工具,对 schema 的多版本管理提供了很好的帮助。车好多集团早期业务飞速发展阶段,数据结构经常会有调整,但是下游存储大多不支持随意变更数据结构,结合 Schema Registry 提供的几种兼容性,与业务方多次沟通之后,我们权衡出一套适合我们的兼容性规范: 只允许在原 schema 的基础上追加带有默认值的字段。我们开始选择了 BACKWARD 兼容,但是 BACKWARD 是允许删除字段和在原 schema 中间插入字段的,这对下游同步数据是不友好的,可能导致历史数据错位(如 Hive),我们在公司的技术规范中做了如上限制,一方面保障了数据链路的稳定性,另一方面在跨团队传递消息时也解耦了依赖关系。

AVRO 类型消息收发过程

SDK

有了方案之后,我们在一些业务方做试点,发现使用起来出现的问题五花八门,接入的时候还是需要了解很多细节,这样对我们的全局推广是很不利的。因此我们开始开发 SDK 来帮助大家更简单的接入,只需要拼装对象和发送即可,消费的方式也提供了直接消费出数据并转换成对应的对象,再次试点之后,大家接入的速度快了很多并且遇到的问题也减少了。我们逐步推广到全公司使用这套 SDK。

不过推广到全公司之后,发现有一些业务团队的开发语言会使用 PHP,Python 和 GO,我们只是开发了 Java 的 SDK,于是联系公司比较擅长各种语言的同事,统一设计的思路,针对不同语言开发各自的 SDK 并形成文档,帮助更多的人接入。

数据链路

介绍了业务上的数据的输入之后,我们来看看其他的数据输入和输出,如:MySQL binlog 的输入,输出到 HDFS(Hive),HBase 等等.

Kafka connect

从 Kafka 到其他组件,简单来看就是消费消息到下游组件。我们选择了 Kafka connect 作为数据的输入输出的平台,通过开发和改造插件的形式,灵活控制增减任务。什么是 Kafka connect ? Kafka Connect是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。Kafka connect 以插件的形式,完成不同上下游组件的数据输入输出。我们除了可以使用一些开源的插件来完成这些工作,还可以自定义开发,完善符合业务要求的业务逻辑,使用起来非常灵活。

Source

Kafka connect 分成两个部分,我们首先来看看数据从其他组件进入到 Kafka,也就是 source 部分。我们最主要的 source 任务就是将 MySQL 的 binlog 抽取到 Kafka 中,提供给业务方使用和同步到下游组件。

CDC

我们早期使用的 canal 同步 binlog 到 Kafka 中,但是在我们选型的时候,canal 还是主从结构,嵌入到 Kafka connect 中不是很方便; 现在来看 Debezium 也是个不错的选择,不过在选型的时候,它还没发布正式版本,当时也没有选择。最终我们选择了Maxwell,一个比较小巧的工具改造起来比较方便,也便于嵌入 Kafka connect。

Sink

数据进入到 Kafka 之后,除了下游业务直接消费,我们还需要将这些数据同步到其他存储中,以便于统计分析等后续处理。常见的场景例如: 写入数据到 ElasticSearch,为数据的搜索提供服务; 写入到图数据库 Neo4J ,为图谱做数据源; 写入到 Redis,维护最新的数据,提升缓存命中率等等。我们详细的聊聊使用最多的几个场景,写入 HDFS、HBase 和 KUDU。

HDFS

我们经常遇到一些场景,需要将一些比较像日志类型的结构化的数据进行永久存储,存储到 MySQL 之类的关系型数据库的话,数据量比较大,优先级和重要程度又没有这么高,积累下来查询又比较慢,还可能影响重要业务的查询。我们为了解决这类问题,引入了同步数据到 HDFS(Hive),在发送的时候可以使用异步方式发送到 Kafka,不会 block 主要业务流程,查询的时候可以通过 Presto 等 OLAP 引擎查询,提升查询速度。

时效性与小文件

Kafka 里边的数据本身是流式的,写入到 HDFS(Hive) 就需要考虑多久做一次文件的刷写,文件刷新频率过低,业务上查询数据的实时性就会变差; 反之提升文件刷新频率,HDFS 对小文件又非常不友好,日积月累会导致 HDFS 集群的巨大压力。我们结合业务方对时效性的要求和公司 HDFS 的现状,制定了生成文件的规则: 每 5 分钟进行一次文件的刷写。

小文件问题,需要合并

如果只是限制了每 5 分钟刷写一次文件,每个 partition 每天仍然要产生将近 300 个文件,长期积累下去,小文件的个数仍然会对 HDFS 的服务造成影响,我们开发了一套小文件合并的程序,在晚上把每天每个 partition 的小文件合并掉成一个。简单介绍一下合并的过程:

  1. 通过 spark,将当天的分区的数据 select insert into 一个临时目录;
  2. 校对新生成的数据和原始数据是否一致;
  3. 将原分区目录中的文件移动到一个临时目录,如无需回滚 7 天后删除;
  4. 将新生成的数据文件移动到原目录。

通过以上的合并方案,能够有效控制小文件个数在一定范围,降低对 HDFS 的影响。

HBase & KUDU

我们另外一类比较大的应用场景是将数据同步到 HBase 和 KUDU 中,为离线和在线的数仓提供服务。车好多集团和很多流量类型的互联网公司的一个比较大的区别是: 业务周期比较长,数据更新频繁。一辆车从开始入库到多项检测,再到销售过程,周期最长可能会持续好几个月。这样就会有一些订单的周期会横跨多个月,并且经常产生状态的更新。HBase 和 KUDU 可以帮助我们更新数据的变化,利用这一特性,将前文提到的 Kafka 中的 MySQL binlog 数据 sink 到 HBase 和 KUDU 中。如前文提到的 Avro Schema 兼容性问题,当数据库的数据格式在规范内进行变更,下游存储可以自动适应新的数据结构,将新的表结构同步到 KUDU 和 Hive on HBase 的表中。为数仓系统提供近实时的 ODS 层数据,支持离线和实时的报表统计需求,这是 CDC 产生的数据的一种主要应用形式。

数仓体系

遇到过的问题

使用 Kafka connect 的过程并非一帆风顺,我们也遇到过一些问题。

版本的问题

我们开始使用的版本存在两个比较明显的问题:

  1. 每次添加任务会做一次全局的 rebalance,这会使大多数不需要做 balance 的任务重启。等到全部任务再回到正常状态,整个集群至少会产生秒级别的数据延迟;
  2. 任务重启的时候,有概率会导致原任务停止失败,又重新拉起了新的任务。因为绝大多数是 sink 类型的任务,受到 partition 个数的限制,初期体现并不明显,增加了 source 类型的任务,这个问题才逐渐凸显.

关注社区的一些动态只会,发现在新的版本里边,已经解决了这两个问题,于是我们决定对 Kafka connect 集群进行升级。但是调研、测试并升级的周期稍长,为了保障业务短期内不受到影响,我们与业务线沟通了 SLA 的要求,一些高 SLA 需求的业务如: 主站搜索依赖的 binlog 数据流,我们独立部署集群,如果有上线需求仅在业务低峰时段进行,以缓解数据延迟。

升级

计划升级之后,我们梳理了一下老的集群,当初为了避免全局 rebalance 带来的影响,建立了多套小集群,在升级之后我们希望能合并成尽量少的集群,方便管理。升级过程中仍然发现了许多问题:

  1. 一些与业务方约定的内容和公司规范有关的自定义开发内容代码提交混乱,升级时无法快速打入新版本;
  2. 多个小集群有不同的 Kafka connect 版本和不同的组件的插件版本;
  3. 日志和 JMX 端口配置混乱,不利于错误排查,上线部署流程不规范。

借着升级的机会,我们重新梳理了自定义开发的代码; 统一了多个版本的 Kafka connect 和相关插件; 统一了监控配置和部署流程。

反思

复盘这次升级的过程中遇到的问题,我们也总结并形成一些规范:

  1. 时刻考虑写下的代码是否是可维护的,尤其是在一些比较成熟的项目中的二次开发,规范 review 机制,制定二次开发代码提交的规范;
  2. 规范部署流程,通过 Ansible 的标准部署方式,配置文件通过 git 管理,做到配置文件的版本可追溯;
  3. 升级过程中如果遇到问题,应该立即回滚,而不是在线上环境做更多的尝试,有可能导致二次事故。

平台化

为了提升开发人员效率,我们结合公司的 org 系统,开发了一些小工具. 它可以帮助开发人员快速排查错误,观察监控数据,订阅报警等等; 也可以帮助一些合作方快速地完成任务的调整。

平台化工具

我们使用的组件大多是有一些开源的工具,提供给我们最通用的基础功能,但是仍然有一些公司内部深度定制的功能,已经权限相关的功能需要增加。

Kafka 平台

相比很多开源的 Kafka 提供的工具,我们做了一些建设,除了开源常用的功能外,补充了几个我们使用频率比较高的需求:

  1. 开源工具有很多可以对 topic 等内容操作的功能,我们调整了只开放读操作;
  2. 增加通过 offset 或者时间点获取数据的方式;
  3. 增加一些公司内部约定好的格式,如 avro 序列化,可以直接解析并展示;
  4. 展示一些 topic 所属部门的相关信息,方便沟通;
  5. 增加生产消费的监控展示和一些报警的订阅。

schema 工具

Avro Schema registry 主要的作用是管理 schema 的历史版本,这里边涉及到兼容性的问题(可以参考官网). 我们选择的兼容性如前文提到的是: 只允许在原 schema 的基础上追加带有默认值的字段。我们维护了一套 schema 的管理工具,主要功能如下:

  1. 按照事先约定的规范,对提交的内容做格式检查;
  2. 每个业务部门有自己的组,组内人员可以提交 schema,小组 leader 进行审核;
  3. 自动化检查提交的 schema 是否符合规范和兼容性,符合则提交到 git 中,对修改内容的版本做记录;
  4. 通过 git 的 CI/CD,将提交的 schema 进行编译,生成 go 代码和 jar 包提交到 git 和公司 Maven 私服。

通过这套流程,让 schema 的修改标准化,并且自动化地做好编译打包工作,流程完毕之后可以直接引用。

总结&展望

经过四年多的磨砺,Kafka 在车好多从 0 到 1 再到 10,已经成为一个非常核心的组件,每天承接车好多集团百亿级别消息的流转和数据的同步的工作。未来我们会继续跟进社区探索高版本 Kafka 的能力,将一些类似去除 Zookeeper 依赖等新特性在车好多落地;深耕更多业务场景,探索 Pulsar 等新的消息队列方案对用户零侵入的落地方式;开发上下游辅助工具,探索 Kafka 跨集群备份时 offset 自动对齐工具链,帮助业务开发的同事提升开发效率。让 Kafka 在车好多集团的应用更上一层楼。

kudu接受kafka消息_Kafka 在车好多相关推荐

  1. kudu接受kafka消息_Kafka入门详解

    1.1 什么是kafka? Kafka最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的 ...

  2. kafka使用_kafka使用Interceptors实现消息端到端跟踪

    现在,Kafka指标的采集都仅包括客户端或broker,这使得用户跟踪消息在集群内的传递路径,构建系统端到端的性能和行为画像变的困难.从技术上讲,通过修改应用以收集或跟踪额外的信息来测量系统端到端的性 ...

  3. 传小米要在A股港股同时上市;车好多完成8亿美元融资;三星市值蒸发近600亿美元丨价值早报

    第[878]期早报由[周五]赞助播出 01 今日头条 三星近4个月市值蒸发600亿美元 据国外媒体报道,三星电子公司最新旗舰智能手机并不能成为自己股价暴跌的解药,该公司自去年11月份以来因股价大跌,其 ...

  4. kafka消息会不会丢失

    转载:https://baijiahao.baidu.com/s?id=1583469327946027281&wfr=spider&for=pc 消息发送方式 想清楚Kafka发送的 ...

  5. kafka消息堆积原因解析

    kafka消息堆积,可以调节如下两个参数 max.poll.records 一次调用poll()返回的最大记录数. 默认值500 就是一次最多拉取500条记录 max.poll.interval.ms ...

  6. kafka消息服务的producer、broker、consumer的配置

    2019独角兽企业重金招聘Python工程师标准>>> server.properties配置: server.properties中所有配置参数说明(解释)如下列表: 参数 说明( ...

  7. 19 kafka消息队列

    文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...

  8. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  9. Kafka消息队列简介

    Kafka消息队列简介 1 基本概念 Broker Kafka集群包含一个或多个服务器,这种服务器被称为broker Topic 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic. ...

最新文章

  1. 2021入坑图像分割,我该从哪儿入手?
  2. oracle em 乱码问题
  3. 微信公众平台自定义菜单接口API指南
  4. openstack搭建之-nova配置(10)
  5. 即时通讯音视频开发(一):视频编解码之理论概述
  6. AV1挑起的Codec之战
  7. 由于不当的执行顺序导致的死锁
  8. QT中动态库和静态库使用
  9. 插入移动硬盘_Win10插入移动硬盘或U盘有提示声但电脑中不显示的解决方法
  10. JavaScript数据结构——字典(Dictionary)
  11. “杀死” APP 的留白设计!
  12. JdbcTemplate实验
  13. python如何屏幕截图_Python编写屏幕截图程序方法
  14. java如何调用webservice_Java如何调用WebService接口
  15. 利用python画简单图形示例代码_使用python画社交网络图实例代码
  16. Shamir密钥分享算法简析
  17. Android中的EditText属性说明
  18. KNIME的学习使用心得
  19. html图片变颜色,css 改变图片颜色(纯色)常用图标换色
  20. Unity 延时的三种方法

热门文章

  1. vector容器 begin()与end()函数、front()与back()的用法
  2. 归一化互相关Normalization cross correlation (NCC)
  3. 在Blender中创建惊人的低多边形动画
  4. C4D灯光照明技术学习教程
  5. 强大的Charles的使用,强大的flutter1.9
  6. HPU组队赛B:问题(二进制枚举)
  7. NYOJ 90 —— 求正整数n划分为若干个正整数的划分个数
  8. Intent携带额外的数据的方法
  9. 【CSON原创】HTML5游戏框架cnGameJS开发实录(外部输入模块篇)
  10. spacemacs各种问题修复方法