kafka传递文件_Kafka权威指南(二)数据传递/数据管道/数据镜像
可靠的数据传递
可靠性保证
- kafka可以保证分区消息的顺序
- 只有当消息被写入分区的所有同步副本时,才被认为是已提交的
- 只要还有一个副本是活跃的,那么已经提交的消息就不会丢失
- 消费者只能读取已经提交的消息
消息存储的可靠性和一致性的重要程度与可用性、高吞吐量、低延迟和硬件成本之间的权衡
复制
kafka复制机制和分区多副本架构是可靠性保证的核心
同步分区要满足的条件
- 与zookeeper间有一个活跃的会话,在过去6s内发送过心跳
- 过去10s内从首领获取过消息
- 过云10s内从首领获取过最新消息,即几乎是零延迟
一个滞后的同步副本会导致生产者和消费者变慢,因为消息被认为已提交前,需要得到同步副本的确诊
非同步副本不会影响性能,但更少的同步副本意味着更低的有效复制系统
broker配置
影响消息存储可靠性的参数,这些参数即可应用于全局,也可应用于特定的主题
1、复制系数
主题级别:replication.factor
broker级别:default.replication.factor
即分区副本个数,默认3副本
broker.rack 可以为每个broker配置所在机架的名字,此时kafka会保证分区的副本被分布在多个机架上
2、不完全的首领选举
broker级别:unclean.leader.election 默认true
在选举首领副本过程中没有丢失数据,即提交的数据同时存在于所有同步副本上,好么选举是完全的
如果在首领不可用时,其他副本都不同步,就是不完全
注意:允许不同步的跟随副本提升为首领副本,就要承担丢失数据和数据不一致的风险;若不允许它成为首领,则要接受较低可用性的风险(需要等待原首领恢复到可用状态)
3、最少同步副本
主题级别:min.insync.replicas
broker级别:min.insync.replicas
如果同步副本数量小于最小值,broker会停止接受生产者的请求,消费者仍然可正常读取已有数据 (只读)
在可靠的系统里使用生产者
可能存在的问题
- acks设置为1,首领收到消息后,崩溃,消息还没有同步到同步副本时
- acks设置为all,客户端发送后首领崩溃,但客户端没有正确处理"首领不可用"异常,并认为消息已正常投递
发送确认
acks=0 一定会丢失消息,但可得到惊人的吞吐量和带宽利用率
acks=1 风险点,客户端没有正确处理 LeaderNotAvailableException 异常;首领同步消息时崩溃
acks=all 配合min.insync.replicas参数,最保险,但会降低吞吐量
配置生产者的重试参数
broker返回的错误分为两种:可通过重试解决和不能通过重试解决
对于重试次数,需要根据实际情况进行设置 (MirrorMaker的重试策略是无限重试,决不丢失消息)
重试发送已经失败的消息会带来风险,如导致消息重复
重试和恰当的错误处理可保证每个消息"至少被保存一次",但无法保证"只被保存一次"
解决办法是为每条消息加入唯一标识符 和 应用程序做到"幂等"
额外的错误处理
不可重试的错误:消息大小错误、认证错误、序列化错误、重试次数上限或内存达到上限时的错误
在可靠的系统里使用消费者
跟踪哪些消息是已经读取过的,哪些是还没有读取过的
消费者提交了偏移量却未能处理完消息,就有可能造成消息丢失
消费者的可靠性配置
- group.id
- auto.offset.reset 在没有偏移量或偏移量无效时,客户端的行为 latest/earliest
- enable.auto.commit 是否自动提交偏移量,自动提交无法控制重复处理消息
- auto.commit.interval.ms 自动提交频率
显式提交偏移量
1、总是在处理完事件后再提交偏移量
2、提交频率是性能和重复消息数量之间的权衡
3、确保对提交的偏移量心里有数 是读取到的最新偏移量还是处理过的最新偏移量
4、再均衡
5、消费者可能需要重试
6、消费者可能需要维护状态
7、长时间处理
8、仅一次传递 幂等性写入
验证系统可靠性
1、配置验证
考虑首领选举、控制器选举、依次重启、不完全首领选举情况下的状态
2、应用程序验证
考虑客户端从服务器断开连接、首领选举、依次重启broker、依次重启消费者、依次重启生产者情况下的状态
3、在生产环境监控可靠性
对生产者,error-rate和retry-rate 及错误日志
对消费者,consumer-lag
构建数据管道
可作为各数据段间大型缓冲区,有效解耦管道数据的生产者和消费者
构建数据管道时需要考虑的问题
1、及时性
2、可靠性
kafka本身支持"至少一次传递",再结合具有事务模型或唯一键特性的外部存储系统,kafka也能实现"仅一次传递"
3、高吞吐量和动态吞吐量
kafka和connect api与数据格式无关,生产者和消费者可以使用各种序列化器来表示任意格式的数据
ETL 提取-转换-加载 转换过程由数据管道完成
ELT 提取-加载-转换 转换过程由下游应用程序完成,高保真
Kafka支持加密传输数据,还支持认证和授权,并能提供审计日志
如果数据管道过多地处理数据,会给下游系统造成一些限制和束缚,如果下游系统有新的需求,数据管道就要做相应的变更
更灵活的方式是尽量保留原始数据的完整性,让下游应用自己决定如何处理和聚合数据
Connect API和客户端API
客户端是需要被内嵌到应用程序里的
Connect用于从外部数据存储系统读取数据,或将数据推送到外部系统,是独立于外部系统的一个中间适配器
Kafka Connect
为连接器插件提供一组API和运行时
# 启动Connect
bin/connect-distributed.sh config/connect-distributed.properties
# 通过API查看已安装的connector插件
curl http://localhost:8083/connector-plugins
# 单机模式
bin/connect-standalone.sh +配置参数
文件通过数据管道传输示例:
1、启动一个分布式worker进程
bin/connect-distributed.sh config/connect-distributed.properties &
2、启动一个文件数据源 (把server.properties发送到主题kafka-config-topic,连接器名称load-kafka-config)
echo '{"name":"load-kafka-config","config":{"connector.class":"FileStreamSource","file":"config/server.properties","topic":"kafka-config-topic"}}' | \
curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
3、消费刚刚发到kafka的消息
bin/kafka-console-consumer.sh --new --bootstrap-server=localhost:9092 --topic kafka-config-topic --from-beginning
#==== 接收文件
1、启动一个文件数据池
echo '{"name":"dump-kafka-config","config":{"connector.class":"FileStreamSlink","file":"copy-of-server-properties","topics":"kafka-config-topic"}}' | \
curl -X POST -d @- http://local-host:8083/connectors --header "content-Type:application/json"
# 会生成文件copy-of-server-properties内容和源文件相同
2、删除连接器
curl -X DELETE http://localhost:8083/connectors/dump-kafka-config
示例,MySQL -> ElasticSearch page128(109)
https://github.com/confluentinc/kafka-connect-elasticsearch
https://github.com/confluentinc/kafka-connect-jdbc
深入理解Connect
worker进程集群
1、连接器和任务
连接器 决定需要运行多少个任务;按任务拆分数据复制;从worker进程获取任务配置
# 如果通过REST API启动连接器,可能会启动任意节点上的连接器,连接器的任务会在该节点上执行
2、任务
任务负责将数据移入或移出kafka
3、worker进程
是连接器和任务的容器
4、转化器和Connect的数据模型
用户配置worker进程(或连接器)时,可选择使用合适的转化器,用于将数据保存到kafka
可用的转化器有:avro、json、string
5、偏移量管理
源连接器返回给worker进程的记录里包含一个逻辑分区和一个逻辑偏移量 (非kafka的分区和偏移量)
其他选择
1、基于图形界面的ETL工具
kafka可以是一个支持数据集成(使用Connect)、应用集成(使用生产者和消费者)和流式处理平台
kafka可以成为ETL工具的替代品(ETL工具太过复杂)
2、流式处理框架
数据集成系统应该只做一件事:传统数据;可靠性是数据集成系统唯一一个重要需求
跨集群数据镜像
集群间的数据复制叫做镜像,kafka内置的跨集群复制工具是 MirrorMaker
跨集群镜像的使用场景
- 区域集群和中心集群
- 冗余 DR
- 云迁移
跨数据中心通信的现实情况
- 高延迟
- 有限的带宽
- 高成本
因为kafka服务器和客户端是按照单个数据中心进行设计、开发、测试和调优的,默认的超时时间和缓冲区大小不适用于跨多个数据中心安装kafka服务器
向远程数据中心生成数据要忍受高延迟,且需增加重试次数和增大缓冲区以解决潜在的网络问题
Hub和Spoke架构
该架构适用于一个中心kafka集群对应多个本地kafka集群的情况
数据只会在本地数据中心生成,且每个数据中心的数据只会被镜像到中央数据中心一次
不足:一个数据中心的应用程序无法访问另一个数据中心的数据
双活架构 Active-Active
两个或多个数据中心需共享数据且每个数据中心都可以生产和读取数据
优势:可就近为用户提供服务,具有性能优势,不会因数据可用性问题在功能方面作出牺牲;冗余和弹性
不足:如何进行多个位置数据异步读取和异步更新时避免冲突;数据一致性问题;循环复制
主备架构 Active-Standby
优势:易于实现,可用于任何场景
不足:浪费了一个集群
失效备援
要实现不丢失数据或无重复数据的kafka集群失效备援是不可能的
- 数据丢失和不一致性 镜像是异步的,会落后源一定时的消息
- 失效备援之后的起始偏移量
-- 偏移量自动重置,直接从头或末尾开始消费消息,存在消息重复消费和丢失的问题
-- 复制偏移量主题,__consumer_offsets 需要注意的是,两边的实际偏移量可能并不一致 (需要从0开始镜像数据并持续镜像偏移量主题)
-- 基于时间的失效备援 (新版kafka支持)
-- 偏移量外部映射 非常复杂
kafka客户端只需要连接一个broker,就可以获取到整个集群的元数据,并发现集群里的其他broker,一般提供3个broker信息就可以了
延展集群 stretch cluster
跨多个数据中心安装单个kafka集群
优势:同步复制;此外,所有broker都发挥了作用
不足:应对灾难类型很有限;运维复杂 需要3个数据中心,因为zookeeper要求大多数节点用用时,整个集群才可用
MirrorMaker
MirrorMaker为每个消费者分配一个线程,消费者从源集群的主题和分区上读取数据,然后通过公共生产者将数据发送到目标集群
MirrorMaker进程若发生崩溃,默认最多出现60秒的得复数据
能尽可能做到仅一次传递
MirrorMaker示例
bin/kafka-mirror-maker --consumer.config etc/kafka/consumer.properties --producer.config etc/kafka/producer.properties --new.consumer --num.streams=2 --whitelist ".*"
配置说明
consumer.config
# 所有消费者共用该配置,所有消费者属于同一个消费者群组
# auto.commit.enable=false 通常不需修改,若修改该参数,可能导致数据丢失
# auto.offset.reset一般需要修改,默认是latest(仅对MirrorMaker启动之后到达集群的数据进行镜像),根据情况可修改为earliest
producer.config
生产者的配置文件,唯一必选的参数是 bootstrap.servers
new.consumer
选择consumer的版本,建议使用更新版本的消费者,因为更稳定
num.streams
消费者数量
whitelist
正则表达式,代表需要进行镜像的主题名,所有与表达式匹配的主题都可以被镜像
在生产环境部署MirrorMaker
可以在docker容器里运行MirrorMaker
MirrorMaker是完全无状态的,也不需要磁盘存储(所有数据和状态都保存在kafka上)
单个实例的吞吐量受限于生产者,因为一个实例只有一个生产者,而要提高整体吞吐量,可部署多个实例
如果可能,尽量将MirrorMaker运行在目标数据中心
若网络出现问题,一个无法连接到集群的消费者比一个无法连接到集群的生产者要安全得多
如果跨数据中心流量需要加密,最好把MirrorMaker放在源数据中心
将MirrorMaker部署到生产环境时,需要对以下几项内容进行监控
- 延迟监控
-- 通过MirrorMaker提交到源集群的偏移量,默认有一分钟的延迟,因为1分钟提交一次,然后通过 kafka-consumer-groups工具获取该偏移值
-- 通过消费者JMX发布的最大延迟,只反映消费者读取的数据,没有考虑生产者是否成功将数据发送到目标集群
## 如果MirrorMaker跳过或丢弃部分消息,上面两种方法都无法检测到
- 度量指标监控
-- 消费者:fetch-size-avg,fetch-size-max,fetch-rate,fetch-throttle-time-avg,fetch-throttle-time-max
-- 生产者:batch-size-avg,batch-size-max,requests-in-flight,record-retry-rate
-- 同时适用两者:io-ratio,io-wait-ratio
- canary
-- 每分钟向源集群特定主题发送一个事件,然后尝试从目标集群读取这个事件,在超过指定时间还没有读取到就触发报警
MirrorMaker调优
MirrorMaker集群的大小,取决于对吞吐量的需求和对延迟的接受程度
压测:
使用kafka-performance-producer工具,在源集群上制造负载,然后启动MirrorMaker对这个负载进行镜像,分别设置不同的消费者数,观察性能在哪个点开始下降
然后将 num.streams 的值设置为一个小于当前点的整数,竞而得到单个实例的最大吞吐量
Linux系统调优
如果MirrorMaker是跨数据中心运行的,可对linux网络进行优化
- 增加TCP缓冲区大小 net.core.rmem_default,net.core.rmem_max,net.core.wmem_default,net.core.wmem_max,net.core.optmem_max
- 启用时间窗口自动伸缩 sysctl -w net.ipv4.tcp_window_scaling=1 或者添加到 /etc/sysctl.conf
- 减少TCP慢启动时间 将/proc/sys/net/ipv4/tcp_slow_start_after_idle设为0
可参考:Sandra K.Johnson等人合著的Performance tuning for Linux servers
对生产者调优
- max.in.flight.requests.per.connection 默认只有一个处理中的请求,即生产者在发送下一个消息前,当前发送的消息必须得到目标集群确认
这是唯一能保证消息次序的方法,若不在乎消息次序,可增加该值以提高吞吐量
- linger.ms batch.size 若总是发送未填满的批次(batch-size-avg和batch-size-max的值总是比batch.size低),可增加latency.ms以尽量满批次
若都是满批次,可加大batch.size以加大批次大小
提升消费者吞吐量
- range 分区分配策略,但会导致分配不均,可以考虑改为round robin
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoudRobinAssignor
- fetch.max.bytes 若指标fetch-size-avg和fetch-size-max的数值与fetch.max.bytes很接近,说明消费者读取的数据已经接近上限
此时可以增大fetch.max.bytes在每个请求里读取更多的数据
- fetch.min.bytes fetch.max.wait 若指标fetch-rate值很高,说明消费者发送的请求太多,且获取不到足够的数据
可调大这两个参数,这样消费者的每个请求可以获取到更多的数据
其他跨集群镜像方案
1、优步的uReplicator
随着主题和分区的增加及集群吞吐量的增长,MirrorMaker面临的问题
- 再均衡延迟 新加分区或消费者离线时
- 难以增加新主题 使用了正则匹配,当有新主题加入时,会发生再均衡,这往往是不可预期的
使用Apache Helix管理主题列表并分配给每个实例分区,避免了再均衡的问题
2、Confluent的Replicator
Replicator解决的痛点
- 分散的集群配置 保证不同集群中配置的一致性
- 在集群管理方面面临的挑战 MirrorMaker本身的部署、监控和配置管理
Replicator不仅从kafka主题复制数据,还会从zk上复制主题的配置信息
喜欢 (1)or分享 (0)
kafka传递文件_Kafka权威指南(二)数据传递/数据管道/数据镜像相关推荐
- Linux文件命令精通指南(二)(转)
Linux文件命令精通指南(二)(转) 文件处理命令 剖析一个文件列表 ls 命令用来查看用户有执行权限的任意目录中的文件列表,该命令有许多有趣的选项.例如: $ ls -liah * 22684 - ...
- 基于python的气象数据分析_基于python的《Hadoop权威指南》一书中气象数据下载和map reduce化数据处理及其......
文档内容: 1:下载<hadoop权威指南>中的气象数据 2:对下载的气象数据归档整理并读取数据 3:对气象数据进行map reduce进行处理 关键词:<Hadoop权威指南> ...
- 07 Confluent_Kafka权威指南 第七章: 构建数据管道
文章目录 CHAPTER 7 Building Data Pipelines 构建数据管道 将数据集成到上下文 Considerations When Building Data Pipelines ...
- 《数据中心虚拟化技术权威指南》一2.2 数据中心网络拓扑
本节书摘来自异步社区<数据中心虚拟化技术权威指南>一书中的第2章,第2.2节,作者[巴西]Gustavo A. A. Santana,更多章节内容可以访问云栖社区"异步社区&qu ...
- 大数据数据仓库——hive学习权威指南
友情提示:更多有关大数据.人工智能方面技术文章请关注博主个人微信公众号:大数据分析爱好者社区! 学习hive权威指南 目录: ETL介绍 大数据平台架构概述 系统数据流动 hive概述 hive在ha ...
- Linux文件命令精通指南(三)(转)
Linux文件命令精通指南(三)(转) 查看和查找文件 文件过滤器 用来读取文件内容和在文件内容上执行操作的命令有时被称为 1y滤器.sed 和 awk 命令是两个滤波器的例子,因为在以前的 OTN ...
- 【读书笔记】Jenkins权威指南
目录 一.Jenkins权威指南 二.书摘 第一章.Jenkins简介 第二章.迈入Jenkins的第一步 第三章.安装Jenkins 第四章.配置Jenkins服务器 第五章.设置构件作业 第六章. ...
- kafka 不同分区文件存储_Kafka 系列(二)文件存储机制与Producer架构原理怎样保证数据可靠性??...
文章目录 Kafka工作流程及文件存储机制 工作流程: topic底层存储: Producer生产者架构: 一:分区存储策略: 1.分区的原因: 2.分区的原则: ProducerRecord构造器: ...
- 送5本《Kafka权威指南》第二版
文末送书 科学家们每一次发生分歧都是因为掌握的数据不够充分.所以,我们可以先就获取哪一类数据达成一致,只要获取了数据,问题也就迎刃而解了.要么我是对的,要么你是对的,要么我们都是错的,然后继续. -- ...
最新文章
- AutoMapper用法
- 美国《消费者报告》实测特斯拉Model 3,“完全自动驾驶”名不符实
- 这个40M的小工具助你在windows下处理数据如虎添翼!!
- python 安卓模拟点击_python模拟点击在ios中实现的实例讲解
- 海康威视连续采图与单步采图_c#
- 单点登录Redis存储Session及Cookie场景介绍
- XCTF-MISC-新手区-坚持60s
- 2. 堪比JMeter的.Net压测工具 - Crank 进阶篇 - 认识yml
- 移动应用程序和网页应用程序_如何开发感觉像本机移动应用程序的渐进式Web应用程序...
- 判断radio单选按钮是否选中
- [水力建模]EPANET代码解读1
- [视频教程]用Unity3d开发跳一跳小游戏
- IE7下position:relative的问题
- 一文看懂张小龙 2018 微信公开课演讲
- java商城如何防止超卖_电商中怎么防止超卖
- 计算机老师任课教师寄语,任课老师新学期寄语
- JAVA 数组降序排列思路
- Matlab如何调整背景颜色
- 你的手机广告被偷了!通过重定向广告窃取个人隐私,攻击者还能进行恶意广告攻击...
- 引入微信支付Java SDK WxPayAPI_JAVA.zip