kafka简介

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

关键词

  • 分布式流处理平台。
  • 在系统之间构建实时数据流管道。
  • 以topic分类对记录进行存储
  • 每个记录包含key-value+timestamp
  • 每秒钟百万消息吞吐量。

安装kafka

0.选择三台主机安装kafka
1.准备zk
略
2.jdk
略
3.tar文件
4.环境变量
略
5.配置kafka
[kafka/config/server.properties]
...
broker.id=201
...
listeners=PLAINTEXT://:9092
...
log.dirs=/home/centos/kafka/logs
...
zookeeper.connect=s201:2181,s202:2181,s203:21816.分发server.properties,同时修改每个文件的broker.id7.启动kafka服务器
a)先启动zk
b)启动kafka
[s202 ~ s204]
$>bin/kafka-server-start.sh -daemon config/server.propertiesc)验证kafka服务器是否启动
$>netstat -anop | grep 90928.创建主题
$>bin/kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 3 --partitions 3 --topic test9.查看主题列表
$>bin/kafka-topics.sh --list --zookeeper s201:218110.启动控制台生产者
$>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test11.启动控制台消费者
$>bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning --zookeeper s202:218112.在生产者控制台输入hello world

kafka 的使用场景

  • 埋点日志的收集一个公司可以用Kafka可以收集各种服务的log。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和flink
  • 事件源

kafka如何保证的消息数据不丢失

当讨论这个问题的时候,首先需要考量kafka的运行机制。kafka主要分为三个组件,producer、consumer、broker。所以也必须从三个方面去考量,producer、consumer、broker端数据不丢失。

一、producer端如何保证数据不丢失

1.ack的配配置策略

acks = 0    
生产者发送消息之后 不需要等待服务端的任何响应,它不管消息有没有发送成功,如果发送过程中遇到了异常,
导致broker端没有收到消息,消息也就丢失了。实际上它只是把消息发送到了socketBuffer(缓存)中,
而socketBuffer什么时候被提交到broker端并不关心,它不担保broker端是否收到了消息,
但是这样的配置对retry是不起作用的,因为producer端都不知道是否发生了错误,
而且对于offset的获取永远都是-1,因为broker端可能还没有开始写数据。
这样不保险的操作为什么还有这样的配置?kafka对于收集海量数据,
如果在收集某一项日志时是允许数据量有一定丢失的话,是可以用这种配置来收集日志。   
acks = 1(默认值)    
生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。
其实就是消息只发给了leader leader收到消息后会返回ack到producer端。
如果消息无法写入leader时(选举、宕机等情况时),生产都会收到一个错误的响应,为了避免消息丢失,
生产者可以选择重发消息,如果消息成功写入,在被其它副本同步数据时leader  崩溃,那么此条数据
还是会丢失,因为新选举的leader是没有收到这条消息,ack设置为1是消息可靠性和吞吐量折中的方案。  
acks = all (或-1)    
生产者在发送消息之后,需要等待ISR中所有的副本都成功写入消息之后才能够收到来自服务端的成功响应,
在配置环境相同的情况下此种配置可以达到最强的可靠性。即:在发送消息时,需要leader 向fllow
同步完数据之后,也就是ISR队列中所有的broker全部保存完这条消息后,才会向ack发送消息,表示发送成功。

2.retries的配置策略

在kafka中错误分为2种,一种是可恢复的,另一种是不可恢复的。  

  • 可恢复性的错误:  

如遇到在leader的选举、网络的抖动等这些异常时,如果我们在这个时候配置的retries大于0的, 也就是可以进行重试操作,那么等到leader选举完成后、网络稳定后,这些异常就会消息,错误也就可以恢复, 数据再次重发时就会正常发送到broker端。需要注意retries(重试)之间的时间间隔, 以确保在重试时可恢复性错误都已恢复。  

  • 不可恢复性的错误:  

如:超过了发送消息的最大值(max.request.size)时,这种错误是不可恢复的,如果不做处理, 那么数据就会丢失,因此我们需要注意在发生异常时把这些消息写入到DB、缓存本地文件中等等, 把这些不成功的数据记录下来,等错误修复后,再把这些数据发送到broker端。

如何选择

高可用型配置

acks = all,retries > 0 retry.backoff.ms=100(毫秒) (并根据实际情况设置retry可能恢复的间隔时间)  

  • 优点:这样保证了producer端每发送一条消息都要成功,如果不成功并将消息缓存起来,等异常恢复后再次发送。  
  • 缺点:这样保证了高可用,但是这会导致集群的吞吐量不是很高,因为数据发送到broker之后,leader要将数据同步到fllower上,如果网络带宽、不稳定等情况时,ack响应时间会更长

折中型配置:

acks = 1 retries > 0 retries 时间间隔设置 (并根据实际情况设置retries可能恢复的间隔时间)  

  • 优点:保证了消息的可靠性和吞吐量,是个折中的方案  
  • 缺点:性能处于2者中间3.高吞吐型   

高效率配置:

acks = 0  

  • 优点:可以相对容忍一些数据的丢失,吞吐量大,可以接收大量请求  
  • 缺点:不知道发送的消息是 否成功

每种配置都有对应的生产用途,视情况而定。。

二、consumer端如何保证数据不丢失

consumer端配置

1、group.id: consumer group 分组的一个id

消费者隶属消费组的名称,kafka的每个partition值允许同一个group的一个consumer消费。这样做的目的是为了保证kafka的高吞吐量

2、auto.offset.reset = earliest(最早) /latest(最晚)

设置从哪个位置开始消费

3、enable.auto.commit = true/false(默认true)

当设置为true时,意味着由kafka的consumer端自己间隔一定的时间会自动提交
offset,如果设置成了fasle,也就是由客户端(自己写代码)来提交,那就还得控制提交的时间间隔
auto.commit.interval.ms
当enabe.auto.commit设置为true时才生效,表示开启自动提交消费位移功能时自动提交消费位移的时间间隔。

在consumer阶段,如果设置为true,意味着会自动提交offset,比如说当你pull了30条数据,但是当处理20条数据的时候自动提交了commit,当处理21条数据的时候,系统崩了,那当你再去拉取数据的时候,就会从30开始啦,那就会丢失21-30的数据

如果设置为false,可以手动提交,你可以处理一条提交一次,也可以处理一批提交一批,但是consumer在消费数据的时候,是以batch的模式去pull数据的,假设pull了30条数据,你在处理30条数据的时候,没处理一条,就提交一次的话,会非常影响消费能力,你可以还是按照一批来处理,设置一个累加器,处理一条加1,如果在处理数据时发生了异常,那就把当前处理失败的offset进行提交(放在finally代码块中)注意一定要确保offset的正确性,当下次再次消费的时候就可以从提交的offset处进行再次消费。

consumer 保证确保消息只被处理一次处理,同时确保幂等性
需要结合具体的业务来看 :

  • 比如你拿个数据要写库,先根据主键查一下,如果这数据都有了,你就别插入了,update一下好吧
  • 比如你是写redis,那没问题了,反正每次都是set,天然幂等性
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据

三、broker端是如何保证数据不丢失的

1.replication-factor 3    

在创建topic时会通过replication-factor来创建副本的个数,它提高了kafka的高可用性,同时,它允许n-1台broker挂掉,设置好合理的副本因子对kafka整体性能是非常有帮助的,通常是3个,极限是5个,如果多了也会影响开销。

2.min.insync.replicas = 2     

分区ISR队列集合中最少有多少个副本,默认值是1 

3.unclean.leader.election.enable = false     

是否允许从ISR队列中选举leader副本,默认值是false,如果设置成true,则可能会造成数据丢失。

leader选举造成的数据丢失

3个replica分别为0 1 2,0为leader,数据都能完全同步到100,在某一时刻,分别有2个fllow挂掉了,此时有producer往0 的replica上发送50条数据完后,此时的leader挂掉了,而此时刚好的1个fllow起来了,它没有向leader上feach数据,因为leader已经不存在了,此时有2种处理方法:重新起来的fllow可以成为1个leader,需要通过 unclean.leader.election.enable=true,这样做保证了高可用,但是这样做的弊端是:新起来的fllow成为了leader,但是它会丢失部分数据,虽然这样保证了高可用。另一种情况是设置为false,不让fllow竞选leader,但是这样也会造成数据的丢失。假如在ISR的队列里面,只有0 1,但此时replica 1 没有来得及向leader feach数据leader挂掉了,这样也会造成数据的丢失。

broker配置策略

  • min.insync.replica

在一个topic中,1个分区 有3个副本,在创建时设置了min.insync.replica=2,假如此时在ISR中只有leader副本(1个)存在,在producer端生产数据时,此时的acks=all,这也就意味着在producer向broker端写数据时,必须保证ISR中指定数量的副本(包含leader、fllow副本)全部同步完成才算写成功,这个数量就是由min.insync.replica来控制的,这样producer端向broker端写数据是不成功,因为ISR中只有leader副本,min.insync.replica要求2个副本,此时的producer生产数据失败(异常),当然consumer端是可以消费数据的,只不过是没有新数据产生而已.这样保证了数据的一致性,但这样会导致高可用性降低了。一般的配置是按: n/2 +1 来配置min.insync.replicas 的数量的,

同时也要将unclean.leader.election.enable=false

  • unclean.leader.election.enable

假如现在有leader 0 fllow 1 fllow 2 三个副本,存储的数据量分别是10 9 8,此时的broker的配置是:min.insync.replica=2 acks=all,leader的数据更新到了15,在没有同步到fllow 1 fllow 2时挂掉了,此时的ISR队列中是有fllow 1 和fllow 2的,如果unclean.leader.election.enable设置的是true,表示在ISR中的副本是可以竞选leader这样就会造成9-15或8-15之间的数据丢失,所以unclean.leader.election.enable必须设置成成false,这样整个kafka cluster都不读写了,这样就保证了数据的高度一致性.

kafka中topic设计原理

因为consumer group 中所有的consumer一定会消费topic中的partition,而一个partition只能同时被同一group中的一个consumer消费;

所以最优的设计就是:

  • consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。
  • 一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率

参考文章

https://www.cnblogs.com/MrRightZhao/p/11498952.html

kafka控制台模拟消费_Kafka 详解相关推荐

  1. kafka控制台模拟消费_Flink初试——对接Kafka

    本篇文章我们用 Flink Kafka Connector对接Kafka,实现一个简单的报警业务.我们暂时不去谈论理论,先上手实现这个简单的需求. flink-connector-kafka是 fli ...

  2. Kafka生成者/消费组详解

    什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...

  3. python流获取控制台_对Python捕获控制台输出流的方法详解

    有时候我们的代码里可能要调用控制台命令,比如我想用Python写一个批量编译 .java 文件的脚本,用到如下代码 常规用法 os.system import os,traceback try: p ...

  4. php curl 模拟多线程,php利用curl 多线程 模拟 并发的详解

    php利用curl 多线程 模拟 并发的详解 发布于 2014-12-07 10:17:25 | 265 次阅读 | 评论: 0 | 来源: 网友投递 PHP开源脚本语言PHP(外文名: Hypert ...

  5. 模拟二进制交叉算子详解

    一起来学演化计算-SBX(Simulated binary crossover)模拟二进制交叉算子详解 觉得有用的话,欢迎一起讨论相互学习~ 参考文献 衷心感谢武汉科技大学张凯教授的精心培育和指导 以 ...

  6. 单片机电流检测电路图大全(四款模拟电路设计原理图详解) - 信号处理电子电路图

    电路图简介: 本文主要介绍了单片机电流检测电路图大全(四款模拟电路设计原理图详解).它的主要功能是完成对过电压的瞬时值和峰值的检测.过电压次数的检测.电源输出电压和电流的检测,并通过键盘的操作显示出各 ...

  7. 三菱模块增益和偏置调整步骤_三菱PLC与西门子plc的模拟量编程详解

    三菱PLC与西门子plc的模拟量编程详解 Date:2015-01-01 origin:RCCN上海日成 Visit:2413 三菱和西门子的PLC都有自己的独特的长处的,先讲讲三菱2AD模块吧,2A ...

  8. 【kafka】Kafka消费者分区分配策略详解

    文章目录 1.概述 2.RoundRobinAssignor详解 3.RangeAssignor详解 4.StickyAssignor详解 5.CooperativeStickyAssignor详解 ...

  9. kafka 同步提交 异步_详解Kafka设计架构核心——Kafka副本机制详解

    所谓的副本机制(Replication),也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝.副本机制有什么好处呢? 1. 提供数据冗余.即使系统部分组件失效,系统依然 ...

最新文章

  1. crontab 各参数详解及如何查看日志记录
  2. install tabix/bgzip
  3. 汉语自然语言处理工具包下载
  4. Codeforces Round #548 (Div. 2) A. Even Substrings
  5. 阿联酋esma认证怎么做_行业视野 | 关于阿联酋eCall 的要求更新
  6. OJ1057: 素数判定(C语言经典列题,判断变量的应用)
  7. VS2010 Resource view为空的解决办法
  8. 十分钟学习python_Python学习笔记一:十分钟入门
  9. POJ 1185 炮兵阵地(动态规划+状态压缩)
  10. numpy : numpy.random
  11. ccs中如何插入字体
  12. MATLAB中SVM(支持向量机)的用法
  13. 计算机网络——网络安全
  14. 专业绘图(Visio 2016)实战视频课程-专题视频课程
  15. TVS防护电路的典型应用
  16. 优化你简历的8个技巧
  17. 使用UltraEdit编辑器之HelloWorld的实现
  18. 传奇地图事件触发脚本
  19. Springboot with Impala and Kudu
  20. TCP: too many of orphaned sockets报错解决

热门文章

  1. prometheus-net.DotNetRuntime 获取 CLR 指标原理解析
  2. 回顾 | 进击吧! Blazor !第三期 信息交互
  3. dotNET:怎样处理程序中的异常(理论篇)?
  4. 【半译】扩展shutdown超时设置以保证IHostedService正常关闭
  5. 什么?原来C#还有这两个关键字
  6. Consul初探-集成ocelot
  7. 【翻译】无需安装Python,就可以在.NET里调用Python库
  8. CanalSharp.AspNetCore v0.0.4-支持输出到MongoDB
  9. 微软MVP张善友告诉你,微服务选型要注意这些地方
  10. 在Windows 下如何使用 AspNetCore Api 和 consul