目录

  • 掌握Kafka集群部署
  • 了解Kafka集群高层架构
  • 掌握Kafka集群多项核心特性

Kafka集群部署

  • Kafka天然支持集群
  • Kafka集群依赖于Zookeeper进行协调
  • Kafka主要通过brokerId区分不同节点
  • 复制kafka副本
    [root@localhost install]# mv kafka_2.11-2.4.0 kafka_1/
    [root@localhost install]# cp -r kafka_1/ kafka_2/
    [root@localhost install]# cp -r kafka_1/ kafka_3/
    
  • 修改配置文件
    分别修改kafka2、kafka3中的配置文件,端口号9093、9094,日志文件kafka-logs-1、kafka-logs-2

    [root@localhost config]# vim server.properties
    


  • 启动三个kafka进程
    [root@localhost install]# bin/kafka-server-start.sh config/server.properties
    

Kafka副本集

  • kafka副本集是指将日志复制多份
  • kafka可以为每个Topic设置副本集
  • kafka可以通过配置设置默认副本集数量
public static void createTopic(){AdminClient adminClient = adminClient();//副本集因子,这里我们创建三个副本集Short rs = 3;NewTopic newTopic = new NewTopic(TOPIC_NAME,1,rs);CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));System.out.println(topics);}
  • Broker:kafka部署的节点
  • Leader:用于处理消息的接受和消息的消费等请求
  • Follower:主要用于备份消息数据

Kafka节点故障

  • Kafka与zookeeper心跳未保持视为节点故障
  • follower消息落后leader太多也视为节点故障
  • kafka会对故障节点进行移除

Kafka节点故障处理

  • Kafka基本不会因为节点故障而丢失数据
  • Kafka语义担保也很大程度上避免数据丢失
  • Kafka会对消息进行集群内平衡,减少消息在某些节点热度过高

Kafka集群之Leader选举

  • kafka并没有采用多数投票来选举leader
  • kafka会动态维护一组leader数据副本(ISR)
  • kafka会在ISR中选择一个速度比较快的设为leader

Kafka集群监控

  • Kafka只能依靠Kafka-run-class.sh等命令进行管理
  • Kafka Manager是目前比较常见的监控工具

Kafka Manager

安装参考地址如下:Kafka集群管理工具kafka-manager安装使用

Kafka面试题

kafka常见应用场景(你们公司实际生产环境如何使用kafka)

  • 日志收集或流式系统: 微信小程序收集用户调查文件,推到kafka里,结合后面的数据平台做分析
  • 消息系统:对消息的有序性要求并不是特别高
  • 用户活动跟踪或运营指标监控:淘宝京东对用户浏览商品做用户画像,对用户感兴趣的信息通过数据平台做精准投放或者推荐。

kafka与其他消息中间件的异同点(为什么使用kafka)

  • kafka概念:分布式流处理平台,跟大数据结合的相对紧密一些,因为大数据中本身就有很多流处理平台,包括storm、spark、flink
  • kafka特性一:提供发布订阅及Topic支持
  • kafka特性二:具有分布式的特性,也就是消息分区的概念。一个Topic下有多个partition,每个Consumer和partition是一一对应的。比如三个partition十个Consumer很多consumer是浪费的。吞吐量高但不保证消息有序,只能保证topic中的某一个partition消息有序,topic中的不同partition不保证消息有序。kafka提供了offset的管理,其他消息中间件消息消费完了就没了,kafka由于日志管理,通过offset在日志里检索消息并消费,可以通过offset消费已经消费过的内容。
  • kafka特性三:主要是基于pull模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。
  • 0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务,几百万上千万丢失几条数据是没有什么所谓,如果想消息一条不丢,也可以实现,但是吞吐量就会下降,因此选择就会容忍极少量数据丢失。
  • 支持跨平台特性,支持java、Python,对于异构的系统支持比好,数据堆积上亿只要数据存储没问题,不会影响kafka的性能。比如ES的集群磁盘已经满了,导致ES做限流,所有数据堆积到kafka,几十亿的数据都堆积到kafka依然不影响kafka的消费能力。
  • 可伸缩,支持水平扩展。

kafka吞吐量为什么大速度快:

  • 日志顺序读写和快速检索

顺序写:顺序写盘,提高磁盘的利用率,比如consumer通过offset顺序的消费数据而不删除已经消费的数据,避免磁盘随机写的过程,MQ的数据一定要记录一个位置offset,offset被consumer消费并记录当前offset,根据当前记录的offset找下一个offset,只有这么过程才能充分利用磁盘利用率。如果删除的话,会产生偏移。所以一般MQ是不允许删除消息的,而阿里云上的MQ允许删除消息其实是做的逻辑删除不是物理删除,对删除的数据打标记,再做转储。

  • Partition机制可以并行
  • 提供批量发送接收及数据压缩机制
  • Page Cache空中接力,通过sendfile实现零拷贝原则

在廉价的单机上都能支持每秒100k的以上吞吐量,并借用Linux内核的Page Cache,不显示用内存而胜似内存,如果生产者和消费者的速度相当,甚至都不需要物理级别的磁盘去做数据交换,直接读page cache,即使重启kafka,你的page cache依然不影响。
page cache:是操作系统实现的主要的磁盘缓存,目的减少对磁盘的io的操作,避免对磁盘io的频繁操作对影响对操作系统的性能。具体来说就是把磁盘中的数据缓存到内存当中,把对磁盘的访问变成对内存的访问。比如高并发项目的早期会读关系型数据库,压力大了会分库分表,要是还是扛不住,就使用远程存储比如redis,对于一些访问量非常巨大的入口如何抗住,可能redis都扛不住,这时候会借力内存,但是都存到javajvm内存还会影响java程序的性能,然后还会把数据放到堆外面存储。
磁盘文件读写流程:当一个进程准备读取磁盘上的文件的时候,操作首先不是直接读磁盘文件,首先做检查,将待读取的数据所在的页是否在page cache中是否存在,如果存在就命中直接把数据返回从而避免了对物理磁盘的io操作。如果没有命中就真正向物理磁盘发起一次io操作,并且将读取的数据先加入到缓存页中然后再返回给进程。很多都是这种操作,比如数据库当你select查询,第一次查询性能会慢一点,第二次重复再执行这条语句就变快了,其实数据库帮你做了cache操作。如果一个文件写到磁盘中,操作系统也会检查数据对应的页是否存在缓存中,如果不存在就在缓存页中添加响应的页,把数据写到页里,被修改的数据就变成脏页了,操作会在合适的时间将脏页的数据刷到磁盘里保证数据的一致性。

如果将磁盘的文件读到应用程序内存中,再通过应用程序再写回到另外的一个应用程序。
1.操作系统会把物理磁盘的文件写到内核读取缓冲区,这是操作系统级别的内核空间上下文。
2.从应用程序的上下文的用户缓冲区读取内核读取缓冲区(内核空间上下文)的文件数据,这时应用程序中就有了,程序中可以打印数据了。
3.假设数据远程传到另一个应用程序中,把数据再写入操作系统级别的os的内核空间缓冲区(内核空间上下文),再传到socket缓冲区,再到达实际的物理网口,通过网络到达消费者的进程。
经历好几次copy,第一次copy到内核中,用户缓冲区又copy一次,内核空间缓冲区又copy一次,socket缓冲区又copy一次,传统的文件读取使用了多次copy。而kafka使用零拷贝技术。

零拷贝:在kafka中会经常的大量的使用page cache的零拷贝技术用来提升性能和吞吐量,消费者在读取服务端的数据的时候,需要将服务端的磁盘文件的数据读取出来,服务端的磁盘文件通过网络发送到消费者进程,生产者发布的消息会被不同消费者多次消费,为了优化这个流程kafka就是用零拷贝。虽然消息是被写到缓存页中,然后由操作系统负责具体的刷盘策略和任务。
零拷贝是不和应用程序上下文做切换和关联的,应用程序完全不做任何的copy。磁盘文件直接在用户的内核空间上下文做一次copy。只是将磁盘文件直接复制到内核读取缓冲区也就是page cache中,然后将数据中从内核读取缓冲区page cache直接发送到网卡中,发送给不同的订阅者都可以使用同一个页面缓存避免重复制

kafka底层原理之日志

Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。如图看topic和partition的关系。

partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。

  • Partition的数据文件
    Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:
    + offset
    + MessageSize
    + data
    其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。它的格式和Kafka通讯协议中介绍的MessageSet格式是一致。
    Partition的数据文件则包含了若干条上述格式的Message,按offset由小到大排列在一起。它的实现类为FileMessageSet,类图如下:

    它的主要方法如下:
    + append: 把给定的ByteBufferMessageSet中的Message写入到这个数据文件中。
    + searchFor: 从指定的startingPosition开始搜索找到第一个Message其offset是大于或者等于指定的offset,并返回其在文件中的位置Position。它的实现方式是从startingPosition开始读取12个字节,分别是当前MessageSet的offset和size。如果当前offset小于指定的offset,那么将position向后移动LogOverHead+MessageSize(其中LogOverHead为offset+messagesize,为12个字节)。
    + read:准确名字应该是slice,它截取其中一部分返回一个新的FileMessageSet。它不保证截取的位置数据的完整性。
    + sizeInBytes: 表示这个FileMessageSet占有了多少字节的空间。
    + truncateTo: 把这个文件截断,这个方法不保证截断位置的Message的完整性。
    + readInto: 从指定的相对位置开始把文件的内容读取到对应的ByteBuffer中。
    我们来思考一下,如果一个partition只有一个数据文件会怎么样?
    + 新数据是添加在文件末尾(调用FileMessageSet的append方法),不论文件数据文件有多大,这个操作永远都是O(1)的。
    + 查找某个offset的Message(调用FileMessageSet的searchFor方法)是顺序查找的。因此,如果数据文件很大的话,查找的效率就低。
  • 数据文件的分段
    Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。
  • 为数据文件建索引
    数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
    索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。
  • 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
  • position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。
    index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。在Kafka中,索引文件的实现类为OffsetIndex,它的类图如下:

    主要的方法有:
    append方法,添加一对offset和position到index文件中,这里的offset将会被转成相对的offset。
    lookup, 用二分查找的方式去查找小于或等于给定offset的最大的那个offset

总结一下Message是如何在Kafka中存储的,以及如何查找指定offset的Message的。
Message是按照topic来组织,每个topic可以分成多个的partition,比如:有5个partition的名为为page_visits的topic的目录结构为:

partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件:

可以看到,这个partition有4个LogSegment。

查找Message原理图:

比如:要查找绝对offset为7的Message:
首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。
打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。
这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。
一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。

  • Kafka的日志是以Partition为单位进行保存
  • 日志目录格式为Topic名称+数字
  • 日志文件格式是一个“日志条目”序列
  • 每条日志消息由4字节整形与N字节消息组成
  • 日志分段
    • 每个Partition的日志会分为N个大小相等的segment中
    • 每个segment中的消息数量不一定相等
    • 每个Partition只支持顺序读写
  • segment存储结构
    • Partition会将消息添加到最后一个segment上
    • 当segment达到一定阈值会flush到磁盘上
    • segment文件分为两个部分:index和log文件
  • 日志读操作
    • 首先需要在存储的数据中找出segment文件
    • 通过全局的offset计算出segment中的offset
    • 通过index中的offset寻找具体数据内容
  • 日志写操作
    • 日志允许串行的追加消息到文件最后
    • 当日志文件达到阈值则滚动到新文件上

kafka学习七:kafka之集群篇相关推荐

  1. 搭建高吞吐量 Kafka 分布式发布订阅消息 集群

    搭建高吞吐量 Kafka 分布式发布订阅消息 集群 简介 Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区. ...

  2. EFK6.3+kafka+logstash日志分析平台集群

    转载来源 :EFK6.3+kafka+logstash日志分析平台集群 :https://www.jianshu.com/p/f956ebbb2499 架构解读 : 第一层.数据采集层 安装fileb ...

  3. Kafka SASL/SCRAM动态认证集群部署

    Kafka SASL/SCRAM动态认证集群部署 目的:配置SASL/PLAIN验证,实现了对Kafka的权限控制.但SASL/PLAIN验证有一个问题:只能在JAAS文件KafkaServer中配置 ...

  4. 高可用集群篇(五)-- K8S部署微服务

    高可用集群篇(五)-- K8S部署微服务 一.K8S有状态服务 1.1 什么是有状态服务 1.2 k8s部署MySQL 1.2.1 创建MySQL主从服务 1.2.2 测试主从配置 1.2.3 k8s ...

  5. [原创]分布式系统之缓存的微观应用经验谈(三)【数据分片和集群篇】

    分布式系统之缓存的微观应用经验谈(三)[数据分片和集群篇] 前言 近几个月一直在忙些琐事,几乎年后都没怎么闲过.忙忙碌碌中就进入了2018年的秋天了,不得不感叹时间总是如白驹过隙,也不知道收获了什么和 ...

  6. Hadoop学习笔记—13.分布式集群中节点的动态添加与下架

    Hadoop学习笔记-13.分布式集群中节点的动态添加与下架 开篇:在本笔记系列的第一篇中,我们介绍了如何搭建伪分布与分布模式的Hadoop集群.现在,我们来了解一下在一个Hadoop分布式集群中,如 ...

  7. ZooKeeper学习笔记(八):ZooKeeper集群写数据原理

    写数据原理 写流程直接请求发送给Leader节点 这里假设集群中有三个zookeeper服务端 ACK (Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种 ...

  8. Redis 学习笔记八:集群模式

    Redis 学习笔记八:集群模式 作者:Grey 原文地址: 博客园:Redis 学习笔记八:集群模式 CSDN:Redis 学习笔记八:集群模式 前面提到的Redis 学习笔记七:主从复制和哨兵只能 ...

  9. Kafka学习笔记——Kafka原理与使用详解

    Kafka 是一个消息系统,原本开发自 LinkedIn,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础.现在它已被多家公司作为多种类型 ...

  10. RabbitMQ 集群篇

    RabbitMQ 集群篇 00.集群架构原理 前面我们有介绍到 RabbitMQ 内部有各种基础构件,包括队列.交换器.绑定.虚拟主机等,他们组成了 AMQP 协议消息通信的基础,而这些构件以元数据的 ...

最新文章

  1. 01-01java概述 doc命令、jdk\jre下载安装、path、classpath配置、开发中常见小问题
  2. 临河智慧城管:让城市更和谐
  3. echarts常用方法,legend状态支持两张图片切换(四)
  4. Lesson2 Hello,GLSL
  5. CMM (软件工程与集成产品开发)
  6. [Cocoa]深入浅出 Cocoa 之 Core Data(2)- 手动编写代码
  7. 20200119:(leetcode)回文数(3种解法)
  8. 如何加声调口诀_拼音标声调的规则口诀
  9. java安卓字体_Android中添加外部字体库和竖直排列字体
  10. VBA小模板,跨表统计的2种写法
  11. 在EXCEL表格中经常会遇到有合并单元格时,汇总计算的公式无法直接下拉自动填充计算,掌握这个小技巧一键汇总
  12. 快来天津科技大学找我玩
  13. 如何使用方位X210来查看海康威视IP摄像头
  14. 基于随机森林、svm、CNN机器学习的风控欺诈识别模型
  15. Mutation Observer API
  16. 一生应该必看的20个故事
  17. 社工要掌握哪些计算机基本操作,【作为一名专业社工哪些方面需要注意】- 环球网校...
  18. html5静止手机旋转
  19. 硬盘数据如何恢复?电脑硬盘资料恢复,方法就是这么简单!
  20. 关于计算机实践创新的名言,关于创新与实践名人名言集锦

热门文章

  1. Ubuntu下shift键失灵解决办法
  2. legacy引导gpt分区_装系统用什么分区?gpt分区及mbr分区详细教程(附带bios设置)
  3. 2、Docker部署的Onlyoffice中文字体修改
  4. Toast基本使用方法
  5. tushare 获取复权数据
  6. 网易云登陆界面怎么用PHP做,网易云音乐登录流程图
  7. 如何用Matlab求不定积分
  8. java drm_DRM系统工作原理
  9. 电路交换与分组交换技术
  10. 公关广告策略分析:如何结合广告的推力和公关的拉力