一、kafka详解

安装包下载地址:https://download.csdn.net/download/weixin_45894220/87020758

1.1Kafka是什么?

1、Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目,该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
2、Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。
3、Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性

1.2kakfa与zookeeper的关系?

1、所有Broker的管理,broker 会向 zookeeper
发送心跳请求来上报自己的状态。体现在zookeeper上会有一个专门用来Broker服务器列表记录的点,节点路径为/brokers/ids
2、zookeeper 保存了 topic 相关配置,例如 topic 列表、每个 topic 的 partition数量、副本的位置等等。
3、kafka 集群中有一个或多个broker,其中有一个通过zookeeper选举为leader控制器。控制器负责管理整个集群所有分区和副本的状态,例如某个分区的 leader 故障了,控制器会选举新的 leader。

1.3常见的消息队列

我们知道常见的消息系统有Kafka、RabbitMQ、ActiveMQ等等,但是这些消息系统中所使用的消息模式如下两种:

Peer-to-Peer (Queue)
简称PTP队列模式,也可以理解为点到点。例如单发邮件,我发送一封邮件给小徐,我发送过之后邮件会保存在服务器的云端,当小徐打开邮件客户端并且成功连接云端服务器后,可以自动接收邮件或者手动接收邮件到本地,当服务器云端的邮件被小徐消费过之后,云端就不再存储(这根据邮件服务器的配置方式而定)。

名词解释:

Producer=生产者
Queue=队列
Consumer=消费者

Peer-to-Peer模式工作原理:

1、消息生产者Producer1生产消息到Queue,然后Consumer1从Queue中取出并且消费消息。
2、消息被消费后,Queue将不再存储消息,其它所有Consumer不可能消费到已经被其它Consumer消费过的消息。
3、Queue支持存在多个Producer,但是对一条消息而言,只会有一个Consumer可以消费,其它Consumer则不能再次消费。
4、但Consumer不存在时,消息则由Queue一直保存,直到有Consumer把它消费。

Publish/Subscribe(Topic)
简称发布/订阅模式。例如我微博有30万粉丝,我今天更新了一条微博,那么这30万粉丝都可以接收到我的微博更新,大家都可以消费我的消息。
注:以下图示中的Pushlisher是错误的名词,正确的为Publisher

名词解释:

Publisher=发布者
Topic=主题
Subscriber=订阅者

Publish/Subscribe模式工作原理:

1、消息发布者Publisher将消息发布到主题Topic中,同时有多个消息消费者
2、Subscriber消费该消息。和PTP方式不同,发布到Topic的消息会被所有订阅者消费。
3、当发布者发布消息,不管是否有订阅者,都不会报错信息。 4、一定要先有消息发布者,后有消息订阅者。

注意:Kafka所采用的就是发布/订阅模式,被称为一种高吞吐量、持久性、分布式的发布订阅的消息队列系统。

1.4常用消息系统对比

1、RabbitMQ Erlang编写,支持多协议 AMQP,XMPP,SMTP,STOMP。支持负载均衡、数据持久化。同时
支持Peer-to-Peer和发布/订阅模式
2、Redis 基于Key-Value对的NoSQL数据库,同时支持MQ功能,可做轻量级队列服务使用。就入队操作而言,
Redis对短消息(小于10KB)的性能比RabbitMQ好,长消息的性能比RabbitMQ差。
3、ZeroMQ轻量级,不需要单独的消息服务器或中间件,应用程序本身扮演该角色,Peer-to-Peer。它实质上是一个库,需要开发人员自己组合多种技术,使用复杂度高
4、ActiveMQ JMS实现,Peer-to-Peer,支持持久化、XA事务
5、Kafka/Jafka 高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理
6、MetaQ/RocketMQ 纯Java实现,发布/订阅消息系统,支持本地事务和XA分布式事务

1.5Kafka六大特点

1、高吞吐量、低延迟:可以满足每秒百万级别消息的生产和消费。它的延迟最低只有几毫秒,topic可以分多个partition, consumer group 对partition进行consumer操作
2、持久性、可靠性:有一套完善的消息存储机制,确保数据高效安全且持久化。消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
3、分布式:基于分布式的扩展;Kafka的数据都会复制到几台服务器上,当某台故障失效时,生产者和消费者转而使用其它的Kafka。
4、可扩展性:kafka集群支持热扩展
5、容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
6、高并发:支持数千个客户端同时读写

1.6Kafka的几个概念Kafka的几个概念

1、Kafka作为一个集群运行在一个或多个服务器上,这些服务器可以跨多个机房,所以说kafka是分布式的发布订阅消息队列系统。
2、Kafka集群将记录流存储在称为Topic的类别中。
3、每条记录由键值;"key value"和一个时间戳组成。

1.7Kafka核心组件

1、Producer:消息生产者,产生的消息将会被发送到某个topic
2、Consumer:消息消费者,消费的消息内容来自某个topic
3、Topic:消息根据topic进行归类,topic其本质是一个目录,即将同一主题消息归类到同一个目录
4、Broker:每一个kafka实例(或者说每台kafka服务器节点)就是一个broker,一个broker可以有多个topic

Zookeeper:zookeeper集群不属于kafka内的组件,但kafka依赖zookeeper集群保存meta信息,所以在此做声明其重要性。

zookeeper集群搭建地址:https://blog.csdn.net/weixin_45894220/article/details/127866337

结构图如下

1、Producer:消息和数据的生产者,主要负责生产Push消息到指定Broker的Topic中。
2、Broker:Kafka节点就是被称为Broker,Broker主要负责创建Topic,存储Producer所发布的消息,记录消息处理的过程,现是将消息保存到内存中,然后持久化到磁盘。
3、Topic:同一个Topic的消息可以分布在一个或多个Broker上,一个Topic包含一个或者多个Partition分区,数据被存储在多个Partition中。
4、replication-factor:复制因子;这个名词在上图中从未出现,在我们下一章节创建Topic时会指定该选项,意思为创建当前的Topic是否需要副本,如果在创建Topic时将此值设置为1的话,代表整个Topic在Kafka中只有一份,该复制因子数量建议与Broker节点数量一致。
5、Partition:分区;在这里被称为Topic物理上的分组,一个Topic在Broker中被分为1个或者多个Partition,也可以说为每个Topic包含一个或多个Partition,(一般为kafka节.点数CPU的总核心数量)分区在创建Topic的时候可以指定。分区才是真正存储数据的单元。

6、Consumer:消息和数据的消费者,主要负责主动到已订阅的Topic中拉取消息并消费,为什么Consumer不能像Producer一样的由Broker去push数据呢?因为Broker不知道Consumer能够消费多少,如果push消息数据量过多,会造成消息阻塞,而由Consumer去主动pull数据的话,Consumer可以根据自己的处理情况去pull消息数据,消费完多少消息再次去取。这样就不会造成Consumer本身已经拿到的数据成为阻塞状态。
7、ZooKeeper:ZooKeeper负责维护整个Kafka集群的状态,存储Kafka各个节点的信息及状态,实现Kafka集群的高可用,协调Kafka的工作内容。
Broker和Consumer有使用到ZooKeeper,而Producer并没有使用到ZooKeeper。

因为Kafka从0.8版本开始,Producer并不需要根据ZooKeeper来获取集群状态,而是在配置中指定多个Broker节点进行发送消息,同时跟指定的Broker建立连接,来从该Broker中获取集群的状态信息,所以Producer可以知道集群中有多少个Broker是否在存活状态,每个Broker上的Topic有多少个Partition。
Prodocuer会讲这些元信息存储到Producuer的内存中。
如果Producoer向集群中的一台Broker节点发送信息超时等故障,Producer会主动刷新该内存中的元信息,以获取当前Broker集群中的最新状态,转而把信息发送给当前可用的Broker,当然Prodocuer也可以在配置中指定周期性的去刷新Broker的元信息以更新到内存中。

注意:
我们可以看到上图,Broker和Consumer有使用到ZooKeeper,而Producer并没有使用到ZooKeeper,因为Kafka从0.8版本开始,Producer并不需要根据ZooKeeper来获取集群状态,而是在配置中指定多个Broker节点进行发送消息,同时跟指定的Broker建立连接,来从该Broker中获取集群的状态信息,这是Producer可以知道集群中有多少个Broker是否在存活状态,每个Broker上的Topic有多少个Partition,Prodocuer会讲这些元信息存储到Producuer的内存中。如果Producoer像集群中的一台Broker节点发送信息超时等故障,Producer会主动刷新该内存中的元信息,以获取当前Broker集群中的最新状态,转而把信息发送给当前可用的Broker,当然Prodocuer也可以在配置中指定周期性的去刷新Broker的元信息以更新到内存中。
注意:只有Broker和ZooKeeper才是服务,而Producer和Consumer只是Kafka的SDK罢了

1.8Kafka数据处理步骤

1、Producer产生消息,发送到Broker中
2、Leader状态的Broker接收消息,写入到相应topic中
3、Leader状态的Broker接收完毕以后,传给Follow状态的Broker作为副本备份
4、Consumer消费Broker中的消息

1.9Kafka名词解释和工作方式

Producer:消息生产者,就是向kafka broker发消息的客户端。 Consumer:消息消费者,向kafka
broker取消息的客户端 Topic:可以理解为一个队列。
Consumer Group(CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
Broker:一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。

1.10Consumer与topic关系

kafka只支持Topic

1、每个group中可以有多个consumer,每个consumer属于一个consumer
group;通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。
2、对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个"订阅"者。
3、在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);
一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息。
4、kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。

kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。

1.11Kafka消息的分发

1、Producer客户端负责消息的分发
2、kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"、“partitions
leader列表"等信息;
3、当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;
4、消息由producer直接通过socket发送到broker,中间不会经过任何"路由层”。
事实上,消息被路由到哪个partition上由producer客户端决定,比如可以采用"random"“key-hash”"轮询"等。

如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的。

1、在producer端的配置文件中,开发者可以指定partition路由的方式。
2、Producer消息发送的应答机制设置发送数据是否需要服务端的反馈,有三个值0,1,-1
0: producer不会等待broker发送ack
1: 当leader接收到消息之后发送ack
-1: 当所有的follower都同步消息成功后发送ack request.required.acks=0

1.12Consumer的负载均衡

当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提升topic的并发消费能力,步骤如下:

1、假如topic1,具有如下partitions: P0,P1,P2,P3
2、加入group A 中,有如下consumer:C0,C1
3、首先根据partition索引号对partitions排序: P0,P1,P2,P3
4、根据consumer.id排序: C0,C1
5、计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6、然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

Kafka原理参考地址:
https://zhuanlan.zhihu.com/p/163836793
https://cdn.modb.pro/db/105106
https://www.jianshu.com/p/47487f35b964

二、kafka集群搭建超详细教程

2.1、准备三个虚拟机:


1、虚拟机上配置有ssh服务,可以进行免密登录
2、Kafka运行在JVM上,需要安装JDK
3、kafka依赖zookeeper,需要安装zookeeper,
具体可参考:
https://blog.csdn.net/weixin_45894220/article/details/127866337

2.2、下载安装包

[root@hadoop1 ~]# cd /opt/module

#下载kafka安装包

[root@hadoop1 module]# wget https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz

2.3、解压

[root@hadoop1 module]# tar -zxvf kafka_2.13-2.6.0.tgz
[root@hadoop1 module]# mv kafka_2.13-2.6.0 kafka

2.4、创建存放kafka消息的目录

[root@hadoop1 module]# cd kafka
[root@hadoop1 kafka]# mkdir kafka-logs

2.5、修改配置文件

进入配置文件目录

[root@hadoop1 config]# cd config/

备份

[root@hadoop1 config]# cp server.properties server.properties.bak

修改配置文件

[root@hadoop1 config]# vim server.properties
# 修改如下参数
broker.id=0
listeners=PLAINTEXT://hadoop1:9092
log.dirs=/opt/module/kafka/kafka-logs
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181

参数说明:

broker.id : 集群内全局唯一标识,每个节点上需要设置不同的值
listeners:这个IP地址也是与本机相关的,每个节点上设置为自己的IP地址
log.dirs :存放kafka消息的
zookeeper.connect : 配置的是zookeeper集群地址

2.6、分发kafka安装目录

#分发kafka安装目录给其他集群节点

[root@hadoop1 config]# scp -r /opt/module/kafka/ hadoop2:/opt/module
[root@hadoop1 config]# scp -r /opt/module/kafka/ hadoop3:/opt/module

分发完成后,其他集群节点都需要修改配置文件server.properties中的broker.id和listeners参数

hadoop2 修改
broker.id=2
listeners=PLAINTEXT://hadoop2:9092hadoop3修改
broker.id=3
listeners=PLAINTEXT://hadoop3:9092

2.7、编写kafka集群操作脚本

[root@hadoop1 config]# cd /opt/module/kafka/bin

#创建kafka启动脚本

vim kafka-cluster.sh
# 添加如下内容
#!/bin/bash
case $1 in
"start"){for i in hadoop1 hadoop2 hadoop3do echo -------------------------------- $i kafka 启动 ---------------------------ssh $i "source /etc/profile;/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done
}
;;
"stop"){for i in hadoop1 hadoop2 hadoop3doecho -------------------------------- $i kafka 停止 ---------------------------ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"done
}
;;
esac

保存退出后,修改执行权限

chmod +x ./kafka-cluster.s

脚本命令说明:
启动kafka集群命令

./kafka-cluster.sh start

停止kafka集群命令

./kafka-cluster.sh stop

7.8、启动kafka集群

首先启动zookeeper集群
然后执行kafka集群脚本启动命令

[root@hadoop1 bin]# ./kafka-cluster.sh start
-------------------------------- hadoop1 kafka 启动 ---------------------------
-------------------------------- hadoop2 kafka 启动 ---------------------------
-------------------------------- hadoop3 kafka 启动 ---------------------------

查看进程是否存在

[root@hadoop1 bin]# netstat -tunlp

7.9、测试验证

kafka集群启动成功后,我们就可以对kafka集群进行操作了
创建主题

[root@hadoop1 kafka]# cd /opt/module/kafka
[root@hadoop1 kafka]# ./bin/kafka-topics.sh --create --bootstrap-server hadoop1:9092 --replication-factor 3 --partitions 1 --topic test输出:Created topic test.

查看主题列表

[root@hadoop1 kafka]# ./bin/kafka-topics.sh --list --bootstrap-server hadoop1:9092输出:test

启动控制台生产者

[root@hadoop1 kafka]# ./bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic test

启动控制台消费者

[root@hadoop1 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic test --from-beginning

在生产者控制台输入hello kafka,消费者控制台,就可以消费到生产者的消息,输出 hello kafka,表示消费端成功消费了生产者生产的消息!
至此,我们就顺利完成了kafka集群搭建的整个过程!

kafka详解及集群环境搭建相关推荐

  1. 2W 字详解 Redis 集群环境搭建实践

    点击上方 "终端研发部"关注, 星标或置顶一起成长 本文是Redis集群学习的实践总结(基于Redis 6.0+),详细介绍逐步搭建Redis集群环境的过程,并完成集群伸缩的实践. ...

  2. 【Hadoop生态圈】1.Hadoop入门教程及集群环境搭建

    文章目录 1.简介 2.环境准备 3.安装hadoop 3.修改Hadoop配置文件 3.1.hadoop-env.sh配置 3.2.core-site.xml配置 3.3.hdfs-site.xml ...

  3. 学习笔记之-Kubernetes(K8S)介绍,集群环境搭建,Pod详解,Pod控制器详解,Service详解,数据存储,安全认证,DashBoard

    笔记来源于观看黑马程序员Kubernetes(K8S)教程 第一章 kubernetes介绍 应用部署方式演变 在部署应用程序的方式上,主要经历了三个时代: 传统部署:互联网早期,会直接将应用程序部署 ...

  4. 2W 字详解 Redis 6.0 集群环境搭建实践

    原文链接:https://www.cnblogs.com/hueyxu/p/13884800.html 本文是Redis集群学习的实践总结(基于Redis 6.0+),详细介绍逐步搭建Redis集群环 ...

  5. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

  6. 高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper)

    高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper) 一.集群搭建要求 1.搭建设计 2.分配六台Linux,用于安装拥有三个节点的Kafka集群和三个节点的Zookeep ...

  7. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

    一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...

  8. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...

    Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...

  9. 大数据 -- Hadoop集群环境搭建

    首先我们来认识一下HDFS, HDFS(Hadoop Distributed File System )Hadoop分布式文件系统.它其实是将一个大文件分成若干块保存在不同服务器的多个节点中.通过联网 ...

最新文章

  1. 中文 Markdown 编写格式规范的命令行工具 lint-md
  2. python是全栈_Python全栈之路-3-字符串
  3. 在c语言中除法运算符,c – 不需要的除法运算符行为,我该怎么办?
  4. vim编辑模式_sublime vim模式和快捷键
  5. Linux入门学习(七)
  6. App客户端性能测试点总结
  7. java输出希腊字符表,希腊字母unicode表
  8. 罗技F310与F710游戏手柄驱动(附C++源码)
  9. 正则表达式 /^[0-9]+$/
  10. 监控不同外挂盘的硬盘io、查看linux命令运行时间和记录、iostat命令查看硬盘io、查看硬盘io的几种方法、定位到硬盘io高的dm
  11. 一个dht网络的“磁力链接”搜索python代码
  12. 世界杯快结束了,VAR的故事才刚刚开始
  13. 计算机存储器的有关术语,关于计算机存储器,不正确的描述是()。
  14. Django-rest-framework简介
  15. 监听手机接收短信——模拟获取短信的验证码
  16. zookeeper和k8s_Kubernetes(k8s)运行ZooKeeper,一个分布式系统协调器
  17. ZCMU--5193: 韩信点兵(C语言)
  18. em在聊天中是什么意思_emmmm是什么意思 聊天中emmmm什么意思
  19. 那些代购的“大鹅”,到底有多少是真的?
  20. 127.0.0.1:8080可以访问但是用本机ip就无法访问

热门文章

  1. 电动车跷跷板制作心得
  2. 《线性代数:矩阵》:伴随矩阵
  3. 杜彪:天猫数据如何运营、变现? 利用阿里云聚石塔在双11的成功案例
  4. 对标海底捞?百胜中国市值有望超过海底捞吗?
  5. Vue中使用Echarts中的地图组件报错:TypeError: api.coord is not a function
  6. 联想lenovo ThinkPad笔记本电脑开机进入BIOS的方法
  7. 论物联网与大数据、云计算、工业物联网、区块链
  8. 某大型保险集团在线财险业务系统数据库存储架构由集中式向分布式转型实践
  9. Latent Dirichlet Allocation
  10. 哈理工电信MATLAB音乐合成包络谐波代码