Kafka入门(一) 概述、部署与API的简单使用

  • MQ
    • 传统架构的问题
    • MQ简介
    • MQ的优缺点
      • 优点
      • 缺点
    • 同步与异步
    • P2P模式
    • 订阅发布模式
  • Kafka概述
    • 简介
    • 特点
    • 名称解释
      • Broker
      • Producer
      • Consumer
      • Consumer Group
      • Topic
      • Partition
      • 分区副本
      • Segment
      • Offset
    • 对比
  • Kafka集群架构
  • 部署
    • 第一台机安装
    • 分发及修改其余节点配置
    • 启动与关闭
    • 封装脚本
      • 启动脚本
      • 关闭脚本
  • Kafka压力测试
    • 创建Topic
    • 生产测试
    • 消费测试
  • Topic管理
    • 创建
    • 列举
    • 查看
    • 删除
  • 生产者消费者测试
  • Java API
    • 生产者API
    • 消费者API
    • 测试

MQ

传统架构的问题

Web1.0时代的网站,经过Nginx反向代理给Tomcat服务器,后端直接与MySQL交互,如果读写的请求很多,MySQL无法支撑很大的并发量,会导致MySQL宕机。

由于大多数情况写入只进行一次(注册等操作),大部分时候是读,就可以使用Redis作为读取时的缓存(将部分常用数据预加载到内存),MySQL只进行少量写的操作,从而实现读写分离,且占流量大头的读被Redis分担,解决了高并发读取的问题。

但是并发写入的数据也很多时,使用Redis将所有数据都缓存在内存原理上非常正确,实际上成本过高,这就需要低成本的消息队列来解决。消息队列例如Kafka,使用的内存+硬盘(类似HBASE的思想),解决高并发写入的问题时,性价比高。

MQ简介

消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

简言之:消息队列MQ用于实现两个系统之间或者两个模块之间传递消息数据时,实现数据缓存,是中间件。

既然是基于队列实现数据缓存,当然也就符合队列的特点:FIFO(先进先出)。

需要实现实时、高性能、高吞吐、高可靠的消息传递架构中都可以考虑使用MQ。

MQ是通信方式,MQTT是物联网通讯协议(TCP是通讯层协议。MQTT是基于客户端-服务器的消息发布/订阅传输协议,也就是改良TCP的应用层通讯协议),并不是一回事。。。但是有一些相似性,使用MQTT接收下位机传感器的数据并传送给Kafka,再最终保存到HDFS是正常操作。

MQ的优缺点

优点

实现了架构解耦。例如:A→B如果要改为A→C,需要停机改代码重新编译部署,耦合度高。如果有MQ,就是A→MQ,B和C都从MQ中拉取数据即可,显然耦合度降低。

保证了最终一致性。虽然并不能保证数据实时传输,但是可以确保最终数据都被接收到。

实现了异步传输,提高了传输性能。如果A发送用时10s,B、C接收都用1000s,合计2010s。如果使用MQ,B和C可以并行接收,只需要1010s。

缺点

由于增加了MQ,架构变复杂,运维难度提高。MQ作为信息流的唯一通道,必须保证MQ是可靠的,如果MQ故障,整个系统都瘫痪。

数据保证更加复杂,必须保证生产安全和消费安全。在传输过程中需要保证数据安全(不丢包、不重复)。没有MQ时只需要保证A→B和A→C分别安全即可(TCP底层就会校验是否有丢包);有了MQ后,就得保证A、B、C与MQ的连接都是安全的。

出现故障是必然的,不出现故障是偶然的。。。采用分布式系统实现高可用可以确保MQ的可靠性。最常用的Kafka自己有机制确保不丢包、不重复。

同步与异步

提交请求→后台处理→返回处理结果,类似这种提交完毕稍加等待可以直接看到最终结果的立即一致性的就是同步处理。例如:存钱、转账,不会看到数字渐变的过程。。。常用的TCP通信协议,每次都3次握手,传输完毕后4次挥手,传输时要确保成功一次才会继续下一次。安全性好但是性能差。这就是典型的同步传输。

提交请求→放入MQ(返回临时结果)→后台继续处理,类似这种会看到过程中临时结果的情况就是异步处理。用户暂时不需要关心最终结果,只要保证最终结果正确,实现最终一致性即可。常见的UDP通信协议,不管有木有收到都直接发送下一条,速度很快,但是容易丢包。这就是典型的异步传输。上位机当然不可能像下位机那样达到上百Hz的刷新率(Winform容易卡死),UI刷新20帧就够了,死循环一般用单独的进程,也是异步。

P2P模式

负责往消息队列中写数据的就是生产者,负责缓存传递数据的就是MQ,负责从MQ读取数据的就是消费者。这种模式下,生产者往MQ写数据,消费者从MQ读取数据。读取成功,也就是消费成功后,会返回一个确认信号,MQ会把消费成功的数据删除。这样就导致数据只能被一个消费者使用,无法实现数据的共享。

订阅发布模式

生产者、消费者、MQ都与P2P一致。多了主题(Topic),用于划分存储不同业务的数据。

生产者依旧是往MQ生产数据,将数据写入到对应的主题中。消费者可以订阅主题,如果主题中出现了新的数据,(可以是多个)消费者就可以立即消费(生产者与主题是一对一的关系,主题与消费者是一对多的关系)。

一个消费者可以订阅多个主题,一个主题可以被多个消费者订阅,消费成功以后,不会立即主动删除数据。例如:订阅公众号。。。MQTT当然也是订阅发布的模式。。。

Kafka概述

简介

Kafka官网这样介绍:
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

翻译过来:
Apache Kafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序(分布式的基于订阅发布模式的高吞吐高性能的实时消息队列系统)。是用Scala开发的。异常简练:

val inputData = sc.textFile("HDFS文件地址")
val wcResult = inputData.flatMap(line => line.split("\\s+")).map(word => (word,1)).reduceByKey((tmp,item) => tmp+item)val wcResult = inputData.flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_)wcResult.saveAsText(“HDFS路径”)

比起MapReduce的自带Word Count,简练太多了。。。

Kafka主要用作分布式消息队列(分布式存储)。作为实时消息队列存储是常使用的功能。

当然Scala开发的程序都适合做运算。Kafka的KafkaStream有分布式流式计算的功能,但是有Spakr Streaming和Flink这些更强的工具,基本用不到Scala的这个功能。

特点

Zookeeper:公平节点、数据量小、实时、不用做数据存储,协调服务组件。
HDFS:离线、文件系统、永久性存储、大数据量。
Redis:实时、NoSQL数据库、大数据量临时存储、小数据量永久性存储(分布式内存)。
Hbase:实时、NoSQL数据库、大数据量永久性存储(分布式堆内存+分布式硬盘HDFS)。
Kafka:实时、消息队列、大数据量的临时性存储(分布式内存Page Cache+自己管理的分布式硬盘)。

Kafka是一个高吞吐、高性能、高灵活性、安全性的分布式基于订阅发布模式的消息队列系统:

Kafka的高性能(也就是实时读写数据)是通过分布式内存的Page Cache实现的。∵Scala兼容Java,用堆内存时如果Scala进程卡死,就会被GC回收堆内存,∴不用堆内存。PageCache是操作系统的页缓存,Kafka进程卡死后重启(不是机器重启),数据依然在内存中(类似C++的内存泄露,但不是占用的堆内存)。

Kafka的高并发是通过分布式并行读写实现的。

Kafka的高吞吐是通过分布式硬盘存储实现的,没有使用HDFS,而是自己管理硬盘,进行顺序读写。

Kafka的高可靠是通过分布式主从架构实现的。

Kafka的高安全性通过将内存数据同步到硬盘、硬盘中备份副本(∵单机存储超过1份,宕机时全部失效,故副本总数不超过机器个数)实现。

名称解释

Broker

Kafka是分布式集群,这多台机器的每个节点就是一个Broker。

Producer

生产者,负责将数据写入Kafka,一般生产数据都是使用数据采集工具,例如Sqoop、Flume。

Consumer

消费者,负责从Kafka消费数据。

Consumer Group

Kafka中必须以消费者组的形式从Kafka中消费数据。每个消费者都属于某个消费者组,多个消费者就可以并行消费数据。∴一个消费者组中的多个消费者消费的数据是不同的,所有消费者的数据都是自己所属的消费者组数据的子集,+起来才是完整数据。

Topic

数据主题(功能上有点像元数据),用于对数据进行分类,区分不同数据。类似MySQL的,但是Kafka是分布式存储,更类似HBASE分布式表Region。一个Topic可以划分为多个分区Partition,每个分区存储在不同的Kafka节点。

Partition

数据分区,用于实现Topic的分布式存储。一个Topic可以划分为多个分区,类似HBASE表可以划分为多个Region。每个分区存储在不同的Kafka节点Broker上。

分区副本

Kafka选用了副本机制来保证数据的安全性。上文也提过每个Kafka分区都可以有不超过节点个数的副本数。

Kafka将分区的多个副本划分为Leader(对外提供读写)和Follower(与Leader同步数据)。如果Leader宕机,Kafka会模拟Zookeeper的公平节点架构,借助Zookeeper重新选举出新的Leader,重新对外提供读写。

Segment

对每个分区的数据进行了更细的划分,先写入的数据会先生成一对Segment文件,存储到一定条件以后,后面数据写入另外一对Segment文件,每个文件就叫Segment文件对(类似KV键值对,成对存在)。

每个Segment是一对(也就是2个)文件:
.index后缀的文件存储文件的索引;.log后缀的文件真正存储数据的日志。

这种设计是为了加快数据检索的效率,将数据按照规则写入不同文件,以后可以根据规则快速的定位数据所在的文件。Segment的名字是这个Segment记录的offset的最小值(类似链表/迭代器的指针,指向下一个元素)。

Offset

Kafka中所有数据的读取都是按照Offset来读取数据,Offset就是每条数据的偏移量(写入分区的顺序)。MQ是FIFO的,先写入的Offset小(第一条=0),后写入的Offset大。

基于offset来指定数据的顺序,消费时就可以按照offset顺序来读取(消费者消费Topic分区中的数据按照offset进行顺序消费)。

生产者往Kafka中写入数据,写入某个分区

每个分区单独管理一套Offset,Offset从0开始对每条数据进行编号。

Kafka写入数据也是按照KV来写入数据:

offset Key   Value

对比

对比项\工具 HDFS HBASE Redis Kafka
第一层(逻辑划分) 目录(文件夹) NameSpace DB0 Topic
第二层(逻辑划分) 文件 Table X X
存储分区 Block Region 分片 Partition
分区规则 每128M 范围 槽位 自己指定
分区安全 副本 WAL+HDFS副本 副本 副本
存储单元 X 列族 X Segment
存储位置 硬盘 内存memstore+硬盘StoreFile 内存 内存 + .log + .index
架构 NameNode + DataNode Hmaster + HregionServer Reids节点 Kafka Contorler + Kafka Broker
HA实现 Zookeeper Kafka Contorler + Kafka:Broker 自行配置 Kafka Contorler + Kafka:Broker

Kafka集群架构

Zookeeper:辅助选举、元数据存储。
Kafka:分布式主从架构,实现消息队列的构建。

Kafka是分布式主从架构:
主节点Kafka Controller是特殊的Broker(从Broker中选举出来的将才。。。还要负责普通Broker的工作),比普通的Broker多了管理节点的功能(管理从节点的Topic、分区、副本)。启动集群/Controller宕机,都会由Zookeeper辅助选举出Controller。

从节点Kafka Broker对外提供读写请求,还会监听Controller(Controller故障会重新选举)。

部署

官网下载地址

kafka_2.12-2.4.1显然有2个版本号。。。2.12是Scala版本号,2.4.1是Kafka版本号。

第一台机安装

上传安装包至node1:

cd /export/software/
rz

解压并创建日志目录并查看:

tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/
cd /export/server/kafka_2.12-2.4.1/
mkdir logs

切换目录:

[root@node1 kafka_2.12-2.4.1]# cd config/
[root@node1 config]# ll -ah
总用量 76K
drwxr-xr-x 2 root root 4.0K 3月   3 2020 .
drwxr-xr-x 7 root root  101 5月  30 18:03 ..
-rw-r--r-- 1 root root  906 3月   3 2020 connect-console-sink.properties
-rw-r--r-- 1 root root  909 3月   3 2020 connect-console-source.properties
-rw-r--r-- 1 root root 5.2K 3月   3 2020 connect-distributed.properties
-rw-r--r-- 1 root root  883 3月   3 2020 connect-file-sink.properties
-rw-r--r-- 1 root root  881 3月   3 2020 connect-file-source.properties
-rw-r--r-- 1 root root 2.2K 3月   3 2020 connect-log4j.properties
-rw-r--r-- 1 root root 1.6K 3月   3 2020 connect-mirror-maker.properties
-rw-r--r-- 1 root root 2.3K 3月   3 2020 connect-standalone.properties
-rw-r--r-- 1 root root 1.2K 3月   3 2020 consumer.properties
-rw-r--r-- 1 root root 4.6K 3月   3 2020 log4j.properties
-rw-r--r-- 1 root root 1.9K 3月   3 2020 producer.properties
-rw-r--r-- 1 root root 6.7K 3月   3 2020 server.properties
-rw-r--r-- 1 root root 1.1K 3月   3 2020 tools-log4j.properties
-rw-r--r-- 1 root root 1.2K 3月   3 2020 trogdor.conf
-rw-r--r-- 1 root root 1.2K 3月   3 2020 zookeeper.properties

consumer.properties就是消费者配置文件,producer.properties就是生产者配置文件,server.properties就是Kafka服务配置文件。先修改server.properties

vim server.properties

第21行的唯一id是int类型,用于唯一标识服务端id,node1不用修改。

第60行需要修改日志目录:

log.dirs=/export/server/kafka_2.12-2.4.1/logs

第123行需要修改Zookeeper地址:

zookeeper.connect=node1:2181,node2:2181,node3:2181

末尾添加2个配置,允许删除topic,并且指定当前KafkaServer的主机名:

delete.topic.enable=true
host.name=node1

记得保存。

分发及修改其余节点配置

node1执行:

cd /export/server/
scp -r kafka_2.12-2.4.1 node2:$PWD
scp -r kafka_2.12-2.4.1 node3:$PWD

PWD是大写!!!

node2与node3中分别修改:

vim /export/server/kafka_2.12-2.4.1/config/server.properties

21行的broker.id=1、broker.id=2,末尾的host.name=node2、host.name=node3并保存。。。

添加环境变量:

vim /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
export PATH=:$PATH:$KAFKA_HOME/bin

保存后刷新:

source /etc/profile

为了后续方便使用,可以3台都配置。。。只配置一台也可以。

启动与关闭

由于Kafka依赖Zookeeper,故先启动HDFS、YARN、Zookeeper再启动Kafk。。。

由于已经配置了环境变量,可以直接启动(但是需要指定配置文件路径):

kafka-server-start.sh /export/server/kafka_2.12-2.4.1/config/server.properties

没有配置环境变量则需要使用绝对路径:

/export/server/zookeeper-3.4.6/bin/start-zk-all.sh /export/server/kafka_2.12-2.4.1/config/server.properties

启动后是前台运行的:

[2021-05-30 21:49:38,306] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

如果需要后台运行需要输出重定向(不要换行,这是一条命令):

kafka-server-start.sh config/server.properties >>/dev/null 2>&1 & >>/dev/null 2>&1 &

关闭:

kafka-server-stop.sh

当然ctrl+c也行。。。

封装脚本

这么麻烦肯定是不行的。。。还得一台一台启动。。。

启动脚本

vim /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1for number in {1..3}
dohost=node${number}echo ${host}/usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;export JMX_PORT=9988;${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >>/dev/null 2>&1 &"echo "${host} started"
done

保存后添加可执行权限:

chmod u+x /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh

关闭脚本

vim /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1for number in {1..3}
dohost=node${number}echo ${host}/usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;${KAFKA_HOME}/bin/kafka-server-stop.sh"echo "${host} stoped"
done

保存后添加可执行权限:

chmod u+x /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh

封装完毕后可以使用start-kafka.sh启动:

[root@node1 ~]# start-kafka.sh
node1
node1 started
node2
node2 started
node3
node3 started

使用stop-kafka.sh关闭:

[root@node1 ~]# stop-kafka.sh
node1
node1 stoped
node2
node2 stoped
node3
node3 stoped

Kafka压力测试

当然要先启动Kafka才能进行。。。启动已有脚本了。。。

Kafka自带了压力测试脚本,正式使用前需要测试,防止超过Kafka的峰值导致集群宕机。由于Kafka需要写入和读取到硬盘,故瓶颈在硬盘的读写速度。笔者试试读写400M/S的SSD能支撑多大的并发。

创建Topic

cd /export/server/kafka_2.12-2.4.1/
bin/kafka-topics.sh --create --topic bigdata --partitions 2 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

生产测试

cd /export/server/kafka_2.12-2.4.1/bin/
[root@node1 bin]# kafka-producer-perf-test.sh --topic cdbigdata --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
[2021-05-30 22:03:53,227] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {cdbigdata=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
42177 records sent, 8433.7 records/sec (8.04 MB/sec), 2473.3 ms avg latency, 3454.0 ms max latency.
97920 records sent, 19580.1 records/sec (18.67 MB/sec), 1763.3 ms avg latency, 2353.0 ms max latency.
119984 records sent, 23996.8 records/sec (22.89 MB/sec), 1431.4 ms avg latency, 1929.0 ms max latency.
162592 records sent, 32518.4 records/sec (31.01 MB/sec), 1001.4 ms avg latency, 1202.0 ms max latency.
146032 records sent, 29206.4 records/sec (27.85 MB/sec), 1098.3 ms avg latency, 1327.0 ms max latency.
141072 records sent, 27984.9 records/sec (26.69 MB/sec), 1170.9 ms avg latency, 1314.0 ms max latency.
158768 records sent, 31753.6 records/sec (30.28 MB/sec), 1076.7 ms avg latency, 1253.0 ms max latency.
132528 records sent, 26505.6 records/sec (25.28 MB/sec), 1157.2 ms avg latency, 1470.0 ms max latency.
125072 records sent, 25014.4 records/sec (23.86 MB/sec), 1353.0 ms avg latency, 1574.0 ms max latency.
152160 records sent, 30425.9 records/sec (29.02 MB/sec), 1086.2 ms avg latency, 1191.0 ms max latency.
154320 records sent, 30864.0 records/sec (29.43 MB/sec), 1069.4 ms avg latency, 1174.0 ms max latency.
156672 records sent, 31334.4 records/sec (29.88 MB/sec), 1034.6 ms avg latency, 1208.0 ms max latency.
170416 records sent, 34076.4 records/sec (32.50 MB/sec), 971.6 ms avg latency, 1094.0 ms max latency.
162224 records sent, 32444.8 records/sec (30.94 MB/sec), 1017.6 ms avg latency, 1146.0 ms max latency.
174320 records sent, 34864.0 records/sec (33.25 MB/sec), 955.7 ms avg latency, 1135.0 ms max latency.
167056 records sent, 33411.2 records/sec (31.86 MB/sec), 967.5 ms avg latency, 1211.0 ms max latency.
171808 records sent, 34354.7 records/sec (32.76 MB/sec), 957.6 ms avg latency, 1051.0 ms max latency.
217968 records sent, 43593.6 records/sec (41.57 MB/sec), 751.9 ms avg latency, 950.0 ms max latency.
219248 records sent, 43849.6 records/sec (41.82 MB/sec), 744.9 ms avg latency, 1009.0 ms max latency.
297568 records sent, 59513.6 records/sec (56.76 MB/sec), 569.1 ms avg latency, 891.0 ms max latency.
285152 records sent, 57030.4 records/sec (54.39 MB/sec), 571.0 ms avg latency, 688.0 ms max latency.
290480 records sent, 58096.0 records/sec (55.40 MB/sec), 565.1 ms avg latency, 686.0 ms max latency.
332736 records sent, 66547.2 records/sec (63.46 MB/sec), 498.8 ms avg latency, 733.0 ms max latency.
340080 records sent, 68016.0 records/sec (64.87 MB/sec), 481.8 ms avg latency, 609.0 ms max latency.
335792 records sent, 67131.5 records/sec (64.02 MB/sec), 484.6 ms avg latency, 528.0 ms max latency.
5000000 records sent, 38812.643607 records/sec (37.01 MB/sec), 837.03 ms avg latency, 3454.00 ms max latency, 835 ms 50th, 1408 ms 95th, 1953 ms 99th, 3332 ms 99.9th.

500w条数据,每条1K,虚拟机集群效果差强人意。。。

消费测试

需要Topic先有数据,否则会报错:

WARNING: Exiting before consuming the expected number of messages: timeout (10000 ms) exceeded. You can use the --timeout option to increase the timeout.
2021-05-30 22:13:31:244, 2021-05-30 22:13:41:356, 0.0000, 0.0000, 0, 0.0000, 1622384012202, -1622384002090, -0.0000, -0.0000

使用之前的那个Topic显然很合适:

[root@node1 bin]# kafka-consumer-perf-test.sh --topic cdbigdata --broker-list node1:9092,node2:9092,node3:9092  --fetch-size 1048576 --messages 5000000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-05-30 22:24:39:208, 2021-05-30 22:25:10:412, 4768.3716, 152.8128, 5000000, 160235.8672, 1622384679574, -1622384648370, -0.0000, -0.0031

500w条,读取速度为150M/S,用时160s,差不多也是3w条/s。。。

Topic管理

由于配置了环境变量,直接:

kafka-topics.sh

就会显示各种帮助。。。

创建

kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

其中:

--create:创建
--topic :指定操作的Topic的名称
--partitions:指定分区个数,默认为1
--replication-factor:副本因子,默认为1
--bootstrap-server:指定Kafka服务端地址

列举

kafka-topics.sh --list -bootstrap-server node1:9092,node2:9092,node3:9092

可以看到:

[root@node1 bin]# kafka-topics.sh --list -bootstrap-server node1:9092,node2:9092,node3:9092
__consumer_offsets
bigdata
bigdata01
cdbigdata

查看

拿之前创建的Topic开刀:

kafka-topics.sh --describe --topic bigdata01  --bootstrap-server node1:9092,node2:9092,node3:9092

运行后:

[root@node1 bin]# kafka-topics.sh --describe --topic bigdata01  --bootstrap-server node1:9092,node2:9092,node3:9092
Topic: bigdata01        PartitionCount: 3       ReplicationFactor: 2    Configs: segment.bytes=1073741824Topic: bigdata01        Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2Topic: bigdata01        Partition: 1    Leader: 0       Replicas: 0,1   Isr: 0,1Topic: bigdata01        Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0

其中:

Partition:分区编号
Replicas:分区副本所在的Kafka Broker ID,每个分区的副本有两种角色(leader副本,follower副本)
Leader:leader 副本所在的Kafka节点
Isr:In-Sync-Replicas:正在同步的副本,可用的副本(用于leader故障时,选举新的leader)

删除

kafka-topics.sh --delete --topic bigdata02  --bootstrap-server node1:9092,node2:9092,node3:9092

生产者消费者测试

构建生产者、消费者对象后,就可以指定Topic和Kafka集群地址,在命令行中生产者发送消息,消费者就可以接收到消息:
node1中:

kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092

node1再新建个会话:

kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092  --from-beginning

生产者发送:

[root@node1 bin]# kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092
>hehe1
>haha
>oo
>

消费者接收:

[root@node1 ~]# kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092  --from-beginning   hehe1
haha
oo

其中:

--from-beginning:从每个分区的最初开始消费,默认从最新的offset进行消费

使用该参数可以使用之前的生产者数据。。。但是数据顺序不对(从第一个、第二个、第三个分区读取数据来消费。。。分区内数据是有序的)。。。

ctrl+c强制结束消费者的进程后,再次启动消费者,内容与之前一致。

如果没有这个参数,就是直接从最新的偏移量读取,会丢失之前的数据。可以取消这个参数试试:

[root@node1 bin]# kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092
>hehe1
>haha
>oo
>ss
>
[root@node1 ~]# kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092
ss

Java API

生产者API


Diamond types are not supported at language level '5'这种问题很常见。。。

art+enter选设置leve to 7 或者:

左上角File→Project Structure→Modules→Language level选8号即可。

package com.aa;import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;public class KafkaClientProducerTest {public static void main(String[] args) {//构建配置管理对象的实例Properties properties = new Properties();//指定Kafka集群的地址properties.put("bootstrap.servers","node1:9092,node2:9092,node3:9092");/*acks的参数表示生产者生产数据到Kafka的方式0:生产者将数据提交给Kafka,不管Kafka有没有写入成功,都直接发送下一条。快、数据丢失的概率比较高1:生产者将数据提交给Kafka,Kafka将数据写入对应分区的leader分区后,返回一个ack给生产者,生产者发送下一条。有一定的数据丢失风险,如果follower没来得及跟leader同步,leader故障,数据会丢失。如果生产者长时间没有收到ack,就认为数据写入失败,重试写入数据,直到收到Kafka的ack,再发送下一条。如果ack丢失,就会导致数据重复的问题。all或-1:生产者将数据提交给Kafka,Kafka将数据写入对应分区的leader分区后,等待所有Follower同步成功,返回一个ack给生产者,生产者发送下一条。安全但是慢。*/properties.put("acks","all");//指定KV序列化类型properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//构建生产者的实例对象,用于生产数据到Kafka,需要加载生产者配置Producer<String, String> producer = new KafkaProducer<>(properties);//生产数据到kafka:send//生产的数据对象:ProducerRecord类型for (int i = 0; i < 10; i++){//方式一:指定topic,key,value             --根据Key的编码取余分区个数
//            producer.send(new ProducerRecord<String, String>("bigdata01", i+"", "fafa"+i));//方式二:指定topic,value                 --全部写入某一个分区
//            producer.send(new ProducerRecord<String,String>("bigdata01","hehe"+i));//方式三:指定topic,partition,key,value  -- 指定写入分区producer.send(new ProducerRecord<String,String>("bigdata01",0,i+"","haha"+i));}//关闭生产者producer.close();}
}

消费者API

package com.aa;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class KafkaClientConsumerTest {public static void main(String[] args) {//构建配置管理对象的实例Properties properties = new Properties();//指定Kafka集群的地址properties.put("bootstrap.servers","node1:9092,node2:9092,node3:9092");//指定消费者属于哪个组properties.setProperty("group.id", "test01");//自动提交properties.setProperty("enable.auto.commit", "true");//自动提交的时间间隔properties.setProperty("auto.commit.interval.ms", "1000");//指定KV的反序列化的类properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//构建消费者对象,加载消费配置KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅Topicconsumer.subscribe(Arrays.asList("bigdata01"));//从Kafka中拉取Topic的数据while (true) {//实时不断从Kafka中拉取数据ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//取出本次拉取的每一条数据for (ConsumerRecord<String, String> record : records){//获取topicString topic = record.topic();//获取分区int part = record.partition();//获取offsetlong offset = record.offset();//获取KVString key = record.key();String value = record.value();//处理System.out.println(topic+"\t"+part+"\t"+offset+"\t"+key+"\t"+value);}}}
}

测试

遇到了报错:
Error:java: Compilation failed: internal java compiler error。。。

pom.xml需要配置:

<repositories><repository><id>aliyun</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository></repositories><dependencies><!-- Kafka的依赖 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.4.1</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>

锁定版本JDK1.8后终于能运行了:

Kafka入门(一) 概述、部署与API的简单使用相关推荐

  1. kafka——2.11 单机部署 与topic基础简单应用

    一.要使用kafka首先要配置启动好zookeeper 1.在zookeeper的conf目录里 执行: cp zoo_sample.cfg   zoo.cfg 2.打开zoo.cfg的文件 命  令 ...

  2. Kafka 入门之集群部署遇到问题

    最近,因为上级主管部门需要通过使用Kafka向其传输文件,又因为此前没有接触过kafka,所以在部署测试kafka程序期间遇到很多问题,在这里总结4个问题与1个建议,方便入门者参考也便于遇到类似问题进 ...

  3. SSM_Mybatis_Day01(快速入门、映射文件概述、核心配置文件概述、相应API、代理开发方式、映射文件深入、数据类型的映射、列名和属性名不一致的时候的处理)

    SSM_Mybatis_Day01(快速入门.映射文件概述.核心配置文件概述.相应API.代理开发方式.映射文件深入.数据类型的映射.列名和属性名不一致的时候的处理) 1. Mybatis mybat ...

  4. Kafka入门教程与详解

    1 Kafka入门教程 1.1 消息队列(Message Queue) Message Queue消息传送系统提供传送服务.消息传送依赖于大量支持组件,这些组件负责处理连接服务.消息的路由和传送.持久 ...

  5. Kafka 入门和 Spring Boot 集成

    2019独角兽企业重金招聘Python工程师标准>>> Kafka 入门和 Spring Boot 集成 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流 ...

  6. Kafka入门篇学习笔记整理

    Kafka入门篇学习笔记整理 Kafka是什么 Kafka的特性 应用场景 Kafka的安装 单机版部署 集群部署环境准备 Kafka 2.x集群部署 Kafka 3.x集群部署 监听器和内外网络 K ...

  7. Kafka入门教程(一)

    转自:https://blog.csdn.net/yuan_xw/article/details/51210954 1 Kafka入门教程 1.1 消息队列(Message Queue) Messag ...

  8. Kafka教程(一)Kafka入门教程

    Kafka教程(一)Kafka入门教程 1 Kafka入门教程 1.1 消息队列(Message Queue) Message Queue消息传送系统提供传送服务.消息传送依赖于大量支持组件,这些组件 ...

  9. Kafka 入门 (一)

    Kafka 入门(一) Apache Kafka起源于LinkedIn,后来于2011年成为开源Apache项目,然后于2012年成为First-class Apache项目.Kafka是用Scala ...

最新文章

  1. php for循环in的用法,JavaScript中for in循环是如何使用的?需要注意些什么?
  2. 高德地图JavaScript API开发研究
  3. sentinel 不显示项目_Sentinel+Nacos实现资源流控、降级、热点、授权
  4. 【C++】递归打印杨辉三角
  5. python 计时器_Python上下文管理器的魔力
  6. ListView提示和技巧
  7. RayMarching3:组合与变幻
  8. lzg_ad:原创XPE开发视频教材
  9. VS2012+SQLServer2008 R2 开发工具
  10. xshell5 Xshell6 商业版的破解版
  11. wxparse的使用php返回数组输出,小程序应用实践:wxParse多数据循环使用方法
  12. php 回显,PHP实时回显 实时输出结果的方法 实时反馈结果到浏览器
  13. Word文档中去除EndNote格式
  14. 【数据库基础】什么是A、C、 I 、D?
  15. 荣耀MagicBook 2019 Intel版发布:性能新升级 续航长达15小时!
  16. 太湖之光超级计算机应用最高奖,世界最快超级计算机“神威·太湖之光”获得100多项应用成果...
  17. Hololens开发笔记
  18. 清华镜像源地址(国内下载python包必备地址)
  19. SQL 多个字段的值拼接成Josn
  20. 数据结构 第七章 图(图的概念和存储)

热门文章

  1. 计算机专业英语第13章,第13章计算机专业英语.doc
  2. 计算机网络实验:Lab4 使用ns2进行网络模拟
  3. 网络工程师生涯中必玩的六款网络模拟器,有没有你没玩过的?
  4. 最新:深入理解Java虚拟机:JVM高级特性与最佳实践(第3版)周志明
  5. 【Linux】 - linux文本编辑器vim的常用操作
  6. 神犇营my0001:春晓
  7. tomcat默认编码问题
  8. Python GUI项目:古诗词鉴赏系统
  9. 信息系统项目管理师考试大纲
  10. C语言 实现离散数学合式公式真值表