一、kafka基本介绍

1概念:是一个分布式的基于发布/订阅模式的消息队列,应用于大数据实时处理

1.消息队列(topic):

优点:解耦 可恢复性 缓冲 削峰 异步通信

两种模式:

点对点模式:一对一,消费者主动拉去数据,消息收到后消息清除

发布/订阅模式:一对多、消费者消费数据后不会清除数据

二、kafka架构

kafka cluster:kafka集群管理消息

broker:一台kafka服务器就是一个broker

topic:消息队列主题

partition:分区

1.kafka基础架构:

1).kafka架构中涉及到kafka集群(多个broker)生产者(生产消息)、消费者(消费消息)、zookeeper(注册消息)

2).kafka集群

kafka集群由多个broker组成,每个broker都有一个唯一id

kafka内部维护topics,每个topic可以有多个分区(partition),每个分区可以有多个副本(replication)

一个topic的多个分区可以存在到一个broker

一个topic的一个分区的多个副本必须在不同的broker

kafka中所有的读和写都是由leader负责

3).生产者:

生产者的主要作用就是面向topic生产数据,

4).消费者

消费者主要是以消费者组的名义面向topic进行消息的消费

一个消费者组中的一个消费者可以同时消费一个topic主题中的多个分区的数据

一个topic主题中的一个分区只能被一个消费者组中的一个消费者消费

消费者在消费数据的过程中需要实时记录offset(消费的位置),记录的方式为:group+topic+partition

5).zookeeper

zookeeper 主要作用是让kafka去注册消息,例如,每个broker启动后会在zookeeper中注册,并产生出controller

在0.9版本之前,消费者维护的offset存储在zookeeper中

在0.9版本以后,消费者维护的offset存储在kafka本地

2.kafka启停脚本

脚本的使用: Welcome to nginx! start | stop

#!/bin/bashif [ $# -lt 1 ]
thenecho "usage: Welcome to nginx! {start | stop}"exit
ficase $1 in
start)bash /home/atguigu/bin/zk.sh $1sleep 5000for i in hadoop102 hadoop103 hadoop104doecho "==================> start $i kafka <======================"ssh $i /opt/module/kafka-2.4.1/bin/kafka-server-start.sh -daemon /opt/module/kafka-2.4.1/config/server.propertiesdone
;;
stop)bash /home/atguigu/bin/zk.sh $1sleep 5000for i in hadoop102 hadoop103 hadoop104doecho "==================> stop $i kafka <======================"ssh $i /opt/module/kafka-2.4.1/bin/kafka-server-stop.sh stopdone
;;*)echo "input args error..."exit
;;
esac

3.kafka命令行操作

1)查看当前服务器中所有的topic

--bootstrap-server == --zookepper 注意后面的端口号不一样

kafka-topics.sh --bootstrap-server hadoop102:9092 --list
kafka-topics.sh --zookepper hadoop102:2181 --list

2)创建topic

kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic t_name replication-factor 3 partition 2 

3)查看某个topic的详情

kafka-topic.sh --bootstrap-server hadoop102:9092 --describe --topic t_name

4)修改分区数

kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic t_name --partitions 2

5)删除topic

kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic t_name

6)发送消息

kafka-console-producer.sh --broker-list hadoop102:9092 --topic t_name

7)消费消息

消费连接topic后接收的数据,前面已有的不消费

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic t_name

消费所有topic队列中的数据,前面已有的按分区消费读取

kafka-console_consumer.sh --bootstrap-server hadoop102:9092 --topic t_name --from-beginning

8)消费者组

通过修改配置文件创建 consumer.properties

group.id=mygroup

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic t_name  --consumer.config config/consumer.properties

通过命令行创建

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic t_name --group group_name

三、kafka架构深入

1.工作流程及文件存储机制

1)工作流程:

kafak是基于生产者-消费者模型来工作的,生产者生产消息,消费者消费消息,而两者不直接连接,通过一个阻塞消息队列topic来进行联系

2)文件存储机制:

topic只是个逻辑上的概念,每个topic可以有多个分区partition,真正在磁盘上存储是以partition来进行存储的,每个partition对应一个log文件,producer生产的数据会不断追加到该log文件末端,而且每条数据都有自己的offset(group+topic+partition),实际上为了防止该log文件过大,每个partition又会分片为多个segment,每个segment对应两个文件——“.index”文件和“.log”文件。

注意:每个log文件的文件名,是以offset+1来命名的

2.生产者

1)分区策略

分区原因:分区是为了能够在集群中横向扩展

提高并发,提高吞吐量

2)分区原则

将producer发送的数据分装成一个ProducerRecord对象。

(1) 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;

(2) 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;

(3) 既没有 partition 值又没有 key 值的情况下, kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区进行使用.

3)数据可靠性的保证

  • 生产者发送到topic partition的可靠性保证:

生产者向topic发送数据,topic收到数据后会返回一个ack(acknowledgement 确认收到)producer收到ack后则进行下一轮的发送,否则重新发送数据

  • topic partition存储数据的可靠性

kafka采用的同步策略为当全部的flower完成数据同步后集群才返回ack,但是如果有一个follower出现了故障leader也不能一直等下去啊,为了解决这个问题,kafka让leader维护了一个动态的ISR(in-sync-replica set),是一个和leader保持同步的follower的集合,当次集合里面的所有follower完成同步则leader给producer发送ack,如果follower出现故障不能向leader同步数据,则该follower会被踢出ISR,如果leader发生了故障,就会在ISR中重新选举出新的leader

  • leader是什么时候向producer发送ack的呢?

有三种情况,这与ack的参数配置有关

acks=0 :leader接收到数据但未写入磁盘就返回ack,此时leader故障可能会丢失数据

acks=1:leader落盘成功返回ack,但follower还未同步,此时leader故障会丢失数据

acks=-1:leader和follower都落盘成功返回ack,但在发送前leader故障则ack未被接收,producer会重新发送数据,会造成数据重复

  • leader和follower怎么同步数据的呢?

首先要明白两个概念:

LEO:指的是每个副本最大的offset

HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO

leader中的LEO肯定是最大的,follower向leader同步数据,而在HW之前的数据才是向消费者可见的,这是因为HW之前的数据才是leader真正ack响应后的数据

当follower发生故障时,则就会被临时踢出ISR中,待恢复后,则follower会读取本地磁盘里记录的HW值,并将log文件高于HW的部分去掉,从HW开始向leader进行同步数据,当该follower的LEO的值大于该partition的HW时,则重新加入ISR中

当leader发生故障时,会从ISR中选出一个新的leader,此时leader和follower都会先将各自log文件中高于HW的部分去掉,然后等待向leader同步数据

  • Exactly Once = At Least Once + 幂等性

保证数据的不重不漏

幂等性:无论操作了多少次,其结果都与一次操作相同

通过PID来保证其数据幂等性

3.消费者

1)消费方式

push(推)模式很难适应不同消费速率的消费者,所有kafka采用的是pull(拉)模式从broker中读取数据,但是pull实际是一个长轮询的过程,可以传入一个timeout参数来避免空转

2)分区分配策略

主要问题是哪个partition被哪个consumer消费的问题

三种分配策略:

RoundRobin:根据consumer的数量把分区partition轮询方式分配,当consumer或partition有数量上的变动时则所有数据进行重新分配

Range:根据consumer的订阅来进行分配,当订阅相同,则根据consumer的数据分成连续几份进行分配

Sticky:首次跟RoundRobin分配方式相同,当有consumer数据变化时,只需要将此consumer对应的分区进行分配即可

3)offset的维护

Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。

4)kafka为什么能高效读写?

  • 顺序写磁盘:kafka写的过程都是追加写到文件末尾,顺序写,省去了大量的磁头寻址的时间
  • 应用Pagecache:页缓存将小块写组装成大块写,并在页缓存中进行简单的排序处理,读取数据是如果页缓存中有则直接读取,大大节省了读写时间
  • 零复制技术:拷贝过程由磁盘到内核态后直接写入到目的磁盘,不再进入用户态,提高了效率

5)zookepper在kafka中的作用

当kafka刚启动时broker都会向zookeeper去注册,争抢选举出Controller,再有Controller负责管理topic的分区分配和leader选举等工作

6)kafka的事务

producer端事务:引入全局唯一TransactionID与Producer的PID进行绑定

consumer端事务:借助于其他支持事务的框架来实现

kafka修改分区数_Kafka笔记相关推荐

  1. kafka 修改分区_kafka修改分区和副本数

    kafka修改分区和副本数 查看现在副本分配情况 ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test1 ...

  2. kafka 修改分区_kafka的分区数设置

    越多的分区可以提供更高的吞吐量 首先我们需要明白以下事实:在kafka中,单个patition是kafka并行操作的最小单元.在producer和broker端,向每一个分区写入数据是可以完全并行化的 ...

  3. kafka 修改分区_Kafka动态调整topic分区partition - russle的专栏 - CSDN博客

    我们在使用kafka时,初期创建时所指定topic属性需要修改,如何动态修改kafka属性?kafka提供了命令行工具-kafka-topics.sh. kafka-topics.sh工具介绍 kaf ...

  4. kafka修改分区数_大数据技术:解析SparkStreaming和Kafka集成的两种方式

    Spark Streaming是基于微批处理的流式计算引擎,通常是利用Spark Core或者Spark Core与Spark Sql一起来处理数据.在企业实时处理架构中,通常将Spark Strea ...

  5. kafka修改分区数_ELK|kafka增加分区或调整副本数

    Sommaire de cet article : kafak管理常用的工具是kafka-manager和kakfa-eagle.这两个工具在分区管理上都是只管加不管减,副本调整的话,也不支持,只能自 ...

  6. kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)...

    写在开头: 本章是Kafka学习归纳第五部分,着重于强调Kafka的事一致性保证,消息重复消费场景及解决方式,记录偏移量的主题,延时队列的知识点. 文章内容输出来源:拉勾教育大数据高薪训练营. 一致性 ...

  7. kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)

    文章目录 前言 1. 基础概念 Broker Producer Consumer Consumer Group Topic Partition Replica 2. 命令行操作 2.1 查看所有top ...

  8. 《Apache Kafka 实战》读书笔记-认识Apache Kafka

    <Apache Kafka 实战>读书笔记-认识Apache Kafka 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.kafka概要设计 kafka在设计初衷就是 ...

  9. Kafka入门篇学习笔记整理

    Kafka入门篇学习笔记整理 Kafka是什么 Kafka的特性 应用场景 Kafka的安装 单机版部署 集群部署环境准备 Kafka 2.x集群部署 Kafka 3.x集群部署 监听器和内外网络 K ...

最新文章

  1. python内置方法就是内置函数_python内置函数
  2. 手机经常提示找不到服务器,经常出现找不到服务器是什么原因?什么网也打 – 手机爱问...
  3. 关于Swift4.0 Method Swizzling(iOS的hook机制)使用
  4. AngularJS+Satellizer+Node.js+MongoDB-Instagram-20
  5. 一致性算法 - Raft
  6. Fence Repair
  7. Masonry 原理与使用说明
  8. 利用python爬虫(part4)--requests模块之requests.get方法
  9. 大页内存的使用:HugePages(大内存页)的原理与使用
  10. VS IED 自己开发小插件
  11. 利用Adobe AIR本地扩展支持Android开发
  12. hdu 3461 Code Lock(并查集)2010 ACM-ICPC Multi-University Training Contest(3)
  13. oracle试图怎么使用,oracle 视图的介绍和使用
  14. 牛股轮回另类可能:未来的牛股在哪?
  15. <第六、七周>新店日记,shopee怎么怎么开广告?怎么定价比较合理?
  16. Qt for winrt结合Winrt API开发
  17. 2023.02.14草图大师 卧室房间 效果图
  18. 科技巨头纷纷发力AI,智能硬件已来临,变现还会远吗?
  19. python 根据身份证号计算年龄和性别_excel如何根据身份证号计算男女出生日期、性别和年龄?分享了!...
  20. JAVA 面试知识点(个人总结)

热门文章

  1. JS存取Cookies值
  2. IOS调用WCF提供的服务方法,但是方法的参数是WCF那边自定义的对象,这样有办法调用么,如果可以IOS应该怎么传参呢?请问有了解的么,...
  3. 从Web借鉴UI设计
  4. Android开发四 开发第一个Android应用
  5. 解决python调用TensorFlow时出现FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecate
  6. 在Sublime Text 3上安装代码格式化插件CodeFormatter
  7. Linux之du df free:du文件大小 df分区使用 free内存
  8. TP查询数据库多维数组
  9. PHP获取今天, 本周 ,半月 ,本月 ,本季 ,本年,昨天 ,上月时间段
  10. tp5 上传文件乱码问题